use std::net::SocketAddr; use std::sync::Arc; use gateway::observability; use gateway::routing; use gateway::storage; use gateway::AppState; #[tokio::main] async fn main() -> anyhow::Result<()> { observability::init_tracing(); let metrics = observability::init_metrics(); let authn = gateway::authn::AuthnConfig::from_env(); let build_version = option_env!("GATEWAY_BUILD_VERSION").unwrap_or("dev"); let build_sha = option_env!("GATEWAY_BUILD_SHA").unwrap_or("unknown"); tracing::info!(build_version, build_sha, "gateway starting"); let addr: SocketAddr = std::env::var("GATEWAY_ADDR") .unwrap_or_else(|_| "0.0.0.0:8080".to_string()) .parse()?; let storage_path = std::env::var("GATEWAY_STORAGE_PATH").unwrap_or_else(|_| "./data/gateway.mdbx".to_string()); if let Some(parent) = std::path::Path::new(&storage_path).parent() { let _ = std::fs::create_dir_all(parent); } let storage = storage::GatewayStorage::open_edge_storage(storage_path, "gateway") .unwrap_or_else(|_| storage::GatewayStorage::new_in_memory()); let routing_source: Arc = if let Ok(path) = std::env::var("GATEWAY_ROUTING_FILE") { Arc::new(routing::StaticFileSource::new(path)) } else if let (Ok(nats_url), Ok(bucket), Ok(key)) = ( std::env::var("GATEWAY_ROUTING_NATS_URL"), std::env::var("GATEWAY_ROUTING_NATS_BUCKET"), std::env::var("GATEWAY_ROUTING_NATS_KEY"), ) { Arc::new(routing::NatsKvSource::connect(nats_url, bucket, key).await?) } else { Arc::new(routing::FixedSource::new(routing::RoutingConfig::empty())) }; let routing = routing::RouterState::new(routing_source).await?; let _routing_watcher = routing.start_watcher(); let grpc_addr: SocketAddr = std::env::var("GATEWAY_GRPC_ADDR") .unwrap_or_else(|_| "0.0.0.0:8081".to_string()) .parse()?; let state = AppState { metrics, routing, storage, authn, }; let app = gateway::app(state.clone()); let listener = tokio::net::TcpListener::bind(addr).await?; tracing::info!(%addr, "gateway listening"); tracing::info!(%grpc_addr, "gateway grpc listening"); let (shutdown_tx, _shutdown_rx) = tokio::sync::broadcast::channel::<()>(2); let shutdown_task = { let shutdown_tx = shutdown_tx.clone(); tokio::spawn(async move { shutdown_signal().await; let _ = shutdown_tx.send(()); }) }; let http_task = { let mut shutdown_rx = shutdown_tx.subscribe(); tokio::spawn(async move { axum::serve(listener, app) .with_graceful_shutdown(async move { let _ = shutdown_rx.recv().await; }) .await .unwrap(); }) }; let grpc_task = { let mut shutdown_rx = shutdown_tx.subscribe(); let svc = gateway::grpc::GatewayCommandService::new(state.routing.clone()); tokio::spawn(async move { tonic::transport::Server::builder() .add_service( gateway::grpc::proto::command_service_server::CommandServiceServer::new(svc), ) .serve_with_shutdown(grpc_addr, async move { let _ = shutdown_rx.recv().await; }) .await .unwrap(); }) }; tokio::select! { _ = http_task => {}, _ = grpc_task => {}, } let _ = shutdown_task.await; Ok(()) } async fn shutdown_signal() { let ctrl_c = async { let _ = tokio::signal::ctrl_c().await; }; #[cfg(unix)] let terminate = async { let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("failed to register SIGTERM handler"); sigterm.recv().await; }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { _ = ctrl_c => {}, _ = terminate => {}, } }