diff options
Diffstat (limited to 'crates/rule-executor/src/processor/rule.rs')
-rw-r--r-- | crates/rule-executor/src/processor/rule.rs | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 3a54424..6eaf25c 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -2,15 +2,17 @@ use std::sync::Arc; use anyhow::Result; mod configuration; +mod determine_outcome; +mod rule_901; use async_nats::jetstream; use opentelemetry::global; -use tracing::{Span, error, instrument, warn}; +use tracing::{Span, debug, error, instrument, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload}; use warden_stack::tracing::telemetry::nats; -use crate::state::AppHandle; +use crate::{processor::publish, state::AppHandle}; #[instrument( skip(message, state), @@ -27,7 +29,7 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.set_parent(context); }; - let payload: Payload = prost::Message::decode(message.payload.as_ref())?; + let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?; if payload.transaction.is_none() { warn!("transaction is empty - proceeding with ack"); @@ -58,10 +60,23 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.record("rule_id", &req.id); span.record("rule_version", &req.version); - let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state)) + let config = configuration::get_configuration(req, Arc::clone(&state)) .await .unwrap(); + match rule_901::process_901(&config, &payload, state.clone()).await { + Ok(res) => { + debug!(outcome = ?res.reason, "rule executed"); + payload.rule_result = Some(res); + publish::to_typologies(&config.id, state, payload) + .await + .inspect_err(|e| error!("{e}"))?; + } + Err(e) => { + error!("{e}"); + } + }; + if let Err(e) = message.ack().await { error!("ack error {e:?}"); }; |