Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions spec/Scheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { expect } from 'chai';
import { queueScheduler as queue } from 'rxjs';
import * as sinon from 'sinon';
import { queueScheduler as queue, config } from 'rxjs';
import { QueueScheduler } from 'rxjs/internal/scheduler/QueueScheduler';

/** @test {Scheduler} */
Expand Down Expand Up @@ -50,17 +51,29 @@ describe('Scheduler.queue', () => {
});

it('should be reusable after an error is thrown during execution', (done) => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
const errors: any[] = [];
config.onUnhandledError = (err) => errors.push(err);

const results: number[] = [];

expect(() => {
queue.schedule(() => {
results.push(1);
});
queue.schedule(() => {
results.push(1);
});

queue.schedule(() => {
throw new Error('bad');
});
}).to.throw(Error, 'bad');
queue.schedule(() => {
throw new Error('bad');
});

// Error is reported asynchronously, not thrown synchronously
expect(results).to.deep.equal([1]);
fakeTimer.tick(0);
expect(errors.length).to.equal(1);
expect(errors[0].message).to.equal('bad');

config.onUnhandledError = null;
sandbox.restore();

setTimeout(() => {
queue.schedule(() => {
Expand Down
120 changes: 101 additions & 19 deletions spec/schedulers/QueueScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { queueScheduler, Subscription, merge } from 'rxjs';
import { queueScheduler, Subscription, merge, Subject, Observable, observeOn, config } from 'rxjs';
import { delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -59,26 +59,108 @@ describe('Scheduler.queue', () => {
sandbox.restore();
});

it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => {
it('should report errors via reportUnhandledError instead of throwing synchronously', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
const errors: any[] = [];
config.onUnhandledError = (err) => errors.push(err);

queue.schedule(() => {
queue.schedule(() => { throw new Error('oops'); });
});

// Error is not thrown synchronously
expect(errors).to.deep.equal([]);

// Error surfaces asynchronously via onUnhandledError
fakeTimer.tick(0);
expect(errors.length).to.equal(1);
expect(errors[0].message).to.equal('oops');

config.onUnhandledError = null;
sandbox.restore();
});

it('should not unsubscribe sibling actions when one action errors during flush', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
config.onUnhandledError = () => {};

const actions: Subscription[] = [];
queue.schedule(() => {
actions.push(
queue.schedule(() => { throw new Error('oops'); }),
queue.schedule(() => {}),
queue.schedule(() => {})
);
});

fakeTimer.tick(0);

expect(actions[0].closed).to.be.true;
expect(actions[1].closed).to.be.false;
expect(actions[2].closed).to.be.false;

config.onUnhandledError = null;
sandbox.restore();
});

it('should continue executing sibling actions in the same flush after an error', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
config.onUnhandledError = () => {};

let action2Exec = false;
let action3Exec = false;
let errorValue: any = undefined;
try {
queue.schedule(() => {
actions.push(
queue.schedule(() => { throw new Error('oops'); }),
queue.schedule(() => { action2Exec = true; }),
queue.schedule(() => { action3Exec = true; })
);
});
} catch (e) {
errorValue = e;
}
expect(actions.every((action) => action.closed)).to.be.true;
expect(action2Exec).to.be.false;
expect(action3Exec).to.be.false;
expect(errorValue).exist;
expect(errorValue.message).to.equal('oops');
queue.schedule(() => {
queue.schedule(() => { throw new Error('oops'); });
queue.schedule(() => { action2Exec = true; });
queue.schedule(() => { action3Exec = true; });
});

// Siblings execute in the same flush — no need for a second flush
expect(action2Exec).to.be.true;
expect(action3Exec).to.be.true;

fakeTimer.tick(0);
config.onUnhandledError = null;
sandbox.restore();
});

it('should not tear down the subscriber chain when a queued action errors (NgRx pattern)', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
config.onUnhandledError = () => {};

const source = new Subject<number>();
const results: number[] = [];

const scheduled$ = source.pipe(observeOn(queue));

// Subscriber 1 will throw
const sub1 = scheduled$.subscribe({
next: () => { throw new Error('subscriber 1 error'); }
});
// Subscriber 2 is innocent
const sub2 = scheduled$.subscribe({
next: (v) => results.push(v)
});

source.next(42);

fakeTimer.tick(0);

// The subscriber chain should remain intact — neither subscription
// should have been torn down by the error propagation.
expect(sub1.closed).to.be.false;
expect(sub2.closed).to.be.false;
expect(results).to.deep.equal([42]);

// Verify the store is still alive — subsequent emissions still work
source.next(99);
expect(results).to.deep.equal([42, 99]);

config.onUnhandledError = null;
sandbox.restore();
});
});
16 changes: 8 additions & 8 deletions src/internal/scheduler/AsyncScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Scheduler } from '../Scheduler';
import { Action } from './Action';
import { AsyncAction } from './AsyncAction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { TimerHandle } from './timerHandle';

export class AsyncScheduler extends Scheduler {
Expand Down Expand Up @@ -36,17 +37,16 @@ export class AsyncScheduler extends Scheduler {

do {
if ((error = action.execute(action.state, action.delay))) {
break;
// Report the error asynchronously so it doesn't tear down the
// synchronous subscriber chain (e.g. observeOn(queueScheduler)).
// The erroring action already unsubscribed itself in _execute().
// Continue flushing remaining actions — they are independent
// operations that should not be affected by a sibling's error.
reportUnhandledError(error);
error = null;
}
} while ((action = actions.shift()!)); // exhaust the scheduler queue

this._active = false;

if (error) {
while ((action = actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
}
Loading