diff options
| author | rtkay123 <dev@kanjala.com> | 2026-03-30 14:02:53 +0200 |
|---|---|---|
| committer | rtkay123 <dev@kanjala.com> | 2026-03-30 14:02:53 +0200 |
| commit | 4071482f983d66b16cc8a5519f5665990dc7bc02 (patch) | |
| tree | bd1b630df7c46edb34f96f4264b19ddbda14b0a0 | |
| parent | 1b5622e14ca3deaf2a25ee785af656779ded7a41 (diff) | |
| download | warden-4071482f983d66b16cc8a5519f5665990dc7bc02.tar.bz2 warden-4071482f983d66b16cc8a5519f5665990dc7bc02.zip | |
refactor: camelCase response
23 files changed, 462 insertions, 160 deletions
diff --git a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json new file mode 100644 index 0000000..0fa6500 --- /dev/null +++ b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json @@ -0,0 +1,48 @@ +{ + "db_name": "PostgreSQL", + "query": "\n update\n transaction_schema\n set \n schema = $3\n where \n schema_type = $1 \n and schema_version = $2\n returning *\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "schema_type", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "schema_version", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "schema", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Jsonb" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2" +} diff --git a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json b/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json new file mode 100644 index 0000000..cee9aca --- /dev/null +++ b/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json @@ -0,0 +1,48 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select *\n from transaction_schema\n where ($1 = '' or (schema_type, schema_version) > ($1, $2))\n order by schema_type asc, schema_version asc\n limit $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "schema_type", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "schema_version", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "schema", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f" +} diff --git a/.sqlx/query-6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90.json b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json index a46ff8f..0453107 100644 --- a/.sqlx/query-6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90.json +++ b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3)\n returning\n type as kind, \n version, \n json_schema as schema, \n created_at, \n updated_at\n ", + "query": "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3)\n returning *\n ", "describe": { "columns": [ { "ordinal": 0, - "name": "kind", + "name": "schema_type", "type_info": "Text" }, { "ordinal": 1, - "name": "version", + "name": "schema_version", "type_info": "Varchar" }, { @@ -44,5 +44,5 @@ false ] }, - "hash": "6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90" + "hash": "502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3" } diff --git a/.sqlx/query-e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482.json b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json index 7a35bab..5e32b25 100644 --- a/.sqlx/query-e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482.json +++ b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json @@ -1,16 +1,16 @@ { "db_name": "PostgreSQL", - "query": "select \n type as kind, \n version, \n json_schema as schema, \n created_at, \n updated_at\n from transaction_schema where type = $1 and version = $2", + "query": "select \n *\n from transaction_schema where schema_type = $1 and schema_version = $2", "describe": { "columns": [ { "ordinal": 0, - "name": "kind", + "name": "schema_type", "type_info": "Text" }, { "ordinal": 1, - "name": "version", + "name": "schema_version", "type_info": "Varchar" }, { @@ -43,5 +43,5 @@ false ] }, - "hash": "e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482" + "hash": "5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344" } diff --git a/.sqlx/query-8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd.json b/.sqlx/query-c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888.json index 99d50b9..401e6e6 100644 --- a/.sqlx/query-8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd.json +++ b/.sqlx/query-c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "delete from transaction_schema where type = $1 and version = $2", + "query": "delete from transaction_schema where schema_type = $1 and schema_version = $2", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd" + "hash": "c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888" } @@ -1151,9 +1151,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" dependencies = [ "memchr", "serde", @@ -3072,6 +3072,7 @@ dependencies = [ "anyhow", "api-config", "axum", + "base64", "clap", "jsonschema", "secrecy", diff --git a/lib/api-config/src/schema/create.rs b/lib/api-config/src/schema/create.rs index eef11f8..e6511d5 100644 --- a/lib/api-config/src/schema/create.rs +++ b/lib/api-config/src/schema/create.rs @@ -2,17 +2,16 @@ use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -/// Transaction to monitor #[cfg_attr(feature = "utoipa", schema(example = json!({ - "type": "custom.schema", - "version": "1.0.0", - "json_schema": { + "schemaType": "custom.schema", + "schemaVersion": "1.0.0", + "schema": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "FinancialTransaction", "type": "object", - "required": ["transaction_id", "amount", "currency", "timestamp"], + "required": ["transactionId", "amount", "currency", "timestamp"], "properties": { - "transaction_id": { + "transactionId": { "type": "string", "format": "uuid" }, @@ -32,13 +31,13 @@ use serde::{Deserialize, Serialize}; } } })))] +#[serde(rename_all = "camelCase")] +/// The json schema to validate for each transaction of this type and version pub struct CreateSchema { - #[serde(rename = "type")] /// Transaction schema type - pub kind: String, + pub schema_type: String, /// The schema's version - pub version: String, - /// The json schema to validate for each transaction of this type and version - #[serde(rename = "json_schema")] + pub schema_version: String, + /// The json schema pub schema: serde_json::Value, } diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs index d1ef5f4..7c65d49 100644 --- a/lib/api-config/src/schema/mod.rs +++ b/lib/api-config/src/schema/mod.rs @@ -1,26 +1,28 @@ mod create; pub use create::CreateSchema; +use tracing::debug; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use std::fmt::Debug; use time::OffsetDateTime; use warden_core::state::AppState; use crate::ConfigurationError; /// Transaction to monitor -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Debug, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ - "type": "custom.schema", - "version": "1.0.0", - "json_schema": { + "schemaType": "custom.schema", + "schemaVersion": "1.0.0", + "schema": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "FinancialTransaction", "type": "object", - "required": ["transaction_id", "amount", "currency", "timestamp"], + "required": ["transactionId", "amount", "currency", "timestamp"], "properties": { - "transaction_id": { + "transactionId": { "type": "string", "format": "uuid" }, @@ -39,20 +41,21 @@ use crate::ConfigurationError; }, } }, - "created_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), - "updated_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), + "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), + "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), })))] +#[serde(rename_all = "camelCase")] pub struct TransactionSchema { - #[serde(rename = "type")] /// Transaction schema type - pub kind: String, + pub schema_type: String, /// The schema's version - pub version: String, + pub schema_version: String, /// JSON schema for transcation - #[serde(rename = "json_schema")] pub schema: serde_json::Value, + /// When the schema was created #[serde(with = "time::serde::rfc3339")] pub created_at: OffsetDateTime, + /// When the schema was last updated #[serde(with = "time::serde::rfc3339")] pub updated_at: OffsetDateTime, } @@ -61,49 +64,53 @@ pub struct TransactionSchema { pub trait SchemaDriver { async fn create_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, schema: &serde_json::Value, ) -> Result<TransactionSchema, ConfigurationError>; async fn delete_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, ) -> Result<(), ConfigurationError>; async fn get_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, ) -> Result<Option<TransactionSchema>, ConfigurationError>; async fn update_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, schema: &serde_json::Value, - ) -> Result<TransactionSchema, ConfigurationError>; + ) -> Result<Option<TransactionSchema>, ConfigurationError>; + + async fn get_schemas( + &self, + limit: i64, + first: Option<i64>, + after: Option<impl AsRef<str> + Send + Sync + Debug>, + ) -> Result<Vec<TransactionSchema>, ConfigurationError>; } #[async_trait] impl SchemaDriver for AppState { + #[tracing::instrument(skip(self, schema))] async fn create_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, schema: &serde_json::Value, ) -> Result<TransactionSchema, crate::ConfigurationError> { + debug!("creating transaction schema"); sqlx::query_as!( TransactionSchema, - "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3) - returning - type as kind, - version, - json_schema as schema, - created_at, - updated_at - ", + "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3) + returning * + ", kind.as_ref(), version.as_ref(), sqlx::types::Json(&schema) as _ @@ -113,13 +120,15 @@ impl SchemaDriver for AppState { .map_err(|e| e.into()) } + #[tracing::instrument(skip(self))] async fn delete_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, ) -> Result<(), crate::ConfigurationError> { + debug!("deleting transaction schema"); sqlx::query!( - "delete from transaction_schema where type = $1 and version = $2", + "delete from transaction_schema where schema_type = $1 and schema_version = $2", kind.as_ref(), version.as_ref(), ) @@ -128,20 +137,18 @@ impl SchemaDriver for AppState { Ok(()) } + #[tracing::instrument(skip(self))] async fn get_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> { + debug!("getting transaction schema"); let result = sqlx::query_as!( TransactionSchema, "select - type as kind, - version, - json_schema as schema, - created_at, - updated_at - from transaction_schema where type = $1 and version = $2", + * + from transaction_schema where schema_type = $1 and schema_version = $2", kind.as_ref(), version.as_ref(), ) @@ -151,34 +158,71 @@ impl SchemaDriver for AppState { Ok(result) } + #[tracing::instrument(skip(self, schema))] async fn update_schema( &self, - kind: impl AsRef<str> + Send + Sync, - version: impl AsRef<str> + Send + Sync, + kind: impl AsRef<str> + Send + Sync + Debug, + version: impl AsRef<str> + Send + Sync + Debug, schema: &serde_json::Value, - ) -> Result<TransactionSchema, crate::ConfigurationError> { - sqlx::query_as!(TransactionSchema, - " + ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> { + debug!("updating transaction schema"); + sqlx::query_as!( + TransactionSchema, + " update transaction_schema set - json_schema = $3 + schema = $3 where - type = $1 - and version = $2 - returning - type as kind, - version, - json_schema as schema, - created_at, - updated_at + schema_type = $1 + and schema_version = $2 + returning * ", kind.as_ref(), version.as_ref(), sqlx::types::Json(&schema) as _ ) - .fetch_one(&self.database) + .fetch_optional(&self.database) .await .map_err(|e| e.into()) } + + #[tracing::instrument(skip(self))] + async fn get_schemas( + &self, + limit: i64, + first: Option<i64>, + after: Option<impl AsRef<str> + Send + Sync + Debug>, + ) -> Result<Vec<TransactionSchema>, ConfigurationError> { + debug!("getting transaction schemas"); + let limit = first.unwrap_or(limit); + let mut last_type = String::default(); + let mut last_version = String::default(); + + if let Some(s) = after { + let parts: Vec<&str> = s.as_ref().split(',').collect(); + if parts.len() == 2 { + last_type = parts[0].to_string(); + last_version = parts[1].to_string(); + } + } + + let rows = sqlx::query_as!( + TransactionSchema, + " + select * + from transaction_schema + where ($1 = '' or (schema_type, schema_version) > ($1, $2)) + order by schema_type asc, schema_version asc + limit $3 + ", + &last_type, + &last_version, + limit + 1 + ) + .fetch_all(&self.database) + .await?; + + Ok(rows) + } } diff --git a/lib/warden-core/src/config/cli/mod.rs b/lib/warden-core/src/config/cli/mod.rs index 36f6bf0..e0c5450 100644 --- a/lib/warden-core/src/config/cli/mod.rs +++ b/lib/warden-core/src/config/cli/mod.rs @@ -65,7 +65,7 @@ impl Default for Server { port: Some(2210), environment: Default::default(), log_level: Some(format!( - "{}=debug,tower_http=debug,axum::rejection=trace", + "{}=debug,tower_http=debug,axum::rejection=trace,sqlx=warn,debug", env!("CARGO_CRATE_NAME") )), log_dir: Some(std::env::temp_dir()), diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql index d7b4744..8496c80 100644 --- a/migrations/20260329120645_transaction_schema.sql +++ b/migrations/20260329120645_transaction_schema.sql @@ -2,14 +2,14 @@ -- The transaction's blueprint to be checked on each request create table transaction_schema ( -- The transaction type - type text not null, + schema_type text not null, -- The schema's version (to allow for multiple revisions - maybe) - version varchar not null, + schema_version varchar not null, -- Actual JSON schema - json_schema jsonb not null, + schema jsonb not null, created_at timestamptz not null default now(), updated_at timestamptz not null default now(), - primary key (type, version) + primary key (schema_type, schema_version) ); create trigger update_transaction_schema_modtime diff --git a/warden/Cargo.toml b/warden/Cargo.toml index 1b7c42a..e373fb8 100644 --- a/warden/Cargo.toml +++ b/warden/Cargo.toml @@ -9,6 +9,7 @@ homepage.workspace = true [dependencies] anyhow = "1.0.102" axum = { version = "0.8.8", features = ["macros"] } +base64 = "0.22.1" clap.workspace = true jsonschema.workspace = true secrecy = { version = "0.10.3", features = ["serde"] } diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs index 963a3ef..f5aa0c8 100644 --- a/warden/src/server/api/mod.rs +++ b/warden/src/server/api/mod.rs @@ -1,3 +1,4 @@ +pub mod pagination; pub mod transaction; pub mod version; diff --git a/warden/src/server/api/pagination.rs b/warden/src/server/api/pagination.rs new file mode 100644 index 0000000..7341d2f --- /dev/null +++ b/warden/src/server/api/pagination.rs @@ -0,0 +1,21 @@ +use api_config::schema::TransactionSchema; +use serde::{Deserialize, Serialize}; +use utoipa::{IntoParams, ToSchema}; + +#[derive(Deserialize, Debug, IntoParams, ToSchema)] +pub struct PaginationParams { + pub first: Option<i64>, + pub after: Option<String>, // This is our Base64 cursor +} + +#[derive(Serialize, ToSchema)] +pub struct PageInfo { + pub has_next_page: bool, + pub end_cursor: Option<String>, +} + +#[derive(Serialize, ToSchema)] +pub struct RelayResponse { + pub nodes: Vec<TransactionSchema>, + pub page_info: PageInfo, +} diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs index 9524354..03f8a1f 100644 --- a/warden/src/server/api/transaction.rs +++ b/warden/src/server/api/transaction.rs @@ -5,12 +5,10 @@ use utoipa::ToSchema; /// Transaction to monitor pub struct Transaction { - #[serde(rename = "type")] /// Transaction schema type - pub kind: String, + pub schema_type: String, /// The schema's version - pub version: String, + pub schema_version: String, /// Transaction data - #[serde(rename = "json_schema")] pub transaction: serde_json::Value, } diff --git a/warden/src/server/api/version.rs b/warden/src/server/api/version.rs index 32cfef3..f8d856a 100644 --- a/warden/src/server/api/version.rs +++ b/warden/src/server/api/version.rs @@ -9,9 +9,10 @@ use axum::{ use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; -#[derive(Deserialize,Debug, IntoParams)] +#[derive(Deserialize, Debug, IntoParams)] +#[serde(rename_all = "camelCase")] pub struct VersionPath { - pub version: Version, + pub api_version: Version, } #[derive(Debug, ToSchema, Deserialize, Serialize)] @@ -32,7 +33,7 @@ where parts.extract().await.map_err(IntoResponse::into_response)?; let version = params - .get("version") + .get("apiVersion") .ok_or_else(|| (StatusCode::NOT_FOUND, "version param missing").into_response())?; match version.as_str() { diff --git a/warden/src/server/middleware/extractors/transaction.rs b/warden/src/server/middleware/extractors/transaction.rs index 8d02f6e..cdee029 100644 --- a/warden/src/server/middleware/extractors/transaction.rs +++ b/warden/src/server/middleware/extractors/transaction.rs @@ -21,7 +21,7 @@ where .await .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; - let schema_json = get_schema_from_db(&payload.kind) + let schema_json = get_schema_from_db(&payload.schema_type) .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid schema key".to_string()))?; let validator = Validator::new(&schema_json).map_err(|_| { diff --git a/warden/src/server/routes/config/logs.rs b/warden/src/server/routes/config/logs.rs index 45959c2..67bd14b 100644 --- a/warden/src/server/routes/config/logs.rs +++ b/warden/src/server/routes/config/logs.rs @@ -9,14 +9,9 @@ use crate::server::{api::SafeJson, routes::config::CONFIG}; #[derive(Deserialize, Debug, Clone, ToSchema)] /// Log level +#[serde(rename_all = "camelCase")] pub struct LogLevel { - #[schema( - examples( - "info", - "trace", - "warden=debug,tower_http=debug,axum::rejection=trace" - ) - )] + #[schema(examples("info", "trace", "warden=debug,tower_http=debug,axum::rejection=trace"))] log_level: String, } diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs index b9a3b22..b2d3051 100644 --- a/warden/src/server/routes/config/schema/create.rs +++ b/warden/src/server/routes/config/schema/create.rs @@ -10,7 +10,11 @@ use axum::{ use tracing::{info, trace}; use warden_core::state::AppState; -use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG}; +use crate::server::{ + api::version::{Version, VersionPath}, + error::AppError, + routes::config::CONFIG, +}; /// Save a transaction's schema #[utoipa::path( @@ -59,7 +63,7 @@ use crate::server::{api::version::{Version, VersionPath}, error::AppError, route content = CreateSchema ), params(VersionPath), - path = "/{version}/config/schema" + path = "/{apiVersion}/config/schema" )] #[tracing::instrument( name = "create_schema", @@ -67,7 +71,8 @@ use crate::server::{api::version::{Version, VersionPath}, error::AppError, route fields( request_id = %headers.get("x-request-id") .and_then(|v| v.to_str().ok()).expect("request id"), - kind = %body.kind, + schema_type = %body.schema_type, + schema_ver = %body.schema_version, ) )] #[debug_handler] @@ -87,7 +92,7 @@ pub async fn create_schema( info!("schema is valid. trying to save..."); let result = state - .create_schema(&body.kind, &body.version, &body.schema) + .create_schema(&body.schema_type, &body.schema_version, &body.schema) .await .map_err(|e| match e { api_config::ConfigurationError::Database(ref error) => match error { diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs index cfafb68..a761203 100644 --- a/warden/src/server/routes/config/schema/delete.rs +++ b/warden/src/server/routes/config/schema/delete.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use api_config::schema::{SchemaDriver}; +use api_config::schema::SchemaDriver; use axum::{ debug_handler, extract::{Path, Query, State}, @@ -11,18 +11,22 @@ use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; use warden_core::state::AppState; -use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG}; +use crate::server::{ + api::version::{Version, VersionPath}, + error::AppError, + routes::config::CONFIG, +}; /// Schema delete query #[derive(Deserialize, Serialize, IntoParams, ToSchema)] +#[serde(rename_all = "camelCase")] pub struct SchemaDeleteQuery { /// Schema type - #[serde(rename = "type")] #[param(example = "custom.schema")] - kind: String, + schema_type: String, /// Schema version #[param(example = "1.0.0")] - version: String, + schema_version: String, } /// Delete a transaction's schema @@ -60,7 +64,7 @@ pub struct SchemaDeleteQuery { ), operation_id = "delete_schema", // https://github.com/juhaku/utoipa/issues/1170 tag = CONFIG, - path = "/{version}/config/schema", + path = "/{apiVersion}/config/schema", params(VersionPath, SchemaDeleteQuery), )] #[tracing::instrument( @@ -69,7 +73,8 @@ pub struct SchemaDeleteQuery { fields( request_id = %headers.get("x-request-id") .and_then(|v| v.to_str().ok()).expect("request id"), - kind = %body.kind, + schema_type = %body.schema_type, + schema_ver = %body.schema_version, ) )] #[debug_handler] @@ -81,7 +86,7 @@ pub async fn delete_schema( ) -> Result<impl IntoResponse, AppError> { // TODO: should also clear cached ones eventually state - .delete_schema(&body.kind, &body.version) + .delete_schema(&body.schema_type, &body.schema_version) .await?; Ok(StatusCode::NO_CONTENT) } diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs index 6ab66a1..3fbdb81 100644 --- a/warden/src/server/routes/config/schema/mod.rs +++ b/warden/src/server/routes/config/schema/mod.rs @@ -13,6 +13,7 @@ pub fn router(store: Arc<AppState>) -> OpenApiRouter { .routes(utoipa_axum::routes!(create::create_schema)) .routes(utoipa_axum::routes!(delete::delete_schema)) .routes(utoipa_axum::routes!(read::get_schema)) + .routes(utoipa_axum::routes!(read::get_schemas)) .routes(utoipa_axum::routes!(update::update_schema)) .with_state(store) } diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs index 17fc3e2..1e87626 100644 --- a/warden/src/server/routes/config/schema/read.rs +++ b/warden/src/server/routes/config/schema/read.rs @@ -7,24 +7,18 @@ use axum::{ http::{HeaderMap, StatusCode}, response::IntoResponse, }; -use serde::{Deserialize, Serialize}; +use base64::{Engine, engine::general_purpose}; use tracing::debug; -use utoipa::{IntoParams, ToSchema}; use warden_core::state::AppState; -use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG}; - -/// Schema search query -#[derive(Deserialize, Serialize, IntoParams, ToSchema)] -pub struct SchemaSearchQuery { - /// Schema type - #[serde(rename = "type")] - #[param(example = "custom.schema")] - kind: String, - /// Schema version - #[param(example = "1.0.0")] - version: String, -} +use crate::server::{ + api::{ + pagination::{PageInfo, PaginationParams, RelayResponse}, + version::{Version, VersionPath}, + }, + error::AppError, + routes::config::CONFIG, +}; /// Get a transaction's schema #[utoipa::path( @@ -34,7 +28,7 @@ pub struct SchemaSearchQuery { status = 200, description = "Lookup results", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ), body = Option<TransactionSchema> ), @@ -42,56 +36,60 @@ pub struct SchemaSearchQuery { status = 400, description = "Invalid request", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ), ( status = 404, description = "Schema not found", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ), ( status = 405, description = "Method not allowed", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ), ( status = 500, description = "Internal server error", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ) ), operation_id = "get_schema", // https://github.com/juhaku/utoipa/issues/1170 tag = CONFIG, - path = "/{version}/config/schema", - params(VersionPath, SchemaSearchQuery), + path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}", + params( + VersionPath, + ("schemaType" = String, Path, example = "custom.schema"), + ("schemaVersion" = String, Path, example = "1.0.0"), + ), )] #[tracing::instrument( name = "get_schema", - skip(state, headers, body), + skip(state, headers), fields( request_id = %headers.get("x-request-id") .and_then(|v| v.to_str().ok()).expect("request id"), - kind = %body.kind, + schema_type = %schema_type, + schema_ver = %schema_version, ) )] #[debug_handler] pub async fn get_schema( State(state): State<Arc<AppState>>, headers: HeaderMap, - Path(version): Path<Version>, - body: Query<SchemaSearchQuery>, + Path((version, schema_type, schema_version)): Path<(Version, String, String)>, ) -> Result<impl IntoResponse, AppError> { debug!("searching for schema"); // TODO: get from cache let result = state - .get_schema(&body.kind, &body.version) + .get_schema(&schema_type, &schema_version) .await .map_err(|e| match e { api_config::ConfigurationError::Database(ref error) => match error { @@ -114,3 +112,109 @@ pub async fn get_schema( )) } } + +/// Get transaction schemas +#[utoipa::path( + get, + responses( + ( + status = 200, + description = "Lookup results", + headers( + ("x-request-id" = Uuid, description = "Request identifier") + ), + body = RelayResponse + ), + ( + status = 400, + description = "Invalid request", + headers( + ("x-request-id" = Uuid, description = "Request identifier") + ) + ), + ( + status = 404, + description = "Schema not found", + headers( + ("x-request-id" = Uuid, description = "Request identifier") + ) + ), + ( + status = 405, + description = "Method not allowed", + headers( + ("x-request-id" = Uuid, description = "Request identifier") + ) + ), + ( + status = 500, + description = "Internal server error", + headers( + ("x-request-id" = Uuid, description = "Request identifier") + ) + ) + ), + operation_id = "get_schemas", // https://github.com/juhaku/utoipa/issues/1170 + tag = CONFIG, + path = "/{apiVersion}/config/schema", + params(VersionPath), +)] +#[tracing::instrument( + name = "get_schemas", + skip(state, headers), + fields( + request_id = %headers.get("x-request-id") + .and_then(|v| v.to_str().ok()).expect("request id"), + ) +)] +#[debug_handler] +pub async fn get_schemas( + State(state): State<Arc<AppState>>, + headers: HeaderMap, + Path(version): Path<Version>, + params: Query<PaginationParams>, +) -> Result<impl IntoResponse, AppError> { + debug!("searching for schema"); + let mut cursor = None; + let limit = 10; + + // 1. Decode Cursor + if let Some(ref cursor_str) = params.after { + let decoded = general_purpose::STANDARD.decode(cursor_str).unwrap(); + cursor = Some(String::from_utf8(decoded).unwrap()); + } + + // TODO: get from cache + let rows = state + .get_schemas(limit, params.first, cursor) + .await + .map_err(|e| match e { + api_config::ConfigurationError::Database(ref error) => match error { + sqlx::Error::Database(db_err) if db_err.code() == Some("23505".into()) => { + AppError::new( + StatusCode::CONFLICT, + anyhow::anyhow!("Transaction schema already exists"), + ) + } + _ => e.into(), + }, + _ => e.into(), + })?; + + // 3. Process Relay Metadata + let has_next_page = rows.len() > limit as usize; + let nodes: Vec<TransactionSchema> = rows.into_iter().take(limit as usize).collect(); + + let end_cursor = nodes.last().map(|node| { + let raw_cursor = format!("{},{}", node.schema_type, node.schema_version); + general_purpose::STANDARD.encode(raw_cursor) + }); + + Ok(Json(RelayResponse { + nodes, + page_info: PageInfo { + has_next_page, + end_cursor, + }, + })) +} diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs index a03e8e8..f116e0f 100644 --- a/warden/src/server/routes/config/schema/update.rs +++ b/warden/src/server/routes/config/schema/update.rs @@ -1,15 +1,19 @@ use std::sync::Arc; -use api_config::schema::{CreateSchema, SchemaDriver}; +use api_config::schema::{CreateSchema, SchemaDriver, TransactionSchema}; use axum::{ Json, debug_handler, - extract::State, + extract::{Path, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, }; use warden_core::state::AppState; -use crate::server::{api::version::Version, error::AppError, routes::config::CONFIG}; +use crate::server::{ + api::version::{Version, VersionPath}, + error::AppError, + routes::config::CONFIG, +}; /// Update a transaction's schema #[utoipa::path( @@ -17,30 +21,38 @@ use crate::server::{api::version::Version, error::AppError, routes::config::CONF responses( ( status = 200, - description = "The schema has been deleted", + description = "The updated schema", headers( - ("x-request-id", description = "Request identifier") - ) + ("x-request-id" = Uuid, description = "Request identifier") + ), + body = TransactionSchema, ), ( status = 400, description = "Invalid request", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") + ) + ), + ( + status = 404, + description = "No schema found to update", + headers( + ("x-request-id" = Uuid, description = "Request identifier") ) ), ( status = 405, description = "Method not allowed", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ), ( status = 500, description = "Internal server error", headers( - ("x-request-id", description = "Request identifier") + ("x-request-id" = Uuid, description = "Request identifier") ) ) ), @@ -49,29 +61,40 @@ use crate::server::{api::version::Version, error::AppError, routes::config::CONF request_body( content = CreateSchema ), - path = "/{version}/config/schema", + path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}", params( - ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + VersionPath, + ("schemaType" = String, Path, example = "custom.schema"), + ("schemaVersion" = String, Path, example = "1.0.0"), ), )] #[tracing::instrument( - name = "delete_schema", skip(state, headers, body), fields( request_id = %headers.get("x-request-id") .and_then(|v| v.to_str().ok()).expect("request id"), - kind = %body.kind, + schema_type = %schema_type, + schema_ver = %schema_version, ) )] #[debug_handler] pub async fn update_schema( State(state): State<Arc<AppState>>, headers: HeaderMap, - Json(body): Json<CreateSchema>, + Path((version, schema_type, schema_version)): Path<(Version, String, String)>, + Json(body): Json<serde_json::Value>, ) -> Result<impl IntoResponse, AppError> { // TODO: should also clear cached ones eventually - state - .update_schema(&body.kind, &body.version, &body.schema) + let result = state + .update_schema(schema_type, schema_version, &body) .await?; - Ok(StatusCode::OK) + + if let Some(result) = result { + Ok(Json(result)) + } else { + Err(AppError::new( + StatusCode::NOT_FOUND, + anyhow::anyhow!("Resource not found"), + )) + } } diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs index 0c60117..817dbca 100644 --- a/warden/src/server/routes/transaction_monitoring/monitor.rs +++ b/warden/src/server/routes/transaction_monitoring/monitor.rs @@ -1,10 +1,16 @@ use std::sync::Arc; use crate::server::{ - api::{transaction::Transaction, version::Version}, + api::{ + transaction::Transaction, + version::{Version, VersionPath}, + }, middleware::extractors::transaction::ValidatedTransaction, }; -use axum::{extract::State, http::StatusCode}; +use axum::{ + extract::{Path, State}, + http::StatusCode, +}; use warden_core::state::AppState; /// Submit a transaction for monitoring @@ -38,9 +44,9 @@ use warden_core::state::AppState; request_body( content = LogLevel ), - path = "/{version}/monitor", + path = "/{apiVersion}/monitor", params( - ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + VersionPath ), request_body( content = Transaction @@ -49,6 +55,7 @@ use warden_core::state::AppState; ] pub async fn reload( State(state): State<Arc<AppState>>, + Path(version): Path<Version>, ValidatedTransaction(body): ValidatedTransaction<serde_json::Value>, ) -> StatusCode { dbg!(&body); |
