diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-16 00:47:43 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-16 00:47:43 +0200 |
commit | 000885c1d5a23eb353c3f490e32363010ca804d3 (patch) | |
tree | 74f320a969b45f765a4826f31ee88064822cdccd /crates | |
parent | 4a82b6db8a1278588b97b874bad468ec6f7cda6c (diff) | |
download | warden-000885c1d5a23eb353c3f490e32363010ca804d3.tar.bz2 warden-000885c1d5a23eb353c3f490e32363010ca804d3.zip |
feat(config): identify resource to reload
Diffstat (limited to 'crates')
-rw-r--r-- | crates/configuration/src/state.rs | 4 | ||||
-rw-r--r-- | crates/configuration/src/state/routing/mutate_routing.rs | 20 | ||||
-rw-r--r-- | crates/configuration/src/state/rule/mutate_rule.rs | 22 | ||||
-rw-r--r-- | crates/router/src/processor/reload.rs | 20 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/reload.rs | 38 |
5 files changed, 75 insertions, 29 deletions
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index d8f22d5..f337a54 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -4,6 +4,7 @@ mod rule; use async_nats::jetstream::Context; use opentelemetry_semantic_conventions::attribute; +use prost::Message; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; use tracing::{Instrument, info_span, instrument, trace}; @@ -91,10 +92,11 @@ pub async fn publish_reload( span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); span.set_attribute("otel.kind", "producer"); + let bytes = event.encode_to_vec(); state .services .jetstream - .publish(format!("{prefix}.reload"), event.as_str_name().into()) + .publish(format!("{prefix}.reload"), bytes.into()) .instrument(span) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs index 105cf18..9542ba7 100644 --- a/crates/configuration/src/state/routing/mutate_routing.rs +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -4,7 +4,7 @@ use tracing::{Instrument, error, info_span, instrument, trace}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; use warden_core::configuration::{ - ReloadEvent, + ConfigKind, ReloadEvent, routing::{ DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest, mutate_routing_server::MutateRouting, @@ -100,7 +100,14 @@ impl MutateRouting for AppHandle { let (_del_result, _publish_result) = tokio::try_join!( invalidate_cache(self, CacheKey::Routing(&id)), - publish_reload(self, conf, ReloadEvent::Routing) + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Routing.into(), + ..Default::default() + } + ) )?; let res = updated.configuration.0; @@ -151,7 +158,14 @@ impl MutateRouting for AppHandle { let (_del_result, _publish_result) = tokio::try_join!( invalidate_cache(self, CacheKey::Routing(&id)), - publish_reload(self, conf, ReloadEvent::Routing) + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Routing.into(), + ..Default::default() + } + ) )?; let res = updated.configuration.0; diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs index 7b853aa..9c4f393 100644 --- a/crates/configuration/src/state/rule/mutate_rule.rs +++ b/crates/configuration/src/state/rule/mutate_rule.rs @@ -4,7 +4,7 @@ use tracing::{Instrument, error, info_span}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; use warden_core::configuration::{ - ReloadEvent, + ConfigKind, ReloadEvent, rule::{ DeleteRuleConfigurationRequest, RuleConfiguration, UpdateRuleRequest, mutate_rule_configuration_server::MutateRuleConfiguration, @@ -92,7 +92,15 @@ impl MutateRuleConfiguration for AppHandle { version: &config.version, } ), - publish_reload(self, conf, ReloadEvent::Rule) + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Rule.into(), + id: Some(config.id.to_owned()), + version: Some(config.version.to_owned()), + } + ) )?; Ok(Response::new(config)) @@ -144,7 +152,15 @@ impl MutateRuleConfiguration for AppHandle { version: &request.version, } ), - publish_reload(self, conf, ReloadEvent::Rule) + publish_reload( + self, + conf, + ReloadEvent { + kind: ConfigKind::Rule.into(), + id: Some(request.id.to_owned()), + version: Some(request.version.to_owned()), + } + ) )?; let res = updated.configuration.0; diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs index 900b7ce..e15ca07 100644 --- a/crates/router/src/processor/reload.rs +++ b/crates/router/src/processor/reload.rs @@ -1,8 +1,9 @@ use async_nats::jetstream::consumer; use futures_util::StreamExt; -use tracing::{debug, error, info, trace}; +use prost::Message as _; +use tracing::{error, info, trace}; use uuid::Uuid; -use warden_core::configuration::ReloadEvent; +use warden_core::configuration::{ConfigKind, ReloadEvent}; use crate::state::AppHandle; @@ -31,18 +32,19 @@ pub async fn reload(state: AppHandle) -> anyhow::Result<()> { while let Some(value) = messages.next().await { match value { Ok(message) => { - trace!("got reload cache event",); - if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) - .map(|value| ReloadEvent::from_str_name(&value)) + trace!("got reload cache event"); + if let Ok(res) = ReloadEvent::decode(message.payload.as_ref()) + && let Ok(kind) = ConfigKind::try_from(res.kind) { - match event { - ReloadEvent::Routing => { + match kind { + ConfigKind::Routing => { + trace!("update triggered, invalidating active routing config"); let local_cache = state.local_cache.write().await; local_cache.invalidate_all(); let _ = message.ack().await.inspect_err(|e| error!("{e}")); } - _ => { - debug!(event = ?event, "detected reload event, acknowledging"); + ConfigKind::Rule => { + trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging"); let _ = message.ack().await.inspect_err(|e| error!("{e}")); } } diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs index a111948..385c7ab 100644 --- a/crates/rule-executor/src/processor/reload.rs +++ b/crates/rule-executor/src/processor/reload.rs @@ -1,8 +1,9 @@ use async_nats::jetstream::consumer; use futures_util::StreamExt; -use tracing::{debug, error, info}; +use prost::Message as _; +use tracing::{error, info, trace}; use uuid::Uuid; -use warden_core::configuration::ReloadEvent; +use warden_core::configuration::{ConfigKind, ReloadEvent, rule::RuleConfigurationRequest}; use crate::state::AppHandle; @@ -31,26 +32,37 @@ pub async fn reload(state: AppHandle) -> anyhow::Result<()> { while let Some(value) = messages.next().await { match value { Ok(message) => { - debug!("got reload cache event",); - if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) - .map(|value| ReloadEvent::from_str_name(&value)) + trace!("got reload cache event"); + if let Ok(res) = ReloadEvent::decode(message.payload.as_ref()) + && let Ok(kind) = ConfigKind::try_from(res.kind) { - match event { - // TODO: find exact rule - ReloadEvent::Rule => { - let local_cache = state.local_cache.write().await; - local_cache.invalidate_all(); + match kind { + ConfigKind::Routing => { + trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging"); let _ = message.ack().await.inspect_err(|e| error!("{e}")); } - _ => { - debug!(event = ?event, "detected reload event, acknowledging"); + ConfigKind::Rule => { + let local_cache = state.local_cache.write().await; + let id = res.id(); + let version = res.version(); + trace!( + id = id, + ver = version, + "update triggered, invalidating rule config" + ); + let key = RuleConfigurationRequest { + id: id.to_string(), + version: version.to_string(), + }; + + local_cache.invalidate(&key).await; let _ = message.ack().await.inspect_err(|e| error!("{e}")); } } } } Err(e) => { - error!("{e}") + error!("{e:?}") } } } |