Some checks failed
CI/CD Pipeline / unit-tests (push) Failing after 1m16s
CI/CD Pipeline / integration-tests (push) Failing after 2m32s
CI/CD Pipeline / lint (push) Successful in 5m22s
CI/CD Pipeline / e2e-tests (push) Has been skipped
CI/CD Pipeline / build (push) Has been skipped
1040 lines
31 KiB
TypeScript
1040 lines
31 KiB
TypeScript
import { CHANNEL_EVENTS, CHANNEL_STATES, MAX_PUSH_BUFFER_SIZE } from './lib/constants'
|
|
import Push from './lib/push'
|
|
import type RealtimeClient from './RealtimeClient'
|
|
import Timer from './lib/timer'
|
|
import RealtimePresence, { REALTIME_PRESENCE_LISTEN_EVENTS } from './RealtimePresence'
|
|
import type {
|
|
RealtimePresenceJoinPayload,
|
|
RealtimePresenceLeavePayload,
|
|
RealtimePresenceState,
|
|
} from './RealtimePresence'
|
|
import * as Transformers from './lib/transformers'
|
|
import { httpEndpointURL } from './lib/transformers'
|
|
|
|
/** Replay settings that can be attached to a channel's broadcast configuration. */
type ReplayOption = {
  /** Starting point of the replay. NOTE(review): presumably an epoch timestamp — confirm the unit. */
  since: number
  /** Optional limit. NOTE(review): presumably the maximum number of replayed messages — confirm. */
  limit?: number
}
|
|
|
|
/** Options accepted when creating a channel; every setting lives under `config`. */
export type RealtimeChannelOptions = {
  config: {
    /**
     * Broadcast settings:
     * - `self` lets the client receive the messages it broadcasts itself
     * - `ack` asks the server to acknowledge that a broadcast message was received
     * - `replay` asks the server to replay broadcast messages
     */
    broadcast?: { self?: boolean; ack?: boolean; replay?: ReplayOption }
    /**
     * Presence settings; `key` is used to track presence payload across clients.
     */
    presence?: { key?: string; enabled?: boolean }
    /**
     * Whether the channel is private, in which case RLS policies are used to check data.
     */
    private?: boolean
  }
}
|
|
|
|
/** Fields shared by change payloads: the schema and table the change belongs to. */
type RealtimeChangesPayloadBase = {
  schema: string
  table: string
}
|
|
|
|
/** Base of the broadcast change payloads: schema and table plus an `id` string. */
type RealtimeBroadcastChangesPayloadBase = RealtimeChangesPayloadBase & {
  id: string
}
|
|
|
|
/** Broadcast payload for an INSERT: carries the new `record`; `old_record` is always `null`. */
export type RealtimeBroadcastInsertPayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`
    record: T
    old_record: null
  }
|
|
|
|
/** Broadcast payload for an UPDATE: carries both the new `record` and the `old_record`. */
export type RealtimeBroadcastUpdatePayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`
    record: T
    old_record: T
  }
|
|
|
|
/** Broadcast payload for a DELETE: `record` is always `null`; `old_record` holds the removed row. */
export type RealtimeBroadcastDeletePayload<T extends { [key: string]: any }> =
  RealtimeBroadcastChangesPayloadBase & {
    operation: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`
    record: null
    old_record: T
  }
|
|
|
|
/** Any broadcast change payload: insert, update, or delete. */
export type RealtimeBroadcastPayload<T extends { [key: string]: any }> =
  | RealtimeBroadcastInsertPayload<T>
  | RealtimeBroadcastUpdatePayload<T>
  | RealtimeBroadcastDeletePayload<T>
|
|
|
|
/** Fields common to every `postgres_changes` payload delivered to listeners. */
type RealtimePostgresChangesPayloadBase = {
  schema: string
  table: string
  commit_timestamp: string
  errors: string[]
}
|
|
|
|
/** `postgres_changes` payload for an INSERT: `new` holds the row, `old` is an empty object type. */
export type RealtimePostgresInsertPayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`
    new: T
    old: {}
  }
