From 7e08464b2a42048ebb5128bec5731719cf8bd1db Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 29 Mar 2026 15:06:30 +0200 Subject: feat: create transaction schema --- Cargo.lock | 22 ++++ Cargo.toml | 2 + migrations/20260329120607_functions.sql | 7 ++ migrations/20260329120645_transaction_schema.sql | 18 +++ warden/Cargo.toml | 9 +- warden/src/server/api/mod.rs | 1 + warden/src/server/api/schema.rs | 33 ++++++ warden/src/server/api/transaction.rs | 13 ++- warden/src/server/error/mod.rs | 37 +++++-- .../server/middleware/extractors/transaction.rs | 8 +- warden/src/server/routes/config/mod.rs | 2 + warden/src/server/routes/config/schema/create.rs | 122 +++++++++++++++++++++ warden/src/server/routes/config/schema/delete.rs | 1 + warden/src/server/routes/config/schema/mod.rs | 4 + warden/src/server/routes/config/schema/read.rs | 1 + warden/src/server/routes/config/schema/update.rs | 1 + .../routes/transaction_monitoring/monitor.rs | 13 ++- warden/src/state/mod.rs | 6 +- 18 files changed, 276 insertions(+), 24 deletions(-) create mode 100644 migrations/20260329120607_functions.sql create mode 100644 migrations/20260329120645_transaction_schema.sql create mode 100644 warden/src/server/api/schema.rs create mode 100644 warden/src/server/routes/config/schema/create.rs create mode 100644 warden/src/server/routes/config/schema/delete.rs create mode 100644 warden/src/server/routes/config/schema/mod.rs create mode 100644 warden/src/server/routes/config/schema/read.rs create mode 100644 warden/src/server/routes/config/schema/update.rs diff --git a/Cargo.lock b/Cargo.lock index a2d273d..595583f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", + "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -208,6 +209,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.22.1" @@ -494,6 +506,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -2253,6 +2266,7 @@ dependencies = [ "sha2", "smallvec", "thiserror 2.0.18", + "time", "tokio", "tokio-stream", "tracing", @@ -2290,7 +2304,9 @@ dependencies = [ "serde_json", "sha2", "sqlx-core", + "sqlx-mysql", "sqlx-postgres", + "sqlx-sqlite", "syn", "tokio", "url", @@ -2327,12 +2343,14 @@ dependencies = [ "percent-encoding", "rand", "rsa", + "serde", "sha1", "sha2", "smallvec", "sqlx-core", "stringprep", "thiserror 2.0.18", + "time", "tracing", "whoami", ] @@ -2370,6 +2388,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror 2.0.18", + "time", "tracing", "whoami", ] @@ -2390,9 +2409,11 @@ dependencies = [ "libsqlite3-sys", "log", "percent-encoding", + "serde", "serde_urlencoded", "sqlx-core", "thiserror 2.0.18", + "time", "tracing", "url", ] @@ -3039,6 +3060,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "time", "tokio", "toml", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index e9d6e8b..5446260 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,10 @@ homepage = "https://git.kanjala.com/warden" [workspace.dependencies] serde = "1.0.228" serde_json = "1.0.149" +time = { version = "0.3.47", default-features = false } tracing = "0.1.44" url = "2.5.8" +uuid = "1.23.0" [workspace.dependencies.sqlx] version = "0.8.6" diff --git a/migrations/20260329120607_functions.sql b/migrations/20260329120607_functions.sql new file mode 100644 index 0000000..f662c25 --- /dev/null +++ b/migrations/20260329120607_functions.sql @@ -0,0 +1,7 @@ +create or replace function update_updated_at_column() +returns trigger as $$ +begin + new.updated_at = now(); + return new; +end; +$$ language 'plpgsql'; diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql new file mode 100644 index 0000000..d7b4744 --- /dev/null +++ b/migrations/20260329120645_transaction_schema.sql @@ -0,0 +1,18 @@ +-- Add migration script here +-- The transaction's blueprint to be checked on each request +create table transaction_schema ( + -- The transaction type + type text not null, + -- The schema's version (to allow for multiple revisions - maybe) + version varchar not null, + -- Actual JSON schema + json_schema jsonb not null, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + primary key (type, version) +); + +create trigger update_transaction_schema_modtime + before update on transaction_schema + for each row + execute function update_updated_at_column(); diff --git a/warden/Cargo.toml b/warden/Cargo.toml index 501c651..d2b7b49 100644 --- a/warden/Cargo.toml +++ b/warden/Cargo.toml @@ -9,29 +9,30 @@ homepage.workspace = true [dependencies] anyhow = "1.0.102" async-trait = "0.1.89" -axum = "0.8.8" +axum = { version = "0.8.8", features = ["macros"] } clap = { version = "4.6.0", features = ["derive", "env"] } jsonschema = "0.45.0" secrecy = { version = "0.10.3", features = ["serde"] } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +time = { workspace = true, features = ["formatting", "serde"] } toml = "1.1.0" tower-http = { version = "0.6.8", features = ["cors", "request-id", "timeout", "trace"] } tracing.workspace = true tracing-appender = "0.2.4" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } url = { workspace = true, features = ["serde"] } -utoipa = "5.4.0" +utoipa = { version = "5.4.0", features = ["time"] } utoipa-axum = "0.2.0" utoipa-rapidoc = { version = "6.0.0", optional = true } utoipa-redoc = { version = "6.0.0", optional = true } utoipa-scalar = { version = "0.3.0", optional = true } utoipa-swagger-ui = { version = "9.0.2", optional = true } -uuid = { version = "1.23.0", features = ["v7"] } +uuid = { workspace = true, features = ["v7"] } [dependencies.sqlx] workspace = true -features = ["runtime-tokio-rustls"] +features = ["json", "runtime-tokio-rustls", "time"] [dependencies.tokio] version = "1.50.0" diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs index 963a3ef..ecfbad3 100644 --- a/warden/src/server/api/mod.rs +++ b/warden/src/server/api/mod.rs @@ -1,3 +1,4 @@ +pub mod schema; pub mod transaction; pub mod version; diff --git a/warden/src/server/api/schema.rs b/warden/src/server/api/schema.rs new file mode 100644 index 0000000..e19c341 --- /dev/null +++ b/warden/src/server/api/schema.rs @@ -0,0 +1,33 @@ +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use utoipa::ToSchema; + +#[derive(Deserialize, Serialize, ToSchema)] +/// Transaction to monitor +pub struct CreateSchema { + #[serde(rename = "type")] + /// Transaction schema type + pub kind: String, + /// The schema's version + pub version: String, + /// Transaction data + #[serde(rename = "json_schema")] + pub schema: serde_json::Value, +} + +/// Transaction to monitor +#[derive(Deserialize, Serialize, Debug, ToSchema)] +pub struct TransactionSchema { + #[serde(rename = "type")] + /// Transaction schema type + pub kind: String, + /// The schema's version + pub version: String, + /// JSON schema for transcation + #[serde(rename = "json_schema")] + pub schema: serde_json::Value, + #[serde(with = "time::serde::rfc3339")] + pub created_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub updated_at: OffsetDateTime, +} diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs index 805ca8a..9524354 100644 --- a/warden/src/server/api/transaction.rs +++ b/warden/src/server/api/transaction.rs @@ -1,9 +1,16 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use utoipa::ToSchema; -#[derive(Deserialize, ToSchema)] +#[derive(Deserialize, Serialize, ToSchema)] + +/// Transaction to monitor pub struct Transaction { #[serde(rename = "type")] + /// Transaction schema type pub kind: String, - pub data: serde_json::Value, + /// The schema's version + pub version: String, + /// Transaction data + #[serde(rename = "json_schema")] + pub transaction: serde_json::Value, } diff --git a/warden/src/server/error/mod.rs b/warden/src/server/error/mod.rs index 1bd0498..e535c37 100644 --- a/warden/src/server/error/mod.rs +++ b/warden/src/server/error/mod.rs @@ -4,16 +4,36 @@ use axum::{ }; // Make our own error that wraps `anyhow::Error`. -pub struct AppError(anyhow::Error); +pub struct AppError { + pub code: StatusCode, + pub inner: anyhow::Error, +} + +impl AppError { + pub fn new(code: StatusCode, err: impl Into) -> Self { + Self { + code, + inner: err.into(), + } + } +} // Tell axum how to convert `AppError` into a response. impl IntoResponse for AppError { fn into_response(self) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Something went wrong: {}", self.0), - ) - .into_response() + if self.code.is_server_error() { + tracing::error!(status = %self.code, error = ?self.inner, "Internal server error occurred"); + } else { + tracing::warn!(status = %self.code, error = %self.inner, "Client request error"); + } + + let message = if self.code == StatusCode::INTERNAL_SERVER_ERROR { + "Something went wrong".to_string() + } else { + self.inner.to_string() + }; + + (self.code, message).into_response() } } @@ -24,6 +44,9 @@ where E: Into, { fn from(err: E) -> Self { - Self(err.into()) + Self { + code: StatusCode::INTERNAL_SERVER_ERROR, + inner: err.into(), + } } } diff --git a/warden/src/server/middleware/extractors/transaction.rs b/warden/src/server/middleware/extractors/transaction.rs index 2d3b693..8d02f6e 100644 --- a/warden/src/server/middleware/extractors/transaction.rs +++ b/warden/src/server/middleware/extractors/transaction.rs @@ -32,20 +32,20 @@ where })?; let errors: Vec = validator - .iter_errors(&payload.data) + .iter_errors(&payload.transaction) .map(|e| e.to_string()) .collect(); if !errors.is_empty() { let error_details = errors.join("; "); return Err(( - StatusCode::BAD_REQUEST, + StatusCode::NOT_ACCEPTABLE, format!("Validation failed: {}", error_details), )); } - let data = serde_json::from_value(payload.data) - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; + let data = serde_json::from_value(payload.transaction) + .map_err(|e| (StatusCode::NOT_ACCEPTABLE, e.to_string()))?; Ok(ValidatedTransaction(data)) } diff --git a/warden/src/server/routes/config/mod.rs b/warden/src/server/routes/config/mod.rs index 4737c45..f5dd56e 100644 --- a/warden/src/server/routes/config/mod.rs +++ b/warden/src/server/routes/config/mod.rs @@ -1,4 +1,5 @@ pub mod logs; +pub mod schema; use std::sync::Arc; use utoipa::OpenApi; @@ -15,5 +16,6 @@ pub struct ConfigDoc; pub fn router(store: Arc) -> OpenApiRouter { OpenApiRouter::new() .routes(utoipa_axum::routes!(logs::reload)) + .routes(utoipa_axum::routes!(schema::create::create_schema)) .with_state(store) } diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs new file mode 100644 index 0000000..e2ad580 --- /dev/null +++ b/warden/src/server/routes/config/schema/create.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use axum::{ + Json, debug_handler, + extract::State, + http::{HeaderMap, StatusCode}, + response::IntoResponse, +}; +use tracing::{debug, info, trace}; + +use crate::{ + server::{ + api::{ + schema::{CreateSchema, TransactionSchema}, + version::Version, + }, + error::AppError, + routes::config::CONFIG, + }, + state::AppState, +}; + +#[utoipa::path( + put, + responses( + ( + status = 201, + description = "The transaction's schema has been saved", + headers( + ("x-request-id", description = "Request identifier") + ), + body = TransactionSchema + ), + ( + status = 400, + description = "Invalid request", + headers( + ("x-request-id", description = "Request identifier") + ) + ), + ( + status = 405, + description = "Method not allowed", + headers( + ("x-request-id", description = "Request identifier") + ) + ), + ( + status = 409, + description = "Schema with provided type and version is already available", + headers( + ("x-request-id", description = "Request identifier") + ) + ), + ( + status = 500, + description = "Internal server error", + headers( + ("x-request-id", description = "Request identifier") + ) + ) + ), + operation_id = "create_schema", // https://github.com/juhaku/utoipa/issues/1170 + tag = CONFIG, + request_body( + content = CreateSchema + ), + path = "/{version}/config/schema", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + ), +)] +#[tracing::instrument( + name = "create_schema", + skip(state, headers, body), + fields( + request_id = %headers.get("x-request-id") + .and_then(|v| v.to_str().ok()).expect("request id"), + kind = %body.kind, + ) +)] +#[debug_handler] +pub async fn create_schema( + State(state): State>, + headers: HeaderMap, + Json(body): Json, +) -> Result { + trace!("checking schema validity"); + jsonschema::Validator::new(&body.schema).map_err(|e| { + AppError::new( + StatusCode::BAD_REQUEST, + anyhow::anyhow!("Invalid schema: {e}"), + ) + })?; + info!("schema is valid. trying to save..."); + + let result = sqlx::query_as!( + TransactionSchema, + "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3) + returning + type as kind, + version, + json_schema as schema, + created_at, + updated_at + ", + body.kind, + body.version, + sqlx::types::Json(&body.schema) as _ + ) + .fetch_one(&state.database) + .await + .map_err(|e| match e { + sqlx::Error::Database(db_err) if db_err.code() == Some("23505".into()) => AppError::new( + StatusCode::CONFLICT, + anyhow::anyhow!("Transaction schema already exists"), + ), + _ => e.into(), + })?; + debug!("schema created"); + Ok((StatusCode::CREATED, Json(result))) +} diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/warden/src/server/routes/config/schema/delete.rs @@ -0,0 +1 @@ + diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs new file mode 100644 index 0000000..ab7fa43 --- /dev/null +++ b/warden/src/server/routes/config/schema/mod.rs @@ -0,0 +1,4 @@ +pub mod create; +pub mod delete; +pub mod read; +pub mod update; diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/warden/src/server/routes/config/schema/read.rs @@ -0,0 +1 @@ + diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/warden/src/server/routes/config/schema/update.rs @@ -0,0 +1 @@ + diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs index 1ef5c1f..22c2864 100644 --- a/warden/src/server/routes/transaction_monitoring/monitor.rs +++ b/warden/src/server/routes/transaction_monitoring/monitor.rs @@ -10,18 +10,25 @@ use crate::{ use axum::{extract::State, http::StatusCode}; #[utoipa::path( - patch, + post, responses( ( status = 200, - description = "Server's log level has been updated", + description = "Transaction has been processed", headers( ("x-request-id", description = "Request identifier") ) ), ( status = 400, - description = "Invalid log level", + description = "Invalid request", + headers( + ("x-request-id", description = "Request identifier") + ) + ), + ( + status = 406, + description = "The supplied transaction does not conform to the schema", headers( ("x-request-id", description = "Request identifier") ) diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs index 5f0f840..eae1c43 100644 --- a/warden/src/state/mod.rs +++ b/warden/src/state/mod.rs @@ -6,16 +6,16 @@ use crate::{config::Configuration, logging::LogHandle}; #[derive(Debug, Clone)] pub struct AppState { pub log_handle: LogHandle, - // pub database: PgPool, + pub database: PgPool, } impl AppState { pub async fn new(log_handle: LogHandle, config: &Configuration) -> anyhow::Result { - // let database = database::connect(&config.database).await?; + let database = database::connect(&config.database).await?; Ok(Self { log_handle, - // database, + database, }) } } -- cgit v1.2.3