diff options
Diffstat (limited to 'crates/rule-executor/src/processor.rs')
-rw-r--r-- | crates/rule-executor/src/processor.rs | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs index 67d0d15..1efe731 100644 --- a/crates/rule-executor/src/processor.rs +++ b/crates/rule-executor/src/processor.rs @@ -9,9 +9,9 @@ use async_nats::jetstream::{ Context, consumer::{Consumer, pull}, }; -use futures_util::{future, StreamExt}; +use futures_util::{StreamExt, future}; use tokio::signal; -use tracing::trace; +use tracing::{error, trace, warn}; use warden_stack::{Configuration, tracing::SdkTracerProvider}; use crate::{ @@ -35,7 +35,6 @@ pub async fn serve( } async fn run(state: AppHandle) -> anyhow::Result<()> { - let config = Arc::clone(&state); let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; let limit = None; @@ -45,13 +44,18 @@ async fn run(state: AppHandle) -> anyhow::Result<()> { .await? .for_each_concurrent(limit, |message| { let state = Arc::clone(&state); - // tokio::spawn(async move { - // if let Ok(message) = message - // && let Err(e) = route::route(message, Arc::clone(&state)).await - // { - // error!("{}", e.to_string()); - // } - // }); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = rule::process_rule(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); future::ready(()) }) .await; @@ -63,10 +67,14 @@ async fn get_or_create_stream( jetstream: &Context, nats: &Nats, ) -> anyhow::Result<Consumer<pull::Config>> { - trace!(name = ?nats.name, "getting or creating stream"); + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + let stream = jetstream .get_or_create_stream(async_nats::jetstream::stream::Config { - name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")), + name, subjects: nats.subjects.iter().map(Into::into).collect(), ..Default::default() }) |