1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
use std::sync::Arc;
use anyhow::Result;
mod configuration;
use async_nats::jetstream;
use opentelemetry::global;
use tracing::{Span, error, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload};
use warden_stack::tracing::telemetry::nats;
use crate::state::AppHandle;
/// Handles one rule message delivered over JetStream.
///
/// Processing steps, in order:
/// 1. If the message carries headers, a trace context is extracted from them
///    with the globally registered text-map propagator and set as the parent
///    of the current span.
/// 2. The message body is decoded as a protobuf [`Payload`].
/// 3. If the payload has no transaction, a warning is logged, the message is
///    acked (an ack failure is only logged) and the function returns `Ok(())`.
/// 4. `msg_id` (taken from the group header of the pacs.008 / pacs.002
///    document), `rule_id` and `rule_version` (both derived from the message
///    subject) are recorded on the current span.
/// 5. The rule configuration is loaded and the message is acked.
///
/// # Errors
///
/// Returns an error when the payload cannot be decoded or when the rule
/// configuration cannot be loaded. In both cases the function returns before
/// acking the message. A failure of the final ack is logged and not returned.
///
/// # Panics
///
/// Panics if the message subject does not have the shape expected by
/// `create_configuration_request`.
#[instrument(
    skip(message, state),
    err(Debug),
    fields(msg_id, rule_id, rule_version)
)]
pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Result<()> {
    let span = Span::current();

    // Continue the trace started by the publisher, if one was propagated.
    if let Some(ref headers) = message.headers {
        let context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&nats::extractor::HeaderMap(headers))
        });
        span.set_parent(context);
    };

    let payload: Payload = prost::Message::decode(message.payload.as_ref())?;

    // Nothing to evaluate without a transaction: ack so the message is not
    // processed again, and stop here.
    let Some(transaction) = payload.transaction.as_ref() else {
        warn!("transaction is empty - proceeding with ack");
        let _ = message
            .ack()
            .await
            .inspect_err(|e| warn!("ack failed: {e:?}"));
        return Ok(());
    };

    let msg_id = match transaction {
        warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
            &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
        }
        warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
            &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
        }
    };
    span.record("msg_id", msg_id);

    let req = create_configuration_request(message.subject.as_str());
    span.record("rule_id", &req.id);
    span.record("rule_version", &req.version);

    // Propagate a lookup failure to the caller instead of panicking; the
    // `err(Debug)` on the span records it.
    let _rule_configuration =
        configuration::get_configuration(req, Arc::clone(&state)).await?;

    if let Err(e) = message.ack().await {
        error!("ack error {e:?}");
    };

    Ok(())
}
/// Builds a [`RuleConfigurationRequest`] from a message subject.
///
/// The subject is expected to look like `rule.<id>.v<version>`; for example
/// `rule.901.v1.0.0` yields id `901` and version `1.0.0`.
///
/// # Panics
///
/// Panics with `"router guarantees subject"` when the subject does not
/// contain `rule.`, or when the part following it contains no `.v`.
fn create_configuration_request(subject: &str) -> RuleConfigurationRequest {
    // Splitting on `rule.` puts whatever precedes the separator at index 0,
    // so the `<id>.v<version>` part is the second item.
    // rule.901.v1.0.0 -> 901.v1.0.0
    let rem = subject
        .split("rule.")
        .nth(1)
        .expect("router guarantees subject");

    // 901.v1.0.0 -> "901", "1.0.0"
    let mut tokens = rem.split(".v");
    let rule_id = tokens.next().expect("router guarantees subject");
    let version = tokens.next().expect("router guarantees subject");

    RuleConfigurationRequest {
        id: rule_id.to_owned(),
        version: version.to_owned(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `subject` and checks the resulting id and version.
    #[track_caller]
    fn assert_parses(subject: &str, expected_id: &str, expected_version: &str) {
        let parsed = create_configuration_request(subject);
        assert_eq!(parsed.id, expected_id);
        assert_eq!(parsed.version, expected_version);
    }

    /// A canonical subject splits into its id and semantic version.
    #[test]
    fn test_valid_subject() {
        assert_parses("rule.901.v1.0.0", "901", "1.0.0");
    }

    /// The id may be longer than three characters.
    #[test]
    fn test_valid_subject_with_longer_id() {
        assert_parses("rule.12345.v2.3.4", "12345", "2.3.4");
    }

    /// A subject without the `rule.` prefix is rejected with a panic.
    #[test]
    #[should_panic(expected = "router guarantees subject")]
    fn test_missing_rule_prefix() {
        // Missing "rule."
        let _ = create_configuration_request("901.v1.0.0");
    }

    /// A subject without a `.v<version>` suffix is rejected with a panic.
    #[test]
    #[should_panic(expected = "router guarantees subject")]
    fn test_missing_version() {
        let _ = create_configuration_request("rule.901");
    }

    /// Neither the id nor the version has to follow a particular format.
    #[test]
    fn test_different_version_format() {
        assert_parses("rule.abc.v999", "abc", "999");
    }
}
|