aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/configuration/src')
-rw-r--r--crates/configuration/src/server/http_svc.rs1
-rw-r--r--crates/configuration/src/server/http_svc/routes.rs8
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology.rs4
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/create_typology.rs38
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs41
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/get_typology.rs40
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/post_typology.rs39
-rw-r--r--crates/configuration/src/state.rs1
-rw-r--r--crates/configuration/src/state/cache_key.rs2
-rw-r--r--crates/configuration/src/state/typology.rs19
-rw-r--r--crates/configuration/src/state/typology/mutate_typology.rs170
-rw-r--r--crates/configuration/src/state/typology/query_typology.rs88
12 files changed, 451 insertions, 0 deletions
diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs
index b07aece..423d67d 100644
--- a/crates/configuration/src/server/http_svc.rs
+++ b/crates/configuration/src/server/http_svc.rs
@@ -12,6 +12,7 @@ use crate::state::AppHandle;
const TAG_ROUTING: &str = "Routing";
const TAG_RULES: &str = "Rules";
+const TAG_TYPOLOGIES: &str = "Typologies";
#[derive(OpenApi)]
#[openapi(
diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs
index 7663da2..92f3184 100644
--- a/crates/configuration/src/server/http_svc/routes.rs
+++ b/crates/configuration/src/server/http_svc/routes.rs
@@ -1,5 +1,6 @@
mod routing;
mod rule;
+mod typology;
use utoipa_axum::{router::OpenApiRouter, routes};
@@ -21,5 +22,12 @@ pub fn router(store: AppHandle) -> OpenApiRouter {
rule::delete::delete_rule_config,
rule::get::get_rule,
))
+ .routes(routes!(
+ /* typology */
+ typology::get_typology::get_typology,
+ typology::post_typology::update,
+ typology::delete_typology::delete_typology,
+ typology::create_typology::create_typology,
+ ))
.with_state(store)
}
diff --git a/crates/configuration/src/server/http_svc/routes/typology.rs b/crates/configuration/src/server/http_svc/routes/typology.rs
new file mode 100644
index 0000000..85a593b
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/typology.rs
@@ -0,0 +1,4 @@
+pub mod create_typology;
+pub mod delete_typology;
+pub mod get_typology;
+pub mod post_typology;
diff --git a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs
new file mode 100644
index 0000000..9f4985a
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs
@@ -0,0 +1,38 @@
+use axum::{extract::State, response::IntoResponse};
+use warden_core::configuration::typology::{
+ TypologyConfiguration, mutate_typologies_server::MutateTypologies,
+};
+
+use crate::{
+ server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version},
+ state::AppHandle,
+};
+
+/// Create rule configuration
+#[utoipa::path(
+ post,
+ path = "/{version}/typology",
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ ),
+ responses((
+ status = CREATED,
+ body = TypologyConfiguration,
+ )),
+ operation_id = "create_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170
+ tag = TAG_TYPOLOGIES,
+ )
+]
+#[axum::debug_handler]
+#[tracing::instrument(skip(state))]
+pub async fn create_typology(
+ version: Version,
+ State(state): State<AppHandle>,
+ axum::Json(body): axum::Json<TypologyConfiguration>,
+) -> Result<impl IntoResponse, AppError> {
+ let response = state
+ .create_typology_configuration(tonic::Request::new(body))
+ .await?
+ .into_inner();
+ Ok((axum::http::StatusCode::CREATED, axum::Json(response)))
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs
new file mode 100644
index 0000000..0e85e29
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs
@@ -0,0 +1,41 @@
+use axum::extract::{Query, State};
+use tonic::IntoRequest;
+use warden_core::configuration::typology::{
+ DeleteTypologyConfigurationRequest, TypologyConfiguration,
+ mutate_typologies_server::MutateTypologies,
+};
+
+use crate::{
+ server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version},
+ state::AppHandle,
+};
+
+/// Get the typology configuration
+#[utoipa::path(
+ delete,
+ path = "/{version}/typology",
+ responses((
+ status = OK,
+ body = TypologyConfiguration
+ )),
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ DeleteTypologyConfigurationRequest
+ ),
+ operation_id = "delete_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170
+ tag = TAG_TYPOLOGIES,
+ )
+]
+#[axum::debug_handler]
+#[tracing::instrument(skip(state))]
+pub async fn delete_typology(
+ State(state): State<AppHandle>,
+ Query(params): Query<DeleteTypologyConfigurationRequest>,
+) -> Result<axum::Json<TypologyConfiguration>, AppError> {
+ let config = state
+ .delete_typology_configuration(params.into_request())
+ .await?
+ .into_inner();
+
+ Ok(axum::Json(config))
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs
new file mode 100644
index 0000000..4962593
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs
@@ -0,0 +1,40 @@
+use axum::extract::{Query, State};
+use tonic::IntoRequest;
+use warden_core::configuration::typology::{
+ TypologyConfiguration, TypologyConfigurationRequest, query_typologies_server::QueryTypologies,
+};
+
+use crate::{
+ server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version},
+ state::AppHandle,
+};
+
+/// Get the typology configuration
+#[utoipa::path(
+ get,
+ path = "/{version}/typology",
+ responses((
+ status = OK,
+ body = TypologyConfiguration
+ )),
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ TypologyConfigurationRequest
+ ),
+ operation_id = "get_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170
+ tag = TAG_TYPOLOGIES,
+ )
+]
+#[axum::debug_handler]
+#[tracing::instrument(skip(state))]
+pub async fn get_typology(
+ State(state): State<AppHandle>,
+ Query(params): Query<TypologyConfigurationRequest>,
+) -> Result<axum::Json<Option<TypologyConfiguration>>, AppError> {
+ let config = state
+ .get_typology_configuration(params.into_request())
+ .await?
+ .into_inner();
+
+ Ok(axum::Json(config.configuration))
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs
new file mode 100644
index 0000000..2864372
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs
@@ -0,0 +1,39 @@
+use axum::extract::State;
+use warden_core::configuration::typology::{
+ TypologyConfiguration, UpdateTypologyConfigRequest, mutate_typologies_server::MutateTypologies,
+};
+
+use crate::{
+ server::{error::AppError, http_svc::TAG_TYPOLOGIES, version::Version},
+ state::AppHandle,
+};
+
+/// Update typology configuration
+#[utoipa::path(
+ put,
+ path = "/{version}/typology",
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ ),
+ responses((
+ status = OK,
+ body = TypologyConfiguration
+ )),
+ operation_id = "update_typology_configuration", // https://github.com/juhaku/utoipa/issues/1170
+ tag = TAG_TYPOLOGIES,
+ )
+]
+#[axum::debug_handler]
+#[tracing::instrument(skip(state))]
+pub async fn update(
+ State(state): State<AppHandle>,
+ axum::Json(body): axum::Json<TypologyConfiguration>,
+) -> Result<axum::Json<TypologyConfiguration>, AppError> {
+ let response = state
+ .update_typology_configuration(tonic::Request::new(UpdateTypologyConfigRequest {
+ configuration: Some(body),
+ }))
+ .await?
+ .into_inner();
+ Ok(axum::Json(response))
+}
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs
index f337a54..6952932 100644
--- a/crates/configuration/src/state.rs
+++ b/crates/configuration/src/state.rs
@@ -1,6 +1,7 @@
mod cache_key;
mod routing;
mod rule;
+mod typology;
use async_nats::jetstream::Context;
use opentelemetry_semantic_conventions::attribute;
diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs
index a99700e..dd2ad12 100644
--- a/crates/configuration/src/state/cache_key.rs
+++ b/crates/configuration/src/state/cache_key.rs
@@ -5,6 +5,7 @@ pub enum CacheKey<'a> {
ActiveRouting,
Routing(&'a uuid::Uuid),
Rule { id: &'a str, version: &'a str },
+ Typology { id: &'a str, version: &'a str },
}
impl ToRedisArgs for CacheKey<'_> {
@@ -16,6 +17,7 @@ impl ToRedisArgs for CacheKey<'_> {
CacheKey::ActiveRouting => "routing.active".into(),
CacheKey::Routing(uuid) => format!("routing.{uuid}"),
CacheKey::Rule { id, version } => format!("rule.{id}.{version}"),
+ CacheKey::Typology { id, version } => format!("typology.{id}.{version}"),
};
out.write_arg(value.as_bytes());
diff --git a/crates/configuration/src/state/typology.rs b/crates/configuration/src/state/typology.rs
new file mode 100644
index 0000000..1c7090a
--- /dev/null
+++ b/crates/configuration/src/state/typology.rs
@@ -0,0 +1,19 @@
+use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest};
+
+use crate::state::cache_key::CacheKey;
+
+pub mod mutate_typology;
+pub mod query_typology;
+
+pub struct TypologyRow {
+ pub configuration: sqlx::types::Json<TypologyConfiguration>,
+}
+
+impl<'a> From<&'a TypologyConfigurationRequest> for CacheKey<'a> {
+ fn from(value: &'a TypologyConfigurationRequest) -> Self {
+ Self::Typology {
+ id: &value.id,
+ version: &value.version,
+ }
+ }
+}
diff --git a/crates/configuration/src/state/typology/mutate_typology.rs b/crates/configuration/src/state/typology/mutate_typology.rs
new file mode 100644
index 0000000..f2ab2cc
--- /dev/null
+++ b/crates/configuration/src/state/typology/mutate_typology.rs
@@ -0,0 +1,170 @@
+use opentelemetry_semantic_conventions::attribute;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, error, info_span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::configuration::{
+ ConfigKind, ReloadEvent,
+ typology::{
+ DeleteTypologyConfigurationRequest, TypologyConfiguration, UpdateTypologyConfigRequest,
+ mutate_typologies_server::MutateTypologies,
+ },
+};
+
+use crate::state::{
+ AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload, typology::TypologyRow,
+};
+
+#[async_trait]
+impl MutateTypologies for AppHandle {
+ async fn create_typology_configuration(
+ &self,
+ request: Request<TypologyConfiguration>,
+ ) -> Result<Response<TypologyConfiguration>, Status> {
+ let request = request.into_inner();
+ let span = info_span!("create.configuration.typology");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "typology");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ "insert into typology (uuid, configuration) values ($1, $2)",
+ Uuid::now_v7(),
+ sqlx::types::Json(&request) as _,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ Ok(tonic::Response::new(request))
+ }
+
+ async fn update_typology_configuration(
+ &self,
+ request: Request<UpdateTypologyConfigRequest>,
+ ) -> Result<Response<TypologyConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+
+ let config = request.configuration.expect("configuration to be provided");
+
+ let span = info_span!("update.configuration.typology");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "update");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "typology");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ r#"
+ update typology
+ set configuration = $1
+ where id = $2 and version = $3
+ "#,
+ sqlx::types::Json(&config) as _,
+ config.id,
+ config.version,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Typology {
+ id: &config.id,
+ version: &config.version,
+ }
+ ),
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Typology.into(),
+ id: Some(config.id.to_owned()),
+ version: Some(config.version.to_owned()),
+ }
+ )
+ )?;
+
+ Ok(Response::new(config))
+ }
+
+ async fn delete_typology_configuration(
+ &self,
+ request: Request<DeleteTypologyConfigurationRequest>,
+ ) -> Result<Response<TypologyConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+
+ let span = info_span!("delete.configuration.typology");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "typology");
+ span.set_attribute("otel.kind", "client");
+
+ let updated = sqlx::query_as!(
+ TypologyRow,
+ r#"
+ delete from typology
+ where id = $1 and version = $2
+ returning configuration as "configuration: sqlx::types::Json<TypologyConfiguration>"
+ "#,
+ request.id,
+ request.version,
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Typology {
+ id: &request.id,
+ version: &request.version,
+ }
+ ),
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Typology.into(),
+ id: Some(request.id.to_owned()),
+ version: Some(request.version.to_owned()),
+ }
+ )
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+}
diff --git a/crates/configuration/src/state/typology/query_typology.rs b/crates/configuration/src/state/typology/query_typology.rs
new file mode 100644
index 0000000..c71317b
--- /dev/null
+++ b/crates/configuration/src/state/typology/query_typology.rs
@@ -0,0 +1,88 @@
+use opentelemetry_semantic_conventions::attribute;
+use prost::Message;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, debug, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::configuration::typology::{
+ GetTypologyConfigResponse, TypologyConfiguration, TypologyConfigurationRequest,
+ query_typologies_server::QueryTypologies,
+};
+use warden_stack::redis::AsyncCommands;
+
+use crate::state::{AppHandle, cache_key::CacheKey, typology::TypologyRow};
+
+#[async_trait]
+impl QueryTypologies for AppHandle {
+ #[instrument(skip(self, request), Err(Debug))]
+ async fn get_typology_configuration(
+ &self,
+ request: Request<TypologyConfigurationRequest>,
+ ) -> Result<Response<GetTypologyConfigResponse>, Status> {
+ let data = request.into_inner();
+ let mut cache = self
+ .services
+ .cache
+ .get()
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let key = CacheKey::from(&data);
+
+ let configuration = cache.get::<_, Vec<u8>>(&key).await.map(|value| {
+ if !value.is_empty() {
+ TypologyConfiguration::decode(value.as_ref()).ok()
+ } else {
+ None
+ }
+ });
+
+ if let Ok(Some(typology_config)) = configuration {
+ return Ok(tonic::Response::new(GetTypologyConfigResponse {
+ configuration: Some(typology_config),
+ }));
+ }
+
+ let span = info_span!("get.typology");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "select");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "typology");
+ span.set_attribute("otel.kind", "client");
+
+ let config = sqlx::query_as!(
+ TypologyRow,
+ r#"select configuration as "configuration: sqlx::types::Json<TypologyConfiguration>" from typology where
+ id = $1 and version = $2"#,
+ data.id,
+ data.version,
+ )
+ .fetch_optional(&self.services.postgres)
+ .instrument(span)
+ .await.map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let config = config.map(|transaction| {
+ debug!(id = ?transaction.configuration.0.id, "found config");
+ transaction.configuration.0
+ });
+
+ match config {
+ Some(config) => {
+ let bytes = config.encode_to_vec();
+ let span = info_span!("cache.set");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "set");
+ span.set_attribute("otel.kind", "client");
+
+ if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await {
+ warn!("{e}");
+ };
+
+ Ok(tonic::Response::new(GetTypologyConfigResponse {
+ configuration: Some(config),
+ }))
+ }
+ None => Ok(tonic::Response::new(GetTypologyConfigResponse {
+ configuration: None,
+ })),
+ }
+ }
+}