diff --git a/packages/core/realtime-js/src/RealtimeChannel.ts b/packages/core/realtime-js/src/RealtimeChannel.ts index ee857a1fe..1c68433db 100644 --- a/packages/core/realtime-js/src/RealtimeChannel.ts +++ b/packages/core/realtime-js/src/RealtimeChannel.ts @@ -266,6 +266,7 @@ export default class RealtimeChannel { if (!this.socket.isConnected()) { this.socket.connect() } + const delay = this.socket._recordChannelJoin() if (this.channelAdapter.isClosed()) { const { config: { broadcast, presence, private: isPrivate }, @@ -299,30 +300,38 @@ export default class RealtimeChannel { this._updateFilterMessage() - this.channelAdapter - .subscribe(timeout) - .receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => { - // Only refresh auth if using callback-based tokens - if (!this.socket._isManualToken()) { - this.socket.setAuth() - } - if (postgres_changes === undefined) { - callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED) - return - } - - this._updatePostgresBindings(postgres_changes, callback) - }) - .receive('error', (error: { [key: string]: any }) => { - this.state = CHANNEL_STATES.errored - callback?.( - REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, - new Error(JSON.stringify(Object.values(error).join(', ') || 'error')) - ) - }) - .receive('timeout', () => { - callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT) - }) + const doSubscribe = () => { + this.channelAdapter + .subscribe(timeout) + .receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => { + // Only refresh auth if using callback-based tokens + if (!this.socket._isManualToken()) { + this.socket.setAuth() + } + if (postgres_changes === undefined) { + callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED) + return + } + + this._updatePostgresBindings(postgres_changes, callback) + }) + .receive('error', (error: { [key: string]: any }) => { + this.state = CHANNEL_STATES.errored + callback?.( + REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, + new Error(JSON.stringify(Object.values(error).join(', ') || 'error')) + ) + }) + .receive('timeout', () => { + callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT) + }) + } + + if (delay > 0) { + setTimeout(doSubscribe, delay) + } else { + doSubscribe() + } } return this } diff --git a/packages/core/realtime-js/src/RealtimeClient.ts b/packages/core/realtime-js/src/RealtimeClient.ts index 46acee1eb..68e1bc6e4 100755 --- a/packages/core/realtime-js/src/RealtimeClient.ts +++ b/packages/core/realtime-js/src/RealtimeClient.ts @@ -11,6 +11,11 @@ import { } from './lib/constants' import Serializer from './lib/serializer' +import { + RateLimiter, + DEFAULT_SUBSCRIPTION_WARNING_CONFIG, + type SubscriptionWarningConfig, +} from './lib/rate-limiter' import { httpEndpointURL } from './lib/transformers' import RealtimeChannel from './RealtimeChannel' import type { RealtimeChannelOptions } from './RealtimeChannel' @@ -72,6 +77,7 @@ export type RealtimeClientOptions = { worker?: boolean workerUrl?: string accessToken?: () => Promise + subscriptionWarnings?: Partial | false } const WORKER_SCRIPT = ` @@ -177,6 +183,7 @@ export default class RealtimeClient { private _authPromise: Promise | null = null private _workerHeartbeatTimer: HeartbeatTimer = undefined private _pendingWorkerHeartbeatRef: string | null = null + private _rateLimiter: RateLimiter | null = null /** * Initializes the Socket. @@ -223,6 +230,13 @@ export default class RealtimeClient { this.httpEndpoint = httpEndpointURL(endPoint) this.fetch = this._resolveFetch(options?.fetch) + + if (options?.subscriptionWarnings !== false) { + this._rateLimiter = new RateLimiter({ + ...DEFAULT_SUBSCRIPTION_WARNING_CONFIG, + ...options?.subscriptionWarnings, + }) + } } /** @@ -412,13 +426,28 @@ export default class RealtimeClient { if (!exists) { const chan = new RealtimeChannel(`realtime:${topic}`, params, this) this.channels.push(chan) - return chan } else { return exists } } + /** + * Records a channel join attempt for rate limit tracking. + * Called by RealtimeChannel.subscribe() before initiating the join. + * @internal + */ + _recordChannelJoin(): number { + if (!this._rateLimiter) return 0 + + this._rateLimiter.recordJoin() + const { warning, delayMs } = this._rateLimiter.evaluate() + if (warning) { + this.log('warn', warning.message) + } + return delayMs + } + /** * Push out a message if the socket is connected. * diff --git a/packages/core/realtime-js/src/lib/rate-limiter.ts b/packages/core/realtime-js/src/lib/rate-limiter.ts new file mode 100644 index 000000000..99367546d --- /dev/null +++ b/packages/core/realtime-js/src/lib/rate-limiter.ts @@ -0,0 +1,88 @@ +export type SubscriptionWarningConfig = { + readonly joinRatePerSecond: number + readonly joinDelayMs: number +} + +export const DEFAULT_SUBSCRIPTION_WARNING_CONFIG = { + joinRatePerSecond: 20, + joinDelayMs: 100, +} as const satisfies SubscriptionWarningConfig + +export type SubscriptionWarning = { + readonly current: number + readonly threshold: number + readonly message: string +} + +export type RateLimitEvaluation = { + readonly warning: SubscriptionWarning | null + readonly delayMs: number +} + +const WARN_COOLDOWN_MS = 10_000 +const TROUBLESHOOTING_URL = + 'https://supabase.com/docs/guides/troubleshooting/realtime-too-many-channels-error' + +class RingBuffer { + private readonly slots: Float64Array + private head = 0 + private count = 0 + + constructor(capacity: number) { + this.slots = new Float64Array(capacity) + } + + push(ts: number): void { + this.slots[this.head] = ts + this.head = (this.head + 1) % this.slots.length + if (this.count < this.slots.length) this.count++ + } + + countWithin(ms: number, now: number): number { + const cutoff = now - ms + let n = 0 + for (let i = 0; i < this.count; i++) { + const idx = (this.head - 1 - i + this.slots.length) % this.slots.length + if (this.slots[idx] > cutoff) n++ + } + return n + } +} + +export class RateLimiter { + private readonly joins: RingBuffer + private lastWarnedAt = 0 + + constructor(private readonly config: SubscriptionWarningConfig) { + this.joins = new RingBuffer(config.joinRatePerSecond * 3) + } + + recordJoin(): void { + this.joins.push(Date.now()) + } + + evaluate(now: number = Date.now()): RateLimitEvaluation { + const { joinRatePerSecond, joinDelayMs } = this.config + const recentJoins = this.joins.countWithin(1000, now) + + let warning: SubscriptionWarning | null = null + if (recentJoins >= joinRatePerSecond) { + if (now - this.lastWarnedAt > WARN_COOLDOWN_MS) { + this.lastWarnedAt = now + warning = { + current: recentJoins, + threshold: joinRatePerSecond, + message: + `Realtime: ${recentJoins} channel joins in the last second (threshold: ${joinRatePerSecond}). ` + + `You may be creating channels too rapidly — this often indicates a missing cleanup or channels being created in a render loop. ` + + `See: ${TROUBLESHOOTING_URL}`, + } + } + } + + const excess = recentJoins - joinRatePerSecond + const delayMs = excess > 0 ? excess * joinDelayMs : 0 + + return { warning, delayMs } + } +} diff --git a/packages/core/realtime-js/test/RateLimiter.test.ts b/packages/core/realtime-js/test/RateLimiter.test.ts new file mode 100644 index 000000000..4c641d5a4 --- /dev/null +++ b/packages/core/realtime-js/test/RateLimiter.test.ts @@ -0,0 +1,237 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { RateLimiter, DEFAULT_SUBSCRIPTION_WARNING_CONFIG } from '../src/lib/rate-limiter' + +describe('RateLimiter', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('returns no warning when below threshold', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + expect(limiter.evaluate().warning).toBeNull() + }) + + describe('join rate warnings', () => { + it('warns when joins per second reach the threshold', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + + const { warning } = limiter.evaluate() + expect(warning).not.toBeNull() + expect(warning!.current).toBe(20) + expect(warning!.threshold).toBe(20) + expect(warning!.message).toContain('20 channel joins in the last second') + expect(warning!.message).toContain('render loop') + }) + + it('does not warn when joins are spread across seconds', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 15; i++) { + limiter.recordJoin() + } + vi.advanceTimersByTime(1_100) + for (let i = 0; i < 15; i++) { + limiter.recordJoin() + } + + expect(limiter.evaluate().warning).toBeNull() + }) + + it('does not repeat the warning within the cooldown period', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().warning).not.toBeNull() + + vi.advanceTimersByTime(5_000) + expect(limiter.evaluate().warning).toBeNull() + }) + + it('repeats the warning after the cooldown expires', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().warning).not.toBeNull() + + vi.advanceTimersByTime(10_001) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().warning).not.toBeNull() + }) + + it('does not warn after the 1 second window expires', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + + vi.advanceTimersByTime(1_100) + + expect(limiter.evaluate().warning).toBeNull() + }) + + it('naturally resets once the time window expires without explicit purge', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 25; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().warning).not.toBeNull() + + vi.advanceTimersByTime(1_100) + expect(limiter.evaluate().warning).toBeNull() + + for (let i = 0; i < 5; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().warning).toBeNull() + }) + + it('retains sub-threshold join counts across evaluate calls', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 19; i++) { + limiter.recordJoin() + } + expect(limiter.evaluate().delayMs).toBe(0) + + limiter.recordJoin() + expect(limiter.evaluate().warning).not.toBeNull() + }) + }) + + describe('evaluate', () => { + it('returns both warning and delay in a single call', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 25; i++) { + limiter.recordJoin() + } + + const result = limiter.evaluate() + expect(result.warning).not.toBeNull() + expect(result.warning!.current).toBe(25) + expect(result.delayMs).toBe(500) + }) + + it('returns no warning and zero delay when under threshold', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 10; i++) { + limiter.recordJoin() + } + + const result = limiter.evaluate() + expect(result.warning).toBeNull() + expect(result.delayMs).toBe(0) + }) + + it('works with the _recordChannelJoin pattern', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 19; i++) { + limiter.recordJoin() + const { warning, delayMs } = limiter.evaluate() + expect(warning).toBeNull() + expect(delayMs).toBe(0) + } + + limiter.recordJoin() + const atThreshold = limiter.evaluate() + expect(atThreshold.warning).not.toBeNull() + expect(atThreshold.delayMs).toBe(0) + + limiter.recordJoin() + const overThreshold = limiter.evaluate() + expect(overThreshold.warning).toBeNull() // cooldown active + expect(overThreshold.delayMs).toBe(100) + }) + }) + + describe('delay', () => { + it('returns 0 when below the join rate threshold', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 19; i++) { + limiter.recordJoin() + } + + expect(limiter.evaluate().delayMs).toBe(0) + }) + + it('returns 0 when exactly at the join rate threshold', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 20; i++) { + limiter.recordJoin() + } + + expect(limiter.evaluate().delayMs).toBe(0) + }) + + it('returns linear delay proportional to excess joins', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 25; i++) { + limiter.recordJoin() + } + + expect(limiter.evaluate().delayMs).toBe(500) + }) + + it('respects a custom joinDelayMs', () => { + const limiter = new RateLimiter({ ...DEFAULT_SUBSCRIPTION_WARNING_CONFIG, joinDelayMs: 50 }) + + for (let i = 0; i < 23; i++) { + limiter.recordJoin() + } + + expect(limiter.evaluate().delayMs).toBe(150) + }) + + it('only counts joins within the last second', () => { + const limiter = new RateLimiter(DEFAULT_SUBSCRIPTION_WARNING_CONFIG) + + for (let i = 0; i < 25; i++) { + limiter.recordJoin() + } + + vi.advanceTimersByTime(1_100) + + expect(limiter.evaluate().delayMs).toBe(0) + }) + }) + + describe('ring buffer overflow', () => { + it('handles more joins than buffer capacity and still reports at least threshold', () => { + const config = { joinRatePerSecond: 20, joinDelayMs: 100 } + const limiter = new RateLimiter(config) + const capacity = config.joinRatePerSecond * 3 // 60 + + for (let i = 0; i < capacity + 10; i++) { + limiter.recordJoin() + } + + const { warning, delayMs } = limiter.evaluate() + expect(warning).not.toBeNull() + expect(warning!.current).toBeLessThanOrEqual(capacity) + expect(warning!.current).toBeGreaterThanOrEqual(config.joinRatePerSecond) + expect(delayMs).toBeGreaterThan(0) + }) + }) +}) diff --git a/packages/core/supabase-js/test/integration/bun/integration.test.ts b/packages/core/supabase-js/test/integration/bun/integration.test.ts index 66c405891..e7a9b1762 100644 --- a/packages/core/supabase-js/test/integration/bun/integration.test.ts +++ b/packages/core/supabase-js/test/integration/bun/integration.test.ts @@ -120,6 +120,49 @@ test('should get current user', async () => { expect(data.user!.email).toBe(email) }) +versions.forEach((vsn) => { + describe(`Realtime throttle with vsn ${vsn}`, () => { + test('should subscribe all channels even when rate is exceeded', async () => { + const client = createClient(SUPABASE_URL, ANON_KEY, { + realtime: { + heartbeatIntervalMs: 500, + vsn, + subscriptionWarnings: { joinRatePerSecond: 3, joinDelayMs: 50 }, + }, + }) + + const channelCount = 5 + const channels = Array.from({ length: channelCount }, (_, i) => + client.channel(`throttle-${vsn}-${i}-${Date.now()}`, { + config: { broadcast: { ack: true, self: true } }, + }) + ) + + const subscribed: string[] = [] + + channels.forEach((channel) => { + channel.subscribe((status) => { + if (status === 'SUBSCRIBED') subscribed.push(channel.topic) + }) + }) + + let attempts = 0 + while (subscribed.length < channelCount) { + if (attempts > 300) + throw new Error( + `Only ${subscribed.length}/${channelCount} channels subscribed after throttle delay` + ) + await new Promise((resolve) => setTimeout(resolve, 100)) + attempts++ + } + + expect(subscribed.length).toBe(channelCount) + + await client.removeAllChannels() + }, 35000) + }) +}) + test('should handle invalid credentials', async () => { await supabase.auth.signOut() const email = `bun-invalid-${Date.now()}@example.com` diff --git a/packages/core/supabase-js/test/integration/expo/__tests__/Index.test.tsx b/packages/core/supabase-js/test/integration/expo/__tests__/Index.test.tsx index a35b0e798..2cc33f05c 100644 --- a/packages/core/supabase-js/test/integration/expo/__tests__/Index.test.tsx +++ b/packages/core/supabase-js/test/integration/expo/__tests__/Index.test.tsx @@ -9,6 +9,33 @@ describe('Index', () => { const versions = [{ vsn: '1.0.0' }, { vsn: '2.0.0' }] versions.forEach(({ vsn }) => { + describe(`Realtime throttle with vsn: ${vsn}`, () => { + it('should subscribe all channels even when rate is exceeded', async () => { + const { getByTestId, unmount } = render() + + await waitFor( + () => { + const subscribed = parseInt(getByTestId('throttle_subscribed').props.children, 10) + const total = parseInt(getByTestId('throttle_channel_count').props.children, 10) + expect(subscribed).toBe(total) + }, + { + timeout: 30000, + interval: 500, + onTimeout: (error) => { + const subscribed = getByTestId('throttle_subscribed').props.children + const total = getByTestId('throttle_channel_count').props.children + throw new Error( + `Timeout: only ${subscribed}/${total} channels subscribed with vsn ${vsn}. ${error.message}` + ) + }, + } + ) + + unmount() + }, 35000) + }) + describe(`Realtime with vsn: ${vsn}`, () => { it('should display SUBSCRIBED status when realtime connection is established', async () => { const { getByTestId, unmount } = render() diff --git a/packages/core/supabase-js/test/integration/expo/app/index.tsx b/packages/core/supabase-js/test/integration/expo/app/index.tsx index dfa9489ed..5b0def7e4 100644 --- a/packages/core/supabase-js/test/integration/expo/app/index.tsx +++ b/packages/core/supabase-js/test/integration/expo/app/index.tsx @@ -8,20 +8,47 @@ const TEST_ANON_KEY = interface IndexProps { vsn?: string + throttleTest?: boolean } -export default function Index({ vsn = '1.0.0' }: IndexProps) { +export default function Index({ vsn = '1.0.0', throttleTest = false }: IndexProps) { const [realtimeStatus, setRealtimeStatus] = useState(null) const [receivedMessage, setReceivedMessage] = useState(null) + const [throttleSubscribed, setThrottleSubscribed] = useState(0) + const throttleChannelCount = 5 useEffect(() => { const supabase = createClient(SUPABASE_URL, TEST_ANON_KEY, { realtime: { heartbeatIntervalMs: 500, vsn: vsn, + ...(throttleTest ? { subscriptionWarnings: { joinRatePerSecond: 3, joinDelayMs: 50 } } : {}), } }) + if (throttleTest) { + const channels = Array.from({ length: throttleChannelCount }, (_, i) => + supabase.channel(`throttle-expo-${vsn}-${i}`, { + config: { broadcast: { ack: true, self: true } } + }) + ) + + let count = 0 + channels.forEach((channel) => { + channel.subscribe((status) => { + if (status === 'SUBSCRIBED') { + count++ + setThrottleSubscribed(count) + } + }) + }) + + return () => { + channels.forEach((ch) => ch.unsubscribe()) + supabase.realtime.disconnect() + } + } + const channel = supabase.channel(`realtime:public:todos-${vsn}`, { config: { broadcast: { ack: true, self: true } } }) @@ -50,7 +77,7 @@ export default function Index({ vsn = '1.0.0' }: IndexProps) { channel.unsubscribe() supabase.realtime.disconnect() } - }, [vsn]) + }, [vsn, throttleTest]) return ( {vsn} {realtimeStatus || ''} {receivedMessage || ''} + {String(throttleSubscribed)} + {String(throttleChannelCount)} ) } diff --git a/packages/core/supabase-js/test/integration/next/app/page.tsx b/packages/core/supabase-js/test/integration/next/app/page.tsx index 5772ce45a..4d17ebdfc 100644 --- a/packages/core/supabase-js/test/integration/next/app/page.tsx +++ b/packages/core/supabase-js/test/integration/next/app/page.tsx @@ -4,15 +4,44 @@ import { Suspense, useEffect, useState } from 'react' import { createClient } from '@/lib/supabase/client' import { useSearchParams } from 'next/navigation' +const THROTTLE_CHANNEL_COUNT = 5 + function HomeContent() { const searchParams = useSearchParams() const vsn = searchParams.get('vsn') || '1.0.0' + const throttleTest = searchParams.get('throttle') === 'true' const supabase = createClient(vsn) const [realtimeStatus, setRealtimeStatus] = useState(null) const [receivedMessage, setReceivedMessage] = useState(null) + const [throttleSubscribed, setThrottleSubscribed] = useState(0) useEffect(() => { + if (throttleTest) { + const throttleClient = createClient(vsn, { + realtime: { heartbeatIntervalMs: 500, vsn, subscriptionWarnings: { joinRatePerSecond: 3, joinDelayMs: 50 } }, + }) + const channels = Array.from({ length: THROTTLE_CHANNEL_COUNT }, (_, i) => + throttleClient.channel(`throttle-next-${vsn}-${i}`, { + config: { broadcast: { ack: true, self: true } } + }) + ) + + let count = 0 + channels.forEach((channel) => { + channel.subscribe((status) => { + if (status === 'SUBSCRIBED') { + count++ + setThrottleSubscribed(count) + } + }) + }) + + return () => { + channels.forEach((ch) => ch.unsubscribe()) + } + } + const channel = supabase.channel(`realtime:public:test-${vsn}`, { config: { broadcast: { ack: true, self: true } } }) @@ -39,7 +68,7 @@ function HomeContent() { return () => { channel.unsubscribe() } - }, [vsn]) + }, [vsn, throttleTest]) return (
@@ -48,6 +77,8 @@ function HomeContent() { {receivedMessage && (
{receivedMessage}
)} +
{throttleSubscribed}
+
{THROTTLE_CHANNEL_COUNT}
) } diff --git a/packages/core/supabase-js/test/integration/next/lib/supabase/client.ts b/packages/core/supabase-js/test/integration/next/lib/supabase/client.ts index 129e8c9f3..4e1a18489 100644 --- a/packages/core/supabase-js/test/integration/next/lib/supabase/client.ts +++ b/packages/core/supabase-js/test/integration/next/lib/supabase/client.ts @@ -1,6 +1,7 @@ import { createBrowserClient } from '@supabase/ssr' +import type { SupabaseClientOptions } from '@supabase/supabase-js' -export function createClient(vsn: string = '1.0.0') { +export function createClient(vsn: string = '1.0.0', extraOptions: Partial> = {}) { return createBrowserClient( process.env.NEXT_PUBLIC_SUPABASE_URL || 'http://127.0.0.1:54321', process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY || @@ -9,7 +10,8 @@ export function createClient(vsn: string = '1.0.0') { realtime: { heartbeatIntervalMs: 500, vsn: vsn, - } + }, + ...extraOptions, } ) } diff --git a/packages/core/supabase-js/test/integration/next/tests/home.spec.ts b/packages/core/supabase-js/test/integration/next/tests/home.spec.ts index ce0f95115..1606d58c9 100644 --- a/packages/core/supabase-js/test/integration/next/tests/home.spec.ts +++ b/packages/core/supabase-js/test/integration/next/tests/home.spec.ts @@ -4,6 +4,21 @@ test.describe('Home Page', () => { const versions = [{ vsn: '1.0.0' }, { vsn: '2.0.0' }] versions.forEach(({ vsn }) => { + test.describe(`Realtime throttle with vsn: ${vsn}`, () => { + test('should subscribe all channels even when rate is exceeded', async ({ page }) => { + await page.goto(`/?vsn=${vsn}&throttle=true`) + + await expect(page.getByTestId('throttle_subscribed')).toHaveText( + String(5), + { timeout: 30000 } + ) + + const total = await page.getByTestId('throttle_channel_count').textContent() + const subscribed = await page.getByTestId('throttle_subscribed').textContent() + expect(subscribed).toBe(total) + }) + }) + test.describe(`Realtime with vsn: ${vsn}`, () => { test('should subscribe to realtime channel', async ({ page }) => { await page.goto(`/?vsn=${vsn}`) diff --git a/packages/core/supabase-js/test/integration/node-browser/index.html b/packages/core/supabase-js/test/integration/node-browser/index.html index 320ee2634..e1e99aa67 100644 --- a/packages/core/supabase-js/test/integration/node-browser/index.html +++ b/packages/core/supabase-js/test/integration/node-browser/index.html @@ -138,6 +138,43 @@ log('setTimeout calls: ' + JSON.stringify(timeoutCalls)) log('setInterval calls: ' + JSON.stringify(intervalCalls)) }, 3000) + + // Throttle test: create channels above rate limit and verify all subscribe + if (urlParams.get('throttle') === 'true') { + log('Starting throttle test...') + const throttleClient = createClient( + 'http://127.0.0.1:54321', + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0', + { + realtime: { + transport: window.WebSocket, + heartbeatIntervalMs: 500, + vsn: vsn, + subscriptionWarnings: { joinRatePerSecond: 3, joinDelayMs: 50 }, + }, + } + ) + + const channelCount = 5 + let throttleSubscribed = 0 + const throttleChannels = [] + + for (let i = 0; i < channelCount; i++) { + const ch = throttleClient.channel('throttle-browser-' + vsn + '-' + i, { + config: { broadcast: { ack: true, self: true } }, + }) + ch.subscribe((status) => { + if (status === 'SUBSCRIBED') { + throttleSubscribed++ + log('Throttle channel subscribed: ' + throttleSubscribed + '/' + channelCount) + if (throttleSubscribed === channelCount) { + log('Throttle test complete: all channels subscribed') + } + } + }) + throttleChannels.push(ch) + } + } } catch (error) { log('Error in WebSocket test: ' + error.message) log('Error stack: ' + error.stack) diff --git a/packages/core/supabase-js/test/integration/node-browser/websocket.spec.ts b/packages/core/supabase-js/test/integration/node-browser/websocket.spec.ts index b78cf7dc3..cbff66103 100644 --- a/packages/core/supabase-js/test/integration/node-browser/websocket.spec.ts +++ b/packages/core/supabase-js/test/integration/node-browser/websocket.spec.ts @@ -4,6 +4,23 @@ test.describe('WebSocket Browser Tests', () => { const versions = [{ vsn: '1.0.0' }, { vsn: '2.0.0' }] versions.forEach(({ vsn }) => { + test.describe(`Realtime throttle with vsn: ${vsn}`, () => { + test('should subscribe all channels even when rate is exceeded', async ({ page }) => { + await page.goto(`/?vsn=${vsn}&throttle=true`) + await expect(page.locator('#log')).toBeVisible() + + await expect(page.locator('#log')).toContainText( + 'Throttle test complete: all channels subscribed', + { timeout: 30000 } + ) + + expect(await page.locator('#log').textContent()).not.toContain('Global error') + expect(await page.locator('#log').textContent()).not.toContain( + 'Unhandled promise rejection' + ) + }) + }) + test.describe(`WebSocket with vsn: ${vsn}`, () => { test('should test WebSocket transport', async ({ page }) => { await page.goto(`/?vsn=${vsn}`)