aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src/processor/rule.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/rule-executor/src/processor/rule.rs')
-rw-r--r--crates/rule-executor/src/processor/rule.rs23
1 files changed, 19 insertions, 4 deletions
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
index 3a54424..6eaf25c 100644
--- a/crates/rule-executor/src/processor/rule.rs
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -2,15 +2,17 @@ use std::sync::Arc;
use anyhow::Result;
mod configuration;
+mod determine_outcome;
+mod rule_901;
use async_nats::jetstream;
use opentelemetry::global;
-use tracing::{Span, error, instrument, warn};
+use tracing::{Span, debug, error, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload};
use warden_stack::tracing::telemetry::nats;
-use crate::state::AppHandle;
+use crate::{processor::publish, state::AppHandle};
#[instrument(
skip(message, state),
@@ -27,7 +29,7 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu
span.set_parent(context);
};
- let payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+ let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
if payload.transaction.is_none() {
warn!("transaction is empty - proceeding with ack");
@@ -58,10 +60,23 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu
span.record("rule_id", &req.id);
span.record("rule_version", &req.version);
- let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state))
+ let config = configuration::get_configuration(req, Arc::clone(&state))
.await
.unwrap();
+ match rule_901::process_901(&config, &payload, state.clone()).await {
+ Ok(res) => {
+ debug!(outcome = ?res.reason, "rule executed");
+ payload.rule_result = Some(res);
+ publish::to_typologies(&config.id, state, payload)
+ .await
+ .inspect_err(|e| error!("{e}"))?;
+ }
+ Err(e) => {
+ error!("{e}");
+ }
+ };
+
if let Err(e) = message.ack().await {
error!("ack error {e:?}");
};