added initial roadmap and implementation
This commit is contained in:
22
realtime/Cargo.toml
Normal file
22
realtime/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "realtime"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
common = { workspace = true }
|
||||
auth = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
axum = { workspace = true, features = ["ws"] }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
tokio-postgres = "0.7"
|
||||
postgres-protocol = "0.6"
|
||||
anyhow = { workspace = true }
|
||||
bytes = "1.0"
|
||||
jsonwebtoken = { workspace = true }
|
||||
chrono.workspace = true
|
||||
34
realtime/src/lib.rs
Normal file
34
realtime/src/lib.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
pub mod replication;
|
||||
pub mod ws;
|
||||
|
||||
use axum::Router;
|
||||
use common::Config;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::broadcast;
|
||||
pub use ws::{router, RealtimeState};
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Clone)]
|
||||
pub struct PostgresPayload {
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub r#type: String,
|
||||
#[serde(default)]
|
||||
pub record: Option<Value>,
|
||||
#[serde(default)]
|
||||
pub old_record: Option<Value>,
|
||||
#[serde(default)]
|
||||
pub id: Option<i64>,
|
||||
}
|
||||
|
||||
pub fn init(db: PgPool, config: Config) -> (Router, RealtimeState) {
|
||||
let (tx, _) = broadcast::channel(100);
|
||||
let state = RealtimeState {
|
||||
db,
|
||||
config,
|
||||
broadcast_tx: tx,
|
||||
};
|
||||
|
||||
(ws::router(state.clone()), state)
|
||||
}
|
||||
35
realtime/src/replication.rs
Normal file
35
realtime/src/replication.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use common::Config;
|
||||
use tokio::sync::broadcast;
|
||||
use std::sync::Arc;
|
||||
use crate::PostgresPayload;
|
||||
|
||||
// Fallback listener using LISTEN/NOTIFY
|
||||
pub async fn start_replication_listener(
|
||||
config: Config,
|
||||
broadcast_tx: broadcast::Sender<Arc<PostgresPayload>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut listener = sqlx::postgres::PgListener::connect(&config.database_url).await?;
|
||||
listener.listen("madbase_realtime").await?;
|
||||
tracing::info!("Listening on channel 'madbase_realtime'");
|
||||
|
||||
loop {
|
||||
match listener.recv().await {
|
||||
Ok(notification) => {
|
||||
let payload = notification.payload();
|
||||
tracing::debug!("Received notification: {}", payload);
|
||||
match serde_json::from_str::<PostgresPayload>(payload) {
|
||||
Ok(pg_payload) => {
|
||||
let _ = broadcast_tx.send(Arc::new(pg_payload));
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to parse notification payload: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Replication listener error: {}", e);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
223
realtime/src/ws.rs
Normal file
223
realtime/src/ws.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
use crate::PostgresPayload;
|
||||
use axum::{
|
||||
extract::{
|
||||
ws::{Message, WebSocket, WebSocketUpgrade},
|
||||
Request, State,
|
||||
},
|
||||
middleware::{from_fn, Next},
|
||||
response::{IntoResponse, Response},
|
||||
routing::get,
|
||||
Extension, Router,
|
||||
};
|
||||
use common::{Config, ProjectContext};
|
||||
use futures::{sink::SinkExt, stream::StreamExt};
|
||||
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RealtimeState {
|
||||
pub db: PgPool,
|
||||
pub config: Config,
|
||||
pub broadcast_tx: broadcast::Sender<Arc<PostgresPayload>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct Claims {
|
||||
sub: String,
|
||||
role: String,
|
||||
exp: usize,
|
||||
}
|
||||
|
||||
pub async fn ws_handler(
|
||||
ws: WebSocketUpgrade,
|
||||
State(state): State<RealtimeState>,
|
||||
Extension(project_ctx): Extension<ProjectContext>,
|
||||
) -> impl IntoResponse {
|
||||
ws.on_upgrade(move |socket| handle_socket(socket, state, project_ctx))
|
||||
}
|
||||
|
||||
async fn handle_socket(socket: WebSocket, state: RealtimeState, project_ctx: ProjectContext) {
|
||||
let (mut ws_sender, mut ws_receiver) = socket.split();
|
||||
|
||||
// Channel for internal tasks to send messages to the websocket client
|
||||
// We send raw JSON string to avoid struct complexity
|
||||
let (tx_internal, mut rx_internal) = mpsc::channel::<String>(100);
|
||||
|
||||
let mut rx_broadcast = state.broadcast_tx.subscribe();
|
||||
|
||||
let mut subscriptions = HashSet::<String>::new();
|
||||
|
||||
// We might store the user's role/claims if they authenticate
|
||||
let mut _user_claims: Option<Claims> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// 1. Handle incoming broadcast messages from Postgres
|
||||
res = rx_broadcast.recv() => {
|
||||
match res {
|
||||
Ok(msg_arc) => {
|
||||
let pg_payload = msg_arc.as_ref();
|
||||
tracing::debug!("Received broadcast for {}.{}", pg_payload.schema, pg_payload.table);
|
||||
let topic = format!("realtime:{}:{}", pg_payload.schema, pg_payload.table);
|
||||
let wildcard_topic = format!("realtime:{}:*", pg_payload.schema);
|
||||
let global_topic = "realtime:*".to_string();
|
||||
|
||||
if subscriptions.contains(&topic) || subscriptions.contains(&wildcard_topic) || subscriptions.contains(&global_topic) {
|
||||
tracing::debug!("Match found for topic: {}", topic);
|
||||
// Map to Supabase Realtime V2 format
|
||||
let payload = serde_json::json!({
|
||||
"schema": pg_payload.schema,
|
||||
"table": pg_payload.table,
|
||||
"commit_timestamp": chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
|
||||
"type": pg_payload.r#type.to_uppercase(),
|
||||
"event": pg_payload.r#type.to_uppercase(), // For Supabase client fallback
|
||||
"new": pg_payload.record,
|
||||
"old": pg_payload.old_record,
|
||||
"errors": Option::<String>::None
|
||||
});
|
||||
|
||||
// Phoenix V2 Message: [null, null, topic, "postgres_changes", payload]
|
||||
let msg_arr = serde_json::json!([
|
||||
Value::Null,
|
||||
Value::Null,
|
||||
topic,
|
||||
"postgres_changes",
|
||||
payload
|
||||
]);
|
||||
|
||||
if let Ok(json) = serde_json::to_string(&msg_arr) {
|
||||
tracing::debug!("Sending to client: {}", json);
|
||||
if ws_sender.send(Message::Text(json)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
tracing::warn!("Realtime broadcast lagged");
|
||||
continue;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Handle internal messages
|
||||
msg = rx_internal.recv() => {
|
||||
match msg {
|
||||
Some(msg) => {
|
||||
if ws_sender.send(Message::Text(msg)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => break, // Channel closed
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Handle incoming messages from Client
|
||||
result = ws_receiver.next() => {
|
||||
match result {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
// Parse Phoenix V2 Array
|
||||
if let Ok(arr) = serde_json::from_str::<Vec<Value>>(&text) {
|
||||
if arr.len() >= 4 {
|
||||
let join_ref = arr.get(0).and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||
let r#ref = arr.get(1).and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||
let topic = arr.get(2).and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
let event = arr.get(3).and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||
let payload = arr.get(4).cloned().unwrap_or(Value::Null);
|
||||
|
||||
match event.as_str() {
|
||||
"phx_join" => {
|
||||
// Auth Check
|
||||
let token = payload.get("access_token").and_then(|v| v.as_str());
|
||||
if let Some(jwt) = token {
|
||||
let validation = Validation::new(Algorithm::HS256);
|
||||
match decode::<Claims>(jwt, &DecodingKey::from_secret(project_ctx.jwt_secret.as_bytes()), &validation) {
|
||||
Ok(data) => {
|
||||
_user_claims = Some(data.claims);
|
||||
},
|
||||
Err(_) => {
|
||||
tracing::warn!("Invalid JWT in join");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Client joined: {}", topic);
|
||||
subscriptions.insert(topic.clone());
|
||||
|
||||
// Send Ack: [join_ref, ref, topic, "phx_reply", {status: "ok", response: {}}]
|
||||
let reply = serde_json::json!([
|
||||
join_ref,
|
||||
r#ref,
|
||||
topic,
|
||||
"phx_reply",
|
||||
{ "status": "ok", "response": {} }
|
||||
]);
|
||||
if let Ok(reply_str) = serde_json::to_string(&reply) {
|
||||
let _ = tx_internal.send(reply_str).await;
|
||||
}
|
||||
},
|
||||
"phx_leave" => {
|
||||
tracing::debug!("Client left: {}", topic);
|
||||
subscriptions.remove(&topic);
|
||||
|
||||
let reply = serde_json::json!([
|
||||
join_ref,
|
||||
r#ref,
|
||||
topic,
|
||||
"phx_reply",
|
||||
{ "status": "ok", "response": {} }
|
||||
]);
|
||||
if let Ok(reply_str) = serde_json::to_string(&reply) {
|
||||
let _ = tx_internal.send(reply_str).await;
|
||||
}
|
||||
},
|
||||
"heartbeat" => {
|
||||
let reply = serde_json::json!([
|
||||
Value::Null,
|
||||
r#ref,
|
||||
"phoenix",
|
||||
"phx_reply",
|
||||
{ "status": "ok", "response": {} }
|
||||
]);
|
||||
if let Ok(reply_str) = serde_json::to_string(&reply) {
|
||||
let _ = tx_internal.send(reply_str).await;
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
tracing::debug!("Unknown event: {}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::warn!("Failed to deserialize client message: {}", text);
|
||||
}
|
||||
},
|
||||
Some(Ok(Message::Close(_))) => break,
|
||||
Some(Err(_)) => break,
|
||||
None => break, // Stream closed
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn log_realtime(req: Request, next: Next) -> Response {
|
||||
tracing::info!("Realtime router reached: {}", req.uri());
|
||||
next.run(req).await
|
||||
}
|
||||
|
||||
pub fn router(state: RealtimeState) -> Router {
|
||||
Router::new()
|
||||
.route("/websocket", get(ws_handler))
|
||||
.layer(from_fn(log_realtime))
|
||||
.with_state(state)
|
||||
}
|
||||
Reference in New Issue
Block a user