|
|
|
|
/** `postgres_changes` payload for an UPDATE: `new` holds the row, `old` a partial previous row. */
export type RealtimePostgresUpdatePayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`
    new: T
    old: Partial<T>
  }
|
|
|
|
/** `postgres_changes` payload for a DELETE: `new` is an empty object type, `old` a partial row. */
export type RealtimePostgresDeletePayload<T extends { [key: string]: any }> =
  RealtimePostgresChangesPayloadBase & {
    eventType: `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`
    new: {}
    old: Partial<T>
  }
|
|
|
|
/** Any `postgres_changes` payload: insert, update, or delete. */
export type RealtimePostgresChangesPayload<T extends { [key: string]: any }> =
  | RealtimePostgresInsertPayload<T>
  | RealtimePostgresUpdatePayload<T>
  | RealtimePostgresDeletePayload<T>
|
|
|
|
/** Filter describing which database changes a `postgres_changes` listener wants to receive. */
export type RealtimePostgresChangesFilter<T extends `${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT}`> = {
  /** The type of database change to listen to. */
  event: T
  /** The database schema to listen to. */
  schema: string
  /** The database table to listen to. */
  table?: string
  /** Receive database changes when filter is matched. */
  filter?: string
}
|
|
|
|
/** Outcome reported by channel send-style operations such as `send`, `track` and `untrack`. */
export type RealtimeChannelSendResponse =
  | 'ok'
  | 'timed out'
  | 'error'
|
|
|
|
/** Database change events a listener can subscribe to; `ALL` is the `'*'` wildcard. */
export enum REALTIME_POSTGRES_CHANGES_LISTEN_EVENT {
  ALL = '*',
  INSERT = 'INSERT',
  UPDATE = 'UPDATE',
  DELETE = 'DELETE',
}
|
|
|
|
/** Listener categories accepted as the first argument of `channel.on(...)`. */
export enum REALTIME_LISTEN_TYPES {
  BROADCAST = 'broadcast',
  PRESENCE = 'presence',
  POSTGRES_CHANGES = 'postgres_changes',
  SYSTEM = 'system',
}
|
|
|
|
/** Status values handed to the callback passed to `channel.subscribe(...)`. */
export enum REALTIME_SUBSCRIBE_STATES {
  SUBSCRIBED = 'SUBSCRIBED',
  TIMED_OUT = 'TIMED_OUT',
  CLOSED = 'CLOSED',
  CHANNEL_ERROR = 'CHANNEL_ERROR',
}
|
|
|
|
/** Public alias of the internal `CHANNEL_STATES` constant. */
export const REALTIME_CHANNEL_STATES = CHANNEL_STATES
|
|
|
|
/** Shape of the join reply's `postgres_changes` list as consumed by `subscribe`. */
interface PostgresChangesFilters {
  postgres_changes: Array<{
    id: string
    event: string
    schema?: string
    table?: string
    filter?: string
  }>
}
|
|
/** A channel is the basic building block of Realtime
|
|
* and narrows the scope of data flow to subscribed clients.
|
|
* You can think of a channel as a chatroom where participants are able to see who's online
|
|
* and send and receive messages.
|
|
*/
|
|
export default class RealtimeChannel {
|
|
bindings: {
|
|
[key: string]: {
|
|
type: string
|
|
filter: { [key: string]: any }
|
|
callback: Function
|
|
id?: string
|
|
}[]
|
|
} = {}
|
|
timeout: number
|
|
state: CHANNEL_STATES = CHANNEL_STATES.closed
|
|
joinedOnce = false
|
|
joinPush: Push
|
|
rejoinTimer: Timer
|
|
pushBuffer: Push[] = []
|
|
presence: RealtimePresence
|
|
broadcastEndpointURL: string
|
|
subTopic: string
|
|
private: boolean
|
|
|
|
/**
 * Creates a channel that can broadcast messages, sync presence, and listen to Postgres changes.
 *
 * The topic determines which realtime stream you are subscribing to. Config options let you
 * enable acknowledgement for broadcasts, presence tracking, or private channels; anything not
 * supplied falls back to `broadcast: { ack: false, self: false }`,
 * `presence: { key: '', enabled: false }` and `private: false`.
 *
 * Throws (a string) when `broadcast.replay` is configured on a channel that is not private.
 *
 * @example
 * ```ts
 * import RealtimeClient from '@supabase/realtime-js'
 *
 * const client = new RealtimeClient('https://xyzcompany.supabase.co/realtime/v1', {
 *   params: { apikey: 'public-anon-key' },
 * })
 * const channel = new RealtimeChannel('realtime:public:messages', { config: {} }, client)
 * ```
 */
constructor(
  /** Topic name can be any string. */
  public topic: string,
  public params: RealtimeChannelOptions = { config: {} },
  public socket: RealtimeClient
) {
  // Sub-topic is the topic without a leading, case-insensitive "realtime:" prefix.
  this.subTopic = topic.replace(/^realtime:/i, '')

  // Caller-supplied config keys override the defaults key by key (shallow merge).
  const defaultConfig = {
    broadcast: { ack: false, self: false },
    presence: { key: '', enabled: false },
    private: false,
  }
  this.params.config = { ...defaultConfig, ...params.config }

  this.timeout = this.socket.timeout
  this.joinPush = new Push(this, CHANNEL_EVENTS.join, this.params, this.timeout)
  this.rejoinTimer = new Timer(() => this._rejoinUntilConnected(), this.socket.reconnectAfterMs)

  // Shared reaction to a channel error event and to an "error" reply on the join push.
  const markErroredAndRetry = (reason: any) => {
    if (this._isLeaving() || this._isClosed()) {
      return
    }
    this.socket.log('channel', `error ${this.topic}`, reason)
    this.state = CHANNEL_STATES.errored
    this.rejoinTimer.scheduleTimeout()
  }

  // Successful join: flush everything that was buffered while not joined.
  this.joinPush.receive('ok', () => {
    this.state = CHANNEL_STATES.joined
    this.rejoinTimer.reset()
    this.pushBuffer.forEach((pushEvent: Push) => pushEvent.send())
    this.pushBuffer = []
  })

  this._onClose(() => {
    this.rejoinTimer.reset()
    this.socket.log('channel', `close ${this.topic} ${this._joinRef()}`)
    this.state = CHANNEL_STATES.closed
    this.socket._remove(this)
  })

  this._onError((reason: string) => markErroredAndRetry(reason))

  // A join timeout only matters while the channel is still joining.
  this.joinPush.receive('timeout', () => {
    if (!this._isJoining()) {
      return
    }
    this.socket.log('channel', `timeout ${this.topic}`, this.joinPush.timeout)
    this.state = CHANNEL_STATES.errored
    this.rejoinTimer.scheduleTimeout()
  })

  this.joinPush.receive('error', (reason: any) => markErroredAndRetry(reason))

  // Re-dispatch generic replies under the per-ref reply event name.
  this._on(CHANNEL_EVENTS.reply, {}, (payload: any, ref: string) => {
    this._trigger(this._replyEventName(ref), payload)
  })

  this.presence = new RealtimePresence(this)

  this.broadcastEndpointURL = httpEndpointURL(this.socket.endPoint)
  this.private = this.params.config.private || false

  const wantsReplay = this.params.config?.broadcast?.replay
  if (!this.private && wantsReplay) {
    throw `tried to use replay on public channel '${this.topic}'. It must be a private channel.`
  }
}
|
|
|
|
/**
 * Subscribe registers your client with the server.
 *
 * Connects the socket if needed. When the channel is in the `closed` state it builds the join
 * payload (broadcast / presence / postgres_changes / private config plus the access token when
 * one is set), starts the join and reports progress through `callback`. In any other state the
 * call only ensures the socket is connected.
 *
 * @param callback Invoked with the subscription status and, for errors, an `Error`.
 * @param timeout Timeout used for the join push; defaults to the channel timeout.
 * @returns This channel, for chaining.
 */
subscribe(
  callback?: (status: REALTIME_SUBSCRIBE_STATES, err?: Error) => void,
  timeout = this.timeout
): RealtimeChannel {
  if (!this.socket.isConnected()) {
    this.socket.connect()
  }

  // Only a closed channel starts a join.
  if (this.state != CHANNEL_STATES.closed) {
    return this
  }

  const { broadcast, presence, private: isPrivate } = this.params.config
  const postgres_changes = this.bindings.postgres_changes?.map((binding) => binding.filter) ?? []

  // Presence is enabled when a presence listener exists or the config asks for it explicitly.
  const presenceBindings = this.bindings[REALTIME_LISTEN_TYPES.PRESENCE]
  const presence_enabled =
    (!!presenceBindings && presenceBindings.length > 0) ||
    this.params.config.presence?.enabled === true

  const config = {
    broadcast,
    presence: { ...presence, enabled: presence_enabled },
    postgres_changes,
    private: isPrivate,
  }

  const joinPayload: { [key: string]: any } = { config }
  if (this.socket.accessTokenValue) {
    joinPayload.access_token = this.socket.accessTokenValue
  }

  this._onError((e: Error) => callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, e))
  this._onClose(() => callback?.(REALTIME_SUBSCRIBE_STATES.CLOSED))

  this.updateJoinPayload(joinPayload)

  this.joinedOnce = true
  this._rejoin(timeout)

  const handleJoined = async ({ postgres_changes }: PostgresChangesFilters) => {
    // Only refresh auth if using callback-based tokens
    if (!this.socket._isManualToken()) {
      this.socket.setAuth()
    }

    if (postgres_changes === undefined) {
      callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
      return
    }

    // Pair each client binding with the server filter at the same index and adopt the server id.
    const clientBindings = this.bindings.postgres_changes
    const total = clientBindings?.length ?? 0
    const confirmedBindings = []

    for (let idx = 0; idx < total; idx++) {
      const clientBinding = clientBindings[idx]
      const { event, schema, table, filter } = clientBinding.filter
      const serverFilter = postgres_changes && postgres_changes[idx]

      if (
        !(
          serverFilter &&
          serverFilter.event === event &&
          RealtimeChannel.isFilterValueEqual(serverFilter.schema, schema) &&
          RealtimeChannel.isFilterValueEqual(serverFilter.table, table) &&
          RealtimeChannel.isFilterValueEqual(serverFilter.filter, filter)
        )
      ) {
        // Any mismatch aborts the subscription.
        this.unsubscribe()
        this.state = CHANNEL_STATES.errored

        callback?.(
          REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
          new Error('mismatch between server and client bindings for postgres changes')
        )
        return
      }

      confirmedBindings.push({ ...clientBinding, id: serverFilter.id })
    }

    this.bindings.postgres_changes = confirmedBindings
    callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
    return
  }

  const handleJoinError = (error: { [key: string]: any }) => {
    this.state = CHANNEL_STATES.errored
    const details = Object.values(error).join(', ') || 'error'
    callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, new Error(JSON.stringify(details)))
    return
  }

  const handleJoinTimeout = () => {
    callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
    return
  }

  this.joinPush
    .receive('ok', handleJoined)
    .receive('error', handleJoinError)
    .receive('timeout', handleJoinTimeout)

  return this
}
|
|
|
|
/**
 * Returns the current presence state for this channel.
 *
 * The shape is a map keyed by presence key (for example a user id) where each entry contains the
 * tracked metadata for that user. The value is read from `this.presence.state` and cast to the
 * requested type.
 */
presenceState<T extends { [key: string]: any } = {}>(): RealtimePresenceState<T> {
  const currentState = this.presence.state
  return currentState as RealtimePresenceState<T>
}
|
|
|
|
/**
 * Sends the supplied payload to the presence tracker so other subscribers can see that this
 * client is online. Use `untrack` to stop broadcasting presence for the same key.
 *
 * @param payload Presence metadata to share with other subscribers.
 * @param opts Options forwarded to `send`; `opts.timeout` overrides the channel timeout
 *   (a falsy timeout falls back to the channel timeout).
 * @returns The outcome reported by `send`.
 */
async track(
  payload: { [key: string]: any },
  opts: { [key: string]: any } = {}
): Promise<RealtimeChannelSendResponse> {
  return await this.send(
    {
      type: 'presence',
      event: 'track',
      payload,
    },
    // `send` reads `opts.timeout`, so the timeout must travel inside an options object;
    // passing the bare number made `send` ignore it and always use the channel timeout.
    { ...opts, timeout: opts.timeout || this.timeout }
  )
}
|
|
|
|
/**
 * Removes the current presence state for this client by sending a presence `untrack` message.
 *
 * @param opts Options forwarded unchanged to `send`.
 */
async untrack(opts: { [key: string]: any } = {}): Promise<RealtimeChannelSendResponse> {
  const untrackMessage = { type: 'presence' as const, event: 'untrack' }
  return await this.send(untrackMessage, opts)
}
|
|
|
|
/**
 * Creates an event handler that listens to changes.
 */
// Presence: sync / join / leave
on(
  type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
  filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.SYNC}` },
  callback: () => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
  filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.JOIN}` },
  callback: (payload: RealtimePresenceJoinPayload<T>) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.PRESENCE}`,
  filter: { event: `${REALTIME_PRESENCE_LISTEN_EVENTS.LEAVE}` },
  callback: (payload: RealtimePresenceLeavePayload<T>) => void
): RealtimeChannel
// Postgres changes: wildcard, per-event, then the general event union
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
  filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL}`>,
  callback: (payload: RealtimePostgresChangesPayload<T>) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
  filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT}`>,
  callback: (payload: RealtimePostgresInsertPayload<T>) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
  filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE}`>,
  callback: (payload: RealtimePostgresUpdatePayload<T>) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
  filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE}`>,
  callback: (payload: RealtimePostgresDeletePayload<T>) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.POSTGRES_CHANGES}`,
  filter: RealtimePostgresChangesFilter<`${REALTIME_POSTGRES_CHANGES_LISTEN_EVENT}`>,
  callback: (payload: RealtimePostgresChangesPayload<T>) => void
): RealtimeChannel
/**
 * The following is placed here to display on supabase.com/docs/reference/javascript/subscribe.
 * @param type One of "broadcast", "presence", or "postgres_changes".
 * @param filter Custom object specific to the Realtime feature detailing which payloads to receive.
 * @param callback Function to be invoked when event handler is triggered.
 */
on(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: string },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: string
    meta?: {
      replayed?: boolean
      id: string
    }
    [key: string]: any
  }) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: string },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: string
    meta?: {
      replayed?: boolean
      id: string
    }
    payload: T
  }) => void
): RealtimeChannel
// Broadcast carrying database change payloads: wildcard, insert, update, delete
on<T extends Record<string, unknown>>(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.ALL
    payload: RealtimeBroadcastPayload<T>
  }) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.INSERT
    payload: RealtimeBroadcastInsertPayload<T>
  }) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.UPDATE
    payload: RealtimeBroadcastUpdatePayload<T>
  }) => void
): RealtimeChannel
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.BROADCAST}`,
  filter: { event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE },
  callback: (payload: {
    type: `${REALTIME_LISTEN_TYPES.BROADCAST}`
    event: REALTIME_POSTGRES_CHANGES_LISTEN_EVENT.DELETE
    payload: RealtimeBroadcastDeletePayload<T>
  }) => void
): RealtimeChannel
// System messages
on<T extends { [key: string]: any }>(
  type: `${REALTIME_LISTEN_TYPES.SYSTEM}`,
  filter: {},
  callback: (payload: any) => void
): RealtimeChannel
// Implementation shared by every overload above.
on(
  type: `${REALTIME_LISTEN_TYPES}`,
  filter: { event: string; [key: string]: string },
  callback: (payload: any) => void
): RealtimeChannel {
  const channelIsJoined = this.state === CHANNEL_STATES.joined
  if (channelIsJoined && type === REALTIME_LISTEN_TYPES.PRESENCE) {
    // Adding a presence listener on a joined channel triggers a leave followed by a re-subscribe.
    this.socket.log(
      'channel',
      `resubscribe to ${this.topic} due to change in presence callbacks on joined channel`
    )
    this.unsubscribe().then(async () => await this.subscribe())
  }

  // The binding itself is registered synchronously, before the unsubscribe promise settles.
  return this._on(type, filter, callback)
}
|
|
/**
 * Sends a broadcast message explicitly via REST API.
 *
 * This method always uses the REST API endpoint regardless of WebSocket connection state.
 * Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
 *
 * @param event The name of the broadcast event
 * @param payload Payload to be sent (required)
 * @param opts Options including timeout
 * @returns Promise resolving to `{ success: true }` when the endpoint answers with status 202.
 *   The promise rejects when `payload` is `null`/`undefined` (with a string), and when the
 *   endpoint answers with any other status (with an `Error` whose message is taken from the
 *   response body's `error` or `message` field, falling back to the status text).
 */
