From 1968002d656383069a386bd874c9f0cc83e3116e Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Fri, 15 Aug 2025 19:36:22 +0200 Subject: feat(rule-exec): receive messages --- crates/rule-executor/src/cnfg.rs | 26 ++++++ crates/rule-executor/src/main.rs | 60 ++++++++++++++ crates/rule-executor/src/processor.rs | 114 ++++++++++++++++++++++++++ crates/rule-executor/src/processor/publish.rs | 1 + crates/rule-executor/src/processor/reload.rs | 59 +++++++++++++ crates/rule-executor/src/processor/rule.rs | 1 + crates/rule-executor/src/state.rs | 54 ++++++++++++ 7 files changed, 315 insertions(+) create mode 100644 crates/rule-executor/src/cnfg.rs create mode 100644 crates/rule-executor/src/main.rs create mode 100644 crates/rule-executor/src/processor.rs create mode 100644 crates/rule-executor/src/processor/publish.rs create mode 100644 crates/rule-executor/src/processor/reload.rs create mode 100644 crates/rule-executor/src/processor/rule.rs create mode 100644 crates/rule-executor/src/state.rs (limited to 'crates/rule-executor/src') diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs new file mode 100644 index 0000000..eac1c2d --- /dev/null +++ b/crates/rule-executor/src/cnfg.rs @@ -0,0 +1,26 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + pub name: Arc, + pub subjects: Arc<[String]>, + pub durable_name: Arc, + pub destination_prefix: Arc, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +pub struct ConfigNats { + pub stream: Arc, + pub reload_subject: Arc, +} diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs new file mode 100644 index 0000000..18c9222 --- /dev/null +++ b/crates/rule-executor/src/main.rs @@ -0,0 +1,60 @@ +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// rule-executor +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../rule-executor.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let services = state::Services { jetstream }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) +} diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs new file mode 100644 index 0000000..67d0d15 --- /dev/null +++ b/crates/rule-executor/src/processor.rs @@ -0,0 +1,114 @@ +mod publish; +mod reload; +mod rule; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{ + Context, + consumer::{Consumer, pull}, +}; +use futures_util::{future, StreamExt}; +use tokio::signal; +use tracing::trace; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; + +use crate::{ + cnfg::Nats, + state::{AppHandle, AppState, Services}, +}; + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let config = Arc::clone(&state); + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + // tokio::spawn(async move { + // if let Ok(message) = message + // && let Err(e) = route::route(message, Arc::clone(&state)).await + // { + // error!("{}", e.to_string()); + // } + // }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result> { + trace!(name = ?nats.name, "getting or creating stream"); + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")), + subjects: nats.subjects.iter().map(Into::into).collect(), + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/rule-executor/src/processor/publish.rs @@ -0,0 +1 @@ + diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs new file mode 100644 index 0000000..a111948 --- /dev/null +++ b/crates/rule-executor/src/processor/reload.rs @@ -0,0 +1,59 @@ +use async_nats::jetstream::consumer; +use futures_util::StreamExt; +use tracing::{debug, error, info}; +use uuid::Uuid; +use warden_core::configuration::ReloadEvent; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + debug!("got reload cache event",); + if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) + .map(|value| ReloadEvent::from_str_name(&value)) + { + match event { + // TODO: find exact rule + ReloadEvent::Rule => { + let local_cache = state.local_cache.write().await; + local_cache.invalidate_all(); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + debug!(event = ?event, "detected reload event, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e}") + } + } + } + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/rule-executor/src/processor/rule.rs @@ -0,0 +1 @@ + diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs new file mode 100644 index 0000000..ec59519 --- /dev/null +++ b/crates/rule-executor/src/state.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::{ + routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient}, + rule::RuleConfigurationRequest, +}; +use warden_stack::Configuration; + +use crate::cnfg::LocalConfig; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, +} + +pub type AppHandle = Arc; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc>>, + pub config: LocalConfig, + pub query_routing_client: QueryRoutingClient, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_routing_client, + }) + } +} -- cgit v1.2.3