aboutsummaryrefslogtreecommitdiffstats
path: root/crates/typologies/src/processor/reload.rs
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-17 20:02:49 +0200
committerGitHub <noreply@github.com>2025-08-17 20:02:49 +0200
commit73d7bab8844bb21c7a9143c30800c2d11d411e42 (patch)
tree955290bd2bded56b534738d6320216fbeeb708cb /crates/typologies/src/processor/reload.rs
parent725739985d853b07d73fa7fcd6db1f2f1b0000b6 (diff)
downloadwarden-73d7bab8844bb21c7a9143c30800c2d11d411e42.tar.bz2
warden-73d7bab8844bb21c7a9143c30800c2d11d411e42.zip
feat: typology processor (#8)
Diffstat (limited to 'crates/typologies/src/processor/reload.rs')
-rw-r--r--crates/typologies/src/processor/reload.rs71
1 files changed, 71 insertions, 0 deletions
diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs
new file mode 100644
index 0000000..fac4c40
--- /dev/null
+++ b/crates/typologies/src/processor/reload.rs
@@ -0,0 +1,71 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use prost::Message as _;
+use tracing::{error, info, trace};
+use uuid::Uuid;
+use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
+ {
+ match kind {
+ ConfigKind::Typology => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating typology config"
+ );
+ let key = TypologyConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e:?}")
+ }
+ }
+ }
+
+ Ok(())
+}