use axum::extract::Query; use axum::extract::State; use axum::http::StatusCode; use axum::Json; use serde::Deserialize; use serde::Serialize; use std::time::Duration; use crate::authz; use crate::authz::AuthzRejection; use crate::authz::Principal; use crate::routing::ServiceKind; use crate::storage::StorageError; use crate::AppState; pub fn router() -> axum::Router { axum::Router::new() .route("/status", axum::routing::get(status)) .route("/gates", axum::routing::get(gates)) .route("/plans", axum::routing::get(list_plans)) .route("/plan", axum::routing::post(create_plan)) .route("/apply", axum::routing::post(apply_plan)) .route("/rollback", axum::routing::post(rollback_plan)) } #[derive(Debug, Deserialize)] pub struct ResolveQuery { pub tenant_id: String, pub kind: String, } #[derive(Debug, Serialize)] pub struct ResolveResponse { pub tenant_id: String, pub kind: ServiceKind, pub endpoint: String, pub revision: u64, } #[derive(Debug, Deserialize)] struct TenantQuery { tenant_id: String, } #[derive(Debug, Serialize)] struct StatusResponse { tenant_id: String, revision: u64, aggregate: Option, projection: Option, runner: Option, } #[derive(Debug, Serialize)] struct GatesResponse { tenant_id: String, aggregate_ready: bool, projection_ready: bool, runner_ready: bool, } #[derive(Debug, Serialize, Deserialize, Clone)] struct Stored { v: u32, data: T, } #[derive(Debug, Serialize, Deserialize, Clone)] struct RebalancePlan { plan_id: String, tenant_id: String, kind: ServiceKind, from_endpoint: Option, to_endpoint: Option, status: String, actor_id: String, created_at_ms: i64, updated_at_ms: i64, } #[derive(Debug, Deserialize)] struct CreatePlanBody { tenant_id: String, kind: String, to_endpoint: Option, } #[derive(Debug, Deserialize)] struct PlanActionBody { plan_id: String, tenant_id: String, } #[derive(Debug, Deserialize)] struct ListPlansQuery { tenant_id: Option, limit: Option, } pub async fn resolve( State(state): State, principal: Principal, Query(q): Query, ) -> Result, AuthzRejection> { require_platform_admin(&state.storage, &principal.user_id).await?; let kind = parse_kind(&q.kind).ok_or(AuthzRejection::Internal)?; let table = state.routing.snapshot().await; let endpoint = table.resolve(&q.tenant_id, kind).map_err(|e| match e { crate::routing::RoutingError::UnknownTenant => AuthzRejection::NotFound, crate::routing::RoutingError::MissingShard | crate::routing::RoutingError::EmptyShard => { AuthzRejection::Internal } })?; Ok(Json(ResolveResponse { tenant_id: q.tenant_id, kind, endpoint, revision: table.revision, })) } async fn status( State(state): State, principal: Principal, Query(q): Query, ) -> Result, AuthzRejection> { require_platform_admin(&state.storage, &principal.user_id).await?; let table = state.routing.snapshot().await; let aggregate = table.resolve(&q.tenant_id, ServiceKind::Aggregate).ok(); let projection = table.resolve(&q.tenant_id, ServiceKind::Projection).ok(); let runner = table.resolve(&q.tenant_id, ServiceKind::Runner).ok(); Ok(Json(StatusResponse { tenant_id: q.tenant_id, revision: table.revision, aggregate, projection, runner, })) } async fn gates( State(state): State, ctx: crate::RequestContext, principal: Principal, Query(q): Query, ) -> Result, AuthzRejection> { require_platform_admin(&state.storage, &principal.user_id).await?; let projection_endpoint = state .routing .resolve(&q.tenant_id, ServiceKind::Projection) .await .ok(); let runner_endpoint = state .routing .resolve(&q.tenant_id, ServiceKind::Runner) .await .ok(); let aggregate_endpoint = state .routing .resolve(&q.tenant_id, ServiceKind::Aggregate) .await .ok(); let projection_fut = async { if let Some(ep) = projection_endpoint { projection_gate_ready(&ep, &q.tenant_id, &ctx) .await .unwrap_or(false) } else { false } }; let runner_fut = async { if let Some(ep) = runner_endpoint { http_ready(&ep, &ctx).await.unwrap_or(false) } else { false } }; let aggregate_fut = async { if let Some(ep) = aggregate_endpoint { aggregate_ready(&ep, &ctx).await.unwrap_or(false) } else { false } }; let (projection_ready, runner_ready, aggregate_ready) = tokio::join!(projection_fut, runner_fut, aggregate_fut); Ok(Json(GatesResponse { tenant_id: q.tenant_id, aggregate_ready, projection_ready, runner_ready, })) } async fn http_ready(endpoint: &str, ctx: &crate::RequestContext) -> Result { let url = format!("{}/ready", endpoint.trim_end_matches('/')); crate::upstream::probe_status_ok( &url, &[ (shared::HEADER_X_CORRELATION_ID, ctx.correlation_id.as_str()), (shared::HEADER_TRACEPARENT, ctx.traceparent.as_str()), ], Duration::from_secs(2), Duration::from_millis(500), ) .await .map_err(|_| AuthzRejection::Internal) } async fn aggregate_ready( endpoint: &str, ctx: &crate::RequestContext, ) -> Result { if endpoint.contains(":50051") { let http_ep = endpoint.replace(":50051", ":8080"); return http_ready(&http_ep, ctx).await; } http_ready(endpoint, ctx).await } async fn projection_gate_ready( endpoint: &str, tenant_id: &str, ctx: &crate::RequestContext, ) -> Result { let url = format!("{}/metrics", endpoint.trim_end_matches('/')); let text = crate::upstream::probe_text( &url, &[ (shared::HEADER_X_CORRELATION_ID, ctx.correlation_id.as_str()), (shared::HEADER_TRACEPARENT, ctx.traceparent.as_str()), ], Duration::from_secs(2), Duration::from_millis(250), ) .await .map_err(|_| AuthzRejection::Internal)?; let ready = parse_prom_gauge(&text, "projection_ready").unwrap_or(0.0) >= 1.0; if !ready { return Ok(false); } let max_lag = parse_projection_max_lag(&text, tenant_id).unwrap_or(u64::MAX); let threshold = std::env::var("GATEWAY_REBALANCE_PROJECTION_MAX_LAG") .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(0); Ok(max_lag <= threshold) } fn parse_prom_gauge(metrics: &str, name: &str) -> Option { for line in metrics.lines() { let line = line.trim(); if line.starts_with('#') || line.is_empty() { continue; } if line.starts_with(name) && !line.contains('{') { let mut it = line.split_whitespace(); let _ = it.next()?; return it.next()?.parse::().ok(); } } None } fn parse_projection_max_lag(metrics: &str, tenant_id: &str) -> Option { let mut max: Option = None; for line in metrics.lines() { let line = line.trim(); if !line.starts_with("projection_lag{") { continue; } if !line.contains(&format!("tenant_id=\"{}\"", tenant_id)) { continue; } let value = line .split_whitespace() .nth(1) .and_then(|v| v.parse::().ok())?; max = Some(max.map(|m| m.max(value)).unwrap_or(value)); } max } fn parse_kind(kind: &str) -> Option { match kind.trim().to_ascii_lowercase().as_str() { "aggregate" => Some(ServiceKind::Aggregate), "projection" => Some(ServiceKind::Projection), "runner" => Some(ServiceKind::Runner), _ => None, } } async fn require_platform_admin( storage: &crate::storage::GatewayStorage, principal_id: &str, ) -> Result<(), AuthzRejection> { authz::ensure_allowed(storage, principal_id, "*", "iam.platform_admin").await } async fn create_plan( State(state): State, principal: Principal, Json(body): Json, ) -> Result, AuthzRejection> { require_platform_admin(&state.storage, &principal.user_id).await?; if body.tenant_id.trim().is_empty() { return Err(AuthzRejection::BadRequest); } let kind = parse_kind(&body.kind).ok_or(AuthzRejection::BadRequest)?; let to_endpoint = body.to_endpoint.filter(|s| !s.trim().is_empty()); if to_endpoint.is_none() { return Err(AuthzRejection::BadRequest); } let from_endpoint = state.routing.resolve(&body.tenant_id, kind).await.ok(); let plan_id = uuid::Uuid::new_v4().to_string(); let now_ms = unix_ms(); let plan = RebalancePlan { plan_id: plan_id.clone(), tenant_id: body.tenant_id.clone(), kind, from_endpoint, to_endpoint, status: "planned".to_string(), actor_id: principal.user_id, created_at_ms: now_ms, updated_at_ms: now_ms, }; let key = plan_key(&plan.tenant_id, &plan.plan_id); state .storage .audit_index .create( &key, encode_stored(&plan).map_err(|_| AuthzRejection::Internal)?, ) .await .map_err(|e| match e { StorageError::AlreadyExists => AuthzRejection::Conflict, _ => AuthzRejection::Internal, })?; Ok(Json(plan)) } async fn apply_plan( State(state): State, principal: Principal, Json(body): Json, ) -> Result { require_platform_admin(&state.storage, &principal.user_id).await?; transition_plan_status(&state, &body.tenant_id, &body.plan_id, "apply_requested").await?; Ok(StatusCode::NO_CONTENT) } async fn rollback_plan( State(state): State, principal: Principal, Json(body): Json, ) -> Result { require_platform_admin(&state.storage, &principal.user_id).await?; transition_plan_status(&state, &body.tenant_id, &body.plan_id, "rollback_requested").await?; Ok(StatusCode::NO_CONTENT) } async fn list_plans( State(state): State, principal: Principal, Query(q): Query, ) -> Result>, AuthzRejection> { require_platform_admin(&state.storage, &principal.user_id).await?; let prefix = match &q.tenant_id { Some(t) => format!("v1/rebalance/plans/{}/", t.trim()), None => "v1/rebalance/plans/".to_string(), }; let mut keys = state .storage .audit_index .list_keys(&prefix) .await .map_err(|_| AuthzRejection::Internal)?; keys.sort(); keys.reverse(); let limit = q.limit.unwrap_or(50).min(200); let mut out = Vec::new(); for key in keys.into_iter().take(limit) { let entry = state .storage .audit_index .get(&key) .await .map_err(|_| AuthzRejection::Internal)?; let Some(entry) = entry else { continue; }; let plan: RebalancePlan = decode_stored(&entry.value).map_err(|_| AuthzRejection::Internal)?; out.push(plan); } Ok(Json(out)) } async fn transition_plan_status( state: &AppState, tenant_id: &str, plan_id: &str, next_status: &str, ) -> Result<(), AuthzRejection> { let key = plan_key(tenant_id, plan_id); for _ in 0..10 { let entry = state .storage .audit_index .get(&key) .await .map_err(|_| AuthzRejection::Internal)? .ok_or(AuthzRejection::NotFound)?; let mut plan: Stored = serde_json::from_slice(&entry.value).map_err(|_| AuthzRejection::Internal)?; plan.data.status = next_status.to_string(); plan.data.updated_at_ms = unix_ms(); let payload = serde_json::to_vec(&plan).map_err(|_| AuthzRejection::Internal)?; match state .storage .audit_index .update(&key, entry.revision, payload) .await { Ok(_) => return Ok(()), Err(StorageError::CasMismatch) => continue, Err(_) => return Err(AuthzRejection::Internal), } } Err(AuthzRejection::Internal) } fn plan_key(tenant_id: &str, plan_id: &str) -> String { format!("v1/rebalance/plans/{tenant_id}/{plan_id}") } fn encode_stored(data: &T) -> Result, StorageError> { serde_json::to_vec(&Stored { v: crate::storage::SCHEMA_VERSION, data, }) .map_err(|e| StorageError::Serde(e.to_string())) } fn decode_stored Deserialize<'de>>(bytes: &[u8]) -> Result { let stored: Stored = serde_json::from_slice(bytes).map_err(|e| StorageError::Serde(e.to_string()))?; Ok(stored.data) } fn unix_ms() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as i64 } #[cfg(test)] mod tests { use super::*; use crate::authn; use std::collections::HashMap; use std::sync::Arc; use tower::util::ServiceExt; async fn test_app_with_routing(cfg: crate::routing::RoutingConfig) -> (axum::Router, AppState) { let metrics = crate::observability::init_metrics_for_tests(); let source: Arc = Arc::new(crate::routing::FixedSource::new(cfg)); let routing = crate::routing::RouterState::new(source).await.unwrap(); let storage = crate::storage::GatewayStorage::new_in_memory(); let authn_cfg = crate::authn::AuthnConfig::for_tests(); let state = crate::AppState { metrics, routing, storage, authn: authn_cfg, }; let app = crate::app(state.clone()); (app, state) } async fn signup_and_token(app: &axum::Router, cfg: &authn::AuthnConfig) -> (String, String) { let response = app .clone() .oneshot( axum::http::Request::builder() .method("POST") .uri("/v1/auth/signup") .header("content-type", "application/json") .body(axum::body::Body::from( r#"{"email":"a@b.com","password":"password123"}"#, )) .unwrap(), ) .await .unwrap(); let body = axum::body::to_bytes(response.into_body(), usize::MAX) .await .unwrap(); let created: crate::authn::AuthResponse = serde_json::from_slice(&body).unwrap(); let claims = cfg.verify_access_token(&created.access_token).unwrap(); (created.access_token, claims.sub) } #[tokio::test] async fn resolve_requires_platform_admin() { let cfg = crate::routing::RoutingConfig::empty(); let (app, state) = test_app_with_routing(cfg).await; let (token, user_id) = signup_and_token(&app, &state.authn).await; let resp = app .clone() .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/routing/resolve?tenant_id=t1&kind=aggregate") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), axum::http::StatusCode::FORBIDDEN); crate::authz::put_role( &state.storage, "role-platform-admin", vec!["iam.platform_admin".to_string()], ) .await .unwrap(); crate::authz::assign_role(&state.storage, "*", &user_id, "role-platform-admin") .await .unwrap(); let resp = app .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/routing/resolve?tenant_id=t1&kind=aggregate") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), axum::http::StatusCode::NOT_FOUND); } #[tokio::test] async fn status_includes_revision() { let cfg = crate::routing::RoutingConfig { revision: 42, aggregate_placement: HashMap::new(), projection_placement: HashMap::new(), runner_placement: HashMap::new(), aggregate_shards: HashMap::new(), projection_shards: HashMap::new(), runner_shards: HashMap::new(), }; let (app, state) = test_app_with_routing(cfg).await; let (token, user_id) = signup_and_token(&app, &state.authn).await; crate::authz::put_role( &state.storage, "role-platform-admin", vec!["iam.platform_admin".to_string()], ) .await .unwrap(); crate::authz::assign_role(&state.storage, "*", &user_id, "role-platform-admin") .await .unwrap(); let resp = app .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/rebalance/status?tenant_id=t1") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), axum::http::StatusCode::OK); let body = axum::body::to_bytes(resp.into_body(), usize::MAX) .await .unwrap(); let value: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(value.get("revision").and_then(|v| v.as_u64()).unwrap(), 42); } #[tokio::test] async fn gates_prevent_cutover_when_projection_not_ready_or_lagging() { let metrics_not_ready = axum::Router::new().route( "/metrics", axum::routing::get(|| async { (axum::http::StatusCode::OK, "projection_ready 0\n") }), ); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { axum::serve(listener, metrics_not_ready).await.unwrap(); }); tokio::time::sleep(std::time::Duration::from_millis(50)).await; let endpoint = format!("http://{}", addr); let cfg = crate::routing::RoutingConfig { revision: 1, aggregate_placement: HashMap::new(), projection_placement: HashMap::from([("tenant-a".to_string(), "p".to_string())]), runner_placement: HashMap::new(), aggregate_shards: HashMap::new(), projection_shards: HashMap::from([("p".to_string(), vec![endpoint])]), runner_shards: HashMap::new(), }; let (app, state) = test_app_with_routing(cfg).await; let (token, user_id) = signup_and_token(&app, &state.authn).await; crate::authz::put_role( &state.storage, "role-platform-admin", vec!["iam.platform_admin".to_string()], ) .await .unwrap(); crate::authz::assign_role(&state.storage, "*", &user_id, "role-platform-admin") .await .unwrap(); let resp = app .clone() .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/rebalance/gates?tenant_id=tenant-a") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), axum::http::StatusCode::OK); let body = axum::body::to_bytes(resp.into_body(), usize::MAX) .await .unwrap(); let value: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert!(!value .get("projection_ready") .and_then(|v| v.as_bool()) .unwrap()); let metrics_lagging = axum::Router::new().route( "/metrics", axum::routing::get(|| async { ( axum::http::StatusCode::OK, "projection_ready 1\nprojection_lag{tenant_id=\"tenant-a\"} 5\n", ) }), ); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { axum::serve(listener, metrics_lagging).await.unwrap(); }); tokio::time::sleep(std::time::Duration::from_millis(50)).await; let endpoint = format!("http://{}", addr); std::env::set_var("GATEWAY_REBALANCE_PROJECTION_MAX_LAG", "0"); let cfg = crate::routing::RoutingConfig { revision: 2, aggregate_placement: HashMap::new(), projection_placement: HashMap::from([("tenant-a".to_string(), "p".to_string())]), runner_placement: HashMap::new(), aggregate_shards: HashMap::new(), projection_shards: HashMap::from([("p".to_string(), vec![endpoint])]), runner_shards: HashMap::new(), }; let (app, state) = test_app_with_routing(cfg).await; crate::authz::put_role( &state.storage, "role-platform-admin", vec!["iam.platform_admin".to_string()], ) .await .unwrap(); crate::authz::assign_role(&state.storage, "*", &user_id, "role-platform-admin") .await .unwrap(); let resp = app .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/rebalance/gates?tenant_id=tenant-a") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(resp.status(), axum::http::StatusCode::OK); let body = axum::body::to_bytes(resp.into_body(), usize::MAX) .await .unwrap(); let value: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert!(!value .get("projection_ready") .and_then(|v| v.as_bool()) .unwrap()); } #[tokio::test] async fn plan_endpoints_require_platform_admin_and_persist_plans() { let cfg = crate::routing::RoutingConfig::empty(); let (app, state) = test_app_with_routing(cfg).await; let (token, user_id) = signup_and_token(&app, &state.authn).await; let forbidden = app .clone() .oneshot( axum::http::Request::builder() .method("POST") .uri("/admin/rebalance/plan") .header("authorization", format!("Bearer {token}")) .header("content-type", "application/json") .body(axum::body::Body::from( r#"{"tenant_id":"tenant-a","kind":"projection","to_endpoint":"http://p"}"#, )) .unwrap(), ) .await .unwrap(); assert_eq!(forbidden.status(), axum::http::StatusCode::FORBIDDEN); crate::authz::put_role( &state.storage, "role-platform-admin", vec!["iam.platform_admin".to_string()], ) .await .unwrap(); crate::authz::assign_role(&state.storage, "*", &user_id, "role-platform-admin") .await .unwrap(); let created = app .clone() .oneshot( axum::http::Request::builder() .method("POST") .uri("/admin/rebalance/plan") .header("authorization", format!("Bearer {token}")) .header("content-type", "application/json") .body(axum::body::Body::from( r#"{"tenant_id":"tenant-a","kind":"projection","to_endpoint":"http://p"}"#, )) .unwrap(), ) .await .unwrap(); assert_eq!(created.status(), axum::http::StatusCode::OK); let body = axum::body::to_bytes(created.into_body(), usize::MAX) .await .unwrap(); let plan: serde_json::Value = serde_json::from_slice(&body).unwrap(); let plan_id = plan.get("plan_id").and_then(|v| v.as_str()).unwrap(); let listed = app .oneshot( axum::http::Request::builder() .method("GET") .uri("/admin/rebalance/plans?tenant_id=tenant-a&limit=10") .header("authorization", format!("Bearer {token}")) .body(axum::body::Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(listed.status(), axum::http::StatusCode::OK); let body = axum::body::to_bytes(listed.into_body(), usize::MAX) .await .unwrap(); let plans: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert!(plans .as_array() .unwrap() .iter() .any(|p| p.get("plan_id").and_then(|v| v.as_str()) == Some(plan_id))); } }