use base64::{Engine, engine::general_purpose}; use tracing::debug; use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs}; use crate::{ ConfigurationError, schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination}, }; fn decode_cursors( args: &PaginationArgs, limit: i64, ) -> Result { let limit = args .first .or(args.last) .map(|value| if value > limit { limit } else { value }) .unwrap_or(limit); // Fetch limit + 1 to check if there's a next/prev page let query_limit = limit + 1; let (id, type_cursor, ver_cursor) = if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) { let decoded = general_purpose::STANDARD.decode(cursor)?; let s = String::from_utf8(decoded)?; let parts: Vec<&str> = s.split(',').collect(); ( Some(parts[0].parse()?), Some(parts[1].to_string()), Some(parts[2].to_string()), ) } else { (None, None, None) }; Ok(DecodedSchemaPagination { type_cursor, version_cursor: ver_cursor, limit: query_limit, is_last: args.last.is_some(), base_limit: limit, id, }) } pub(super) async fn list_schemas( state: &SchemaService, args: &PaginationArgs, limit: i64, ) -> Result, ConfigurationError> { debug!("listing schemas transaction schema"); let input = decode_cursors(args, limit)?; let rows = if input.is_last { // BACKWARD PAGINATION sqlx::query_as!( TransactionSchema, " select * from ( select * from transaction_schema where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3)) order by id desc, schema_type desc, schema_version desc limit $4 ) sub order by id asc, schema_type asc, schema_version asc ", input.id, input.type_cursor, input.version_cursor, input.limit ) .fetch_all(&state.database) .await? } else { // FORWARD PAGINATION (Default) sqlx::query_as!( TransactionSchema, " select * from transaction_schema where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3)) order by id asc, schema_type asc, schema_version asc limit $4 ", input.id, input.type_cursor, input.version_cursor, input.limit ) .fetch_all(&state.database) .await? }; let res = encode_cursors(&rows, &input)?; Ok(res) } fn encode_cursors( rows: &[TransactionSchema], args: &DecodedSchemaPagination, ) -> Result, ConfigurationError> { let has_extra = rows.len() > args.base_limit as usize; let final_rows = if has_extra { if args.is_last { &rows[1..] } else { &rows[..args.base_limit as usize] } } else { rows }; let edges = final_rows .iter() .map(|row| { let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version); Edge { node: row.clone(), cursor: general_purpose::STANDARD.encode(raw_cursor), } }) .collect::>(); let page_info = PageInfo { has_next_page: if args.is_last { false } else { has_extra }, has_previous_page: if args.is_last { has_extra } else { false }, start_cursor: edges.first().map(|e| e.cursor.clone()), end_cursor: edges.last().map(|e| e.cursor.clone()), }; Ok(Connection { edges, page_info }) } #[cfg(test)] mod tests { use base64::{Engine, engine::general_purpose}; use sqlx::PgPool; use warden_core::pagination::PaginationArgs; use crate::schema::{SchemaDriver, SchemaService}; #[sqlx::test( migrator = "crate::MIGRATOR", fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> { let get_count = 2; // 1. Request first 2 let args = PaginationArgs { first: Some(get_count), after: None, last: None, before: None, }; let limit = 10; let driver = SchemaService { database: pool }; let res = driver.list_schemas(&args, limit).await?; assert_eq!(res.edges.len(), get_count as usize); dbg!(&res); assert_eq!(res.edges[0].node.schema_type, "payment"); assert_eq!(res.edges[0].node.schema_version, "1.0.0"); assert!(res.page_info.has_next_page); // 2. Request next 2 using the end_cursor let args_next = PaginationArgs { first: Some(2), after: res.page_info.end_cursor, last: None, before: None, }; let res_next = driver.list_schemas(&args_next, limit).await?; assert_eq!(res_next.edges.len(), 2); assert_eq!(res_next.edges[0].node.schema_type, "payment"); assert_eq!(res_next.edges[1].node.schema_type, "refund"); Ok(()) } #[sqlx::test( migrator = "crate::MIGRATOR", fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> { let get_count = 2; let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0"); let args = PaginationArgs { first: None, after: None, last: Some(get_count), before: Some(cursor), }; let limit = 10; let driver = SchemaService { database: pool }; let res = driver.list_schemas(&args, limit).await?; assert_eq!(res.edges.len(), get_count as usize); assert_eq!(res.edges[0].node.schema_type, "refund"); assert_eq!(res.edges[0].node.schema_version, "1.0.0"); assert_eq!(res.edges[1].node.schema_type, "payment"); assert_eq!(res.edges[1].node.schema_version, "1.1.0"); Ok(()) } }