mod create_schema; mod delete_schema; mod get_schema; mod implementation; mod update_schema; pub use create_schema::CreateSchema; use sqlx::PgPool; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use time::OffsetDateTime; use crate::ConfigurationError; /// Transaction to monitor #[derive(Deserialize, Debug, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", "schemaVersion": "1.0.0", "schema": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "FinancialTransaction", "type": "object", "required": ["transactionId", "amount", "currency", "timestamp"], "properties": { "transactionId": { "type": "string", "format": "uuid" }, "amount": { "type": "number", "exclusiveMinimum": 0 }, "currency": { "type": "string", "pattern": "^[A-Z]{3}$", "description": "ISO 4217 Alpha-3 code (e.g., USD, EUR)" }, "timestamp": { "type": "string", "format": "date-time" }, } }, "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), })))] #[serde(rename_all = "camelCase")] pub struct TransactionSchema { /// Transaction schema type pub schema_type: String, /// The schema's version pub schema_version: String, /// JSON schema for transcation pub schema: serde_json::Value, /// When the schema was created #[serde(with = "time::serde::rfc3339")] pub created_at: OffsetDateTime, /// When the schema was last updated #[serde(with = "time::serde::rfc3339")] pub updated_at: OffsetDateTime, } pub struct SchemaService { pub database: PgPool, } #[async_trait] pub trait SchemaDriver: Send + Sync { async fn create_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result; async fn delete_schema(&self, kind: &str, version: &str) -> Result<(), ConfigurationError>; async fn get_schema( &self, kind: &str, version: &str, ) -> Result, ConfigurationError>; async fn update_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result, ConfigurationError>; async fn get_schemas( &self, limit: i64, first: Option, after: Option<&str>, ) -> Result, ConfigurationError>; } #[cfg(test)] mod tests { use sqlx::PgPool; use crate::schema::{SchemaDriver, SchemaService}; #[sqlx::test( migrator = "crate::MIGRATOR", fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn schema(pool: PgPool) -> anyhow::Result<()> { let driver = SchemaService { database: pool }; // 2. Define Fixtures let kind = "fin_tx_v1"; let version = "1.0.0"; let min_schema = serde_json::json!({ "type": "object", "required": ["amount", "currency"], "properties": { "amount": { "type": "integer" }, "ccy": { "type": "string" } } }); // CREATE let created = driver .create_schema(kind, version, &min_schema) .await .expect("Create failed"); assert_eq!(created.schema_type, kind); // GET let found = driver .get_schema("payment", "1.0.0") .await .expect("Get failed") .expect("Schema missing"); assert_eq!(found.schema["required"], min_schema["required"]); // UPDATE let updated_json = serde_json::json!({"type": "object", "note": "updated"}); let updated = driver .update_schema(kind, version, &updated_json) .await .expect("Update failed") .expect("No row to update"); assert_eq!(updated.schema["note"], "updated"); // DELETE driver .delete_schema(kind, version) .await .expect("Delete failed"); let final_check = driver.get_schema(kind, version).await.unwrap(); assert!(final_check.is_none()); Ok(()) } }