aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/dockerfile.yaml7
-rw-r--r--crates/configuration/src/state.rs10
-rw-r--r--crates/router/src/processor/publish.rs4
-rw-r--r--crates/rule-executor/.dockerignore5
-rw-r--r--crates/rule-executor/Dockerfile27
-rw-r--r--crates/rule-executor/rule-executor.toml2
-rw-r--r--crates/rule-executor/src/cnfg.rs2
-rw-r--r--crates/rule-executor/src/processor.rs32
-rw-r--r--crates/rule-executor/src/processor/rule.rs20
-rw-r--r--crates/rule-executor/src/processor/rule/configuration.rs42
-rw-r--r--crates/rule-executor/src/state.rs15
-rw-r--r--crates/warden/src/server/publish.rs6
12 files changed, 148 insertions, 24 deletions
diff --git a/.github/workflows/dockerfile.yaml b/.github/workflows/dockerfile.yaml
index c2823ea..3d2435d 100644
--- a/.github/workflows/dockerfile.yaml
+++ b/.github/workflows/dockerfile.yaml
@@ -15,7 +15,12 @@ jobs:
strategy:
fail-fast: true
matrix:
- crate: ["pseudonyms", "warden", "configuration", "router"]
+ crate:
+ - pseudonyms
+ - warden
+ - configuration
+ - router
+ - rule-executor
name: build / ${{ matrix.crate }}
steps:
- uses: actions/checkout@v4
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs
index de58d4b..d8f22d5 100644
--- a/crates/configuration/src/state.rs
+++ b/crates/configuration/src/state.rs
@@ -3,9 +3,11 @@ mod routing;
mod rule;
use async_nats::jetstream::Context;
+use opentelemetry_semantic_conventions::attribute;
use sqlx::PgPool;
use std::{ops::Deref, sync::Arc};
-use tracing::{instrument, trace};
+use tracing::{Instrument, info_span, instrument, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_core::configuration::ReloadEvent;
use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands};
@@ -84,10 +86,16 @@ pub async fn publish_reload(
event: ReloadEvent,
) -> Result<(), tonic::Status> {
trace!("publishing reload event");
+
+ let span = info_span!("reload config");
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+ span.set_attribute("otel.kind", "producer");
+
state
.services
.jetstream
.publish(format!("{prefix}.reload"), event.as_str_name().into())
+ .instrument(span)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs
index 277b674..3158e4d 100644
--- a/crates/router/src/processor/publish.rs
+++ b/crates/router/src/processor/publish.rs
@@ -32,11 +32,13 @@ pub(crate) async fn to_rule(
propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
});
- let span = info_span!("nats.publish");
+ let span = info_span!("nats.publish", "otel.kind" = "producer");
span.set_attribute(
attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
subject.to_string(),
);
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+
state
.services
.jetstream
diff --git a/crates/rule-executor/.dockerignore b/crates/rule-executor/.dockerignore
new file mode 100644
index 0000000..c8cd160
--- /dev/null
+++ b/crates/rule-executor/.dockerignore
@@ -0,0 +1,5 @@
+/target
+.env
+.git
+.github
+/contrib
diff --git a/crates/rule-executor/Dockerfile b/crates/rule-executor/Dockerfile
new file mode 100644
index 0000000..b7561d7
--- /dev/null
+++ b/crates/rule-executor/Dockerfile
@@ -0,0 +1,27 @@
+FROM rust:1.89.0-slim AS builder
+
+ENV SQLX_OFFLINE=true
+
+RUN rustup target add x86_64-unknown-linux-musl
+RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl
+RUN update-ca-certificates
+
+WORKDIR /usr/src/app
+
+RUN mkdir -p crates
+
+COPY ./.sqlx .sqlx
+COPY ./crates/rule-executor crates/rule-executor
+COPY ./lib lib
+COPY ./Cargo.toml .
+COPY ./Cargo.lock .
+
+RUN cargo fetch
+
+COPY ./proto proto
+
+RUN cargo build --target x86_64-unknown-linux-musl --release
+
+FROM scratch
+COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/rule-executor ./
+CMD [ "./rule-executor" ]
diff --git a/crates/rule-executor/rule-executor.toml b/crates/rule-executor/rule-executor.toml
index dd64828..69303bb 100644
--- a/crates/rule-executor/rule-executor.toml
+++ b/crates/rule-executor/rule-executor.toml
@@ -2,7 +2,7 @@
env = "development"
[monitoring]
-log-level = "warden_rule=trace,info"
+log-level = "rule_executor=trace,info"
opentelemetry-endpoint = "http://localhost:4317"
loki-endpoint = "http://localhost:3100"
diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs
index eac1c2d..bdf3520 100644
--- a/crates/rule-executor/src/cnfg.rs
+++ b/crates/rule-executor/src/cnfg.rs
@@ -12,7 +12,6 @@ pub struct LocalConfig {
#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Nats {
- pub name: Arc<str>,
pub subjects: Arc<[String]>,
pub durable_name: Arc<str>,
pub destination_prefix: Arc<str>,
@@ -20,6 +19,7 @@ pub struct Nats {
}
#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
pub struct ConfigNats {
pub stream: Arc<str>,
pub reload_subject: Arc<str>,
diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs
index 67d0d15..1efe731 100644
--- a/crates/rule-executor/src/processor.rs
+++ b/crates/rule-executor/src/processor.rs
@@ -9,9 +9,9 @@ use async_nats::jetstream::{
Context,
consumer::{Consumer, pull},
};
-use futures_util::{future, StreamExt};
+use futures_util::{StreamExt, future};
use tokio::signal;
-use tracing::trace;
+use tracing::{error, trace, warn};
use warden_stack::{Configuration, tracing::SdkTracerProvider};
use crate::{
@@ -35,7 +35,6 @@ pub async fn serve(
}
async fn run(state: AppHandle) -> anyhow::Result<()> {
- let config = Arc::clone(&state);
let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
let limit = None;
@@ -45,13 +44,18 @@ async fn run(state: AppHandle) -> anyhow::Result<()> {
.await?
.for_each_concurrent(limit, |message| {
let state = Arc::clone(&state);
- // tokio::spawn(async move {
- // if let Ok(message) = message
- // && let Err(e) = route::route(message, Arc::clone(&state)).await
- // {
- // error!("{}", e.to_string());
- // }
- // });
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = rule::process_rule(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
future::ready(())
})
.await;
@@ -63,10 +67,14 @@ async fn get_or_create_stream(
jetstream: &Context,
nats: &Nats,
) -> anyhow::Result<Consumer<pull::Config>> {
- trace!(name = ?nats.name, "getting or creating stream");
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
let stream = jetstream
.get_or_create_stream(async_nats::jetstream::stream::Config {
- name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")),
+ name,
subjects: nats.subjects.iter().map(Into::into).collect(),
..Default::default()
})
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
index 8b13789..8168c5a 100644
--- a/crates/rule-executor/src/processor/rule.rs
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -1 +1,21 @@
+use std::sync::Arc;
+use anyhow::Result;
+mod configuration;
+
+use async_nats::jetstream::Message;
+use warden_core::configuration::rule::RuleConfigurationRequest;
+
+use crate::state::AppHandle;
+
+pub async fn process_rule(message: Message, state: AppHandle) -> Result<()> {
+ let req = create_configuration_request(&message);
+
+ let rule_configuration = configuration::get_configuration(req, Arc::clone(&state)).await?;
+
+ Ok(())
+}
+
+fn create_configuration_request(message: &Message) -> RuleConfigurationRequest {
+ todo!()
+}
diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs
new file mode 100644
index 0000000..6e11248
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule/configuration.rs
@@ -0,0 +1,42 @@
+use anyhow::{Result, anyhow};
+use tracing::{Instrument, instrument, trace, trace_span};
+use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest};
+
+use crate::state::AppHandle;
+
+#[instrument(skip(state))]
+pub(super) async fn get_configuration(
+ request: RuleConfigurationRequest,
+ state: AppHandle,
+) -> Result<RuleConfiguration> {
+ trace!("checking cache for rule configuration");
+ let cache = state.local_cache.read().await;
+ let config = cache.get(&request).await;
+ if let Some(config) = config {
+ trace!("cache hit");
+ return Ok(config);
+ }
+ trace!("cache miss, asking config service");
+
+ let mut client = state.query_rule_client.clone();
+
+ let span = trace_span!(
+ "get.rule.config",
+ "otel.kind" = "client",
+ "rpc.service" = "configuration"
+ );
+ let resp = client
+ .get_rule_configuration(request.clone())
+ .instrument(span)
+ .await?
+ .into_inner();
+
+ let config = resp
+ .configuration
+ .ok_or_else(|| anyhow!("missing configuration"))?;
+
+ let mut cache = state.local_cache.write().await;
+ cache.insert(request, config.clone()).await;
+
+ Ok(config)
+}
diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs
index ec59519..efad4ea 100644
--- a/crates/rule-executor/src/state.rs
+++ b/crates/rule-executor/src/state.rs
@@ -5,9 +5,9 @@ use moka::future::Cache;
use tokio::sync::RwLock;
use tonic::transport::Endpoint;
use tracing::error;
-use warden_core::configuration::{
- routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient},
- rule::RuleConfigurationRequest,
+use warden_core::configuration::rule::{
+ RuleConfiguration, RuleConfigurationRequest,
+ query_rule_configuration_client::QueryRuleConfigurationClient,
};
use warden_stack::Configuration;
@@ -24,9 +24,9 @@ pub type AppHandle = Arc<AppState>;
#[derive(Clone)]
pub struct AppState {
pub services: Services,
- pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RoutingConfiguration>>>,
+ pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RuleConfiguration>>>,
pub config: LocalConfig,
- pub query_routing_client: QueryRoutingClient<Intercepted>,
+ pub query_rule_client: QueryRuleConfigurationClient<Intercepted>,
}
impl AppState {
@@ -42,13 +42,14 @@ impl AppState {
)
})?;
- let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor);
+ let query_rule_client =
+ QueryRuleConfigurationClient::with_interceptor(channel, MyInterceptor);
Ok(Self {
services,
config,
local_cache: Arc::new(RwLock::new(Cache::builder().build())),
- query_routing_client,
+ query_rule_client,
})
}
}
diff --git a/crates/warden/src/server/publish.rs b/crates/warden/src/server/publish.rs
index 89922d4..07844ec 100644
--- a/crates/warden/src/server/publish.rs
+++ b/crates/warden/src/server/publish.rs
@@ -27,6 +27,12 @@ pub async fn publish_message(state: &AppHandle, payload: Payload, msg_id: &str)
attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
subject.to_string(),
);
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+
trace!(%msg_id, "publishing message");
state