Files
cloudlysis/gateway/src/upstream.rs
Vlad Durnea 90c307016d
Some checks failed
ci / rust (push) Failing after 2m21s
ci / ui (push) Failing after 28s
images / build-and-push (push) Failing after 18s
transport: complete M0–M7
shared: add stream+consumer policy helpers; NATS context header builder

aggregate/runner/projection: centralize stream validation and header usage; set bounded consumer params

projection: add QueryService gRPC and wire into main; settings include PROJECTION_GRPC_ADDR

gateway: gRPC routing to Projection/Runner with deadlines; bounded read-only retries; pooled gRPC channels (bounded LRU+TTL); admin proxy forwards to gRPC; probes use concurrency limiter + TTL cache

runner: add RunnerAdmin gRPC server (drain, status, reload) and wire into main; settings include RUNNER_GRPC_ADDR

tests: add gateway authz for runner admin, projection tenant isolation, runner admin drain semantics

docs: update TRANSPORT_DEVELOPMENT_PLAN to reflect completed milestones and details
2026-03-30 14:24:14 +03:00

270 lines
8.2 KiB
Rust

use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
pub fn http_client() -> &'static reqwest::Client {
static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
CLIENT.get_or_init(|| {
let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(10));
if let Some(ca_pem) = env_or_file(
"GATEWAY_INTERNAL_CA_CERT_PEM",
"GATEWAY_INTERNAL_CA_CERT_PEM_FILE",
) {
if let Ok(cert) = reqwest::Certificate::from_pem(ca_pem.as_bytes()) {
builder = builder.add_root_certificate(cert);
}
}
if let Some(identity_pem) = env_or_file(
"GATEWAY_INTERNAL_IDENTITY_PEM",
"GATEWAY_INTERNAL_IDENTITY_PEM_FILE",
) {
if let Ok(identity) = reqwest::Identity::from_pem(identity_pem.as_bytes()) {
builder = builder.identity(identity);
}
}
builder.build().expect("failed to build reqwest client")
})
}
pub fn grpc_endpoint(url: &str) -> Result<tonic::transport::Endpoint, tonic::transport::Error> {
let mut endpoint =
tonic::transport::Endpoint::from_shared(url.to_string())?.timeout(Duration::from_secs(10));
let wants_tls = url.starts_with("https://")
|| std::env::var("GATEWAY_INTERNAL_GRPC_TLS")
.ok()
.map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes"))
.unwrap_or(false);
if wants_tls {
if let Some(tls) = grpc_tls_config() {
endpoint = endpoint.tls_config(tls)?;
}
}
Ok(endpoint)
}
pub fn grpc_channel(url: &str) -> Result<tonic::transport::Channel, tonic::transport::Error> {
const MAX_CHANNELS: usize = 64;
const TTL: Duration = Duration::from_secs(300);
static CACHE: OnceLock<Mutex<HashMap<String, (tonic::transport::Channel, Instant)>>> =
OnceLock::new();
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
if let Ok(mut guard) = cache.lock() {
if let Some((channel, last_used)) = guard.get_mut(url) {
if last_used.elapsed() < TTL {
*last_used = Instant::now();
return Ok(channel.clone());
}
}
let endpoint = grpc_endpoint(url)?;
let channel = endpoint.connect_lazy();
if guard.len() >= MAX_CHANNELS {
let mut oldest_key = None;
let mut oldest_at = Instant::now();
for (k, (_, last_used)) in guard.iter() {
if oldest_key.is_none() || *last_used < oldest_at {
oldest_key = Some(k.clone());
oldest_at = *last_used;
}
}
if let Some(key) = oldest_key {
guard.remove(&key);
}
}
guard.insert(url.to_string(), (channel.clone(), Instant::now()));
Ok(channel)
} else {
let endpoint = grpc_endpoint(url)?;
Ok(endpoint.connect_lazy())
}
}
pub async fn probe_status_ok(
url: &str,
headers: &[(&str, &str)],
timeout: Duration,
cache_ttl: Duration,
) -> Result<bool, reqwest::Error> {
const MAX_ENTRIES: usize = 256;
static SEM: OnceLock<tokio::sync::Semaphore> = OnceLock::new();
static CACHE: OnceLock<Mutex<HashMap<String, (bool, Instant)>>> = OnceLock::new();
let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(32));
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if let Some((value, last_used)) = guard.get_mut(url) {
if last_used.elapsed() < cache_ttl {
*last_used = Instant::now();
return Ok(*value);
}
}
}
}
let _permit = sem.acquire().await.expect("probe semaphore closed");
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if let Some((value, last_used)) = guard.get_mut(url) {
if last_used.elapsed() < cache_ttl {
*last_used = Instant::now();
return Ok(*value);
}
}
}
}
let client = http_client();
let mut req = client.get(url).timeout(timeout);
for (k, v) in headers {
req = req.header(*k, *v);
}
let ok = req.send().await.map(|r| r.status().is_success())?;
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if guard.len() >= MAX_ENTRIES {
evict_oldest(&mut guard);
}
guard.insert(url.to_string(), (ok, Instant::now()));
}
}
Ok(ok)
}
pub async fn probe_text(
url: &str,
headers: &[(&str, &str)],
timeout: Duration,
cache_ttl: Duration,
) -> Result<String, reqwest::Error> {
const MAX_ENTRIES: usize = 128;
static SEM: OnceLock<tokio::sync::Semaphore> = OnceLock::new();
static CACHE: OnceLock<Mutex<HashMap<String, (String, Instant)>>> = OnceLock::new();
let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(16));
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if let Some((value, last_used)) = guard.get_mut(url) {
if last_used.elapsed() < cache_ttl {
*last_used = Instant::now();
return Ok(value.clone());
}
}
}
}
let _permit = sem.acquire().await.expect("probe semaphore closed");
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if let Some((value, last_used)) = guard.get_mut(url) {
if last_used.elapsed() < cache_ttl {
*last_used = Instant::now();
return Ok(value.clone());
}
}
}
}
let client = http_client();
let mut req = client.get(url).timeout(timeout);
for (k, v) in headers {
req = req.header(*k, *v);
}
let text = req.send().await?.text().await?;
if cache_ttl > Duration::ZERO {
if let Ok(mut guard) = cache.lock() {
if guard.len() >= MAX_ENTRIES {
evict_oldest(&mut guard);
}
guard.insert(url.to_string(), (text.clone(), Instant::now()));
}
}
Ok(text)
}
fn evict_oldest<T: Clone>(map: &mut HashMap<String, (T, Instant)>) {
let mut oldest_key = None;
let mut oldest_at = Instant::now();
for (k, (_, last_used)) in map.iter() {
if oldest_key.is_none() || *last_used < oldest_at {
oldest_key = Some(k.clone());
oldest_at = *last_used;
}
}
if let Some(key) = oldest_key {
map.remove(&key);
}
}
fn grpc_tls_config() -> Option<tonic::transport::ClientTlsConfig> {
let mut tls = tonic::transport::ClientTlsConfig::new();
let mut configured = false;
if let Some(ca_pem) = env_or_file(
"GATEWAY_INTERNAL_GRPC_CA_CERT_PEM",
"GATEWAY_INTERNAL_GRPC_CA_CERT_PEM_FILE",
) {
tls = tls.ca_certificate(tonic::transport::Certificate::from_pem(ca_pem));
configured = true;
}
let cert_pem = env_or_file(
"GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM",
"GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM_FILE",
);
let key_pem = env_or_file(
"GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM",
"GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM_FILE",
);
if let (Some(cert_pem), Some(key_pem)) = (cert_pem, key_pem) {
tls = tls.identity(tonic::transport::Identity::from_pem(cert_pem, key_pem));
configured = true;
}
if configured {
Some(tls)
} else {
None
}
}
fn env_or_file(env_key: &str, file_env_key: &str) -> Option<String> {
if let Ok(path) = std::env::var(file_env_key) {
if let Ok(raw) = std::fs::read_to_string(path) {
let trimmed = raw.trim().to_string();
if !trimmed.is_empty() {
return Some(trimmed);
}
}
}
std::env::var(env_key).ok().and_then(|v| {
let trimmed = v.trim().to_string();
if trimmed.is_empty() {
None
} else {
Some(trimmed)
}
})
}