diff options
Diffstat (limited to 'crates/rule-executor/src/processor')
-rw-r--r-- | crates/rule-executor/src/processor/rule.rs | 20 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule/configuration.rs | 42 |
2 files changed, 62 insertions, 0 deletions
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 8b13789..8168c5a 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -1 +1,21 @@ +use std::sync::Arc; +use anyhow::Result; +mod configuration; + +use async_nats::jetstream::Message; +use warden_core::configuration::rule::RuleConfigurationRequest; + +use crate::state::AppHandle; + +pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> { + let req = create_configuration_request(&message); + + let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?; + + Ok(()) +} + +fn create_configuration_request(message: &Message) -> RuleConfigurationRequest { + todo!() +} diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs new file mode 100644 index 0000000..6e11248 --- /dev/null +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -0,0 +1,42 @@ +use anyhow::{Result, anyhow}; +use tracing::{Instrument, instrument, trace, trace_span}; +use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest}; + +use crate::state::AppHandle; + +#[instrument(skip(state))] +pub(super) async fn get_configuration( + request: RuleConfigurationRequest, + state: AppHandle, +) -> Result<RuleConfiguration> { + trace!("checking cache for rule configuration"); + let cache = state.local_cache.read().await; + let config = cache.get(&request).await; + if let Some(config) = config { + trace!("cache hit"); + return Ok(config); + } + trace!("cache miss, asking config service"); + + let mut client = state.query_rule_client.clone(); + + let span = trace_span!( + "get.rule.config", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let resp = client + .get_rule_configuration(request.clone()) + .instrument(span) + .await? + .into_inner(); + + let config = resp + .configuration + .ok_or_else(|| anyhow!("missing configuration"))?; + + let mut cache = state.local_cache.write().await; + cache.insert(request, config.clone()).await; + + Ok(config) +} |