From 73d7bab8844bb21c7a9143c30800c2d11d411e42 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 17 Aug 2025 20:02:49 +0200 Subject: feat: typology processor (#8) --- crates/typologies/src/processor.rs | 124 +++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 crates/typologies/src/processor.rs (limited to 'crates/typologies/src/processor.rs') diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs new file mode 100644 index 0000000..a2a3b17 --- /dev/null +++ b/crates/typologies/src/processor.rs @@ -0,0 +1,124 @@ +mod driver; +mod publish; +mod reload; +mod typology; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{ + Context, + consumer::{Consumer, pull}, +}; +use futures_util::{StreamExt, future}; +use tokio::signal; +use tracing::{error, trace, warn}; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; + +use crate::{ + cnfg::Nats, + state::{AppHandle, AppState, Services}, +}; + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = typology::process_typology(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result> { + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name, + subjects: nats.subjects.iter().map(Into::into).collect(), + max_messages: nats.max_messages, + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} -- cgit v1.2.3