diff --git a/stream-helpers/mod.ts b/stream-helpers/mod.ts index d6a832d9..ee010da4 100644 --- a/stream-helpers/mod.ts +++ b/stream-helpers/mod.ts @@ -14,3 +14,4 @@ export * from "./last.ts"; export * from "./take.ts"; export * from "./take-while.ts"; export * from "./take-until.ts"; +export * from "./throttle.ts"; diff --git a/stream-helpers/package.json b/stream-helpers/package.json index 54bbf357..1dd92abb 100644 --- a/stream-helpers/package.json +++ b/stream-helpers/package.json @@ -1,7 +1,7 @@ { "name": "@effectionx/stream-helpers", "description": "Type-safe stream operators like filter, map, reduce, and forEach", - "version": "0.8.2", + "version": "0.9.0", "keywords": ["effection", "effectionx", "streams", "map", "filter", "reduce"], "type": "module", "main": "./dist/mod.js", diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts new file mode 100644 index 00000000..41214e0f --- /dev/null +++ b/stream-helpers/throttle.test.ts @@ -0,0 +1,269 @@ +import { describe, it } from "@effectionx/bdd"; +import { createArraySignal, is } from "@effectionx/signals"; +import { timebox } from "@effectionx/timebox"; +import { expect } from "expect"; +import { createChannel, sleep, spawn } from "effection"; + +import { throttle } from "./throttle.ts"; +import { forEach } from "./for-each.ts"; +import { useFaucet } from "./test-helpers/faucet.ts"; + +interface Emission { + value: T; + time: number; +} + +describe("throttle", () => { + it("emits the first value immediately", function* () { + const source = createChannel(); + const stream = throttle(200)(source); + const subscription = yield* stream; + + const start = performance.now(); + const next = yield* spawn(() => subscription.next()); + yield* source.send(1); + + const result = yield* next; + const elapsed = performance.now() - start; + + expect(result.value).toBe(1); + expect(elapsed).toBeLessThan(50); + }); + + it("suppresses intermediate values and emits trailing", function* () { + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(100)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2, 3, 4, 5]); + + yield* is(emissions, (e) => e.length >= 2); + + const values = emissions.valueOf().map((e) => e.value); + expect(values[0]).toBe(1); + expect(values[1]).toBe(5); + expect(values).toHaveLength(2); + }); + + it("emits trailing value after the window expires", function* () { + const delay = 80; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2, 3]); + + yield* is(emissions, (e) => e.length >= 2); + + const [leading, trailing] = emissions.valueOf(); + const gap = trailing.time - leading.time; + + expect(leading.value).toBe(1); + expect(trailing.value).toBe(3); + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); + }); + + it("does not emit trailing before the remaining delay elapses", function* () { + const delay = 100; + const source = createChannel(); + const stream = throttle(delay)(source); + const subscription = yield* stream; + + yield* spawn(function* () { + yield* sleep(0); + yield* source.send(1); + yield* source.send(2); + }); + + const first = yield* subscription.next(); + expect((first as { value: number }).value).toBe(1); + + // Must NOT resolve within the first half of the window. + const mid = yield* timebox(delay * 0.4, () => subscription.next()); + expect(mid.timeout).toBe(true); + + // After the full window, trailing is available. + const second = yield* subscription.next(); + expect((second as { value: number }).value).toBe(2); + }); + + it("emits at most once per delay window", function* () { + const delay = 60; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + // Two rapid bursts — the pump blocks on its next leading pull + // after the first burst's trailing, so the second pour triggers + // a fresh window. + yield* faucet.pour([1, 2, 3]); + yield* is(emissions, (e) => e.length >= 2); + yield* faucet.pour([10, 20, 30]); + yield* is(emissions, (e) => e.some((v) => v.value === 30)); + + const times = emissions.valueOf().map((e) => e.time); + for (let i = 1; i < times.length; i++) { + const gap = times[i] - times[i - 1]; + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); + } + }); + + it("handles upstream completion during the window", function* () { + const source = createChannel(); + const stream = throttle(200)(source); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + yield* source.send(1); + yield* source.send(2); + yield* source.send(3); + yield* source.close(); + + yield* is(emissions, (e) => e.some((v) => v.value === 3)); + + const values = emissions.valueOf().map((e) => e.value); + expect(values).toContain(1); + expect(values).toContain(3); + }); + + it("closes only after trailing emission is handled", function* () { + const source = createChannel(); + const stream = throttle(200)(source); + const subscription = yield* stream; + + yield* spawn(function* () { + yield* sleep(0); + yield* source.send("a"); + yield* source.send("b"); + yield* source.close(42); + }); + + const first = yield* subscription.next(); + expect(first).toEqual({ done: false, value: "a" }); + + const second = yield* subscription.next(); + expect(second).toEqual({ done: false, value: "b" }); + + const third = yield* subscription.next(); + expect(third).toEqual({ done: true, value: 42 }); + }); + + it("buffers the latest window value, not the oldest", function* () { + const source = createChannel(); + const stream = throttle(100)(source); + const subscription = yield* stream; + + yield* spawn(function* () { + yield* sleep(0); + yield* source.send(1); + yield* source.send(2); + yield* source.send(3); + }); + + // Leading value — returned immediately. + const first = yield* subscription.next(); + expect(first).toEqual({ done: false, value: 1 }); + + // The pump absorbs 2 and 3 during its window. output.shift() + // blocks until the pump pushes the trailing value. + const second = yield* subscription.next(); + expect(second).toEqual({ done: false, value: 3 }); + }); + + it("enforces spacing when consumer drains a backlog", function* () { + const delay = 60; + const source = createChannel(); + const stream = throttle(delay)(source); + const subscription = yield* stream; + + yield* spawn(function* () { + yield* sleep(0); + for (let i = 1; i <= 6; i++) { + yield* source.send(i); + } + }); + + // Drain rapidly and record emission timestamps. output.shift() + // blocks until the pump pushes each item, and the delay gate + // enforces spacing between consecutive emissions. + const emissions: Emission[] = []; + const r1 = yield* subscription.next(); + emissions.push({ + value: (r1 as { value: number }).value, + time: performance.now(), + }); + + const r2 = yield* subscription.next(); + emissions.push({ + value: (r2 as { value: number }).value, + time: performance.now(), + }); + + // Verify values are the leading+trailing from the windows + expect(emissions[0].value).toBe(1); + expect(emissions[1].value).toBe(6); + + // The gap between the two emissions must respect delayMS even + // though both values were already buffered. + const gap = emissions[1].time - emissions[0].time; + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); + }); + + it("passes through values spaced beyond the delay", function* () { + const delay = 20; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const results = yield* createArraySignal([]); + + yield* spawn(() => + forEach(function* (value) { + results.push(value); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour(function* (send) { + yield* send(1); + yield* sleep(50); + yield* send(2); + yield* sleep(50); + yield* send(3); + }); + + yield* is(results, (r) => r.length >= 3); + + expect(results.valueOf()).toEqual([1, 2, 3]); + }); +}); diff --git a/stream-helpers/throttle.ts b/stream-helpers/throttle.ts new file mode 100644 index 00000000..f853b1dc --- /dev/null +++ b/stream-helpers/throttle.ts @@ -0,0 +1,131 @@ +import { timebox } from "@effectionx/timebox"; +import { createArraySignal } from "@effectionx/signals"; +import { type Operation, type Stream, sleep, spawn } from "effection"; + +/** + * A tagged output item. `flush` marks values that should bypass + * consumer-side delay enforcement (stream-completion trailing and the + * done sentinel). + */ +interface OutputItem { + result: IteratorResult; + flush: boolean; +} + +/** + * Throttles a stream to emit at most one value per `delayMS` milliseconds. + * + * Uses leading+trailing semantics: + * - The first upstream value is emitted immediately (leading edge). + * - While the throttle window is open, upstream values are consumed eagerly + * and only the latest is buffered. + * - After the window expires, the buffered value is emitted (trailing edge), + * which opens a new window. + * - Two emissions are never closer together than `delayMS`, both at the + * pump side (window timing) and at the consumer side (delay gate in + * `next()`), so a slow consumer cannot drain a backlog instantly. + * + * Stream-completion exception: if the upstream closes during an open window, + * the trailing value (if any) is emitted promptly without waiting for the + * remaining delay, and `done` follows on the next pull. This avoids adding + * artificial latency before propagating the close signal. + * + * @param delayMS - minimum milliseconds between emissions + */ +export function throttle( + delayMS: number, +): (stream: Stream) => Stream { + return (stream: Stream): Stream => ({ + *[Symbol.iterator]() { + const subscription = yield* stream; + const output = yield* createArraySignal>([]); + + // ── pump ────────────────────────────────────────────────────── + // A persistent background task that owns all upstream reads. + // It alternates between two phases: + // 1. Pull the next upstream value and push it (leading edge). + // 2. Open a window for delayMS: consume upstream, keep only + // the latest, then push it when the window expires + // (trailing edge). + yield* spawn(function* () { + while (true) { + // ── leading edge ──────────────────────────────────────── + const first = yield* subscription.next(); + if (first.done) { + output.push({ result: first, flush: true }); + return; + } + output.push({ + result: { done: false as const, value: first.value }, + flush: false, + }); + + // ── absorption window ─────────────────────────────────── + let trailing: A | undefined; + let hasTrailing = false; + const windowStart = performance.now(); + + while (true) { + const remaining = delayMS - (performance.now() - windowStart); + if (remaining <= 0) break; + + const tb = yield* timebox(remaining, () => subscription.next()); + + if (tb.timeout) { + break; + } + + if (tb.value.done) { + // Stream closed during window — flush trailing, then done + if (hasTrailing) { + output.push({ + result: { done: false as const, value: trailing as A }, + flush: true, + }); + } + output.push({ result: tb.value, flush: true }); + return; + } + + trailing = tb.value.value; + hasTrailing = true; + } + + // ── trailing edge ─────────────────────────────────────── + if (hasTrailing) { + output.push({ + result: { done: false as const, value: trailing as A }, + flush: false, + }); + } + } + }); + + // ── consumer-side delay gate ─────────────────────────────── + let lastEmitTime: number | undefined; + + return { + *next(): Operation> { + const { result, flush } = yield* output.shift(); + + // Enforce minimum spacing between non-flush emissions. + // The first emission (lastEmitTime undefined) passes through + // immediately. Flush items (stream-completion trailing and + // done) bypass the gate so close is not artificially delayed. + if (!result.done && !flush && lastEmitTime !== undefined) { + const wait = delayMS - (performance.now() - lastEmitTime); + if (wait > 0) { + yield* sleep(wait); + } + } + + if (!result.done) { + lastEmitTime = performance.now(); + } + + return result; + }, + }; + }, + }); +}