async httpSend(
  event: string,
  payload: any,
  opts: { timeout?: number } = {}
): Promise<{ success: true } | { success: false; status: number; error: string }> {
  if (payload == null) {
    return Promise.reject('Payload is required for httpSend()')
  }

  const headers: Record<string, string> = {
    apikey: this.socket.apiKey || '',
    'Content-Type': 'application/json',
  }
  const accessToken = this.socket.accessTokenValue
  if (accessToken) {
    headers['Authorization'] = `Bearer ${accessToken}`
  }

  const message = {
    topic: this.subTopic,
    event,
    payload: payload,
    private: this.private,
  }
  const requestOptions = {
    method: 'POST',
    headers,
    body: JSON.stringify({ messages: [message] }),
  }

  const response = await this._fetchWithTimeout(
    this.broadcastEndpointURL,
    requestOptions,
    opts.timeout ?? this.timeout
  )

  if (response.status === 202) {
    return { success: true }
  }

  // Prefer an error description from the JSON body; keep the status text if it cannot be parsed.
  let errorMessage = response.statusText
  try {
    const errorBody = await response.json()
    errorMessage = errorBody.error || errorBody.message || errorMessage
  } catch {}

  return Promise.reject(new Error(errorMessage))
}
|
|
|
|
/**
 * Sends a message into the channel.
 *
 * When the channel can push (socket connected and channel joined), or the message is not a
 * broadcast, the message goes through the channel push mechanism. Otherwise (a broadcast while
 * the channel cannot push) it falls back to the REST broadcast endpoint and logs a warning.
 *
 * @param args Arguments to send to channel
 * @param args.type The type of event to send
 * @param args.event The name of the event being sent
 * @param args.payload Payload to be sent
 * @param opts Options to be used during the send process
 * @returns `'ok'`, `'error'` or `'timed out'`.
 */
