use std::collections::HashMap; use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; pub fn http_client() -> &'static reqwest::Client { static CLIENT: OnceLock = OnceLock::new(); CLIENT.get_or_init(|| { let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(10)); if let Some(ca_pem) = env_or_file( "GATEWAY_INTERNAL_CA_CERT_PEM", "GATEWAY_INTERNAL_CA_CERT_PEM_FILE", ) { if let Ok(cert) = reqwest::Certificate::from_pem(ca_pem.as_bytes()) { builder = builder.add_root_certificate(cert); } } if let Some(identity_pem) = env_or_file( "GATEWAY_INTERNAL_IDENTITY_PEM", "GATEWAY_INTERNAL_IDENTITY_PEM_FILE", ) { if let Ok(identity) = reqwest::Identity::from_pem(identity_pem.as_bytes()) { builder = builder.identity(identity); } } builder.build().expect("failed to build reqwest client") }) } pub fn grpc_endpoint(url: &str) -> Result { let mut endpoint = tonic::transport::Endpoint::from_shared(url.to_string())?.timeout(Duration::from_secs(10)); let wants_tls = url.starts_with("https://") || std::env::var("GATEWAY_INTERNAL_GRPC_TLS") .ok() .map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes")) .unwrap_or(false); if wants_tls { if let Some(tls) = grpc_tls_config() { endpoint = endpoint.tls_config(tls)?; } } Ok(endpoint) } pub fn grpc_channel(url: &str) -> Result { const MAX_CHANNELS: usize = 64; const TTL: Duration = Duration::from_secs(300); static CACHE: OnceLock>> = OnceLock::new(); let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new())); if let Ok(mut guard) = cache.lock() { if let Some((channel, last_used)) = guard.get_mut(url) { if last_used.elapsed() < TTL { *last_used = Instant::now(); return Ok(channel.clone()); } } let endpoint = grpc_endpoint(url)?; let channel = endpoint.connect_lazy(); if guard.len() >= MAX_CHANNELS { let mut oldest_key = None; let mut oldest_at = Instant::now(); for (k, (_, last_used)) in guard.iter() { if oldest_key.is_none() || *last_used < oldest_at { oldest_key = Some(k.clone()); oldest_at = *last_used; } } if let Some(key) = oldest_key { guard.remove(&key); } } guard.insert(url.to_string(), (channel.clone(), Instant::now())); Ok(channel) } else { let endpoint = grpc_endpoint(url)?; Ok(endpoint.connect_lazy()) } } pub async fn probe_status_ok( url: &str, headers: &[(&str, &str)], timeout: Duration, cache_ttl: Duration, ) -> Result { const MAX_ENTRIES: usize = 256; static SEM: OnceLock = OnceLock::new(); static CACHE: OnceLock>> = OnceLock::new(); let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(32)); let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new())); if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if let Some((value, last_used)) = guard.get_mut(url) { if last_used.elapsed() < cache_ttl { *last_used = Instant::now(); return Ok(*value); } } } } let _permit = sem.acquire().await.expect("probe semaphore closed"); if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if let Some((value, last_used)) = guard.get_mut(url) { if last_used.elapsed() < cache_ttl { *last_used = Instant::now(); return Ok(*value); } } } } let client = http_client(); let mut req = client.get(url).timeout(timeout); for (k, v) in headers { req = req.header(*k, *v); } let ok = req.send().await.map(|r| r.status().is_success())?; if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if guard.len() >= MAX_ENTRIES { evict_oldest(&mut guard); } guard.insert(url.to_string(), (ok, Instant::now())); } } Ok(ok) } pub async fn probe_text( url: &str, headers: &[(&str, &str)], timeout: Duration, cache_ttl: Duration, ) -> Result { const MAX_ENTRIES: usize = 128; static SEM: OnceLock = OnceLock::new(); static CACHE: OnceLock>> = OnceLock::new(); let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(16)); let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new())); if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if let Some((value, last_used)) = guard.get_mut(url) { if last_used.elapsed() < cache_ttl { *last_used = Instant::now(); return Ok(value.clone()); } } } } let _permit = sem.acquire().await.expect("probe semaphore closed"); if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if let Some((value, last_used)) = guard.get_mut(url) { if last_used.elapsed() < cache_ttl { *last_used = Instant::now(); return Ok(value.clone()); } } } } let client = http_client(); let mut req = client.get(url).timeout(timeout); for (k, v) in headers { req = req.header(*k, *v); } let text = req.send().await?.text().await?; if cache_ttl > Duration::ZERO { if let Ok(mut guard) = cache.lock() { if guard.len() >= MAX_ENTRIES { evict_oldest(&mut guard); } guard.insert(url.to_string(), (text.clone(), Instant::now())); } } Ok(text) } fn evict_oldest(map: &mut HashMap) { let mut oldest_key = None; let mut oldest_at = Instant::now(); for (k, (_, last_used)) in map.iter() { if oldest_key.is_none() || *last_used < oldest_at { oldest_key = Some(k.clone()); oldest_at = *last_used; } } if let Some(key) = oldest_key { map.remove(&key); } } fn grpc_tls_config() -> Option { let mut tls = tonic::transport::ClientTlsConfig::new(); let mut configured = false; if let Some(ca_pem) = env_or_file( "GATEWAY_INTERNAL_GRPC_CA_CERT_PEM", "GATEWAY_INTERNAL_GRPC_CA_CERT_PEM_FILE", ) { tls = tls.ca_certificate(tonic::transport::Certificate::from_pem(ca_pem)); configured = true; } let cert_pem = env_or_file( "GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM", "GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM_FILE", ); let key_pem = env_or_file( "GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM", "GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM_FILE", ); if let (Some(cert_pem), Some(key_pem)) = (cert_pem, key_pem) { tls = tls.identity(tonic::transport::Identity::from_pem(cert_pem, key_pem)); configured = true; } if configured { Some(tls) } else { None } } fn env_or_file(env_key: &str, file_env_key: &str) -> Option { if let Ok(path) = std::env::var(file_env_key) { if let Ok(raw) = std::fs::read_to_string(path) { let trimmed = raw.trim().to_string(); if !trimmed.is_empty() { return Some(trimmed); } } } std::env::var(env_key).ok().and_then(|v| { let trimmed = v.trim().to_string(); if trimmed.is_empty() { None } else { Some(trimmed) } }) }