// Submodules of the schema module; only `CreateSchema` is re-exported from here.
mod create_schema;
mod delete_schema;
mod get_schema;
mod implementation;
mod list_schemas;
mod pagination;
mod update_schema;

pub use create_schema::CreateSchema;
use sqlx::PgPool;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use time::OffsetDateTime;
use warden_core::pagination::Connection;
use warden_core::pagination::PaginationArgs;

use crate::ConfigurationError;

/// Transaction's schema
// A JSON schema document identified by its (`schema_type`, `schema_version`) pair.
// Field names are (de)serialized in camelCase. When the `utoipa` feature is
// enabled the type also derives `ToSchema`, using the example payload below.
#[derive(Clone, Deserialize, Debug, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
    "schemaType": "custom.schema",
    "schemaVersion": "1.0.0",
    "schema": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "FinancialTransaction",
        "type": "object",
        "required": ["transactionId", "amount", "currency", "timestamp"],
        "properties": {
            "transactionId": { "type": "string", "format": "uuid" },
            "amount": { "type": "number", "exclusiveMinimum": 0 },
            "currency": {
                "type": "string",
                "pattern": "^[A-Z]{3}$",
                "description": "ISO 4217 Alpha-3 code (e.g., USD, EUR)"
            },
            "timestamp": { "type": "string", "format": "date-time" },
        }
    },
    "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
    "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
})))]
#[serde(rename_all = "camelCase")]
pub struct TransactionSchema {
    /// Transaction schema id
    // `serde(skip)`: never serialized; when deserializing, serde fills this
    // field with `i64::default()` (0) instead of reading it from the input.
    #[serde(skip)]
    pub id: i64,
    /// Transaction schema type
    pub schema_type: String,
    /// The schema's version
    pub schema_version: String,
    /// JSON schema for transaction
    // Held as an arbitrary JSON value; nothing in this type validates it.
    pub schema: serde_json::Value,
    /// When the schema was created
    // (De)serialized as an RFC 3339 timestamp string.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// When the schema was last updated
    // (De)serialized as an RFC 3339 timestamp string.
    #[serde(with = "time::serde::rfc3339")]
    pub updated_at: OffsetDateTime,
}

/// The type implementing [SchemaDriver]
// NOTE(review): the `SchemaDriver` impl itself is not in this file — presumably
// it lives in the `implementation` submodule; confirm.
#[derive(Debug)]
pub struct SchemaService {
    /// Postgres connection pool held by the service.
    pub(crate) database: PgPool,
}
impl SchemaService { pub fn new(database: PgPool) -> Self { Self { database } } } #[async_trait] pub trait SchemaDriver: Send + Sync { async fn create_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result; async fn delete_schema(&self, kind: &str, version: &str) -> Result<(), ConfigurationError>; async fn get_schema( &self, kind: &str, version: &str, ) -> Result, ConfigurationError>; async fn update_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result, ConfigurationError>; async fn list_schemas( &self, input: &PaginationArgs, limit: i64, ) -> Result, ConfigurationError>; } #[cfg(test)] mod tests { use sqlx::PgPool; use crate::schema::{SchemaDriver, SchemaService}; #[sqlx::test( migrator = "crate::MIGRATOR", fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn schema(pool: PgPool) -> anyhow::Result<()> { let driver = SchemaService::new(pool); // 2. Define Fixtures let kind = "fin_tx_v1"; let version = "1.0.0"; let min_schema = serde_json::json!({ "type": "object", "required": ["amount", "currency"], "properties": { "amount": { "type": "integer" }, "ccy": { "type": "string" } } }); // CREATE let created = driver .create_schema(kind, version, &min_schema) .await .expect("Create failed"); assert_eq!(created.schema_type, kind); // CREATE AGAIN SHOULD FAIL (CONFLICTING IDs) let created = driver.create_schema(kind, version, &min_schema).await; assert!(created.is_err()); // GET let found = driver .get_schema("payment", "1.0.0") .await .expect("Get failed") .expect("Schema missing"); assert_eq!(found.schema["required"], min_schema["required"]); // UPDATE let updated_json = serde_json::json!({"type": "object", "note": "updated"}); let updated = driver .update_schema(kind, version, &updated_json) .await .expect("Update failed") .expect("No row to update"); assert_eq!(updated.schema["note"], "updated"); // DELETE driver .delete_schema(kind, version) .await .expect("Delete failed"); let final_check = 
driver.get_schema(kind, version).await.unwrap(); assert!(final_check.is_none()); Ok(()) } }