Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export type AbstractPoolConstructorOptions = AbstractRelayConstructorOptions & {
onRelayConnectionFailure?: (url: string) => void
// onRelayConnectionSuccess is called with the URL of a relay that succeeds the initial connection
onRelayConnectionSuccess?: (url: string) => void
// onRelayDisconnect is called when a relay disconnects during an active session
// (useful for monitoring connection health in long-running subscriptions)
onRelayDisconnect?: (url: string) => void
// onRelayReconnect is called when a relay successfully reconnects after a disconnection
onRelayReconnect?: (url: string) => void
// allowConnectingToRelay takes a relay URL and the operation being performed
// return false to skip connecting to that relay
allowConnectingToRelay?: (url: string, operation: ['read', Filter[]] | ['write', Event]) => boolean
Expand Down Expand Up @@ -52,6 +57,8 @@ export class AbstractSimplePool {
public trustedRelayURLs: Set<string> = new Set()
public onRelayConnectionFailure?: (url: string) => void
public onRelayConnectionSuccess?: (url: string) => void
public onRelayDisconnect?: (url: string) => void
public onRelayReconnect?: (url: string) => void
public allowConnectingToRelay?: (url: string, operation: ['read', Filter[]] | ['write', Event]) => boolean
public maxWaitForConnection: number

Expand All @@ -61,10 +68,12 @@ export class AbstractSimplePool {
this.verifyEvent = opts.verifyEvent
this._WebSocket = opts.websocketImplementation
this.enablePing = opts.enablePing
this.enableReconnect = opts.enableReconnect || false
this.enableReconnect = opts.enableReconnect ?? true
this.automaticallyAuth = opts.automaticallyAuth
this.onRelayConnectionFailure = opts.onRelayConnectionFailure
this.onRelayConnectionSuccess = opts.onRelayConnectionSuccess
this.onRelayDisconnect = opts.onRelayDisconnect
this.onRelayReconnect = opts.onRelayReconnect
this.allowConnectingToRelay = opts.allowConnectingToRelay
this.maxWaitForConnection = opts.maxWaitForConnection || 3000
}
Expand All @@ -87,8 +96,12 @@ export class AbstractSimplePool {
enableReconnect: this.enableReconnect,
})
relay.onclose = () => {
this.onRelayDisconnect?.(url)
this.relays.delete(url)
}
relay.onreconnect = () => {
this.onRelayReconnect?.(url)
}
this.relays.set(url, relay)
}

