131 lines
4.1 KiB
Rust
131 lines
4.1 KiB
Rust
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
|
|
use gateway::observability;
|
|
use gateway::routing;
|
|
use gateway::storage;
|
|
use gateway::AppState;
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
observability::init_tracing();
|
|
let metrics = observability::init_metrics();
|
|
let authn = gateway::authn::AuthnConfig::from_env();
|
|
|
|
let build_version = option_env!("GATEWAY_BUILD_VERSION").unwrap_or("dev");
|
|
let build_sha = option_env!("GATEWAY_BUILD_SHA").unwrap_or("unknown");
|
|
tracing::info!(build_version, build_sha, "gateway starting");
|
|
|
|
let addr: SocketAddr = std::env::var("GATEWAY_ADDR")
|
|
.unwrap_or_else(|_| "0.0.0.0:8080".to_string())
|
|
.parse()?;
|
|
|
|
let storage_path =
|
|
std::env::var("GATEWAY_STORAGE_PATH").unwrap_or_else(|_| "./data/gateway.mdbx".to_string());
|
|
if let Some(parent) = std::path::Path::new(&storage_path).parent() {
|
|
let _ = std::fs::create_dir_all(parent);
|
|
}
|
|
let storage = storage::GatewayStorage::open_edge_storage(storage_path, "gateway")
|
|
.unwrap_or_else(|_| storage::GatewayStorage::new_in_memory());
|
|
|
|
let routing_source: Arc<dyn routing::RoutingSource> =
|
|
if let Ok(path) = std::env::var("GATEWAY_ROUTING_FILE") {
|
|
Arc::new(routing::StaticFileSource::new(path))
|
|
} else if let (Ok(nats_url), Ok(bucket), Ok(key)) = (
|
|
std::env::var("GATEWAY_ROUTING_NATS_URL"),
|
|
std::env::var("GATEWAY_ROUTING_NATS_BUCKET"),
|
|
std::env::var("GATEWAY_ROUTING_NATS_KEY"),
|
|
) {
|
|
Arc::new(routing::NatsKvSource::connect(nats_url, bucket, key).await?)
|
|
} else {
|
|
Arc::new(routing::FixedSource::new(routing::RoutingConfig::empty()))
|
|
};
|
|
|
|
let routing = routing::RouterState::new(routing_source).await?;
|
|
let _routing_watcher = routing.start_watcher();
|
|
|
|
let grpc_addr: SocketAddr = std::env::var("GATEWAY_GRPC_ADDR")
|
|
.unwrap_or_else(|_| "0.0.0.0:8081".to_string())
|
|
.parse()?;
|
|
|
|
let state = AppState {
|
|
metrics,
|
|
routing,
|
|
storage,
|
|
authn,
|
|
};
|
|
|
|
let app = gateway::app(state.clone());
|
|
|
|
let listener = tokio::net::TcpListener::bind(addr).await?;
|
|
tracing::info!(%addr, "gateway listening");
|
|
|
|
tracing::info!(%grpc_addr, "gateway grpc listening");
|
|
|
|
let (shutdown_tx, _shutdown_rx) = tokio::sync::broadcast::channel::<()>(2);
|
|
let shutdown_task = {
|
|
let shutdown_tx = shutdown_tx.clone();
|
|
tokio::spawn(async move {
|
|
shutdown_signal().await;
|
|
let _ = shutdown_tx.send(());
|
|
})
|
|
};
|
|
|
|
let http_task = {
|
|
let mut shutdown_rx = shutdown_tx.subscribe();
|
|
tokio::spawn(async move {
|
|
axum::serve(listener, app)
|
|
.with_graceful_shutdown(async move {
|
|
let _ = shutdown_rx.recv().await;
|
|
})
|
|
.await
|
|
.unwrap();
|
|
})
|
|
};
|
|
|
|
let grpc_task = {
|
|
let mut shutdown_rx = shutdown_tx.subscribe();
|
|
let svc = gateway::grpc::GatewayCommandService::new(state.routing.clone());
|
|
tokio::spawn(async move {
|
|
tonic::transport::Server::builder()
|
|
.add_service(
|
|
gateway::grpc::proto::command_service_server::CommandServiceServer::new(svc),
|
|
)
|
|
.serve_with_shutdown(grpc_addr, async move {
|
|
let _ = shutdown_rx.recv().await;
|
|
})
|
|
.await
|
|
.unwrap();
|
|
})
|
|
};
|
|
|
|
tokio::select! {
|
|
_ = http_task => {},
|
|
_ = grpc_task => {},
|
|
}
|
|
let _ = shutdown_task.await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn shutdown_signal() {
|
|
let ctrl_c = async {
|
|
let _ = tokio::signal::ctrl_c().await;
|
|
};
|
|
|
|
#[cfg(unix)]
|
|
let terminate = async {
|
|
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
|
.expect("failed to register SIGTERM handler");
|
|
sigterm.recv().await;
|
|
};
|
|
|
|
#[cfg(not(unix))]
|
|
let terminate = std::future::pending::<()>();
|
|
|
|
tokio::select! {
|
|
_ = ctrl_c => {},
|
|
_ = terminate => {},
|
|
}
|
|
}
|