async send(
  args: {
    type: 'broadcast' | 'presence' | 'postgres_changes'
    event: string
    payload?: any
    [key: string]: any
  },
  opts: { [key: string]: any } = {}
): Promise<RealtimeChannelSendResponse> {
  if (this._canPush() || args.type !== 'broadcast') {
    return new Promise((resolve) => {
      const push = this._push(args.type, args, opts.timeout || this.timeout)

      // Broadcasts without ack resolve immediately; later replies no longer change the outcome.
      const ackRequested = this.params?.config?.broadcast?.ack
      if (args.type === 'broadcast' && !ackRequested) {
        resolve('ok')
      }

      push.receive('ok', () => resolve('ok'))
      push.receive('error', () => resolve('error'))
      push.receive('timeout', () => resolve('timed out'))
    })
  }

  console.warn(
    'Realtime send() is automatically falling back to REST API. ' +
      'This behavior will be deprecated in the future. ' +
      'Please use httpSend() explicitly for REST delivery.'
  )

  const { event, payload: endpoint_payload } = args
  const headers: Record<string, string> = {
    apikey: this.socket.apiKey || '',
    'Content-Type': 'application/json',
  }
  const accessToken = this.socket.accessTokenValue
  if (accessToken) {
    headers['Authorization'] = `Bearer ${accessToken}`
  }

  const message = {
    topic: this.subTopic,
    event,
    payload: endpoint_payload,
    private: this.private,
  }
  const requestOptions = {
    method: 'POST',
    headers,
    body: JSON.stringify({ messages: [message] }),
  }

  try {
    const response = await this._fetchWithTimeout(
      this.broadcastEndpointURL,
      requestOptions,
      opts.timeout ?? this.timeout
    )

    // The body is not needed; cancel it before reporting the outcome.
    await response.body?.cancel()
    return response.ok ? 'ok' : 'error'
  } catch (error: any) {
    // An aborted request is reported as a timeout; every other failure is an error.
    return error.name === 'AbortError' ? 'timed out' : 'error'
  }
}
|
|
|
|
/**
 * Updates the payload that will be sent the next time the channel joins (reconnects).
 * Useful for rotating access tokens or updating config without re-creating the channel.
 *
 * @param payload New payload, forwarded to the join push.
 */
