diff options
Diffstat (limited to 'crates/configuration')
13 files changed, 462 insertions, 0 deletions
diff --git a/crates/configuration/migrations/20250816073400_typology.sql b/crates/configuration/migrations/20250816073400_typology.sql new file mode 100644 index 0000000..85f8b14 --- /dev/null +++ b/crates/configuration/migrations/20250816073400_typology.sql @@ -0,0 +1,11 @@ +create table typology ( + uuid uuid primary key, + configuration jsonb not null, + id text generated always as ( + configuration->>'id' + ) stored, + version text generated always as ( + configuration->>'version' + ) stored, + unique (id, version) +); diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs index b07aece..423d67d 100644 --- a/crates/configuration/src/server/http_svc.rs +++ b/crates/configuration/src/server/http_svc.rs @@ -12,6 +12,7 @@ use crate::state::AppHandle; const TAG_ROUTING: &str = "Routing"; const TAG_RULES: &str = "Rules"; +const TAG_TYPOLOGIES: &str = "Typologies"; #[derive(OpenApi)] #[openapi( diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs index 7663da2..92f3184 100644 --- a/crates/configuration/src/server/http_svc/routes.rs +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -1,5 +1,6 @@ mod routing; mod rule; +mod typology; use utoipa_axum::{router::OpenApiRouter, routes}; @@ -21,5 +22,12 @@ pub fn router(store: AppHandle) -> OpenApiRouter { rule::delete::delete_rule_config, rule::get::get_rule, )) + .routes(routes!( + /* typology */ + typology::get_typology::get_typology, + typology::post_typology::update, + typology::delete_typology::delete_typology, + typology::create_typology::create_typology, + )) .with_state(store) } diff --git a/crates/configuration/src/server/http_svc/routes/typology.rs b/crates/configuration/src/server/http_svc/routes/typology.rs new file mode 100644 index 0000000..85a593b --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/typology.rs @@ -0,0 +1,4 @@ +pub mod create_typology; +pub mod delete_typology; +pub mod get_typology; +pub mod post_typology; diff --git a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs new file mode 100644 index 0000000..9f4985a --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs @@ -0,0 +1,38 @@ +use axum::{extract::State, response::IntoResponse}; +use warden_core::configuration::typology::{ + TypologyConfiguration, mutate_typologies_server::MutateTypologies, +}; + +use crate::{ + server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version}, + state::AppHandle, +}; + +/// Create rule configuration +#[utoipa::path( + post, + path = "/{version}/typology", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + ), + responses(( + status = CREATED, + body = TypologyConfiguration, + )), + operation_id = "create_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170 + tag = TAG_TYPOLOGIES, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state))] +pub async fn create_typology( + version: Version, + State(state): State<AppHandle>, + axum::Json(body): axum::Json<TypologyConfiguration>, +) -> Result<impl IntoResponse, AppError> { + let response = state + .create_typology_configuration(tonic::Request::new(body)) + .await? + .into_inner(); + Ok((axum::http::StatusCode::CREATED, axum::Json(response))) +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs new file mode 100644 index 0000000..0e85e29 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs @@ -0,0 +1,41 @@ +use axum::extract::{Query, State}; +use tonic::IntoRequest; +use warden_core::configuration::typology::{ + DeleteTypologyConfigurationRequest, TypologyConfiguration, + mutate_typologies_server::MutateTypologies, +}; + +use crate::{ + server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version}, + state::AppHandle, +}; + +/// Get the typology configuration +#[utoipa::path( + delete, + path = "/{version}/typology", + responses(( + status = OK, + body = TypologyConfiguration + )), + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + DeleteTypologyConfigurationRequest + ), + operation_id = "delete_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170 + tag = TAG_TYPOLOGIES, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state))] +pub async fn delete_typology( + State(state): State<AppHandle>, + Query(params): Query<DeleteTypologyConfigurationRequest>, +) -> Result<axum::Json<TypologyConfiguration>, AppError> { + let config = state + .delete_typology_configuration(params.into_request()) + .await? + .into_inner(); + + Ok(axum::Json(config)) +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs new file mode 100644 index 0000000..4962593 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs @@ -0,0 +1,40 @@ +use axum::extract::{Query, State}; +use tonic::IntoRequest; +use warden_core::configuration::typology::{ + TypologyConfiguration, TypologyConfigurationRequest, query_typologies_server::QueryTypologies, +}; + +use crate::{ + server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version}, + state::AppHandle, +}; + +/// Get the typology configuration +#[utoipa::path( + get, + path = "/{version}/typology", + responses(( + status = OK, + body = TypologyConfiguration + )), + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + TypologyConfigurationRequest + ), + operation_id = "get_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170 + tag = TAG_TYPOLOGIES, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state))] +pub async fn get_typology( + State(state): State<AppHandle>, + Query(params): Query<TypologyConfigurationRequest>, +) -> Result<axum::Json<Option<TypologyConfiguration>>, AppError> { + let config = state + .get_typology_configuration(params.into_request()) + .await? + .into_inner(); + + Ok(axum::Json(config.configuration)) +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs new file mode 100644 index 0000000..2864372 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs @@ -0,0 +1,39 @@ +use axum::extract::State; +use warden_core::configuration::typology::{ + TypologyConfiguration, UpdateTypologyConfigRequest, mutate_typologies_server::MutateTypologies, +}; + +use crate::{ + server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version}, + state::AppHandle, +}; + +/// Update typology configuration +#[utoipa::path( + put, + path = "/{version}/typology", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + ), + responses(( + status = OK, + body = TypologyConfiguration + )), + operation_id = "update_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170 + tag = TAG_TYPOLOGIES, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state))] +pub async fn update( + State(state): State<AppHandle>, + axum::Json(body): axum::Json<TypologyConfiguration>, +) -> Result<axum::Json<TypologyConfiguration>, AppError> { + let response = state + .update_typology_configuration(tonic::Request::new(UpdateTypologyConfigRequest { + configuration: Some(body), + })) + .await? + .into_inner(); + Ok(axum::Json(response)) +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index f337a54..6952932 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -1,6 +1,7 @@ mod cache_key; mod routing; mod rule; +mod typology; use async_nats::jetstream::Context; use opentelemetry_semantic_conventions::attribute; diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs index a99700e..dd2ad12 100644 --- a/crates/configuration/src/state/cache_key.rs +++ b/crates/configuration/src/state/cache_key.rs @@ -5,6 +5,7 @@ pub enum CacheKey<'a> { ActiveRouting, Routing(&'a uuid::Uuid), Rule { id: &'a str, version: &'a str }, + Typology { id: &'a str, version: &'a str }, } impl ToRedisArgs for CacheKey<'_> { @@ -16,6 +17,7 @@ impl ToRedisArgs for CacheKey<'_> { CacheKey::ActiveRouting => "routing.active".into(), CacheKey::Routing(uuid) => format!("routing.{uuid}"), CacheKey::Rule { id, version } => format!("rule.{id}.{version}"), + CacheKey::Typology { id, version } => format!("typology.{id}.{version}"), }; out.write_arg(value.as_bytes()); diff --git a/crates/configuration/src/state/typology.rs b/crates/configuration/src/state/typology.rs new file mode 100644 index 0000000..1c7090a --- /dev/null +++ b/crates/configuration/src/state/typology.rs @@ -0,0 +1,19 @@ +use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest}; + +use crate::state::cache_key::CacheKey; + +pub mod mutate_typology; +pub mod query_typology; + +pub struct TypologyRow { + pub configuration: sqlx::types::Json<TypologyConfiguration>, +} + +impl<'a> From<&'a TypologyConfigurationRequest> for CacheKey<'a> { + fn from(value: &'a TypologyConfigurationRequest) -> Self { + Self::Typology { + id: &value.id, + version: &value.version, + } + } +} diff --git a/crates/configuration/src/state/typology/mutate_typology.rs b/crates/configuration/src/state/typology/mutate_typology.rs new file mode 100644 index 0000000..f2ab2cc --- /dev/null +++ b/crates/configuration/src/state/typology/mutate_typology.rs @@ -0,0 +1,170 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ConfigKind, ReloadEvent, + typology::{ + DeleteTypologyConfigurationRequest, TypologyConfiguration, UpdateTypologyConfigRequest, + mutate_typologies_server::MutateTypologies, + }, +}; + +use crate::state::{ + AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload, typology::TypologyRow, +}; + +#[async_trait] +impl MutateTypologies for AppHandle { + async fn create_typology_configuration( + &self, + request: Request<TypologyConfiguration>, + ) -> Result<Response<TypologyConfiguration>, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.typology"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "typology"); + span.set_attribute("otel.kind", "client"); + + sqlx::query!( + "insert into typology (uuid, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _, + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_typology_configuration( + &self, + request: Request<UpdateTypologyConfigRequest>, + ) -> Result<Response<TypologyConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.typology"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "typology"); + span.set_attribute("otel.kind", "client"); + + sqlx::query!( + r#" + update typology + set configuration = $1 + where id = $2 and version = $3 + "#, + sqlx::types::Json(&config) as _, + config.id, + config.version, + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache( + self, + CacheKey::Typology { + id: &config.id, + version: &config.version, + } + ), + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Typology.into(), + id: Some(config.id.to_owned()), + version: Some(config.version.to_owned()), + } + ) + )?; + + Ok(Response::new(config)) + } + + async fn delete_typology_configuration( + &self, + request: Request<DeleteTypologyConfigurationRequest>, + ) -> Result<Response<TypologyConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + + let span = info_span!("delete.configuration.typology"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "typology"); + span.set_attribute("otel.kind", "client"); + + let updated = sqlx::query_as!( + TypologyRow, + r#" + delete from typology + where id = $1 and version = $2 + returning configuration as "configuration: sqlx::types::Json<TypologyConfiguration>" + "#, + request.id, + request.version, + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache( + self, + CacheKey::Typology { + id: &request.id, + version: &request.version, + } + ), + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Typology.into(), + id: Some(request.id.to_owned()), + version: Some(request.version.to_owned()), + } + ) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/typology/query_typology.rs b/crates/configuration/src/state/typology/query_typology.rs new file mode 100644 index 0000000..c71317b --- /dev/null +++ b/crates/configuration/src/state/typology/query_typology.rs @@ -0,0 +1,88 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::configuration::typology::{ + GetTypologyConfigResponse, TypologyConfiguration, TypologyConfigurationRequest, + query_typologies_server::QueryTypologies, +}; +use warden_stack::redis::AsyncCommands; + +use crate::state::{AppHandle, cache_key::CacheKey, typology::TypologyRow}; + +#[async_trait] +impl QueryTypologies for AppHandle { + #[instrument(skip(self, request), Err(Debug))] + async fn get_typology_configuration( + &self, + request: Request<TypologyConfigurationRequest>, + ) -> Result<Response<GetTypologyConfigResponse>, Status> { + let data = request.into_inner(); + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let key = CacheKey::from(&data); + + let configuration = cache.get::<_, Vec<u8>>(&key).await.map(|value| { + if !value.is_empty() { + TypologyConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(typology_config)) = configuration { + return Ok(tonic::Response::new(GetTypologyConfigResponse { + configuration: Some(typology_config), + })); + } + + let span = info_span!("get.typology"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "typology"); + span.set_attribute("otel.kind", "client"); + + let config = sqlx::query_as!( + TypologyRow, + r#"select configuration as "configuration: sqlx::types::Json<TypologyConfiguration>" from typology where + id = $1 and version = $2"#, + data.id, + data.version, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.configuration.0.id, "found config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute("otel.kind", "client"); + + if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetTypologyConfigResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetTypologyConfigResponse { + configuration: None, + })), + } + } +} |