From daeb5311840680599a0ce6e49d181b9289010f68 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Wed, 1 Apr 2026 09:05:33 +0200 Subject: feat(schema): cursor pagination --- lib/api-config/src/schema/list_schemas.rs | 213 ++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 lib/api-config/src/schema/list_schemas.rs (limited to 'lib/api-config/src/schema/list_schemas.rs') diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs new file mode 100644 index 0000000..0b539ed --- /dev/null +++ b/lib/api-config/src/schema/list_schemas.rs @@ -0,0 +1,213 @@ +use base64::{Engine, engine::general_purpose}; +use tracing::debug; +use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs}; + +use crate::{ + ConfigurationError, + schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination}, +}; + +fn decode_cursors( + args: &PaginationArgs, + limit: i64, +) -> Result { + let limit = args + .first + .or(args.last) + .map(|value| if value > limit { limit } else { value }) + .unwrap_or(limit); + + // Fetch limit + 1 to check if there's a next/prev page + let query_limit = limit + 1; + + let (id, type_cursor, ver_cursor) = + if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) { + let decoded = general_purpose::STANDARD.decode(cursor)?; + let s = String::from_utf8(decoded)?; + let parts: Vec<&str> = s.split(',').collect(); + ( + Some(parts[0].parse()?), + Some(parts[1].to_string()), + Some(parts[2].to_string()), + ) + } else { + (None, None, None) + }; + + Ok(DecodedSchemaPagination { + type_cursor, + version_cursor: ver_cursor, + limit: query_limit, + is_last: args.last.is_some(), + base_limit: limit, + id, + }) +} + +pub(super) async fn list_schemas( + state: &SchemaService, + args: &PaginationArgs, + limit: i64, +) -> Result, ConfigurationError> { + debug!("listing schemas transaction schema"); + + let input = decode_cursors(args, limit)?; + + let rows = if input.is_last { + // BACKWARD PAGINATION + sqlx::query_as!( + TransactionSchema, + " + select * from ( + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3)) + order by id desc, schema_type desc, schema_version desc + limit $4 + ) sub + order by id asc, schema_type asc, schema_version asc + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + } else { + // FORWARD PAGINATION (Default) + sqlx::query_as!( + TransactionSchema, + " + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3)) + order by id asc, schema_type asc, schema_version asc + limit $4 + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + }; + + let res = encode_cursors(&rows, &input)?; + + Ok(res) +} + +fn encode_cursors( + rows: &[TransactionSchema], + args: &DecodedSchemaPagination, +) -> Result, ConfigurationError> { + let has_extra = rows.len() > args.base_limit as usize; + let final_rows = if has_extra { + if args.is_last { + &rows[1..] + } else { + &rows[..args.base_limit as usize] + } + } else { + rows + }; + + let edges = final_rows + .iter() + .map(|row| { + let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version); + Edge { + node: row.clone(), + cursor: general_purpose::STANDARD.encode(raw_cursor), + } + }) + .collect::>(); + + let page_info = PageInfo { + has_next_page: if args.is_last { false } else { has_extra }, + has_previous_page: if args.is_last { has_extra } else { false }, + start_cursor: edges.first().map(|e| e.cursor.clone()), + end_cursor: edges.last().map(|e| e.cursor.clone()), + }; + + Ok(Connection { edges, page_info }) +} + +#[cfg(test)] +mod tests { + use base64::{Engine, engine::general_purpose}; + use sqlx::PgPool; + use warden_core::pagination::PaginationArgs; + + use crate::schema::{SchemaDriver, SchemaService}; + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + // 1. Request first 2 + let args = PaginationArgs { + first: Some(get_count), + after: None, + last: None, + before: None, + }; + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + dbg!(&res); + assert_eq!(res.edges[0].node.schema_type, "payment"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert!(res.page_info.has_next_page); + + // 2. Request next 2 using the end_cursor + let args_next = PaginationArgs { + first: Some(2), + after: res.page_info.end_cursor, + last: None, + before: None, + }; + + let res_next = driver.list_schemas(&args_next, limit).await?; + + assert_eq!(res_next.edges.len(), 2); + assert_eq!(res_next.edges[0].node.schema_type, "payment"); + assert_eq!(res_next.edges[1].node.schema_type, "refund"); + Ok(()) + } + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0"); + + let args = PaginationArgs { + first: None, + after: None, + last: Some(get_count), + before: Some(cursor), + }; + + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + assert_eq!(res.edges[0].node.schema_type, "refund"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert_eq!(res.edges[1].node.schema_type, "payment"); + assert_eq!(res.edges[1].node.schema_version, "1.1.0"); + Ok(()) + } +} -- cgit v1.2.3