wip:milestone 0 fixes
Some checks failed
CI/CD Pipeline / unit-tests (push) Failing after 1m16s
CI/CD Pipeline / integration-tests (push) Failing after 2m32s
CI/CD Pipeline / lint (push) Successful in 5m22s
CI/CD Pipeline / e2e-tests (push) Has been skipped
CI/CD Pipeline / build (push) Has been skipped

2026-03-15 12:35:42 +02:00
parent 6708cf28a7
commit cffdf8af86
61266 changed files with 4511646 additions and 1938 deletions

242
gateway/src/admin_auth.rs Normal file

@@ -0,0 +1,242 @@
use axum::{
    extract::{Request, State},
    http::{header::COOKIE, StatusCode},
    middleware::Next,
    response::Response,
};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;

#[derive(Clone)]
pub struct AdminAuthState {
    // In-memory session store (for production, use Redis)
    sessions: Arc<RwLock<HashMap<String, SessionData>>>,
}

#[derive(Clone)]
struct SessionData {
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
}

impl AdminAuthState {
    pub fn new() -> Self {
        Self {
            sessions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn create_session(&self) -> String {
        let session_id = Uuid::new_v4().to_string();
        let data = SessionData {
            created_at: Utc::now(),
            last_accessed: Utc::now(),
        };
        self.sessions.write().await.insert(session_id.clone(), data);
        // Evict sessions idle for more than 24 hours
        self.cleanup_old_sessions().await;
        session_id
    }

    pub async fn validate_session(&self, session_id: &str) -> bool {
        let mut sessions = self.sessions.write().await;
        if let Some(data) = sessions.get_mut(session_id) {
            // Sliding expiry: refresh the last-accessed time on every hit
            data.last_accessed = Utc::now();
            true
        } else {
            false
        }
    }

    pub async fn revoke_session(&self, session_id: &str) {
        self.sessions.write().await.remove(session_id);
    }

    async fn cleanup_old_sessions(&self) {
        let cutoff = Utc::now() - chrono::Duration::hours(24);
        let mut sessions = self.sessions.write().await;
        sessions.retain(|_, data| data.last_accessed > cutoff);
    }
}

impl Default for AdminAuthState {
    fn default() -> Self {
        Self::new()
    }
}

pub async fn admin_auth_middleware(
    State(state): State<AdminAuthState>,
    req: Request,
    next: Next,
) -> Result<Response, StatusCode> {
    let path = req.uri().path();

    // 1. Always allow the dashboard page itself so the login UI can load
    if path == "/dashboard" {
        return Ok(next.run(req).await);
    }

    // 2. Protect only the platform API routes
    if path.starts_with("/platform/v1") {
        // Allow the login endpoint
        if path == "/platform/v1/login" {
            return Ok(next.run(req).await);
        }

        // Extract the session token from the Cookie header
        let session_token = req
            .headers()
            .get(COOKIE)
            .and_then(|h| h.to_str().ok())
            .and_then(|cookies| {
                cookies
                    .split(';')
                    .find_map(|c| c.trim().strip_prefix("madbase_admin_session="))
            });

        // Also check the X-Admin-Token header (for API clients)
        let token = session_token.or_else(|| {
            req.headers()
                .get("x-admin-token")
                .and_then(|v| v.to_str().ok())
        });
        let token = token.ok_or(StatusCode::UNAUTHORIZED)?;

        // Validate against the session store
        if !state.validate_session(token).await {
            return Err(StatusCode::UNAUTHORIZED);
        }
    }

    Ok(next.run(req).await)
}
#[cfg(test)]
mod tests {
    use super::*;
    use axum::{body::Body, http::Request, routing::get, Router};
    use tower::ServiceExt; // for `Router::oneshot`

    async fn dummy_handler() -> &'static str {
        "ok"
    }

    #[tokio::test]
    async fn test_admin_auth_rejects_no_session() {
        let state = AdminAuthState::new();
        // The middleware only guards /platform/v1 paths, so the test route
        // must live under that prefix for the rejection to be observable.
        let app = Router::new()
            .route("/platform/v1/protected", get(dummy_handler))
            .layer(axum::middleware::from_fn_with_state(
                state.clone(),
                admin_auth_middleware,
            ));
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/platform/v1/protected")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn test_admin_accepts_valid_session() {
        let state = AdminAuthState::new();
        let session_id = state.create_session().await;
        let app = Router::new()
            .route("/platform/v1/test", get(dummy_handler))
            .layer(axum::middleware::from_fn_with_state(
                state.clone(),
                admin_auth_middleware,
            ));
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/platform/v1/test")
                    .header("Cookie", format!("madbase_admin_session={}", session_id))
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_admin_rejects_invalid_session() {
        let state = AdminAuthState::new();
        let app = Router::new()
            .route("/platform/v1/test", get(dummy_handler))
            .layer(axum::middleware::from_fn_with_state(
                state.clone(),
                admin_auth_middleware,
            ));
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/platform/v1/test")
                    .header("Cookie", "madbase_admin_session=invalid-token")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn test_admin_allows_login_endpoint() {
        let state = AdminAuthState::new();
        let app = Router::new()
            .route("/platform/v1/login", get(dummy_handler))
            .layer(axum::middleware::from_fn_with_state(
                state.clone(),
                admin_auth_middleware,
            ));
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/platform/v1/login")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_admin_allows_dashboard() {
        let state = AdminAuthState::new();
        let app = Router::new()
            .route("/dashboard", get(dummy_handler))
            .layer(axum::middleware::from_fn_with_state(
                state.clone(),
                admin_auth_middleware,
            ));
        let response = app
            .oneshot(
                Request::builder()
                    .uri("/dashboard")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
    }
}
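
For context on how this middleware is meant to be mounted, here is a minimal sketch. The `/platform/v1` prefix, cookie name, and `AdminAuthState` come from the file above; the login handler is hypothetical (a real one would verify credentials before minting a session):

```rust
use axum::{
    extract::State,
    http::header::SET_COOKIE,
    middleware::from_fn_with_state,
    response::IntoResponse,
    routing::{get, post},
    Router,
};
use gateway::admin_auth::{admin_auth_middleware, AdminAuthState};

// Hypothetical login handler: verify credentials (not shown), then mint a
// session and hand it back as the cookie the middleware looks for.
async fn login(State(state): State<AdminAuthState>) -> impl IntoResponse {
    let session_id = state.create_session().await;
    let cookie = format!("madbase_admin_session={session_id}; HttpOnly; Path=/");
    ([(SET_COOKIE, cookie)], "ok")
}

fn admin_router() -> Router {
    let state = AdminAuthState::new();
    Router::new()
        .route("/platform/v1/login", post(login))
        .route("/platform/v1/projects", get(|| async { "projects" }))
        .with_state(state.clone())
        // The same state instance must back both the routes and the
        // middleware, or sessions minted at login would never validate.
        .layer(from_fn_with_state(state, admin_auth_middleware))
}
```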

9
gateway/src/bin/control.rs Normal file

@@ -0,0 +1,9 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let _rust_log = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into());
    tracing_subscriber::fmt::init();
    gateway::control::run().await
}

9
gateway/src/bin/proxy.rs Normal file

@@ -0,0 +1,9 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let _rust_log = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into());
    tracing_subscriber::fmt::init();
    gateway::proxy::run().await
}

9
gateway/src/bin/worker.rs Normal file

@@ -0,0 +1,9 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    let _rust_log = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into());
    tracing_subscriber::fmt::init();
    gateway::worker::run().await
}

130
gateway/src/control.rs Normal file

@@ -0,0 +1,130 @@
use axum::{
    extract::{Query, Request},
    middleware::{from_fn, from_fn_with_state, Next},
    response::{IntoResponse, Response},
    routing::get,
    Router,
};
use axum::http::StatusCode;
use axum_prometheus::PrometheusMetricLayer;
use common::{init_pool, Config};
use sqlx::PgPool;
use crate::admin_auth::{admin_auth_middleware, AdminAuthState};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;
use tower_http::cors::{Any, CorsLayer};
use tower_http::services::ServeDir;
use tower_http::trace::TraceLayer;

async fn logs_proxy_handler(
    Query(params): Query<HashMap<String, String>>,
) -> impl IntoResponse {
    let client = reqwest::Client::new();
    let loki_url = std::env::var("LOKI_URL")
        .unwrap_or_else(|_| "http://loki:3100".to_string());
    let query_url = format!("{}/loki/api/v1/query_range", loki_url);

    let resp = client
        .get(&query_url)
        .query(&params)
        .send()
        .await;

    match resp {
        Ok(r) => {
            let status = StatusCode::from_u16(r.status().as_u16())
                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
            let body = r.bytes().await.unwrap_or_default();
            (status, body).into_response()
        }
        Err(e) => {
            tracing::error!("Loki proxy error: {}", e);
            (StatusCode::BAD_GATEWAY, e.to_string()).into_response()
        }
    }
}

async fn dashboard_handler() -> axum::response::Html<&'static str> {
    axum::response::Html(include_str!("../../web/admin.html"))
}

async fn wait_for_db(db_url: &str) -> PgPool {
    loop {
        match init_pool(db_url).await {
            Ok(pool) => return pool,
            Err(e) => {
                tracing::warn!("Database not ready yet, retrying in 2s: {}", e);
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    }
}

async fn log_headers(req: Request, next: Next) -> Response {
    tracing::debug!("Request Headers: {:?}", req.headers());
    next.run(req).await
}

pub async fn run() -> anyhow::Result<()> {
    let config = Config::new().expect("Failed to load configuration");

    tracing::info!("Starting MadBase Control Plane...");

    let pool = wait_for_db(&config.database_url).await;

    sqlx::migrate!("../migrations")
        .run(&pool)
        .await
        .expect("Failed to run migrations");

    let default_tenant_db_url = std::env::var("DEFAULT_TENANT_DB_URL")
        .expect("DEFAULT_TENANT_DB_URL must be set");
    let tenant_pool = wait_for_db(&default_tenant_db_url).await;

    let control_state = control_plane::ControlPlaneState {
        db: pool.clone(),
        tenant_db: tenant_pool.clone(),
    };

    let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();

    let platform_router = control_plane::router(control_state)
        .route("/logs", get(logs_proxy_handler));

    // admin_auth_middleware extracts State<AdminAuthState>, so it must be
    // attached with from_fn_with_state rather than from_fn.
    let admin_auth_state = AdminAuthState::new();

    let app = Router::new()
        .route("/", get(|| async { "MadBase Control Plane" }))
        .route("/health", get(|| async { "OK" }))
        .route("/metrics", get(|| async move { metric_handle.render() }))
        .route("/dashboard", get(dashboard_handler))
        .nest_service("/css", ServeDir::new("web/css"))
        .nest_service("/js", ServeDir::new("web/js"))
        .nest("/platform/v1", platform_router)
        .layer(from_fn_with_state(admin_auth_state, admin_auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http())
        .layer(from_fn(log_headers))
        .layer(prometheus_layer);

    let port = std::env::var("CONTROL_PORT")
        .unwrap_or_else(|_| "8001".to_string())
        .parse::<u16>()?;

    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    tracing::info!("Control plane listening on {}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await?;

    Ok(())
}
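
A quick illustration of calling the `/platform/v1/logs` route above from a client, assuming a session cookie was already obtained via the login flow (the host, LogQL query, and helper name here are illustrative); the query string is forwarded verbatim to Loki's `query_range` API:

```rust
// Illustrative client-side call; `session_id` comes from the admin login flow.
async fn fetch_gateway_logs(session_id: &str) -> anyhow::Result<String> {
    let client = reqwest::Client::new();
    let body = client
        .get("http://localhost:8001/platform/v1/logs")
        .header("Cookie", format!("madbase_admin_session={session_id}"))
        // Forwarded as-is to Loki's /loki/api/v1/query_range endpoint.
        .query(&[("query", r#"{app="gateway"}"#), ("limit", "100")])
        .send()
        .await?
        .text()
        .await?;
    Ok(body)
}
```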

9
gateway/src/lib.rs Normal file

@@ -0,0 +1,9 @@
pub mod admin_auth;
pub mod middleware;
pub mod state;
pub mod worker;
pub mod control;
pub mod proxy;
pub mod rate_limit;
pub use rate_limit::{RateLimiter, RateLimitConfig, RateLimitMiddleware, RateLimitStatus};

265
gateway/src/proxy.rs Normal file

@@ -0,0 +1,265 @@
use axum::{
    body::Body,
    extract::{Request, State},
    http::StatusCode,
    response::Response,
    routing::get,
    Router,
};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info};

#[derive(Clone, Debug)]
struct Upstream {
    name: String,
    url: String,
    healthy: Arc<RwLock<bool>>,
}

impl Upstream {
    fn new(name: String, url: String) -> Self {
        Self {
            name,
            url,
            healthy: Arc::new(RwLock::new(true)),
        }
    }
}

#[derive(Clone)]
struct ProxyState {
    control_upstream: Upstream,
    worker_upstreams: Arc<RwLock<Vec<Upstream>>>,
    current_worker_index: Arc<RwLock<usize>>,
}

impl ProxyState {
    fn new(control_url: String, worker_urls: Vec<String>) -> Self {
        let worker_upstreams = worker_urls
            .into_iter()
            .map(|url| Upstream::new(format!("worker-{}", url), url))
            .collect();
        Self {
            control_upstream: Upstream::new("control".to_string(), control_url),
            worker_upstreams: Arc::new(RwLock::new(worker_upstreams)),
            current_worker_index: Arc::new(RwLock::new(0)),
        }
    }

    async fn get_next_worker(&self) -> Option<Upstream> {
        let upstreams = self.worker_upstreams.read().await;
        let current_len = upstreams.len();
        if current_len == 0 {
            return None;
        }
        let mut index = self.current_worker_index.write().await;
        let selected = upstreams[*index % current_len].clone();
        *index = (*index + 1) % current_len;
        Some(selected)
    }

    async fn get_healthy_worker(&self) -> Option<Upstream> {
        let upstreams = self.worker_upstreams.read().await;
        let len = upstreams.len();
        if len == 0 {
            return None;
        }
        // Rotate through the pool starting at the shared index so that all
        // healthy workers share load, instead of the first healthy worker
        // absorbing every request.
        let mut index = self.current_worker_index.write().await;
        for offset in 0..len {
            let candidate = &upstreams[(*index + offset) % len];
            if *candidate.healthy.read().await {
                *index = (*index + offset + 1) % len;
                return Some(candidate.clone());
            }
        }
        None
    }

    async fn start_health_check_loop(&self) {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
        info!("Starting proxy health check loop");
        loop {
            interval.tick().await;
            // Check workers
            let worker_upstreams = self.worker_upstreams.read().await;
            for worker in worker_upstreams.iter() {
                let worker = worker.clone();
                tokio::spawn(async move {
                    let client = reqwest::Client::builder()
                        .timeout(std::time::Duration::from_secs(2))
                        .build()
                        .unwrap();
                    let res = client.get(format!("{}/health", worker.url)).send().await;
                    let is_healthy = res.map(|r| r.status().is_success()).unwrap_or(false);
                    let mut healthy = worker.healthy.write().await;
                    if *healthy != is_healthy {
                        if is_healthy {
                            info!("Worker {} is now healthy", worker.url);
                        } else {
                            error!("Worker {} is now unhealthy", worker.url);
                        }
                    }
                    *healthy = is_healthy;
                });
            }
            // Check the control plane
            let control = self.control_upstream.clone();
            tokio::spawn(async move {
                let client = reqwest::Client::builder()
                    .timeout(std::time::Duration::from_secs(2))
                    .build()
                    .unwrap();
                let res = client.get(format!("{}/health", control.url)).send().await;
                let is_healthy = res.map(|r| r.status().is_success()).unwrap_or(false);
                let mut healthy = control.healthy.write().await;
                if *healthy != is_healthy {
                    if is_healthy {
                        info!("Control plane {} is now healthy", control.url);
                    } else {
                        error!("Control plane {} is now unhealthy", control.url);
                    }
                }
                *healthy = is_healthy;
            });
        }
    }
}

async fn proxy_request(
    State(state): State<ProxyState>,
    req: Request,
) -> Result<Response, StatusCode> {
    let path = req.uri().path();
    // Route /platform/*, /dashboard/* and /login to the control plane
    if path.starts_with("/platform") || path.starts_with("/dashboard") || path == "/login" {
        return forward_request(state.control_upstream.clone(), req).await;
    }
    // Route /auth/v1, /rest/v1, /storage/v1, /realtime/v1 and /functions/v1 to workers
    if path.starts_with("/auth/v1")
        || path.starts_with("/rest/v1")
        || path.starts_with("/storage/v1")
        || path.starts_with("/realtime/v1")
        || path.starts_with("/functions/v1")
    {
        // Prefer a healthy worker; fall back to blind round-robin if none is marked healthy
        let mut selected_worker = state.get_healthy_worker().await;
        if selected_worker.is_none() {
            selected_worker = state.get_next_worker().await;
        }
        if let Some(upstream) = selected_worker {
            forward_request(upstream, req).await
        } else {
            Err(StatusCode::SERVICE_UNAVAILABLE)
        }
    } else {
        // Default to the control plane
        forward_request(state.control_upstream.clone(), req).await
    }
}

async fn forward_request(upstream: Upstream, req: Request) -> Result<Response, StatusCode> {
    // Note: building a fresh client per request is simple but forgoes
    // connection pooling; a shared client would be cheaper under load.
    let client = reqwest::Client::new();
    // Rewrite the request URI against the upstream base URL
    let original_uri = req.uri().clone();
    let path_and_query = original_uri
        .path_and_query()
        .map(|pq| pq.as_str())
        .unwrap_or("/");
    let target_url = format!("{}{}", upstream.url, path_and_query);
    info!("Proxying {} -> {}", original_uri.path(), target_url);
    // Build the outbound request with the original method and headers
    let request_builder = client
        .request(req.method().clone(), &target_url)
        .headers(req.headers().clone());
    let response = request_builder.send().await.map_err(|e| {
        error!("Failed to proxy request to {}: {}", upstream.name, e);
        StatusCode::BAD_GATEWAY
    })?;
    let status = StatusCode::from_u16(response.status().as_u16())
        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
    let headers = response.headers().clone();
    let body_bytes = response.bytes().await.map_err(|e| {
        error!("Failed to read response body from {}: {}", upstream.name, e);
        StatusCode::BAD_GATEWAY
    })?;
    let mut response_builder = Response::builder().status(status);
    // Copy headers, dropping hop-by-hop ones that must not be forwarded
    for (name, value) in headers.iter() {
        if name != "connection" && name != "transfer-encoding" {
            response_builder = response_builder.header(name, value);
        }
    }
    response_builder
        .body(Body::from(body_bytes.to_vec()))
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

async fn health_check() -> &'static str {
    "OK"
}

pub async fn run() -> anyhow::Result<()> {
    info!("Starting MadBase Proxy...");
    let control_url = std::env::var("CONTROL_UPSTREAM_URL")
        .unwrap_or_else(|_| "http://control:8001".to_string());
    let worker_urls_str = std::env::var("WORKER_UPSTREAM_URLS")
        .unwrap_or_else(|_| "http://worker1:8002".to_string());
    let worker_urls: Vec<String> = worker_urls_str
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect();
    info!("Control upstream: {}", control_url);
    info!("Worker upstreams: {:?}", worker_urls);

    let state = ProxyState::new(control_url, worker_urls);

    // Start the health check loop in the background
    let state_clone = state.clone();
    tokio::spawn(async move {
        state_clone.start_health_check_loop().await;
    });

    let app = Router::new()
        .route("/health", get(health_check))
        .fallback(proxy_request)
        .with_state(state);

    let port = std::env::var("PROXY_PORT")
        .unwrap_or_else(|_| "8000".to_string())
        .parse::<u16>()?;
    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    info!("Proxy listening on {}", addr);
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await?;
    Ok(())
}
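
As a sanity check on the rotation logic, a test along these lines could live in this module (illustrative, not part of the commit; it only exercises `get_next_worker`, which needs no live upstreams):

```rust
#[tokio::test]
async fn round_robin_cycles_through_workers() {
    let state = ProxyState::new(
        "http://control:8001".to_string(),
        vec!["http://worker1:8002".into(), "http://worker2:8002".into()],
    );
    // With two workers, the third pick should wrap back to the first.
    let first = state.get_next_worker().await.unwrap();
    let second = state.get_next_worker().await.unwrap();
    let third = state.get_next_worker().await.unwrap();
    assert_ne!(first.url, second.url);
    assert_eq!(first.url, third.url);
}
```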

174
gateway/src/rate_limit.rs Normal file

@@ -0,0 +1,174 @@
//! Distributed rate limiting using Redis
//!
//! This module provides fixed-window rate limiting that works across multiple
//! instances: each key maps to a Redis counter that expires when the window
//! ends, so limits are enforced consistently across the cluster.

use common::{CacheLayer, CacheResult};

/// Rate limit configuration
#[derive(Clone, Debug)]
pub struct RateLimitConfig {
    pub max_requests: u64,
    pub window_seconds: u64,
}

impl RateLimitConfig {
    pub fn new(max_requests: u64, window_seconds: u64) -> Self {
        Self {
            max_requests,
            window_seconds,
        }
    }
}

/// Rate limiter backed by a fixed-window Redis counter
#[derive(Clone)]
pub struct RateLimiter {
    cache: CacheLayer,
}

impl RateLimiter {
    /// Create a new rate limiter
    pub fn new(cache: CacheLayer) -> Self {
        Self { cache }
    }

    /// Check if a request should be rate limited
    ///
    /// Returns true if the request is allowed, false if the rate limit is exceeded
    pub async fn check_rate_limit(
        &self,
        key: &str,
        config: RateLimitConfig,
    ) -> CacheResult<bool> {
        if let Some(redis) = &self.cache.redis {
            let mut conn = redis.get_async_connection().await?;
            // Use a Lua script so the check-and-increment is atomic
            let script = r"
                local key = KEYS[1]
                local window = tonumber(ARGV[1])
                local max = tonumber(ARGV[2])

                local current = redis.call('GET', key)
                if current == false then
                    -- First request in this window: start the counter with an expiry
                    redis.call('SET', key, 1)
                    redis.call('EXPIRE', key, window)
                    return 1
                end

                local count = redis.call('INCR', key)
                if count <= max then
                    return 1
                else
                    return 0
                end
            ";
            let result: i32 = redis::Script::new(script)
                .key(key)
                .arg(config.window_seconds)
                .arg(config.max_requests)
                .invoke_async(&mut conn)
                .await?;
            Ok(result == 1)
        } else {
            // No Redis configured: allow all requests
            Ok(true)
        }
    }

    /// Get the current rate limit status for a key
    pub async fn get_rate_limit_status(&self, key: &str) -> CacheResult<RateLimitStatus> {
        if let Some(redis) = &self.cache.redis {
            let mut conn = redis.get_async_connection().await?;
            let count: Option<u64> = redis::cmd("GET")
                .arg(key)
                .query_async(&mut conn)
                .await?;
            let ttl: Option<i64> = redis::cmd("TTL")
                .arg(key)
                .query_async(&mut conn)
                .await?;
            Ok(RateLimitStatus {
                current_count: count.unwrap_or(0),
                // TTL returns -1/-2 for missing keys; clamp before the cast
                // so the status never reports a huge bogus reset time.
                reset_in_seconds: ttl.unwrap_or(0).max(0) as u64,
            })
        } else {
            Ok(RateLimitStatus {
                current_count: 0,
                reset_in_seconds: 0,
            })
        }
    }

    /// Reset the rate limit for a key
    pub async fn reset_rate_limit(&self, key: &str) -> CacheResult<()> {
        self.cache.delete(key).await
    }
}

/// Rate limit status
#[derive(Clone, Debug, serde::Serialize)]
pub struct RateLimitStatus {
    pub current_count: u64,
    pub reset_in_seconds: u64,
}

/// Rate limiter middleware helpers for Axum
#[derive(Clone)]
pub struct RateLimitMiddleware {
    limiter: RateLimiter,
    config: RateLimitConfig,
}

impl RateLimitMiddleware {
    pub fn new(limiter: RateLimiter, config: RateLimitConfig) -> Self {
        Self { limiter, config }
    }

    /// Check the rate limit for an IP address
    pub async fn check_ip(&self, ip: &str) -> CacheResult<bool> {
        let key = format!("ratelimit:ip:{}", ip);
        self.limiter.check_rate_limit(&key, self.config.clone()).await
    }

    /// Check the rate limit for a user
    pub async fn check_user(&self, user_id: &str) -> CacheResult<bool> {
        let key = format!("ratelimit:user:{}", user_id);
        self.limiter.check_rate_limit(&key, self.config.clone()).await
    }

    /// Check the rate limit for an API key
    pub async fn check_api_key(&self, api_key: &str) -> CacheResult<bool> {
        let key = format!("ratelimit:apikey:{}", api_key);
        self.limiter.check_rate_limit(&key, self.config.clone()).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_rate_limiter_creation() {
        let cache = CacheLayer::new(None, 60);
        let limiter = RateLimiter::new(cache);
        assert!(limiter.cache.redis.is_none());
    }
}
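
Typical usage from a handler, sketched under the assumption that `common::CacheLayer::new(None, 60)` (as in the test above) is how the cache is constructed; with no Redis the limiter fails open and allows everything:

```rust
use axum::http::StatusCode;
use gateway::{RateLimitConfig, RateLimitMiddleware, RateLimiter};

// Sketch: 100 requests per 60-second window, keyed by client IP.
async fn guard_ip(limiter: &RateLimitMiddleware, ip: &str) -> Result<(), StatusCode> {
    match limiter.check_ip(ip).await {
        Ok(true) => Ok(()),                              // under the limit
        Ok(false) => Err(StatusCode::TOO_MANY_REQUESTS), // window exhausted
        Err(_) => Ok(()),                                // fail open on Redis errors
    }
}

fn build_limiter() -> RateLimitMiddleware {
    // CacheLayer constructor arguments assumed from the unit test above.
    let cache = common::CacheLayer::new(None, 60);
    RateLimitMiddleware::new(RateLimiter::new(cache), RateLimitConfig::new(100, 60))
}
```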

152
gateway/src/worker.rs Normal file

@@ -0,0 +1,152 @@
use axum::{
    middleware::from_fn_with_state,
    routing::get,
    Router,
};
use axum_prometheus::PrometheusMetricLayer;
use common::{init_pool, Config};
use crate::middleware;
use crate::state::AppState;
use sqlx::PgPool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;

async fn wait_for_db(db_url: &str) -> PgPool {
    loop {
        match init_pool(db_url).await {
            Ok(pool) => return pool,
            Err(e) => {
                tracing::warn!("Database not ready yet, retrying in 2s: {}", e);
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    }
}

pub async fn run() -> anyhow::Result<()> {
    let config = Config::new().expect("Failed to load configuration");

    tracing::info!("Starting MadBase Worker...");

    let pool = wait_for_db(&config.database_url).await;

    let app_state = AppState {
        control_db: pool.clone(),
        tenant_pools: Arc::new(RwLock::new(HashMap::new())),
    };

    let auth_state = auth::AuthState {
        db: pool.clone(),
        config: config.clone(),
    };

    let data_state = data_api::handlers::DataState {
        db: pool.clone(),
        config: config.clone(),
    };

    let default_tenant_db_url = std::env::var("DEFAULT_TENANT_DB_URL")
        .expect("DEFAULT_TENANT_DB_URL must be set");
    let tenant_pool = wait_for_db(&default_tenant_db_url).await;

    let mut tenant_config = config.clone();
    tenant_config.database_url = default_tenant_db_url.clone();

    // Realtime init
    let (realtime_router, realtime_state) = realtime::init(tenant_pool.clone(), tenant_config.clone());

    // Replication listener
    let repl_config = tenant_config.clone();
    let repl_tx = realtime_state.broadcast_tx.clone();
    tokio::spawn(async move {
        if let Err(e) = realtime::replication::start_replication_listener(repl_config, repl_tx).await {
            tracing::error!("Replication listener failed: {}", e);
        }
    });

    // Storage init
    let storage_router = storage::init(pool.clone(), config.clone()).await;

    // Functions init
    let functions_runtime = Arc::new(
        functions::runtime::WasmRuntime::new()
            .expect("Failed to initialize WASM runtime"),
    );
    let deno_runtime = Arc::new(functions::deno_runtime::DenoRuntime::new());
    let functions_state = functions::FunctionsState {
        db: pool.clone(),
        config: config.clone(),
        runtime: functions_runtime,
        deno_runtime,
    };

    // Auth middleware state
    let auth_middleware_state = auth::AuthMiddlewareState {
        config: config.clone(),
    };

    // Project middleware state
    let project_middleware_state = middleware::ProjectMiddlewareState {
        control_db: app_state.control_db.clone(),
        tenant_pools: app_state.tenant_pools.clone(),
        project_cache: moka::future::Cache::new(100),
    };

    // Construct worker routes; layers run outermost-last-added, so
    // resolve_project executes first, then inject_tenant_pool, then
    // auth_middleware.
    let tenant_routes = Router::new()
        .nest("/auth/v1", auth::router().with_state(auth_state))
        .nest("/rest/v1", data_api::router().with_state(data_state))
        .nest("/realtime/v1", realtime_router)
        .nest("/storage/v1", storage_router)
        .nest("/functions/v1", functions::router(functions_state))
        .layer(from_fn_with_state(
            auth_middleware_state,
            auth::auth_middleware,
        ))
        .layer(from_fn_with_state(
            project_middleware_state.clone(),
            middleware::inject_tenant_pool,
        ))
        .layer(from_fn_with_state(
            project_middleware_state,
            middleware::resolve_project,
        ));

    let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();

    let app = Router::new()
        .route("/health", get(|| async { "OK" }))
        .route("/metrics", get(|| async move { metric_handle.render() }))
        .route("/ready", get(|| async { "Ready" }))
        // axum 0.7 panics on nesting at "/", so the tenant routes are merged
        // into the root router instead.
        .merge(tenant_routes)
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http())
        .layer(prometheus_layer);

    let port = std::env::var("WORKER_PORT")
        .unwrap_or_else(|_| "8002".to_string())
        .parse::<u16>()?;

    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    tracing::info!("Worker listening on {}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await?;

    Ok(())
}