aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src/processor
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-16 00:47:43 +0200
committerrtkay123 <dev@kanjala.com>2025-08-16 00:47:43 +0200
commit000885c1d5a23eb353c3f490e32363010ca804d3 (patch)
tree74f320a969b45f765a4826f31ee88064822cdccd /crates/rule-executor/src/processor
parent4a82b6db8a1278588b97b874bad468ec6f7cda6c (diff)
downloadwarden-000885c1d5a23eb353c3f490e32363010ca804d3.tar.bz2
warden-000885c1d5a23eb353c3f490e32363010ca804d3.zip
feat(config): identify resource to reload
Diffstat (limited to 'crates/rule-executor/src/processor')
-rw-r--r--crates/rule-executor/src/processor/reload.rs38
1 files changed, 25 insertions, 13 deletions
diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs
index a111948..385c7ab 100644
--- a/crates/rule-executor/src/processor/reload.rs
+++ b/crates/rule-executor/src/processor/reload.rs
@@ -1,8 +1,9 @@
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
-use tracing::{debug, error, info};
+use prost::Message as _;
+use tracing::{error, info, trace};
use uuid::Uuid;
-use warden_core::configuration::ReloadEvent;
+use warden_core::configuration::{ConfigKind, ReloadEvent, rule::RuleConfigurationRequest};
use crate::state::AppHandle;
@@ -31,26 +32,37 @@ pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
while let Some(value) = messages.next().await {
match value {
Ok(message) => {
- debug!("got reload cache event",);
- if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
- .map(|value| ReloadEvent::from_str_name(&value))
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
{
- match event {
- // TODO: find exact rule
- ReloadEvent::Rule => {
- let local_cache = state.local_cache.write().await;
- local_cache.invalidate_all();
+ match kind {
+ ConfigKind::Routing => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
- _ => {
- debug!(event = ?event, "detected reload event, acknowledging");
+ ConfigKind::Rule => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating rule config"
+ );
+ let key = RuleConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
}
}
}
Err(e) => {
- error!("{e}")
+ error!("{e:?}")
}
}
}