updateJoinPayload(payload: { [key: string]: any }): void {
  const { joinPush } = this
  joinPush.updatePayload(payload)
}
|
|
|
|
/**
 * Leaves the channel.
 *
 * Marks the channel as leaving, destroys the join push and sends a leave push to the server.
 * The close event (and therefore the onClose hooks) is triggered when the leave is acknowledged
 * or times out, but not when the server replies with an error. If the channel cannot push, the
 * leave is treated as acknowledged immediately.
 *
 * @param timeout Timeout for the leave push; defaults to the channel timeout.
 * @returns A promise resolving to `'ok'`, `'timed out'` or `'error'`, e.g.
 *   `channel.unsubscribe().then((status) => console.log(status))`.
 */
unsubscribe(timeout = this.timeout): Promise<'ok' | 'timed out' | 'error'> {
  this.state = CHANNEL_STATES.leaving

  const emitClose = () => {
    this.socket.log('channel', `leave ${this.topic}`)
    this._trigger(CHANNEL_EVENTS.close, 'leave', this._joinRef())
  }

  this.joinPush.destroy()

  // Kept outside the promise so the cleanup step below can reach it.
  let leavePush: Push | null = null

  const leaveResult = new Promise<RealtimeChannelSendResponse>((resolve) => {
    leavePush = new Push(this, CHANNEL_EVENTS.leave, {}, timeout)
    leavePush
      .receive('ok', () => {
        emitClose()
        resolve('ok')
      })
      .receive('timeout', () => {
        emitClose()
        resolve('timed out')
      })
      .receive('error', () => {
        resolve('error')
      })

    leavePush.send()

    // Nothing will come back when the channel cannot push, so acknowledge locally.
    if (!this._canPush()) {
      leavePush.trigger('ok', {})
    }
  })

  return leaveResult.finally(() => {
    leavePush?.destroy()
  })
}
|
|
/**
 * Teardown the channel.
 *
 * Destroys every buffered push and the join push, resets the rejoin timer, marks the channel
 * as closed and drops all bindings.
 */