Expand Down
5 changes: 5 additions & 0 deletions abstract-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class AbstractRelay {
private _connected: boolean = false

public onclose: (() => void) | null = null
public onreconnect: (() => void) | null = null
public onnotice: (msg: string) => void = msg => console.debug(`NOTICE from ${this.url}: ${msg}`)
public onauth: undefined | ((evt: EventTemplate) => Promise<VerifiedEvent>)

Expand Down Expand Up @@ -170,6 +171,10 @@ export class AbstractRelay {
const isReconnection = this.reconnectAttempts > 0
this.reconnectAttempts = 0

if (isReconnection) {
this.onreconnect?.()
}

// resubscribe to all open subscriptions
for (const sub of this.openSubs.values()) {
sub.eosed = false
Expand Down
51 changes: 26 additions & 25 deletions pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,15 @@ test('get()', async () => {

test('ping-pong timeout in pool', async () => {
const mockRelay = mockRelays[0]
pool = new SimplePool({ enablePing: true })
pool = new SimplePool({ enablePing: true, enableReconnect: false })
const relay = await pool.ensureRelay(mockRelay.url)

// ensureRelay() calls connect() internally, which starts the ping interval
// with the default frequency (29s). We need to restart it with our test values.
relay.pingTimeout = 50
relay.pingFrequency = 50
clearInterval((relay as any).pingIntervalHandle)
;(relay as any).pingIntervalHandle = setInterval(() => (relay as any).pingpong(), relay.pingFrequency)

let closed = false
const closedPromise = new Promise<void>(resolve => {
Expand Down Expand Up @@ -257,34 +262,33 @@ test('reconnect on disconnect in pool', async () => {
const mockRelay = mockRelays[0]
pool = new SimplePool({ enablePing: true, enableReconnect: true })
const relay = await pool.ensureRelay(mockRelay.url)

// Restart ping interval with test-friendly timing (see ping-pong test above)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]

let closes = 0
relay.onclose = () => {
closes++
}
clearInterval((relay as any).pingIntervalHandle)
;(relay as any).pingIntervalHandle = setInterval(() => (relay as any).pingpong(), relay.pingFrequency)

expect(relay.connected).toBeTrue()

// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)

// now make it unresponsive
mockRelay.unresponsive = true

// wait for the second ping to fail, which will trigger a close
// wait for disconnect (relay.connected becomes false)
// note: onclose is NOT called when enableReconnect is true — the relay
// goes straight to reconnection instead of signaling a permanent close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
if (!relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()

// now make it responsive again
Expand All @@ -301,7 +305,6 @@ test('reconnect on disconnect in pool', async () => {
})

expect(relay.connected).toBeTrue()
expect(closes).toBe(1)
})

test('reconnect with filter update in pool', async () => {
Expand All @@ -311,14 +314,13 @@ test('reconnect with filter update in pool', async () => {
enableReconnect: true,
})
const relay = await pool.ensureRelay(mockRelay.url)

// Restart ping interval with test-friendly timing (see ping-pong test above)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]

let closes = 0
relay.onclose = () => {
closes++
}
clearInterval((relay as any).pingIntervalHandle)
;(relay as any).pingIntervalHandle = setInterval(() => (relay as any).pingpong(), relay.pingFrequency)

expect(relay.connected).toBeTrue()

Expand All @@ -327,21 +329,19 @@ test('reconnect with filter update in pool', async () => {

// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)

// now make it unresponsive
mockRelay.unresponsive = true

// wait for the second ping to fail, which will trigger a close
// wait for disconnect (relay.connected becomes false)
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
if (!relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()

// now make it responsive again
Expand All @@ -358,7 +358,6 @@ test('reconnect with filter update in pool', async () => {
})

expect(relay.connected).toBeTrue()
expect(closes).toBe(1)

// check if filter was updated
expect(sub.filters[0].since).toBeGreaterThan(1)
Expand Down Expand Up @@ -407,10 +406,12 @@ test('oninvalidevent is called through the pool for invalid events', async done
})

const sk = generateSecretKey()
const wrongFieldTypeEvent = [finalizeEvent(
{ kind: 1, content: 'hello', created_at: Math.floor(Date.now() / 1000), tags: [] },
sk,
)].map(v => { (v as any).kind = '1'; return v })[0]
const wrongFieldTypeEvent = [
finalizeEvent({ kind: 1, content: 'hello', created_at: Math.floor(Date.now() / 1000), tags: [] }, sk),
].map(v => {
;(v as any).kind = '1'
return v
})[0]

relay._onmessage({ data: JSON.stringify(['EVENT', sub.id, wrongFieldTypeEvent]) } as MessageEvent)
})
37 changes: 18 additions & 19 deletions relay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,31 +294,26 @@ test('reconnect on disconnect', async () => {
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100] // short backoff for testing

let closes = 0
relay.onclose = () => {
closes++
}

await relay.connect()
expect(relay.connected).toBeTrue()

// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)

// now make it unresponsive
mockRelay.unresponsive = true

// wait for the second ping to fail, which will trigger a close
// wait for disconnect (relay.connected becomes false)
// note: onclose is NOT called when enableReconnect is true — the relay
// goes straight to reconnection instead of signaling a permanent close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
if (!relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()

// now make it responsive again
Expand All @@ -335,7 +330,6 @@ test('reconnect on disconnect', async () => {
})

expect(relay.connected).toBeTrue()
expect(closes).toBe(1) // should not have closed again
})

test('oninvalidevent is called for malformed events', async done => {
Expand All @@ -353,15 +347,20 @@ test('oninvalidevent is called for malformed events', async done => {
})

const sk = generateSecretKey()
const wrongFieldTypeEvent = [finalizeEvent(
{
kind: 1,
content: 'content',
created_at: 0,
tags: [],
},
sk
)].map(v => { (v as any).kind = '1'; return v })[0]
const wrongFieldTypeEvent = [
finalizeEvent(
{
kind: 1,
content: 'content',
created_at: 0,
tags: [],
},
sk,
),
].map(v => {
;(v as any).kind = '1'
return v
})[0]

relay._onmessage({ data: JSON.stringify(['EVENT', sub.id, wrongFieldTypeEvent]) } as MessageEvent)
})
Expand Down
Loading