shared: add stream+consumer policy helpers; NATS context header builder aggregate/runner/projection: centralize stream validation and header usage; set bounded consumer params projection: add QueryService gRPC and wire into main; settings include PROJECTION_GRPC_ADDR gateway: gRPC routing to Projection/Runner with deadlines; bounded read-only retries; pooled gRPC channels (bounded LRU+TTL); admin proxy forwards to gRPC; probes use concurrency limiter + TTL cache runner: add RunnerAdmin gRPC server (drain, status, reload) and wire into main; settings include RUNNER_GRPC_ADDR tests: add gateway authz for runner admin, projection tenant isolation, runner admin drain semantics docs: update TRANSPORT_DEVELOPMENT_PLAN to reflect completed milestones and details
270 lines
8.2 KiB
Rust
270 lines
8.2 KiB
Rust
use std::collections::HashMap;
|
|
use std::sync::{Mutex, OnceLock};
|
|
use std::time::{Duration, Instant};
|
|
|
|
pub fn http_client() -> &'static reqwest::Client {
|
|
static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
|
|
CLIENT.get_or_init(|| {
|
|
let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(10));
|
|
|
|
if let Some(ca_pem) = env_or_file(
|
|
"GATEWAY_INTERNAL_CA_CERT_PEM",
|
|
"GATEWAY_INTERNAL_CA_CERT_PEM_FILE",
|
|
) {
|
|
if let Ok(cert) = reqwest::Certificate::from_pem(ca_pem.as_bytes()) {
|
|
builder = builder.add_root_certificate(cert);
|
|
}
|
|
}
|
|
|
|
if let Some(identity_pem) = env_or_file(
|
|
"GATEWAY_INTERNAL_IDENTITY_PEM",
|
|
"GATEWAY_INTERNAL_IDENTITY_PEM_FILE",
|
|
) {
|
|
if let Ok(identity) = reqwest::Identity::from_pem(identity_pem.as_bytes()) {
|
|
builder = builder.identity(identity);
|
|
}
|
|
}
|
|
|
|
builder.build().expect("failed to build reqwest client")
|
|
})
|
|
}
|
|
|
|
pub fn grpc_endpoint(url: &str) -> Result<tonic::transport::Endpoint, tonic::transport::Error> {
|
|
let mut endpoint =
|
|
tonic::transport::Endpoint::from_shared(url.to_string())?.timeout(Duration::from_secs(10));
|
|
|
|
let wants_tls = url.starts_with("https://")
|
|
|| std::env::var("GATEWAY_INTERNAL_GRPC_TLS")
|
|
.ok()
|
|
.map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes"))
|
|
.unwrap_or(false);
|
|
|
|
if wants_tls {
|
|
if let Some(tls) = grpc_tls_config() {
|
|
endpoint = endpoint.tls_config(tls)?;
|
|
}
|
|
}
|
|
|
|
Ok(endpoint)
|
|
}
|
|
|
|
pub fn grpc_channel(url: &str) -> Result<tonic::transport::Channel, tonic::transport::Error> {
|
|
const MAX_CHANNELS: usize = 64;
|
|
const TTL: Duration = Duration::from_secs(300);
|
|
|
|
static CACHE: OnceLock<Mutex<HashMap<String, (tonic::transport::Channel, Instant)>>> =
|
|
OnceLock::new();
|
|
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
|
|
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if let Some((channel, last_used)) = guard.get_mut(url) {
|
|
if last_used.elapsed() < TTL {
|
|
*last_used = Instant::now();
|
|
return Ok(channel.clone());
|
|
}
|
|
}
|
|
|
|
let endpoint = grpc_endpoint(url)?;
|
|
let channel = endpoint.connect_lazy();
|
|
|
|
if guard.len() >= MAX_CHANNELS {
|
|
let mut oldest_key = None;
|
|
let mut oldest_at = Instant::now();
|
|
for (k, (_, last_used)) in guard.iter() {
|
|
if oldest_key.is_none() || *last_used < oldest_at {
|
|
oldest_key = Some(k.clone());
|
|
oldest_at = *last_used;
|
|
}
|
|
}
|
|
if let Some(key) = oldest_key {
|
|
guard.remove(&key);
|
|
}
|
|
}
|
|
|
|
guard.insert(url.to_string(), (channel.clone(), Instant::now()));
|
|
Ok(channel)
|
|
} else {
|
|
let endpoint = grpc_endpoint(url)?;
|
|
Ok(endpoint.connect_lazy())
|
|
}
|
|
}
|
|
|
|
pub async fn probe_status_ok(
|
|
url: &str,
|
|
headers: &[(&str, &str)],
|
|
timeout: Duration,
|
|
cache_ttl: Duration,
|
|
) -> Result<bool, reqwest::Error> {
|
|
const MAX_ENTRIES: usize = 256;
|
|
|
|
static SEM: OnceLock<tokio::sync::Semaphore> = OnceLock::new();
|
|
static CACHE: OnceLock<Mutex<HashMap<String, (bool, Instant)>>> = OnceLock::new();
|
|
|
|
let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(32));
|
|
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if let Some((value, last_used)) = guard.get_mut(url) {
|
|
if last_used.elapsed() < cache_ttl {
|
|
*last_used = Instant::now();
|
|
return Ok(*value);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let _permit = sem.acquire().await.expect("probe semaphore closed");
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if let Some((value, last_used)) = guard.get_mut(url) {
|
|
if last_used.elapsed() < cache_ttl {
|
|
*last_used = Instant::now();
|
|
return Ok(*value);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let client = http_client();
|
|
let mut req = client.get(url).timeout(timeout);
|
|
for (k, v) in headers {
|
|
req = req.header(*k, *v);
|
|
}
|
|
let ok = req.send().await.map(|r| r.status().is_success())?;
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if guard.len() >= MAX_ENTRIES {
|
|
evict_oldest(&mut guard);
|
|
}
|
|
guard.insert(url.to_string(), (ok, Instant::now()));
|
|
}
|
|
}
|
|
|
|
Ok(ok)
|
|
}
|
|
|
|
pub async fn probe_text(
|
|
url: &str,
|
|
headers: &[(&str, &str)],
|
|
timeout: Duration,
|
|
cache_ttl: Duration,
|
|
) -> Result<String, reqwest::Error> {
|
|
const MAX_ENTRIES: usize = 128;
|
|
|
|
static SEM: OnceLock<tokio::sync::Semaphore> = OnceLock::new();
|
|
static CACHE: OnceLock<Mutex<HashMap<String, (String, Instant)>>> = OnceLock::new();
|
|
|
|
let sem = SEM.get_or_init(|| tokio::sync::Semaphore::new(16));
|
|
let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if let Some((value, last_used)) = guard.get_mut(url) {
|
|
if last_used.elapsed() < cache_ttl {
|
|
*last_used = Instant::now();
|
|
return Ok(value.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let _permit = sem.acquire().await.expect("probe semaphore closed");
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if let Some((value, last_used)) = guard.get_mut(url) {
|
|
if last_used.elapsed() < cache_ttl {
|
|
*last_used = Instant::now();
|
|
return Ok(value.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let client = http_client();
|
|
let mut req = client.get(url).timeout(timeout);
|
|
for (k, v) in headers {
|
|
req = req.header(*k, *v);
|
|
}
|
|
let text = req.send().await?.text().await?;
|
|
|
|
if cache_ttl > Duration::ZERO {
|
|
if let Ok(mut guard) = cache.lock() {
|
|
if guard.len() >= MAX_ENTRIES {
|
|
evict_oldest(&mut guard);
|
|
}
|
|
guard.insert(url.to_string(), (text.clone(), Instant::now()));
|
|
}
|
|
}
|
|
|
|
Ok(text)
|
|
}
|
|
|
|
fn evict_oldest<T: Clone>(map: &mut HashMap<String, (T, Instant)>) {
|
|
let mut oldest_key = None;
|
|
let mut oldest_at = Instant::now();
|
|
for (k, (_, last_used)) in map.iter() {
|
|
if oldest_key.is_none() || *last_used < oldest_at {
|
|
oldest_key = Some(k.clone());
|
|
oldest_at = *last_used;
|
|
}
|
|
}
|
|
if let Some(key) = oldest_key {
|
|
map.remove(&key);
|
|
}
|
|
}
|
|
|
|
fn grpc_tls_config() -> Option<tonic::transport::ClientTlsConfig> {
|
|
let mut tls = tonic::transport::ClientTlsConfig::new();
|
|
let mut configured = false;
|
|
|
|
if let Some(ca_pem) = env_or_file(
|
|
"GATEWAY_INTERNAL_GRPC_CA_CERT_PEM",
|
|
"GATEWAY_INTERNAL_GRPC_CA_CERT_PEM_FILE",
|
|
) {
|
|
tls = tls.ca_certificate(tonic::transport::Certificate::from_pem(ca_pem));
|
|
configured = true;
|
|
}
|
|
|
|
let cert_pem = env_or_file(
|
|
"GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM",
|
|
"GATEWAY_INTERNAL_GRPC_CLIENT_CERT_PEM_FILE",
|
|
);
|
|
let key_pem = env_or_file(
|
|
"GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM",
|
|
"GATEWAY_INTERNAL_GRPC_CLIENT_KEY_PEM_FILE",
|
|
);
|
|
if let (Some(cert_pem), Some(key_pem)) = (cert_pem, key_pem) {
|
|
tls = tls.identity(tonic::transport::Identity::from_pem(cert_pem, key_pem));
|
|
configured = true;
|
|
}
|
|
|
|
if configured {
|
|
Some(tls)
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
fn env_or_file(env_key: &str, file_env_key: &str) -> Option<String> {
|
|
if let Ok(path) = std::env::var(file_env_key) {
|
|
if let Ok(raw) = std::fs::read_to_string(path) {
|
|
let trimmed = raw.trim().to_string();
|
|
if !trimmed.is_empty() {
|
|
return Some(trimmed);
|
|
}
|
|
}
|
|
}
|
|
std::env::var(env_key).ok().and_then(|v| {
|
|
let trimmed = v.trim().to_string();
|
|
if trimmed.is_empty() {
|
|
None
|
|
} else {
|
|
Some(trimmed)
|
|
}
|
|
})
|
|
}
|