aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src/processor/rule.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/rule-executor/src/processor/rule.rs')
-rw-r--r--crates/rule-executor/src/processor/rule.rs64
1 files changed, 59 insertions, 5 deletions
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
index 8c23e0f..3a54424 100644
--- a/crates/rule-executor/src/processor/rule.rs
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -3,15 +3,68 @@ use std::sync::Arc;
use anyhow::Result;
mod configuration;
-use async_nats::jetstream::Message;
-use warden_core::configuration::rule::RuleConfigurationRequest;
+use async_nats::jetstream;
+use opentelemetry::global;
+use tracing::{Span, error, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload};
+use warden_stack::tracing::telemetry::nats;
use crate::state::AppHandle;
-pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> {
+#[instrument(
+ skip(message, state),
+ err(Debug),
+ fields(msg_id, rule_id, rule_version)
+)]
+pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&nats::extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+
+ if payload.transaction.is_none() {
+ warn!("transaction is empty - proceeding with ack");
+ let _ = message
+ .ack()
+ .await
+ .inspect_err(|e| warn!("ack failed: {e:?}"));
+ return Ok(());
+ }
+
+ let transaction = payload
+ .transaction
+ .as_ref()
+ .expect("none to have been handled");
+
+ let msg_id = match transaction {
+ warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
+ &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ }
+ };
+ span.record("msg_id", msg_id);
+
let req = create_configuration_request(message.subject.as_str());
- let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?;
+ span.record("rule_id", &req.id);
+ span.record("rule_version", &req.version);
+
+ let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state))
+ .await
+ .unwrap();
+
+ if let Err(e) = message.ack().await {
+ error!("ack error {e:?}");
+ };
Ok(())
}
@@ -19,6 +72,7 @@ pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> {
fn create_configuration_request(subject: &str) -> RuleConfigurationRequest {
// rule.901.v1.0.0
let mut tokens = subject.split("rule.");
+ dbg!(&tokens);
// rule.
tokens.next();
// 901.v1.0.0
@@ -30,7 +84,7 @@ fn create_configuration_request(subject: &str) -> RuleConfigurationRequest {
RuleConfigurationRequest {
id: rule_id.to_owned(),
- version: version.to_owned()
+ version: version.to_owned(),
}
}