aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-04-01 09:05:33 +0200
committerrtkay123 <dev@kanjala.com>2026-04-01 09:05:33 +0200
commitdaeb5311840680599a0ce6e49d181b9289010f68 (patch)
treed0c9c040ca003a6d431781b867c4290cbe5c9ef2
parent2c336f0339747aa77a8fe6613b83200c8d4902a5 (diff)
downloadwarden-daeb5311840680599a0ce6e49d181b9289010f68.tar.bz2
warden-daeb5311840680599a0ce6e49d181b9289010f68.zip
feat(schema): cursor pagination
-rw-r--r--.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json14
-rw-r--r--.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json14
-rw-r--r--.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json55
-rw-r--r--.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json14
-rw-r--r--.sqlx/query-bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767.json (renamed from .sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json)19
-rw-r--r--Cargo.lock3
-rw-r--r--Cargo.toml1
-rw-r--r--lib/api-config/Cargo.toml1
-rw-r--r--lib/api-config/src/error.rs6
-rw-r--r--lib/api-config/src/lib.rs8
-rw-r--r--lib/api-config/src/schema/create_schema.rs2
-rw-r--r--lib/api-config/src/schema/implementation.rs42
-rw-r--r--lib/api-config/src/schema/list_schemas.rs213
-rw-r--r--lib/api-config/src/schema/mod.rs34
-rw-r--r--lib/api-config/src/schema/pagination.rs9
-rw-r--r--lib/api-config/tests/fixtures/schema.sql25
-rw-r--r--lib/warden-core/Cargo.toml2
-rw-r--r--lib/warden-core/src/config/cli/database.rs2
-rw-r--r--lib/warden-core/src/error.rs4
-rw-r--r--lib/warden-core/src/lib.rs1
-rw-r--r--lib/warden-core/src/pagination/mod.rs58
-rw-r--r--migrations/20260329120645_transaction_schema.sql3
-rw-r--r--warden/Cargo.toml2
-rw-r--r--warden/src/main.rs4
-rw-r--r--warden/src/server/api/mod.rs1
-rw-r--r--warden/src/server/api/pagination.rs21
-rw-r--r--warden/src/server/api/transaction.rs1
-rw-r--r--warden/src/server/mod.rs7
-rw-r--r--warden/src/server/routes/config/schema/create.rs4
-rw-r--r--warden/src/server/routes/config/schema/delete.rs5
-rw-r--r--warden/src/server/routes/config/schema/mod.rs7
-rw-r--r--warden/src/server/routes/config/schema/read.rs47
-rw-r--r--warden/src/server/routes/config/schema/update.rs6
-rw-r--r--warden/src/server/routes/transaction_monitoring/monitor.rs4
-rw-r--r--warden/src/state/mod.rs3
35 files changed, 497 insertions, 145 deletions
diff --git a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json
index 0fa6500..71caaab 100644
--- a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json
+++ b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json
@@ -5,26 +5,31 @@
"columns": [
{
"ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
"name": "schema_type",
"type_info": "Text"
},
{
- "ordinal": 1,
+ "ordinal": 2,
"name": "schema_version",
"type_info": "Varchar"
},
{
- "ordinal": 2,
+ "ordinal": 3,
"name": "schema",
"type_info": "Jsonb"
},
{
- "ordinal": 3,
+ "ordinal": 4,
"name": "created_at",
"type_info": "Timestamptz"
},
{
- "ordinal": 4,
+ "ordinal": 5,
"name": "updated_at",
"type_info": "Timestamptz"
}
@@ -41,6 +46,7 @@
false,
false,
false,
+ false,
false
]
},
diff --git a/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json
index 0453107..a4918e2 100644
--- a/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json
+++ b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json
@@ -5,26 +5,31 @@
"columns": [
{
"ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
"name": "schema_type",
"type_info": "Text"
},
{
- "ordinal": 1,
+ "ordinal": 2,
"name": "schema_version",
"type_info": "Varchar"
},
{
- "ordinal": 2,
+ "ordinal": 3,
"name": "schema",
"type_info": "Jsonb"
},
{
- "ordinal": 3,
+ "ordinal": 4,
"name": "created_at",
"type_info": "Timestamptz"
},
{
- "ordinal": 4,
+ "ordinal": 5,
"name": "updated_at",
"type_info": "Timestamptz"
}
@@ -41,6 +46,7 @@
false,
false,
false,
+ false,
false
]
},
diff --git a/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json b/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json
new file mode 100644
index 0000000..7d6d63d
--- /dev/null
+++ b/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json
@@ -0,0 +1,55 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n select * from (\n select * from transaction_schema\n where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3))\n order by id desc, schema_type desc, schema_version desc\n limit $4\n ) sub\n order by id asc, schema_type asc, schema_version asc\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
+ "name": "schema_type",
+ "type_info": "Text"
+ },
+ {
+ "ordinal": 2,
+ "name": "schema_version",
+ "type_info": "Varchar"
+ },
+ {
+ "ordinal": 3,
+ "name": "schema",
+ "type_info": "Jsonb"
+ },
+ {
+ "ordinal": 4,
+ "name": "created_at",
+ "type_info": "Timestamptz"
+ },
+ {
+ "ordinal": 5,
+ "name": "updated_at",
+ "type_info": "Timestamptz"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Int8",
+ "Text",
+ "Text",
+ "Int8"
+ ]
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971"
+}
diff --git a/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json
index 5e32b25..0c410f0 100644
--- a/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json
+++ b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json
@@ -5,26 +5,31 @@
"columns": [
{
"ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
"name": "schema_type",
"type_info": "Text"
},
{
- "ordinal": 1,
+ "ordinal": 2,
"name": "schema_version",
"type_info": "Varchar"
},
{
- "ordinal": 2,
+ "ordinal": 3,
"name": "schema",
"type_info": "Jsonb"
},
{
- "ordinal": 3,
+ "ordinal": 4,
"name": "created_at",
"type_info": "Timestamptz"
},
{
- "ordinal": 4,
+ "ordinal": 5,
"name": "updated_at",
"type_info": "Timestamptz"
}
@@ -40,6 +45,7 @@
false,
false,
false,
+ false,
false
]
},
diff --git a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json b/.sqlx/query-bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767.json
index cee9aca..655e358 100644
--- a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json
+++ b/.sqlx/query-bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767.json
@@ -1,36 +1,42 @@
{
"db_name": "PostgreSQL",
- "query": "\n select *\n from transaction_schema\n where ($1 = '' or (schema_type, schema_version) > ($1, $2))\n order by schema_type asc, schema_version asc\n limit $3\n ",
+ "query": "\n select * from transaction_schema\n where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3))\n order by id asc, schema_type asc, schema_version asc\n limit $4\n ",
"describe": {
"columns": [
{
"ordinal": 0,
+ "name": "id",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
"name": "schema_type",
"type_info": "Text"
},
{
- "ordinal": 1,
+ "ordinal": 2,
"name": "schema_version",
"type_info": "Varchar"
},
{
- "ordinal": 2,
+ "ordinal": 3,
"name": "schema",
"type_info": "Jsonb"
},
{
- "ordinal": 3,
+ "ordinal": 4,
"name": "created_at",
"type_info": "Timestamptz"
},
{
- "ordinal": 4,
+ "ordinal": 5,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
+ "Int8",
"Text",
"Text",
"Int8"
@@ -41,8 +47,9 @@
false,
false,
false,
+ false,
false
]
},
- "hash": "48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f"
+ "hash": "bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767"
}
diff --git a/Cargo.lock b/Cargo.lock
index 3acbf03..56cedb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -99,6 +99,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
+ "base64",
"serde",
"serde_json",
"sqlx",
@@ -3102,6 +3103,7 @@ dependencies = [
name = "warden-core"
version = "0.1.0"
dependencies = [
+ "base64",
"clap",
"serde",
"sqlx",
@@ -3109,6 +3111,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"url",
+ "utoipa",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index bc284e9..6827457 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ publish = false
[workspace.dependencies]
anyhow = "1.0.102"
async-trait = "0.1.89"
+base64 = "0.22.1"
clap = "4.6.0"
jsonschema = "0.45.0"
secrecy = { version = "0.10.3", features = ["serde"] }
diff --git a/lib/api-config/Cargo.toml b/lib/api-config/Cargo.toml
index 6724306..e19fcb5 100644
--- a/lib/api-config/Cargo.toml
+++ b/lib/api-config/Cargo.toml
@@ -10,6 +10,7 @@ publish.workspace = true
[dependencies]
async-trait.workspace = true
+base64.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
diff --git a/lib/api-config/src/error.rs b/lib/api-config/src/error.rs
index b2dc9ad..a6c4991 100644
--- a/lib/api-config/src/error.rs
+++ b/lib/api-config/src/error.rs
@@ -10,4 +10,10 @@ pub enum ConfigurationError {
InvalidHeader { expected: String, found: String },
#[error("unknown data store error")]
Unknown,
+ #[error(transparent)]
+ Pagination(#[from] base64::DecodeError),
+ #[error(transparent)]
+ PaginationCursor(#[from] std::string::FromUtf8Error),
+ #[error(transparent)]
+ PaginationId(#[from] std::num::ParseIntError),
}
diff --git a/lib/api-config/src/lib.rs b/lib/api-config/src/lib.rs
index 0d0117a..c2c6ccb 100644
--- a/lib/api-config/src/lib.rs
+++ b/lib/api-config/src/lib.rs
@@ -1,5 +1,11 @@
+//! Configuration
+#![warn(missing_docs, missing_debug_implementations)]
mod error;
+/// Schema configuration implementation
pub mod schema;
+/// Errors returned by the configuration library
pub use error::ConfigurationError;
-pub(crate) static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations");
+#[cfg(test)]
+/// Database migrator
+pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations");
diff --git a/lib/api-config/src/schema/create_schema.rs b/lib/api-config/src/schema/create_schema.rs
index 493fb09..cdbc91f 100644
--- a/lib/api-config/src/schema/create_schema.rs
+++ b/lib/api-config/src/schema/create_schema.rs
@@ -6,7 +6,7 @@ use crate::{
schema::{SchemaService, TransactionSchema},
};
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Serialize, Debug)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
"schemaType": "custom.schema",
diff --git a/lib/api-config/src/schema/implementation.rs b/lib/api-config/src/schema/implementation.rs
index c414879..ca0757f 100644
--- a/lib/api-config/src/schema/implementation.rs
+++ b/lib/api-config/src/schema/implementation.rs
@@ -1,7 +1,10 @@
use async_trait::async_trait;
use tracing::debug;
+use warden_core::pagination::{Connection, PaginationArgs};
-use crate::schema::{self, SchemaDriver, SchemaService, TransactionSchema};
+use crate::schema::{
+ self, SchemaDriver, SchemaService, TransactionSchema, pagination::DecodedSchemaPagination,
+};
#[async_trait]
impl SchemaDriver for SchemaService {
@@ -44,41 +47,12 @@ impl SchemaDriver for SchemaService {
}
#[tracing::instrument(skip(self))]
- async fn get_schemas(
+ async fn list_schemas(
&self,
+ input: &PaginationArgs,
limit: i64,
- first: Option<i64>,
- after: Option<&str>,
- ) -> Result<Vec<TransactionSchema>, crate::ConfigurationError> {
+ ) -> Result<Connection<TransactionSchema>, crate::ConfigurationError> {
debug!("getting transaction schemas");
- let limit = first.unwrap_or(limit);
- let mut last_type = String::default();
- let mut last_version = String::default();
-
- if let Some(s) = after {
- let parts: Vec<&str> = s.split(',').collect();
- if parts.len() == 2 {
- last_type = parts[0].to_string();
- last_version = parts[1].to_string();
- }
- }
-
- let rows = sqlx::query_as!(
- TransactionSchema,
- "
- select *
- from transaction_schema
- where ($1 = '' or (schema_type, schema_version) > ($1, $2))
- order by schema_type asc, schema_version asc
- limit $3
- ",
- &last_type,
- &last_version,
- limit + 1
- )
- .fetch_all(&self.database)
- .await?;
-
- Ok(rows)
+ schema::list_schemas::list_schemas(self, input, limit).await
}
}
diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs
new file mode 100644
index 0000000..0b539ed
--- /dev/null
+++ b/lib/api-config/src/schema/list_schemas.rs
@@ -0,0 +1,213 @@
+use base64::{Engine, engine::general_purpose};
+use tracing::debug;
+use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs};
+
+use crate::{
+ ConfigurationError,
+ schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination},
+};
+
+fn decode_cursors(
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<DecodedSchemaPagination, ConfigurationError> {
+ let limit = args
+ .first
+ .or(args.last)
+ .map(|value| if value > limit { limit } else { value })
+ .unwrap_or(limit);
+
+ // Fetch limit + 1 to check if there's a next/prev page
+ let query_limit = limit + 1;
+
+ let (id, type_cursor, ver_cursor) =
+ if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) {
+ let decoded = general_purpose::STANDARD.decode(cursor)?;
+ let s = String::from_utf8(decoded)?;
+ let parts: Vec<&str> = s.split(',').collect();
+ (
+ Some(parts[0].parse()?),
+ Some(parts[1].to_string()),
+ Some(parts[2].to_string()),
+ )
+ } else {
+ (None, None, None)
+ };
+
+ Ok(DecodedSchemaPagination {
+ type_cursor,
+ version_cursor: ver_cursor,
+ limit: query_limit,
+ is_last: args.last.is_some(),
+ base_limit: limit,
+ id,
+ })
+}
+
+pub(super) async fn list_schemas(
+ state: &SchemaService,
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ debug!("listing schemas transaction schema");
+
+ let input = decode_cursors(args, limit)?;
+
+ let rows = if input.is_last {
+ // BACKWARD PAGINATION
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from (
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3))
+ order by id desc, schema_type desc, schema_version desc
+ limit $4
+ ) sub
+ order by id asc, schema_type asc, schema_version asc
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ } else {
+ // FORWARD PAGINATION (Default)
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3))
+ order by id asc, schema_type asc, schema_version asc
+ limit $4
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ };
+
+ let res = encode_cursors(&rows, &input)?;
+
+ Ok(res)
+}
+
+fn encode_cursors(
+ rows: &[TransactionSchema],
+ args: &DecodedSchemaPagination,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ let has_extra = rows.len() > args.base_limit as usize;
+ let final_rows = if has_extra {
+ if args.is_last {
+ &rows[1..]
+ } else {
+ &rows[..args.base_limit as usize]
+ }
+ } else {
+ rows
+ };
+
+ let edges = final_rows
+ .iter()
+ .map(|row| {
+ let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version);
+ Edge {
+ node: row.clone(),
+ cursor: general_purpose::STANDARD.encode(raw_cursor),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let page_info = PageInfo {
+ has_next_page: if args.is_last { false } else { has_extra },
+ has_previous_page: if args.is_last { has_extra } else { false },
+ start_cursor: edges.first().map(|e| e.cursor.clone()),
+ end_cursor: edges.last().map(|e| e.cursor.clone()),
+ };
+
+ Ok(Connection { edges, page_info })
+}
+
+#[cfg(test)]
+mod tests {
+ use base64::{Engine, engine::general_purpose};
+ use sqlx::PgPool;
+ use warden_core::pagination::PaginationArgs;
+
+ use crate::schema::{SchemaDriver, SchemaService};
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ // 1. Request first 2
+ let args = PaginationArgs {
+ first: Some(get_count),
+ after: None,
+ last: None,
+ before: None,
+ };
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ dbg!(&res);
+ assert_eq!(res.edges[0].node.schema_type, "payment");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert!(res.page_info.has_next_page);
+
+ // 2. Request next 2 using the end_cursor
+ let args_next = PaginationArgs {
+ first: Some(2),
+ after: res.page_info.end_cursor,
+ last: None,
+ before: None,
+ };
+
+ let res_next = driver.list_schemas(&args_next, limit).await?;
+
+ assert_eq!(res_next.edges.len(), 2);
+ assert_eq!(res_next.edges[0].node.schema_type, "payment");
+ assert_eq!(res_next.edges[1].node.schema_type, "refund");
+ Ok(())
+ }
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0");
+
+ let args = PaginationArgs {
+ first: None,
+ after: None,
+ last: Some(get_count),
+ before: Some(cursor),
+ };
+
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ assert_eq!(res.edges[0].node.schema_type, "refund");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert_eq!(res.edges[1].node.schema_type, "payment");
+ assert_eq!(res.edges[1].node.schema_version, "1.1.0");
+ Ok(())
+ }
+}
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs
index d16ee9f..54bc015 100644
--- a/lib/api-config/src/schema/mod.rs
+++ b/lib/api-config/src/schema/mod.rs
@@ -2,6 +2,8 @@ mod create_schema;
mod delete_schema;
mod get_schema;
mod implementation;
+mod list_schemas;
+mod pagination;
mod update_schema;
pub use create_schema::CreateSchema;
use sqlx::PgPool;
@@ -10,11 +12,13 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use time::OffsetDateTime;
+use warden_core::pagination::Connection;
+use warden_core::pagination::PaginationArgs;
use crate::ConfigurationError;
-/// Transaction to monitor
-#[derive(Deserialize, Debug, Serialize)]
+/// Transaction's schema
+#[derive(Clone, Deserialize, Debug, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
"schemaType": "custom.schema",
@@ -49,6 +53,9 @@ use crate::ConfigurationError;
})))]
#[serde(rename_all = "camelCase")]
pub struct TransactionSchema {
+ /// Transaction schema id
+ #[serde(skip)]
+ pub id: i64,
/// Transaction schema type
pub schema_type: String,
/// The schema's version
@@ -63,8 +70,16 @@ pub struct TransactionSchema {
pub updated_at: OffsetDateTime,
}
+/// The type implementing [SchemaDriver]
+#[derive(Debug)]
pub struct SchemaService {
- pub database: PgPool,
+ pub(crate) database: PgPool,
+}
+
+impl SchemaService {
+ pub fn new(database: PgPool) -> Self {
+ Self { database }
+ }
}
#[async_trait]
@@ -91,12 +106,11 @@ pub trait SchemaDriver: Send + Sync {
schema: &serde_json::Value,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
- async fn get_schemas(
+ async fn list_schemas(
&self,
+ input: &PaginationArgs,
limit: i64,
- first: Option<i64>,
- after: Option<&str>,
- ) -> Result<Vec<TransactionSchema>, ConfigurationError>;
+ ) -> Result<Connection<TransactionSchema>, ConfigurationError>;
}
#[cfg(test)]
@@ -110,7 +124,7 @@ mod tests {
fixtures(path = "../../tests/fixtures", scripts("schema"))
)]
async fn schema(pool: PgPool) -> anyhow::Result<()> {
- let driver = SchemaService { database: pool };
+ let driver = SchemaService::new(pool);
// 2. Define Fixtures
let kind = "fin_tx_v1";
@@ -131,6 +145,10 @@ mod tests {
.expect("Create failed");
assert_eq!(created.schema_type, kind);
+ // CREATE AGAIN SHOULD FAIL (CONFLICTING IDs)
+ let created = driver.create_schema(kind, version, &min_schema).await;
+ assert!(created.is_err());
+
// GET
let found = driver
.get_schema("payment", "1.0.0")
diff --git a/lib/api-config/src/schema/pagination.rs b/lib/api-config/src/schema/pagination.rs
new file mode 100644
index 0000000..caaf726
--- /dev/null
+++ b/lib/api-config/src/schema/pagination.rs
@@ -0,0 +1,9 @@
+#[derive(Debug)]
+pub struct DecodedSchemaPagination {
+ pub(crate) type_cursor: Option<String>,
+ pub(crate) version_cursor: Option<String>,
+ pub(crate) limit: i64,
+ pub(crate) base_limit: i64,
+ pub(crate) is_last: bool,
+ pub(crate) id: Option<i64>,
+}
diff --git a/lib/api-config/tests/fixtures/schema.sql b/lib/api-config/tests/fixtures/schema.sql
index 4ec082e..f948735 100644
--- a/lib/api-config/tests/fixtures/schema.sql
+++ b/lib/api-config/tests/fixtures/schema.sql
@@ -1,12 +1,27 @@
insert into transaction_schema (schema_type, schema_version, schema)
-values
+values
(
- 'payment',
- '1.0.0',
+ 'payment',
+ '1.0.0',
'{"type": "object", "required": ["amount", "currency"], "properties": {"amount": {"type": "number"}, "currency": {"type": "string"}}}'
),
(
- 'refund',
- '1.0.0',
+ 'refund',
+ '1.0.0',
'{"type": "object", "required": ["original_txn_id"], "properties": {"original_txn_id": {"type": "string"}}}'
+),
+(
+ 'payment',
+ '1.1.0',
+ '{}'
+),
+(
+ 'refund',
+ '1.2.0',
+ '{}'
+),
+(
+ 'refund',
+ '1.3.0',
+ '{}'
);
diff --git a/lib/warden-core/Cargo.toml b/lib/warden-core/Cargo.toml
index 49b7186..c9b1aa7 100644
--- a/lib/warden-core/Cargo.toml
+++ b/lib/warden-core/Cargo.toml
@@ -9,12 +9,14 @@ homepage.workspace = true
publish.workspace = true
[dependencies]
+base64.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
url = { workspace = true, features = ["serde"] }
+utoipa.workspace = true
[dependencies.sqlx]
workspace = true
diff --git a/lib/warden-core/src/config/cli/database.rs b/lib/warden-core/src/config/cli/database.rs
index 31ba930..70bf600 100644
--- a/lib/warden-core/src/config/cli/database.rs
+++ b/lib/warden-core/src/config/cli/database.rs
@@ -117,7 +117,7 @@ impl Database {
}
let host = "localhost".to_owned();
- let host = self.database_host.as_ref().unwrap_or_else(|| &host);
+ let host = self.database_host.as_ref().unwrap_or(&host);
let mut url = Url::parse(&format!("postgres://{host}"))?;
if let Some(ref u) = self.database_username {
diff --git a/lib/warden-core/src/error.rs b/lib/warden-core/src/error.rs
index d90a862..e347407 100644
--- a/lib/warden-core/src/error.rs
+++ b/lib/warden-core/src/error.rs
@@ -8,6 +8,10 @@ pub enum WardenError {
Migration(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
Url(#[from] url::ParseError),
+ #[error(transparent)]
+ Pagination(#[from] base64::DecodeError),
+ #[error(transparent)]
+ PaginationCursor(#[from] std::string::FromUtf8Error),
#[error("Missing required configuration values:\n`{0}`")]
Config(String),
#[error("invalid header (expected {expected:?}, found {found:?})")]
diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs
index f200ba1..627659e 100644
--- a/lib/warden-core/src/lib.rs
+++ b/lib/warden-core/src/lib.rs
@@ -1,3 +1,4 @@
mod error;
pub use error::WardenError;
pub mod config;
+pub mod pagination;
diff --git a/lib/warden-core/src/pagination/mod.rs b/lib/warden-core/src/pagination/mod.rs
new file mode 100644
index 0000000..25fb083
--- /dev/null
+++ b/lib/warden-core/src/pagination/mod.rs
@@ -0,0 +1,58 @@
+use serde::{Deserialize, Serialize};
+use utoipa::ToSchema;
+
+/// Arguments used for cursor-based pagination.
+#[derive(Deserialize, Debug, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PaginationArgs {
+ /// Returns the first `n` items from the list.
+ pub first: Option<i64>,
+
+ /// A cursor pointing to the position after which items should be returned.
+ pub after: Option<String>,
+
+ /// Returns the last `n` items from the list.
+ pub last: Option<i64>,
+
+ /// A cursor pointing to the position before which items should be returned.
+ pub before: Option<String>,
+}
+
+/// Metadata describing the current page of results.
+#[derive(Serialize, Debug, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PageInfo {
+ /// Indicates whether there are more items when paginating forward.
+ pub has_next_page: bool,
+
+ /// Indicates whether there are more items when paginating backward.
+ pub has_previous_page: bool,
+
+ /// The cursor corresponding to the first item in the current page.
+ pub start_cursor: Option<String>,
+
+ /// The cursor corresponding to the last item in the current page.
+ pub end_cursor: Option<String>,
+}
+
+/// A paginated connection containing edges and pagination metadata.
+#[derive(Serialize, Debug, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Connection<T> {
+ /// A list of edges, each containing a node and its cursor.
+ pub edges: Vec<Edge<T>>,
+
+ /// Information about pagination for this connection.
+ pub page_info: PageInfo,
+}
+
+/// An edge in a connection, representing a node and its cursor.
+#[derive(Serialize, Debug, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Edge<T> {
+ /// The item/node contained in this edge.
+ pub node: T,
+
+ /// A cursor for this node, used in pagination.
+ pub cursor: String,
+}
diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql
index 8496c80..7868591 100644
--- a/migrations/20260329120645_transaction_schema.sql
+++ b/migrations/20260329120645_transaction_schema.sql
@@ -1,6 +1,7 @@
-- Add migration script here
-- The transaction's blueprint to be checked on each request
create table transaction_schema (
+ id bigserial primary key,
-- The transaction type
schema_type text not null,
-- The schema's version (to allow for multiple revisions - maybe)
@@ -9,7 +10,7 @@ create table transaction_schema (
schema jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
- primary key (schema_type, schema_version)
+ unique (schema_type, schema_version)
);
create trigger update_transaction_schema_modtime
diff --git a/warden/Cargo.toml b/warden/Cargo.toml
index d4dcf8b..47cd8b6 100644
--- a/warden/Cargo.toml
+++ b/warden/Cargo.toml
@@ -9,7 +9,7 @@ homepage.workspace = true
[dependencies]
anyhow.workspace = true
axum = { version = "0.8.8", features = ["macros"] }
-base64 = "0.22.1"
+base64.workspace = true
clap.workspace = true
jsonschema.workspace = true
secrecy = { version = "0.10.3", features = ["serde"] }
diff --git a/warden/src/main.rs b/warden/src/main.rs
index e155dee..59068f5 100644
--- a/warden/src/main.rs
+++ b/warden/src/main.rs
@@ -37,9 +37,7 @@ async fn main() -> anyhow::Result<()> {
let (log_handle, _guard) =
logging::initialise_logging(&config.server.log_level, &config.server.log_dir);
- let schema = SchemaService {
- database: state::database::connect(&config.database).await?,
- };
+ let schema = SchemaService::new(state::database::connect(&config.database).await?);
let schema = Arc::new(schema);
let state = state::AppState::new(log_handle, schema).await?;
diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs
index f5aa0c8..963a3ef 100644
--- a/warden/src/server/api/mod.rs
+++ b/warden/src/server/api/mod.rs
@@ -1,4 +1,3 @@
-pub mod pagination;
pub mod transaction;
pub mod version;
diff --git a/warden/src/server/api/pagination.rs b/warden/src/server/api/pagination.rs
deleted file mode 100644
index 7341d2f..0000000
--- a/warden/src/server/api/pagination.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-use api_config::schema::TransactionSchema;
-use serde::{Deserialize, Serialize};
-use utoipa::{IntoParams, ToSchema};
-
-#[derive(Deserialize, Debug, IntoParams, ToSchema)]
-pub struct PaginationParams {
- pub first: Option<i64>,
- pub after: Option<String>, // This is our Base64 cursor
-}
-
-#[derive(Serialize, ToSchema)]
-pub struct PageInfo {
- pub has_next_page: bool,
- pub end_cursor: Option<String>,
-}
-
-#[derive(Serialize, ToSchema)]
-pub struct RelayResponse {
- pub nodes: Vec<TransactionSchema>,
- pub page_info: PageInfo,
-}
diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs
index 03f8a1f..7db8528 100644
--- a/warden/src/server/api/transaction.rs
+++ b/warden/src/server/api/transaction.rs
@@ -4,6 +4,7 @@ use utoipa::ToSchema;
#[derive(Deserialize, Serialize, ToSchema)]
/// Transaction to monitor
+#[serde(rename_all = "camelCase")]
pub struct Transaction {
/// Transaction schema type
pub schema_type: String,
diff --git a/warden/src/server/mod.rs b/warden/src/server/mod.rs
index fe93352..92ac12d 100644
--- a/warden/src/server/mod.rs
+++ b/warden/src/server/mod.rs
@@ -18,7 +18,11 @@ use crate::{
config::Configuration,
server::{
middleware::request_id::{REQUEST_ID_HEADER, middleware_request_id},
- routes::{ApiDoc, config::ConfigDoc, transaction_monitoring::MonitoringDoc},
+ routes::{
+ ApiDoc,
+ config::{ConfigDoc, schema::SchemaDoc},
+ transaction_monitoring::MonitoringDoc,
+ },
},
state::AppState,
};
@@ -32,6 +36,7 @@ pub async fn router(state: Arc<AppState>, config: &Configuration) -> Router<()>
let mut doc = ApiDoc::openapi();
doc.merge(ConfigDoc::openapi());
+ doc.merge(SchemaDoc::openapi());
doc.merge(MonitoringDoc::openapi());
let stubs = OpenApiRouter::with_openapi(doc)
diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs
index aa22546..767ff3f 100644
--- a/warden/src/server/routes/config/schema/create.rs
+++ b/warden/src/server/routes/config/schema/create.rs
@@ -13,7 +13,7 @@ use crate::{
server::{
api::version::{Version, VersionPath},
error::AppError,
- routes::config::CONFIG,
+ routes::config::schema::SCHEMA,
},
state::AppState,
};
@@ -60,7 +60,7 @@ use crate::{
)
),
operation_id = "create_schema", // https://github.com/juhaku/utoipa/issues/1170
- tag = CONFIG,
+ tag = SCHEMA,
request_body(
content = CreateSchema
),
diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs
index 98fd581..55577ae 100644
--- a/warden/src/server/routes/config/schema/delete.rs
+++ b/warden/src/server/routes/config/schema/delete.rs
@@ -1,6 +1,5 @@
use std::sync::Arc;
-use api_config::schema::SchemaDriver;
use axum::{
debug_handler,
extract::{Path, Query, State},
@@ -14,7 +13,7 @@ use crate::{
server::{
api::version::{Version, VersionPath},
error::AppError,
- routes::config::CONFIG,
+ routes::config::schema::SCHEMA,
},
state::AppState,
};
@@ -65,7 +64,7 @@ pub struct SchemaDeleteQuery {
)
),
operation_id = "delete_schema", // https://github.com/juhaku/utoipa/issues/1170
- tag = CONFIG,
+ tag = SCHEMA,
path = "/{apiVersion}/config/schema",
params(VersionPath, SchemaDeleteQuery),
)]
diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs
index 8c84f10..17db5ce 100644
--- a/warden/src/server/routes/config/schema/mod.rs
+++ b/warden/src/server/routes/config/schema/mod.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;
+use utoipa::OpenApi;
use utoipa_axum::router::OpenApiRouter;
use crate::state::AppState;
@@ -9,6 +10,12 @@ pub mod delete;
pub mod read;
pub mod update;
+const SCHEMA: &str = "Schema";
+
+#[derive(OpenApi)]
+#[openapi(tags((name = SCHEMA, description = "JSON schemas that each monitoring request is validated against")))]
+pub struct SchemaDoc;
+
pub fn router(store: Arc<AppState>) -> OpenApiRouter {
OpenApiRouter::new()
.routes(utoipa_axum::routes!(create::create_schema))
diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs
index c89583b..2d12935 100644
--- a/warden/src/server/routes/config/schema/read.rs
+++ b/warden/src/server/routes/config/schema/read.rs
@@ -1,23 +1,20 @@
use std::sync::Arc;
-use api_config::schema::{SchemaDriver, TransactionSchema};
+use api_config::schema::TransactionSchema;
use axum::{
Json, debug_handler,
extract::{Path, Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
-use base64::{Engine, engine::general_purpose};
use tracing::debug;
+use warden_core::pagination::{Connection, PaginationArgs};
use crate::{
server::{
- api::{
- pagination::{PageInfo, PaginationParams, RelayResponse},
- version::{Version, VersionPath},
- },
+ api::version::{Version, VersionPath},
error::AppError,
- routes::config::CONFIG,
+ routes::config::schema::SCHEMA,
},
state::AppState,
};
@@ -64,7 +61,7 @@ use crate::{
)
),
operation_id = "get_schema", // https://github.com/juhaku/utoipa/issues/1170
- tag = CONFIG,
+ tag = SCHEMA,
path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}",
params(
VersionPath,
@@ -116,7 +113,7 @@ pub async fn get_schema(
}
}
-/// Get transaction schemas
+/// List transaction schemas
#[utoipa::path(
get,
responses(
@@ -126,7 +123,7 @@ pub async fn get_schema(
headers(
("x-request-id" = Uuid, description = "Request identifier")
),
- body = RelayResponse
+ body = Connection<TransactionSchema>
),
(
status = 400,
@@ -158,7 +155,7 @@ pub async fn get_schema(
)
),
operation_id = "get_schemas", // https://github.com/juhaku/utoipa/issues/1170
- tag = CONFIG,
+ tag = SCHEMA,
path = "/{apiVersion}/config/schema",
params(VersionPath),
)]
@@ -175,22 +172,15 @@ pub async fn get_schemas(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(version): Path<Version>,
- params: Query<PaginationParams>,
+ params: Query<PaginationArgs>,
) -> Result<impl IntoResponse, AppError> {
debug!("searching for schema");
- let mut cursor = None;
let limit = 10;
- // 1. Decode Cursor
- if let Some(ref cursor_str) = params.after {
- let decoded = general_purpose::STANDARD.decode(cursor_str).unwrap();
- cursor = Some(String::from_utf8(decoded).unwrap());
- }
-
// TODO: get from cache
let rows = state
.schema_service
- .get_schemas(limit, params.first, cursor.as_deref())
+ .list_schemas(&params, limit)
.await
.map_err(|e| match e {
api_config::ConfigurationError::Database(ref error) => match error {
@@ -205,20 +195,5 @@ pub async fn get_schemas(
_ => e.into(),
})?;
- // 3. Process Relay Metadata
- let has_next_page = rows.len() > limit as usize;
- let nodes: Vec<TransactionSchema> = rows.into_iter().take(limit as usize).collect();
-
- let end_cursor = nodes.last().map(|node| {
- let raw_cursor = format!("{},{}", node.schema_type, node.schema_version);
- general_purpose::STANDARD.encode(raw_cursor)
- });
-
- Ok(Json(RelayResponse {
- nodes,
- page_info: PageInfo {
- has_next_page,
- end_cursor,
- },
- }))
+ Ok(Json(rows))
}
diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs
index 13f9fe9..ff518a7 100644
--- a/warden/src/server/routes/config/schema/update.rs
+++ b/warden/src/server/routes/config/schema/update.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use api_config::schema::{CreateSchema, SchemaDriver, TransactionSchema};
+use api_config::schema::{CreateSchema, TransactionSchema};
use axum::{
Json, debug_handler,
extract::{Path, State},
@@ -12,7 +12,7 @@ use crate::{
server::{
api::version::{Version, VersionPath},
error::AppError,
- routes::config::CONFIG,
+ routes::config::schema::SCHEMA,
},
state::AppState,
};
@@ -59,7 +59,7 @@ use crate::{
)
),
operation_id = "update_schema", // https://github.com/juhaku/utoipa/issues/1170
- tag = CONFIG,
+ tag = SCHEMA,
request_body(
content = CreateSchema
),
diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs
index 66075f5..44cc02c 100644
--- a/warden/src/server/routes/transaction_monitoring/monitor.rs
+++ b/warden/src/server/routes/transaction_monitoring/monitor.rs
@@ -56,8 +56,8 @@ use axum::{
]
pub async fn reload(
- State(state): State<Arc<AppState>>,
- Path(version): Path<Version>,
+ State(_state): State<Arc<AppState>>,
+ Path(_version): Path<Version>,
ValidatedTransaction(body): ValidatedTransaction<serde_json::Value>,
) -> StatusCode {
dbg!(&body);
diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs
index 7dfcdc4..a6b36e1 100644
--- a/warden/src/state/mod.rs
+++ b/warden/src/state/mod.rs
@@ -2,11 +2,8 @@ pub(crate) mod database;
use std::sync::Arc;
use api_config::schema::SchemaDriver;
-use tracing::{debug, trace};
use tracing_subscriber::EnvFilter;
-use crate::config::Configuration;
-
pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>;
#[derive(Clone)]