diff options
23 files changed, 1429 insertions, 25 deletions
diff --git a/.github/codecov.yml b/.github/codecov.yml index a51dfc6..dc4c65c 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -7,7 +7,3 @@ coverage: project: default: threshold: 1% - -comment: - layout: "files" - require_changes: true diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index db90ef4..56cfdda 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -51,6 +51,7 @@ jobs: - router - rule-executor - aggregator + - typologies name: dockerfile / ${{ matrix.crate }} steps: - uses: actions/checkout@v5 @@ -166,19 +167,22 @@ jobs: for processor in warden pseudonyms aggregator configuration; do cp crates/$processor/.env.example crates/$processor/.env done - - name: Collect coverage data - # Generate separate reports for nextest and doctests, and combine them. - run: | - cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info - # switch to nightly - # cargo llvm-cov --no-report nextest - # cargo llvm-cov --no-report --doc - # cargo llvm-cov report --doctests --lcov --output-path lcov.info - - name: Upload coverage reports to Codecov + # - name: Collect coverage data + # # Generate separate reports for nextest and doctests, and combine them. + # run: | + # cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info + # # switch to nightly + # # cargo llvm-cov --no-report nextest + # # cargo llvm-cov --no-report --doc + # # cargo llvm-cov report --doctests --lcov --output-path lcov.info + - name: cargo llvm-cov + run: cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info + - name: Upload to codecov.io uses: codecov/codecov-action@v5 with: + fail_ci_if_error: true token: ${{ secrets.CODECOV_TOKEN }} - files: lcov.info + slug: ${{ github.repository }} - name: Stop stack if: always() run: | @@ -3717,10 +3717,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" [[package]] -name = "typologies" -version = "0.1.0" - -[[package]] name = "unicase" version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4129,6 +4125,31 @@ dependencies = [ ] [[package]] +name = "warden-typologies" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "clap", + "config", + "futures-util", + "moka", + "opentelemetry", + "opentelemetry-semantic-conventions", + "prost 0.14.1", + "serde", + "serde_json", + "tokio", + "tonic 0.14.1", + "tracing", + "tracing-opentelemetry", + "uuid", + "warden-core", + "warden-middleware", + "warden-stack", +] + +[[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index 6f90b69..a7843a1 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -7,9 +7,13 @@ use std::net::{Ipv6Addr, SocketAddr}; use crate::{server::error::AppError, state::AppState}; use axum::http::header::CONTENT_TYPE; use clap::Parser; +use tokio::signal; use tower::{make::Shared, steer::Steer}; use tracing::{error, info, trace}; -use warden_stack::{Configuration, Services, tracing::Tracing}; +use warden_stack::{ + Configuration, Services, + tracing::{SdkTracerProvider, Tracing}, +}; /// warden-config #[derive(Parser, Debug)] @@ -114,7 +118,37 @@ async fn main() -> Result<(), AppError> { let listener = tokio::net::TcpListener::bind(addr).await?; info!(port = addr.port(), "starting config-api"); - axum::serve(listener, Shared::new(service)).await?; + axum::serve(listener, Shared::new(service)) + .with_graceful_shutdown(shutdown_signal(provider)) + .await?; Ok(()) } + +async fn shutdown_signal(provider: SdkTracerProvider) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + provider + .shutdown() + .expect("failed to shutdown trace provider"); +} diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs index 18c9222..ed284c6 100644 --- a/crates/rule-executor/src/main.rs +++ b/crates/rule-executor/src/main.rs @@ -1,4 +1,6 @@ +#[allow(dead_code)] mod cnfg; + mod processor; mod state; diff --git a/crates/typologies/.dockerignore b/crates/typologies/.dockerignore new file mode 100644 index 0000000..c8cd160 --- /dev/null +++ b/crates/typologies/.dockerignore @@ -0,0 +1,5 @@ +/target +.env +.git +.github +/contrib diff --git a/crates/typologies/Cargo.toml b/crates/typologies/Cargo.toml index 207e671..38a8cb4 100644 --- a/crates/typologies/Cargo.toml +++ b/crates/typologies/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "typologies" +name = "warden-typologies" version = "0.1.0" edition = "2024" license.workspace = true @@ -8,3 +8,36 @@ documentation.workspace = true description.workspace = true [dependencies] +anyhow.workspace = true +async-nats.workspace = true +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["toml"] } +futures-util = { workspace = true, default-features = false } +moka = { workspace = true, features = ["future"] } +opentelemetry.workspace = true +opentelemetry-semantic-conventions.workspace = true +prost.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "signal", +] } +tonic.workspace = true +tracing.workspace = true +tracing-opentelemetry.workspace = true +uuid = { workspace = true, features = ["v7"] } +warden-core = { workspace = true, features = [ + "message", + "configuration", + "serde-time", +] } +warden-middleware.workspace = true +warden-stack = { workspace = true, features = [ + "cache", + "nats-jetstream", + "opentelemetry", + "opentelemetry-tonic", + "tracing-loki", +] } diff --git a/crates/typologies/Dockerfile b/crates/typologies/Dockerfile new file mode 100644 index 0000000..61bb2e2 --- /dev/null +++ b/crates/typologies/Dockerfile @@ -0,0 +1,26 @@ +FROM rust:1.89.0-slim AS builder + +ENV SQLX_OFFLINE=true + +RUN rustup target add x86_64-unknown-linux-musl +RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl +RUN update-ca-certificates + +WORKDIR /usr/src/app + +RUN mkdir -p crates + +COPY ./crates/typologies crates/typologies +COPY ./lib lib +COPY ./Cargo.toml . +COPY ./Cargo.lock . + +RUN cargo fetch + +COPY ./proto proto + +RUN cargo build --target x86_64-unknown-linux-musl --release + +FROM scratch +COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-typologies ./ +CMD [ "./warden-typologies" ] diff --git a/crates/typologies/src/cnfg.rs b/crates/typologies/src/cnfg.rs new file mode 100644 index 0000000..6086f46 --- /dev/null +++ b/crates/typologies/src/cnfg.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc<str>, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + pub subjects: Arc<[String]>, + pub destination_prefix: Arc<str>, + pub max_messages: i64, + pub durable_name: Arc<str>, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct ConfigNats { + pub stream: Arc<str>, + pub reload_subject: Arc<str>, +} diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs index e7a11a9..ea7843a 100644 --- a/crates/typologies/src/main.rs +++ b/crates/typologies/src/main.rs @@ -1,3 +1,65 @@ -fn main() { - println!("Hello, world!"); +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// typologies +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../typologies.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = state::Services { jetstream, cache }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) } diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs new file mode 100644 index 0000000..a2a3b17 --- /dev/null +++ b/crates/typologies/src/processor.rs @@ -0,0 +1,124 @@ +mod driver; +mod publish; +mod reload; +mod typology; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{ + Context, + consumer::{Consumer, pull}, +}; +use futures_util::{StreamExt, future}; +use tokio::signal; +use tracing::{error, trace, warn}; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; + +use crate::{ + cnfg::Nats, + state::{AppHandle, AppState, Services}, +}; + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = typology::process_typology(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result<Consumer<pull::Config>> { + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name, + subjects: nats.subjects.iter().map(Into::into).collect(), + max_messages: nats.max_messages, + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs new file mode 100644 index 0000000..d150620 --- /dev/null +++ b/crates/typologies/src/processor/driver.rs @@ -0,0 +1,40 @@ +use tonic::IntoRequest; +use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub trait GetTypologyConfiguration { + fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> impl std::future::Future<Output = anyhow::Result<TypologyConfiguration>> + Send; +} + +impl GetTypologyConfiguration for AppHandle { + async fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> anyhow::Result<TypologyConfiguration> { + { + let local_cache = self.local_cache.read().await; + if let Some(result) = local_cache.get(&typology_key).await.map(Ok) { + return result; + } + } + + let local_cache = self.local_cache.write().await; + let mut client = self.query_typology_client.clone(); + + let value = client + .get_typology_configuration(typology_key.clone().into_request()) + .await? + .into_inner() + .configuration + .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?; + local_cache + .insert(typology_key.clone(), value.clone()) + .await; + + Ok(value) + } +} diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs new file mode 100644 index 0000000..b031bf3 --- /dev/null +++ b/crates/typologies/src/processor/publish.rs @@ -0,0 +1,44 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_tadp( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs new file mode 100644 index 0000000..fac4c40 --- /dev/null +++ b/crates/typologies/src/processor/reload.rs @@ -0,0 +1,71 @@ +use async_nats::jetstream::consumer; +use futures_util::StreamExt; +use prost::Message as _; +use tracing::{error, info, trace}; +use uuid::Uuid; +use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event"); + if let Ok(res) = ReloadEvent::decode(message.payload.as_ref()) + && let Ok(kind) = ConfigKind::try_from(res.kind) + { + match kind { + ConfigKind::Typology => { + let local_cache = state.local_cache.write().await; + let id = res.id(); + let version = res.version(); + trace!( + id = id, + ver = version, + "update triggered, invalidating typology config" + ); + let key = TypologyConfigurationRequest { + id: id.to_string(), + version: version.to_string(), + }; + + local_cache.invalidate(&key).await; + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e:?}") + } + } + } + + Ok(()) +} diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs new file mode 100644 index 0000000..b1b2592 --- /dev/null +++ b/crates/typologies/src/processor/typology.rs @@ -0,0 +1,180 @@ +mod aggregate_rules; +mod evaluate_expression; + +use std::sync::Arc; + +use anyhow::Result; +use opentelemetry::global; +use prost::Message; +use tracing::{Instrument, Span, error, info, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{ + configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest}, + message::{Payload, RuleResult, TypologyResult}, +}; +use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor}; + +use crate::{ + processor::{driver::GetTypologyConfiguration as _, publish}, + state::AppHandle, +}; + +#[instrument(skip(message, state), err(Debug))] +pub async fn process_typology( + message: async_nats::jetstream::Message, + state: AppHandle, +) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + if payload.transaction.is_none() { + warn!("transaction is empty - proceeding with ack"); + let _ = message.ack().await; + return Ok(()); + } + + let transaction = payload.transaction.as_ref().expect("to have returned"); + + match transaction { + warden_core::message::payload::Transaction::Pacs008(_) => { + warn!("Pacs008 is unsupported on this version: this should be unreachable"); + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + let key = format!( + "tp_{}", + pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + ); + + let rule_result = &payload + .rule_result + .as_ref() + .expect("rule result should be here"); + let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?; + + let routing = payload + .routing + .as_ref() + .expect("routing missing from payload"); + + let (mut typology_result, _rule_count) = + aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?; + + let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state) + .await + .inspect_err(|e| error!("{e}")); + } + }; + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} + +#[instrument(skip(typology_result, routing, payload, state), err(Debug))] +async fn evaluate_typology( + typology_result: &mut [TypologyResult], + routing: &RoutingConfiguration, + mut payload: Payload, + key: &str, + state: AppHandle, +) -> Result<()> { + for typology_result in typology_result.iter_mut() { + let handle = Arc::clone(&state); + let routing_rules = routing.messages[0].typologies.iter().find(|typology| { + typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id) + }); + let typology_result_rules = &typology_result.rule_results; + + if routing_rules.is_some() + && typology_result_rules.len() < routing_rules.unwrap().rules.len() + { + continue; + } + + let typology_config = handle + .get_typology_config(TypologyConfigurationRequest { + id: typology_result.id.to_owned(), + version: typology_result.version.to_owned(), + }) + .await?; + + let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?; + + typology_result.result = result; + + let workflow = typology_config + .workflow + .as_ref() + .expect("no workflow in config"); + + if workflow.interdiction_threshold.is_some() { + typology_result.workflow.replace(*workflow); + } + typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold); + + payload.typology_result = Some(typology_result.to_owned()); + + let is_interdicting = typology_config + .workflow + .unwrap() + .interdiction_threshold + .is_some_and(|value| value > 0.0 && result >= value); + + if is_interdicting { + typology_result.review = true; + } + + if result >= typology_config.workflow.unwrap().alert_threshold { + info!("alerting"); + } + + let subj = handle.config.nats.destination_prefix.to_string(); + let _ = publish::to_tadp(&subj, handle, payload.clone()) + .await + .inspect_err(|e| error!("{e}")); + + let mut c = state.services.cache.get().await?; + c.del::<_, ()>(key).await?; + } + + Ok(()) +} + +async fn cache_and_get_all( + cache_key: &str, + rule_result: &RuleResult, + state: AppHandle, +) -> Result<Vec<RuleResult>> { + let mut cache = state.services.cache.get().await?; + + let bytes = prost::Message::encode_to_vec(rule_result); + + let res = warden_stack::redis::pipe() + .sadd::<_, _>(cache_key, bytes) + .ignore() + .smembers(cache_key) + .query_async::<Vec<Vec<Vec<u8>>>>(&mut cache) + .await?; + + let members = res + .first() + .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?; + + members + .iter() + .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new)) + .collect() +} diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs new file mode 100644 index 0000000..8f92dae --- /dev/null +++ b/crates/typologies/src/processor/typology/aggregate_rules.rs @@ -0,0 +1,202 @@ +use anyhow::Result; +use std::collections::HashSet; + +use warden_core::{ + configuration::routing::RoutingConfiguration, + message::{RuleResult, TypologyResult}, +}; + +pub(super) fn aggregate_rules( + rule_results: &[RuleResult], + routing: &RoutingConfiguration, + rule_result: &RuleResult, +) -> Result<(Vec<TypologyResult>, usize)> { + let mut typology_result: Vec<TypologyResult> = vec![]; + let mut all_rules_set = HashSet::new(); + + routing.messages.iter().for_each(|message| { + message.typologies.iter().for_each(|typology| { + let mut set = HashSet::new(); + + for rule in typology.rules.iter() { + set.insert((&rule.id, rule.version())); + all_rules_set.insert((&rule.id, rule.version())); + } + + if !set.contains(&(&rule_result.id, rule_result.version.as_str())) { + return; + } + + let rule_results: Vec<_> = rule_results + .iter() + .filter_map(|value| { + if set.contains(&(&value.id, &value.version)) { + Some(value.to_owned()) + } else { + None + } + }) + .collect(); + + if !rule_results.is_empty() { + typology_result.push(TypologyResult { + id: typology.id.to_owned(), + version: typology.version.to_owned(), + rule_results, + ..Default::default() + }); + } + }); + }); + + Ok((typology_result, all_rules_set.len())) +} + +#[cfg(test)] +mod tests { + use super::*; + use warden_core::{ + configuration::routing::{Message, RoutingConfiguration, Rule, Typology}, + message::RuleResult, + }; + + fn create_rule(id: &str, version: &str) -> Rule { + Rule { + id: id.to_string(), + version: Some(version.to_string()), + } + } + + fn create_rule_result(id: &str, version: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + ..Default::default() + } + } + + #[test] + fn returns_empty_when_no_matching_typology() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![create_rule_result("R2", "v1")]; + let input_rule = create_rule_result("R2", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + assert!(result.is_empty()); + assert_eq!(count, 1); // one rule in routing + } + + #[test] + fn returns_typology_with_matching_rule() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 + assert_eq!(result.len(), 1); + assert_eq!(result[0].id, "T1"); + assert_eq!(result[0].rule_results.len(), 2); + } + + #[test] + fn ignores_unrelated_rules_in_rule_results() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R99", "v1"), // unrelated + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 1); + assert_eq!(result.len(), 1); + assert_eq!(result[0].rule_results.len(), 1); + assert_eq!(result[0].rule_results[0].id, "R1"); + } + + #[test] + fn handles_multiple_messages_and_typologies() { + let routing = RoutingConfiguration { + messages: vec![ + Message { + typologies: vec![ + Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }, + Typology { + id: "T2".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R2", "v1")], + }, + ], + ..Default::default() + }, + Message { + typologies: vec![Typology { + id: "T3".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }, + ], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2 + assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2) + assert_eq!(result[0].id, "T1"); + assert_eq!(result[1].id, "T3"); + } +} diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs new file mode 100644 index 0000000..844011e --- /dev/null +++ b/crates/typologies/src/processor/typology/evaluate_expression.rs @@ -0,0 +1,171 @@ +use anyhow::Result; +use tracing::warn; +use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult}; + +pub(super) fn evaluate_expression( + typology_result: &mut TypologyResult, + typology_config: &TypologyConfiguration, +) -> Result<f64> { + let mut to_return = 0.0; + let expression = typology_config + .expression + .as_ref() + .expect("expression is missing"); + + let rule_values = &typology_config.rules; + + for rule in expression.terms.iter() { + let rule_result = typology_result + .rule_results + .iter() + .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version)); + + if rule_result.is_none() { + warn!(term = ?rule, "could not find rule result for typology term"); + return Ok(Default::default()); + } + + let rule_result = rule_result.expect("checked and is some"); + + let weight = rule_values + .iter() + .filter_map(|rv| { + if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) { + None + } else { + rv.wghts.iter().find_map(|value| { + match value.r#ref.eq(&rule_result.sub_rule_ref) { + true => Some(value.wght), + false => None, + } + }) + } + }) + .next(); + + if weight.is_none() { + warn!(rule = ?rule, "could not find a weight for the matching rule"); + } + let weight = weight.unwrap_or_default(); + + to_return = match expression.operator() { + warden_core::configuration::typology::Operator::Add => to_return + weight, + warden_core::configuration::typology::Operator::Multiply => to_return * weight, + warden_core::configuration::typology::Operator::Subtract => to_return - weight, + warden_core::configuration::typology::Operator::Divide => { + if weight.ne(&0.0) { + to_return / weight + } else { + to_return + } + } + }; + } + Ok(to_return) +} + +#[cfg(test)] +mod tests { + use warden_core::{ + configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight}, + message::RuleResult, + }; + + use super::*; + + fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + sub_rule_ref: sub_ref.to_string(), + ..Default::default() + } + } + + fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule { + TypologyRule { + id: id.to_string(), + version: version.to_string(), + wghts: vec![TypologyRuleWeight { + r#ref: ref_name.to_string(), + wght: weight, + }], + } + } + + fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression { + Expression { + terms: terms + .into_iter() + .map(|(id, version)| Term { + id: id.to_string(), + version: version.to_string(), + }) + .collect(), + operator: op.into(), + } + } + + #[test] + fn test_add_operator_multiple_terms() { + let mut typology_result = TypologyResult { + rule_results: vec![ + make_rule_result("R1", "v1", "sub1"), + make_rule_result("R2", "v1", "sub2"), + ], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![ + make_rule_value("R1", "v1", "sub1", 10.0), + make_rule_value("R2", "v1", "sub2", 5.0), + ], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 15.0); + } + + #[test] + fn test_missing_rule_result_returns_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "sub1")], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } + + #[test] + fn test_missing_weight_defaults_to_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } +} diff --git a/crates/typologies/src/state.rs b/crates/typologies/src/state.rs new file mode 100644 index 0000000..23e08d9 --- /dev/null +++ b/crates/typologies/src/state.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::typology::{ + TypologyConfiguration, TypologyConfigurationRequest, + query_typologies_client::QueryTypologiesClient, +}; +use warden_stack::{Configuration, cache::RedisManager}; + +use crate::cnfg::LocalConfig; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, + pub cache: RedisManager, +} + +pub type AppHandle = Arc<AppState>; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc<RwLock<Cache<TypologyConfigurationRequest, TypologyConfiguration>>>, + pub config: LocalConfig, + pub query_typology_client: QueryTypologiesClient<Intercepted>, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_typology_client = QueryTypologiesClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_typology_client, + }) + } +} diff --git a/crates/typologies/typologies.toml b/crates/typologies/typologies.toml new file mode 100644 index 0000000..53dfa84 --- /dev/null +++ b/crates/typologies/typologies.toml @@ -0,0 +1,32 @@ +[application] +env = "development" + +[monitoring] +log-level = "warden_typologies=trace,info" +opentelemetry-endpoint = "http://localhost:4317" +loki-endpoint = "http://localhost:3100" + +[misc] +config-endpoint = "http://localhost:1304" + +[misc.nats] +stream-name = "typology" +subjects = ["typology-rule.>"] +destination-prefix = "tadp" +max-messages = 10000 +durable-name = "typologies" + +[misc.nats.config] +stream = "configuration" +reload-subject = "configuration.reload" + +[nats] +hosts = ["nats://localhost:4222"] + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 + +# vim:ft=toml diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs index 90fedfd..ecc9cbc 100644 --- a/crates/warden/src/main.rs +++ b/crates/warden/src/main.rs @@ -5,10 +5,14 @@ mod state; mod version; use std::net::{Ipv6Addr, SocketAddr}; +use tokio::signal; use clap::{Parser, command}; use tracing::{error, info, trace}; -use warden_stack::{Configuration, Services, tracing::Tracing}; +use warden_stack::{ + Configuration, Services, + tracing::{SdkTracerProvider, Tracing}, +}; use crate::state::AppState; @@ -42,6 +46,8 @@ async fn main() -> Result<(), error::AppError> { .loki(&config.application, &config.monitoring)? .build(&config.monitoring); + let provider = tracing.otel_provider; + tokio::spawn(tracing.loki_task); let mut services = Services::builder() @@ -91,7 +97,37 @@ async fn main() -> Result<(), error::AppError> { info!(port = addr.port(), "starting warden"); let router = server::router(state).merge(server::metrics_app()); - axum::serve(listener, router).await?; + axum::serve(listener, router) + .with_graceful_shutdown(shutdown_signal(provider)) + .await?; Ok(()) } + +async fn shutdown_signal(provider: SdkTracerProvider) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + provider + .shutdown() + .expect("failed to shutdown trace provider"); +} diff --git a/lib/warden-core/src/configuration/conv.rs b/lib/warden-core/src/configuration/conv.rs index cbb1e88..7f982b4 100644 --- a/lib/warden-core/src/configuration/conv.rs +++ b/lib/warden-core/src/configuration/conv.rs @@ -157,3 +157,150 @@ impl TryFrom<String> for Operator { Operator::from_str_name(&value).ok_or_else(|| format!("unsupported operator: {}", value)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::google::protobuf::{Value, value::Kind}; + use serde::{Deserialize, Serialize}; + use serde_json::{self, json}; + + #[derive(Deserialize, Serialize)] + struct OpWrap { + #[serde(with = "super::operator_serde")] + op: i32, + } + + fn normalize_json_numbers(val: serde_json::Value) -> serde_json::Value { + match val { + serde_json::Value::Array(arr) => { + serde_json::Value::Array(arr.into_iter().map(normalize_json_numbers).collect()) + } + serde_json::Value::Object(map) => serde_json::Value::Object( + map.into_iter() + .map(|(k, v)| (k, normalize_json_numbers(v))) + .collect(), + ), + serde_json::Value::Number(n) => { + if let Some(f) = n.as_f64() { + serde_json::Value::Number( + serde_json::Number::from_f64(f) + .unwrap_or_else(|| serde_json::Number::from(0)), + ) + } else { + serde_json::Value::Number(n) + } + } + other => other, + } + } + + #[test] + fn test_json_to_protobuf_value_null() { + let json = serde_json::Value::Null; + let protobuf_value = Value::try_from(json).unwrap(); + assert!(matches!(protobuf_value.kind.unwrap(), Kind::NullValue(_))); + } + + #[test] + fn test_json_to_protobuf_value_bool() { + let json = serde_json::Value::Bool(true); + let protobuf_value = Value::try_from(json).unwrap(); + assert_eq!(protobuf_value.kind.unwrap(), Kind::BoolValue(true)); + } + + #[test] + fn test_json_to_protobuf_value_number() { + let json = serde_json::Value::Number(serde_json::Number::from(42)); + let protobuf_value = Value::try_from(json).unwrap(); + assert_eq!(protobuf_value.kind.unwrap(), Kind::NumberValue(42.0)); + } + + #[test] + fn test_json_to_protobuf_value_string() { + let json = serde_json::Value::String("hello".to_string()); + let protobuf_value = Value::try_from(json).unwrap(); + assert_eq!( + protobuf_value.kind.unwrap(), + Kind::StringValue("hello".to_string()) + ); + } + + #[test] + fn test_json_to_protobuf_value_array() { + let json = json!([1, 2, "three"]); + let protobuf_value = Value::try_from(json).unwrap(); + if let Kind::ListValue(list) = protobuf_value.kind.unwrap() { + assert_eq!(list.values.len(), 3); + } else { + panic!("Expected ListValue"); + } + } + + #[test] + fn test_json_to_protobuf_value_object() { + let json = json!({ + "a": 1, + "b": true, + "c": "hello" + }); + let protobuf_value = Value::try_from(json).unwrap(); + if let Kind::StructValue(s) = protobuf_value.kind.unwrap() { + assert_eq!( + s.fields["a"].kind.as_ref().unwrap(), + &Kind::NumberValue(1.0) + ); + assert_eq!(s.fields["b"].kind.as_ref().unwrap(), &Kind::BoolValue(true)); + assert_eq!( + s.fields["c"].kind.as_ref().unwrap(), + &Kind::StringValue("hello".to_string()) + ); + } else { + panic!("Expected StructValue"); + } + } + + #[test] + fn test_protobuf_to_json_roundtrip() { + let original = json!({ + "x": 1, + "y": [true, null, "str"], + "z": { + "nested": 3.14 + } + }); + + let protobuf_value = Value::try_from(original.clone()).unwrap(); + let json_value: serde_json::Value = protobuf_value.into(); + + assert_eq!( + normalize_json_numbers(original), + normalize_json_numbers(json_value) + ); + } + + #[test] + fn test_operator_serialization() { + let wrap = OpWrap { + op: Operator::Add as i32, // Replace with actual enum variant + }; + + let s = serde_json::to_string(&wrap).unwrap(); + assert!(s.contains("ADD")); // Assuming .as_str_name() gives "ADD" + } + + #[test] + fn test_operator_deserialization() { + let json_data = json!({ "op": "ADD" }).to_string(); + let wrap: OpWrap = serde_json::from_str(&json_data).unwrap(); + + assert_eq!(wrap.op, Operator::Add as i32); // Replace with actual enum variant + } + + #[test] + fn test_operator_invalid_deserialization() { + let json_data = json!({ "op": "UNKNOWN_OP" }).to_string(); + let result: Result<OpWrap, _> = serde_json::from_str(&json_data); + assert!(result.is_err()); + } +} diff --git a/lib/warden-core/src/google/parser/dt.rs b/lib/warden-core/src/google/parser/dt.rs index 0e57833..7e61c1f 100644 --- a/lib/warden-core/src/google/parser/dt.rs +++ b/lib/warden-core/src/google/parser/dt.rs @@ -103,6 +103,97 @@ mod date { ) } } + + #[cfg(test)] + mod tests { + use super::*; + use time::{Month, OffsetDateTime}; + + use time::Date as TimeDate; + + #[test] + fn converts_dates() { + let d = TimeDate::from_calendar_date(2023, Month::August, 17).unwrap(); + let date: Date = d.into(); + + let time_date = TimeDate::try_from(date); + + assert!(time_date.is_ok()); + } + + #[test] + fn converts_regular_date_no_time() { + let d = TimeDate::from_calendar_date(2023, Month::August, 17).unwrap(); + let date: Date = d.into(); + + assert_eq!(date.year, 2023); + assert_eq!(date.month, 8); + assert_eq!(date.day, 17); + } + + #[test] + fn converts_leap_year_date() { + let d = TimeDate::from_calendar_date(2020, Month::February, 29).unwrap(); + let date: Date = d.into(); + + assert_eq!(date.year, 2020); + assert_eq!(date.month, 2); + assert_eq!(date.day, 29); + } + + #[test] + fn converts_minimum_date() { + let d = TimeDate::MIN; // Year -9999-01-01 + let date: Date = d.into(); + + assert_eq!(date.year, -9999); + assert_eq!(date.month, 1); + assert_eq!(date.day, 1); + } + + #[test] + fn converts_maximum_date() { + let d = TimeDate::MAX; // Year 9999-12-31 + let date: Date = d.into(); + + assert_eq!(date.year, 9999); + assert_eq!(date.month, 12); + assert_eq!(date.day, 31); + } + + #[test] + fn converts_regular_date() { + let dt = OffsetDateTime::from_unix_timestamp(1_600_000_000).unwrap(); // 2020-09-13 UTC + let date: Date = dt.into(); + + assert_eq!(date.year, 2020); + assert_eq!(date.month, 9); + assert_eq!(date.day, 13); + } + + #[test] + fn converts_leap_year_feb_29() { + let dt = OffsetDateTime::new_utc( + time::Date::from_calendar_date(2020, Month::February, 29).unwrap(), + time::Time::from_hms(0, 0, 0).unwrap(), + ); + let date: Date = dt.into(); + + assert_eq!(date.year, 2020); + assert_eq!(date.month, 2); + assert_eq!(date.day, 29); + } + + #[test] + fn converts_first_day_of_epoch() { + let dt = OffsetDateTime::UNIX_EPOCH; // 1970-01-01 + let date: Date = dt.into(); + + assert_eq!(date.year, 1970); + assert_eq!(date.month, 1); + assert_eq!(date.day, 1); + } + } } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs index c97bef3..80fb34d 100644 --- a/lib/warden-core/src/lib.rs +++ b/lib/warden-core/src/lib.rs @@ -22,6 +22,7 @@ pub mod iso20022; /// Message in transit #[allow(missing_docs)] +#[allow(clippy::large_enum_variant)] #[cfg(feature = "message")] pub mod message; |