Files
madbase/functions/src/deno_runtime.rs
Vlad Durnea a66d908eff
Some checks failed
CI / podman-build (push) Has been cancelled
CI / rust (push) Has been cancelled
chore: full stack stability and migration fixes, plus react UI progress
2026-03-18 09:01:38 +02:00

593 lines
24 KiB
Rust

use anyhow::Result;
use deno_core::{JsRuntime, RuntimeOptions, v8, ModuleLoader, ModuleSource, ModuleSourceCode, ModuleType, ModuleLoadResponse, RequestedModuleType};
use serde_json::json;
use deno_ast::{ParseParams, MediaType};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
deno_core::extension!(
madbase_runtime,
ops = [op_fetch],
);
#[deno_core::op2(async)]
#[serde]
async fn op_fetch(
#[string] url: String,
#[string] method: String,
#[serde] headers: HashMap<String, String>,
#[serde] body: Option<serde_json::Value>,
) -> Result<serde_json::Value, deno_core::error::AnyError> {
let client = reqwest::Client::new();
let mut builder = match method.to_uppercase().as_str() {
"POST" => client.post(&url),
"PUT" => client.put(&url),
"DELETE" => client.delete(&url),
_ => client.get(&url),
};
for (k, v) in headers {
builder = builder.header(k, v);
}
if let Some(b) = body {
builder = builder.json(&b);
}
let res = builder.send().await?;
let status = res.status().as_u16();
let mut res_headers = HashMap::new();
for (k, v) in res.headers() {
res_headers.insert(k.to_string(), v.to_str().unwrap_or("").to_string());
}
let text = res.text().await?;
Ok(json!({
"status": status,
"headers": res_headers,
"body": text
}))
}
struct SandboxedModuleLoader {
allowed_dir: PathBuf,
}
impl ModuleLoader for SandboxedModuleLoader {
fn resolve(&self, specifier: &str, referrer: &str, _kind: deno_core::ResolutionKind) -> Result<deno_core::ModuleSpecifier, anyhow::Error> {
let resolved = deno_core::resolve_import(specifier, referrer)?;
if resolved.scheme() == "file" {
let path = resolved.to_file_path().map_err(|_| anyhow::anyhow!("Invalid file path"))?;
let canonical = path.canonicalize().unwrap_or_else(|_| path.clone());
if !canonical.starts_with(&self.allowed_dir) {
return Err(anyhow::anyhow!("Import blocked: {} is outside allowed directory", specifier));
}
}
if resolved.scheme() != "file" && resolved.scheme() != "https" && resolved.scheme() != "http" {
return Err(anyhow::anyhow!("Blocked import scheme: {}", resolved.scheme()));
}
Ok(resolved)
}
fn load(&self, specifier: &deno_core::ModuleSpecifier, _maybe_referrer: Option<&deno_core::ModuleSpecifier>, _is_dynamic: bool, _requested_module_type: RequestedModuleType) -> ModuleLoadResponse {
let specifier = specifier.clone();
if specifier.scheme() == "file" {
let path = specifier.to_file_path().unwrap();
ModuleLoadResponse::Async(Box::pin(async move {
let code = tokio::fs::read_to_string(&path).await?;
let is_ts = path.extension().is_some_and(|ext| ext == "ts");
let transformed = if is_ts {
DenoRuntime::transpile(&code, &path)?
} else {
code
};
Ok(ModuleSource::new(
ModuleType::JavaScript,
ModuleSourceCode::String(transformed.into()),
&specifier,
None,
))
}))
} else {
ModuleLoadResponse::Async(Box::pin(async move {
Err(anyhow::anyhow!("Remote imports not fully implemented in loader yet"))
}))
}
}
}
extern "C" fn near_heap_limit_callback(
data: *mut std::ffi::c_void,
current_limit: usize,
_initial_limit: usize,
) -> usize {
if !data.is_null() {
// SAFETY: data is a *mut v8::Isolate passed from the same thread
let isolate = unsafe { &mut *(data as *mut v8::Isolate) };
isolate.terminate_execution();
}
// Give a small amount of extra room so V8 can wind down gracefully
// instead of calling FatalProcessOutOfMemory
current_limit + 4 * 1024 * 1024
}
pub struct DenoRuntime {}
impl Default for DenoRuntime {
fn default() -> Self {
Self::new()
}
}
impl DenoRuntime {
pub fn new() -> Self {
Self {}
}
pub fn transpile(code: &str, path: &Path) -> Result<String> {
let media_type = MediaType::from_path(path);
let specifier = deno_core::url::Url::parse(&format!("file://{}", path.display()))
.unwrap_or_else(|_| deno_core::url::Url::parse("file:///index.ts").unwrap());
let parsed = deno_ast::parse_module(ParseParams {
specifier,
text: Arc::from(code),
media_type,
capture_tokens: false,
scope_analysis: false,
maybe_syntax: None,
})?;
let transpiled = parsed.transpile(
&Default::default(),
&Default::default(),
&Default::default(),
)?;
Ok(transpiled.into_source().text)
}
pub async fn execute(&self, code: String, payload: Option<serde_json::Value>, headers: HashMap<String, String>, env_vars: HashMap<String, String>) -> Result<(String, String, u16, HashMap<String, String>, Vec<serde_json::Value>)> {
let timeout_secs = std::env::var("FUNCTION_TIMEOUT_SECS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30u64);
let (tx, rx) = tokio::sync::oneshot::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
rt.block_on(async {
let result = Self::execute_inner(code, payload, headers, env_vars).await;
let _ = tx.send(result);
});
});
match tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
rx,
).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => Err(anyhow::anyhow!("Deno execution thread panicked")),
Err(_) => Err(anyhow::anyhow!("Function execution timed out after {}s", timeout_secs)),
}
}
pub(crate) async fn execute_inner(mut code: String, payload: Option<serde_json::Value>, headers: HashMap<String, String>, env_vars: HashMap<String, String>) -> Result<(String, String, u16, HashMap<String, String>, Vec<serde_json::Value>)> {
let allowed_dir = PathBuf::from("/tmp/madbase_functions");
if !allowed_dir.exists() {
let _ = std::fs::create_dir_all(&allowed_dir);
}
// Transpile entry code if it looks like TS (or we can just always try)
if code.contains(':') || code.contains("type ") || code.contains("interface ") {
if let Ok(transformed) = Self::transpile(&code, Path::new("index.ts")) {
code = transformed;
}
}
let mut runtime = JsRuntime::new(RuntimeOptions {
module_loader: Some(Rc::new(SandboxedModuleLoader { allowed_dir })),
create_params: Some(v8::CreateParams::default().heap_limits(0, 128 * 1024 * 1024)),
extensions: vec![madbase_runtime::init_ops()],
..Default::default()
});
let isolate = runtime.v8_isolate();
let isolate_ptr: *mut v8::Isolate = &mut **isolate;
// SAFETY: the callback runs on the same thread as the isolate
isolate.add_near_heap_limit_callback(near_heap_limit_callback, isolate_ptr as *mut std::ffi::c_void);
let env_json = serde_json::to_string(&env_vars)?;
runtime.execute_script("<env>", format!("globalThis._env = JSON.parse('{}');", env_json))?;
let preamble = r#"
globalThis.__logs__ = [];
globalThis.console = {
log: (...args) => {
const msg = args.map(a => typeof a === 'object' ? JSON.stringify(a) : String(a)).join(" ");
globalThis.__logs__.push({ level: "info", msg, ts: Date.now() });
Deno.core.print(msg + "\n");
},
error: (...args) => {
const msg = args.map(a => typeof a === 'object' ? JSON.stringify(a) : String(a)).join(" ");
globalThis.__logs__.push({ level: "error", msg, ts: Date.now() });
Deno.core.print("[ERROR] " + msg + "\n", true);
},
warn: (...args) => {
const msg = args.map(a => typeof a === 'object' ? JSON.stringify(a) : String(a)).join(" ");
globalThis.__logs__.push({ level: "warn", msg, ts: Date.now() });
Deno.core.print("[WARN] " + msg + "\n");
}
};
class Headers {
constructor(init) {
this.map = new Map();
if (init) {
if (init instanceof Headers) {
init.forEach((v, k) => this.map.set(k.toLowerCase(), v));
} else if (Array.isArray(init)) {
init.forEach(([k, v]) => this.map.set(k.toLowerCase(), v));
} else {
Object.entries(init).forEach(([k, v]) => this.map.set(k.toLowerCase(), v));
}
}
}
get(key) { return this.map.get(key.toLowerCase()) || null; }
set(key, value) { this.map.set(key.toLowerCase(), value); }
has(key) { return this.map.has(key.toLowerCase()); }
forEach(callback) { this.map.forEach(callback); }
entries() { return this.map.entries(); }
}
globalThis.Headers = Headers;
globalThis.Response = class Response {
constructor(body, init) {
this.body = body;
this.status = init?.status || 200;
this.headers = new Headers(init?.headers);
}
async text() { return String(this.body); }
async json() { return JSON.parse(this.body); }
};
globalThis.Request = class Request {
constructor(url, init) {
this.url = url;
this.method = init?.method || "GET";
this._body = init?.body;
this.headers = new Headers(init?.headers);
}
async json() { return typeof this._body === 'string' ? JSON.parse(this._body) : this._body; }
async text() { return typeof this._body === 'string' ? this._body : JSON.stringify(this._body); }
};
globalThis.fetch = async (url, init) => {
const method = init?.method || "GET";
const headers = {};
if (init?.headers) {
const h = new Headers(init.headers);
h.forEach((v, k) => headers[k] = v);
}
let body = init?.body;
if (body && typeof body !== 'string') body = JSON.stringify(body);
const res = await Deno.core.ops.op_fetch(url, method, headers, body);
return new Response(res.body, { status: res.status, headers: res.headers });
};
globalThis.Deno = {
serve: (handler) => { globalThis._handler = handler; },
core: Deno.core,
env: {
get: (key) => globalThis._env ? globalThis._env[key] : null,
toObject: () => globalThis._env || {}
}
};
"#;
runtime.execute_script("<preamble>", preamble.to_string())?;
runtime.execute_script("<user_script>", code.to_string())?;
let payload_json = serde_json::to_string(&payload.unwrap_or(json!({})))?;
let headers_json = serde_json::to_string(&headers)?;
let safe_payload = serde_json::to_string(&payload_json)?;
let safe_headers = serde_json::to_string(&headers_json)?;
let invoke_script = format!(r#"
(async () => {{
if (!globalThis._handler) return {{ error: "No handler registered via Deno.serve" }};
try {{
const headers = JSON.parse({1});
const body = JSON.parse({0});
const req = new Request("http://localhost", {{
method: "POST",
body: typeof body === 'string' ? body : JSON.stringify(body),
headers: headers
}});
const res = await globalThis._handler(req);
const text = await res.text();
const resHeaders = {{}};
if (res.headers && typeof res.headers.forEach === 'function') {{
res.headers.forEach((v, k) => resHeaders[k] = v);
}}
return {{ result: text, headers: resHeaders, status: res.status, logs: globalThis.__logs__ }};
}} catch (e) {{
return {{ error: String(e), logs: globalThis.__logs__ }};
}}
}})()
"#, safe_payload, safe_headers);
let result_val = runtime.execute_script("<invocation>", invoke_script)?;
#[allow(deprecated)]
let result = runtime.resolve_value(result_val).await?;
let scope = &mut runtime.handle_scope();
let local = v8::Local::new(scope, result);
let deserialized_value: serde_json::Value = deno_core::serde_v8::from_v8(scope, local)?;
let stdout = deserialized_value.get("result").and_then(|v| v.as_str()).unwrap_or("").to_string();
let stderr = deserialized_value.get("error").and_then(|v| v.as_str()).unwrap_or("").to_string();
let status = deserialized_value.get("status").and_then(|v| v.as_u64()).unwrap_or(200) as u16;
let mut res_headers = HashMap::new();
if let Some(h) = deserialized_value.get("headers").and_then(|v| v.as_object()) {
for (k, v) in h {
if let Some(s) = v.as_str() { res_headers.insert(k.clone(), s.to_string()); }
}
}
let logs = deserialized_value.get("logs").and_then(|v| v.as_array()).cloned().unwrap_or_default();
Ok((stdout, stderr, status, res_headers, logs))
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// --- Sandbox tests ---
fn make_loader(dir: &str) -> SandboxedModuleLoader {
SandboxedModuleLoader { allowed_dir: PathBuf::from(dir) }
}
#[test]
fn test_sandboxed_loader_blocks_etc_passwd() {
let loader = make_loader("/tmp/madbase_functions");
let result = loader.resolve("/etc/passwd", "file:///tmp/madbase_functions/index.ts", deno_core::ResolutionKind::Import);
assert!(result.is_err(), "Should block /etc/passwd");
assert!(result.unwrap_err().to_string().contains("outside allowed directory"));
}
#[test]
fn test_sandboxed_loader_blocks_parent_traversal() {
let loader = make_loader("/tmp/madbase_functions");
let result = loader.resolve("../../etc/passwd", "file:///tmp/madbase_functions/index.ts", deno_core::ResolutionKind::Import);
assert!(result.is_err(), "Should block parent traversal to /etc/passwd");
}
#[test]
fn test_sandboxed_loader_allows_local_import() {
let loader = make_loader("/tmp/madbase_functions");
let result = loader.resolve("./helper.ts", "file:///tmp/madbase_functions/index.ts", deno_core::ResolutionKind::Import);
// resolve succeeds even if the file doesn't exist (file lookup happens in load())
assert!(result.is_ok(), "Should allow ./helper.ts within allowed dir");
}
#[test]
fn test_sandboxed_loader_allows_https_import() {
let loader = make_loader("/tmp/madbase_functions");
let result = loader.resolve("https://deno.land/std/testing/asserts.ts", "file:///tmp/madbase_functions/index.ts", deno_core::ResolutionKind::Import);
assert!(result.is_ok(), "Should allow https:// imports");
}
#[test]
fn test_sandboxed_loader_blocks_ftp() {
let loader = make_loader("/tmp/madbase_functions");
let result = loader.resolve("ftp://evil.com/payload", "file:///tmp/madbase_functions/index.ts", deno_core::ResolutionKind::Import);
assert!(result.is_err(), "Should block ftp:// scheme");
assert!(result.unwrap_err().to_string().contains("Blocked import scheme"));
}
// --- JS injection safety ---
#[tokio::test]
async fn test_js_injection_safe_payload() {
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
const body = await req.text();
return new Response(JSON.stringify({ received: body, alive: true }));
});
"#.to_string();
let malicious_payload = json!({"key": "'; process.exit(); '"});
let (stdout, stderr, _status, _headers, _logs) = runtime
.execute(code, Some(malicious_payload), HashMap::new(), HashMap::new())
.await
.unwrap();
// The critical assertion: the runtime didn't crash and returned a response
let res: serde_json::Value = serde_json::from_str(&stdout).unwrap();
assert_eq!(res["alive"], true, "Runtime survived malicious payload, stderr={}", stderr);
assert!(res["received"].as_str().unwrap().contains("process.exit()"), "Malicious string was preserved as data");
}
#[tokio::test]
async fn test_js_injection_safe_headers() {
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
const val = req.headers.get("x-evil");
return new Response(val || "none");
});
"#.to_string();
let mut headers = HashMap::new();
headers.insert("x-evil".to_string(), "\"});process.exit();//".to_string());
let (stdout, stderr, _status, _headers, _logs) = runtime
.execute(code, None, headers.clone(), HashMap::new())
.await
.unwrap();
assert!(stderr.is_empty(), "Should not crash: stderr={}", stderr);
assert_eq!(stdout, headers["x-evil"]);
}
// --- Resource limits ---
#[tokio::test]
async fn test_timeout_enforcement() {
// Use a short timeout for testing
std::env::set_var("FUNCTION_TIMEOUT_SECS", "2");
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
while(true) {}
return new Response("unreachable");
});
"#.to_string();
let result = runtime.execute(code, None, HashMap::new(), HashMap::new()).await;
std::env::remove_var("FUNCTION_TIMEOUT_SECS");
assert!(result.is_err(), "Infinite loop should be terminated by timeout");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("timed out") || err_msg.contains("panicked"),
"Error should mention timeout, got: {}", err_msg
);
}
#[tokio::test]
async fn test_memory_limit_enforcement() {
let runtime = DenoRuntime::new();
// Use JS objects/strings that consume V8 managed heap (not external backing stores)
let code = r#"
Deno.serve(async (req) => {
const arr = [];
while (true) {
arr.push("x".repeat(10000) + Math.random().toString());
}
return new Response("should not reach here");
});
"#.to_string();
std::env::set_var("FUNCTION_TIMEOUT_SECS", "10");
let result = runtime.execute(code, None, HashMap::new(), HashMap::new()).await;
std::env::remove_var("FUNCTION_TIMEOUT_SECS");
// V8 OOMs, the thread panics, or the timeout fires — any of these is an error
assert!(result.is_err(), "Should fail when exceeding 128MB heap limit");
}
// --- TypeScript ---
#[tokio::test]
async fn test_typescript_execution() {
let runtime = DenoRuntime::new();
let code = r#"
interface User { name: string; }
Deno.serve(async (req) => {
const user: User = { name: "MadBase" };
return new Response(`Hello ${user.name}`);
});
"#.to_string();
let (stdout, _stderr, _status, _headers, _logs) = runtime
.execute(code, None, HashMap::new(), HashMap::new())
.await
.unwrap();
assert_eq!(stdout, "Hello MadBase");
}
// --- Environment variables ---
#[tokio::test]
async fn test_env_vars_accessible() {
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
const val = Deno.env.get("MY_VAR");
return new Response(val || "missing");
});
"#.to_string();
let mut env_vars = HashMap::new();
env_vars.insert("MY_VAR".to_string(), "hello_from_env".to_string());
let (stdout, _stderr, _status, _headers, _logs) = runtime
.execute(code, None, HashMap::new(), env_vars)
.await
.unwrap();
assert_eq!(stdout, "hello_from_env");
}
// --- Fetch API ---
#[tokio::test]
async fn test_fetch_api_available() {
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
const hasFetch = typeof fetch === 'function';
return new Response(JSON.stringify({ hasFetch }));
});
"#.to_string();
let (stdout, _stderr, _status, _headers, _logs) = runtime
.execute(code, None, HashMap::new(), HashMap::new())
.await
.unwrap();
let res: serde_json::Value = serde_json::from_str(&stdout).unwrap();
assert!(res["hasFetch"].as_bool().unwrap());
}
// --- Console log capture ---
#[tokio::test]
async fn test_console_log_capture() {
let runtime = DenoRuntime::new();
let code = r#"
Deno.serve(async (req) => {
console.log("hello from log");
console.error("an error");
return new Response("ok");
});
"#.to_string();
let (stdout, _stderr, _status, _headers, logs) = runtime
.execute(code, None, HashMap::new(), HashMap::new())
.await
.unwrap();
assert_eq!(stdout, "ok");
assert!(logs.len() >= 2, "Should capture at least 2 log entries, got {}", logs.len());
let first_log = &logs[0];
assert!(first_log.to_string().contains("hello from log"));
}
// --- Worker pool ---
#[tokio::test]
async fn test_worker_pool_concurrent() {
let pool = Arc::new(crate::worker_pool::DenoPool::new(4));
let mut handles = vec![];
for i in 0..10 {
let pool = pool.clone();
let code = format!(r#"
Deno.serve(async (req) => {{
return new Response("result-{i}");
}});
"#);
handles.push(tokio::spawn(async move {
pool.execute(code, None, HashMap::new(), HashMap::new()).await
}));
}
let mut success_count = 0;
for handle in handles {
if let Ok(Ok((stdout, _, _, _, _))) = handle.await {
assert!(stdout.starts_with("result-"));
success_count += 1;
}
}
assert_eq!(success_count, 10, "All 10 concurrent invocations should complete");
}
// --- Transpile unit test ---
#[test]
fn test_transpile_strips_types() {
let ts_code = "const x: number = 42; export default x;";
let result = DenoRuntime::transpile(ts_code, Path::new("test.ts")).unwrap();
assert!(!result.contains(": number"), "Type annotations should be stripped");
assert!(result.contains("42"), "Value should be preserved");
}
}