aboutsummaryrefslogtreecommitdiffstats
path: root/lib/api-config/src/schema/list_schemas.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/api-config/src/schema/list_schemas.rs')
-rw-r--r--lib/api-config/src/schema/list_schemas.rs213
1 files changed, 213 insertions, 0 deletions
diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs
new file mode 100644
index 0000000..0b539ed
--- /dev/null
+++ b/lib/api-config/src/schema/list_schemas.rs
@@ -0,0 +1,213 @@
+use base64::{Engine, engine::general_purpose};
+use tracing::debug;
+use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs};
+
+use crate::{
+ ConfigurationError,
+ schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination},
+};
+
+fn decode_cursors(
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<DecodedSchemaPagination, ConfigurationError> {
+ let limit = args
+ .first
+ .or(args.last)
+ .map(|value| if value > limit { limit } else { value })
+ .unwrap_or(limit);
+
+ // Fetch limit + 1 to check if there's a next/prev page
+ let query_limit = limit + 1;
+
+ let (id, type_cursor, ver_cursor) =
+ if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) {
+ let decoded = general_purpose::STANDARD.decode(cursor)?;
+ let s = String::from_utf8(decoded)?;
+ let parts: Vec<&str> = s.split(',').collect();
+ (
+ Some(parts[0].parse()?),
+ Some(parts[1].to_string()),
+ Some(parts[2].to_string()),
+ )
+ } else {
+ (None, None, None)
+ };
+
+ Ok(DecodedSchemaPagination {
+ type_cursor,
+ version_cursor: ver_cursor,
+ limit: query_limit,
+ is_last: args.last.is_some(),
+ base_limit: limit,
+ id,
+ })
+}
+
+pub(super) async fn list_schemas(
+ state: &SchemaService,
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ debug!("listing schemas transaction schema");
+
+ let input = decode_cursors(args, limit)?;
+
+ let rows = if input.is_last {
+ // BACKWARD PAGINATION
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from (
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3))
+ order by id desc, schema_type desc, schema_version desc
+ limit $4
+ ) sub
+ order by id asc, schema_type asc, schema_version asc
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ } else {
+ // FORWARD PAGINATION (Default)
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3))
+ order by id asc, schema_type asc, schema_version asc
+ limit $4
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ };
+
+ let res = encode_cursors(&rows, &input)?;
+
+ Ok(res)
+}
+
+fn encode_cursors(
+ rows: &[TransactionSchema],
+ args: &DecodedSchemaPagination,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ let has_extra = rows.len() > args.base_limit as usize;
+ let final_rows = if has_extra {
+ if args.is_last {
+ &rows[1..]
+ } else {
+ &rows[..args.base_limit as usize]
+ }
+ } else {
+ rows
+ };
+
+ let edges = final_rows
+ .iter()
+ .map(|row| {
+ let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version);
+ Edge {
+ node: row.clone(),
+ cursor: general_purpose::STANDARD.encode(raw_cursor),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let page_info = PageInfo {
+ has_next_page: if args.is_last { false } else { has_extra },
+ has_previous_page: if args.is_last { has_extra } else { false },
+ start_cursor: edges.first().map(|e| e.cursor.clone()),
+ end_cursor: edges.last().map(|e| e.cursor.clone()),
+ };
+
+ Ok(Connection { edges, page_info })
+}
+
+#[cfg(test)]
+mod tests {
+ use base64::{Engine, engine::general_purpose};
+ use sqlx::PgPool;
+ use warden_core::pagination::PaginationArgs;
+
+ use crate::schema::{SchemaDriver, SchemaService};
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ // 1. Request first 2
+ let args = PaginationArgs {
+ first: Some(get_count),
+ after: None,
+ last: None,
+ before: None,
+ };
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ dbg!(&res);
+ assert_eq!(res.edges[0].node.schema_type, "payment");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert!(res.page_info.has_next_page);
+
+ // 2. Request next 2 using the end_cursor
+ let args_next = PaginationArgs {
+ first: Some(2),
+ after: res.page_info.end_cursor,
+ last: None,
+ before: None,
+ };
+
+ let res_next = driver.list_schemas(&args_next, limit).await?;
+
+ assert_eq!(res_next.edges.len(), 2);
+ assert_eq!(res_next.edges[0].node.schema_type, "payment");
+ assert_eq!(res_next.edges[1].node.schema_type, "refund");
+ Ok(())
+ }
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0");
+
+ let args = PaginationArgs {
+ first: None,
+ after: None,
+ last: Some(get_count),
+ before: Some(cursor),
+ };
+
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ assert_eq!(res.edges[0].node.schema_type, "refund");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert_eq!(res.edges[1].node.schema_type, "payment");
+ assert_eq!(res.edges[1].node.schema_version, "1.1.0");
+ Ok(())
+ }
+}