diff options
Diffstat (limited to 'crates/aggregator/src/main.rs')
-rw-r--r-- | crates/aggregator/src/main.rs | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs new file mode 100644 index 0000000..62af544 --- /dev/null +++ b/crates/aggregator/src/main.rs @@ -0,0 +1,84 @@ +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +use crate::state::AppState; + +/// warden-aggregator +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../aggregator.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let services = state::Services { + postgres, + cache, + jetstream, + }; + + let state = AppState::create(services, &config).await?; + + processor::serve(state, provider).await?; + + Ok(()) +} |