1 change: 1 addition & 0 deletions stream-helpers/mod.ts
@@ -14,3 +14,4 @@
export * from "./last.ts";
export * from "./take.ts";
export * from "./take-while.ts";
export * from "./take-until.ts";
export * from "./throttle.ts";
2 changes: 1 addition & 1 deletion stream-helpers/package.json
@@ -1,7 +1,7 @@
{
"name": "@effectionx/stream-helpers",
"description": "Type-safe stream operators like filter, map, reduce, and forEach",
"version": "0.8.2",
"version": "0.9.0",
"keywords": ["effection", "effectionx", "streams", "map", "filter", "reduce"],
"type": "module",
"main": "./dist/mod.js",
236 changes: 236 additions & 0 deletions stream-helpers/throttle.test.ts
@@ -0,0 +1,236 @@
import { describe, it } from "@effectionx/bdd";
import { createArraySignal, is } from "@effectionx/signals";
import { expect } from "expect";
import { createChannel, sleep, spawn } from "effection";

import { throttle } from "./throttle.ts";
import { forEach } from "./for-each.ts";
import { useFaucet } from "./test-helpers/faucet.ts";

interface Emission<T> {
value: T;
time: number;
}

describe("throttle", () => {
it("emits the first value immediately", function* () {
const source = createChannel<number, never>();
const stream = throttle<number>(200)(source);
const subscription = yield* stream;

const start = performance.now();
const next = yield* spawn(() => subscription.next());
yield* source.send(1);

const result = yield* next;
const elapsed = performance.now() - start;

expect(result.value).toBe(1);
expect(elapsed).toBeLessThan(50);
});

it("suppresses intermediate values and emits trailing", function* () {
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(100)(faucet);
const emissions = yield* createArraySignal<Emission<number>>([]);

yield* spawn(() =>
forEach(function* (value) {
emissions.push({ value, time: performance.now() });
}, stream),
);

yield* sleep(0);
Member: Why do we need a sleep? Maybe use a resource for your loop?

Member Author: Made forEach a resource in #209 — how does this look? It'd need to be merged before this PR.

Member: I'd hate to lose synchronous forEach and would prefer it to stay blocking.

I think there are a couple of ways to have our cake and eat it too:

  1. Have both foreground and background versions:
  • forEach<T, TClose>(stream: Stream<T, TClose>, body): Operation<TClose> -- foreground
  • useForEach<T, TClose>(stream: Stream<T, TClose>, body): Future<TClose> -- background

  2. Allow forEach to accept both a subscription and a stream. That lets you subscribe first and then spawn safely in the background.

  3. Add a "live stream" helper that does (2) for you:

  useBoundStream(stream: Stream): Stream

  Not sure what a good name for this is, but it takes a stream, subscribes to it, and then returns a stream that yields that subscription. This ensures that any stream interface can be used in the background without missing any items.

Member Author: What about just useStream?

Member: Does that indicate enough context? I worry that the programmer (AI or otherwise) might see that and think: "well, of course I want to use the stream."

What differentiates this from a normal stream is that it is laid over a "live" subscription.

  • useSubscribedStream()
  • useLiveStream()
  • useUniqueStream()

Member Author: In the observables world, I believe these are called hot observables. We could use that terminology to reduce naming dissonance for similar concepts.

useHotStream()

Sounds steamy.

Member: Yeah, useHotStream() sounds kinda gross tbqh :)
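The "bound stream" helper discussed above could be sketched roughly like this. This is an illustrative model using plain async iterators rather than Effection's actual Stream and resource APIs, and the name bindStream is hypothetical:

```typescript
// Model a stream as a factory of iterators; "binding" it subscribes once
// up front so every later iteration reuses that same live subscription.
type SimpleStream<T> = () => AsyncIterator<T, void>;

function bindStream<T>(stream: SimpleStream<T>): SimpleStream<T> {
  // Subscribe eagerly, exactly once.
  const subscription = stream();
  // Every caller receives the same underlying subscription, so items are
  // divided among consumers rather than replayed, and nothing sent before
  // a background consumer starts is lost.
  return () => subscription;
}
```

A background task could then consume the bound stream while the foreground holds another handle, both drawing from the single live subscription.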


yield* faucet.pour([1, 2, 3, 4, 5]);

yield* is(emissions, (e) => e.length >= 2);

const values = emissions.valueOf().map((e) => e.value);
expect(values[0]).toBe(1);
expect(values[1]).toBe(5);
expect(values).toHaveLength(2);
});

it("emits trailing value after the window expires", function* () {
const delay = 80;
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(delay)(faucet);
const emissions = yield* createArraySignal<Emission<number>>([]);

yield* spawn(() =>
forEach(function* (value) {
emissions.push({ value, time: performance.now() });
}, stream),
);

yield* sleep(0);

yield* faucet.pour([1, 2, 3]);

yield* is(emissions, (e) => e.length >= 2);

const [leading, trailing] = emissions.valueOf();
const gap = trailing.time - leading.time;

expect(leading.value).toBe(1);
expect(trailing.value).toBe(3);
expect(gap).toBeGreaterThanOrEqual(delay * 0.8);
});

it("does not emit trailing before the remaining delay elapses", function* () {
const delay = 100;
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(delay)(faucet);
const emissions = yield* createArraySignal<Emission<number>>([]);

yield* spawn(() =>
forEach(function* (value) {
emissions.push({ value, time: performance.now() });
}, stream),
);

yield* sleep(0);

yield* faucet.pour([1, 2]);

// Checkpoint inside the window: only the leading value should exist.
// sleep() here creates a timing scenario, not waiting for a result.
yield* sleep(delay * 0.4);
expect(emissions.valueOf()).toHaveLength(1);
expect(emissions.valueOf()[0].value).toBe(1);

// Now wait for trailing to actually arrive
yield* is(emissions, (e) => e.length >= 2);
expect(emissions.valueOf()[1].value).toBe(2);
});
coderabbitai[bot] marked this conversation as resolved.

it("emits at most once per delay window", function* () {
const delay = 60;
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(delay)(faucet);
const emissions = yield* createArraySignal<Emission<number>>([]);

yield* spawn(() =>
forEach(function* (value) {
emissions.push({ value, time: performance.now() });
}, stream),
);

yield* sleep(0);

// Two rapid bursts separated by a gap longer than the delay
yield* faucet.pour([1, 2, 3]);
yield* is(emissions, (e) => e.length >= 2);
yield* sleep(delay + 20);
yield* faucet.pour([10, 20, 30]);
yield* is(emissions, (e) => e.some((v) => v.value === 30));

const times = emissions.valueOf().map((e) => e.time);
for (let i = 1; i < times.length; i++) {
const gap = times[i] - times[i - 1];
expect(gap).toBeGreaterThanOrEqual(delay * 0.8);
}
});

it("handles upstream completion during the window", function* () {
const source = createChannel<number, void>();
const stream = throttle<number>(200)(source);
const emissions = yield* createArraySignal<Emission<number>>([]);

yield* spawn(() =>
forEach(function* (value) {
emissions.push({ value, time: performance.now() });
}, stream),
);

yield* sleep(0);

yield* source.send(1);
yield* source.send(2);
yield* source.send(3);
yield* source.close();

yield* is(emissions, (e) => e.some((v) => v.value === 3));

const values = emissions.valueOf().map((e) => e.value);
expect(values).toContain(1);
expect(values).toContain(3);
});

it("closes only after trailing emission is handled", function* () {
const source = createChannel<string, number>();
const stream = throttle<string>(200)(source);
const subscription = yield* stream;

yield* spawn(function* () {
yield* sleep(0);
yield* source.send("a");
yield* source.send("b");
yield* source.close(42);
});

const first = yield* subscription.next();
expect(first).toEqual({ done: false, value: "a" });

const second = yield* subscription.next();
expect(second).toEqual({ done: false, value: "b" });

const third = yield* subscription.next();
expect(third).toEqual({ done: true, value: 42 });
});

it("yields the latest window value when consumer is slower than the window", function* () {
const source = createChannel<number, never>();
const stream = throttle<number>(100)(source);
const subscription = yield* stream;

// Pump three values in a spawned task so they queue up while the
// consumer is idle.
yield* spawn(function* () {
yield* sleep(0);
yield* source.send(1);
yield* source.send(2);
yield* source.send(3);
});

// Leading value — returned immediately.
const first = yield* subscription.next();
expect(first).toEqual({ done: false, value: 1 });

// Wait well beyond delayMS so the window has long expired.
yield* sleep(500);

// Must be the latest value the absorber saw during the window, not
// the oldest queued one.
const second = yield* subscription.next();
expect(second).toEqual({ done: false, value: 3 });
});

it("passes through values spaced beyond the delay", function* () {
const delay = 20;
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(delay)(faucet);
const results = yield* createArraySignal<number>([]);

yield* spawn(() =>
forEach(function* (value) {
results.push(value);
}, stream),
);

yield* sleep(0);

yield* faucet.pour(function* (send) {
yield* send(1);
yield* sleep(50);
yield* send(2);
yield* sleep(50);
yield* send(3);
});

yield* is(results, (r) => r.length >= 3);

expect(results.valueOf()).toEqual([1, 2, 3]);
});
});
91 changes: 91 additions & 0 deletions stream-helpers/throttle.ts
@@ -0,0 +1,91 @@
import { timebox } from "@effectionx/timebox";
import { createArraySignal } from "@effectionx/signals";
import { type Stream, spawn } from "effection";

/**
* Throttles a stream to emit at most one value per `delayMS` milliseconds.
*
* Uses leading+trailing semantics:
* - The first upstream value is emitted immediately (leading edge).
* - While the throttle window is open, upstream values are consumed eagerly
* and only the latest is buffered.
* - After the window expires, the buffered value is emitted (trailing edge),
* which opens a new window.
* - Two emissions are never closer together than `delayMS`.
*
* Stream-completion exception: if the upstream closes during an open window,
* the trailing value (if any) is emitted promptly without waiting for the
* remaining delay, and `done` follows on the next pull. This avoids adding
* artificial latency before propagating the close signal.
*
* @param delayMS - minimum milliseconds between emissions
*/
export function throttle<A>(
delayMS: number,
): <TClose>(stream: Stream<A, TClose>) => Stream<A, TClose> {
return <TClose>(stream: Stream<A, TClose>): Stream<A, TClose> => ({
*[Symbol.iterator]() {
const subscription = yield* stream;
const output = yield* createArraySignal<IteratorResult<A, TClose>>([]);


⚠️ Potential issue | 🟠 Major

Persist the terminal result after the queue drains.

The terminal IteratorResult is only queued once. After a consumer shifts that item, later next() calls block forever because the pump has already returned and nothing can push into output again. Keep a doneResult stash and replay it whenever the queue is empty.

🛠️ Proposed fix
     *[Symbol.iterator]() {
       const subscription = yield* stream;
       const output = yield* createArraySignal<IteratorResult<A, TClose>>([]);
+      let doneResult: IteratorReturnResult<TClose> | undefined;
 
       // ── pump ──────────────────────────────────────────────────────
       // A persistent background task that owns all upstream reads.
@@
           // ── leading edge ────────────────────────────────────────
           const first = yield* subscription.next();
           if (first.done) {
+            doneResult = first;
             output.push(first);
             return;
           }
@@
             if (tb.value.done) {
               // Stream closed during window — flush trailing, then done
               if (hasTrailing) {
                 output.push({ done: false as const, value: trailing as A });
               }
+              doneResult = tb.value;
               output.push(tb.value);
               return;
             }
@@
       return {
         *next() {
+          if (doneResult !== undefined && output.length === 0) {
+            return doneResult;
+          }
           return yield* output.shift();
         },
       };

Also applies to: 42-44, 63-69, 85-87


// ── pump ──────────────────────────────────────────────────────
// A persistent background task that owns all upstream reads.
// It alternates between two phases:
// 1. Pull the next upstream value and push it (leading edge).
// 2. Open a window for delayMS: consume upstream, keep only
// the latest, then push it when the window expires
// (trailing edge).
yield* spawn(function* () {
while (true) {
// ── leading edge ────────────────────────────────────────
const first = yield* subscription.next();
if (first.done) {
output.push(first);
return;
}
output.push({ done: false as const, value: first.value });

// ── absorption window ───────────────────────────────────
let trailing: A | undefined;
let hasTrailing = false;
const windowStart = performance.now();

while (true) {
const remaining = delayMS - (performance.now() - windowStart);
if (remaining <= 0) break;

const tb = yield* timebox(remaining, () => subscription.next());

if (tb.timeout) {
break;
}

if (tb.value.done) {
// Stream closed during window — flush trailing, then done
if (hasTrailing) {
output.push({ done: false as const, value: trailing as A });
}
output.push(tb.value);
return;
}

trailing = tb.value.value;
hasTrailing = true;
}

// ── trailing edge ───────────────────────────────────────
if (hasTrailing) {
output.push({ done: false as const, value: trailing as A });
}
}
});

// ── subscription ─────────────────────────────────────────────
return {
*next() {
return yield* output.shift();
},
};
},
});
}
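The leading+trailing behavior documented above can also be modeled with a dependency-free sketch that replays timestamped events through the same windowing rules. This is an illustrative model, not the Effection implementation; throttleEvents and TimedEvent are hypothetical names:

```typescript
interface TimedEvent<T> {
  value: T;
  time: number; // milliseconds since the start of the sequence
}

// Replay timestamped events through leading+trailing throttle semantics:
// emit the first value of each window immediately, absorb the rest keeping
// only the latest, and flush that latest value when the window expires.
function throttleEvents<T>(delayMS: number, events: TimedEvent<T>[]): T[] {
  const out: T[] = [];
  let windowEnd = -Infinity; // no window open initially
  let trailing: T | undefined;
  let hasTrailing = false;

  for (const e of events) {
    // A pending trailing value flushes once its window has expired,
    // and that emission itself opens a fresh window.
    if (hasTrailing && e.time >= windowEnd) {
      out.push(trailing as T);
      hasTrailing = false;
      windowEnd += delayMS;
    }
    if (e.time >= windowEnd) {
      out.push(e.value); // leading edge
      windowEnd = e.time + delayMS;
    } else {
      trailing = e.value; // absorbed; only the latest survives
      hasTrailing = true;
    }
  }
  if (hasTrailing) out.push(trailing as T); // trailing edge at close
  return out;
}
```

For example, five rapid values under a 100ms window reduce to just the leading and trailing ones, mirroring the "suppresses intermediate values and emits trailing" test above.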