aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src/processor
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
committerrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
commit1968002d656383069a386bd874c9f0cc83e3116e (patch)
tree3f37092facf20b1176313428ee6269878529278f /crates/rule-executor/src/processor
parentf5ba1a25cad80bff8c6e01f8d956e212be097ae7 (diff)
downloadwarden-1968002d656383069a386bd874c9f0cc83e3116e.tar.bz2
warden-1968002d656383069a386bd874c9f0cc83e3116e.zip
feat(rule-exec): receive messages
Diffstat (limited to 'crates/rule-executor/src/processor')
-rw-r--r--crates/rule-executor/src/processor/publish.rs1
-rw-r--r--crates/rule-executor/src/processor/reload.rs59
-rw-r--r--crates/rule-executor/src/processor/rule.rs1
3 files changed, 61 insertions, 0 deletions
diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/publish.rs
@@ -0,0 +1 @@
+
diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs
new file mode 100644
index 0000000..a111948
--- /dev/null
+++ b/crates/rule-executor/src/processor/reload.rs
@@ -0,0 +1,59 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use tracing::{debug, error, info};
+use uuid::Uuid;
+use warden_core::configuration::ReloadEvent;
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ debug!("got reload cache event",);
+ if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
+ .map(|value| ReloadEvent::from_str_name(&value))
+ {
+ match event {
+ // TODO: find exact rule
+ ReloadEvent::Rule => {
+ let local_cache = state.local_cache.write().await;
+ local_cache.invalidate_all();
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ debug!(event = ?event, "detected reload event, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -0,0 +1 @@
+