diff options
28 files changed, 1351 insertions, 73 deletions
diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 0000000..dc4c65c --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,9 @@ +# doc: https://docs.codecov.com/docs/codecovyml-reference +coverage: + # range: 85..100 + round: down + precision: 1 + status: + project: + default: + threshold: 1% diff --git a/.github/renovate.json b/.github/renovate.json index 0568be1..f4a0325 100644 --- a/.github/renovate.json +++ b/.github/renovate.json @@ -10,6 +10,12 @@ "matchUpdateTypes": ["minor", "patch"], "matchCurrentVersion": "!/^0/", "automerge": true + }, + { + "matchManagers": ["github-actions"], + "matchPackageNames": ["actions/setup-node"], + "automerge": true, + "automergeType": "branch" } ] } diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml deleted file mode 100644 index 7bf14b2..0000000 --- a/.github/workflows/check.yaml +++ /dev/null @@ -1,62 +0,0 @@ -permissions: - contents: read -on: - push: - branches: [master] - pull_request: - -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} - cancel-in-progress: true -name: check -jobs: - dockerfile: - runs-on: ubuntu-latest - strategy: - fail-fast: true - matrix: - crate: - - pseudonyms - - warden - - configuration - - router - - rule-executor - - aggregator - name: dockerfile / ${{ matrix.crate }} - steps: - - uses: actions/checkout@v5 - with: - submodules: true - - name: set up docker buildx - uses: docker/setup-buildx-action@v3 - - name: build # and push - uses: docker/build-push-action@v6 - with: - push: false - context: . - file: crates/${{ matrix.crate }}/Dockerfile - tags: warden/${{ matrix.crate }}:latest - cache-from: type=gha - cache-to: type=gha,mode=max - msrv: - runs-on: ubuntu-latest - strategy: - matrix: - msrv: ["1.89.0"] - name: msrv / ${{ matrix.msrv }} - steps: - - uses: actions/checkout@v5 - with: - submodules: true - - name: Install ${{ matrix.msrv }} - uses: dtolnay/rust-toolchain@master - with: - toolchain: ${{ matrix.msrv }} - - name: install protoc - uses: arduino/setup-protoc@v3 - with: - repo-token: ${{ secrets.GITHUB_TOKEN }} - - name: cargo install cargo-hack - uses: taiki-e/install-action@cargo-hack - - name: cargo hack +${{ matrix.msrv }} - run: cargo hack --clean-per-run --feature-powerset check diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..56cfdda --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,191 @@ +permissions: + contents: read +on: + push: + branches: [master] + pull_request: + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +name: ci + +jobs: + os-check: + runs-on: ${{ matrix.os }} + name: ${{ matrix.os }} / stable + strategy: + fail-fast: false + matrix: + os: [macos-latest, windows-latest] + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - name: Install stable + uses: dtolnay/rust-toolchain@stable + - name: Install protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: cargo-nextest + - uses: Swatinem/rust-cache@v2 + - name: cargo test --workspace + run: cargo nextest run --no-run --workspace --locked --all-features --all-targets + - name: cargo build + run: cargo build --workspace --locked --all-features --all-targets + + dockerfile: + runs-on: ubuntu-latest + strategy: + fail-fast: true + matrix: + crate: + - pseudonyms + - warden + - configuration + - router + - rule-executor + - aggregator + - typologies + name: dockerfile / ${{ matrix.crate }} + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - name: Set up docker buildx + uses: docker/setup-buildx-action@v3 + - name: Build # and push + uses: docker/build-push-action@v6 + with: + push: false + context: . + file: crates/${{ matrix.crate }}/Dockerfile + tags: warden/${{ matrix.crate }}:latest + cache-from: type=gha + cache-to: type=gha,mode=max + outputs: type=docker,dest=/tmp/${{ matrix.crate }}.tar + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: warden-${{ matrix.crate }} + path: /tmp/${{ matrix.crate }}.tar + + msrv: + runs-on: ubuntu-latest + strategy: + matrix: + msrv: ["1.89.0"] + name: msrv / ${{ matrix.msrv }} + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - name: Install ${{ matrix.msrv }} + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ matrix.msrv }} + - name: Install protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: cargo install cargo-hack + uses: taiki-e/install-action@cargo-hack + - name: cargo hack +${{ matrix.msrv }} + run: cargo hack --clean-per-run --feature-powerset check + + test: + runs-on: ubuntu-latest + needs: + - dockerfile + - os-check + name: test / workspace + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - name: Start stack + run: docker compose -f contrib/docker-compose/compose.yaml up -d + - name: Setup node.js + uses: actions/setup-node@v4 + with: + node-version: '22' + - name: Download artifacts + uses: actions/download-artifact@v5 + with: + path: warden-images + pattern: warden-* + merge-multiple: true + - name: Load docker images + run: | + dir="warden-images" + for file in "$dir"/*.tar; do + if [ -f "$file" ]; then + echo "Loading tar: $file" + image_name=$(docker load --input "$file" | awk '/Loaded image:/ {print $3}') + echo "Running image: $image_name" + docker run --rm -d --network host "$image_name" + fi + done + echo "images loaded" + docker image ls -a + - name: Install bruno cli + run: npm install -g @usebruno/cli + - name: Run HTTP-api tests + run: | + cd contrib/bruno + bru run configuration/health-check.bru \ + configuration/routing/02-post-routing.bru \ + configuration/rule/01-create.bru \ + configuration/typology/01-create.bru \ + --env warden --reporter-html results.html + - name: Upload test results + uses: actions/upload-artifact@v4 + with: + name: test-results + path: contrib/bruno/results.html + - name: Install stable + uses: dtolnay/rust-toolchain@stable + with: + components: llvm-tools-preview + - uses: taiki-e/install-action@cargo-llvm-cov + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: cargo-nextest + - uses: Swatinem/rust-cache@v2 + - name: Install protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Prepare environment for tests + run: | + for processor in warden pseudonyms aggregator configuration; do + cp crates/$processor/.env.example crates/$processor/.env + done + # - name: Collect coverage data + # # Generate separate reports for nextest and doctests, and combine them. + # run: | + # cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info + # # switch to nightly + # # cargo llvm-cov --no-report nextest + # # cargo llvm-cov --no-report --doc + # # cargo llvm-cov report --doctests --lcov --output-path lcov.info + - name: cargo llvm-cov + run: cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info + - name: Upload to codecov.io + uses: codecov/codecov-action@v5 + with: + fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} + slug: ${{ github.repository }} + - name: Stop stack + if: always() + run: | + docker compose -f contrib/docker-compose/compose.yaml down -v + docker stop $(docker ps -aq) || true + docker rm $(docker ps -aq) || true @@ -3717,10 +3717,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" [[package]] -name = "typologies" -version = "0.1.0" - -[[package]] name = "unicase" version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4129,6 +4125,31 @@ dependencies = [ ] [[package]] +name = "warden-typologies" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "clap", + "config", + "futures-util", + "moka", + "opentelemetry", + "opentelemetry-semantic-conventions", + "prost 0.14.1", + "serde", + "serde_json", + "tokio", + "tonic 0.14.1", + "tracing", + "tracing-opentelemetry", + "uuid", + "warden-core", + "warden-middleware", + "warden-stack", +] + +[[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/contrib/bruno/configuration/health-check.bru b/contrib/bruno/configuration/health-check.bru new file mode 100644 index 0000000..62f66a9 --- /dev/null +++ b/contrib/bruno/configuration/health-check.bru @@ -0,0 +1,19 @@ +meta { + name: health-check + type: http + seq: 4 +} + +get { + url: {{WARDEN_CFG_HOST}} + body: none + auth: inherit +} + +assert { + res.status: eq 200 +} + +settings { + encodeUrl: true +} diff --git a/contrib/bruno/transaction-monitoring/folder.bru b/contrib/bruno/transaction-monitoring/folder.bru index ca6f740..030714b 100644 --- a/contrib/bruno/transaction-monitoring/folder.bru +++ b/contrib/bruno/transaction-monitoring/folder.bru @@ -1,6 +1,6 @@ meta { name: transaction-monitoring - seq: 2 + seq: 3 } auth { diff --git a/crates/aggregator/.env.example b/crates/aggregator/.env.example new file mode 100644 index 0000000..5178bd5 --- /dev/null +++ b/crates/aggregator/.env.example @@ -0,0 +1 @@ +DATABASE_URL=postgres://postgres:password@localhost:5432/evaluations diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs index 62af544..7bb5a79 100644 --- a/crates/aggregator/src/main.rs +++ b/crates/aggregator/src/main.rs @@ -4,7 +4,7 @@ mod state; use anyhow::Result; use clap::Parser; -use tracing::error; +use tracing::{error, trace}; use warden_stack::{Configuration, Services, tracing::Tracing}; use crate::state::AppState; @@ -78,6 +78,12 @@ async fn main() -> Result<()> { let state = AppState::create(services, &config).await?; + trace!("running migrations"); + sqlx::migrate!("./migrations") + .run(&state.services.postgres) + .await?; + trace!("migrations updated"); + processor::serve(state, provider).await?; Ok(()) diff --git a/crates/configuration/.env.example b/crates/configuration/.env.example new file mode 100644 index 0000000..9581956 --- /dev/null +++ b/crates/configuration/.env.example @@ -0,0 +1 @@ +DATABASE_URL=postgres://postgres:password@localhost:5432/configuration diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index 7dc8da6..6f90b69 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -8,7 +8,7 @@ use crate::{server::error::AppError, state::AppState}; use axum::http::header::CONTENT_TYPE; use clap::Parser; use tower::{make::Shared, steer::Steer}; -use tracing::{error, info}; +use tracing::{error, info, trace}; use warden_stack::{Configuration, Services, tracing::Tracing}; /// warden-config @@ -82,6 +82,12 @@ async fn main() -> Result<(), AppError> { ) .await?; + trace!("running migrations"); + sqlx::migrate!("./migrations") + .run(&state.services.postgres) + .await?; + trace!("migrations updated"); + let (app, grpc_server) = server::serve(state)?; let service = Steer::new( diff --git a/crates/pseudonyms/.env.example b/crates/pseudonyms/.env.example new file mode 100644 index 0000000..3728d42 --- /dev/null +++ b/crates/pseudonyms/.env.example @@ -0,0 +1 @@ +DATABASE_URL=postgres://postgres:password@localhost:5432/pseudonyms diff --git a/crates/typologies/.dockerignore b/crates/typologies/.dockerignore new file mode 100644 index 0000000..c8cd160 --- /dev/null +++ b/crates/typologies/.dockerignore @@ -0,0 +1,5 @@ +/target +.env +.git +.github +/contrib diff --git a/crates/typologies/Cargo.toml b/crates/typologies/Cargo.toml index 207e671..38a8cb4 100644 --- a/crates/typologies/Cargo.toml +++ b/crates/typologies/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "typologies" +name = "warden-typologies" version = "0.1.0" edition = "2024" license.workspace = true @@ -8,3 +8,36 @@ documentation.workspace = true description.workspace = true [dependencies] +anyhow.workspace = true +async-nats.workspace = true +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["toml"] } +futures-util = { workspace = true, default-features = false } +moka = { workspace = true, features = ["future"] } +opentelemetry.workspace = true +opentelemetry-semantic-conventions.workspace = true +prost.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "signal", +] } +tonic.workspace = true +tracing.workspace = true +tracing-opentelemetry.workspace = true +uuid = { workspace = true, features = ["v7"] } +warden-core = { workspace = true, features = [ + "message", + "configuration", + "serde-time", +] } +warden-middleware.workspace = true +warden-stack = { workspace = true, features = [ + "cache", + "nats-jetstream", + "opentelemetry", + "opentelemetry-tonic", + "tracing-loki", +] } diff --git a/crates/typologies/Dockerfile b/crates/typologies/Dockerfile new file mode 100644 index 0000000..61bb2e2 --- /dev/null +++ b/crates/typologies/Dockerfile @@ -0,0 +1,26 @@ +FROM rust:1.89.0-slim AS builder + +ENV SQLX_OFFLINE=true + +RUN rustup target add x86_64-unknown-linux-musl +RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl +RUN update-ca-certificates + +WORKDIR /usr/src/app + +RUN mkdir -p crates + +COPY ./crates/typologies crates/typologies +COPY ./lib lib +COPY ./Cargo.toml . +COPY ./Cargo.lock . + +RUN cargo fetch + +COPY ./proto proto + +RUN cargo build --target x86_64-unknown-linux-musl --release + +FROM scratch +COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-typologies ./ +CMD [ "./warden-typologies" ] diff --git a/crates/typologies/src/cnfg.rs b/crates/typologies/src/cnfg.rs new file mode 100644 index 0000000..6086f46 --- /dev/null +++ b/crates/typologies/src/cnfg.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc<str>, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + pub subjects: Arc<[String]>, + pub destination_prefix: Arc<str>, + pub max_messages: i64, + pub durable_name: Arc<str>, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct ConfigNats { + pub stream: Arc<str>, + pub reload_subject: Arc<str>, +} diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs index e7a11a9..ea7843a 100644 --- a/crates/typologies/src/main.rs +++ b/crates/typologies/src/main.rs @@ -1,3 +1,65 @@ -fn main() { - println!("Hello, world!"); +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// typologies +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../typologies.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = state::Services { jetstream, cache }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) } diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs new file mode 100644 index 0000000..a2a3b17 --- /dev/null +++ b/crates/typologies/src/processor.rs @@ -0,0 +1,124 @@ +mod driver; +mod publish; +mod reload; +mod typology; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{ + Context, + consumer::{Consumer, pull}, +}; +use futures_util::{StreamExt, future}; +use tokio::signal; +use tracing::{error, trace, warn}; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; + +use crate::{ + cnfg::Nats, + state::{AppHandle, AppState, Services}, +}; + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = typology::process_typology(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result<Consumer<pull::Config>> { + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name, + subjects: nats.subjects.iter().map(Into::into).collect(), + max_messages: nats.max_messages, + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs new file mode 100644 index 0000000..d150620 --- /dev/null +++ b/crates/typologies/src/processor/driver.rs @@ -0,0 +1,40 @@ +use tonic::IntoRequest; +use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub trait GetTypologyConfiguration { + fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> impl std::future::Future<Output = anyhow::Result<TypologyConfiguration>> + Send; +} + +impl GetTypologyConfiguration for AppHandle { + async fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> anyhow::Result<TypologyConfiguration> { + { + let local_cache = self.local_cache.read().await; + if let Some(result) = local_cache.get(&typology_key).await.map(Ok) { + return result; + } + } + + let local_cache = self.local_cache.write().await; + let mut client = self.query_typology_client.clone(); + + let value = client + .get_typology_configuration(typology_key.clone().into_request()) + .await? + .into_inner() + .configuration + .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?; + local_cache + .insert(typology_key.clone(), value.clone()) + .await; + + Ok(value) + } +} diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs new file mode 100644 index 0000000..b031bf3 --- /dev/null +++ b/crates/typologies/src/processor/publish.rs @@ -0,0 +1,44 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_tadp( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs new file mode 100644 index 0000000..fac4c40 --- /dev/null +++ b/crates/typologies/src/processor/reload.rs @@ -0,0 +1,71 @@ +use async_nats::jetstream::consumer; +use futures_util::StreamExt; +use prost::Message as _; +use tracing::{error, info, trace}; +use uuid::Uuid; +use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event"); + if let Ok(res) = ReloadEvent::decode(message.payload.as_ref()) + && let Ok(kind) = ConfigKind::try_from(res.kind) + { + match kind { + ConfigKind::Typology => { + let local_cache = state.local_cache.write().await; + let id = res.id(); + let version = res.version(); + trace!( + id = id, + ver = version, + "update triggered, invalidating typology config" + ); + let key = TypologyConfigurationRequest { + id: id.to_string(), + version: version.to_string(), + }; + + local_cache.invalidate(&key).await; + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e:?}") + } + } + } + + Ok(()) +} diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs new file mode 100644 index 0000000..b1b2592 --- /dev/null +++ b/crates/typologies/src/processor/typology.rs @@ -0,0 +1,180 @@ +mod aggregate_rules; +mod evaluate_expression; + +use std::sync::Arc; + +use anyhow::Result; +use opentelemetry::global; +use prost::Message; +use tracing::{Instrument, Span, error, info, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{ + configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest}, + message::{Payload, RuleResult, TypologyResult}, +}; +use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor}; + +use crate::{ + processor::{driver::GetTypologyConfiguration as _, publish}, + state::AppHandle, +}; + +#[instrument(skip(message, state), err(Debug))] +pub async fn process_typology( + message: async_nats::jetstream::Message, + state: AppHandle, +) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + if payload.transaction.is_none() { + warn!("transaction is empty - proceeding with ack"); + let _ = message.ack().await; + return Ok(()); + } + + let transaction = payload.transaction.as_ref().expect("to have returned"); + + match transaction { + warden_core::message::payload::Transaction::Pacs008(_) => { + warn!("Pacs008 is unsupported on this version: this should be unreachable"); + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + let key = format!( + "tp_{}", + pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + ); + + let rule_result = &payload + .rule_result + .as_ref() + .expect("rule result should be here"); + let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?; + + let routing = payload + .routing + .as_ref() + .expect("routing missing from payload"); + + let (mut typology_result, _rule_count) = + aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?; + + let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state) + .await + .inspect_err(|e| error!("{e}")); + } + }; + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} + +#[instrument(skip(typology_result, routing, payload, state), err(Debug))] +async fn evaluate_typology( + typology_result: &mut [TypologyResult], + routing: &RoutingConfiguration, + mut payload: Payload, + key: &str, + state: AppHandle, +) -> Result<()> { + for typology_result in typology_result.iter_mut() { + let handle = Arc::clone(&state); + let routing_rules = routing.messages[0].typologies.iter().find(|typology| { + typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id) + }); + let typology_result_rules = &typology_result.rule_results; + + if routing_rules.is_some() + && typology_result_rules.len() < routing_rules.unwrap().rules.len() + { + continue; + } + + let typology_config = handle + .get_typology_config(TypologyConfigurationRequest { + id: typology_result.id.to_owned(), + version: typology_result.version.to_owned(), + }) + .await?; + + let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?; + + typology_result.result = result; + + let workflow = typology_config + .workflow + .as_ref() + .expect("no workflow in config"); + + if workflow.interdiction_threshold.is_some() { + typology_result.workflow.replace(*workflow); + } + typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold); + + payload.typology_result = Some(typology_result.to_owned()); + + let is_interdicting = typology_config + .workflow + .unwrap() + .interdiction_threshold + .is_some_and(|value| value > 0.0 && result >= value); + + if is_interdicting { + typology_result.review = true; + } + + if result >= typology_config.workflow.unwrap().alert_threshold { + info!("alerting"); + } + + let subj = handle.config.nats.destination_prefix.to_string(); + let _ = publish::to_tadp(&subj, handle, payload.clone()) + .await + .inspect_err(|e| error!("{e}")); + + let mut c = state.services.cache.get().await?; + c.del::<_, ()>(key).await?; + } + + Ok(()) +} + +async fn cache_and_get_all( + cache_key: &str, + rule_result: &RuleResult, + state: AppHandle, +) -> Result<Vec<RuleResult>> { + let mut cache = state.services.cache.get().await?; + + let bytes = prost::Message::encode_to_vec(rule_result); + + let res = warden_stack::redis::pipe() + .sadd::<_, _>(cache_key, bytes) + .ignore() + .smembers(cache_key) + .query_async::<Vec<Vec<Vec<u8>>>>(&mut cache) + .await?; + + let members = res + .first() + .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?; + + members + .iter() + .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new)) + .collect() +} diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs new file mode 100644 index 0000000..8f92dae --- /dev/null +++ b/crates/typologies/src/processor/typology/aggregate_rules.rs @@ -0,0 +1,202 @@ +use anyhow::Result; +use std::collections::HashSet; + +use warden_core::{ + configuration::routing::RoutingConfiguration, + message::{RuleResult, TypologyResult}, +}; + +pub(super) fn aggregate_rules( + rule_results: &[RuleResult], + routing: &RoutingConfiguration, + rule_result: &RuleResult, +) -> Result<(Vec<TypologyResult>, usize)> { + let mut typology_result: Vec<TypologyResult> = vec![]; + let mut all_rules_set = HashSet::new(); + + routing.messages.iter().for_each(|message| { + message.typologies.iter().for_each(|typology| { + let mut set = HashSet::new(); + + for rule in typology.rules.iter() { + set.insert((&rule.id, rule.version())); + all_rules_set.insert((&rule.id, rule.version())); + } + + if !set.contains(&(&rule_result.id, rule_result.version.as_str())) { + return; + } + + let rule_results: Vec<_> = rule_results + .iter() + .filter_map(|value| { + if set.contains(&(&value.id, &value.version)) { + Some(value.to_owned()) + } else { + None + } + }) + .collect(); + + if !rule_results.is_empty() { + typology_result.push(TypologyResult { + id: typology.id.to_owned(), + version: typology.version.to_owned(), + rule_results, + ..Default::default() + }); + } + }); + }); + + Ok((typology_result, all_rules_set.len())) +} + +#[cfg(test)] +mod tests { + use super::*; + use warden_core::{ + configuration::routing::{Message, RoutingConfiguration, Rule, Typology}, + message::RuleResult, + }; + + fn create_rule(id: &str, version: &str) -> Rule { + Rule { + id: id.to_string(), + version: Some(version.to_string()), + } + } + + fn create_rule_result(id: &str, version: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + ..Default::default() + } + } + + #[test] + fn returns_empty_when_no_matching_typology() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![create_rule_result("R2", "v1")]; + let input_rule = create_rule_result("R2", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + assert!(result.is_empty()); + assert_eq!(count, 1); // one rule in routing + } + + #[test] + fn returns_typology_with_matching_rule() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 + assert_eq!(result.len(), 1); + assert_eq!(result[0].id, "T1"); + assert_eq!(result[0].rule_results.len(), 2); + } + + #[test] + fn ignores_unrelated_rules_in_rule_results() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R99", "v1"), // unrelated + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 1); + assert_eq!(result.len(), 1); + assert_eq!(result[0].rule_results.len(), 1); + assert_eq!(result[0].rule_results[0].id, "R1"); + } + + #[test] + fn handles_multiple_messages_and_typologies() { + let routing = RoutingConfiguration { + messages: vec![ + Message { + typologies: vec![ + Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }, + Typology { + id: "T2".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R2", "v1")], + }, + ], + ..Default::default() + }, + Message { + typologies: vec![Typology { + id: "T3".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }, + ], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2 + assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2) + assert_eq!(result[0].id, "T1"); + assert_eq!(result[1].id, "T3"); + } +} diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs new file mode 100644 index 0000000..844011e --- /dev/null +++ b/crates/typologies/src/processor/typology/evaluate_expression.rs @@ -0,0 +1,171 @@ +use anyhow::Result; +use tracing::warn; +use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult}; + +pub(super) fn evaluate_expression( + typology_result: &mut TypologyResult, + typology_config: &TypologyConfiguration, +) -> Result<f64> { + let mut to_return = 0.0; + let expression = typology_config + .expression + .as_ref() + .expect("expression is missing"); + + let rule_values = &typology_config.rules; + + for rule in expression.terms.iter() { + let rule_result = typology_result + .rule_results + .iter() + .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version)); + + if rule_result.is_none() { + warn!(term = ?rule, "could not find rule result for typology term"); + return Ok(Default::default()); + } + + let rule_result = rule_result.expect("checked and is some"); + + let weight = rule_values + .iter() + .filter_map(|rv| { + if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) { + None + } else { + rv.wghts.iter().find_map(|value| { + match value.r#ref.eq(&rule_result.sub_rule_ref) { + true => Some(value.wght), + false => None, + } + }) + } + }) + .next(); + + if weight.is_none() { + warn!(rule = ?rule, "could not find a weight for the matching rule"); + } + let weight = weight.unwrap_or_default(); + + to_return = match expression.operator() { + warden_core::configuration::typology::Operator::Add => to_return + weight, + warden_core::configuration::typology::Operator::Multiply => to_return * weight, + warden_core::configuration::typology::Operator::Subtract => to_return - weight, + warden_core::configuration::typology::Operator::Divide => { + if weight.ne(&0.0) { + to_return / weight + } else { + to_return + } + } + }; + } + Ok(to_return) +} + +#[cfg(test)] +mod tests { + use warden_core::{ + configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight}, + message::RuleResult, + }; + + use super::*; + + fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + sub_rule_ref: sub_ref.to_string(), + ..Default::default() + } + } + + fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule { + TypologyRule { + id: id.to_string(), + version: version.to_string(), + wghts: vec![TypologyRuleWeight { + r#ref: ref_name.to_string(), + wght: weight, + }], + } + } + + fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression { + Expression { + terms: terms + .into_iter() + .map(|(id, version)| Term { + id: id.to_string(), + version: version.to_string(), + }) + .collect(), + operator: op.into(), + } + } + + #[test] + fn test_add_operator_multiple_terms() { + let mut typology_result = TypologyResult { + rule_results: vec![ + make_rule_result("R1", "v1", "sub1"), + make_rule_result("R2", "v1", "sub2"), + ], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![ + make_rule_value("R1", "v1", "sub1", 10.0), + make_rule_value("R2", "v1", "sub2", 5.0), + ], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 15.0); + } + + #[test] + fn test_missing_rule_result_returns_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "sub1")], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } + + #[test] + fn test_missing_weight_defaults_to_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } +} diff --git a/crates/typologies/src/state.rs b/crates/typologies/src/state.rs new file mode 100644 index 0000000..23e08d9 --- /dev/null +++ b/crates/typologies/src/state.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::typology::{ + TypologyConfiguration, TypologyConfigurationRequest, + query_typologies_client::QueryTypologiesClient, +}; +use warden_stack::{Configuration, cache::RedisManager}; + +use crate::cnfg::LocalConfig; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, + pub cache: RedisManager, +} + +pub type AppHandle = Arc<AppState>; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc<RwLock<Cache<TypologyConfigurationRequest, TypologyConfiguration>>>, + pub config: LocalConfig, + pub query_typology_client: QueryTypologiesClient<Intercepted>, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_typology_client = QueryTypologiesClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_typology_client, + }) + } +} diff --git a/crates/typologies/typologies.toml b/crates/typologies/typologies.toml new file mode 100644 index 0000000..53dfa84 --- /dev/null +++ b/crates/typologies/typologies.toml @@ -0,0 +1,32 @@ +[application] +env = "development" + +[monitoring] +log-level = "warden_typologies=trace,info" +opentelemetry-endpoint = "http://localhost:4317" +loki-endpoint = "http://localhost:3100" + +[misc] +config-endpoint = "http://localhost:1304" + +[misc.nats] +stream-name = "typology" +subjects = ["typology-rule.>"] +destination-prefix = "tadp" +max-messages = 10000 +durable-name = "typologies" + +[misc.nats.config] +stream = "configuration" +reload-subject = "configuration.reload" + +[nats] +hosts = ["nats://localhost:4222"] + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 + +# vim:ft=toml diff --git a/crates/warden/.env.example b/crates/warden/.env.example new file mode 100644 index 0000000..6caf00a --- /dev/null +++ b/crates/warden/.env.example @@ -0,0 +1 @@ +DATABASE_URL=postgres://postgres:password@localhost:5432/transaction_history diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs index 9e33700..90fedfd 100644 --- a/crates/warden/src/main.rs +++ b/crates/warden/src/main.rs @@ -7,7 +7,7 @@ mod version; use std::net::{Ipv6Addr, SocketAddr}; use clap::{Parser, command}; -use tracing::{error, info}; +use tracing::{error, info, trace}; use warden_stack::{Configuration, Services, tracing::Tracing}; use crate::state::AppState; @@ -79,6 +79,12 @@ async fn main() -> Result<(), error::AppError> { let state = AppState::create(services, &config).await?; + trace!("running migrations"); + sqlx::migrate!("./migrations") + .run(&state.services.postgres) + .await?; + trace!("migrations updated"); + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); let listener = tokio::net::TcpListener::bind(addr).await?; |