aboutsummaryrefslogtreecommitdiffstats
path: root/crates/rule-executor/src
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
committerrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
commit1968002d656383069a386bd874c9f0cc83e3116e (patch)
tree3f37092facf20b1176313428ee6269878529278f /crates/rule-executor/src
parentf5ba1a25cad80bff8c6e01f8d956e212be097ae7 (diff)
downloadwarden-1968002d656383069a386bd874c9f0cc83e3116e.tar.bz2
warden-1968002d656383069a386bd874c9f0cc83e3116e.zip
feat(rule-exec): receive messages
Diffstat (limited to 'crates/rule-executor/src')
-rw-r--r--crates/rule-executor/src/cnfg.rs26
-rw-r--r--crates/rule-executor/src/main.rs60
-rw-r--r--crates/rule-executor/src/processor.rs114
-rw-r--r--crates/rule-executor/src/processor/publish.rs1
-rw-r--r--crates/rule-executor/src/processor/reload.rs59
-rw-r--r--crates/rule-executor/src/processor/rule.rs1
-rw-r--r--crates/rule-executor/src/state.rs54
7 files changed, 315 insertions, 0 deletions
diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs
new file mode 100644
index 0000000..eac1c2d
--- /dev/null
+++ b/crates/rule-executor/src/cnfg.rs
@@ -0,0 +1,26 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ pub name: Arc<str>,
+ pub subjects: Arc<[String]>,
+ pub durable_name: Arc<str>,
+ pub destination_prefix: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+pub struct ConfigNats {
+ pub stream: Arc<str>,
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs
new file mode 100644
index 0000000..18c9222
--- /dev/null
+++ b/crates/rule-executor/src/main.rs
@@ -0,0 +1,60 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// rule-executor
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../rule-executor.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let services = state::Services { jetstream };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
+}
diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs
new file mode 100644
index 0000000..67d0d15
--- /dev/null
+++ b/crates/rule-executor/src/processor.rs
@@ -0,0 +1,114 @@
+mod publish;
+mod reload;
+mod rule;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{future, StreamExt};
+use tokio::signal;
+use tracing::trace;
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let config = Arc::clone(&state);
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ // tokio::spawn(async move {
+ // if let Ok(message) = message
+ // && let Err(e) = route::route(message, Arc::clone(&state)).await
+ // {
+ // error!("{}", e.to_string());
+ // }
+ // });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ trace!(name = ?nats.name, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")),
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/publish.rs
@@ -0,0 +1 @@
+
diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs
new file mode 100644
index 0000000..a111948
--- /dev/null
+++ b/crates/rule-executor/src/processor/reload.rs
@@ -0,0 +1,59 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use tracing::{debug, error, info};
+use uuid::Uuid;
+use warden_core::configuration::ReloadEvent;
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ debug!("got reload cache event",);
+ if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
+ .map(|value| ReloadEvent::from_str_name(&value))
+ {
+ match event {
+ // TODO: find exact rule
+ ReloadEvent::Rule => {
+ let local_cache = state.local_cache.write().await;
+ local_cache.invalidate_all();
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ debug!(event = ?event, "detected reload event, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -0,0 +1 @@
+
diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs
new file mode 100644
index 0000000..ec59519
--- /dev/null
+++ b/crates/rule-executor/src/state.rs
@@ -0,0 +1,54 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::{
+ routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient},
+ rule::RuleConfigurationRequest,
+};
+use warden_stack::Configuration;
+
+use crate::cnfg::LocalConfig;
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+}
+
+pub type AppHandle = Arc<AppState>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RoutingConfiguration>>>,
+ pub config: LocalConfig,
+ pub query_routing_client: QueryRoutingClient<Intercepted>,
+}
+
+impl AppState {
+ pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
+ let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ let channel = Endpoint::new(config.config_endpoint.to_string())?
+ .connect()
+ .await
+ .inspect_err(|e| {
+ error!(
+ endpoint = ?config.config_endpoint,
+ "could not connect to config service: {e}",
+ )
+ })?;
+
+ let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor);
+
+ Ok(Self {
+ services,
+ config,
+ local_cache: Arc::new(RwLock::new(Cache::builder().build())),
+ query_routing_client,
+ })
+ }
+}