aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src/processor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/rule-executor/src/processor.rs')
-rw-r--r--crates/rule-executor/src/processor.rs32
1 files changed, 20 insertions, 12 deletions
diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs
index 67d0d15..1efe731 100644
--- a/crates/rule-executor/src/processor.rs
+++ b/crates/rule-executor/src/processor.rs
@@ -9,9 +9,9 @@ use async_nats::jetstream::{
Context,
consumer::{Consumer, pull},
};
-use futures_util::{future, StreamExt};
+use futures_util::{StreamExt, future};
use tokio::signal;
-use tracing::trace;
+use tracing::{error, trace, warn};
use warden_stack::{Configuration, tracing::SdkTracerProvider};
use crate::{
@@ -35,7 +35,6 @@ pub async fn serve(
}
async fn run(state: AppHandle) -> anyhow::Result<()> {
- let config = Arc::clone(&state);
let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
let limit = None;
@@ -45,13 +44,18 @@ async fn run(state: AppHandle) -> anyhow::Result<()> {
.await?
.for_each_concurrent(limit, |message| {
let state = Arc::clone(&state);
- // tokio::spawn(async move {
- // if let Ok(message) = message
- // && let Err(e) = route::route(message, Arc::clone(&state)).await
- // {
- // error!("{}", e.to_string());
- // }
- // });
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = rule::process_rule(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
future::ready(())
})
.await;
@@ -63,10 +67,14 @@ async fn get_or_create_stream(
jetstream: &Context,
nats: &Nats,
) -> anyhow::Result<Consumer<pull::Config>> {
- trace!(name = ?nats.name, "getting or creating stream");
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
- name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")),
+ name,
subjects: nats.subjects.iter().map(Into::into).collect(),
..Default::default()
})