From 4a82b6db8a1278588b97b874bad468ec6f7cda6c Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sat, 16 Aug 2025 00:17:07 +0200 Subject: fix(exec): cache lock --- crates/rule-executor/src/processor/rule.rs | 64 ++++++++++++++++++++-- .../src/processor/rule/configuration.rs | 10 +++- 2 files changed, 66 insertions(+), 8 deletions(-) (limited to 'crates/rule-executor/src/processor') diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 8c23e0f..3a54424 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -3,15 +3,68 @@ use std::sync::Arc; use anyhow::Result; mod configuration; -use async_nats::jetstream::Message; -use warden_core::configuration::rule::RuleConfigurationRequest; +use async_nats::jetstream; +use opentelemetry::global; +use tracing::{Span, error, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload}; +use warden_stack::tracing::telemetry::nats; use crate::state::AppHandle; -pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> { +#[instrument( + skip(message, state), + err(Debug), + fields(msg_id, rule_id, rule_version) +)] +pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&nats::extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = prost::Message::decode(message.payload.as_ref())?; + + if payload.transaction.is_none() { + warn!("transaction is empty - proceeding with ack"); + let _ = message + .ack() + .await + .inspect_err(|e| warn!("ack failed: {e:?}")); + return Ok(()); + } + + let transaction = payload + .transaction + .as_ref() + .expect("none to have been handled"); + + let msg_id = match transaction { + warden_core::message::payload::Transaction::Pacs008(pacs008_document) => { + &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + } + }; + span.record("msg_id", msg_id); + let req = create_configuration_request(message.subject.as_str()); - let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?; + span.record("rule_id", &req.id); + span.record("rule_version", &req.version); + + let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state)) + .await + .unwrap(); + + if let Err(e) = message.ack().await { + error!("ack error {e:?}"); + }; Ok(()) } @@ -19,6 +72,7 @@ pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> { fn create_configuration_request(subject: &str) -> RuleConfigurationRequest { // rule.901.v1.0.0 let mut tokens = subject.split("rule."); + dbg!(&tokens); // rule. tokens.next(); // 901.v1.0.0 @@ -30,7 +84,7 @@ fn create_configuration_request(subject: &str) -> RuleConfigurationRequest { RuleConfigurationRequest { id: rule_id.to_owned(), - version: version.to_owned() + version: version.to_owned(), } } diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs index 6e11248..5f384aa 100644 --- a/crates/rule-executor/src/processor/rule/configuration.rs +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -10,8 +10,10 @@ pub(super) async fn get_configuration( state: AppHandle, ) -> Result { trace!("checking cache for rule configuration"); - let cache = state.local_cache.read().await; - let config = cache.get(&request).await; + let config = { + let cache = state.local_cache.read().await; + cache.get(&request).await + }; if let Some(config) = config { trace!("cache hit"); return Ok(config); @@ -35,8 +37,10 @@ pub(super) async fn get_configuration( .configuration .ok_or_else(|| anyhow!("missing configuration"))?; - let mut cache = state.local_cache.write().await; + println!("inserting"); + let cache = state.local_cache.write().await; cache.insert(request, config.clone()).await; + println!("inserted"); Ok(config) } -- cgit v1.2.3