mod create; pub use create::CreateSchema; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use warden_core::state::AppState; use crate::ConfigurationError; /// Transaction to monitor #[derive(Deserialize, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] pub struct TransactionSchema { #[serde(rename = "type")] /// Transaction schema type pub kind: String, /// The schema's version pub version: String, /// JSON schema for transcation #[serde(rename = "json_schema")] pub schema: serde_json::Value, #[serde(with = "time::serde::rfc3339")] pub created_at: OffsetDateTime, #[serde(with = "time::serde::rfc3339")] pub updated_at: OffsetDateTime, } #[async_trait] pub trait SchemaDriver { async fn create_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, schema: &serde_json::Value, ) -> Result; async fn delete_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, ) -> Result<(), ConfigurationError>; async fn get_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, ) -> Result, ConfigurationError>; async fn update_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, schema: &serde_json::Value, ) -> Result; } #[async_trait] impl SchemaDriver for AppState { async fn create_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, schema: &serde_json::Value, ) -> Result { sqlx::query_as!( TransactionSchema, "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3) returning type as kind, version, json_schema as schema, created_at, updated_at ", kind.as_ref(), version.as_ref(), sqlx::types::Json(&schema) as _ ) .fetch_one(&self.database) .await .map_err(|e| e.into()) } async fn delete_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, ) -> Result<(), crate::ConfigurationError> { sqlx::query!( "delete from transaction_schema where type = $1 and version = $2", kind.as_ref(), version.as_ref(), ) .execute(&self.database) .await?; Ok(()) } async fn get_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, ) -> Result, crate::ConfigurationError> { let result = sqlx::query_as!( TransactionSchema, "select type as kind, version, json_schema as schema, created_at, updated_at from transaction_schema where type = $1 and version = $2", kind.as_ref(), version.as_ref(), ) .fetch_optional(&self.database) .await?; Ok(result) } async fn update_schema( &self, kind: impl AsRef + Send + Sync, version: impl AsRef + Send + Sync, schema: &serde_json::Value, ) -> Result { sqlx::query_as!(TransactionSchema, " update transaction_schema set json_schema = $3 where type = $1 and version = $2 returning type as kind, version, json_schema as schema, created_at, updated_at ", kind.as_ref(), version.as_ref(), sqlx::types::Json(&schema) as _ ) .fetch_one(&self.database) .await .map_err(|e| e.into()) } }