madbase/_milestones/M5_realtime.md
Vlad Durnea cffdf8af86
wip:milestone 0 fixes
2026-03-15 12:35:42 +02:00

# Milestone 5: Realtime
**Goal:** `supabase.channel('room').on('postgres_changes', ...).subscribe()` delivers filtered change events to authorized clients.

**Depends on:** M0 (Security), M1 (Foundation)

---
## 5.1 — Fix Core Functionality
### 5.1.1 Make the realtime crate compile
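**File:** `realtime/src/lib.rs`

Current issue: `pub mod lib;` is self-referential, so the crate fails to compile. The crate also references `PostgresPayload` and `PresenceMessage` types that don't exist.

**Fix:**
1. Remove `pub mod lib;`: it creates a circular module reference
2. Define the missing types in a `types.rs` module. The snippet below covers `PostgresPayload` and its `ColumnInfo`; `PresenceMessage` still needs a definition that matches how `presence.rs` uses it (or its references must be removed):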
```rust
// realtime/src/types.rs
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresPayload {
    pub schema: String,
    pub table: String,
    #[serde(rename = "type")]
    pub change_type: String, // INSERT, UPDATE, DELETE
    pub record: Option<serde_json::Value>,
    pub old_record: Option<serde_json::Value>,
    pub columns: Option<Vec<ColumnInfo>>,
    #[serde(default)]
    pub truncated: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    pub name: String,
    // `type` is a Rust keyword, so the field is renamed to match the wire name
    #[serde(rename = "type")]
    pub type_: String,
}
```
Update `lib.rs`:
```rust
pub mod types;
pub mod replication;
pub mod ws;
pub mod presence;
pub use types::*;
pub use presence::{PresenceManager, PresenceInfo, PresenceStatus};
```
### 5.1.2 Per-table broadcast channels
**Current problem:** A single `tokio::sync::broadcast::Sender` is used for all table changes, so every connected client receives every change from every table and has to filter client-side.

**Fix:** Use a `DashMap<String, broadcast::Sender<PostgresPayload>>` keyed by `"schema.table"`:
```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

use crate::types::PostgresPayload;

pub struct RealtimeState {
    /// One broadcast channel per table, keyed by "schema.table".
    pub channels: Arc<DashMap<String, broadcast::Sender<PostgresPayload>>>,
}

impl RealtimeState {
    /// Returns the sender for `key`, creating a channel (capacity 1024) on first use.
    pub fn get_or_create_channel(&self, key: &str) -> broadcast::Sender<PostgresPayload> {
        self.channels
            .entry(key.to_string())
            .or_insert_with(|| broadcast::channel(1024).0)
            .clone()
    }
}
```
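The replication listener dispatches each change to the matching table's channel:
```rust
let key = format!("{}.{}", payload.schema, payload.table);
// Only tables that some client has joined have a channel; other changes are dropped
if let Some(sender) = state.channels.get(&key) {
    // `send` only errors when no receivers are alive, which is safe to ignore
    let _ = sender.send(payload);
}
```
On join, each client calls `get_or_create_channel("schema.table")` and subscribes to the returned sender (`Sender::subscribe`), so it receives only that table's changes.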
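The isolation property that `test_per_table_channel_isolation` checks can be sketched with only the standard library. This is a minimal sketch, assuming `std::sync::mpsc` as a stand-in for `tokio::sync::broadcast`: since an mpsc channel has a single receiver, each subscriber gets its own sender and fan-out becomes a loop. `TableChannels`, `subscribe` and `dispatch` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Std-only stand-in for `DashMap<String, broadcast::Sender<_>>`: each key keeps
/// its own subscriber list, so a message is only ever delivered under that key.
#[derive(Default)]
pub struct TableChannels {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl TableChannels {
    /// Registers a subscriber for `key` ("schema.table") and returns its receiver.
    pub fn subscribe(&mut self, key: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(key.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `msg` to every live subscriber of `key` and returns how many received it.
    /// Subscribers whose receiver has been dropped are pruned from the list.
    pub fn dispatch(&mut self, key: &str, msg: &str) -> usize {
        match self.subscribers.get_mut(key) {
            Some(list) => {
                list.retain(|tx| tx.send(msg.to_string()).is_ok());
                list.len()
            }
            None => 0, // nobody joined this table: drop the change
        }
    }
}

fn main() {
    let mut channels = TableChannels::default();
    let posts = channels.subscribe("public.posts");
    let users = channels.subscribe("public.users");
    let delivered = channels.dispatch("public.posts", "INSERT");
    println!("delivered to {delivered} subscriber(s)");
    println!("posts: {:?}", posts.try_recv().ok());
    println!("users: {:?}", users.try_recv().ok());
}
```

Pruning closed senders inside `dispatch` also keeps the per-table lists from growing after clients disconnect. The `DashMap` version has no equivalent cleanup, so unused table channels stay in the map until they are removed explicitly.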
### 5.1.3 Authorization
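On WebSocket join for a `postgres_changes` subscription, check that the caller's role can read the table before registering the subscriber:
```rust
async fn authorize_subscription(
    pool: &PgPool,
    auth_ctx: &AuthContext,
    schema: &str,
    table: &str,
) -> Result<bool, ApiError> {
    let mut tx = pool.begin().await?;
    // Switch to the caller's role; binding the value prevents SQL injection via the role claim
    sqlx::query("SELECT set_config('role', $1, true)")
        .bind(&auth_ctx.role)
        .execute(&mut *tx)
        .await?;
    if let Some(claims) = &auth_ctx.claims {
        sqlx::query("SELECT set_config('request.jwt.claim.sub', $1, true)")
            .bind(&claims.sub)
            .execute(&mut *tx)
            .await?;
    }
    // Identifiers cannot be bound as parameters: quote them and double any embedded `"`
    let quote = |ident: &str| format!("\"{}\"", ident.replace('"', "\"\""));
    let check = format!("SELECT 1 FROM {}.{} LIMIT 0", quote(schema), quote(table));
    // `tx` is never committed, so dropping it rolls back the role change
    match sqlx::query(&check).execute(&mut *tx).await {
        Ok(_) => Ok(true),
        // SQLSTATE 42501 = insufficient_privilege; any other error is a real failure
        Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("42501") => Ok(false),
        Err(e) => Err(e.into()),
    }
}
```
This probe checks the table-level `SELECT` privilege only. RLS does not make a `SELECT` fail; it silently filters rows, so row visibility still has to be enforced for each change event before it is delivered.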
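Pulling the identifier quoting out of `authorize_subscription` makes it unit-testable without a database. A minimal sketch, where `quote_ident` and `probe_query` are hypothetical helper names; the quoting rule itself (wrap in double quotes, double any embedded double quote) is standard PostgreSQL identifier syntax.

```rust
/// Quotes a PostgreSQL identifier: wraps it in double quotes and doubles any
/// embedded double quote, so the value can never close the identifier early.
pub fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Builds the zero-row probe query used to test SELECT privilege on a table.
pub fn probe_query(schema: &str, table: &str) -> String {
    format!(
        "SELECT 1 FROM {}.{} LIMIT 0",
        quote_ident(schema),
        quote_ident(table)
    )
}

fn main() {
    println!("{}", probe_query("public", "posts"));
    // An injection attempt stays inside a single quoted identifier
    println!("{}", probe_query("public", "x\"; DROP TABLE users; --"));
}
```

Because the hostile table name is kept as one identifier, Postgres reports it as an undefined table rather than executing the appended statement.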
### 5.1.4 Event type filtering
Client sends a join message specifying which events to receive:
```json
["1", "1", "realtime:public:posts", "phx_join", {
"config": {
"postgres_changes": [{
"event": "INSERT",
"schema": "public",
"table": "posts",
"filter": "user_id=eq.123"
}]
}
}]
```
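Server-side, filter before sending. supabase-js also accepts `event: '*'` to receive every change type, so treat `*` as a wildcard:
```rust
if let Some(event_filter) = &subscription.event_filter {
    // `event_filter` holds the requested events, e.g. ["INSERT"] or ["*"]
    let wanted = event_filter
        .iter()
        .any(|e| e == "*" || e.eq_ignore_ascii_case(&payload.change_type));
    if !wanted {
        continue; // Skip this event for this subscriber
    }
}
```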
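One option is to parse the `event` value once, at join time. That way an unsupported value is rejected in the `phx_reply`, and the per-event check compares enums instead of strings. A minimal sketch; `ChangeType`, `parse_event` and `wants` are hypothetical names, and the accepted values are the ones listed in this milestone plus the `*` wildcard.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeType {
    Insert,
    Update,
    Delete,
}

impl ChangeType {
    /// Maps a payload's `type` field ("INSERT", "UPDATE", "DELETE") to a ChangeType.
    pub fn from_payload(s: &str) -> Option<ChangeType> {
        match s.to_ascii_uppercase().as_str() {
            "INSERT" => Some(ChangeType::Insert),
            "UPDATE" => Some(ChangeType::Update),
            "DELETE" => Some(ChangeType::Delete),
            _ => None,
        }
    }
}

/// Parses a subscription's `event` value at join time; "*" means every change type.
pub fn parse_event(event: &str) -> Result<Vec<ChangeType>, String> {
    if event == "*" {
        return Ok(vec![ChangeType::Insert, ChangeType::Update, ChangeType::Delete]);
    }
    ChangeType::from_payload(event)
        .map(|c| vec![c])
        .ok_or_else(|| format!("unsupported event: {event}"))
}

/// Returns true if a change whose payload type is `change_type` should be delivered.
pub fn wants(allowed: &[ChangeType], change_type: &str) -> bool {
    ChangeType::from_payload(change_type).map_or(false, |c| allowed.contains(&c))
}

fn main() {
    let insert_only = parse_event("INSERT").unwrap();
    println!("{}", wants(&insert_only, "INSERT"));
    println!("{}", wants(&insert_only, "UPDATE"));
    let all = parse_event("*").unwrap();
    println!("{}", wants(&all, "DELETE"));
    println!("{:?}", parse_event("TRUNCATE"));
}
```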
### 5.1.5 Row-level filtering
Apply the filter expression from the subscription config:
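```rust
if let Some(filter) = &subscription.filter {
    // DELETE events carry no `record`; fall back to `old_record`, which contains only
    // primary-key columns unless the table uses REPLICA IDENTITY FULL
    let row = payload.record.as_ref().or(payload.old_record.as_ref());
    // Parse "user_id=eq.123" into (column, operator, value) and test the row against it
    if !matches_filter(row, filter) {
        continue;
    }
}
```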
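`matches_filter` is left undefined above. Here is a minimal sketch of it, under two stated assumptions. First, the record is modeled as a `HashMap<String, String>` of already-rendered values instead of `serde_json::Value`, so the block stays std-only. Second, only the comparison operators `eq`, `neq`, `lt`, `lte`, `gt` and `gte` are supported. `RowFilter` and `parse_filter` are hypothetical names. Values are compared as numbers when both sides parse as numbers, and as text otherwise.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// A parsed `column=op.value` filter, e.g. "user_id=eq.123".
#[derive(Debug, PartialEq)]
pub struct RowFilter {
    pub column: String,
    pub op: String,
    pub value: String,
}

/// Parses "column=op.value"; the value may itself contain dots.
pub fn parse_filter(s: &str) -> Result<RowFilter, String> {
    let (column, rest) = s.split_once('=').ok_or("missing '='")?;
    let (op, value) = rest.split_once('.').ok_or("missing '.' after operator")?;
    if !["eq", "neq", "lt", "lte", "gt", "gte"].contains(&op) {
        return Err(format!("unsupported operator: {op}"));
    }
    Ok(RowFilter {
        column: column.to_string(),
        op: op.to_string(),
        value: value.to_string(),
    })
}

/// Compares numerically when both sides parse as numbers, otherwise as text.
fn compare(a: &str, b: &str) -> Option<Ordering> {
    match (a.parse::<f64>(), b.parse::<f64>()) {
        (Ok(x), Ok(y)) => x.partial_cmp(&y),
        _ => Some(a.cmp(b)),
    }
}

/// A missing row or a missing column never matches.
pub fn matches_filter(row: Option<&HashMap<String, String>>, f: &RowFilter) -> bool {
    let Some(actual) = row.and_then(|r| r.get(&f.column)) else { return false };
    let Some(ord) = compare(actual, &f.value) else { return false };
    match f.op.as_str() {
        "eq" => ord == Ordering::Equal,
        "neq" => ord != Ordering::Equal,
        "lt" => ord == Ordering::Less,
        "lte" => ord != Ordering::Greater,
        "gt" => ord == Ordering::Greater,
        "gte" => ord != Ordering::Less,
        _ => false,
    }
}

fn main() {
    let f = parse_filter("user_id=eq.123").unwrap();
    let mut row = HashMap::new();
    row.insert("user_id".to_string(), "123".to_string());
    println!("{}", matches_filter(Some(&row), &f));
    row.insert("user_id".to_string(), "456".to_string());
    println!("{}", matches_filter(Some(&row), &f));
}
```

Parsing happens at join time, so a malformed filter can be rejected before the subscription is registered rather than silently matching nothing.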
### 5.1.6 Replication listener retry
**File:** `gateway/src/worker.rs` — replication spawn (line ~66)

Restart the listener whenever it returns, and wait between attempts so that a listener which exits immediately cannot spin in a tight loop:
```rust
tokio::spawn(async move {
    loop {
        match realtime::replication::start_replication_listener(repl_config.clone(), repl_tx.clone()).await {
            Ok(_) => tracing::warn!("Replication listener exited normally, restarting in 5s"),
            Err(e) => tracing::error!("Replication listener failed: {}, retrying in 5s", e),
        }
        // Sleep on both paths so a listener that returns immediately cannot busy-loop
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
});
```
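As an alternative to the fixed 5 s delay, capped exponential backoff reconnects quickly after a transient drop and still never waits longer than 5 s. A minimal sketch; the 250 ms base and the function name `backoff_delay` are assumptions.

```rust
use std::time::Duration;

/// Delay before restart attempt `attempt` (0-based): 250 ms doubled on each attempt,
/// capped at 5 s so a reconnect is always tried within 5 s.
pub fn backoff_delay(attempt: u32) -> Duration {
    let base = Duration::from_millis(250);
    let cap = Duration::from_secs(5);
    // 2^attempt saturates instead of overflowing for large attempt counts
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(cap)
}

fn main() {
    for attempt in 0..7 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt));
    }
}
```

In the loop, the attempt counter would be reset to 0 after a listener run that stayed up for a while, so a single failure after hours of uptime starts again from the short delay.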
---
## 5.2 — Broadcast & Presence
### 5.2.1 Broadcast channels
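Broadcast channels are server-side fan-out that never touches the database. Clients send messages to a topic, and every subscriber on that topic receives them.

`RealtimeState::channels` is typed for `PostgresPayload`, so broadcast messages need their own map keyed by topic. The `broadcast_channels` field below is an assumed addition:
```rust
// On receiving a broadcast message from a client.
// Assumed field: broadcast_channels: DashMap<String, broadcast::Sender<BroadcastPayload>>
let sender = state
    .broadcast_channels
    .entry(topic.to_string())
    .or_insert_with(|| broadcast::channel(1024).0)
    .clone();
// `send` only errors when the topic has no subscribers
let _ = sender.send(BroadcastPayload { event, payload });
```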
### 5.2.2 Wire in presence
Connect `realtime/src/presence.rs` to the WebSocket handler:
- On `phx_join` with presence config, or when the client tracks state (supabase-js `channel.track()` sends a `presence` event with `event: "track"`): call `presence_manager.join_channel(user_id, channel, metadata)`
- On `phx_leave` or disconnect: call `presence_manager.leave_channel(user_id, channel)`
- Periodic heartbeat: call `presence_manager.heartbeat(user_id, channel)`
- After a successful join: push `presence_state` built from `presence_manager.get_channel_users(channel)` to the joining client (the server sends it; the client does not request it)
- On presence change: broadcast `presence_diff` to all channel subscribers
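The `presence_diff` in the last step carries `joins` and `leaves` maps. Below is a minimal sketch of computing both from two snapshots of a channel's presence. It assumes `PresenceState` is a hypothetical alias that maps each presence key to its metadata, stored as a string instead of JSON. In this sketch, a key whose metadata changed is reported as a leave (old metadata) plus a join (new metadata).

```rust
use std::collections::BTreeMap;

/// Hypothetical presence snapshot: presence key -> metadata.
pub type PresenceState = BTreeMap<String, String>;

/// Returns (joins, leaves) for a `presence_diff` between `old` and `new`.
pub fn presence_diff(old: &PresenceState, new: &PresenceState) -> (PresenceState, PresenceState) {
    // Keys that are new, or whose metadata differs from the old snapshot
    let joins: PresenceState = new
        .iter()
        .filter(|(k, v)| old.get(*k) != Some(*v))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    // Keys that disappeared, or whose old metadata was replaced
    let leaves: PresenceState = old
        .iter()
        .filter(|(k, v)| new.get(*k) != Some(*v))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    (joins, leaves)
}

fn main() {
    let mut old = PresenceState::new();
    old.insert("alice".to_string(), "online".to_string());
    let mut new = PresenceState::new();
    new.insert("alice".to_string(), "away".to_string());
    new.insert("bob".to_string(), "online".to_string());
    let (joins, leaves) = presence_diff(&old, &new);
    println!("joins: {joins:?}");
    println!("leaves: {leaves:?}");
}
```

An empty diff (both maps empty) means nothing needs to be broadcast, which avoids sending no-op `presence_diff` events on every heartbeat.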
---
## 5.3 — Phoenix Protocol
### 5.3.1 Message format
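supabase-js speaks the Phoenix channel protocol. This milestone assumes the array format `[join_ref, ref, topic, event, payload]`. Phoenix selects the wire format with the `vsn` query parameter on the WebSocket URL: `1.0.0` uses JSON objects (`{topic, event, payload, ref}`), while `2.0.0` uses the array format. Check which `vsn` the client sends, and verify that the server parses that format correctly. The current WS handler may expect JSON objects. Test with: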
```javascript
const channel = supabase.channel('test')
.on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'posts' }, (payload) => {
console.log(payload);
})
.subscribe();
```
Server responses must also be arrays:
```json
["1", "1", "realtime:public:posts", "phx_reply", {"status": "ok", "response": {}}]
```
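A std-only sketch of encoding outbound messages in the array format shown above. In the crate this would normally go through `serde_json`; the hand-written escaping here only keeps the block self-contained. `encode_v2` and `json_string` are hypothetical names, and `payload_json` is assumed to already be valid JSON.

```rust
/// Escapes `s` as a JSON string literal, surrounding quotes included.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Encodes `[join_ref, ref, topic, event, payload]`; a `None` ref becomes JSON `null`.
pub fn encode_v2(
    join_ref: Option<&str>,
    msg_ref: Option<&str>,
    topic: &str,
    event: &str,
    payload_json: &str,
) -> String {
    let opt = |r: Option<&str>| r.map_or_else(|| "null".to_string(), json_string);
    format!(
        "[{},{},{},{},{}]",
        opt(join_ref),
        opt(msg_ref),
        json_string(topic),
        json_string(event),
        payload_json
    )
}

fn main() {
    let reply = encode_v2(
        Some("1"),
        Some("1"),
        "realtime:public:posts",
        "phx_reply",
        r#"{"status":"ok","response":{}}"#,
    );
    println!("{reply}");
}
```

The reply echoes the `join_ref` and `ref` of the client message it answers, which is how the client matches replies to pushes.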
---
## Completion Requirements
This milestone is **not complete** until every item below is satisfied.
### 1. Full Test Suite — All Green
- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] `cargo build -p realtime` compiles without errors
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:

| Test | Location | What it validates |
|------|----------|-------------------|
| `test_postgres_payload_deserialize` | `realtime/src/lib.rs` | `PostgresPayload` correctly deserializes a pgoutput message |
| `test_column_info_mapping` | `realtime/src/lib.rs` | `ColumnInfo` maps OIDs to column names and types |
| `test_per_table_channel_isolation` | `realtime/src/lib.rs` | Messages for `public.posts` don't reach subscribers of `public.users` |
| `test_authorize_subscription_allowed` | `realtime/src/lib.rs` | User with SELECT on table → `authorize_subscription` returns true |
| `test_authorize_subscription_denied` | `realtime/src/lib.rs` | User without SELECT on table → `authorize_subscription` returns false |
| `test_event_type_filter` | `realtime/src/lib.rs` | Subscribing to INSERT only → UPDATE events are filtered out |
| `test_row_level_filter` | `realtime/src/lib.rs` | `filter: 'user_id=eq.123'` → only matching rows are delivered |
| `test_broadcast_delivery` | `realtime/src/lib.rs` | Broadcast message to a topic reaches all subscribers |
| `test_broadcast_no_cross_topic_leak` | `realtime/src/lib.rs` | Broadcast on topic A doesn't reach topic B subscribers |
| `test_presence_join` | `realtime/src/presence.rs` | Joining a channel sends `presence_state` to the joiner and a `presence_diff` to existing members |
| `test_presence_leave` | `realtime/src/presence.rs` | Leaving a channel broadcasts an updated `presence_diff` |
| `test_presence_key_format_consistency` | `realtime/src/presence.rs` | All Redis keys use `presence:channel:{ch}:user:{id}` format (regression for the existing bug) |
| `test_replication_listener_retry` | `realtime/src/lib.rs` | After simulated disconnect, listener reconnects within 5s |
| `test_phoenix_message_format` | `realtime/src/lib.rs` | Outbound messages match `[join_ref, ref, topic, event, payload]` Phoenix format |
### 2. Integration / supabase-js Compatibility Verification
- [ ] WebSocket connection to `/realtime/v1/websocket` succeeds
- [ ] Subscribing to `postgres_changes` for a table the user has access to works
- [ ] Subscribing to a table the user does NOT have access to is rejected
- [ ] INSERT into a subscribed table delivers a change event to the client
- [ ] Event type filter: subscribing to INSERT only → UPDATE events are not received
- [ ] Row-level filter: `filter: 'user_id=eq.123'` → only matching changes are received
- [ ] Broadcast: sending a message to a topic → all subscribers receive it
- [ ] Presence: joining a channel → other members see the join event
- [ ] Replication listener auto-restarts after failure
- [ ] `supabase.channel('room').on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, callback).subscribe()` — full round-trip works
### 3. CI Gate
- [ ] All unit tests run in `cargo test --workspace`
- [ ] Tests requiring a Postgres replication slot are gated behind `#[ignore]` or `#[cfg(feature = "integration")]`
- [ ] `cargo build --workspace` succeeds (no compilation errors in `realtime`)