Patch dump scraped from a cgit diff view. Leading indentation inside hunks was
collapsed by the scrape; text before the first "diff --git" header is ignored
by `git apply`.
-rw-r--r--.github/codecov.yml9
-rw-r--r--.github/renovate.json6
-rw-r--r--.github/workflows/check.yaml62
-rw-r--r--.github/workflows/ci.yaml191
-rw-r--r--Cargo.lock29
-rw-r--r--contrib/bruno/configuration/health-check.bru19
-rw-r--r--contrib/bruno/transaction-monitoring/folder.bru2
-rw-r--r--crates/aggregator/.env.example1
-rw-r--r--crates/aggregator/src/main.rs8
-rw-r--r--crates/configuration/.env.example1
-rw-r--r--crates/configuration/src/main.rs8
-rw-r--r--crates/pseudonyms/.env.example1
-rw-r--r--crates/typologies/.dockerignore5
-rw-r--r--crates/typologies/Cargo.toml35
-rw-r--r--crates/typologies/Dockerfile26
-rw-r--r--crates/typologies/src/cnfg.rs27
-rw-r--r--crates/typologies/src/main.rs66
-rw-r--r--crates/typologies/src/processor.rs124
-rw-r--r--crates/typologies/src/processor/driver.rs40
-rw-r--r--crates/typologies/src/processor/publish.rs44
-rw-r--r--crates/typologies/src/processor/reload.rs71
-rw-r--r--crates/typologies/src/processor/typology.rs180
-rw-r--r--crates/typologies/src/processor/typology/aggregate_rules.rs202
-rw-r--r--crates/typologies/src/processor/typology/evaluate_expression.rs171
-rw-r--r--crates/typologies/src/state.rs55
-rw-r--r--crates/typologies/typologies.toml32
-rw-r--r--crates/warden/.env.example1
-rw-r--r--crates/warden/src/main.rs8
28 files changed, 1351 insertions(+), 73 deletions(-)
diff --git a/.github/codecov.yml b/.github/codecov.yml
new file mode 100644
index 0000000..dc4c65c
--- /dev/null
+++ b/.github/codecov.yml
@@ -0,0 +1,9 @@
+# doc: https://docs.codecov.com/docs/codecovyml-reference
+coverage:
+ # range: 85..100
+ round: down
+ precision: 1
+ status:
+ project:
+ default:
+ threshold: 1%
diff --git a/.github/renovate.json b/.github/renovate.json
index 0568be1..f4a0325 100644
--- a/.github/renovate.json
+++ b/.github/renovate.json
@@ -10,6 +10,12 @@
"matchUpdateTypes": ["minor", "patch"],
"matchCurrentVersion": "!/^0/",
"automerge": true
+ },
+ {
+ "matchManagers": ["github-actions"],
+ "matchPackageNames": ["actions/setup-node"],
+ "automerge": true,
+ "automergeType": "branch"
}
]
}
diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml
deleted file mode 100644
index 7bf14b2..0000000
--- a/.github/workflows/check.yaml
+++ /dev/null
@@ -1,62 +0,0 @@
-permissions:
- contents: read
-on:
- push:
- branches: [master]
- pull_request:
-
-concurrency:
- group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
- cancel-in-progress: true
-name: check
-jobs:
- dockerfile:
- runs-on: ubuntu-latest
- strategy:
- fail-fast: true
- matrix:
- crate:
- - pseudonyms
- - warden
- - configuration
- - router
- - rule-executor
- - aggregator
- name: dockerfile / ${{ matrix.crate }}
- steps:
- - uses: actions/checkout@v5
- with:
- submodules: true
- - name: set up docker buildx
- uses: docker/setup-buildx-action@v3
- - name: build # and push
- uses: docker/build-push-action@v6
- with:
- push: false
- context: .
- file: crates/${{ matrix.crate }}/Dockerfile
- tags: warden/${{ matrix.crate }}:latest
- cache-from: type=gha
- cache-to: type=gha,mode=max
- msrv:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- msrv: ["1.89.0"]
- name: msrv / ${{ matrix.msrv }}
- steps:
- - uses: actions/checkout@v5
- with:
- submodules: true
- - name: Install ${{ matrix.msrv }}
- uses: dtolnay/rust-toolchain@master
- with:
- toolchain: ${{ matrix.msrv }}
- - name: install protoc
- uses: arduino/setup-protoc@v3
- with:
- repo-token: ${{ secrets.GITHUB_TOKEN }}
- - name: cargo install cargo-hack
- uses: taiki-e/install-action@cargo-hack
- - name: cargo hack +${{ matrix.msrv }}
- run: cargo hack --clean-per-run --feature-powerset check
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 0000000..56cfdda
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,191 @@
+permissions:
+ contents: read
+on:
+ push:
+ branches: [master]
+ pull_request:
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
+ cancel-in-progress: true
+
+name: ci
+
+jobs:
+ os-check:
+ runs-on: ${{ matrix.os }}
+ name: ${{ matrix.os }} / stable
+ strategy:
+ fail-fast: false
+ matrix:
+ os: [macos-latest, windows-latest]
+ steps:
+ - uses: actions/checkout@v5
+ with:
+ submodules: true
+ - name: Install stable
+ uses: dtolnay/rust-toolchain@stable
+ - name: Install protoc
+ uses: arduino/setup-protoc@v3
+ with:
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+ - name: Install cargo-nextest
+ uses: taiki-e/install-action@v2
+ with:
+ tool: cargo-nextest
+ - uses: Swatinem/rust-cache@v2
+ - name: cargo nextest run --no-run (compile tests only)
+ run: cargo nextest run --no-run --workspace --locked --all-features --all-targets
+ - name: cargo build
+ run: cargo build --workspace --locked --all-features --all-targets
+
+ dockerfile:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: true
+ matrix:
+ crate:
+ - pseudonyms
+ - warden
+ - configuration
+ - router
+ - rule-executor
+ - aggregator
+ - typologies
+ name: dockerfile / ${{ matrix.crate }}
+ steps:
+ - uses: actions/checkout@v5
+ with:
+ submodules: true
+ - name: Set up docker buildx
+ uses: docker/setup-buildx-action@v3
+ - name: Build # and push
+ uses: docker/build-push-action@v6
+ with:
+ push: false
+ context: .
+ file: crates/${{ matrix.crate }}/Dockerfile
+ tags: warden/${{ matrix.crate }}:latest
+ cache-from: type=gha
+ cache-to: type=gha,mode=max
+ outputs: type=docker,dest=/tmp/${{ matrix.crate }}.tar
+ - name: Upload artifact
+ uses: actions/upload-artifact@v4
+ with:
+ name: warden-${{ matrix.crate }}
+ path: /tmp/${{ matrix.crate }}.tar
+
+ msrv:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ msrv: ["1.89.0"]
+ name: msrv / ${{ matrix.msrv }}
+ steps:
+ - uses: actions/checkout@v5
+ with:
+ submodules: true
+ - name: Install ${{ matrix.msrv }}
+ uses: dtolnay/rust-toolchain@master
+ with:
+ toolchain: ${{ matrix.msrv }}
+ - name: Install protoc
+ uses: arduino/setup-protoc@v3
+ with:
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+ - name: cargo install cargo-hack
+ uses: taiki-e/install-action@cargo-hack
+ - name: cargo hack +${{ matrix.msrv }}
+ run: cargo hack --clean-per-run --feature-powerset check
+
+ test:
+ runs-on: ubuntu-latest
+ needs:
+ - dockerfile
+ - os-check
+ name: test / workspace
+ steps:
+ - uses: actions/checkout@v5
+ with:
+ submodules: true
+ - name: Start stack
+ run: docker compose -f contrib/docker-compose/compose.yaml up -d
+ - name: Setup node.js
+ uses: actions/setup-node@v4
+ with:
+ node-version: '22'
+ - name: Download artifacts
+ uses: actions/download-artifact@v5
+ with:
+ path: warden-images
+ pattern: warden-*
+ merge-multiple: true
+ - name: Load docker images
+ run: |
+ dir="warden-images"
+ for file in "$dir"/*.tar; do
+ if [ -f "$file" ]; then
+ echo "Loading tar: $file"
+ image_name=$(docker load --input "$file" | awk '/Loaded image:/ {print $3}')
+ echo "Running image: $image_name"
+ docker run --rm -d --network host "$image_name"
+ fi
+ done
+ echo "images loaded"
+ docker image ls -a
+ - name: Install bruno cli
+ run: npm install -g @usebruno/cli
+ - name: Run HTTP-api tests
+ run: |
+ cd contrib/bruno
+ bru run configuration/health-check.bru \
+ configuration/routing/02-post-routing.bru \
+ configuration/rule/01-create.bru \
+ configuration/typology/01-create.bru \
+ --env warden --reporter-html results.html
+ - name: Upload test results
+ uses: actions/upload-artifact@v4
+ with:
+ name: test-results
+ path: contrib/bruno/results.html
+ - name: Install stable
+ uses: dtolnay/rust-toolchain@stable
+ with:
+ components: llvm-tools-preview
+ - uses: taiki-e/install-action@cargo-llvm-cov
+ - name: Install cargo-nextest
+ uses: taiki-e/install-action@v2
+ with:
+ tool: cargo-nextest
+ - uses: Swatinem/rust-cache@v2
+ - name: Install protoc
+ uses: arduino/setup-protoc@v3
+ with:
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+ - name: Prepare environment for tests
+ run: |
+ for processor in warden pseudonyms aggregator configuration; do
+ cp crates/$processor/.env.example crates/$processor/.env
+ done
+ # - name: Collect coverage data
+ # # Generate separate reports for nextest and doctests, and combine them.
+ # run: |
+ # cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info
+ # # switch to nightly
+ # # cargo llvm-cov --no-report nextest
+ # # cargo llvm-cov --no-report --doc
+ # # cargo llvm-cov report --doctests --lcov --output-path lcov.info
+ - name: cargo llvm-cov
+ run: cargo llvm-cov nextest --workspace --locked --all-features --lcov --output-path lcov.info
+ - name: Upload to codecov.io
+ uses: codecov/codecov-action@v5
+ with:
+ fail_ci_if_error: true
+ token: ${{ secrets.CODECOV_TOKEN }}
+ slug: ${{ github.repository }}
+ - name: Stop stack
+ if: always()
+ run: |
+ docker compose -f contrib/docker-compose/compose.yaml down -v
+ docker stop $(docker ps -aq) || true
+ docker rm $(docker ps -aq) || true
diff --git a/Cargo.lock b/Cargo.lock
index 46a6261..877acdf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3717,10 +3717,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
-name = "typologies"
-version = "0.1.0"
-
-[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4129,6 +4125,31 @@ dependencies = [
]
[[package]]
+name = "warden-typologies"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-nats",
+ "clap",
+ "config",
+ "futures-util",
+ "moka",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
+ "prost 0.14.1",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tonic 0.14.1",
+ "tracing",
+ "tracing-opentelemetry",
+ "uuid",
+ "warden-core",
+ "warden-middleware",
+ "warden-stack",
+]
+
+[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/contrib/bruno/configuration/health-check.bru b/contrib/bruno/configuration/health-check.bru
new file mode 100644
index 0000000..62f66a9
--- /dev/null
+++ b/contrib/bruno/configuration/health-check.bru
@@ -0,0 +1,19 @@
+meta {
+ name: health-check
+ type: http
+ seq: 4
+}
+
+get {
+ url: {{WARDEN_CFG_HOST}}
+ body: none
+ auth: inherit
+}
+
+assert {
+ res.status: eq 200
+}
+
+settings {
+ encodeUrl: true
+}
diff --git a/contrib/bruno/transaction-monitoring/folder.bru b/contrib/bruno/transaction-monitoring/folder.bru
index ca6f740..030714b 100644
--- a/contrib/bruno/transaction-monitoring/folder.bru
+++ b/contrib/bruno/transaction-monitoring/folder.bru
@@ -1,6 +1,6 @@
meta {
name: transaction-monitoring
- seq: 2
+ seq: 3
}
auth {
diff --git a/crates/aggregator/.env.example b/crates/aggregator/.env.example
new file mode 100644
index 0000000..5178bd5
--- /dev/null
+++ b/crates/aggregator/.env.example
@@ -0,0 +1 @@
+DATABASE_URL=postgres://postgres:password@localhost:5432/evaluations
diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs
index 62af544..7bb5a79 100644
--- a/crates/aggregator/src/main.rs
+++ b/crates/aggregator/src/main.rs
@@ -4,7 +4,7 @@ mod state;
use anyhow::Result;
use clap::Parser;
-use tracing::error;
+use tracing::{error, trace};
use warden_stack::{Configuration, Services, tracing::Tracing};
use crate::state::AppState;
@@ -78,6 +78,12 @@ async fn main() -> Result<()> {
let state = AppState::create(services, &config).await?;
+ trace!("running migrations");
+ sqlx::migrate!("./migrations")
+ .run(&state.services.postgres)
+ .await?;
+ trace!("migrations updated");
+
processor::serve(state, provider).await?;
Ok(())
diff --git a/crates/configuration/.env.example b/crates/configuration/.env.example
new file mode 100644
index 0000000..9581956
--- /dev/null
+++ b/crates/configuration/.env.example
@@ -0,0 +1 @@
+DATABASE_URL=postgres://postgres:password@localhost:5432/configuration
diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs
index 7dc8da6..6f90b69 100644
--- a/crates/configuration/src/main.rs
+++ b/crates/configuration/src/main.rs
@@ -8,7 +8,7 @@ use crate::{server::error::AppError, state::AppState};
use axum::http::header::CONTENT_TYPE;
use clap::Parser;
use tower::{make::Shared, steer::Steer};
-use tracing::{error, info};
+use tracing::{error, info, trace};
use warden_stack::{Configuration, Services, tracing::Tracing};
/// warden-config
@@ -82,6 +82,12 @@ async fn main() -> Result<(), AppError> {
)
.await?;
+ trace!("running migrations");
+ sqlx::migrate!("./migrations")
+ .run(&state.services.postgres)
+ .await?;
+ trace!("migrations updated");
+
let (app, grpc_server) = server::serve(state)?;
let service = Steer::new(
diff --git a/crates/pseudonyms/.env.example b/crates/pseudonyms/.env.example
new file mode 100644
index 0000000..3728d42
--- /dev/null
+++ b/crates/pseudonyms/.env.example
@@ -0,0 +1 @@
+DATABASE_URL=postgres://postgres:password@localhost:5432/pseudonyms
diff --git a/crates/typologies/.dockerignore b/crates/typologies/.dockerignore
new file mode 100644
index 0000000..c8cd160
--- /dev/null
+++ b/crates/typologies/.dockerignore
@@ -0,0 +1,5 @@
+/target
+.env
+.git
+.github
+/contrib
diff --git a/crates/typologies/Cargo.toml b/crates/typologies/Cargo.toml
index 207e671..38a8cb4 100644
--- a/crates/typologies/Cargo.toml
+++ b/crates/typologies/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "typologies"
+name = "warden-typologies"
version = "0.1.0"
edition = "2024"
license.workspace = true
@@ -8,3 +8,36 @@ documentation.workspace = true
description.workspace = true
[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["toml"] }
+futures-util = { workspace = true, default-features = false }
+moka = { workspace = true, features = ["future"] }
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive"] }
+serde_json.workspace = true
+tokio = { workspace = true, features = [
+ "macros",
+ "rt-multi-thread",
+ "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+ "message",
+ "configuration",
+ "serde-time",
+] }
+warden-middleware.workspace = true
+warden-stack = { workspace = true, features = [
+ "cache",
+ "nats-jetstream",
+ "opentelemetry",
+ "opentelemetry-tonic",
+ "tracing-loki",
+] }
diff --git a/crates/typologies/Dockerfile b/crates/typologies/Dockerfile
new file mode 100644
index 0000000..61bb2e2
--- /dev/null
+++ b/crates/typologies/Dockerfile
@@ -0,0 +1,26 @@
+FROM rust:1.89.0-slim AS builder
+
+ENV SQLX_OFFLINE=true
+
+RUN rustup target add x86_64-unknown-linux-musl
+RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl
+RUN update-ca-certificates
+
+WORKDIR /usr/src/app
+
+RUN mkdir -p crates
+
+COPY ./crates/typologies crates/typologies
+COPY ./lib lib
+COPY ./Cargo.toml .
+COPY ./Cargo.lock .
+
+RUN cargo fetch
+
+COPY ./proto proto
+
+RUN cargo build --target x86_64-unknown-linux-musl --release
+
+FROM scratch
+COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-typologies ./
+CMD [ "./warden-typologies" ]
diff --git a/crates/typologies/src/cnfg.rs b/crates/typologies/src/cnfg.rs
new file mode 100644
index 0000000..6086f46
--- /dev/null
+++ b/crates/typologies/src/cnfg.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ pub subjects: Arc<[String]>,
+ pub destination_prefix: Arc<str>,
+ pub max_messages: i64,
+ pub durable_name: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct ConfigNats {
+ pub stream: Arc<str>,
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs
index e7a11a9..ea7843a 100644
--- a/crates/typologies/src/main.rs
+++ b/crates/typologies/src/main.rs
@@ -1,3 +1,65 @@
-fn main() {
- println!("Hello, world!");
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// typologies
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../typologies.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = state::Services { jetstream, cache };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
}
diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs
new file mode 100644
index 0000000..a2a3b17
--- /dev/null
+++ b/crates/typologies/src/processor.rs
@@ -0,0 +1,124 @@
+mod driver;
+mod publish;
+mod reload;
+mod typology;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{StreamExt, future};
+use tokio::signal;
+use tracing::{error, trace, warn};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = typology::process_typology(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name,
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs
new file mode 100644
index 0000000..d150620
--- /dev/null
+++ b/crates/typologies/src/processor/driver.rs
@@ -0,0 +1,40 @@
+use tonic::IntoRequest;
+use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub trait GetTypologyConfiguration {
+ fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> impl std::future::Future<Output = anyhow::Result<TypologyConfiguration>> + Send;
+}
+
+impl GetTypologyConfiguration for AppHandle {
+ async fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> anyhow::Result<TypologyConfiguration> {
+ {
+ let local_cache = self.local_cache.read().await;
+ if let Some(result) = local_cache.get(&typology_key).await.map(Ok) {
+ return result;
+ }
+ }
+
+ let local_cache = self.local_cache.write().await;
+ let mut client = self.query_typology_client.clone();
+
+ let value = client
+ .get_typology_configuration(typology_key.clone().into_request())
+ .await?
+ .into_inner()
+ .configuration
+ .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?;
+ local_cache
+ .insert(typology_key.clone(), value.clone())
+ .await;
+
+ Ok(value)
+ }
+}
diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs
new file mode 100644
index 0000000..b031bf3
--- /dev/null
+++ b/crates/typologies/src/processor/publish.rs
@@ -0,0 +1,44 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, info_span, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::message::Payload;
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_tadp(
+ subject: &str,
+ state: AppHandle,
+ payload: Payload,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!("{}.{}", state.config.nats.destination_prefix, subject);
+ debug!(subject = ?subject, "publishing");
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs
new file mode 100644
index 0000000..fac4c40
--- /dev/null
+++ b/crates/typologies/src/processor/reload.rs
@@ -0,0 +1,71 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use prost::Message as _;
+use tracing::{error, info, trace};
+use uuid::Uuid;
+use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
+ {
+ match kind {
+ ConfigKind::Typology => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating typology config"
+ );
+ let key = TypologyConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e:?}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs
new file mode 100644
index 0000000..b1b2592
--- /dev/null
+++ b/crates/typologies/src/processor/typology.rs
@@ -0,0 +1,180 @@
+mod aggregate_rules;
+mod evaluate_expression;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use opentelemetry::global;
+use prost::Message;
+use tracing::{Instrument, Span, error, info, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{
+ configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest},
+ message::{Payload, RuleResult, TypologyResult},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::{
+ processor::{driver::GetTypologyConfiguration as _, publish},
+ state::AppHandle,
+};
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn process_typology(
+ message: async_nats::jetstream::Message,
+ state: AppHandle,
+) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ if payload.transaction.is_none() {
+ warn!("transaction is empty - proceeding with ack");
+ let _ = message.ack().await;
+ return Ok(());
+ }
+
+ let transaction = payload.transaction.as_ref().expect("to have returned");
+
+ match transaction {
+ warden_core::message::payload::Transaction::Pacs008(_) => {
+ warn!("Pacs008 is unsupported on this version: this should be unreachable");
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ let key = format!(
+ "tp_{}",
+ pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ );
+
+ let rule_result = &payload
+ .rule_result
+ .as_ref()
+ .expect("rule result should be here");
+ let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?;
+
+ let routing = payload
+ .routing
+ .as_ref()
+ .expect("routing missing from payload");
+
+ let (mut typology_result, _rule_count) =
+ aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?;
+
+ let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state)
+ .await
+ .inspect_err(|e| error!("{e}"));
+ }
+ };
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
+
+#[instrument(skip(typology_result, routing, payload, state), err(Debug))]
+async fn evaluate_typology(
+ typology_result: &mut [TypologyResult],
+ routing: &RoutingConfiguration,
+ mut payload: Payload,
+ key: &str,
+ state: AppHandle,
+) -> Result<()> {
+ for typology_result in typology_result.iter_mut() {
+ let handle = Arc::clone(&state);
+ let routing_rules = routing.messages[0].typologies.iter().find(|typology| {
+ typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id)
+ });
+ let typology_result_rules = &typology_result.rule_results;
+
+ if routing_rules.is_some()
+ && typology_result_rules.len() < routing_rules.unwrap().rules.len()
+ {
+ continue;
+ }
+
+ let typology_config = handle
+ .get_typology_config(TypologyConfigurationRequest {
+ id: typology_result.id.to_owned(),
+ version: typology_result.version.to_owned(),
+ })
+ .await?;
+
+ let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?;
+
+ typology_result.result = result;
+
+ let workflow = typology_config
+ .workflow
+ .as_ref()
+ .expect("no workflow in config");
+
+ if workflow.interdiction_threshold.is_some() {
+ typology_result.workflow.replace(*workflow);
+ }
+ typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold);
+
+ payload.typology_result = Some(typology_result.to_owned());
+
+ let is_interdicting = typology_config
+ .workflow
+ .unwrap()
+ .interdiction_threshold
+ .is_some_and(|value| value > 0.0 && result >= value);
+
+ if is_interdicting {
+ typology_result.review = true;
+ }
+
+ if result >= typology_config.workflow.unwrap().alert_threshold {
+ info!("alerting");
+ }
+
+ let subj = handle.config.nats.destination_prefix.to_string();
+ let _ = publish::to_tadp(&subj, handle, payload.clone())
+ .await
+ .inspect_err(|e| error!("{e}"));
+
+ let mut c = state.services.cache.get().await?;
+ c.del::<_, ()>(key).await?;
+ }
+
+ Ok(())
+}
+
+async fn cache_and_get_all(
+ cache_key: &str,
+ rule_result: &RuleResult,
+ state: AppHandle,
+) -> Result<Vec<RuleResult>> {
+ let mut cache = state.services.cache.get().await?;
+
+ let bytes = prost::Message::encode_to_vec(rule_result);
+
+ let res = warden_stack::redis::pipe()
+ .sadd::<_, _>(cache_key, bytes)
+ .ignore()
+ .smembers(cache_key)
+ .query_async::<Vec<Vec<Vec<u8>>>>(&mut cache)
+ .await?;
+
+ let members = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ members
+ .iter()
+ .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new))
+ .collect()
+}
diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs
new file mode 100644
index 0000000..8f92dae
--- /dev/null
+++ b/crates/typologies/src/processor/typology/aggregate_rules.rs
@@ -0,0 +1,202 @@
+use anyhow::Result;
+use std::collections::HashSet;
+
+use warden_core::{
+ configuration::routing::RoutingConfiguration,
+ message::{RuleResult, TypologyResult},
+};
+
+pub(super) fn aggregate_rules(
+ rule_results: &[RuleResult],
+ routing: &RoutingConfiguration,
+ rule_result: &RuleResult,
+) -> Result<(Vec<TypologyResult>, usize)> {
+ let mut typology_result: Vec<TypologyResult> = vec![];
+ let mut all_rules_set = HashSet::new();
+
+ routing.messages.iter().for_each(|message| {
+ message.typologies.iter().for_each(|typology| {
+ let mut set = HashSet::new();
+
+ for rule in typology.rules.iter() {
+ set.insert((&rule.id, rule.version()));
+ all_rules_set.insert((&rule.id, rule.version()));
+ }
+
+ if !set.contains(&(&rule_result.id, rule_result.version.as_str())) {
+ return;
+ }
+
+ let rule_results: Vec<_> = rule_results
+ .iter()
+ .filter_map(|value| {
+ if set.contains(&(&value.id, &value.version)) {
+ Some(value.to_owned())
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ if !rule_results.is_empty() {
+ typology_result.push(TypologyResult {
+ id: typology.id.to_owned(),
+ version: typology.version.to_owned(),
+ rule_results,
+ ..Default::default()
+ });
+ }
+ });
+ });
+
+ Ok((typology_result, all_rules_set.len()))
+}
+
#[cfg(test)]
mod tests {
    //! Unit tests for `aggregate_rules`.

    use super::*;
    use warden_core::{
        configuration::routing::{Message, RoutingConfiguration, Rule, Typology},
        message::RuleResult,
    };

    /// Builds a routing-config rule with an explicit (`Some`) version.
    fn create_rule(id: &str, version: &str) -> Rule {
        Rule {
            id: id.to_string(),
            version: Some(version.to_string()),
        }
    }

    /// Builds a minimal rule result; remaining fields take their defaults.
    fn create_rule_result(id: &str, version: &str) -> RuleResult {
        RuleResult {
            id: id.to_string(),
            version: version.to_string(),
            ..Default::default()
        }
    }

    /// Triggering rule is not referenced by any typology: no typology
    /// results, but the unique-rule count still reflects the config.
    #[test]
    fn returns_empty_when_no_matching_typology() {
        let routing = RoutingConfiguration {
            messages: vec![Message {
                typologies: vec![Typology {
                    id: "T1".to_string(),
                    version: "v1".to_string(),
                    rules: vec![create_rule("R1", "v1")],
                }],
                ..Default::default()
            }],
            ..Default::default()
        };

        let rule_results = vec![create_rule_result("R2", "v1")];
        let input_rule = create_rule_result("R2", "v1");

        let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
        assert!(result.is_empty());
        assert_eq!(count, 1); // one rule in routing
    }

    /// Both cached results belong to the matched typology, so the single
    /// typology result carries both of them.
    #[test]
    fn returns_typology_with_matching_rule() {
        let routing = RoutingConfiguration {
            messages: vec![Message {
                typologies: vec![Typology {
                    id: "T1".to_string(),
                    version: "v1".to_string(),
                    rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
                }],
                ..Default::default()
            }],
            ..Default::default()
        };

        let rule_results = vec![
            create_rule_result("R1", "v1"),
            create_rule_result("R2", "v1"),
        ];

        let input_rule = create_rule_result("R1", "v1");

        let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();

        assert_eq!(count, 2); // R1, R2
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id, "T1");
        assert_eq!(result[0].rule_results.len(), 2);
    }

    /// Cached results not referenced by the typology are filtered out.
    #[test]
    fn ignores_unrelated_rules_in_rule_results() {
        let routing = RoutingConfiguration {
            messages: vec![Message {
                typologies: vec![Typology {
                    id: "T1".to_string(),
                    version: "v1".to_string(),
                    rules: vec![create_rule("R1", "v1")],
                }],
                ..Default::default()
            }],
            ..Default::default()
        };

        let rule_results = vec![
            create_rule_result("R1", "v1"),
            create_rule_result("R99", "v1"), // unrelated
        ];

        let input_rule = create_rule_result("R1", "v1");

        let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();

        assert_eq!(count, 1);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].rule_results.len(), 1);
        assert_eq!(result[0].rule_results[0].id, "R1");
    }

    /// Rules shared across messages/typologies are deduplicated in the
    /// count, while each matching typology still yields its own result.
    #[test]
    fn handles_multiple_messages_and_typologies() {
        let routing = RoutingConfiguration {
            messages: vec![
                Message {
                    typologies: vec![
                        Typology {
                            id: "T1".to_string(),
                            version: "v1".to_string(),
                            rules: vec![create_rule("R1", "v1")],
                        },
                        Typology {
                            id: "T2".to_string(),
                            version: "v1".to_string(),
                            rules: vec![create_rule("R2", "v1")],
                        },
                    ],
                    ..Default::default()
                },
                Message {
                    typologies: vec![Typology {
                        id: "T3".to_string(),
                        version: "v1".to_string(),
                        rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
                    }],
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let rule_results = vec![
            create_rule_result("R1", "v1"),
            create_rule_result("R2", "v1"),
        ];
        let input_rule = create_rule_result("R1", "v1");

        let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();

        assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2
        assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2)
        assert_eq!(result[0].id, "T1");
        assert_eq!(result[1].id, "T3");
    }
}
diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs
new file mode 100644
index 0000000..844011e
--- /dev/null
+++ b/crates/typologies/src/processor/typology/evaluate_expression.rs
@@ -0,0 +1,171 @@
+use anyhow::Result;
+use tracing::warn;
+use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult};
+
+pub(super) fn evaluate_expression(
+ typology_result: &mut TypologyResult,
+ typology_config: &TypologyConfiguration,
+) -> Result<f64> {
+ let mut to_return = 0.0;
+ let expression = typology_config
+ .expression
+ .as_ref()
+ .expect("expression is missing");
+
+ let rule_values = &typology_config.rules;
+
+ for rule in expression.terms.iter() {
+ let rule_result = typology_result
+ .rule_results
+ .iter()
+ .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version));
+
+ if rule_result.is_none() {
+ warn!(term = ?rule, "could not find rule result for typology term");
+ return Ok(Default::default());
+ }
+
+ let rule_result = rule_result.expect("checked and is some");
+
+ let weight = rule_values
+ .iter()
+ .filter_map(|rv| {
+ if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) {
+ None
+ } else {
+ rv.wghts.iter().find_map(|value| {
+ match value.r#ref.eq(&rule_result.sub_rule_ref) {
+ true => Some(value.wght),
+ false => None,
+ }
+ })
+ }
+ })
+ .next();
+
+ if weight.is_none() {
+ warn!(rule = ?rule, "could not find a weight for the matching rule");
+ }
+ let weight = weight.unwrap_or_default();
+
+ to_return = match expression.operator() {
+ warden_core::configuration::typology::Operator::Add => to_return + weight,
+ warden_core::configuration::typology::Operator::Multiply => to_return * weight,
+ warden_core::configuration::typology::Operator::Subtract => to_return - weight,
+ warden_core::configuration::typology::Operator::Divide => {
+ if weight.ne(&0.0) {
+ to_return / weight
+ } else {
+ to_return
+ }
+ }
+ };
+ }
+ Ok(to_return)
+}
+
#[cfg(test)]
mod tests {
    //! Unit tests for `evaluate_expression`.

    use warden_core::{
        configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight},
        message::RuleResult,
    };

    use super::*;

    /// Builds a rule result with the given sub-rule reference.
    fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult {
        RuleResult {
            id: id.to_string(),
            version: version.to_string(),
            sub_rule_ref: sub_ref.to_string(),
            ..Default::default()
        }
    }

    /// Builds a configured typology rule carrying a single weight entry.
    fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule {
        TypologyRule {
            id: id.to_string(),
            version: version.to_string(),
            wghts: vec![TypologyRuleWeight {
                r#ref: ref_name.to_string(),
                wght: weight,
            }],
        }
    }

    /// Builds an expression over `(id, version)` terms with one operator.
    fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression {
        Expression {
            terms: terms
                .into_iter()
                .map(|(id, version)| Term {
                    id: id.to_string(),
                    version: version.to_string(),
                })
                .collect(),
            operator: op.into(),
        }
    }

    /// Add operator sums the matched weights: 10 + 5 = 15.
    #[test]
    fn test_add_operator_multiple_terms() {
        let mut typology_result = TypologyResult {
            rule_results: vec![
                make_rule_result("R1", "v1", "sub1"),
                make_rule_result("R2", "v1", "sub2"),
            ],
            ..Default::default()
        };

        let config = TypologyConfiguration {
            expression: Some(make_expression(
                vec![("R1", "v1"), ("R2", "v1")],
                Operator::Add,
            )),
            rules: vec![
                make_rule_value("R1", "v1", "sub1", 10.0),
                make_rule_value("R2", "v1", "sub2", 5.0),
            ],
            ..Default::default()
        };

        let result = evaluate_expression(&mut typology_result, &config).unwrap();
        assert_eq!(result, 15.0);
    }

    /// A term with no corresponding rule result short-circuits to 0.0.
    #[test]
    fn test_missing_rule_result_returns_zero() {
        let mut typology_result = TypologyResult {
            rule_results: vec![make_rule_result("R1", "v1", "sub1")],
            ..Default::default()
        };

        let config = TypologyConfiguration {
            expression: Some(make_expression(
                vec![("R1", "v1"), ("R2", "v1")],
                Operator::Add,
            )),
            rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)],
            ..Default::default()
        };

        let result = evaluate_expression(&mut typology_result, &config).unwrap();
        assert_eq!(result, 0.0);
    }

    /// A sub-rule ref with no configured weight contributes the default
    /// weight of 0.0 instead of failing.
    #[test]
    fn test_missing_weight_defaults_to_zero() {
        let mut typology_result = TypologyResult {
            rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match
            ..Default::default()
        };

        let config = TypologyConfiguration {
            expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)),
            rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref
            ..Default::default()
        };

        let result = evaluate_expression(&mut typology_result, &config).unwrap();
        assert_eq!(result, 0.0);
    }
}
diff --git a/crates/typologies/src/state.rs b/crates/typologies/src/state.rs
new file mode 100644
index 0000000..23e08d9
--- /dev/null
+++ b/crates/typologies/src/state.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::typology::{
+ TypologyConfiguration, TypologyConfigurationRequest,
+ query_typologies_client::QueryTypologiesClient,
+};
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
+
/// Cloneable handles to the external infrastructure this service uses.
#[derive(Clone)]
pub struct Services {
    /// NATS JetStream context for publishing/consuming messages.
    pub jetstream: Context,
    /// Pooled Redis connection manager backing the rule-result cache.
    pub cache: RedisManager,
}
+
/// Shared handle to the application state (cheap to clone, `Arc`-backed).
pub type AppHandle = Arc<AppState>;

/// Process-wide state passed to every processor/handler.
#[derive(Clone)]
pub struct AppState {
    /// External service handles (JetStream, Redis).
    pub services: Services,
    /// In-memory cache of typology configurations keyed by request.
    /// NOTE(review): presumably refreshed on configuration reload — confirm.
    pub local_cache: Arc<RwLock<Cache<TypologyConfigurationRequest, TypologyConfiguration>>>,
    /// Service-local settings deserialized from the `misc` configuration.
    pub config: LocalConfig,
    /// gRPC client for querying typology configurations.
    pub query_typology_client: QueryTypologiesClient<Intercepted>,
}
+
impl AppState {
    /// Builds the application state from already-initialized services and
    /// the loaded configuration.
    ///
    /// Deserializes service-local settings out of `configuration.misc`,
    /// then eagerly connects a gRPC channel to the configuration service;
    /// a connection failure is logged with the offending endpoint and
    /// propagated to the caller.
    ///
    /// # Errors
    /// Fails when the `misc` section cannot be deserialized into
    /// `LocalConfig`, when the endpoint URI is invalid, or when the
    /// connection attempt fails.
    pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
        let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
        let channel = Endpoint::new(config.config_endpoint.to_string())?
            .connect()
            .await
            .inspect_err(|e| {
                error!(
                    endpoint = ?config.config_endpoint,
                    "could not connect to config service: {e}",
                )
            })?;

        // Wrap the channel with the project's gRPC interceptor.
        let query_typology_client = QueryTypologiesClient::with_interceptor(channel, MyInterceptor);

        Ok(Self {
            services,
            config,
            // Cache built with default settings (no explicit capacity/TTL).
            local_cache: Arc::new(RwLock::new(Cache::builder().build())),
            query_typology_client,
        })
    }
}
diff --git a/crates/typologies/typologies.toml b/crates/typologies/typologies.toml
new file mode 100644
index 0000000..53dfa84
--- /dev/null
+++ b/crates/typologies/typologies.toml
@@ -0,0 +1,32 @@
# Local development configuration for the typologies service
# (all endpoints point at localhost).

[application]
env = "development"

[monitoring]
# Per-target tracing filter: crate-local spans at `trace`, everything else `info`.
log-level = "warden_typologies=trace,info"
opentelemetry-endpoint = "http://localhost:4317"
loki-endpoint = "http://localhost:3100"

[misc]
# gRPC endpoint of the configuration service queried at startup.
config-endpoint = "http://localhost:1304"

[misc.nats]
stream-name = "typology"
subjects = ["typology-rule.>"]
# Prefix for the subject results are published to downstream.
destination-prefix = "tadp"
max-messages = 10000
durable-name = "typologies"

[misc.nats.config]
# Stream/subject watched for configuration reload notifications.
stream = "configuration"
reload-subject = "configuration.reload"

[nats]
hosts = ["nats://localhost:4222"]

[cache]
dsn = "redis://localhost:6379"
pooled = true
type = "non-clustered" # clustered, non-clustered or sentinel
max-connections = 100

# vim:ft=toml
diff --git a/crates/warden/.env.example b/crates/warden/.env.example
new file mode 100644
index 0000000..6caf00a
--- /dev/null
+++ b/crates/warden/.env.example
@@ -0,0 +1 @@
+DATABASE_URL=postgres://postgres:password@localhost:5432/transaction_history
diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs
index 9e33700..90fedfd 100644
--- a/crates/warden/src/main.rs
+++ b/crates/warden/src/main.rs
@@ -7,7 +7,7 @@ mod version;
use std::net::{Ipv6Addr, SocketAddr};
use clap::{Parser, command};
-use tracing::{error, info};
+use tracing::{error, info, trace};
use warden_stack::{Configuration, Services, tracing::Tracing};
use crate::state::AppState;
@@ -79,6 +79,12 @@ async fn main() -> Result<(), error::AppError> {
let state = AppState::create(services, &config).await?;
+ trace!("running migrations");
+ sqlx::migrate!("./migrations")
+ .run(&state.services.postgres)
+ .await?;
+ trace!("migrations updated");
+
let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port));
let listener = tokio::net::TcpListener::bind(addr).await?;