author     rtkay123 <dev@kanjala.com>    2025-08-16 14:13:14 +0200
committer  rtkay123 <dev@kanjala.com>    2025-08-16 14:13:14 +0200
commit     70a148dd86be9c8ccc56e1fce232262475aa3158 (patch)
tree       51214ab651215ef94a8554039cc953042b043cf6 /crates
parent     bf4a2b8b0a04f0cb682db84a835fe7c57d8526bc (diff)
feat(aggregator): write evaluation
Diffstat (limited to 'crates')
-rw-r--r--  crates/aggregator/.dockerignore                             |   5
-rw-r--r--  crates/aggregator/Cargo.toml                                |  51
-rw-r--r--  crates/aggregator/Dockerfile                                |  27
-rw-r--r--  crates/aggregator/aggregator.toml                           |  39
-rw-r--r--  crates/aggregator/migrations/20250816113451_evaluation.sql |   6
-rw-r--r--  crates/aggregator/src/cnfg.rs                               |  17
-rw-r--r--  crates/aggregator/src/main.rs                               |  84
-rw-r--r--  crates/aggregator/src/processor.rs                          | 105
-rw-r--r--  crates/aggregator/src/processor/aggregate.rs                | 167
-rw-r--r--  crates/aggregator/src/state.rs                              |  42
10 files changed, 543 insertions, 0 deletions
diff --git a/crates/aggregator/.dockerignore b/crates/aggregator/.dockerignore
new file mode 100644
index 0000000..c8cd160
--- /dev/null
+++ b/crates/aggregator/.dockerignore
@@ -0,0 +1,5 @@
+/target
+.env
+.git
+.github
+/contrib
diff --git a/crates/aggregator/Cargo.toml b/crates/aggregator/Cargo.toml
new file mode 100644
index 0000000..28e4b01
--- /dev/null
+++ b/crates/aggregator/Cargo.toml
@@ -0,0 +1,51 @@
+[package]
+name = "warden-aggregator"
+version = "0.1.0"
+edition = "2024"
+license.workspace = true
+homepage.workspace = true
+documentation.workspace = true
+description.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["toml"] }
+futures-util.workspace = true
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive", "rc"] }
+serde_json.workspace = true
+sqlx = { workspace = true, features = [
+    "json",
+    "macros",
+    "migrate",
+    "postgres",
+    "runtime-tokio",
+    "time",
+    "tls-rustls",
+    "uuid",
+] }
+tokio = { workspace = true, features = [
+    "macros",
+    "rt-multi-thread",
+    "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+    "message",
+    "serde",
+    "time",
+] }
+warden-stack = { workspace = true, features = [
+    "cache",
+    "nats-jetstream",
+    "opentelemetry",
+    "postgres",
+    "tracing-loki",
+] }
diff --git a/crates/aggregator/Dockerfile b/crates/aggregator/Dockerfile
new file mode 100644
index 0000000..73560e6
--- /dev/null
+++ b/crates/aggregator/Dockerfile
@@ -0,0 +1,27 @@
+FROM rust:1.89.0-slim AS builder
+
+ENV SQLX_OFFLINE=true
+
+RUN rustup target add x86_64-unknown-linux-musl
+RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl
+RUN update-ca-certificates
+
+WORKDIR /usr/src/app
+
+RUN mkdir -p crates
+
+COPY ./.sqlx .sqlx
+COPY ./crates/aggregator crates/aggregator
+COPY ./lib lib
+COPY ./Cargo.toml .
+COPY ./Cargo.lock .
+
+RUN cargo fetch
+
+COPY ./proto proto
+
+RUN cargo build --target x86_64-unknown-linux-musl --release
+
+FROM scratch
+COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-aggregator ./
+CMD [ "./warden-aggregator" ]
diff --git a/crates/aggregator/aggregator.toml b/crates/aggregator/aggregator.toml
new file mode 100644
index 0000000..b9a0695
--- /dev/null
+++ b/crates/aggregator/aggregator.toml
@@ -0,0 +1,39 @@
+[application]
+env = "development"
+
+[monitoring]
+log-level = "warden_aggregator=trace,info"
+opentelemetry-endpoint = "http://localhost:4317"
+loki-endpoint = "http://localhost:3100"
+
+[misc.nats]
+stream-name = "tadp"
+subjects = ["tadp.>"]
+durable-name = "tadp"
+
+[database]
+pool_size = 100
+port = 5432
+name = "evaluations"
+host = "localhost"
+password = "password"
+user = "postgres"
+
+[nats]
+hosts = ["nats://localhost:4222"]
+
+[cache]
+dsn = "redis://localhost:6379"
+pooled = true
+type = "non-clustered" # clustered, non-clustered or sentinel
+max-connections = 100
+
+[cache.sentinel]
+master-name = "mymaster"
+nodes = [
+    { host = "127.0.0.1", port = 26379 },
+    { host = "127.0.0.2", port = 26379 },
+    { host = "127.0.0.3", port = 26379 },
+]
+
+# vim:ft=toml
diff --git a/crates/aggregator/migrations/20250816113451_evaluation.sql b/crates/aggregator/migrations/20250816113451_evaluation.sql
new file mode 100644
index 0000000..c8c0ac7
--- /dev/null
+++ b/crates/aggregator/migrations/20250816113451_evaluation.sql
@@ -0,0 +1,6 @@
+-- Add migration script here
+create table evaluation (
+    id uuid primary key,
+    document jsonb not null,
+    created_at timestamptz default now()
+);
diff --git a/crates/aggregator/src/cnfg.rs b/crates/aggregator/src/cnfg.rs
new file mode 100644
index 0000000..7c7aaf4
--- /dev/null
+++ b/crates/aggregator/src/cnfg.rs
@@ -0,0 +1,17 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+pub struct LocalConfig {
+    pub nats: NatsConfig,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct NatsConfig {
+    #[serde(rename = "stream-name")]
+    pub name: Arc<str>,
+    pub subjects: Arc<[String]>,
+    pub durable_name: Arc<str>,
+}
diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs
new file mode 100644
index 0000000..62af544
--- /dev/null
+++ b/crates/aggregator/src/main.rs
@@ -0,0 +1,84 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+use crate::state::AppState;
+
+/// warden-aggregator
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+    /// Path to config file
+    #[arg(short, long)]
+    config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Args::parse();
+    let config = include_str!("../aggregator.toml");
+
+    let mut config = config::Config::builder()
+        .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+    if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+        config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+    };
+
+    let mut config: Configuration = config.build()?.try_deserialize()?;
+    config.application.name = env!("CARGO_CRATE_NAME").into();
+    config.application.version = env!("CARGO_PKG_VERSION").into();
+
+    let tracing = Tracing::builder()
+        .opentelemetry(&config.application, &config.monitoring)?
+        .loki(&config.application, &config.monitoring)?
+        .build(&config.monitoring);
+
+    let provider = tracing.otel_provider;
+
+    tokio::spawn(tracing.loki_task);
+
+    let mut services = Services::builder()
+        .postgres(&config.database)
+        .await
+        .inspect_err(|e| error!("database: {e}"))?
+        .cache(&config.cache)
+        .await
+        .inspect_err(|e| error!("cache: {e}"))?
+        .nats_jetstream(&config.nats)
+        .await
+        .inspect_err(|e| error!("nats: {e}"))?
+        .build();
+
+    let postgres = services
+        .postgres
+        .take()
+        .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+    let cache = services
+        .cache
+        .take()
+        .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+    let jetstream = services
+        .jetstream
+        .take()
+        .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+    let services = state::Services {
+        postgres,
+        cache,
+        jetstream,
+    };
+
+    let state = AppState::create(services, &config).await?;
+
+    processor::serve(state, provider).await?;
+
+    Ok(())
+}
diff --git a/crates/aggregator/src/processor.rs b/crates/aggregator/src/processor.rs
new file mode 100644
index 0000000..3a7c8ac
--- /dev/null
+++ b/crates/aggregator/src/processor.rs
@@ -0,0 +1,105 @@
+mod aggregate;
+
+use anyhow::Result;
+use async_nats::{
+    self,
+    jetstream::{
+        Context,
+        consumer::{Consumer, pull::Config},
+    },
+};
+use futures_util::{StreamExt as _, future};
+use tokio::signal;
+use tracing::{debug, error, info};
+use warden_stack::tracing::SdkTracerProvider;
+
+use crate::{cnfg::NatsConfig, state::AppHandle};
+
+pub async fn serve(state: AppHandle, provider: SdkTracerProvider) -> Result<()> {
+    tokio::select! {
+        _ = run(state) => {}
+        _ = shutdown_signal(provider) => {}
+    };
+    Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+    let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+    let limit = None;
+
+    consumer
+        .messages()
+        .await?
+        .for_each_concurrent(limit, |message| {
+            let state = state.clone();
+            tokio::spawn(async move {
+                if let Ok(message) = message
+                    && let Err(e) = aggregate::handle(message, state).await
+                {
+                    error!("{}", e.to_string());
+                }
+            });
+            future::ready(())
+        })
+        .await;
+
+    Ok(())
+}
+
+async fn get_or_create_stream(
+    jetstream: &Context,
+    nats: &NatsConfig,
+) -> anyhow::Result<Consumer<Config>> {
+    debug!(name = ?nats.name, subjects = ?nats.subjects, "getting or creating stream");
+    let stream = jetstream
+        .get_or_create_stream(async_nats::jetstream::stream::Config {
+            name: nats.name.to_string(),
+            subjects: nats.subjects.iter().map(|v| v.to_string()).collect(),
+            ..Default::default()
+        })
+        .await?;
+    let durable = nats.durable_name.to_string();
+    // Get or create a pull-based consumer
+    let consumer = stream
+        .get_or_create_consumer(
+            durable.as_ref(),
+            async_nats::jetstream::consumer::pull::Config {
+                durable_name: Some(durable.to_string()),
+                ..Default::default()
+            },
+        )
+        .await?;
+
+    info!(subject = ?nats.subjects, "ready to receive messages");
+    Ok(consumer)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+    let ctrl_c = async {
+        signal::ctrl_c()
+            .await
+            .expect("failed to install Ctrl+C handler");
+    };
+
+    #[cfg(unix)]
+    let terminate = async {
+        signal::unix::signal(signal::unix::SignalKind::terminate())
+            .expect("failed to install signal handler")
+            .recv()
+            .await;
+    };
+
+    #[cfg(not(unix))]
+    let terminate = std::future::pending::<()>();
+
+    tokio::select! {
+        _ = ctrl_c => {
+        },
+        _ = terminate => {
+        },
+    }
+    let _ = provider.shutdown();
+
+    Ok(())
+}
diff --git a/crates/aggregator/src/processor/aggregate.rs b/crates/aggregator/src/processor/aggregate.rs
new file mode 100644
index 0000000..d9d8482
--- /dev/null
+++ b/crates/aggregator/src/processor/aggregate.rs
@@ -0,0 +1,167 @@
+use async_nats::jetstream::Message;
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::{
+    configuration::routing::RoutingConfiguration,
+    message::{AggregationResult, Payload, TypologyResult, payload::Transaction},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::state::AppHandle;
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn handle(message: Message, state: AppHandle) -> anyhow::Result<()> {
+    let span = Span::current();
+
+    if let Some(ref headers) = message.headers {
+        let context = global::get_text_map_propagator(|propagator| {
+            propagator.extract(&extractor::HeaderMap(headers))
+        });
+        span.set_parent(context);
+    };
+
+    let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+
+    if let (Some(ref typology_result), Some(Transaction::Pacs002(document)), Some(routing)) = (
+        payload.typology_result.take(),
+        &payload.transaction,
+        &payload.routing,
+    ) {
+        let cache_key = format!("tadp_{}_tp", document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id);
+        let (typology_results, review) =
+            handle_typologies(typology_result, &state, &cache_key, routing).await?;
+
+        if typology_results
+            .len()
+            .ne(&routing.messages[0].typologies.len())
+        {
+            trace!("insufficient typology results for this typology. waiting for more");
+            return Ok(());
+        }
+
+        let aggs = AggregationResult {
+            id: routing.messages[0].id.to_owned(),
+            version: routing.messages[0].version.to_owned(),
+            typology_results,
+            review,
+        };
+
+        payload.aggregation_result = Some(aggs);
+        let _ = payload.rule_result.take();
+
+        let id = Uuid::now_v7();
+        debug!(%id, "inserting evaluation result");
+
+        let span = info_span!("create.evaluations.evaluation");
+        span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+        span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+        span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction");
+        span.set_attribute("otel.kind", "client");
+
+        sqlx::query!(
+            "insert into evaluation (id, document) values ($1, $2)",
+            id,
+            sqlx::types::Json(&payload) as _
+        )
+        .execute(&state.services.postgres)
+        .instrument(span)
+        .await?;
+        info!(%id, "evaluation added");
+
+        let mut cache = state.services.cache.get().await?;
+        let span = Span::current();
+        span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+        span.set_attribute(attribute::DB_OPERATION_NAME, "del");
+        span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+        span.set_attribute("otel.kind", "client");
+        debug!("cache cleared");
+
+        cache.del::<_, ()>(&cache_key).await?;
+    } else {
+        error!("payload has insufficient data");
+    }
+
+    let span = info_span!("nats.ack");
+    message
+        .ack()
+        .instrument(span)
+        .await
+        .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+    Ok(())
+}
+
+async fn handle_typologies(
+    payload: &TypologyResult,
+    state: &AppHandle,
+    cache_key: &str,
+    routing: &RoutingConfiguration,
+) -> anyhow::Result<(Vec<TypologyResult>, bool)> {
+    let mut cache = state.services.cache.get().await?;
+    let bytes = prost::Message::encode_to_vec(payload);
+
+    let span = Span::current();
+    span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+    span.set_attribute(attribute::DB_OPERATION_NAME, "sadd+scard");
+    span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+    span.set_attribute("otel.kind", "client");
+
+    debug!("saving typology result");
+    let res = warden_stack::redis::pipe()
+        .sadd::<_, _>(cache_key, bytes)
+        .ignore()
+        .scard(cache_key)
+        .query_async::<Vec<usize>>(&mut cache)
+        .instrument(span)
+        .await?;
+
+    let typology_count = res
+        .first()
+        .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+    let typologies = &routing.messages[0].typologies;
+
+    if typology_count.lt(&typologies.len()) {
+        return Ok((vec![], false));
+    }
+
+    debug!("getting all typology results");
+    let span = Span::current();
+    span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+    span.set_attribute(attribute::DB_OPERATION_NAME, "smembers");
+    span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+    span.set_attribute("otel.kind", "client");
+    let res = cache
+        .smembers::<_, Vec<Vec<Vec<u8>>>>(cache_key)
+        .instrument(span)
+        .await?;
+
+    let members = res
+        .first()
+        .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+    let typologies: Result<Vec<TypologyResult>, _> = members
+        .iter()
+        .map(|value| {
+            <TypologyResult as prost::Message>::decode(value.as_ref()).map_err(anyhow::Error::new)
+        })
+        .collect();
+
+    let typologies = typologies?;
+
+    let mut review = false;
+    for typology in routing.messages[0].typologies.iter() {
+        if let Some(value) = typologies
+            .iter()
+            .find(|value| value.id.eq(&typology.id) && value.version.eq(&typology.version))
+            && value.review
+        {
+            review = true;
+        }
+    }
+
+    Ok((typologies, review))
+}
diff --git a/crates/aggregator/src/state.rs b/crates/aggregator/src/state.rs
new file mode 100644
index 0000000..ac4f574
--- /dev/null
+++ b/crates/aggregator/src/state.rs
@@ -0,0 +1,42 @@
+use sqlx::PgPool;
+use std::{ops::Deref, sync::Arc};
+
+use async_nats::jetstream::Context;
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+
+#[derive(Clone)]
+pub struct Services {
+    pub jetstream: Context,
+    pub cache: RedisManager,
+    pub postgres: PgPool,
+}
+
+#[derive(Clone)]
+pub struct AppState {
+    pub services: Services,
+    pub config: LocalConfig,
+}
+
+#[derive(Clone)]
+pub struct AppHandle(pub Arc<AppState>);
+
+impl Deref for AppHandle {
+    type Target = Arc<AppState>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl AppState {
+    pub async fn create(
+        services: Services,
+        configuration: &Configuration,
+    ) -> anyhow::Result<AppHandle> {
+        let config = serde_json::from_value(configuration.misc.clone())?;
+
+        Ok(AppHandle(Arc::new(Self { services, config })))
+    }
+}
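
Note (not part of the commit): a minimal smoke test of the new aggregator could publish a protobuf-encoded Payload onto the stream that processor::get_or_create_stream consumes. The sketch below is illustrative only and rests on assumptions: the NATS URL comes from aggregator.toml (nats://localhost:4222), "tadp.evaluation" is an arbitrary subject chosen to match the configured "tadp.>" wildcard, and Payload::default() is empty, so aggregate::handle will decode it, log "payload has insufficient data", and ack; a real evaluation needs typology_result, a pacs.002 transaction, and routing populated.

// Hypothetical publisher sketch; field names beyond Payload itself are assumptions.
use prost::Message as _;
use warden_core::message::Payload;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect to the same NATS server the aggregator is configured against.
    let client = async_nats::connect("nats://localhost:4222").await?;
    let jetstream = async_nats::jetstream::new(client);

    // An empty payload only exercises decode + ack in the consumer;
    // the subject just needs to fall under the "tadp.>" wildcard.
    let bytes = Payload::default().encode_to_vec();
    jetstream
        .publish("tadp.evaluation", bytes.into())
        .await? // publish accepted by the client
        .await?; // JetStream acknowledged the message

    Ok(())
}

Once a fully populated payload makes it through, the aggregated document should appear in the evaluation table created by the migration, e.g. select id, created_at from evaluation order by created_at desc.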