teardown() {
  for (const bufferedPush of this.pushBuffer) {
    bufferedPush.destroy()
  }
  this.pushBuffer = []

  this.rejoinTimer.reset()
  this.joinPush.destroy()

  this.state = CHANNEL_STATES.closed
  this.bindings = {}
}
|
|
|
|
/**
 * Performs `this.socket.fetch` with an abort signal that fires after `timeout` milliseconds.
 *
 * @param url Request URL.
 * @param options Fetch options; the abort `signal` is added to (and overrides) them.
 * @param timeout Milliseconds before the request is aborted.
 * @returns The response returned by `this.socket.fetch`; rejections from it are propagated.
 *
 * @internal
 */
async _fetchWithTimeout(url: string, options: { [key: string]: any }, timeout: number) {
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), timeout)

  try {
    return await this.socket.fetch(url, {
      ...options,
      signal: controller.signal,
    })
  } finally {
    // Always release the timer, including when the request rejects, so a failed
    // request does not leave a pending timeout behind.
    clearTimeout(timer)
  }
}
|
|
|
|
/**
 * Creates a push for `event` and either sends it right away (when the channel can push) or
 * places it in the push buffer. Throws (a string) if the channel has never been joined.
 *
 * @internal
 */
_push(event: string, payload: { [key: string]: any }, timeout = this.timeout) {
  if (!this.joinedOnce) {
    throw `tried to push '${event}' to '${this.topic}' before joining. Use channel.subscribe() before pushing events`
  }

  const pushEvent = new Push(this, event, payload, timeout)
  const sendNow = this._canPush()
  if (!sendNow) {
    this._addToPushBuffer(pushEvent)
    return pushEvent
  }

  pushEvent.send()
  return pushEvent
}
|
|
|
|
/**
 * Starts the push's timeout and appends it to the push buffer. When the buffer grows beyond
 * `MAX_PUSH_BUFFER_SIZE`, the oldest buffered push is removed, destroyed and logged.
 *
 * @internal
 */
_addToPushBuffer(pushEvent: Push) {
  pushEvent.startTimeout()
  this.pushBuffer.push(pushEvent)

  // Enforce buffer size limit
  const withinLimit = this.pushBuffer.length <= MAX_PUSH_BUFFER_SIZE
  if (withinLimit) {
    return
  }

  const evicted = this.pushBuffer.shift()
  if (!evicted) {
    return
  }

  evicted.destroy()
  this.socket.log(
    'channel',
    `discarded push due to buffer overflow: ${evicted.event}`,
    evicted.payload
  )
}
|
|
|
|
/**
 * Overridable message hook
 *
 * Receives all events for specialized message handling before dispatching to the channel callbacks.
 * Must return the payload, modified or unmodified. This default implementation returns the
 * payload untouched and ignores the event name and ref.
 *
 * @internal
 */
