aboutsummaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-16 00:17:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-16 00:17:19 +0200
commit4a82b6db8a1278588b97b874bad468ec6f7cda6c (patch)
treeec1ac9e3725a6db9d3cc952f44b4d2dca4e491f4 /crates
parent4e31b25854e015e089c0abd4e5c61ee3de4bfc8a (diff)
downloadwarden-4a82b6db8a1278588b97b874bad468ec6f7cda6c.tar.bz2
warden-4a82b6db8a1278588b97b874bad468ec6f7cda6c.zip
fix(exec): cache lock
Diffstat (limited to 'crates')
-rw-r--r--crates/rule-executor/src/processor/rule.rs64
-rw-r--r--crates/rule-executor/src/processor/rule/configuration.rs10
2 files changed, 66 insertions, 8 deletions
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
index 8c23e0f..3a54424 100644
--- a/crates/rule-executor/src/processor/rule.rs
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -3,15 +3,68 @@ use std::sync::Arc;
use anyhow::Result;
mod configuration;
-use async_nats::jetstream::Message;
-use warden_core::configuration::rule::RuleConfigurationRequest;
+use async_nats::jetstream;
+use opentelemetry::global;
+use tracing::{Span, error, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload};
+use warden_stack::tracing::telemetry::nats;
use crate::state::AppHandle;
-pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> {
+#[instrument(
+ skip(message, state),
+ err(Debug),
+ fields(msg_id, rule_id, rule_version)
+)]
+pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&nats::extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+
+ if payload.transaction.is_none() {
+ warn!("transaction is empty - proceeding with ack");
+ let _ = message
+ .ack()
+ .await
+ .inspect_err(|e| warn!("ack failed: {e:?}"));
+ return Ok(());
+ }
+
+ let transaction = payload
+ .transaction
+ .as_ref()
+ .expect("none to have been handled");
+
+ let msg_id = match transaction {
+ warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
+ &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ }
+ };
+ span.record("msg_id", msg_id);
+
let req = create_configuration_request(message.subject.as_str());
- let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?;
+ span.record("rule_id", &req.id);
+ span.record("rule_version", &req.version);
+
+ let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state))
+ .await
+ .unwrap();
+
+ if let Err(e) = message.ack().await {
+ error!("ack error {e:?}");
+ };
Ok(())
}
@@ -19,6 +72,7 @@ pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> {
fn create_configuration_request(subject: &str) -> RuleConfigurationRequest {
// rule.901.v1.0.0
let mut tokens = subject.split("rule.");
+ dbg!(&tokens);
// rule.
tokens.next();
// 901.v1.0.0
@@ -30,7 +84,7 @@ fn create_configuration_request(subject: &str) -> RuleConfigurationRequest {
RuleConfigurationRequest {
id: rule_id.to_owned(),
- version: version.to_owned()
+ version: version.to_owned(),
}
}
diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs
index 6e11248..5f384aa 100644
--- a/crates/rule-executor/src/processor/rule/configuration.rs
+++ b/crates/rule-executor/src/processor/rule/configuration.rs
@@ -10,8 +10,10 @@ pub(super) async fn get_configuration(
state: AppHandle,
) -> Result<RuleConfiguration> {
trace!("checking cache for rule configuration");
- let cache = state.local_cache.read().await;
- let config = cache.get(&request).await;
+ let config = {
+ let cache = state.local_cache.read().await;
+ cache.get(&request).await
+ };
if let Some(config) = config {
trace!("cache hit");
return Ok(config);
@@ -35,8 +37,10 @@ pub(super) async fn get_configuration(
.configuration
.ok_or_else(|| anyhow!("missing configuration"))?;
- let mut cache = state.local_cache.write().await;
+ println!("inserting");
+ let cache = state.local_cache.write().await;
cache.insert(request, config.clone()).await;
+ println!("inserted");
Ok(config)
}