From 552300d437ed3a3c3ecf20049b0f96eb2a9a13c1 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Fri, 15 Aug 2025 21:05:36 +0200 Subject: feat(exec): get config --- crates/rule-executor/src/cnfg.rs | 2 +- crates/rule-executor/src/processor.rs | 32 ++++++++++------- crates/rule-executor/src/processor/rule.rs | 20 +++++++++++ .../src/processor/rule/configuration.rs | 42 ++++++++++++++++++++++ crates/rule-executor/src/state.rs | 15 ++++---- 5 files changed, 91 insertions(+), 20 deletions(-) create mode 100644 crates/rule-executor/src/processor/rule/configuration.rs (limited to 'crates/rule-executor/src') diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs index eac1c2d..bdf3520 100644 --- a/crates/rule-executor/src/cnfg.rs +++ b/crates/rule-executor/src/cnfg.rs @@ -12,7 +12,6 @@ pub struct LocalConfig { #[derive(Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub struct Nats { - pub name: Arc, pub subjects: Arc<[String]>, pub durable_name: Arc, pub destination_prefix: Arc, @@ -20,6 +19,7 @@ pub struct Nats { } #[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] pub struct ConfigNats { pub stream: Arc, pub reload_subject: Arc, diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs index 67d0d15..1efe731 100644 --- a/crates/rule-executor/src/processor.rs +++ b/crates/rule-executor/src/processor.rs @@ -9,9 +9,9 @@ use async_nats::jetstream::{ Context, consumer::{Consumer, pull}, }; -use futures_util::{future, StreamExt}; +use futures_util::{StreamExt, future}; use tokio::signal; -use tracing::trace; +use tracing::{error, trace, warn}; use warden_stack::{Configuration, tracing::SdkTracerProvider}; use crate::{ @@ -35,7 +35,6 @@ pub async fn serve( } async fn run(state: AppHandle) -> anyhow::Result<()> { - let config = Arc::clone(&state); let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; let limit = None; @@ -45,13 +44,18 @@ async fn run(state: AppHandle) -> anyhow::Result<()> { .await? .for_each_concurrent(limit, |message| { let state = Arc::clone(&state); - // tokio::spawn(async move { - // if let Ok(message) = message - // && let Err(e) = route::route(message, Arc::clone(&state)).await - // { - // error!("{}", e.to_string()); - // } - // }); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = rule::process_rule(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); future::ready(()) }) .await; @@ -63,10 +67,14 @@ async fn get_or_create_stream( jetstream: &Context, nats: &Nats, ) -> anyhow::Result> { - trace!(name = ?nats.name, "getting or creating stream"); + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + let stream = jetstream .get_or_create_stream(async_nats::jetstream::stream::Config { - name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")), + name, subjects: nats.subjects.iter().map(Into::into).collect(), ..Default::default() }) diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 8b13789..8168c5a 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -1 +1,21 @@ +use std::sync::Arc; +use anyhow::Result; +mod configuration; + +use async_nats::jetstream::Message; +use warden_core::configuration::rule::RuleConfigurationRequest; + +use crate::state::AppHandle; + +pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> { + let req = create_configuration_request(&message); + + let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?; + + Ok(()) +} + +fn create_configuration_request(message: &Message) -> RuleConfigurationRequest { + todo!() +} diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs new file mode 100644 index 0000000..6e11248 --- /dev/null +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -0,0 +1,42 @@ +use anyhow::{Result, anyhow}; +use tracing::{Instrument, instrument, trace, trace_span}; +use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest}; + +use crate::state::AppHandle; + +#[instrument(skip(state))] +pub(super) async fn get_configuration( + request: RuleConfigurationRequest, + state: AppHandle, +) -> Result { + trace!("checking cache for rule configuration"); + let cache = state.local_cache.read().await; + let config = cache.get(&request).await; + if let Some(config) = config { + trace!("cache hit"); + return Ok(config); + } + trace!("cache miss, asking config service"); + + let mut client = state.query_rule_client.clone(); + + let span = trace_span!( + "get.rule.config", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let resp = client + .get_rule_configuration(request.clone()) + .instrument(span) + .await? + .into_inner(); + + let config = resp + .configuration + .ok_or_else(|| anyhow!("missing configuration"))?; + + let mut cache = state.local_cache.write().await; + cache.insert(request, config.clone()).await; + + Ok(config) +} diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs index ec59519..efad4ea 100644 --- a/crates/rule-executor/src/state.rs +++ b/crates/rule-executor/src/state.rs @@ -5,9 +5,9 @@ use moka::future::Cache; use tokio::sync::RwLock; use tonic::transport::Endpoint; use tracing::error; -use warden_core::configuration::{ - routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient}, - rule::RuleConfigurationRequest, +use warden_core::configuration::rule::{ + RuleConfiguration, RuleConfigurationRequest, + query_rule_configuration_client::QueryRuleConfigurationClient, }; use warden_stack::Configuration; @@ -24,9 +24,9 @@ pub type AppHandle = Arc; #[derive(Clone)] pub struct AppState { pub services: Services, - pub local_cache: Arc>>, + pub local_cache: Arc>>, pub config: LocalConfig, - pub query_routing_client: QueryRoutingClient, + pub query_rule_client: QueryRuleConfigurationClient, } impl AppState { @@ -42,13 +42,14 @@ impl AppState { ) })?; - let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + let query_rule_client = + QueryRuleConfigurationClient::with_interceptor(channel, MyInterceptor); Ok(Self { services, config, local_cache: Arc::new(RwLock::new(Cache::builder().build())), - query_routing_client, + query_rule_client, }) } } -- cgit v1.2.3