use async_trait::async_trait;
use tracing::debug;

use crate::schema::{self, SchemaDriver, SchemaService, TransactionSchema};

// `SchemaDriver` implementation for `SchemaService`.
//
// The single-schema operations (create / delete / get / update) are thin
// wrappers that forward to the function of the same name in the matching
// `schema::<operation>` submodule. Only `get_schemas` contains logic of its
// own: it runs a cursor-based listing query directly against `self.database`.
//
// NOTE(review): several signatures below look like their generic arguments
// were lost (`-> Result {`, `-> Result, crate::ConfigurationError>`,
// `first: Option,`). They are kept verbatim here; restore them from the
// `SchemaDriver` trait definition before relying on this text.
#[async_trait]
impl SchemaDriver for SchemaService {
    /// Creates a schema for the given `kind` and `version`.
    ///
    /// Forwards all arguments unchanged to
    /// `schema::create_schema::create_schema` and returns its result.
    /// `self` and `schema` are skipped when recording the tracing span, so
    /// only `kind` and `version` are captured as span fields.
    // NOTE(review): the return type reads as a bare `Result` - its generic
    // arguments appear to be missing; confirm against the trait.
    #[tracing::instrument(skip(self, schema))]
    async fn create_schema(
        &self,
        kind: &str,
        version: &str,
        schema: &serde_json::Value,
    ) -> Result {
        schema::create_schema::create_schema(self, kind, version, schema).await
    }

    /// Deletes the schema identified by `kind` and `version`.
    ///
    /// Forwards to `schema::delete_schema::delete_schema` and returns its
    /// result: `Ok(())` on success, otherwise a `crate::ConfigurationError`.
    #[tracing::instrument(skip(self))]
    async fn delete_schema(
        &self,
        kind: &str,
        version: &str,
    ) -> Result<(), crate::ConfigurationError> {
        schema::delete_schema::delete_schema(self, kind, version).await
    }

    /// Looks up the schema identified by `kind` and `version`.
    ///
    /// Forwards to `schema::get_schema::get_schema` and returns its result.
    // NOTE(review): the success type of the `Result` appears to be missing
    // from the signature; confirm against the trait.
    #[tracing::instrument(skip(self))]
    async fn get_schema(
        &self,
        kind: &str,
        version: &str,
    ) -> Result, crate::ConfigurationError> {
        schema::get_schema::get_schema(self, kind, version).await
    }

    /// Updates the schema identified by `kind` and `version` with `schema`.
    ///
    /// Forwards to `schema::update_schema::update_schema` and returns its
    /// result. `self` and `schema` are skipped when recording the tracing
    /// span.
    // NOTE(review): the success type of the `Result` appears to be missing
    // from the signature; confirm against the trait.
    #[tracing::instrument(skip(self, schema))]
    async fn update_schema(
        &self,
        kind: &str,
        version: &str,
        schema: &serde_json::Value,
    ) -> Result, crate::ConfigurationError> {
        schema::update_schema::update_schema(self, kind, version, schema).await
    }

    /// Lists rows of `transaction_schema` in ascending
    /// `(schema_type, schema_version)` order, starting after an optional
    /// cursor.
    ///
    /// # Parameters
    /// - `limit`: page size used when `first` is `None`.
    /// - `first`: when present, replaces `limit` as the page size.
    /// - `after`: optional cursor of the form `"<schema_type>,<schema_version>"`.
    ///   It is applied only when splitting on `,` yields exactly two parts;
    ///   any other shape is ignored and the listing starts from the first row.
    ///
    /// # Returns
    /// Every row fetched by the query, mapped to `TransactionSchema`. The
    /// query asks for one row more than the effective page size and nothing
    /// here trims it, so the result can hold up to `limit + 1` rows.
    ///
    /// # Errors
    /// A failure from the query is propagated with `?`.
    // NOTE(review): `first: Option,` and the `Result` success type appear to
    // have lost their generic arguments. The body suggests `Option<i64>`
    // (it is unwrapped against the `i64` `limit`) and a `Vec<TransactionSchema>`
    // (the value returned by `fetch_all`) - confirm against the trait.
    #[tracing::instrument(skip(self))]
    async fn get_schemas(
        &self,
        limit: i64,
        first: Option,
        after: Option<&str>,
    ) -> Result, crate::ConfigurationError> {
        debug!("getting transaction schemas");
        // An explicit `first` takes precedence over the caller-supplied limit.
        let limit = first.unwrap_or(limit);
        // Empty strings double as the "no cursor" marker: the query's
        // `$1 = ''` test disables the cursor filter when `last_type` is empty.
        let mut last_type = String::default();
        let mut last_version = String::default();
        if let Some(s) = after {
            let parts: Vec<&str> = s.split(',').collect();
            // Only a cursor with exactly two comma-separated parts is used;
            // otherwise both values stay empty and no error is reported.
            if parts.len() == 2 {
                last_type = parts[0].to_string();
                last_version = parts[1].to_string();
            }
        }
        // The row-value comparison keeps only rows ordered strictly after the
        // cursor pair. `limit + 1` fetches one extra row - presumably so the
        // caller can tell whether another page exists; verify against callers.
        let rows = sqlx::query_as!(
            TransactionSchema,
            " select * from transaction_schema where ($1 = '' or (schema_type, schema_version) > ($1, $2)) order by schema_type asc, schema_version asc limit $3 ",
            &last_type,
            &last_version,
            limit + 1
        )
        .fetch_all(&self.database)
        .await?;
        Ok(rows)
    }
}