From 1968002d656383069a386bd874c9f0cc83e3116e Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Fri, 15 Aug 2025 19:36:22 +0200 Subject: feat(rule-exec): receive messages --- Cargo.lock | 29 +++++++ Cargo.toml | 1 + crates/configuration/src/server.rs | 3 +- crates/configuration/src/server/grpc_svc.rs | 24 ------ crates/router/Cargo.toml | 3 +- crates/router/src/processor.rs | 26 ++++-- crates/router/src/processor/grpc.rs | 27 ------ crates/router/src/state.rs | 6 +- crates/rule-executor/Cargo.toml | 45 ++++++++++ crates/rule-executor/rule-executor.toml | 33 ++++++++ crates/rule-executor/src/cnfg.rs | 26 ++++++ crates/rule-executor/src/main.rs | 60 ++++++++++++++ crates/rule-executor/src/processor.rs | 114 ++++++++++++++++++++++++++ crates/rule-executor/src/processor/publish.rs | 1 + crates/rule-executor/src/processor/reload.rs | 59 +++++++++++++ crates/rule-executor/src/processor/rule.rs | 1 + crates/rule-executor/src/state.rs | 54 ++++++++++++ crates/warden/src/server.rs | 1 - crates/warden/src/server/grpc.rs | 28 ------- crates/warden/src/state.rs | 7 +- lib/warden-core/src/configuration/conv.rs | 2 +- lib/warden-middleware/Cargo.toml | 2 + lib/warden-middleware/src/grpc.rs | 28 +++++++ lib/warden-middleware/src/lib.rs | 1 + 24 files changed, 481 insertions(+), 100 deletions(-) delete mode 100644 crates/configuration/src/server/grpc_svc.rs delete mode 100644 crates/router/src/processor/grpc.rs create mode 100644 crates/rule-executor/Cargo.toml create mode 100644 crates/rule-executor/rule-executor.toml create mode 100644 crates/rule-executor/src/cnfg.rs create mode 100644 crates/rule-executor/src/main.rs create mode 100644 crates/rule-executor/src/processor.rs create mode 100644 crates/rule-executor/src/processor/publish.rs create mode 100644 crates/rule-executor/src/processor/reload.rs create mode 100644 crates/rule-executor/src/processor/rule.rs create mode 100644 crates/rule-executor/src/state.rs delete mode 100644 crates/warden/src/server/grpc.rs create mode 100644 lib/warden-middleware/src/grpc.rs diff --git a/Cargo.lock b/Cargo.lock index 4fcbeae..9410446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2448,6 +2448,32 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rule-executor" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "clap", + "config", + "futures-util", + "moka", + "opentelemetry", + "opentelemetry-semantic-conventions", + "prost 0.14.1", + "serde", + "serde_json", + "time", + "tokio", + "tonic 0.14.1", + "tracing", + "tracing-opentelemetry", + "uuid", + "warden-core", + "warden-middleware", + "warden-stack", +] + [[package]] name = "rust-embed" version = "8.7.2" @@ -3994,8 +4020,10 @@ dependencies = [ "axum", "metrics", "metrics-exporter-prometheus", + "tonic 0.14.1", "tower-http", "tracing", + "warden-stack", ] [[package]] @@ -4040,6 +4068,7 @@ dependencies = [ "tracing-opentelemetry", "uuid", "warden-core", + "warden-middleware", "warden-stack", ] diff --git a/Cargo.toml b/Cargo.toml index fee837d..3da737b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ config = { version = "0.15.13", default-features = false } futures-util = { version = "0.3.31", default-features = false } metrics = { version = "0.24.2", default-features = false } metrics-exporter-prometheus = { version = "0.17.2", default-features = false } +moka = "0.12.10" opentelemetry = { version = "0.30.0", default-features = false } opentelemetry-http = "0.30.0" opentelemetry-otlp = { version = "0.30.0", default-features = false } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs index 28d6dd8..e31fc60 100644 --- a/crates/configuration/src/server.rs +++ b/crates/configuration/src/server.rs @@ -1,10 +1,8 @@ pub mod error; -pub mod grpc_svc; mod http_svc; pub mod reload_stream; mod version; -use grpc_svc::interceptor::MyInterceptor; use http_svc::build_router; use tonic::service::Routes; use tower_http::trace::TraceLayer; @@ -20,6 +18,7 @@ use warden_core::{ }, }, }; +use warden_middleware::grpc::interceptor::MyInterceptor; use crate::{server::error::AppError, state::AppHandle}; diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs deleted file mode 100644 index 42aa871..0000000 --- a/crates/configuration/src/server/grpc_svc.rs +++ /dev/null @@ -1,24 +0,0 @@ -pub mod interceptor { - use opentelemetry::global; - use tonic::{Status, service::Interceptor}; - use tracing::Span; - use tracing_opentelemetry::OpenTelemetrySpanExt; - use warden_stack::tracing::telemetry::tonic::extractor; - - #[derive(Clone, Copy)] - pub struct MyInterceptor; - - impl Interceptor for MyInterceptor { - fn call(&mut self, request: tonic::Request<()>) -> Result, Status> { - let span = Span::current(); - - let cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&extractor::MetadataMap(request.metadata())) - }); - - span.set_parent(cx); - - Ok(request) - } - } -} diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml index 0b7b232..08ccf71 100644 --- a/crates/router/Cargo.toml +++ b/crates/router/Cargo.toml @@ -14,7 +14,7 @@ bytes = "1.10.1" clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } futures-util.workspace = true -moka = { version = "0.12.10", features = ["future"] } +moka = { workspace = true, features = ["future"] } opentelemetry.workspace = true opentelemetry-semantic-conventions.workspace = true prost.workspace = true @@ -34,6 +34,7 @@ warden-core = { workspace = true, features = [ "configuration", "serde-time" ] } +warden-middleware.workspace = true [dependencies.warden-stack] workspace = true diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs index 9afe726..c593393 100644 --- a/crates/router/src/processor.rs +++ b/crates/router/src/processor.rs @@ -1,4 +1,3 @@ -pub mod grpc; mod load; mod publish; mod reload; @@ -11,7 +10,7 @@ use async_nats::jetstream::{ Context, consumer::{Consumer, pull}, }; -use futures_util::StreamExt; +use futures_util::{StreamExt, future}; use tokio::signal; use tracing::{error, trace}; use warden_stack::{Configuration, tracing::SdkTracerProvider}; @@ -46,11 +45,24 @@ async fn run(state: AppHandle) -> anyhow::Result<()> { let consumer = consumer?; // Consume messages from the consumer - while let Some(Ok(message)) = consumer.messages().await?.next().await { - if let Err(e) = route::route(message, Arc::clone(&state)).await { - error!("{}", e.to_string()); - } - } + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + tokio::spawn(async move { + if let Ok(message) = message + && let Err(e) = route::route(message, Arc::clone(&state)).await + { + error!("{}", e.to_string()); + } + }); + future::ready(()) + }) + .await; Ok(()) } diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs deleted file mode 100644 index 344f2a1..0000000 --- a/crates/router/src/processor/grpc.rs +++ /dev/null @@ -1,27 +0,0 @@ -pub mod interceptor { - use opentelemetry::global; - use tonic::{ - Status, - service::{Interceptor, interceptor::InterceptedService}, - transport::Channel, - }; - use tracing::Span; - use tracing_opentelemetry::OpenTelemetrySpanExt; - use warden_stack::tracing::telemetry::tonic::injector; - - pub type Intercepted = InterceptedService; - - #[derive(Clone, Copy)] - pub struct MyInterceptor; - - impl Interceptor for MyInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { - let cx = Span::current().context(); - global::get_text_map_propagator(|propagator| { - propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) - }); - - Ok(request) - } - } -} diff --git a/crates/router/src/state.rs b/crates/router/src/state.rs index 4ede2de..a0d9228 100644 --- a/crates/router/src/state.rs +++ b/crates/router/src/state.rs @@ -8,12 +8,10 @@ use tracing::error; use warden_core::configuration::routing::{ RoutingConfiguration, query_routing_client::QueryRoutingClient, }; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; use warden_stack::Configuration; -use crate::{ - cnfg::LocalConfig, - processor::grpc::interceptor::{Intercepted, MyInterceptor}, -}; +use crate::cnfg::LocalConfig; #[derive(Clone)] pub struct Services { diff --git a/crates/rule-executor/Cargo.toml b/crates/rule-executor/Cargo.toml new file mode 100644 index 0000000..3bb9561 --- /dev/null +++ b/crates/rule-executor/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "rule-executor" +version = "0.1.0" +edition = "2024" +license.workspace = true +homepage.workspace = true +documentation.workspace = true +description.workspace = true + +[dependencies] +anyhow.workspace = true +async-nats.workspace = true +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["convert-case", "toml"] } +futures-util.workspace = true +moka = { workspace = true, features = ["future"] } +opentelemetry.workspace = true +opentelemetry-semantic-conventions.workspace = true +prost.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +time = { workspace = true, features = ["serde"] } +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "signal", +] } +tonic.workspace = true +tracing.workspace = true +tracing-opentelemetry.workspace = true +uuid = { workspace = true, features = ["v7"] } +warden-core = { workspace = true, features = [ + "message", + "configuration", + "serde", + "time", +] } +warden-stack = { workspace = true, features = [ + "nats-jetstream", + "opentelemetry", + "postgres", + "opentelemetry-tonic", + "tracing-loki", +] } +warden-middleware.workspace = true diff --git a/crates/rule-executor/rule-executor.toml b/crates/rule-executor/rule-executor.toml new file mode 100644 index 0000000..dd64828 --- /dev/null +++ b/crates/rule-executor/rule-executor.toml @@ -0,0 +1,33 @@ +[application] +env = "development" + +[monitoring] +log-level = "warden_rule=trace,info" +opentelemetry-endpoint = "http://localhost:4317" +loki-endpoint = "http://localhost:3100" + +[misc] +config-endpoint = "http://localhost:1304" + +[misc.nats] +stream-name = "rules" +subjects = ["rule.>"] +durable-name = "rules" +destination-prefix = "typology-rule" + +[misc.nats.config] +stream = "configuration" +reload-subject = "configuration.reload" + +[database] +pool_size = 100 +port = 5432 +name = "pseudonyms" +host = "localhost" +password = "password" +user = "postgres" + +[nats] +hosts = ["nats://localhost:4222"] + +# vim:ft=toml diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs new file mode 100644 index 0000000..eac1c2d --- /dev/null +++ b/crates/rule-executor/src/cnfg.rs @@ -0,0 +1,26 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + pub name: Arc, + pub subjects: Arc<[String]>, + pub durable_name: Arc, + pub destination_prefix: Arc, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +pub struct ConfigNats { + pub stream: Arc, + pub reload_subject: Arc, +} diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs new file mode 100644 index 0000000..18c9222 --- /dev/null +++ b/crates/rule-executor/src/main.rs @@ -0,0 +1,60 @@ +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// rule-executor +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../rule-executor.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let services = state::Services { jetstream }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) +} diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs new file mode 100644 index 0000000..67d0d15 --- /dev/null +++ b/crates/rule-executor/src/processor.rs @@ -0,0 +1,114 @@ +mod publish; +mod reload; +mod rule; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{ + Context, + consumer::{Consumer, pull}, +}; +use futures_util::{future, StreamExt}; +use tokio::signal; +use tracing::trace; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; + +use crate::{ + cnfg::Nats, + state::{AppHandle, AppState, Services}, +}; + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let config = Arc::clone(&state); + let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?; + + let limit = None; + + consumer + .messages() + .await? + .for_each_concurrent(limit, |message| { + let state = Arc::clone(&state); + // tokio::spawn(async move { + // if let Ok(message) = message + // && let Err(e) = route::route(message, Arc::clone(&state)).await + // { + // error!("{}", e.to_string()); + // } + // }); + future::ready(()) + }) + .await; + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result> { + trace!(name = ?nats.name, "getting or creating stream"); + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")), + subjects: nats.subjects.iter().map(Into::into).collect(), + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/rule-executor/src/processor/publish.rs @@ -0,0 +1 @@ + diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs new file mode 100644 index 0000000..a111948 --- /dev/null +++ b/crates/rule-executor/src/processor/reload.rs @@ -0,0 +1,59 @@ +use async_nats::jetstream::consumer; +use futures_util::StreamExt; +use tracing::{debug, error, info}; +use uuid::Uuid; +use warden_core::configuration::ReloadEvent; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + debug!("got reload cache event",); + if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) + .map(|value| ReloadEvent::from_str_name(&value)) + { + match event { + // TODO: find exact rule + ReloadEvent::Rule => { + let local_cache = state.local_cache.write().await; + local_cache.invalidate_all(); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + debug!(event = ?event, "detected reload event, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e}") + } + } + } + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/rule-executor/src/processor/rule.rs @@ -0,0 +1 @@ + diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs new file mode 100644 index 0000000..ec59519 --- /dev/null +++ b/crates/rule-executor/src/state.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::{ + routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient}, + rule::RuleConfigurationRequest, +}; +use warden_stack::Configuration; + +use crate::cnfg::LocalConfig; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, +} + +pub type AppHandle = Arc; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc>>, + pub config: LocalConfig, + pub query_routing_client: QueryRoutingClient, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_routing_client, + }) + } +} diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs index 2db9f0f..760138d 100644 --- a/crates/warden/src/server.rs +++ b/crates/warden/src/server.rs @@ -1,4 +1,3 @@ -pub mod grpc; mod publish; mod routes; pub use routes::metrics::metrics_app; diff --git a/crates/warden/src/server/grpc.rs b/crates/warden/src/server/grpc.rs deleted file mode 100644 index f239ddb..0000000 --- a/crates/warden/src/server/grpc.rs +++ /dev/null @@ -1,28 +0,0 @@ -pub mod interceptor { - use tonic::{ - Status, - service::{Interceptor, interceptor::InterceptedService}, - transport::Channel, - }; - use tracing::Span; - use warden_stack::{ - opentelemetry::global, tracing::telemetry::tonic::injector, - tracing_opentelemetry::OpenTelemetrySpanExt, - }; - - pub type Intercepted = InterceptedService; - - #[derive(Clone, Copy)] - pub struct MyInterceptor; - - impl Interceptor for MyInterceptor { - fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { - let cx = Span::current().context(); - global::get_text_map_propagator(|propagator| { - propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) - }); - - Ok(request) - } - } -} diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs index 628225c..3ebb748 100644 --- a/crates/warden/src/state.rs +++ b/crates/warden/src/state.rs @@ -6,11 +6,8 @@ use tracing::error; use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_client::MutatePseudonymClient; use warden_stack::{Configuration, cache::RedisManager}; -use crate::{ - cnfg::LocalConfig, - error::AppError, - server::grpc::interceptor::{Intercepted, MyInterceptor}, -}; +use crate::{cnfg::LocalConfig, error::AppError}; +use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; #[derive(Clone)] pub struct AppHandle(Arc); diff --git a/lib/warden-core/src/configuration/conv.rs b/lib/warden-core/src/configuration/conv.rs index c5c7768..02f0d27 100644 --- a/lib/warden-core/src/configuration/conv.rs +++ b/lib/warden-core/src/configuration/conv.rs @@ -103,7 +103,7 @@ impl serde::Serialize for GenericParameter { where S: serde::Serializer, { - let json = serde_json::Value::from(self.0.clone()); + let json = self.0.clone(); json.serialize(serializer) } } diff --git a/lib/warden-middleware/Cargo.toml b/lib/warden-middleware/Cargo.toml index 97c2c88..c68bc69 100644 --- a/lib/warden-middleware/Cargo.toml +++ b/lib/warden-middleware/Cargo.toml @@ -12,8 +12,10 @@ publish = false axum.workspace = true metrics.workspace = true metrics-exporter-prometheus.workspace = true +tonic.workspace = true tower-http = { workspace = true, features = [ "request-id", "trace", ] } tracing.workspace = true +warden-stack = { workspace = true, features = ["opentelemetry-tonic"] } diff --git a/lib/warden-middleware/src/grpc.rs b/lib/warden-middleware/src/grpc.rs new file mode 100644 index 0000000..f239ddb --- /dev/null +++ b/lib/warden-middleware/src/grpc.rs @@ -0,0 +1,28 @@ +pub mod interceptor { + use tonic::{ + Status, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, + }; + use tracing::Span; + use warden_stack::{ + opentelemetry::global, tracing::telemetry::tonic::injector, + tracing_opentelemetry::OpenTelemetrySpanExt, + }; + + pub type Intercepted = InterceptedService; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) + }); + + Ok(request) + } + } +} diff --git a/lib/warden-middleware/src/lib.rs b/lib/warden-middleware/src/lib.rs index 6e3a0f4..2fb8df8 100644 --- a/lib/warden-middleware/src/lib.rs +++ b/lib/warden-middleware/src/lib.rs @@ -1,3 +1,4 @@ +pub mod grpc; mod metrics; mod trace_layer; -- cgit v1.2.3