aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/router/src/processor.rs')
-rw-r--r--crates/router/src/processor.rs26
1 files changed, 19 insertions, 7 deletions
diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs
index 9afe726..c593393 100644
--- a/crates/router/src/processor.rs
+++ b/crates/router/src/processor.rs
@@ -1,4 +1,3 @@
-pub mod grpc;
mod load;
mod publish;
mod reload;
@@ -11,7 +10,7 @@ use async_nats::jetstream::{
Context,
consumer::{Consumer, pull},
};
-use futures_util::StreamExt;
+use futures_util::{StreamExt, future};
use tokio::signal;
use tracing::{error, trace};
use warden_stack::{Configuration, tracing::SdkTracerProvider};
@@ -46,11 +45,24 @@ async fn run(state: AppHandle) -> anyhow::Result<()> {
let consumer = consumer?;
// Consume messages from the consumer
- while let Some(Ok(message)) = consumer.messages().await?.next().await {
- if let Err(e) = route::route(message, Arc::clone(&state)).await {
- error!("{}", e.to_string());
- }
- }
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ if let Ok(message) = message
+ && let Err(e) = route::route(message, Arc::clone(&state)).await
+ {
+ error!("{}", e.to_string());
+ }
+ });
+ future::ready(())
+ })
+ .await;
Ok(())
}