From 4bb7f4805bbcc1da3e447af14c3488bebf08aea8 Mon Sep 17 00:00:00 2001 From: Stewart McGown Date: Tue, 28 Apr 2026 22:54:05 +0100 Subject: [PATCH 1/2] fix(scheduler): don't unsubscribe sibling actions on error in AsyncScheduler flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #6674 fixed AnimationFrameScheduler and AsapScheduler to scope flush cycles using a flushId, so errors only kill actions belonging to the same flush. AsyncScheduler (and QueueScheduler which extends it) was never given the same fix. Its error handler blindly unsubscribed ALL remaining queued actions when any single action threw. This is the root cause of NgRx "store death": observeOn(queueScheduler) queues independent subscriber notifications in the same flush cycle. When one subscriber's handler throws, the error path destroyed every other subscriber's pending action — permanently killing the store. The erroring action already unsubscribes itself inside AsyncAction._execute(). The remaining actions are independent operations that should survive. Remove the blanket unsubscribe loop in the error path. Relates to #6672, #4690, #2697 Made-with: Cursor --- spec/schedulers/QueueScheduler-spec.ts | 78 ++++++++++++++++++++++-- src/internal/scheduler/AsyncScheduler.ts | 3 - 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/spec/schedulers/QueueScheduler-spec.ts b/spec/schedulers/QueueScheduler-spec.ts index 2d0e9a0059..7dddafef4a 100644 --- a/spec/schedulers/QueueScheduler-spec.ts +++ b/spec/schedulers/QueueScheduler-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { queueScheduler, Subscription, merge } from 'rxjs'; +import { queueScheduler, Subscription, merge, Subject, Observable, observeOn } from 'rxjs'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -59,7 +59,7 @@ describe('Scheduler.queue', () => { sandbox.restore(); }); - it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => { + it('should throw error but only unsubscribe the erroring action, not siblings', () => { const actions: Subscription[] = []; let action2Exec = false; let action3Exec = false; @@ -75,10 +75,80 @@ describe('Scheduler.queue', () => { } catch (e) { errorValue = e; } - expect(actions.every((action) => action.closed)).to.be.true; + expect(errorValue).exist; + expect(errorValue.message).to.equal('oops'); + expect(actions[0].closed).to.be.true; + expect(actions[1].closed).to.be.false; + expect(actions[2].closed).to.be.false; expect(action2Exec).to.be.false; expect(action3Exec).to.be.false; - expect(errorValue).exist; + }); + + it('should not unsubscribe sibling actions when one action errors during flush', () => { + const actions: Subscription[] = []; + let errorValue: any; + try { + queue.schedule(() => { + actions.push( + queue.schedule(() => { throw new Error('oops'); }), + queue.schedule(() => {}), + queue.schedule(() => {}) + ); + }); + } catch (e) { + errorValue = e; + } + expect(errorValue).to.exist; expect(errorValue.message).to.equal('oops'); + expect(actions[0].closed).to.be.true; + expect(actions[1].closed).to.be.false; + expect(actions[2].closed).to.be.false; + }); + + it('should execute surviving sibling actions in the next flush after an error', () => { + let action2Exec = false; + let action3Exec = false; + try { + queue.schedule(() => { + queue.schedule(() => { throw new Error('oops'); }); + queue.schedule(() => { action2Exec = true; }); + queue.schedule(() => { action3Exec = true; }); + }); + } catch (e) { + // expected + } + expect(action2Exec).to.be.false; + expect(action3Exec).to.be.false; + + queue.schedule(() => {}); + + expect(action2Exec).to.be.true; + expect(action3Exec).to.be.true; + }); + + it('should not destroy sibling scheduled actions when one errors (NgRx pattern)', () => { + // Replicates the NgRx pattern: an upstream observeOn(queueScheduler) + // delivers a value, and during that delivery, multiple downstream + // subscribers are independently scheduled onto the queue. + // When one subscriber's handler throws, the others should survive. + const results: number[] = []; + + try { + queue.schedule(() => { + // This outer action represents the upstream observeOn delivery. + // During execution, it causes multiple downstream subscribers + // to be independently scheduled (pushed to queue since _active is true). + queue.schedule(() => { throw new Error('subscriber 1 error'); }); + queue.schedule(() => { results.push(42); }); + }); + } catch (e) { + // expected from subscriber 1 + } + + expect(results).to.deep.equal([]); + + // The second action should have survived and executes in the next flush + queue.schedule(() => {}); + expect(results).to.deep.equal([42]); }); }); diff --git a/src/internal/scheduler/AsyncScheduler.ts b/src/internal/scheduler/AsyncScheduler.ts index b05e2e376a..c957558fcf 100644 --- a/src/internal/scheduler/AsyncScheduler.ts +++ b/src/internal/scheduler/AsyncScheduler.ts @@ -43,9 +43,6 @@ export class AsyncScheduler extends Scheduler { this._active = false; if (error) { - while ((action = actions.shift()!)) { - action.unsubscribe(); - } throw error; } } From 145a62f143c5179ed36555e979518cf5dac6d7bd Mon Sep 17 00:00:00 2001 From: Stewart McGown Date: Tue, 28 Apr 2026 23:59:07 +0100 Subject: [PATCH 2/2] fix(scheduler): report errors via reportUnhandledError instead of throwing synchronously MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When AsyncScheduler.flush() throws synchronously, the error propagates back through the synchronous call stack — through QueueAction.schedule(), executeSchedule(), and into the OperatorSubscriber that initiated the flush. This tears down the entire subscriber chain, permanently killing pipelines like NgRx's observeOn(queueScheduler) → scan → State. Change flush() to use reportUnhandledError() instead of throw: - Errors surface asynchronously via config.onUnhandledError or setTimeout (consistent with how RxJS handles unhandled subscriber errors) - The synchronous subscriber chain is not torn down - Remaining queued actions continue executing in the same flush (they are independent operations unaffected by a sibling's error) The erroring action still unsubscribes itself immediately in _execute(). This is analogous to how ConsumerObserver.next already handles subscriber errors — catch and report asynchronously rather than propagating synchronously through the call stack. Made-with: Cursor --- spec/Scheduler-spec.ts | 31 +++-- spec/schedulers/QueueScheduler-spec.ts | 158 ++++++++++++----------- src/internal/scheduler/AsyncScheduler.ts | 13 +- 3 files changed, 115 insertions(+), 87 deletions(-) diff --git a/spec/Scheduler-spec.ts b/spec/Scheduler-spec.ts index 0d0c1819fb..1c705cad2f 100644 --- a/spec/Scheduler-spec.ts +++ b/spec/Scheduler-spec.ts @@ -1,5 +1,6 @@ import { expect } from 'chai'; -import { queueScheduler as queue } from 'rxjs'; +import * as sinon from 'sinon'; +import { queueScheduler as queue, config } from 'rxjs'; import { QueueScheduler } from 'rxjs/internal/scheduler/QueueScheduler'; /** @test {Scheduler} */ @@ -50,17 +51,29 @@ describe('Scheduler.queue', () => { }); it('should be reusable after an error is thrown during execution', (done) => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + const errors: any[] = []; + config.onUnhandledError = (err) => errors.push(err); + const results: number[] = []; - expect(() => { - queue.schedule(() => { - results.push(1); - }); + queue.schedule(() => { + results.push(1); + }); - queue.schedule(() => { - throw new Error('bad'); - }); - }).to.throw(Error, 'bad'); + queue.schedule(() => { + throw new Error('bad'); + }); + + // Error is reported asynchronously, not thrown synchronously + expect(results).to.deep.equal([1]); + fakeTimer.tick(0); + expect(errors.length).to.equal(1); + expect(errors[0].message).to.equal('bad'); + + config.onUnhandledError = null; + sandbox.restore(); setTimeout(() => { queue.schedule(() => { diff --git a/spec/schedulers/QueueScheduler-spec.ts b/spec/schedulers/QueueScheduler-spec.ts index 7dddafef4a..53e26bdf61 100644 --- a/spec/schedulers/QueueScheduler-spec.ts +++ b/spec/schedulers/QueueScheduler-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { queueScheduler, Subscription, merge, Subject, Observable, observeOn } from 'rxjs'; +import { queueScheduler, Subscription, merge, Subject, Observable, observeOn, config } from 'rxjs'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -59,96 +59,108 @@ describe('Scheduler.queue', () => { sandbox.restore(); }); - it('should throw error but only unsubscribe the erroring action, not siblings', () => { - const actions: Subscription[] = []; - let action2Exec = false; - let action3Exec = false; - let errorValue: any = undefined; - try { - queue.schedule(() => { - actions.push( - queue.schedule(() => { throw new Error('oops'); }), - queue.schedule(() => { action2Exec = true; }), - queue.schedule(() => { action3Exec = true; }) - ); - }); - } catch (e) { - errorValue = e; - } - expect(errorValue).exist; - expect(errorValue.message).to.equal('oops'); - expect(actions[0].closed).to.be.true; - expect(actions[1].closed).to.be.false; - expect(actions[2].closed).to.be.false; - expect(action2Exec).to.be.false; - expect(action3Exec).to.be.false; + it('should report errors via reportUnhandledError instead of throwing synchronously', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + const errors: any[] = []; + config.onUnhandledError = (err) => errors.push(err); + + queue.schedule(() => { + queue.schedule(() => { throw new Error('oops'); }); + }); + + // Error is not thrown synchronously + expect(errors).to.deep.equal([]); + + // Error surfaces asynchronously via onUnhandledError + fakeTimer.tick(0); + expect(errors.length).to.equal(1); + expect(errors[0].message).to.equal('oops'); + + config.onUnhandledError = null; + sandbox.restore(); }); it('should not unsubscribe sibling actions when one action errors during flush', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + const actions: Subscription[] = []; - let errorValue: any; - try { - queue.schedule(() => { - actions.push( - queue.schedule(() => { throw new Error('oops'); }), - queue.schedule(() => {}), - queue.schedule(() => {}) - ); - }); - } catch (e) { - errorValue = e; - } - expect(errorValue).to.exist; - expect(errorValue.message).to.equal('oops'); + queue.schedule(() => { + actions.push( + queue.schedule(() => { throw new Error('oops'); }), + queue.schedule(() => {}), + queue.schedule(() => {}) + ); + }); + + fakeTimer.tick(0); + expect(actions[0].closed).to.be.true; expect(actions[1].closed).to.be.false; expect(actions[2].closed).to.be.false; + + config.onUnhandledError = null; + sandbox.restore(); }); - it('should execute surviving sibling actions in the next flush after an error', () => { + it('should continue executing sibling actions in the same flush after an error', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + let action2Exec = false; let action3Exec = false; - try { - queue.schedule(() => { - queue.schedule(() => { throw new Error('oops'); }); - queue.schedule(() => { action2Exec = true; }); - queue.schedule(() => { action3Exec = true; }); - }); - } catch (e) { - // expected - } - expect(action2Exec).to.be.false; - expect(action3Exec).to.be.false; - - queue.schedule(() => {}); + queue.schedule(() => { + queue.schedule(() => { throw new Error('oops'); }); + queue.schedule(() => { action2Exec = true; }); + queue.schedule(() => { action3Exec = true; }); + }); + // Siblings execute in the same flush — no need for a second flush expect(action2Exec).to.be.true; expect(action3Exec).to.be.true; + + fakeTimer.tick(0); + config.onUnhandledError = null; + sandbox.restore(); }); - it('should not destroy sibling scheduled actions when one errors (NgRx pattern)', () => { - // Replicates the NgRx pattern: an upstream observeOn(queueScheduler) - // delivers a value, and during that delivery, multiple downstream - // subscribers are independently scheduled onto the queue. - // When one subscriber's handler throws, the others should survive. + it('should not tear down the subscriber chain when a queued action errors (NgRx pattern)', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + + const source = new Subject(); const results: number[] = []; - try { - queue.schedule(() => { - // This outer action represents the upstream observeOn delivery. - // During execution, it causes multiple downstream subscribers - // to be independently scheduled (pushed to queue since _active is true). - queue.schedule(() => { throw new Error('subscriber 1 error'); }); - queue.schedule(() => { results.push(42); }); - }); - } catch (e) { - // expected from subscriber 1 - } - - expect(results).to.deep.equal([]); - - // The second action should have survived and executes in the next flush - queue.schedule(() => {}); + const scheduled$ = source.pipe(observeOn(queue)); + + // Subscriber 1 will throw + const sub1 = scheduled$.subscribe({ + next: () => { throw new Error('subscriber 1 error'); } + }); + // Subscriber 2 is innocent + const sub2 = scheduled$.subscribe({ + next: (v) => results.push(v) + }); + + source.next(42); + + fakeTimer.tick(0); + + // The subscriber chain should remain intact — neither subscription + // should have been torn down by the error propagation. + expect(sub1.closed).to.be.false; + expect(sub2.closed).to.be.false; expect(results).to.deep.equal([42]); + + // Verify the store is still alive — subsequent emissions still work + source.next(99); + expect(results).to.deep.equal([42, 99]); + + config.onUnhandledError = null; + sandbox.restore(); }); }); diff --git a/src/internal/scheduler/AsyncScheduler.ts b/src/internal/scheduler/AsyncScheduler.ts index c957558fcf..4ddd82be8b 100644 --- a/src/internal/scheduler/AsyncScheduler.ts +++ b/src/internal/scheduler/AsyncScheduler.ts @@ -1,6 +1,7 @@ import { Scheduler } from '../Scheduler'; import { Action } from './Action'; import { AsyncAction } from './AsyncAction'; +import { reportUnhandledError } from '../util/reportUnhandledError'; import { TimerHandle } from './timerHandle'; export class AsyncScheduler extends Scheduler { @@ -36,14 +37,16 @@ export class AsyncScheduler extends Scheduler { do { if ((error = action.execute(action.state, action.delay))) { - break; + // Report the error asynchronously so it doesn't tear down the + // synchronous subscriber chain (e.g. observeOn(queueScheduler)). + // The erroring action already unsubscribed itself in _execute(). + // Continue flushing remaining actions — they are independent + // operations that should not be affected by a sibling's error. + reportUnhandledError(error); + error = null; } } while ((action = actions.shift()!)); // exhaust the scheduler queue this._active = false; - - if (error) { - throw error; - } } }