_onMessage(_event: string, payload: any, _ref?: string) {
  const unchanged = payload
  return unchanged
}
|
|
|
|
/**
 * Tells whether `topic` is exactly this channel's topic.
 *
 * @internal
 */
_isMember(topic: string): boolean {
  const ownTopic = this.topic
  return ownTopic === topic
}
|
|
|
|
/**
 * Returns the ref of the join push.
 *
 * @internal
 */
_joinRef(): string {
  const { ref } = this.joinPush
  return ref
}
|
|
|
|
/**
 * Dispatches an incoming event to the matching bindings.
 *
 * - Lifecycle events (close, error, leave, join) that carry a ref other than the current join
 *   ref are ignored.
 * - The payload first passes through `_onMessage`, which must return it (modified or not).
 * - `insert` / `update` / `delete` events go to `postgres_changes` bindings whose event is `'*'`
 *   or matches the type.
 * - Any other event goes to the bindings registered under its lower-cased type; for
 *   `broadcast`, `presence` and `postgres_changes` the binding's filter must also match.
 * - A payload that carries `ids` is reshaped into the `postgres_changes` listener format
 *   (`schema`, `table`, `commit_timestamp`, `eventType`, `new`, `old`, `errors`) before delivery.
 *
 * Throws (a string) when `_onMessage` drops a truthy payload.
 *
 * @internal
 */
_trigger(type: string, payload?: any, ref?: string) {
  const typeLower = type.toLocaleLowerCase()
  const { close, error, leave, join } = CHANNEL_EVENTS
  const lifecycleEvents: string[] = [close, error, leave, join]
  if (ref && lifecycleEvents.includes(typeLower) && ref !== this._joinRef()) {
    return
  }

  let handledPayload = this._onMessage(typeLower, payload, ref)
  if (payload && !handledPayload) {
    throw 'channel onMessage callbacks must return the payload, modified or unmodified'
  }

  if (['insert', 'update', 'delete'].includes(typeLower)) {
    this.bindings.postgres_changes
      ?.filter((bind) => {
        return bind.filter?.event === '*' || bind.filter?.event?.toLocaleLowerCase() === typeLower
      })
      .forEach((bind) => bind.callback(handledPayload, ref))
    return
  }

  const usesEventFilter = ['broadcast', 'presence', 'postgres_changes'].includes(typeLower)

  this.bindings[typeLower]
    ?.filter((bind) => {
      if (!usesEventFilter) {
        return bind.type.toLocaleLowerCase() === typeLower
      }

      if ('id' in bind) {
        // Binding carries an id: match on the payload's ids and its change type.
        // Optional chaining keeps a missing payload or type from throwing.
        const bindId = bind.id
        const bindEvent = bind.filter?.event
        return (
          bindId &&
          payload?.ids?.includes(bindId) &&
          (bindEvent === '*' ||
            bindEvent?.toLocaleLowerCase() === payload?.data?.type?.toLocaleLowerCase())
        )
      }

      const bindEvent = bind?.filter?.event?.toLocaleLowerCase()
      return bindEvent === '*' || bindEvent === payload?.event?.toLocaleLowerCase()
    })
    .forEach((bind) => {
      // `typeof null` is 'object', so null must be excluded before using `in`.
      if (
        handledPayload !== null &&
        typeof handledPayload === 'object' &&
        'ids' in handledPayload
      ) {
        const postgresChanges = handledPayload.data
        const { schema, table, commit_timestamp, type, errors } = postgresChanges
        const enrichedPayload = {
          schema: schema,
          table: table,
          commit_timestamp: commit_timestamp,
          eventType: type,
          new: {},
          old: {},
          errors: errors,
        }
        // Reassigned once: later bindings receive the already reshaped payload.
        handledPayload = {
          ...enrichedPayload,
          ...this._getPayloadRecords(postgresChanges),
        }
      }
      bind.callback(handledPayload, ref)
    })
}
|
|
|
|
/**
 * Whether the channel state is `closed`.
 *
 * @internal
 */
_isClosed(): boolean {
  const { state } = this
  return state === CHANNEL_STATES.closed
}
|
|
|
|
/**
 * Whether the channel state is `joined`.
 *
 * @internal
 */
_isJoined(): boolean {
  const { state } = this
  return state === CHANNEL_STATES.joined
}
|
|
|
|
/**
 * Whether the channel state is `joining`.
 *
 * @internal
 */
_isJoining(): boolean {
  const { state } = this
  return state === CHANNEL_STATES.joining
}
|
|
|
|
/**
 * Whether the channel state is `leaving`.
 *
 * @internal
 */
_isLeaving(): boolean {
  const { state } = this
  return state === CHANNEL_STATES.leaving
}
|
|
|
|
/**
 * Builds the event name under which the reply for `ref` is dispatched.
 *
 * @internal
 */
_replyEventName(ref: string): string {
  const replyEvent = `chan_reply_${ref}`
  return replyEvent
}
|
|
|
|
/**
 * Registers a binding (lower-cased type, filter and callback) under the lower-cased type and
 * returns the channel for chaining.
 *
 * @internal
 */
