aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-03-29 15:06:30 +0200
committerrtkay123 <dev@kanjala.com>2026-03-29 15:06:30 +0200
commit7e08464b2a42048ebb5128bec5731719cf8bd1db (patch)
treea3b2da94cfcd6ceff480e30a37692b48ca85144f
parenta32f67624b14d5bf3ff726f2dbab09652ae34974 (diff)
downloadwarden-7e08464b2a42048ebb5128bec5731719cf8bd1db.tar.bz2
warden-7e08464b2a42048ebb5128bec5731719cf8bd1db.zip
feat: create transaction schema
-rw-r--r--Cargo.lock22
-rw-r--r--Cargo.toml2
-rw-r--r--migrations/20260329120607_functions.sql7
-rw-r--r--migrations/20260329120645_transaction_schema.sql18
-rw-r--r--warden/Cargo.toml9
-rw-r--r--warden/src/server/api/mod.rs1
-rw-r--r--warden/src/server/api/schema.rs33
-rw-r--r--warden/src/server/api/transaction.rs13
-rw-r--r--warden/src/server/error/mod.rs37
-rw-r--r--warden/src/server/middleware/extractors/transaction.rs8
-rw-r--r--warden/src/server/routes/config/mod.rs2
-rw-r--r--warden/src/server/routes/config/schema/create.rs122
-rw-r--r--warden/src/server/routes/config/schema/delete.rs1
-rw-r--r--warden/src/server/routes/config/schema/mod.rs4
-rw-r--r--warden/src/server/routes/config/schema/read.rs1
-rw-r--r--warden/src/server/routes/config/schema/update.rs1
-rw-r--r--warden/src/server/routes/transaction_monitoring/monitor.rs13
-rw-r--r--warden/src/state/mod.rs6
18 files changed, 276 insertions, 24 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a2d273d..595583f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -163,6 +163,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
+ "axum-macros",
"bytes",
"form_urlencoded",
"futures-util",
@@ -209,6 +210,17 @@ dependencies = [
]
[[package]]
+name = "axum-macros"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -494,6 +506,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
+ "serde_core",
]
[[package]]
@@ -2253,6 +2266,7 @@ dependencies = [
"sha2",
"smallvec",
"thiserror 2.0.18",
+ "time",
"tokio",
"tokio-stream",
"tracing",
@@ -2290,7 +2304,9 @@ dependencies = [
"serde_json",
"sha2",
"sqlx-core",
+ "sqlx-mysql",
"sqlx-postgres",
+ "sqlx-sqlite",
"syn",
"tokio",
"url",
@@ -2327,12 +2343,14 @@ dependencies = [
"percent-encoding",
"rand",
"rsa",
+ "serde",
"sha1",
"sha2",
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.18",
+ "time",
"tracing",
"whoami",
]
@@ -2370,6 +2388,7 @@ dependencies = [
"sqlx-core",
"stringprep",
"thiserror 2.0.18",
+ "time",
"tracing",
"whoami",
]
@@ -2390,9 +2409,11 @@ dependencies = [
"libsqlite3-sys",
"log",
"percent-encoding",
+ "serde",
"serde_urlencoded",
"sqlx-core",
"thiserror 2.0.18",
+ "time",
"tracing",
"url",
]
@@ -3039,6 +3060,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
+ "time",
"tokio",
"toml",
"tower-http",
diff --git a/Cargo.toml b/Cargo.toml
index e9d6e8b..5446260 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,8 +11,10 @@ homepage = "https://git.kanjala.com/warden"
[workspace.dependencies]
serde = "1.0.228"
serde_json = "1.0.149"
+time = { version = "0.3.47", default-features = false }
tracing = "0.1.44"
url = "2.5.8"
+uuid = "1.23.0"
[workspace.dependencies.sqlx]
version = "0.8.6"
diff --git a/migrations/20260329120607_functions.sql b/migrations/20260329120607_functions.sql
new file mode 100644
index 0000000..f662c25
--- /dev/null
+++ b/migrations/20260329120607_functions.sql
@@ -0,0 +1,7 @@
+create or replace function update_updated_at_column()
+returns trigger as $$
+begin
+ new.updated_at = now();
+ return new;
+end;
+$$ language 'plpgsql';
diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql
new file mode 100644
index 0000000..d7b4744
--- /dev/null
+++ b/migrations/20260329120645_transaction_schema.sql
@@ -0,0 +1,18 @@
+-- Add migration script here
+-- The transaction's blueprint to be checked on each request
+create table transaction_schema (
+ -- The transaction type
+ type text not null,
+ -- The schema's version (to allow for multiple revisions - maybe)
+ version varchar not null,
+ -- Actual JSON schema
+ json_schema jsonb not null,
+ created_at timestamptz not null default now(),
+ updated_at timestamptz not null default now(),
+ primary key (type, version)
+);
+
+create trigger update_transaction_schema_modtime
+ before update on transaction_schema
+ for each row
+ execute function update_updated_at_column();
diff --git a/warden/Cargo.toml b/warden/Cargo.toml
index 501c651..d2b7b49 100644
--- a/warden/Cargo.toml
+++ b/warden/Cargo.toml
@@ -9,29 +9,30 @@ homepage.workspace = true
[dependencies]
anyhow = "1.0.102"
async-trait = "0.1.89"
-axum = "0.8.8"
+axum = { version = "0.8.8", features = ["macros"] }
clap = { version = "4.6.0", features = ["derive", "env"] }
jsonschema = "0.45.0"
secrecy = { version = "0.10.3", features = ["serde"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
+time = { workspace = true, features = ["formatting", "serde"] }
toml = "1.1.0"
tower-http = { version = "0.6.8", features = ["cors", "request-id", "timeout", "trace"] }
tracing.workspace = true
tracing-appender = "0.2.4"
tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
url = { workspace = true, features = ["serde"] }
-utoipa = "5.4.0"
+utoipa = { version = "5.4.0", features = ["time"] }
utoipa-axum = "0.2.0"
utoipa-rapidoc = { version = "6.0.0", optional = true }
utoipa-redoc = { version = "6.0.0", optional = true }
utoipa-scalar = { version = "0.3.0", optional = true }
utoipa-swagger-ui = { version = "9.0.2", optional = true }
-uuid = { version = "1.23.0", features = ["v7"] }
+uuid = { workspace = true, features = ["v7"] }
[dependencies.sqlx]
workspace = true
-features = ["runtime-tokio-rustls"]
+features = ["json", "runtime-tokio-rustls", "time"]
[dependencies.tokio]
version = "1.50.0"
diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs
index 963a3ef..ecfbad3 100644
--- a/warden/src/server/api/mod.rs
+++ b/warden/src/server/api/mod.rs
@@ -1,3 +1,4 @@
+pub mod schema;
pub mod transaction;
pub mod version;
diff --git a/warden/src/server/api/schema.rs b/warden/src/server/api/schema.rs
new file mode 100644
index 0000000..e19c341
--- /dev/null
+++ b/warden/src/server/api/schema.rs
@@ -0,0 +1,33 @@
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+use utoipa::ToSchema;
+
+#[derive(Deserialize, Serialize, ToSchema)]
+/// Transaction to monitor
+pub struct CreateSchema {
+ #[serde(rename = "type")]
+ /// Transaction schema type
+ pub kind: String,
+ /// The schema's version
+ pub version: String,
+ /// Transaction data
+ #[serde(rename = "json_schema")]
+ pub schema: serde_json::Value,
+}
+
+/// Transaction to monitor
+#[derive(Deserialize, Serialize, Debug, ToSchema)]
+pub struct TransactionSchema {
+ #[serde(rename = "type")]
+ /// Transaction schema type
+ pub kind: String,
+ /// The schema's version
+ pub version: String,
+ /// JSON schema for transcation
+ #[serde(rename = "json_schema")]
+ pub schema: serde_json::Value,
+ #[serde(with = "time::serde::rfc3339")]
+ pub created_at: OffsetDateTime,
+ #[serde(with = "time::serde::rfc3339")]
+ pub updated_at: OffsetDateTime,
+}
diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs
index 805ca8a..9524354 100644
--- a/warden/src/server/api/transaction.rs
+++ b/warden/src/server/api/transaction.rs
@@ -1,9 +1,16 @@
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
-#[derive(Deserialize, ToSchema)]
+#[derive(Deserialize, Serialize, ToSchema)]
+
+/// Transaction to monitor
pub struct Transaction {
#[serde(rename = "type")]
+ /// Transaction schema type
pub kind: String,
- pub data: serde_json::Value,
+ /// The schema's version
+ pub version: String,
+ /// Transaction data
+ #[serde(rename = "json_schema")]
+ pub transaction: serde_json::Value,
}
diff --git a/warden/src/server/error/mod.rs b/warden/src/server/error/mod.rs
index 1bd0498..e535c37 100644
--- a/warden/src/server/error/mod.rs
+++ b/warden/src/server/error/mod.rs
@@ -4,16 +4,36 @@ use axum::{
};
// Make our own error that wraps `anyhow::Error`.
-pub struct AppError(anyhow::Error);
+pub struct AppError {
+ pub code: StatusCode,
+ pub inner: anyhow::Error,
+}
+
+impl AppError {
+ pub fn new(code: StatusCode, err: impl Into<anyhow::Error>) -> Self {
+ Self {
+ code,
+ inner: err.into(),
+ }
+ }
+}
// Tell axum how to convert `AppError` into a response.
impl IntoResponse for AppError {
fn into_response(self) -> Response {
- (
- StatusCode::INTERNAL_SERVER_ERROR,
- format!("Something went wrong: {}", self.0),
- )
- .into_response()
+ if self.code.is_server_error() {
+ tracing::error!(status = %self.code, error = ?self.inner, "Internal server error occurred");
+ } else {
+ tracing::warn!(status = %self.code, error = %self.inner, "Client request error");
+ }
+
+ let message = if self.code == StatusCode::INTERNAL_SERVER_ERROR {
+ "Something went wrong".to_string()
+ } else {
+ self.inner.to_string()
+ };
+
+ (self.code, message).into_response()
}
}
@@ -24,6 +44,9 @@ where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
- Self(err.into())
+ Self {
+ code: StatusCode::INTERNAL_SERVER_ERROR,
+ inner: err.into(),
+ }
}
}
diff --git a/warden/src/server/middleware/extractors/transaction.rs b/warden/src/server/middleware/extractors/transaction.rs
index 2d3b693..8d02f6e 100644
--- a/warden/src/server/middleware/extractors/transaction.rs
+++ b/warden/src/server/middleware/extractors/transaction.rs
@@ -32,20 +32,20 @@ where
})?;
let errors: Vec<String> = validator
- .iter_errors(&payload.data)
+ .iter_errors(&payload.transaction)
.map(|e| e.to_string())
.collect();
if !errors.is_empty() {
let error_details = errors.join("; ");
return Err((
- StatusCode::BAD_REQUEST,
+ StatusCode::NOT_ACCEPTABLE,
format!("Validation failed: {}", error_details),
));
}
- let data = serde_json::from_value(payload.data)
- .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
+ let data = serde_json::from_value(payload.transaction)
+ .map_err(|e| (StatusCode::NOT_ACCEPTABLE, e.to_string()))?;
Ok(ValidatedTransaction(data))
}
diff --git a/warden/src/server/routes/config/mod.rs b/warden/src/server/routes/config/mod.rs
index 4737c45..f5dd56e 100644
--- a/warden/src/server/routes/config/mod.rs
+++ b/warden/src/server/routes/config/mod.rs
@@ -1,4 +1,5 @@
pub mod logs;
+pub mod schema;
use std::sync::Arc;
use utoipa::OpenApi;
@@ -15,5 +16,6 @@ pub struct ConfigDoc;
pub fn router(store: Arc<AppState>) -> OpenApiRouter {
OpenApiRouter::new()
.routes(utoipa_axum::routes!(logs::reload))
+ .routes(utoipa_axum::routes!(schema::create::create_schema))
.with_state(store)
}
diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs
new file mode 100644
index 0000000..e2ad580
--- /dev/null
+++ b/warden/src/server/routes/config/schema/create.rs
@@ -0,0 +1,122 @@
+use std::sync::Arc;
+
+use axum::{
+ Json, debug_handler,
+ extract::State,
+ http::{HeaderMap, StatusCode},
+ response::IntoResponse,
+};
+use tracing::{debug, info, trace};
+
+use crate::{
+ server::{
+ api::{
+ schema::{CreateSchema, TransactionSchema},
+ version::Version,
+ },
+ error::AppError,
+ routes::config::CONFIG,
+ },
+ state::AppState,
+};
+
+#[utoipa::path(
+ put,
+ responses(
+ (
+ status = 201,
+ description = "The transaction's schema has been saved",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ ),
+ body = TransactionSchema
+ ),
+ (
+ status = 400,
+ description = "Invalid request",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ )
+ ),
+ (
+ status = 405,
+ description = "Method not allowed",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ )
+ ),
+ (
+ status = 409,
+ description = "Schema with provided type and version is already available",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ )
+ ),
+ (
+ status = 500,
+ description = "Internal server error",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ )
+ )
+ ),
+ operation_id = "create_schema", // https://github.com/juhaku/utoipa/issues/1170
+ tag = CONFIG,
+ request_body(
+ content = CreateSchema
+ ),
+ path = "/{version}/config/schema",
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3")
+ ),
+)]
+#[tracing::instrument(
+ name = "create_schema",
+ skip(state, headers, body),
+ fields(
+ request_id = %headers.get("x-request-id")
+ .and_then(|v| v.to_str().ok()).expect("request id"),
+ kind = %body.kind,
+ )
+)]
+#[debug_handler]
+pub async fn create_schema(
+ State(state): State<Arc<AppState>>,
+ headers: HeaderMap,
+ Json(body): Json<CreateSchema>,
+) -> Result<impl IntoResponse, AppError> {
+ trace!("checking schema validity");
+ jsonschema::Validator::new(&body.schema).map_err(|e| {
+ AppError::new(
+ StatusCode::BAD_REQUEST,
+ anyhow::anyhow!("Invalid schema: {e}"),
+ )
+ })?;
+ info!("schema is valid. trying to save...");
+
+ let result = sqlx::query_as!(
+ TransactionSchema,
+ "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3)
+ returning
+ type as kind,
+ version,
+ json_schema as schema,
+ created_at,
+ updated_at
+ ",
+ body.kind,
+ body.version,
+ sqlx::types::Json(&body.schema) as _
+ )
+ .fetch_one(&state.database)
+ .await
+ .map_err(|e| match e {
+ sqlx::Error::Database(db_err) if db_err.code() == Some("23505".into()) => AppError::new(
+ StatusCode::CONFLICT,
+ anyhow::anyhow!("Transaction schema already exists"),
+ ),
+ _ => e.into(),
+ })?;
+ debug!("schema created");
+ Ok((StatusCode::CREATED, Json(result)))
+}
diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/warden/src/server/routes/config/schema/delete.rs
@@ -0,0 +1 @@
+
diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs
new file mode 100644
index 0000000..ab7fa43
--- /dev/null
+++ b/warden/src/server/routes/config/schema/mod.rs
@@ -0,0 +1,4 @@
+pub mod create;
+pub mod delete;
+pub mod read;
+pub mod update;
diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/warden/src/server/routes/config/schema/read.rs
@@ -0,0 +1 @@
+
diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/warden/src/server/routes/config/schema/update.rs
@@ -0,0 +1 @@
+
diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs
index 1ef5c1f..22c2864 100644
--- a/warden/src/server/routes/transaction_monitoring/monitor.rs
+++ b/warden/src/server/routes/transaction_monitoring/monitor.rs
@@ -10,18 +10,25 @@ use crate::{
use axum::{extract::State, http::StatusCode};
#[utoipa::path(
- patch,
+ post,
responses(
(
status = 200,
- description = "Server's log level has been updated",
+ description = "Transaction has been processed",
headers(
("x-request-id", description = "Request identifier")
)
),
(
status = 400,
- description = "Invalid log level",
+ description = "Invalid request",
+ headers(
+ ("x-request-id", description = "Request identifier")
+ )
+ ),
+ (
+ status = 406,
+ description = "The supplied transaction does not conform to the schema",
headers(
("x-request-id", description = "Request identifier")
)
diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs
index 5f0f840..eae1c43 100644
--- a/warden/src/state/mod.rs
+++ b/warden/src/state/mod.rs
@@ -6,16 +6,16 @@ use crate::{config::Configuration, logging::LogHandle};
#[derive(Debug, Clone)]
pub struct AppState {
pub log_handle: LogHandle,
- // pub database: PgPool,
+ pub database: PgPool,
}
impl AppState {
pub async fn new(log_handle: LogHandle, config: &Configuration) -> anyhow::Result<Self> {
- // let database = database::connect(&config.database).await?;
+ let database = database::connect(&config.database).await?;
Ok(Self {
log_handle,
- // database,
+ database,
})
}
}