diff --git a/spec/Scheduler-spec.ts b/spec/Scheduler-spec.ts index 0d0c1819fb..1c705cad2f 100644 --- a/spec/Scheduler-spec.ts +++ b/spec/Scheduler-spec.ts @@ -1,5 +1,6 @@ import { expect } from 'chai'; -import { queueScheduler as queue } from 'rxjs'; +import * as sinon from 'sinon'; +import { queueScheduler as queue, config } from 'rxjs'; import { QueueScheduler } from 'rxjs/internal/scheduler/QueueScheduler'; /** @test {Scheduler} */ @@ -50,17 +51,29 @@ describe('Scheduler.queue', () => { }); it('should be reusable after an error is thrown during execution', (done) => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + const errors: any[] = []; + config.onUnhandledError = (err) => errors.push(err); + const results: number[] = []; - expect(() => { - queue.schedule(() => { - results.push(1); - }); + queue.schedule(() => { + results.push(1); + }); - queue.schedule(() => { - throw new Error('bad'); - }); - }).to.throw(Error, 'bad'); + queue.schedule(() => { + throw new Error('bad'); + }); + + // Error is reported asynchronously, not thrown synchronously + expect(results).to.deep.equal([1]); + fakeTimer.tick(0); + expect(errors.length).to.equal(1); + expect(errors[0].message).to.equal('bad'); + + config.onUnhandledError = null; + sandbox.restore(); setTimeout(() => { queue.schedule(() => { diff --git a/spec/schedulers/QueueScheduler-spec.ts b/spec/schedulers/QueueScheduler-spec.ts index 2d0e9a0059..53e26bdf61 100644 --- a/spec/schedulers/QueueScheduler-spec.ts +++ b/spec/schedulers/QueueScheduler-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { queueScheduler, Subscription, merge } from 'rxjs'; +import { queueScheduler, Subscription, merge, Subject, Observable, observeOn, config } from 'rxjs'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -59,26 +59,108 @@ describe('Scheduler.queue', () => { sandbox.restore(); }); - it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => { + it('should report errors via reportUnhandledError instead of throwing synchronously', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + const errors: any[] = []; + config.onUnhandledError = (err) => errors.push(err); + + queue.schedule(() => { + queue.schedule(() => { throw new Error('oops'); }); + }); + + // Error is not thrown synchronously + expect(errors).to.deep.equal([]); + + // Error surfaces asynchronously via onUnhandledError + fakeTimer.tick(0); + expect(errors.length).to.equal(1); + expect(errors[0].message).to.equal('oops'); + + config.onUnhandledError = null; + sandbox.restore(); + }); + + it('should not unsubscribe sibling actions when one action errors during flush', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + const actions: Subscription[] = []; + queue.schedule(() => { + actions.push( + queue.schedule(() => { throw new Error('oops'); }), + queue.schedule(() => {}), + queue.schedule(() => {}) + ); + }); + + fakeTimer.tick(0); + + expect(actions[0].closed).to.be.true; + expect(actions[1].closed).to.be.false; + expect(actions[2].closed).to.be.false; + + config.onUnhandledError = null; + sandbox.restore(); + }); + + it('should continue executing sibling actions in the same flush after an error', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + let action2Exec = false; let action3Exec = false; - let errorValue: any = undefined; - try { - queue.schedule(() => { - actions.push( - queue.schedule(() => { throw new Error('oops'); }), - queue.schedule(() => { action2Exec = true; }), - queue.schedule(() => { action3Exec = true; }) - ); - }); - } catch (e) { - errorValue = e; - } - expect(actions.every((action) => action.closed)).to.be.true; - expect(action2Exec).to.be.false; - expect(action3Exec).to.be.false; - expect(errorValue).exist; - expect(errorValue.message).to.equal('oops'); + queue.schedule(() => { + queue.schedule(() => { throw new Error('oops'); }); + queue.schedule(() => { action2Exec = true; }); + queue.schedule(() => { action3Exec = true; }); + }); + + // Siblings execute in the same flush — no need for a second flush + expect(action2Exec).to.be.true; + expect(action3Exec).to.be.true; + + fakeTimer.tick(0); + config.onUnhandledError = null; + sandbox.restore(); + }); + + it('should not tear down the subscriber chain when a queued action errors (NgRx pattern)', () => { + const sandbox = sinon.createSandbox(); + const fakeTimer = sandbox.useFakeTimers(); + config.onUnhandledError = () => {}; + + const source = new Subject(); + const results: number[] = []; + + const scheduled$ = source.pipe(observeOn(queue)); + + // Subscriber 1 will throw + const sub1 = scheduled$.subscribe({ + next: () => { throw new Error('subscriber 1 error'); } + }); + // Subscriber 2 is innocent + const sub2 = scheduled$.subscribe({ + next: (v) => results.push(v) + }); + + source.next(42); + + fakeTimer.tick(0); + + // The subscriber chain should remain intact — neither subscription + // should have been torn down by the error propagation. + expect(sub1.closed).to.be.false; + expect(sub2.closed).to.be.false; + expect(results).to.deep.equal([42]); + + // Verify the store is still alive — subsequent emissions still work + source.next(99); + expect(results).to.deep.equal([42, 99]); + + config.onUnhandledError = null; + sandbox.restore(); }); }); diff --git a/src/internal/scheduler/AsyncScheduler.ts b/src/internal/scheduler/AsyncScheduler.ts index b05e2e376a..4ddd82be8b 100644 --- a/src/internal/scheduler/AsyncScheduler.ts +++ b/src/internal/scheduler/AsyncScheduler.ts @@ -1,6 +1,7 @@ import { Scheduler } from '../Scheduler'; import { Action } from './Action'; import { AsyncAction } from './AsyncAction'; +import { reportUnhandledError } from '../util/reportUnhandledError'; import { TimerHandle } from './timerHandle'; export class AsyncScheduler extends Scheduler { @@ -36,17 +37,16 @@ export class AsyncScheduler extends Scheduler { do { if ((error = action.execute(action.state, action.delay))) { - break; + // Report the error asynchronously so it doesn't tear down the + // synchronous subscriber chain (e.g. observeOn(queueScheduler)). + // The erroring action already unsubscribed itself in _execute(). + // Continue flushing remaining actions — they are independent + // operations that should not be affected by a sibling's error. + reportUnhandledError(error); + error = null; } } while ((action = actions.shift()!)); // exhaust the scheduler queue this._active = false; - - if (error) { - while ((action = actions.shift()!)) { - action.unsubscribe(); - } - throw error; - } } }