Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions stream-helpers/batch.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it } from "@effectionx/bdd";
import { createArraySignal, is } from "@effectionx/signals";
import { expect } from "expect";
import { createChannel, sleep, spawn } from "effection";
import { expect } from "expect";

import { batch } from "./batch.ts";
import { forEach } from "./for-each.ts";
Expand Down Expand Up @@ -47,17 +47,13 @@ describe("batch", () => {

let last = performance.now();

yield* spawn(() =>
forEach<readonly number[], void>(function* (batch) {
const now = performance.now();
windows.push(now - last);
last = now;
yield* forEach<readonly number[], void>(function* (batch) {
const now = performance.now();
windows.push(now - last);
last = now;

batches.push(batch);
}, stream),
);

yield* sleep(0);
batches.push(batch);
}, stream);

yield* faucet.pour(function* (send) {
for (let i = 1; i <= 10; i++) {
Expand All @@ -83,13 +79,9 @@ describe("batch", () => {

const batches = yield* createArraySignal<readonly number[]>([]);

yield* spawn(() =>
forEach<readonly number[], void>(function* (batch) {
batches.push(batch);
}, stream),
);

yield* sleep(0);
yield* forEach<readonly number[], void>(function* (batch) {
batches.push(batch);
}, stream);

yield* faucet.pour([1, 2, 3, 4, 5, 6]);

Expand Down
6 changes: 3 additions & 3 deletions stream-helpers/for-each.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createChannel, sleep, spawn } from "effection";
import { describe, it } from "@effectionx/bdd";
import { createChannel, sleep, spawn } from "effection";
import { expect } from "expect";

import { forEach } from "./for-each.ts";
Expand All @@ -19,7 +19,7 @@ describe("forEach", () => {
yield* stream.close();
});

yield* forEach(function* (item: number) {
yield* yield* forEach(function* (item: number) {
processedItems.push(item);
}, stream);

Expand All @@ -36,7 +36,7 @@ describe("forEach", () => {
yield* stream.close(42); // Close with value 42
});

const closeValue = yield* forEach(function* () {}, stream);
const closeValue = yield* yield* forEach(function* () {}, stream);
expect(closeValue).toBe(42);
});
});
42 changes: 27 additions & 15 deletions stream-helpers/for-each.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,54 @@
import type { Operation, Stream } from "effection";
import {
type Operation,
type Stream,
type Task,
resource,
spawn,
} from "effection";

/**
* Invoke a function for each item passing through the stream.
* Subscribe to a stream and invoke a function for each item, returning a
* {@link Task} that resolves to the stream's close value.
*
* Because `forEach` is a resource, the subscription is established as soon as
* it is yielded, **before** the consumer loop begins. This makes it safe to
* use without an extra `spawn` + `sleep(0)` dance.
*
* @template T - The type of items in the stream
* @template TClose - The type of the close value returned when the stream ends
* @param fn - A function that processes each item from the stream.
* @param stream: A stream to process
* @param stream - A stream to process
*
* @example
* ```typescript
* import { forEach } from "./for-each.ts";
* import { createSignal } from "effection";
*
* // Process items from a stream
* const stream = createSignal<number, void>();
* // Background usage – subscribes and runs without blocking
* yield* forEach(function*(item) {
* console.log(`Processing: ${item}`);
* }, stream);
*
* yield* spawn(() => forEach(function*(item) {
* // Blocking usage – waits until the stream closes
* yield* (yield* forEach(function*(item) {
* console.log(`Processing: ${item}`);
* }, stream));
*
* yield* stream.send(1);
* yield* stream.send(2);
* ```
*/
export function forEach<T, TClose>(
fn: (item: T) => Operation<void>,
stream: Stream<T, TClose>,
): Operation<TClose> {
return {
*[Symbol.iterator]() {
let subscription = yield* stream;
): Operation<Task<TClose>> {
return resource(function* (provide) {
const subscription = yield* stream;
const task = yield* spawn(function* (): Operation<TClose> {
let next = yield* subscription.next();
while (!next.done) {
yield* fn(next.value);
next = yield* subscription.next();
}
return next.value;
},
};
});
yield* provide(task);
});
Comment on lines +42 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find and examine the for-each.ts file
find . -name "for-each.ts" -type f

Repository: thefrontside/effectionx

Length of output: 95


🏁 Script executed:

# Also locate any test files for forEach
find . -name "*for-each*" -o -name "*forEach*" | grep -E "\.(test|spec)\.(ts|tsx)?"

Repository: thefrontside/effectionx

Length of output: 100


🏁 Script executed:

# Check the directory structure for stream-helpers
ls -la stream-helpers/ 2>/dev/null || find . -type d -name "stream-helpers"

Repository: thefrontside/effectionx

Length of output: 2466


🏁 Script executed:

cat -n stream-helpers/for-each.ts

Repository: thefrontside/effectionx

Length of output: 2071


🏁 Script executed:

cat -n stream-helpers/for-each.test.ts

Repository: thefrontside/effectionx

Length of output: 1521


🏁 Script executed:

cat -n stream-helpers/drain.ts

Repository: thefrontside/effectionx

Length of output: 1482


🏁 Script executed:

# Search for patterns where provide() is used with spawn
rg "provide\(" stream-helpers/ -B 5 -A 2

Repository: thefrontside/effectionx

Length of output: 890


🏁 Script executed:

# Look for tests that halt or cancel tasks from stream operations
rg "\.halt\(\)" stream-helpers/ -B 3 -A 3

Repository: thefrontside/effectionx

Length of output: 523


🏁 Script executed:

# Check other stream operations for comparison with forEach pattern
cat -n stream-helpers/reduce.ts

Repository: thefrontside/effectionx

Length of output: 1868


🏁 Script executed:

# Look for documentation or tests about resource lifecycle
rg "resource\(" stream-helpers/ -B 2 -A 5

Repository: thefrontside/effectionx

Length of output: 1046


🏁 Script executed:

# Check policies directory for guidance
find . -path "*policies*" -name "*.md" | head -20

Repository: thefrontside/effectionx

Length of output: 702


🏁 Script executed:

# Look at the stream-helpers README
cat stream-helpers/README.md | head -100

Repository: thefrontside/effectionx

Length of output: 2706


🏁 Script executed:

# Search for other resource patterns with spawn
rg "spawn\(" . -B 10 -A 10 | grep -A 15 "resource"

Repository: thefrontside/effectionx

Length of output: 6048


🏁 Script executed:

# Check if there are examples of task.halt() in tests
rg "task\.halt\|\.halt\(\)" . --type ts -B 5 -A 5 | head -80

Repository: thefrontside/effectionx

Length of output: 49


🏁 Script executed:

# Read the relevant policies
cat .policies/ergonomics.md

Repository: thefrontside/effectionx

Length of output: 4267


🏁 Script executed:

cat .policies/structured-concurrency.md

Repository: thefrontside/effectionx

Length of output: 3956


🏁 Script executed:

# Look for any examples of Task halt behavior in tests
rg "\.halt\(\)" . --type ts -A 3 -B 3

Repository: thefrontside/effectionx

Length of output: 4099


🏁 Script executed:

# Check parallel.ts to see how similar resource/spawn patterns handle this
cat -n ./fx/parallel.ts | head -80

Repository: thefrontside/effectionx

Length of output: 3030


🏁 Script executed:

# Verify Effection's resource lifecycle semantics
rg "resource\(" . --type ts -A 8 | grep -A 8 "provide" | head -50

Repository: thefrontside/effectionx

Length of output: 2468


🏁 Script executed:

# Check if there are any comments or docs about this pattern in Effection
rg "resource.*spawn.*provide" . -C 5

Repository: thefrontside/effectionx

Length of output: 49


🏁 Script executed:

# Look for any issues or discussions about task.halt with resources
find . -name "*.md" -type f -exec grep -l "halt\|cancellation\|cleanup" {} \; | grep -E "README|GUIDE|DESIGN"

Repository: thefrontside/effectionx

Length of output: 187


🏁 Script executed:

# Check the README files for any discussion of resource/task lifecycle
cat ./effect-ts/README.md | grep -A 10 -B 5 "resource\|halt\|cleanup" | head -40

Repository: thefrontside/effectionx

Length of output: 1403


🏁 Script executed:

# Look for how forEach should work by checking if there are any examples
rg "forEach" . --type ts -B 5 -A 10 | grep -E "(yield|task|halt)" -A 3 -B 3 | head -80

Repository: thefrontside/effectionx

Length of output: 3895


The returned Task doesn't own the subscription lifetime.

Line 43 subscribes in the resource scope, but line 44 spawns a child task that only consumes from the subscription. Halting the returned Task stops the drain loop but leaves the resource scope mounted until the caller's scope exits, keeping the subscription alive. This violates .policies/ergonomics.md ("Helpers must preserve structured concurrency semantics") and .policies/structured-concurrency.md ("Cancellation, cleanup, and task lifetimes should be obvious and enforced").

Compare this to drain() (line 27–32), where the subscription lifetime equals the operation lifetime—clear and correct. Move the subscription acquisition into the spawned child task, or document that the returned Task doesn't own cleanup and callers must scope the entire operation. Add a test for early task.halt() to verify the intended semantics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream-helpers/for-each.ts` around lines 42 - 53, The spawned Task currently
doesn't own the subscription because subscription is acquired in the outer
resource scope (subscription is set before spawn), so halt() on the returned
Task stops the child but leaves the subscription alive; move the subscription
acquisition into the spawned child (i.e., call yield* stream.next()/subscribe
inside the generator passed to spawn) so the spawned Operation<TClose> owns and
cleans up the subscription, mirroring drain()’s semantics (see resource(),
spawn(), subscription, task), and add a unit test that calls task.halt() early
to assert the subscription is closed when the task halts.

}
8 changes: 4 additions & 4 deletions stream-helpers/reduce.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createChannel, type Operation, sleep, spawn } from "effection";
import { describe, it } from "@effectionx/bdd";
import type { Operation } from "effection";
import { expect } from "expect";

import { forEach } from "./for-each.ts";
Expand All @@ -17,7 +17,7 @@ describe("reduce", () => {
}, 0);

let sequence: number[] = [];
yield* forEach(function* (item) {
yield* yield* forEach(function* (item) {
sequence.push(item);
}, sum(stream));

Expand All @@ -44,7 +44,7 @@ describe("reduce", () => {
{} as Record<string, string>,
);

const closeValue = yield* forEach(function* () {}, merge(stream));
const closeValue = yield* yield* forEach(function* () {}, merge(stream));
expect(closeValue).toBe(42);
});

Expand All @@ -58,7 +58,7 @@ describe("reduce", () => {
}, 0);

let sequence: number[] = [];
yield* forEach(function* (item) {
yield* yield* forEach(function* (item) {
sequence.push(item);
}, sum(stream));

Expand Down
10 changes: 5 additions & 5 deletions stream-helpers/take-until.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("takeUntil", () => {
it("should yield values until predicate is true, then close with matching value", function* () {
const values: { status: string }[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand Down Expand Up @@ -39,7 +39,7 @@ describe("takeUntil", () => {
})(),
);

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -53,7 +53,7 @@ describe("takeUntil", () => {
it("should close immediately with first value if it matches predicate", function* () {
const values: { status: string }[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -75,7 +75,7 @@ describe("takeUntil", () => {

const progressStatuses: string[] = [];

const result = yield* forEach(
const result = yield* yield* forEach(
function* (progress) {
progressStatuses.push(progress.status);
},
Expand Down Expand Up @@ -103,7 +103,7 @@ describe("takeUntil", () => {
takeUntil((x) => x === 4),
);

const closeValue = yield* forEach(function* (value) {
const closeValue = yield* yield* forEach(function* (value) {
values.push(value);
}, stream);

Expand Down
10 changes: 5 additions & 5 deletions stream-helpers/take-while.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("takeWhile", () => {
it("should yield values while predicate is true", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -32,7 +32,7 @@ describe("takeWhile", () => {
})(),
);

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -46,7 +46,7 @@ describe("takeWhile", () => {
it("should not include the failing value", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -60,7 +60,7 @@ describe("takeWhile", () => {
it("should stop immediately if first value fails predicate", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -79,7 +79,7 @@ describe("takeWhile", () => {
takeWhile((x) => x < 4),
);

const closeValue = yield* forEach(function* (value) {
const closeValue = yield* yield* forEach(function* (value) {
values.push(value);
}, stream);

Expand Down
10 changes: 5 additions & 5 deletions stream-helpers/take.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("take", () => {
it("should take first n values and close with the nth value", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -32,7 +32,7 @@ describe("take", () => {
})(),
);

const closeValue = yield* forEach(function* (value) {
const closeValue = yield* yield* forEach(function* (value) {
values.push(value);
}, take<number>(5)(stream));

Expand All @@ -43,7 +43,7 @@ describe("take", () => {
it("should work with n=1", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -57,7 +57,7 @@ describe("take", () => {
it("should work with n=0", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
const closeValue = yield* yield* forEach(
function* (value) {
values.push(value);
},
Expand All @@ -73,7 +73,7 @@ describe("take", () => {

const stream = pipe(streamOf([1, 2, 3, 4, 5]), take(2));

const closeValue = yield* forEach(function* (value) {
const closeValue = yield* yield* forEach(function* (value) {
values.push(value);
}, stream);

Expand Down
9 changes: 3 additions & 6 deletions watch/test/watch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { forEach } from "@effectionx/stream-helpers";
import { expect } from "expect";

import process from "node:process";
import { spawn } from "effection";
import { watch } from "../watch.ts";
import { inspector, useFixture } from "./helpers.ts";

Expand Down Expand Up @@ -94,11 +93,9 @@ describe("watch", () => {

let first = yield* processes.expectNext();

yield* spawn(function* () {
yield* forEach(function* (line) {
output.push(`${line}`.trim());
}, first.process.stdout);
});
yield* forEach(function* (line) {
output.push(`${line}`.trim());
}, first.process.stdout);

yield* is(output, (array) => array.includes("started"));

Expand Down
Loading