diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-16 14:13:14 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-16 14:13:14 +0200 |
commit | 70a148dd86be9c8ccc56e1fce232262475aa3158 (patch) | |
tree | 51214ab651215ef94a8554039cc953042b043cf6 /crates/aggregator/src/processor | |
parent | bf4a2b8b0a04f0cb682db84a835fe7c57d8526bc (diff) | |
download | warden-70a148dd86be9c8ccc56e1fce232262475aa3158.tar.bz2 warden-70a148dd86be9c8ccc56e1fce232262475aa3158.zip |
feat(aggregator): write evaluation
Diffstat (limited to 'crates/aggregator/src/processor')
-rw-r--r-- | crates/aggregator/src/processor/aggregate.rs | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/crates/aggregator/src/processor/aggregate.rs b/crates/aggregator/src/processor/aggregate.rs new file mode 100644 index 0000000..d9d8482 --- /dev/null +++ b/crates/aggregator/src/processor/aggregate.rs @@ -0,0 +1,167 @@ +use async_nats::jetstream::Message; +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::{ + configuration::routing::RoutingConfiguration, + message::{AggregationResult, Payload, TypologyResult, payload::Transaction}, +}; +use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor}; + +use crate::state::AppHandle; + +#[instrument(skip(message, state), err(Debug))] +pub async fn handle(message: Message, state: AppHandle) -> anyhow::Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?; + + if let (Some(ref typology_result), Some(Transaction::Pacs002(document)), Some(routing)) = ( + payload.typology_result.take(), + &payload.transaction, + &payload.routing, + ) { + let cache_key = format!("tadp_{}_tp", document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id); + let (typology_results, review) = + handle_typologies(typology_result, &state, &cache_key, routing).await?; + + if typology_results + .len() + .ne(&routing.messages[0].typologies.len()) + { + trace!("insufficient typology results for this typology. waiting for more"); + return Ok(()); + } + + let aggs = AggregationResult { + id: routing.messages[0].id.to_owned(), + version: routing.messages[0].version.to_owned(), + typology_results, + review, + }; + + payload.aggregation_result = Some(aggs); + let _ = payload.rule_result.take(); + + let id = Uuid::now_v7(); + debug!(%id, "inserting evaluation result"); + + let span = info_span!("create.evaluations.evaluation"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction"); + span.set_attribute("otel.kind", "client"); + + sqlx::query!( + "insert into evaluation (id, document) values ($1, $2)", + id, + sqlx::types::Json(&payload) as _ + ) + .execute(&state.services.postgres) + .instrument(span) + .await?; + info!(%id, "evaluation added"); + + let mut cache = state.services.cache.get().await?; + let span = Span::current(); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "del"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string()); + span.set_attribute("otel.kind", "client"); + debug!("cache cleared"); + + cache.del::<_, ()>(&cache_key).await?; + } else { + error!("payload has insufficient data"); + } + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} + +async fn handle_typologies( + payload: &TypologyResult, + state: &AppHandle, + cache_key: &str, + routing: &RoutingConfiguration, +) -> anyhow::Result<(Vec<TypologyResult>, bool)> { + let mut cache = state.services.cache.get().await?; + let bytes = prost::Message::encode_to_vec(payload); + + let span = Span::current(); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "sadd+scard"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string()); + span.set_attribute("otel.kind", "client"); + + debug!("saving typology result"); + let res = warden_stack::redis::pipe() + .sadd::<_, _>(cache_key, bytes) + .ignore() + .scard(cache_key) + .query_async::<Vec<usize>>(&mut cache) + .instrument(span) + .await?; + + let typology_count = res + .first() + .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?; + + let typologies = &routing.messages[0].typologies; + + if typology_count.lt(&typologies.len()) { + return Ok((vec![], false)); + } + + debug!("getting all typology results"); + let span = Span::current(); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "smembers"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string()); + span.set_attribute("otel.kind", "client"); + let res = cache + .smembers::<_, Vec<Vec<Vec<u8>>>>(cache_key) + .instrument(span) + .await?; + + let members = res + .first() + .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?; + + let typologies: Result<Vec<TypologyResult>, _> = members + .iter() + .map(|value| { + <TypologyResult as prost::Message>::decode(value.as_ref()).map_err(anyhow::Error::new) + }) + .collect(); + + let typologies = typologies?; + + let mut review = false; + for typology in routing.messages[0].typologies.iter() { + if let Some(value) = typologies + .iter() + .find(|value| value.id.eq(&typology.id) && value.version.eq(&typology.version)) + && value.review + { + review = true; + } + } + + Ok((typologies, review)) +} |