diff options
Diffstat (limited to 'crates')
-rw-r--r-- | crates/configuration/src/state.rs | 10 | ||||
-rw-r--r-- | crates/router/src/processor/publish.rs | 4 | ||||
-rw-r--r-- | crates/rule-executor/.dockerignore | 5 | ||||
-rw-r--r-- | crates/rule-executor/Dockerfile | 27 | ||||
-rw-r--r-- | crates/rule-executor/rule-executor.toml | 2 | ||||
-rw-r--r-- | crates/rule-executor/src/cnfg.rs | 2 | ||||
-rw-r--r-- | crates/rule-executor/src/processor.rs | 32 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule.rs | 20 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule/configuration.rs | 42 | ||||
-rw-r--r-- | crates/rule-executor/src/state.rs | 15 | ||||
-rw-r--r-- | crates/warden/src/server/publish.rs | 6 |
11 files changed, 142 insertions, 23 deletions
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index de58d4b..d8f22d5 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -3,9 +3,11 @@ mod routing; mod rule; use async_nats::jetstream::Context; +use opentelemetry_semantic_conventions::attribute; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; -use tracing::{instrument, trace}; +use tracing::{Instrument, info_span, instrument, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use warden_core::configuration::ReloadEvent; use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands}; @@ -84,10 +86,16 @@ pub async fn publish_reload( event: ReloadEvent, ) -> Result<(), tonic::Status> { trace!("publishing reload event"); + + let span = info_span!("reload config"); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + span.set_attribute("otel.kind", "producer"); + state .services .jetstream .publish(format!("{prefix}.reload"), event.as_str_name().into()) + .instrument(span) .await .map_err(|e| tonic::Status::internal(e.to_string()))?; diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs index 277b674..3158e4d 100644 --- a/crates/router/src/processor/publish.rs +++ b/crates/router/src/processor/publish.rs @@ -32,11 +32,13 @@ pub(crate) async fn to_rule( propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) }); - let span = info_span!("nats.publish"); + let span = info_span!("nats.publish", "otel.kind" = "producer"); span.set_attribute( attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, subject.to_string(), ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + state .services .jetstream diff --git a/crates/rule-executor/.dockerignore b/crates/rule-executor/.dockerignore new file mode 100644 index 0000000..c8cd160 --- /dev/null +++ b/crates/rule-executor/.dockerignore @@ -0,0 +1,5 @@ +/target +.env +.git +.github +/contrib diff --git a/crates/rule-executor/Dockerfile b/crates/rule-executor/Dockerfile new file mode 100644 index 0000000..b7561d7 --- /dev/null +++ b/crates/rule-executor/Dockerfile @@ -0,0 +1,27 @@ +FROM rust:1.89.0-slim AS builder + +ENV SQLX_OFFLINE=true + +RUN rustup target add x86_64-unknown-linux-musl +RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl +RUN update-ca-certificates + +WORKDIR /usr/src/app + +RUN mkdir -p crates + +COPY ./.sqlx .sqlx +COPY ./crates/rule-executor crates/rule-executor +COPY ./lib lib +COPY ./Cargo.toml . +COPY ./Cargo.lock . + +RUN cargo fetch + +COPY ./proto proto + +RUN cargo build --target x86_64-unknown-linux-musl --release + +FROM scratch +COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/rule-executor ./ +CMD [ "./rule-executor" ] diff --git a/crates/rule-executor/rule-executor.toml b/crates/rule-executor/rule-executor.toml index dd64828..69303bb 100644 --- a/crates/rule-executor/rule-executor.toml +++ b/crates/rule-executor/rule-executor.toml @@ -2,7 +2,7 @@ env = "development" [monitoring] -log-level = "warden_rule=trace,info" +log-level = "rule_executor=trace,info" opentelemetry-endpoint = "http://localhost:4317" loki-endpoint = "http://localhost:3100" diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs index eac1c2d..bdf3520 100644 --- a/crates/rule-executor/src/cnfg.rs +++ b/crates/rule-executor/src/cnfg.rs @@ -12,7 +12,6 @@ pub struct LocalConfig { #[derive(Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub struct Nats { - pub name: Arc<str>, pub subjects: Arc<[String]>, pub durable_name: Arc<str>, pub destination_prefix: Arc<str>, @@ -20,6 +19,7 @@ pub struct Nats { } #[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] pub struct ConfigNats { pub stream: Arc<str>, pub reload_subject: Arc<str>, diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs index 67d0d15..1efe731 100644 --- a/crates/rule-executor/src/processor.rs +++ b/crates/rule-executor/src/processor.rs @@ -9,9 +9,9 @@ use async_nats::jetstream::{ Context, consumer::{Consumer, pull}, }; -use futures_util::{future, StreamExt}; +use futures_util::{StreamExt, future}; use tokio::signal; -use tracing::trace; +use tracing::{error, trace, warn}; use warden_stack::{Configuration, tracing::SdkTracerProvider}; use crate::{ @@ -35,7 +35,6 @@ pub async fn serve( } async fn run(state: AppHandle) -> anyhow::Result<()> { - let config = Arc::clone(&state); let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; let limit = None; @@ -45,13 +44,18 @@ async fn run(state: AppHandle) -> anyhow::Result<()> { .await? .for_each_concurrent(limit, |message| { let state = Arc::clone(&state); - // tokio::spawn(async move { - // if let Ok(message) = message - // && let Err(e) = route::route(message, Arc::clone(&state)).await - // { - // error!("{}", e.to_string()); - // } - // }); + tokio::spawn(async move { + match message { + Ok(message) => { + if let Err(e) = rule::process_rule(message, state).await { + error!("{e:?}"); + } + } + Err(e) => { + warn!("{e:?}"); + } + } + }); future::ready(()) }) .await; @@ -63,10 +67,14 @@ async fn get_or_create_stream( jetstream: &Context, nats: &Nats, ) -> anyhow::Result<Consumer<pull::Config>> { - trace!(name = ?nats.name, "getting or creating stream"); + let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")) + .replace(".", "") + .replace("_", ""); + trace!(name = name, subjects = ?nats.subjects, "getting or creating stream"); + let stream = jetstream .get_or_create_stream(async_nats::jetstream::stream::Config { - name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")), + name, subjects: nats.subjects.iter().map(Into::into).collect(), ..Default::default() }) diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 8b13789..8168c5a 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -1 +1,21 @@ +use std::sync::Arc; +use anyhow::Result; +mod configuration; + +use async_nats::jetstream::Message; +use warden_core::configuration::rule::RuleConfigurationRequest; + +use crate::state::AppHandle; + +pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> { + let req = create_configuration_request(&message); + + let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?; + + Ok(()) +} + +fn create_configuration_request(message: &Message) -> RuleConfigurationRequest { + todo!() +} diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs new file mode 100644 index 0000000..6e11248 --- /dev/null +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -0,0 +1,42 @@ +use anyhow::{Result, anyhow}; +use tracing::{Instrument, instrument, trace, trace_span}; +use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest}; + +use crate::state::AppHandle; + +#[instrument(skip(state))] +pub(super) async fn get_configuration( + request: RuleConfigurationRequest, + state: AppHandle, +) -> Result<RuleConfiguration> { + trace!("checking cache for rule configuration"); + let cache = state.local_cache.read().await; + let config = cache.get(&request).await; + if let Some(config) = config { + trace!("cache hit"); + return Ok(config); + } + trace!("cache miss, asking config service"); + + let mut client = state.query_rule_client.clone(); + + let span = trace_span!( + "get.rule.config", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let resp = client + .get_rule_configuration(request.clone()) + .instrument(span) + .await? + .into_inner(); + + let config = resp + .configuration + .ok_or_else(|| anyhow!("missing configuration"))?; + + let mut cache = state.local_cache.write().await; + cache.insert(request, config.clone()).await; + + Ok(config) +} diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs index ec59519..efad4ea 100644 --- a/crates/rule-executor/src/state.rs +++ b/crates/rule-executor/src/state.rs @@ -5,9 +5,9 @@ use moka::future::Cache; use tokio::sync::RwLock; use tonic::transport::Endpoint; use tracing::error; -use warden_core::configuration::{ - routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient}, - rule::RuleConfigurationRequest, +use warden_core::configuration::rule::{ + RuleConfiguration, RuleConfigurationRequest, + query_rule_configuration_client::QueryRuleConfigurationClient, }; use warden_stack::Configuration; @@ -24,9 +24,9 @@ pub type AppHandle = Arc<AppState>; #[derive(Clone)] pub struct AppState { pub services: Services, - pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RoutingConfiguration>>>, + pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RuleConfiguration>>>, pub config: LocalConfig, - pub query_routing_client: QueryRoutingClient<Intercepted>, + pub query_rule_client: QueryRuleConfigurationClient<Intercepted>, } impl AppState { @@ -42,13 +42,14 @@ impl AppState { ) })?; - let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + let query_rule_client = + QueryRuleConfigurationClient::with_interceptor(channel, MyInterceptor); Ok(Self { services, config, local_cache: Arc::new(RwLock::new(Cache::builder().build())), - query_routing_client, + query_rule_client, }) } } diff --git a/crates/warden/src/server/publish.rs b/crates/warden/src/server/publish.rs index 89922d4..07844ec 100644 --- a/crates/warden/src/server/publish.rs +++ b/crates/warden/src/server/publish.rs @@ -27,6 +27,12 @@ pub async fn publish_message(state: &AppHandle, payload: Payload, msg_id: &str) attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, subject.to_string(), ); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + trace!(%msg_id, "publishing message"); state |