_on(type: string, filter: { [key: string]: any }, callback: Function) {
  const typeLower = type.toLocaleLowerCase()
  const binding = { type: typeLower, filter: filter, callback: callback }

  const registered = this.bindings[typeLower]
  if (!registered) {
    this.bindings[typeLower] = [binding]
    return this
  }

  registered.push(binding)
  return this
}
|
|
|
|
/**
 * Removes every binding of the given type whose filter equals `filter` (per `isEqual`) and
 * returns the channel for chaining.
 *
 * @internal
 */
_off(type: string, filter: { [key: string]: any }) {
  const typeLower = type.toLocaleLowerCase()
  const registered = this.bindings[typeLower]

  if (registered) {
    this.bindings[typeLower] = registered.filter((bind) => {
      // Keep the binding unless both its type and its filter match.
      const sameType = bind.type?.toLocaleLowerCase() === typeLower
      return !sameType || !RealtimeChannel.isEqual(bind.filter, filter)
    })
  }

  return this
}
|
|
|
|
/**
 * Shallow equality for two filter objects: same number of own enumerable keys, and every key
 * of `obj1` is also an own key of `obj2` with a strictly equal value.
 *
 * Key presence is checked explicitly so that objects such as `{ a: undefined }` and
 * `{ b: undefined }` are not reported as equal.
 *
 * @internal
 */
private static isEqual(obj1: { [key: string]: string }, obj2: { [key: string]: string }) {
  const keys1 = Object.keys(obj1)
  if (keys1.length !== Object.keys(obj2).length) {
    return false
  }

  return keys1.every(
    (key) => Object.prototype.hasOwnProperty.call(obj2, key) && obj1[key] === obj2[key]
  )
}
|
|
|
|
/**
 * Compares two optional filter values for equality.
 * Treats undefined, null, and empty string as equivalent empty values.
 *
 * @param serverValue Value reported by the server.
 * @param clientValue Value configured on the client binding.
 * @returns `true` when both are empty, or both are the same non-empty string.
 * @internal
 */
private static isFilterValueEqual(
  serverValue: string | undefined | null,
  clientValue: string | undefined
): boolean {
  // `||` (not `??`) so that '' collapses to undefined along with null and undefined,
  // as the documented contract requires.
  const normalizedServer = serverValue || undefined
  const normalizedClient = clientValue || undefined
  return normalizedServer === normalizedClient
}
|
|
|
|
/**
 * Rejoin timer callback: schedules the next timer run first, then attempts a rejoin if the
 * socket is currently connected.
 *
 * @internal
 */
private _rejoinUntilConnected() {
  this.rejoinTimer.scheduleTimeout()

  const socketReady = this.socket.isConnected()
  if (!socketReady) {
    return
  }
  this._rejoin()
}
|
|
|
|
/**
 * Registers a callback that will be executed when the channel closes, by binding it to the
 * close channel event with an empty filter.
 *
 * @internal
 */
private _onClose(callback: Function) {
  const noFilter = {}
  this._on(CHANNEL_EVENTS.close, noFilter, callback)
}
|
|
|
|
/**
 * Registers a callback that will be executed when the channel encounters an error. Only the
 * first argument of the event (the reason) is forwarded to the callback.
 *
 * @internal
 */
private _onError(callback: Function) {
  const forwardReason = (reason: string) => callback(reason)
  this._on(CHANNEL_EVENTS.error, {}, forwardReason)
}
|
|
|
|
/**
 * Returns `true` if the socket is connected and the channel has been joined.
 *
 * @internal
 */
private _canPush(): boolean {
  const socketConnected = this.socket.isConnected()
  return socketConnected && this._isJoined()
}
|
|
|
|
/**
 * Starts a (re)join unless the channel is leaving: asks the socket to leave any open topic
 * with this name, moves the channel to `joining` and resends the join push.
 *
 * @param timeout Timeout for the join push; defaults to the channel timeout.
 * @internal
 */
private _rejoin(timeout = this.timeout): void {
  if (!this._isLeaving()) {
    this.socket._leaveOpenTopic(this.topic)
    this.state = CHANNEL_STATES.joining
    this.joinPush.resend(timeout)
  }
}
|
|
|
|
/**
 * Builds the `new` / `old` records for a change payload by converting the raw record data
 * with `Transformers.convertChangeData`:
 * - INSERT fills `new`
 * - UPDATE fills both `new` and `old`
 * - DELETE fills `old`
 * Records that are not filled stay as empty objects.
 *
 * @internal
 */
private _getPayloadRecords(payload: any) {
  const changeType = payload.type
  const fillsNew = changeType === 'INSERT' || changeType === 'UPDATE'
  const fillsOld = changeType === 'UPDATE' || changeType === 'DELETE'

  const records = { new: {}, old: {} }

  if (fillsNew) {
    records.new = Transformers.convertChangeData(payload.columns, payload.record)
  }
  if (fillsOld) {
    records.old = Transformers.convertChangeData(payload.columns, payload.old_record)
  }

  return records
}
|
|
}
|