@aurora/runtime-effect (0.3.3)
Published 2025-12-27 10:44:38 +00:00 by vlad
Installation
@aurora:registry=
npm install @aurora/runtime-effect@0.3.3
"@aurora/runtime-effect": "0.3.3"
About this package
@aurora/runtime-effect
Core functional programming abstractions for Aurora - Effect and Stream types for composable async/concurrent operations.
Overview
Aurora's runtime-effect provides two fundamental abstractions:
- Effect<R, E, A> - Composable async operations with typed dependencies, errors, and results
- Stream<R, E, A> - Potentially unbounded flows of values over time
These abstractions enable:
- Explicit dependency injection
- Type-safe error handling
- Predictable async flow
- Composable concurrent operations
Installation
bun add @aurora/runtime-effect
Effect<R, E, A>
An Effect is a composable async operation described by three type parameters:
- R - Environment/dependencies required
- E - Error type the effect can fail with
- A - Success value type
import { Effect, succeed, fail, map, flatMap, run } from "@aurora/runtime-effect";
interface Logger {
log: (msg: string) => void;
}
// A simple effect
const greet = succeed("Hello, World!");
// An effect with environment
const logGreet: Effect<Logger, never, string> = flatMap(greet, (msg) =>
map((logger: Logger) => {
logger.log(msg);
return msg;
}, succeed(logger))
);
// Run the effect
const result = await run(logGreet, { log: console.log });
Core Effect Operations
import {
succeed,
fail,
map,
flatMap,
tap,
pipe
} from "@aurora/runtime-effect";
// Create effects
const success = succeed(42);
const failure = fail(new Error("Something went wrong"));
// Transform results
const doubled = map(success, (n) => n * 2);
// Chain effects
const fetchAndProcess = flatMap(fetchUser, (user) =>
processUser(user)
);
// Side effects without changing value
const logged = tap(fetchData, (data) =>
console.log("Fetched:", data)
);
// Pipeline composition
const result = pipe(
fetchUser,
flatMap(user => fetchOrders(user.id)),
map(orders => orders.length),
tap(count => console.log("Orders:", count))
);
Error Handling
import { catchAll, mapError, tryCatch } from "@aurora/runtime-effect";
// Catch all errors with fallback
const safeFetch = catchAll(
fetchData,
(error) => succeed([])
);
// Transform error types
const typedFetch = mapError(
fetchData,
(error) => new DomainError("Failed to fetch")
);
// Wrap throwing code
const safeJson = tryCatch(
() => JSON.parse(input),
(error) => new ParseError("Invalid JSON")
);
Concurrency
import { zip, race, fork, join } from "@aurora/runtime-effect";
// Run effects in parallel and collect results
const [user, orders] = await run(
zip(fetchUser(), fetchOrders()),
{}
);
// Race multiple effects - first to succeed wins
const fastest = race(
fetchFromCache(),
fetchFromNetwork()
);
// Fork background tasks
const task = fork(longRunningTask);
// ... do other work ...
// Wait for forked task to complete
const result = await join(task);
Resource Management
import { resource, use, ensure } from "@aurora/runtime-effect";
// Manage resources with automatic cleanup
const dbConnection = resource(
// Acquire
async () => await connectDatabase(),
// Release
async (conn) => await conn.close()
);
// Use resource safely
const result = await run(
use(dbConnection, (conn) =>
conn.query("SELECT * FROM users")
),
{}
);
// Ensure cleanup runs even on failure
const withCleanup = ensure(
mainOperation(),
() => console.log("Cleanup complete")
);
Stream<R, E, A>
A Stream represents a potentially unbounded flow of values over time.
import { Stream, fromArray, map, take, toArray } from "@aurora/runtime-effect";
// Create streams
const numbers = fromArray([1, 2, 3, 4, 5]);
// Transform streams
const doubled = map(numbers, (n) => n * 2);
// Take first N values
const firstThree = take(numbers, 3);
// Collect stream to array
const result = await toArray(firstThree).run({});
console.log(result); // [1, 2, 3]
Stream Operations
import {
map,
filter,
flatMap,
take,
takeWhile,
drop,
dropWhile,
scan,
mergeMap,
buffer,
throttle,
debounce
} from "@aurora/runtime-effect";
// Transform each value
const upper = map(stream, s => s.toUpperCase());
// Filter values
const evens = filter(stream, n => n % 2 === 0);
// Chain async operations
const expanded = flatMap(stream, async (id) =>
await fetchItem(id)
);
// Take first N
const first = take(stream, 10);
// Take while condition holds
const valid = takeWhile(stream, n => n < 100);
// Drop first N
const tail = drop(stream, 5);
// Drop while condition holds
const afterStart = dropWhile(stream, n => n !== 0);
// Accumulate values
const sum = scan(stream, 0, (acc, n) => acc + n);
// Concurrent mapping (max 5 at a time)
const fetched = mergeMap(stream, fetchItem, 5);
// Buffer values
const buffered = buffer(stream, {
maxSize: 100,
strategy: "block" // or "drop-oldest" or "drop-newest"
});
Stream Supervision
import {
supervised,
retry,
catchError,
SupervisionStrategies
} from "@aurora/runtime-effect";
// Automatic restart on failure
const resilient = supervised(
riskyStream,
SupervisionStrategies.alwaysRestart(10)
);
// Retry with exponential backoff
const retried = retry(
networkStream,
{
maxAttempts: 5,
initialDelay: 1000,
backoffFactor: 2
}
);
// Error recovery with fallback
const recovered = catchError(
primaryStream,
(error) => fallbackStream
);
Running Effects
import { run, runSync, provide } from "@aurora/runtime-effect";
// Run async effect
const result = await run(effect, environment);
// Run synchronous effect (if possible)
const syncResult = runSync(syncEffect, environment);
// Provide partial environment
const env = {
logger: console.log,
database: db
};
const result = await run(effect, env);
Patterns
Request/Response
function handleRequest(req: Request): Effect<Dependencies, Error, Response> {
return pipe(
validate(req),
flatMap(authenticate),
flatMap(fetchData),
flatMap(process),
map(toResponse)
);
}
Batch Processing
const processBatch = pipe(
fromArray(items),
map(processItem),
forkAll(10), // 10 concurrent workers
collect
);
Event Processing
const processEvents = pipe(
eventStream,
filter(isValid),
flatMap(handleEvent),
tap(storeResult),
supervise(SupervisionStrategies.alwaysRestart())
);
Testing
import { expect, test } from "bun:test";
import { succeed, map, run } from "@aurora/runtime-effect";
test("effect transforms values", async () => {
const effect = pipe(
succeed(5),
map(n => n * 2)
);
const result = await run(effect, {});
expect(result).toBe(10);
});
API Reference
See docs/overview.md for complete Effect and Stream API documentation.
License
MIT