diff options
Diffstat (limited to 'crates/aggregator/src/processor.rs')
-rw-r--r-- | crates/aggregator/src/processor.rs | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/crates/aggregator/src/processor.rs b/crates/aggregator/src/processor.rs new file mode 100644 index 0000000..3a7c8ac --- /dev/null +++ b/crates/aggregator/src/processor.rs @@ -0,0 +1,105 @@ +mod aggregate; + +use anyhow::Result; +use async_nats::{ + self, + jetstream::{ + Context, + consumer::{Consumer, pull::Config}, + }, +}; +use futures_util::{StreamExt as _, future}; +use tokio::signal; +use tracing::{debug, error, info}; +use warden_stack::tracing::SdkTracerProvider; + +use crate::{cnfg::NatsConfig, state::AppHandle}; + +pub async fn serve(state: AppHandle, provider: SdkTracerProvider) -> Result<()> { + tokio::select! { + _ = run(state) => {} + _ = shutdown_signal(provider) => {} + }; + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = state.clone(); + tokio::spawn(async move { + if let Ok(message) = message + && let Err(e) = aggregate::handle(message, state).await + { + error!("{}", e.to_string()); + } + }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &NatsConfig, +) -> anyhow::Result<Consumer<Config>> { + debug!(name = ?nats.name, subjects = ?nats.subjects, "getting or creating stream"); + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: nats.name.to_string(), + subjects: nats.subjects.iter().map(|v| v.to_string()).collect(), + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + let consumer = stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?; + + info!(subject = ?nats.subjects, "ready to receive messages"); + Ok(consumer) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} |