aboutsummaryrefslogtreecommitdiffstats
path: root/crates/aggregator/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/aggregator/src')
-rw-r--r--crates/aggregator/src/cnfg.rs17
-rw-r--r--crates/aggregator/src/main.rs84
-rw-r--r--crates/aggregator/src/processor.rs105
-rw-r--r--crates/aggregator/src/processor/aggregate.rs167
-rw-r--r--crates/aggregator/src/state.rs42
5 files changed, 415 insertions, 0 deletions
diff --git a/crates/aggregator/src/cnfg.rs b/crates/aggregator/src/cnfg.rs
new file mode 100644
index 0000000..7c7aaf4
--- /dev/null
+++ b/crates/aggregator/src/cnfg.rs
@@ -0,0 +1,17 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+pub struct LocalConfig {
+ pub nats: NatsConfig,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct NatsConfig {
+ #[serde(rename = "stream-name")]
+ pub name: Arc<str>,
+ pub subjects: Arc<[String]>,
+ pub durable_name: Arc<str>,
+}
diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs
new file mode 100644
index 0000000..62af544
--- /dev/null
+++ b/crates/aggregator/src/main.rs
@@ -0,0 +1,84 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+use crate::state::AppState;
+
+/// warden-aggregator
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../aggregator.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .postgres(&config.database)
+ .await
+ .inspect_err(|e| error!("database: {e}"))?
+ .cache(&config.cache)
+ .await
+ .inspect_err(|e| error!("cache: {e}"))?
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let postgres = services
+ .postgres
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let services = state::Services {
+ postgres,
+ cache,
+ jetstream,
+ };
+
+ let state = AppState::create(services, &config).await?;
+
+ processor::serve(state, provider).await?;
+
+ Ok(())
+}
diff --git a/crates/aggregator/src/processor.rs b/crates/aggregator/src/processor.rs
new file mode 100644
index 0000000..3a7c8ac
--- /dev/null
+++ b/crates/aggregator/src/processor.rs
@@ -0,0 +1,105 @@
+mod aggregate;
+
+use anyhow::Result;
+use async_nats::{
+ self,
+ jetstream::{
+ Context,
+ consumer::{Consumer, pull::Config},
+ },
+};
+use futures_util::{StreamExt as _, future};
+use tokio::signal;
+use tracing::{debug, error, info};
+use warden_stack::tracing::SdkTracerProvider;
+
+use crate::{cnfg::NatsConfig, state::AppHandle};
+
+pub async fn serve(state: AppHandle, provider: SdkTracerProvider) -> Result<()> {
+ tokio::select! {
+ _ = run(state) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = state.clone();
+ tokio::spawn(async move {
+ if let Ok(message) = message
+ && let Err(e) = aggregate::handle(message, state).await
+ {
+ error!("{}", e.to_string());
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &NatsConfig,
+) -> anyhow::Result<Consumer<Config>> {
+ debug!(name = ?nats.name, subjects = ?nats.subjects, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: nats.name.to_string(),
+ subjects: nats.subjects.iter().map(|v| v.to_string()).collect(),
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ let consumer = stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ info!(subject = ?nats.subjects, "ready to receive messages");
+ Ok(consumer)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/aggregator/src/processor/aggregate.rs b/crates/aggregator/src/processor/aggregate.rs
new file mode 100644
index 0000000..d9d8482
--- /dev/null
+++ b/crates/aggregator/src/processor/aggregate.rs
@@ -0,0 +1,167 @@
+use async_nats::jetstream::Message;
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::{
+ configuration::routing::RoutingConfiguration,
+ message::{AggregationResult, Payload, TypologyResult, payload::Transaction},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::state::AppHandle;
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn handle(message: Message, state: AppHandle) -> anyhow::Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+
+ if let (Some(ref typology_result), Some(Transaction::Pacs002(document)), Some(routing)) = (
+ payload.typology_result.take(),
+ &payload.transaction,
+ &payload.routing,
+ ) {
+ let cache_key = format!("tadp_{}_tp", document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id);
+ let (typology_results, review) =
+ handle_typologies(typology_result, &state, &cache_key, routing).await?;
+
+ if typology_results
+ .len()
+ .ne(&routing.messages[0].typologies.len())
+ {
+ trace!("insufficient typology results for this typology. waiting for more");
+ return Ok(());
+ }
+
+ let aggs = AggregationResult {
+ id: routing.messages[0].id.to_owned(),
+ version: routing.messages[0].version.to_owned(),
+ typology_results,
+ review,
+ };
+
+ payload.aggregation_result = Some(aggs);
+ let _ = payload.rule_result.take();
+
+ let id = Uuid::now_v7();
+ debug!(%id, "inserting evaluation result");
+
+ let span = info_span!("create.evaluations.evaluation");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ "insert into evaluation (id, document) values ($1, $2)",
+ id,
+ sqlx::types::Json(&payload) as _
+ )
+ .execute(&state.services.postgres)
+ .instrument(span)
+ .await?;
+ info!(%id, "evaluation added");
+
+ let mut cache = state.services.cache.get().await?;
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "del");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+ debug!("cache cleared");
+
+ cache.del::<_, ()>(&cache_key).await?;
+ } else {
+ error!("payload has insufficient data");
+ }
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
+
+async fn handle_typologies(
+ payload: &TypologyResult,
+ state: &AppHandle,
+ cache_key: &str,
+ routing: &RoutingConfiguration,
+) -> anyhow::Result<(Vec<TypologyResult>, bool)> {
+ let mut cache = state.services.cache.get().await?;
+ let bytes = prost::Message::encode_to_vec(payload);
+
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "sadd+scard");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+
+ debug!("saving typology result");
+ let res = warden_stack::redis::pipe()
+ .sadd::<_, _>(cache_key, bytes)
+ .ignore()
+ .scard(cache_key)
+ .query_async::<Vec<usize>>(&mut cache)
+ .instrument(span)
+ .await?;
+
+ let typology_count = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ let typologies = &routing.messages[0].typologies;
+
+ if typology_count.lt(&typologies.len()) {
+ return Ok((vec![], false));
+ }
+
+ debug!("getting all typology results");
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "smembers");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+ let res = cache
+ .smembers::<_, Vec<Vec<Vec<u8>>>>(cache_key)
+ .instrument(span)
+ .await?;
+
+ let members = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ let typologies: Result<Vec<TypologyResult>, _> = members
+ .iter()
+ .map(|value| {
+ <TypologyResult as prost::Message>::decode(value.as_ref()).map_err(anyhow::Error::new)
+ })
+ .collect();
+
+ let typologies = typologies?;
+
+ let mut review = false;
+ for typology in routing.messages[0].typologies.iter() {
+ if let Some(value) = typologies
+ .iter()
+ .find(|value| value.id.eq(&typology.id) && value.version.eq(&typology.version))
+ && value.review
+ {
+ review = true;
+ }
+ }
+
+ Ok((typologies, review))
+}
diff --git a/crates/aggregator/src/state.rs b/crates/aggregator/src/state.rs
new file mode 100644
index 0000000..ac4f574
--- /dev/null
+++ b/crates/aggregator/src/state.rs
@@ -0,0 +1,42 @@
+use sqlx::PgPool;
+use std::{ops::Deref, sync::Arc};
+
+use async_nats::jetstream::Context;
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+ pub cache: RedisManager,
+ pub postgres: PgPool,
+}
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub config: LocalConfig,
+}
+
+#[derive(Clone)]
+pub struct AppHandle(pub Arc<AppState>);
+
+impl Deref for AppHandle {
+ type Target = Arc<AppState>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl AppState {
+ pub async fn create(
+ services: Services,
+ configuration: &Configuration,
+ ) -> anyhow::Result<AppHandle> {
+ let config = serde_json::from_value(configuration.misc.clone())?;
+
+ Ok(AppHandle(Arc::new(Self { services, config })))
+ }
+}