aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/codecov.yml4
-rw-r--r--.github/workflows/ci.yaml24
-rw-r--r--Cargo.lock29
-rw-r--r--crates/configuration/src/main.rs38
-rw-r--r--crates/rule-executor/src/main.rs2
-rw-r--r--crates/typologies/.dockerignore5
-rw-r--r--crates/typologies/Cargo.toml35
-rw-r--r--crates/typologies/Dockerfile26
-rw-r--r--crates/typologies/src/cnfg.rs27
-rw-r--r--crates/typologies/src/main.rs66
-rw-r--r--crates/typologies/src/processor.rs124
-rw-r--r--crates/typologies/src/processor/driver.rs40
-rw-r--r--crates/typologies/src/processor/publish.rs44
-rw-r--r--crates/typologies/src/processor/reload.rs71
-rw-r--r--crates/typologies/src/processor/typology.rs180
-rw-r--r--crates/typologies/src/processor/typology/aggregate_rules.rs202
-rw-r--r--crates/typologies/src/processor/typology/evaluate_expression.rs171
-rw-r--r--crates/typologies/src/state.rs55
-rw-r--r--crates/typologies/typologies.toml32
-rw-r--r--crates/warden/src/main.rs40
-rw-r--r--lib/warden-core/src/configuration/conv.rs147
-rw-r--r--lib/warden-core/src/google/parser/dt.rs91
-rw-r--r--lib/warden-core/src/lib.rs1
23 files changed, 1429 insertions, 25 deletions
diff --git a/.github/codecov.yml b/.github/codecov.yml
index a51dfc6..dc4c65c 100644
--- a/.github/codecov.yml
+++ b/.github/codecov.yml
@@ -7,7 +7,3 @@ coverage:
project:
default:
threshold: 1%
-
-comment:
- layout: "files"
- require_changes: true
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index db90ef4..56cfdda 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -51,6 +51,7 @@ jobs:
- router
- rule-executor
- aggregator
+ - typologies
name: dockerfile / ${{ matrix.crate }}
steps:
- uses: actions/checkout@v5
@@ -166,19 +167,22 @@ jobs:
for processor in warden pseudonyms aggregator configuration; do
cp crates/$processor/.env.example crates/$processor/.env
done
- - name: Collect coverage data
- # Generate separate reports for nextest and doctests, and combine them.
- run: |
- cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info
- # switch to nightly
- # cargo llvm-cov --no-report nextest
- # cargo llvm-cov --no-report --doc
- # cargo llvm-cov report --doctests --lcov --output-path lcov.info
- - name: Upload coverage reports to Codecov
+ # - name: Collect coverage data
+ # # Generate separate reports for nextest and doctests, and combine them.
+ # run: |
+ # cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info
+ # # switch to nightly
+ # # cargo llvm-cov --no-report nextest
+ # # cargo llvm-cov --no-report --doc
+ # # cargo llvm-cov report --doctests --lcov --output-path lcov.info
+ - name: cargo llvm-cov
+ run: cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info
+ - name: Upload to codecov.io
uses: codecov/codecov-action@v5
with:
+ fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
- files: lcov.info
+ slug: ${{ github.repository }}
- name: Stop stack
if: always()
run: |
diff --git a/Cargo.lock b/Cargo.lock
index 46a6261..877acdf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3717,10 +3717,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
-name = "typologies"
-version = "0.1.0"
-
-[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4129,6 +4125,31 @@ dependencies = [
]
[[package]]
+name = "warden-typologies"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-nats",
+ "clap",
+ "config",
+ "futures-util",
+ "moka",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
+ "prost 0.14.1",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tonic 0.14.1",
+ "tracing",
+ "tracing-opentelemetry",
+ "uuid",
+ "warden-core",
+ "warden-middleware",
+ "warden-stack",
+]
+
+[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs
index 6f90b69..a7843a1 100644
--- a/crates/configuration/src/main.rs
+++ b/crates/configuration/src/main.rs
@@ -7,9 +7,13 @@ use std::net::{Ipv6Addr, SocketAddr};
use crate::{server::error::AppError, state::AppState};
use axum::http::header::CONTENT_TYPE;
use clap::Parser;
+use tokio::signal;
use tower::{make::Shared, steer::Steer};
use tracing::{error, info, trace};
-use warden_stack::{Configuration, Services, tracing::Tracing};
+use warden_stack::{
+ Configuration, Services,
+ tracing::{SdkTracerProvider, Tracing},
+};
/// warden-config
#[derive(Parser, Debug)]
@@ -114,7 +118,37 @@ async fn main() -> Result<(), AppError> {
let listener = tokio::net::TcpListener::bind(addr).await?;
info!(port = addr.port(), "starting config-api");
- axum::serve(listener, Shared::new(service)).await?;
+ axum::serve(listener, Shared::new(service))
+ .with_graceful_shutdown(shutdown_signal(provider))
+ .await?;
Ok(())
}
+
+async fn shutdown_signal(provider: SdkTracerProvider) {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {},
+ _ = terminate => {},
+ }
+
+ provider
+ .shutdown()
+ .expect("failed to shutdown trace provider");
+}
diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs
index 18c9222..ed284c6 100644
--- a/crates/rule-executor/src/main.rs
+++ b/crates/rule-executor/src/main.rs
@@ -1,4 +1,6 @@
+#[allow(dead_code)]
mod cnfg;
+
mod processor;
mod state;
diff --git a/crates/typologies/.dockerignore b/crates/typologies/.dockerignore
new file mode 100644
index 0000000..c8cd160
--- /dev/null
+++ b/crates/typologies/.dockerignore
@@ -0,0 +1,5 @@
+/target
+.env
+.git
+.github
+/contrib
diff --git a/crates/typologies/Cargo.toml b/crates/typologies/Cargo.toml
index 207e671..38a8cb4 100644
--- a/crates/typologies/Cargo.toml
+++ b/crates/typologies/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "typologies"
+name = "warden-typologies"
version = "0.1.0"
edition = "2024"
license.workspace = true
@@ -8,3 +8,36 @@ documentation.workspace = true
description.workspace = true
[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["toml"] }
+futures-util = { workspace = true, default-features = false }
+moka = { workspace = true, features = ["future"] }
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive"] }
+serde_json.workspace = true
+tokio = { workspace = true, features = [
+ "macros",
+ "rt-multi-thread",
+ "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+ "message",
+ "configuration",
+ "serde-time",
+] }
+warden-middleware.workspace = true
+warden-stack = { workspace = true, features = [
+ "cache",
+ "nats-jetstream",
+ "opentelemetry",
+ "opentelemetry-tonic",
+ "tracing-loki",
+] }
diff --git a/crates/typologies/Dockerfile b/crates/typologies/Dockerfile
new file mode 100644
index 0000000..61bb2e2
--- /dev/null
+++ b/crates/typologies/Dockerfile
@@ -0,0 +1,26 @@
+FROM rust:1.89.0-slim AS builder
+
+ENV SQLX_OFFLINE=true
+
+RUN rustup target add x86_64-unknown-linux-musl
+RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl
+RUN update-ca-certificates
+
+WORKDIR /usr/src/app
+
+RUN mkdir -p crates
+
+COPY ./crates/typologies crates/typologies
+COPY ./lib lib
+COPY ./Cargo.toml .
+COPY ./Cargo.lock .
+
+RUN cargo fetch
+
+COPY ./proto proto
+
+RUN cargo build --target x86_64-unknown-linux-musl --release
+
+FROM scratch
+COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-typologies ./
+CMD [ "./warden-typologies" ]
diff --git a/crates/typologies/src/cnfg.rs b/crates/typologies/src/cnfg.rs
new file mode 100644
index 0000000..6086f46
--- /dev/null
+++ b/crates/typologies/src/cnfg.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ pub subjects: Arc<[String]>,
+ pub destination_prefix: Arc<str>,
+ pub max_messages: i64,
+ pub durable_name: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct ConfigNats {
+ pub stream: Arc<str>,
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs
index e7a11a9..ea7843a 100644
--- a/crates/typologies/src/main.rs
+++ b/crates/typologies/src/main.rs
@@ -1,3 +1,65 @@
-fn main() {
- println!("Hello, world!");
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// typologies
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../typologies.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = state::Services { jetstream, cache };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
}
diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs
new file mode 100644
index 0000000..a2a3b17
--- /dev/null
+++ b/crates/typologies/src/processor.rs
@@ -0,0 +1,124 @@
+mod driver;
+mod publish;
+mod reload;
+mod typology;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{StreamExt, future};
+use tokio::signal;
+use tracing::{error, trace, warn};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = typology::process_typology(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name,
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs
new file mode 100644
index 0000000..d150620
--- /dev/null
+++ b/crates/typologies/src/processor/driver.rs
@@ -0,0 +1,40 @@
+use tonic::IntoRequest;
+use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub trait GetTypologyConfiguration {
+ fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> impl std::future::Future<Output = anyhow::Result<TypologyConfiguration>> + Send;
+}
+
+impl GetTypologyConfiguration for AppHandle {
+ async fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> anyhow::Result<TypologyConfiguration> {
+ {
+ let local_cache = self.local_cache.read().await;
+ if let Some(result) = local_cache.get(&typology_key).await.map(Ok) {
+ return result;
+ }
+ }
+
+ let local_cache = self.local_cache.write().await;
+ let mut client = self.query_typology_client.clone();
+
+ let value = client
+ .get_typology_configuration(typology_key.clone().into_request())
+ .await?
+ .into_inner()
+ .configuration
+ .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?;
+ local_cache
+ .insert(typology_key.clone(), value.clone())
+ .await;
+
+ Ok(value)
+ }
+}
diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs
new file mode 100644
index 0000000..b031bf3
--- /dev/null
+++ b/crates/typologies/src/processor/publish.rs
@@ -0,0 +1,44 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, info_span, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::message::Payload;
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_tadp(
+ subject: &str,
+ state: AppHandle,
+ payload: Payload,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!("{}.{}", state.config.nats.destination_prefix, subject);
+ debug!(subject = ?subject, "publishing");
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs
new file mode 100644
index 0000000..fac4c40
--- /dev/null
+++ b/crates/typologies/src/processor/reload.rs
@@ -0,0 +1,71 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use prost::Message as _;
+use tracing::{error, info, trace};
+use uuid::Uuid;
+use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
+ {
+ match kind {
+ ConfigKind::Typology => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating typology config"
+ );
+ let key = TypologyConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e:?}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs
new file mode 100644
index 0000000..b1b2592
--- /dev/null
+++ b/crates/typologies/src/processor/typology.rs
@@ -0,0 +1,180 @@
+mod aggregate_rules;
+mod evaluate_expression;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use opentelemetry::global;
+use prost::Message;
+use tracing::{Instrument, Span, error, info, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{
+ configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest},
+ message::{Payload, RuleResult, TypologyResult},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::{
+ processor::{driver::GetTypologyConfiguration as _, publish},
+ state::AppHandle,
+};
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn process_typology(
+ message: async_nats::jetstream::Message,
+ state: AppHandle,
+) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ if payload.transaction.is_none() {
+ warn!("transaction is empty - proceeding with ack");
+ let _ = message.ack().await;
+ return Ok(());
+ }
+
+ let transaction = payload.transaction.as_ref().expect("to have returned");
+
+ match transaction {
+ warden_core::message::payload::Transaction::Pacs008(_) => {
+ warn!("Pacs008 is unsupported on this version: this should be unreachable");
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ let key = format!(
+ "tp_{}",
+ pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ );
+
+ let rule_result = &payload
+ .rule_result
+ .as_ref()
+ .expect("rule result should be here");
+ let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?;
+
+ let routing = payload
+ .routing
+ .as_ref()
+ .expect("routing missing from payload");
+
+ let (mut typology_result, _rule_count) =
+ aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?;
+
+ let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state)
+ .await
+ .inspect_err(|e| error!("{e}"));
+ }
+ };
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
+
+#[instrument(skip(typology_result, routing, payload, state), err(Debug))]
+async fn evaluate_typology(
+ typology_result: &mut [TypologyResult],
+ routing: &RoutingConfiguration,
+ mut payload: Payload,
+ key: &str,
+ state: AppHandle,
+) -> Result<()> {
+ for typology_result in typology_result.iter_mut() {
+ let handle = Arc::clone(&state);
+ let routing_rules = routing.messages[0].typologies.iter().find(|typology| {
+ typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id)
+ });
+ let typology_result_rules = &typology_result.rule_results;
+
+ if routing_rules.is_some()
+ && typology_result_rules.len() < routing_rules.unwrap().rules.len()
+ {
+ continue;
+ }
+
+ let typology_config = handle
+ .get_typology_config(TypologyConfigurationRequest {
+ id: typology_result.id.to_owned(),
+ version: typology_result.version.to_owned(),
+ })
+ .await?;
+
+ let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?;
+
+ typology_result.result = result;
+
+ let workflow = typology_config
+ .workflow
+ .as_ref()
+ .expect("no workflow in config");
+
+ if workflow.interdiction_threshold.is_some() {
+ typology_result.workflow.replace(*workflow);
+ }
+ typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold);
+
+ payload.typology_result = Some(typology_result.to_owned());
+
+ let is_interdicting = typology_config
+ .workflow
+ .unwrap()
+ .interdiction_threshold
+ .is_some_and(|value| value > 0.0 && result >= value);
+
+ if is_interdicting {
+ typology_result.review = true;
+ }
+
+ if result >= typology_config.workflow.unwrap().alert_threshold {
+ info!("alerting");
+ }
+
+ let subj = handle.config.nats.destination_prefix.to_string();
+ let _ = publish::to_tadp(&subj, handle, payload.clone())
+ .await
+ .inspect_err(|e| error!("{e}"));
+
+ let mut c = state.services.cache.get().await?;
+ c.del::<_, ()>(key).await?;
+ }
+
+ Ok(())
+}
+
+async fn cache_and_get_all(
+ cache_key: &str,
+ rule_result: &RuleResult,
+ state: AppHandle,
+) -> Result<Vec<RuleResult>> {
+ let mut cache = state.services.cache.get().await?;
+
+ let bytes = prost::Message::encode_to_vec(rule_result);
+
+ let res = warden_stack::redis::pipe()
+ .sadd::<_, _>(cache_key, bytes)
+ .ignore()
+ .smembers(cache_key)
+ .query_async::<Vec<Vec<Vec<u8>>>>(&mut cache)
+ .await?;
+
+ let members = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ members
+ .iter()
+ .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new))
+ .collect()
+}
diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs
new file mode 100644
index 0000000..8f92dae
--- /dev/null
+++ b/crates/typologies/src/processor/typology/aggregate_rules.rs
@@ -0,0 +1,202 @@
+use anyhow::Result;
+use std::collections::HashSet;
+
+use warden_core::{
+ configuration::routing::RoutingConfiguration,
+ message::{RuleResult, TypologyResult},
+};
+
+pub(super) fn aggregate_rules(
+ rule_results: &[RuleResult],
+ routing: &RoutingConfiguration,
+ rule_result: &RuleResult,
+) -> Result<(Vec<TypologyResult>, usize)> {
+ let mut typology_result: Vec<TypologyResult> = vec![];
+ let mut all_rules_set = HashSet::new();
+
+ routing.messages.iter().for_each(|message| {
+ message.typologies.iter().for_each(|typology| {
+ let mut set = HashSet::new();
+
+ for rule in typology.rules.iter() {
+ set.insert((&rule.id, rule.version()));
+ all_rules_set.insert((&rule.id, rule.version()));
+ }
+
+ if !set.contains(&(&rule_result.id, rule_result.version.as_str())) {
+ return;
+ }
+
+ let rule_results: Vec<_> = rule_results
+ .iter()
+ .filter_map(|value| {
+ if set.contains(&(&value.id, &value.version)) {
+ Some(value.to_owned())
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ if !rule_results.is_empty() {
+ typology_result.push(TypologyResult {
+ id: typology.id.to_owned(),
+ version: typology.version.to_owned(),
+ rule_results,
+ ..Default::default()
+ });
+ }
+ });
+ });
+
+ Ok((typology_result, all_rules_set.len()))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use warden_core::{
+ configuration::routing::{Message, RoutingConfiguration, Rule, Typology},
+ message::RuleResult,
+ };
+
+ fn create_rule(id: &str, version: &str) -> Rule {
+ Rule {
+ id: id.to_string(),
+ version: Some(version.to_string()),
+ }
+ }
+
+ fn create_rule_result(id: &str, version: &str) -> RuleResult {
+ RuleResult {
+ id: id.to_string(),
+ version: version.to_string(),
+ ..Default::default()
+ }
+ }
+
+ #[test]
+ fn returns_empty_when_no_matching_typology() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![create_rule_result("R2", "v1")];
+ let input_rule = create_rule_result("R2", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+ assert!(result.is_empty());
+ assert_eq!(count, 1); // one rule in routing
+ }
+
+ #[test]
+ fn returns_typology_with_matching_rule() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R2", "v1"),
+ ];
+
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 2); // R1, R2
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].id, "T1");
+ assert_eq!(result[0].rule_results.len(), 2);
+ }
+
+ #[test]
+ fn ignores_unrelated_rules_in_rule_results() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R99", "v1"), // unrelated
+ ];
+
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 1);
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].rule_results.len(), 1);
+ assert_eq!(result[0].rule_results[0].id, "R1");
+ }
+
+ #[test]
+ fn handles_multiple_messages_and_typologies() {
+ let routing = RoutingConfiguration {
+ messages: vec![
+ Message {
+ typologies: vec![
+ Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ },
+ Typology {
+ id: "T2".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R2", "v1")],
+ },
+ ],
+ ..Default::default()
+ },
+ Message {
+ typologies: vec![Typology {
+ id: "T3".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
+ }],
+ ..Default::default()
+ },
+ ],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R2", "v1"),
+ ];
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2
+ assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2)
+ assert_eq!(result[0].id, "T1");
+ assert_eq!(result[1].id, "T3");
+ }
+}
diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs
new file mode 100644
index 0000000..844011e
--- /dev/null
+++ b/crates/typologies/src/processor/typology/evaluate_expression.rs
@@ -0,0 +1,171 @@
+use anyhow::Result;
+use tracing::warn;
+use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult};
+
+pub(super) fn evaluate_expression(
+ typology_result: &mut TypologyResult,
+ typology_config: &TypologyConfiguration,
+) -> Result<f64> {
+ let mut to_return = 0.0;
+ let expression = typology_config
+ .expression
+ .as_ref()
+ .expect("expression is missing");
+
+ let rule_values = &typology_config.rules;
+
+ for rule in expression.terms.iter() {
+ let rule_result = typology_result
+ .rule_results
+ .iter()
+ .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version));
+
+ if rule_result.is_none() {
+ warn!(term = ?rule, "could not find rule result for typology term");
+ return Ok(Default::default());
+ }
+
+ let rule_result = rule_result.expect("checked and is some");
+
+ let weight = rule_values
+ .iter()
+ .filter_map(|rv| {
+ if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) {
+ None
+ } else {
+ rv.wghts.iter().find_map(|value| {
+ match value.r#ref.eq(&rule_result.sub_rule_ref) {
+ true => Some(value.wght),
+ false => None,
+ }
+ })
+ }
+ })
+ .next();
+
+ if weight.is_none() {
+ warn!(rule = ?rule, "could not find a weight for the matching rule");
+ }
+ let weight = weight.unwrap_or_default();
+
+ to_return = match expression.operator() {
+ warden_core::configuration::typology::Operator::Add => to_return + weight,
+ warden_core::configuration::typology::Operator::Multiply => to_return * weight,
+ warden_core::configuration::typology::Operator::Subtract => to_return - weight,
+ warden_core::configuration::typology::Operator::Divide => {
+ if weight.ne(&0.0) {
+ to_return / weight
+ } else {
+ to_return
+ }
+ }
+ };
+ }
+ Ok(to_return)
+}
+
+#[cfg(test)]
+mod tests {
+ use warden_core::{
+ configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight},
+ message::RuleResult,
+ };
+
+ use super::*;
+
+ fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult {
+ RuleResult {
+ id: id.to_string(),
+ version: version.to_string(),
+ sub_rule_ref: sub_ref.to_string(),
+ ..Default::default()
+ }
+ }
+
+ fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule {
+ TypologyRule {
+ id: id.to_string(),
+ version: version.to_string(),
+ wghts: vec![TypologyRuleWeight {
+ r#ref: ref_name.to_string(),
+ wght: weight,
+ }],
+ }
+ }
+
+ fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression {
+ Expression {
+ terms: terms
+ .into_iter()
+ .map(|(id, version)| Term {
+ id: id.to_string(),
+ version: version.to_string(),
+ })
+ .collect(),
+ operator: op.into(),
+ }
+ }
+
+ #[test]
+ fn test_add_operator_multiple_terms() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![
+ make_rule_result("R1", "v1", "sub1"),
+ make_rule_result("R2", "v1", "sub2"),
+ ],
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(
+ vec![("R1", "v1"), ("R2", "v1")],
+ Operator::Add,
+ )),
+ rules: vec![
+ make_rule_value("R1", "v1", "sub1", 10.0),
+ make_rule_value("R2", "v1", "sub2", 5.0),
+ ],
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 15.0);
+ }
+
+ #[test]
+ fn test_missing_rule_result_returns_zero() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![make_rule_result("R1", "v1", "sub1")],
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(
+ vec![("R1", "v1"), ("R2", "v1")],
+ Operator::Add,
+ )),
+ rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)],
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 0.0);
+ }
+
+ #[test]
+ fn test_missing_weight_defaults_to_zero() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)),
+ rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 0.0);
+ }
+}
diff --git a/crates/typologies/src/state.rs b/crates/typologies/src/state.rs
new file mode 100644
index 0000000..23e08d9
--- /dev/null
+++ b/crates/typologies/src/state.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::typology::{
+ TypologyConfiguration, TypologyConfigurationRequest,
+ query_typologies_client::QueryTypologiesClient,
+};
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+ pub cache: RedisManager,
+}
+
+pub type AppHandle = Arc<AppState>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub local_cache: Arc<RwLock<Cache<TypologyConfigurationRequest, TypologyConfiguration>>>,
+ pub config: LocalConfig,
+ pub query_typology_client: QueryTypologiesClient<Intercepted>,
+}
+
+impl AppState {
+ pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
+ let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ let channel = Endpoint::new(config.config_endpoint.to_string())?
+ .connect()
+ .await
+ .inspect_err(|e| {
+ error!(
+ endpoint = ?config.config_endpoint,
+ "could not connect to config service: {e}",
+ )
+ })?;
+
+ let query_typology_client = QueryTypologiesClient::with_interceptor(channel, MyInterceptor);
+
+ Ok(Self {
+ services,
+ config,
+ local_cache: Arc::new(RwLock::new(Cache::builder().build())),
+ query_typology_client,
+ })
+ }
+}
diff --git a/crates/typologies/typologies.toml b/crates/typologies/typologies.toml
new file mode 100644
index 0000000..53dfa84
--- /dev/null
+++ b/crates/typologies/typologies.toml
@@ -0,0 +1,32 @@
+[application]
+env = "development"
+
+[monitoring]
+log-level = "warden_typologies=trace,info"
+opentelemetry-endpoint = "http://localhost:4317"
+loki-endpoint = "http://localhost:3100"
+
+[misc]
+config-endpoint = "http://localhost:1304"
+
+[misc.nats]
+stream-name = "typology"
+subjects = ["typology-rule.>"]
+destination-prefix = "tadp"
+max-messages = 10000
+durable-name = "typologies"
+
+[misc.nats.config]
+stream = "configuration"
+reload-subject = "configuration.reload"
+
+[nats]
+hosts = ["nats://localhost:4222"]
+
+[cache]
+dsn = "redis://localhost:6379"
+pooled = true
+type = "non-clustered" # clustered, non-clustered or sentinel
+max-connections = 100
+
+# vim:ft=toml
diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs
index 90fedfd..ecc9cbc 100644
--- a/crates/warden/src/main.rs
+++ b/crates/warden/src/main.rs
@@ -5,10 +5,14 @@ mod state;
mod version;
use std::net::{Ipv6Addr, SocketAddr};
+use tokio::signal;
use clap::{Parser, command};
use tracing::{error, info, trace};
-use warden_stack::{Configuration, Services, tracing::Tracing};
+use warden_stack::{
+ Configuration, Services,
+ tracing::{SdkTracerProvider, Tracing},
+};
use crate::state::AppState;
@@ -42,6 +46,8 @@ async fn main() -> Result<(), error::AppError> {
.loki(&config.application, &config.monitoring)?
.build(&config.monitoring);
+ let provider = tracing.otel_provider;
+
tokio::spawn(tracing.loki_task);
let mut services = Services::builder()
@@ -91,7 +97,37 @@ async fn main() -> Result<(), error::AppError> {
info!(port = addr.port(), "starting warden");
let router = server::router(state).merge(server::metrics_app());
- axum::serve(listener, router).await?;
+ axum::serve(listener, router)
+ .with_graceful_shutdown(shutdown_signal(provider))
+ .await?;
Ok(())
}
+
+async fn shutdown_signal(provider: SdkTracerProvider) {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {},
+ _ = terminate => {},
+ }
+
+ provider
+ .shutdown()
+ .expect("failed to shutdown trace provider");
+}
diff --git a/lib/warden-core/src/configuration/conv.rs b/lib/warden-core/src/configuration/conv.rs
index cbb1e88..7f982b4 100644
--- a/lib/warden-core/src/configuration/conv.rs
+++ b/lib/warden-core/src/configuration/conv.rs
@@ -157,3 +157,150 @@ impl TryFrom<String> for Operator {
Operator::from_str_name(&value).ok_or_else(|| format!("unsupported operator: {}", value))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::google::protobuf::{Value, value::Kind};
+ use serde::{Deserialize, Serialize};
+ use serde_json::{self, json};
+
+ #[derive(Deserialize, Serialize)]
+ struct OpWrap {
+ #[serde(with = "super::operator_serde")]
+ op: i32,
+ }
+
+ fn normalize_json_numbers(val: serde_json::Value) -> serde_json::Value {
+ match val {
+ serde_json::Value::Array(arr) => {
+ serde_json::Value::Array(arr.into_iter().map(normalize_json_numbers).collect())
+ }
+ serde_json::Value::Object(map) => serde_json::Value::Object(
+ map.into_iter()
+ .map(|(k, v)| (k, normalize_json_numbers(v)))
+ .collect(),
+ ),
+ serde_json::Value::Number(n) => {
+ if let Some(f) = n.as_f64() {
+ serde_json::Value::Number(
+ serde_json::Number::from_f64(f)
+ .unwrap_or_else(|| serde_json::Number::from(0)),
+ )
+ } else {
+ serde_json::Value::Number(n)
+ }
+ }
+ other => other,
+ }
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_null() {
+ let json = serde_json::Value::Null;
+ let protobuf_value = Value::try_from(json).unwrap();
+ assert!(matches!(protobuf_value.kind.unwrap(), Kind::NullValue(_)));
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_bool() {
+ let json = serde_json::Value::Bool(true);
+ let protobuf_value = Value::try_from(json).unwrap();
+ assert_eq!(protobuf_value.kind.unwrap(), Kind::BoolValue(true));
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_number() {
+ let json = serde_json::Value::Number(serde_json::Number::from(42));
+ let protobuf_value = Value::try_from(json).unwrap();
+ assert_eq!(protobuf_value.kind.unwrap(), Kind::NumberValue(42.0));
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_string() {
+ let json = serde_json::Value::String("hello".to_string());
+ let protobuf_value = Value::try_from(json).unwrap();
+ assert_eq!(
+ protobuf_value.kind.unwrap(),
+ Kind::StringValue("hello".to_string())
+ );
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_array() {
+ let json = json!([1, 2, "three"]);
+ let protobuf_value = Value::try_from(json).unwrap();
+ if let Kind::ListValue(list) = protobuf_value.kind.unwrap() {
+ assert_eq!(list.values.len(), 3);
+ } else {
+ panic!("Expected ListValue");
+ }
+ }
+
+ #[test]
+ fn test_json_to_protobuf_value_object() {
+ let json = json!({
+ "a": 1,
+ "b": true,
+ "c": "hello"
+ });
+ let protobuf_value = Value::try_from(json).unwrap();
+ if let Kind::StructValue(s) = protobuf_value.kind.unwrap() {
+ assert_eq!(
+ s.fields["a"].kind.as_ref().unwrap(),
+ &Kind::NumberValue(1.0)
+ );
+ assert_eq!(s.fields["b"].kind.as_ref().unwrap(), &Kind::BoolValue(true));
+ assert_eq!(
+ s.fields["c"].kind.as_ref().unwrap(),
+ &Kind::StringValue("hello".to_string())
+ );
+ } else {
+ panic!("Expected StructValue");
+ }
+ }
+
+ #[test]
+ fn test_protobuf_to_json_roundtrip() {
+ let original = json!({
+ "x": 1,
+ "y": [true, null, "str"],
+ "z": {
+ "nested": 3.14
+ }
+ });
+
+ let protobuf_value = Value::try_from(original.clone()).unwrap();
+ let json_value: serde_json::Value = protobuf_value.into();
+
+ assert_eq!(
+ normalize_json_numbers(original),
+ normalize_json_numbers(json_value)
+ );
+ }
+
+ #[test]
+ fn test_operator_serialization() {
+ let wrap = OpWrap {
+ op: Operator::Add as i32, // Replace with actual enum variant
+ };
+
+ let s = serde_json::to_string(&wrap).unwrap();
+ assert!(s.contains("ADD")); // Assuming .as_str_name() gives "ADD"
+ }
+
+ #[test]
+ fn test_operator_deserialization() {
+ let json_data = json!({ "op": "ADD" }).to_string();
+ let wrap: OpWrap = serde_json::from_str(&json_data).unwrap();
+
+ assert_eq!(wrap.op, Operator::Add as i32); // Replace with actual enum variant
+ }
+
+ #[test]
+ fn test_operator_invalid_deserialization() {
+ let json_data = json!({ "op": "UNKNOWN_OP" }).to_string();
+ let result: Result<OpWrap, _> = serde_json::from_str(&json_data);
+ assert!(result.is_err());
+ }
+}
diff --git a/lib/warden-core/src/google/parser/dt.rs b/lib/warden-core/src/google/parser/dt.rs
index 0e57833..7e61c1f 100644
--- a/lib/warden-core/src/google/parser/dt.rs
+++ b/lib/warden-core/src/google/parser/dt.rs
@@ -103,6 +103,97 @@ mod date {
)
}
}
+
+ #[cfg(test)]
+ mod tests {
+ use super::*;
+ use time::{Month, OffsetDateTime};
+
+ use time::Date as TimeDate;
+
+ #[test]
+ fn converts_dates() {
+ let d = TimeDate::from_calendar_date(2023, Month::August, 17).unwrap();
+ let date: Date = d.into();
+
+ let time_date = TimeDate::try_from(date);
+
+ assert!(time_date.is_ok());
+ }
+
+ #[test]
+ fn converts_regular_date_no_time() {
+ let d = TimeDate::from_calendar_date(2023, Month::August, 17).unwrap();
+ let date: Date = d.into();
+
+ assert_eq!(date.year, 2023);
+ assert_eq!(date.month, 8);
+ assert_eq!(date.day, 17);
+ }
+
+ #[test]
+ fn converts_leap_year_date() {
+ let d = TimeDate::from_calendar_date(2020, Month::February, 29).unwrap();
+ let date: Date = d.into();
+
+ assert_eq!(date.year, 2020);
+ assert_eq!(date.month, 2);
+ assert_eq!(date.day, 29);
+ }
+
+ #[test]
+ fn converts_minimum_date() {
+ let d = TimeDate::MIN; // Year -9999-01-01
+ let date: Date = d.into();
+
+ assert_eq!(date.year, -9999);
+ assert_eq!(date.month, 1);
+ assert_eq!(date.day, 1);
+ }
+
+ #[test]
+ fn converts_maximum_date() {
+ let d = TimeDate::MAX; // Year 9999-12-31
+ let date: Date = d.into();
+
+ assert_eq!(date.year, 9999);
+ assert_eq!(date.month, 12);
+ assert_eq!(date.day, 31);
+ }
+
+ #[test]
+ fn converts_regular_date() {
+ let dt = OffsetDateTime::from_unix_timestamp(1_600_000_000).unwrap(); // 2020-09-13 UTC
+ let date: Date = dt.into();
+
+ assert_eq!(date.year, 2020);
+ assert_eq!(date.month, 9);
+ assert_eq!(date.day, 13);
+ }
+
+ #[test]
+ fn converts_leap_year_feb_29() {
+ let dt = OffsetDateTime::new_utc(
+ time::Date::from_calendar_date(2020, Month::February, 29).unwrap(),
+ time::Time::from_hms(0, 0, 0).unwrap(),
+ );
+ let date: Date = dt.into();
+
+ assert_eq!(date.year, 2020);
+ assert_eq!(date.month, 2);
+ assert_eq!(date.day, 29);
+ }
+
+ #[test]
+ fn converts_first_day_of_epoch() {
+ let dt = OffsetDateTime::UNIX_EPOCH; // 1970-01-01
+ let date: Date = dt.into();
+
+ assert_eq!(date.year, 1970);
+ assert_eq!(date.month, 1);
+ assert_eq!(date.day, 1);
+ }
+ }
}
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs
index c97bef3..80fb34d 100644
--- a/lib/warden-core/src/lib.rs
+++ b/lib/warden-core/src/lib.rs
@@ -22,6 +22,7 @@ pub mod iso20022;
/// Message in transit
#[allow(missing_docs)]
+#[allow(clippy::large_enum_variant)]
#[cfg(feature = "message")]
pub mod message;