diff --git a/.gitignore b/.gitignore index 6d30eb7..4071ee4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store .vscode +tmp /deno.lock node_modules/ diff --git a/cli/commands/call.ts b/cli/commands/call.ts index 820a91f..bbdc687 100644 --- a/cli/commands/call.ts +++ b/cli/commands/call.ts @@ -24,20 +24,22 @@ export function* call(config: CallType) { name: name as any, args: argsList, }); - let next = yield* subscription.next(); - // log progress values and collect everything, including final return - while (!next.done) { - results.push(next.value); - yield* log.info(JSON.stringify(next.value)); - next = yield* subscription.next(); - } - - if (out) { - try { - yield* until(writeFile(out, JSON.stringify(results, null, 2))); - } catch (e) { - let msg = e instanceof Error ? e.message : String(e); - yield* log.error(`failed to write ${out}: ${msg}`); + try { + let next = yield* subscription.next(); + // log progress values and collect everything, including final return + while (!next.done) { + results.push(next.value); + yield* log.info(JSON.stringify(next.value)); + next = yield* subscription.next(); + } + } finally { + if (out) { + try { + yield* until(writeFile(out, JSON.stringify(results, null, 2))); + } catch (e) { + let msg = e instanceof Error ? e.message : String(e); + yield* log.error(`failed to write ${out}: ${msg}`); + } } } } diff --git a/cli/commands/run.ts b/cli/commands/run.ts index d71014a..da93857 100644 --- a/cli/commands/run.ts +++ b/cli/commands/run.ts @@ -41,7 +41,7 @@ export function run(config: RunType["config"], passthroughArgs: string[]) { } }); - yield* spawn(function* () { + const recordTask = yield* spawn(function* () { yield* ready.operation; if (config.inspectRecord) { yield* recordNodeMapToFile(host, config.inspectRecord, config.protocol); @@ -56,6 +56,7 @@ export function run(config: RunType["config"], passthroughArgs: string[]) { }); let status = yield* child.join(); + yield* recordTask; yield* provide(status.code); } finally { diff --git a/debug.sh b/debug.sh new file mode 100755 index 0000000..a392804 --- /dev/null +++ b/debug.sh @@ -0,0 +1,23 @@ +#!/bin/bash +set -euo pipefail +PORT=$(node -e "const net = require('node:net'); const server = net.createServer(); server.listen(0, () => { console.log(server.address().port); server.close(); });") +pnpm cli --inspect-port "$PORT" --inspect-package ./loader.ts --inspect-pause --inspect-record tmp/out.json ./examples/forever.ts >tmp/inspector.log 2>tmp/inspector.err & +INSPECT_PID=$! +sleep 2 +set +e +pnpm cli call watchScopes --host http://localhost:$PORT --out tmp/watchScopes.json >tmp/watchScopes.log 2>tmp/watchScopes.err & +WATCH_PID=$! +pnpm cli call recordNodeMap --host http://localhost:$PORT --out tmp/recordNodeMap.json >tmp/recordNodeMap.log 2>tmp/recordNodeMap.err & +RECORD_PID=$! +set -e +sleep 1 +pnpm cli call play --host http://localhost:$PORT --out tmp/call-play.json >tmp/call-play.log 2>tmp/call-play.err +sleep 5 +kill -INT "$INSPECT_PID" 2>/dev/null || true +sleep 1 +pkill -TERM -P "$INSPECT_PID" 2>/dev/null || true +wait $INSPECT_PID 2>/dev/null || true +wait $WATCH_PID 2>/dev/null || true +wait $RECORD_PID 2>/dev/null || true + +# pnpm cli --inspect-package ./loader.ts --inspect-pause --inspect-record tmp/out.json ./examples/forever.ts \ No newline at end of file diff --git a/examples/forever.ts b/examples/forever.ts index 66721c6..67e26d9 100644 --- a/examples/forever.ts +++ b/examples/forever.ts @@ -1,28 +1,56 @@ import { main, suspend, spawn, sleep, useAttributes, type Task } from "effection"; await main(function* () { + yield* useAttributes({ name: "Main" }); + try { + yield* spawn(function* () { + yield* useAttributes({ name: "supervisor", role: "watcher" }); + yield* sleep(1000); + return "supervisor done"; + }); + + let background = yield* spawn(function* () { + yield* useAttributes({ name: "background", purpose: "heartbeat" }); + let beats = 0; + while (true) { + yield* sleep(200); + beats++; + yield* useAttributes({ name: "background", beats }); + } + }); + let tasks: Task[] = []; - for (let i = 1; i <= 5; i++) { + for (let i = 1; i <= 15; i++) { let task = yield* spawn(function* () { - yield* useAttributes({ name: "child", number: i }); + yield* useAttributes({ name: "child", number: i, category: i % 2 === 0 ? "even" : "odd" }); + + let helper = yield* spawn(function* () { + yield* useAttributes({ name: "helper", child: i }); + yield* sleep(40 * i); + return `helper ${i} done`; + }); + yield* sleep(50 * i); + yield* helper.halt(); console.log(`child ${i} done`); return `child ${i} done`; }); + tasks.push(task); } // let some tasks start up yield* sleep(150); - for (let t of tasks) { - yield* t.halt(); - } + yield* background.halt(); - console.log("all children halted, but we will never finish..."); + console.log("some children halted, but we will never finish..."); // never finish; keeps the loader running which keeps the UI server up during tests // but also simulates a long-running process that we can attach to + // when this receives a SIGTERM, it will push every effect into the `finally` block + // and start to shut all of the children down, we don't need to call `.halt()` on anything + // but we have halted a few of the children simply to see some variable movement in our tree yield* suspend(); } finally { console.log("shutting down, forever was a fallacy"); diff --git a/examples/spawn-children.ts b/examples/spawn-children.ts index 2a07f9f..5db4778 100644 --- a/examples/spawn-children.ts +++ b/examples/spawn-children.ts @@ -1,19 +1,42 @@ import { main, sleep, spawn, useAttributes, type Task } from "effection"; await main(function* () { - let tasks: Task[] = []; - for (let i = 1; i <= 10; i++) { - let task = yield* spawn(function* () { - yield* useAttributes({ name: "child", number: i }); - let delay = Math.random() * 1000; - yield* sleep(delay); - return `${i} is done`; + let groupTasks: Task[] = []; + for (let group of ["alpha", "beta"]) { + let groupTask = yield* spawn(function* () { + yield* useAttributes({ name: "group", label: group }); + let childTasks: Task[] = []; + + for (let i = 1; i <= 5; i++) { + let childTask = yield* spawn(function* () { + yield* useAttributes({ name: "child", group, number: i }); + + let worker = yield* spawn(function* () { + yield* useAttributes({ name: "worker", stage: "inner", number: i }); + yield* sleep(Math.random() * 300); + return `worker ${group}-${i} done`; + }); + + yield* sleep(Math.random() * 120); + yield* worker.halt(); + return `${group}-${i} done`; + }); + + childTasks.push(childTask); + } + + for (let child of childTasks) { + yield* sleep(Math.random() * 200); + yield* child.halt(); + } + + return `${group} done`; }); - tasks.push(task); + + groupTasks.push(groupTask); } - for (let task of tasks) { - yield* sleep(Math.random() * 200); + for (let task of groupTasks) { yield* task.halt(); } diff --git a/lib/implementations/player.ts b/lib/implementations/player.ts index 7e83e10..7b1b9c2 100644 --- a/lib/implementations/player.ts +++ b/lib/implementations/player.ts @@ -35,6 +35,7 @@ export const player = createImplementation(protocol, function* (root) { yield* setContext({ status: "playing" }); yield* cxt.resume(); } + return null; }), watchPlayerState: () => diff --git a/lib/implementations/scope.ts b/lib/implementations/scope.ts index 73da88b..88c4d90 100644 --- a/lib/implementations/scope.ts +++ b/lib/implementations/scope.ts @@ -8,7 +8,6 @@ import { pipe } from "remeda"; import { createSubject } from "@effectionx/stream-helpers"; import { AttributesContext, getLabels } from "../labels.ts"; import { updateNodeMap } from "../update-node-map.ts"; -import { truncate } from "../truncate.ts"; const Id = createContext("@effectionx/inspector.id", "global"); const Children = createContext>("@effection/scope.children", new Set()); @@ -78,7 +77,7 @@ export const scope = createImplementation(protocol, function* (root) { return { watchScopes: () => - pipe(stream, createSubject({ type: "tree", value: readTree(root) }), truncate()), + pipe(stream, createSubject({ type: "tree", value: readTree(root) })), getScopes: op(function* () { return readTree(root); }), @@ -87,7 +86,6 @@ export const scope = createImplementation(protocol, function* (root) { stream, createSubject({ type: "tree", value: readTree(root) }), updateNodeMap({}), - truncate(), ), }; }) as Inspector; diff --git a/lib/maybe.ts b/lib/maybe.ts new file mode 100644 index 0000000..8340063 --- /dev/null +++ b/lib/maybe.ts @@ -0,0 +1,22 @@ +export type Maybe = + | { + readonly exists: false; + } + | { + readonly exists: true; + readonly value: T; + }; + +export function Just(): Maybe; +export function Just(value: T): Maybe; +export function Just(value?: T): Maybe { + if (typeof value === "undefined") { + return { exists: true } as Maybe; + } else { + return { exists: true, value }; + } +} + +export function Nothing(): Maybe { + return { exists: false }; +} diff --git a/lib/protocols/player.ts b/lib/protocols/player.ts index 4ba1b28..3aaffcf 100644 --- a/lib/protocols/player.ts +++ b/lib/protocols/player.ts @@ -10,6 +10,6 @@ export const protocol = createProtocol({ play: { args: type("never[]"), progress: type("never"), - returns: type("undefined"), + returns: type("null"), }, }); diff --git a/lib/sse-client.ts b/lib/sse-client.ts index 8a7a95d..bf77c73 100644 --- a/lib/sse-client.ts +++ b/lib/sse-client.ts @@ -73,7 +73,9 @@ export function createSSEClient( } let { type, data } = next.value; - let parsed = JSON.parse(data); + // check so we don't throw on our parse of the data, + // but we will throw in validation instead + let parsed = data === "undefined" ? undefined : JSON.parse(data); if (type === "yield") { return { done: false, diff --git a/lib/sse-server.ts b/lib/sse-server.ts index 0c336ce..ebb2f36 100644 --- a/lib/sse-server.ts +++ b/lib/sse-server.ts @@ -1,10 +1,10 @@ import { - type Operation, + createQueue, createScope, ensure, - once, - race, + type Operation, resource, + sleep, spawn, suspend, until, @@ -15,9 +15,17 @@ import { import type { Handle, Methods } from "./types.ts"; import { validateUnsafe } from "./validate.ts"; -import { createEventStream, defineEventHandler, H3, serve, serveStatic } from "h3"; +import { + createEventStream, + defineEventHandler, + type EventStreamMessage, + H3, + serve, + serveStatic, +} from "h3"; import { readFile, stat } from "node:fs/promises"; import { join } from "node:path"; +import { type Maybe, Nothing } from "./maybe.ts"; export interface SSEServerOptions { port: number; @@ -33,18 +41,40 @@ export function useSSEServer( return resource(function* (provide) { yield* useAttributes({ name: "SSEServer", port }); - let [scope, destroy] = createScope(yield* useScope()); + let currentScope = yield* useScope(); let app = new H3(); + let [requestScope, destroyRequestScope] = createScope(currentScope); for (let name of methodNames) { app.all(`/${String(name)}`, async (event) => { let { req } = event; - // when the response ends for whatever reason, h3 by default closes the stream - // instead we opt to handle the close ourselves, and kill the task on that event - let stream = createEventStream(event, { autoclose: false }); + // The SSE stream is bound to the HTTP request lifecycle. `autoclose` + // means the stream will automatically finish when the response ends. + // If the client disconnects, `stream.onClosed()` will halt the request task. + let stream = createEventStream(event, { autoclose: true }); + // The queue buffers outgoing EventStream messages. The producer writes + // to the queue and the `drain()` consumer flushes them into the stream. + // This way the producer can start shutdown and stop sending messages, + // and out of band from that action, we can finalize and send anything + // remaining in the queue before closing the stream. + let queue = createQueue>(); + + function* drain() { + let next = yield* queue.next(); + while (!next.done) { + yield* until(stream.push([next.value])); + next = yield* queue.next(); + } + } - scope.run(function* () { + requestScope.run(function* () { + yield* spawn(function* () { + while (true) { + yield* sleep(1000); + yield* until(stream.pushComment("keepalive")); + } + }); yield* ensure(function* () { yield* until(stream.flush()); yield* until(stream.close()); @@ -62,7 +92,8 @@ export function useSSEServer( handle.invoke({ name, args }), ); - let events = yield* spawn(function* () { + // The middleware producer reads from the protocol subscription and enqueues events + yield* spawn(function* () { try { yield* useAttributes({ name: "EventStream" }); @@ -71,43 +102,51 @@ export function useSSEServer( if (req.signal.aborted) { break; } - yield* until( - stream.push({ - event: "yield", - data: JSON.stringify(next.value), - }), - ); + queue.add({ + event: "yield", + data: JSON.stringify(next.value), + }); next = yield* subscription.next(); } - let value = validateUnsafe(protocol.methods[name].returns, next?.value); - yield* until( - stream.push({ + + // If the request was aborted it means that the client has disconnected + // while we were sending progress events. It will trigger the `break` above, + // drop out of the loop and we won't be able to emit the final event. + if (!req.signal.aborted) { + let value = validateUnsafe(protocol.methods[name].returns, next.value); + // to avoid throws around JSON handling, let it throw in validation + let data = value === undefined ? "undefined" : JSON.stringify(value); + queue.add({ event: "return", - data: JSON.stringify(value), - }), - ); - // return sent, we can consider the stream finalized - // and skip remaining steps in the finally block + data, + }); + } } catch (cause) { let error = cause instanceof Error ? cause : new Error("unknown", { cause }); let { name, message } = error; - yield* until( - stream.push({ - event: "throw", - data: JSON.stringify({ name, message }), - }), - ); + queue.add({ + event: "throw", + data: JSON.stringify({ name, message }), + }); + } finally { + queue.close(Nothing()); + yield* drain(); } }); try { - yield* race([events, once(req.signal, "abort")]); + yield* drain(); } finally { + // `drain()` has finished either because the queue closed normally or + // because the request was interrupted. Clean up the underlying + // protocol subscription first, then allow any remaining queued + // events to drain. yield* flush(); + queue.close(Nothing()); } }); - return await stream.send(); + return stream.send(); }); } @@ -149,7 +188,7 @@ export function useSSEServer( try { yield* provide(`http://localhost:${port}`); } finally { - yield* destroy(); + yield* destroyRequestScope(); yield* until(server.close()); } }); diff --git a/lib/truncate.ts b/lib/truncate.ts index cdf5c2c..e6e2d23 100644 --- a/lib/truncate.ts +++ b/lib/truncate.ts @@ -1,21 +1,22 @@ -import { createSignal, each, resource, spawn, type Subscription, type Stream } from "effection"; +import { createQueue, each, resource, spawn, type Subscription, type Stream } from "effection"; export function truncate(): (source: Stream) => Stream { return (source: Stream) => resource>(function* (provide) { - let target = createSignal(); + let queue = createQueue(); yield* spawn(function* () { for (let event of yield* each(source)) { - target.send(event); + queue.add(event); yield* each.next(); } }); try { - yield* provide(yield* target); + yield* provide(queue); } finally { - target.close(null); + console.log("marking queue done"); + queue.close(null); } }); } diff --git a/loader.ts b/loader.ts index b54957e..befa65c 100644 --- a/loader.ts +++ b/loader.ts @@ -1,5 +1,5 @@ import process from "node:process"; -import { global, useAttributes } from "effection"; +import { global, scoped, useAttributes } from "effection"; import { api } from "effection/experimental"; import { combine } from "./mod.ts"; import { scope } from "./lib/implementations/scope.ts"; @@ -38,7 +38,7 @@ if (config.ok) { yield* pause(); } - yield* body(args); + yield* scoped(() => body(args)); } finally { yield* detach(); process.stderr.write("detached, inspector shut down"); diff --git a/package.json b/package.json index 3ef8645..4fa7e76 100644 --- a/package.json +++ b/package.json @@ -32,8 +32,8 @@ }, "scripts": { "start": "vite --host 0.0.0.0 ui", - "example": "node --experimental-strip-types --import ./loader.ts ./examples/spawn-children.ts", - "cli": "node --experimental-strip-types ./cli/index.ts", + "example": "node --import ./loader.ts ./examples/spawn-children.ts", + "cli": "node ./cli/index.ts", "build": "pnpm run build:tsc && pnpm run build:ui", "build:tsc": "tsc -p tsconfig.build.json", "build:ui": "vite build ui", @@ -55,7 +55,7 @@ "@standard-schema/spec": "^1.0.0", "arktype": "^2.1.27", "configliere": "^0.3.0", - "h3": "2.0.1-rc.14", + "h3": "2.0.1-rc.20", "parse-sse": "^0.1.0", "remeda": "^2" }, @@ -66,7 +66,7 @@ "@playwright/test": "^1.58.2", "@shoelace-style/shoelace": "^2.20.1", "@types/d3": "^7.4.3", - "@types/node": "22", + "@types/node": "24", "d3": "^7.9.0", "effection": "4.1.0-alpha.7", "expect": "^30.2.0", @@ -83,7 +83,8 @@ "packageManager": "pnpm@9.15.9", "pnpm": { "patchedDependencies": { - "@effectionx/test-adapter@0.7.2": "patches/@effectionx__test-adapter@0.7.2.patch" + "@effectionx/test-adapter@0.7.2": "patches/@effectionx__test-adapter@0.7.2.patch", + "effection@4.1.0-alpha.7": "patches/effection@4.1.0-alpha.7.patch" } } } diff --git a/patches/effection@4.1.0-alpha.7.patch b/patches/effection@4.1.0-alpha.7.patch new file mode 100644 index 0000000..a8cb708 --- /dev/null +++ b/patches/effection@4.1.0-alpha.7.patch @@ -0,0 +1,70 @@ +diff --git a/lib/delimiter.ts b/lib/delimiter.ts +index a262961e8..cba294469 100644 +--- a/lib/delimiter.ts ++++ b/lib/delimiter.ts +@@ -12,6 +12,7 @@ export class Delimiter + finalized = false; + future = withResolvers>>(); + computed = false; ++ settling = false; + routine?: Coroutine; + outcome?: Maybe>; + +@@ -33,6 +34,10 @@ export class Delimiter + this.exit(Nothing()); + } + ++ settle(): void { ++ this.settling = true; ++ } ++ + *close(): Operation { + let done = this.future.operation; + let interrupted = !this.computed; +@@ -46,6 +51,8 @@ export class Delimiter + if (!this.outcome) { + this.interrupt(); + yield* this.close(); ++ } else if (!this.finalized) { ++ yield* this.close(); + } else { + if (interrupted && this.outcome.exists && !this.outcome.value.ok) { + throw this.outcome.value.error; +@@ -65,7 +72,7 @@ export class Delimiter + if (!this.routine) { + this.finalized = true; + this.future.resolve(this.outcome); +- } else { ++ } else if (!this.settling) { + this.routine.return(Ok(this.outcome)); + } + } + diff --git a/lib/task-group.ts b/lib/task-group.ts + index cbabbc9b3..adbc7efc8 100644 +--- a/lib/task-group.ts ++++ b/lib/task-group.ts +@@ -1,6 +1,8 @@ + import { createContext } from "./context.ts"; + import { box } from "./box.ts"; ++import { DelimiterContext } from "./delimiter.ts"; + import { Ok, unbox } from "./result.ts"; ++import { useScope } from "./scope.ts"; + import type { Operation, Task } from "./types.ts"; + + export class TaskGroup { +@@ -40,7 +42,14 @@ export function encapsulate(operation: () => Operation): Operation { + try { + return yield* operation(); + } finally { +- yield* group.halt(); ++ let scope = yield* useScope(); ++ let delimiter = scope.expect(DelimiterContext); ++ delimiter.settle(); ++ try { ++ yield* group.halt(); ++ } finally { ++ delimiter.settling = false; ++ } + } + }); + } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 754b8b2..84ac0ab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,6 +8,9 @@ patchedDependencies: '@effectionx/test-adapter@0.7.2': hash: 5e55lwc63rmijksdakvpd5lq2m path: patches/@effectionx__test-adapter@0.7.2.patch + effection@4.1.0-alpha.7: + hash: wipyqrz763hb3akxfrnccbogri + path: patches/effection@4.1.0-alpha.7.patch importers: @@ -21,10 +24,10 @@ importers: version: 0.7.5 '@effectionx/process': specifier: ^0.7.3 - version: 0.7.3(effection@4.1.0-alpha.7) + version: 0.7.3(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) '@effectionx/stream-helpers': specifier: ^0.8.0 - version: 0.8.0(effection@4.1.0-alpha.7) + version: 0.8.0(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) '@standard-schema/spec': specifier: ^1.0.0 version: 1.1.0 @@ -35,8 +38,8 @@ importers: specifier: ^0.3.0 version: 0.3.0 h3: - specifier: 2.0.1-rc.14 - version: 2.0.1-rc.14 + specifier: 2.0.1-rc.20 + version: 2.0.1-rc.20 parse-sse: specifier: ^0.1.0 version: 0.1.0 @@ -46,7 +49,7 @@ importers: devDependencies: '@effectionx/bdd': specifier: ^0.4.3 - version: 0.4.3(effection@4.1.0-alpha.7) + version: 0.4.3(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) '@nano-router/history': specifier: ^4.0.4 version: 4.0.4 @@ -63,14 +66,14 @@ importers: specifier: ^7.4.3 version: 7.4.3 '@types/node': - specifier: '22' - version: 22.19.11 + specifier: '24' + version: 24.12.2 d3: specifier: ^7.9.0 version: 7.9.0 effection: specifier: 4.1.0-alpha.7 - version: 4.1.0-alpha.7 + version: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) expect: specifier: ^30.2.0 version: 30.2.0 @@ -88,10 +91,10 @@ importers: version: 5.9.3 vite: specifier: ^7.3.1 - version: 7.3.1(@types/node@22.19.11)(lightningcss@1.31.1)(tsx@4.21.0) + version: 7.3.1(@types/node@24.12.2)(lightningcss@1.31.1)(tsx@4.21.0) vite-plugin-static-copy: specifier: ^3.2.0 - version: 3.2.0(vite@7.3.1(@types/node@22.19.11)(lightningcss@1.31.1)(tsx@4.21.0)) + version: 3.2.0(vite@7.3.1(@types/node@24.12.2)(lightningcss@1.31.1)(tsx@4.21.0)) packages: @@ -890,8 +893,8 @@ packages: '@types/istanbul-reports@3.0.4': resolution: {integrity: sha512-pk2B1NWalF9toCRu6gjBzR69syFjP4Od8WRAX+0mmf9lAjCRicLOWc+ZrxZHx/0XRjotgkF9t6iaMJ+aXcOdZQ==} - '@types/node@22.19.11': - resolution: {integrity: sha512-BH7YwL6rA93ReqeQS1c4bsPpcfOmJasG+Fkr6Y59q83f9M1WcBRHR2vM+P9eOisYRcN3ujQoiZY8uk5W+1WL8w==} + '@types/node@24.12.2': + resolution: {integrity: sha512-A1sre26ke7HDIuY/M23nd9gfB+nrmhtYyMINbjI1zHJxYteKR6qSMX56FsmjMcDb3SMcjJg5BiRRgOCC/yBD0g==} '@types/react@19.2.14': resolution: {integrity: sha512-ilcTH/UniCkMdtexkoCN0bI7pMcJDvmQFPvuPvmEaYA/NSfFTAgdUSLAoVjaRJm7+6PvcM+q1zYOwS4wTYMF9w==} @@ -1160,8 +1163,8 @@ packages: graceful-fs@4.2.11: resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==} - h3@2.0.1-rc.14: - resolution: {integrity: sha512-163qbGmTr/9rqQRNuqMqtgXnOUAkE4KTdauiC9y0E5iG1I65kte9NyfWvZw5RTDMt6eY+DtyoNzrQ9wA2BfvGQ==} + h3@2.0.1-rc.20: + resolution: {integrity: sha512-28ljodXuUp0fZovdiSRq4G9OgrxCztrJe5VdYzXAB7ueRvI7pIUqLU14Xi3XqdYJ/khXjfpUOOD2EQa6CmBgsg==} engines: {node: '>=20.11.1'} hasBin: true peerDependencies: @@ -1389,6 +1392,7 @@ packages: querystring@0.2.1: resolution: {integrity: sha512-wkvS7mL/JMugcup3/rMitHmd9ecIGd2lhFhK9N3UUQ450h66d1r3Y9nvXzQAW1Lq+wyx61k/1pfKS5KuKiyEbg==} engines: {node: '>=0.4.x'} + deprecated: The querystring API is considered Legacy. new code should use the URLSearchParams API instead. react-is@18.3.1: resolution: {integrity: sha512-/LLMVyas0ljjAtoYiPqYiL8VWXzUUdThrmU5+n20DZv+a+ClRoevUzw5JxU+Ieh5/c87ytoTBV9G1FiKfNJdmg==} @@ -1411,8 +1415,8 @@ packages: engines: {node: '>=18.0.0', npm: '>=8.0.0'} hasBin: true - rou3@0.7.12: - resolution: {integrity: sha512-iFE4hLDuloSWcD7mjdCDhx2bKcIsYbtOTpfH5MHHLSKMOUyjqQXTeZVa289uuwEGEKFoE/BAPbhaU4B774nceg==} + rou3@0.8.1: + resolution: {integrity: sha512-ePa+XGk00/3HuCqrEnK3LxJW7I0SdNg6EFzKUJG73hMAdDcOUC/i/aSz7LSDwLrGr33kal/rqOGydzwl6U7zBA==} rw@1.3.3: resolution: {integrity: sha512-PdhdWy89SiZogBLaw42zdeqtRJ//zFd2PgQavcICDUgJT5oW10QCRKbJ6bg4r0/UY2M6BWd5tkxuGFRvCkgfHQ==} @@ -1439,8 +1443,8 @@ packages: resolution: {integrity: sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA==} engines: {node: '>=0.10.0'} - srvx@0.11.4: - resolution: {integrity: sha512-m/2p87bqWZ94xpRN06qNBwh0xq/D0dXajnvPDSHFqrTogxuTWYNP1UHz6Cf+oY7D+NPLY35TJAp4ESIKn0WArQ==} + srvx@0.11.15: + resolution: {integrity: sha512-iXsux0UcOjdvs0LCMa2Ws3WwcDUozA3JN3BquNXkaFPP7TpRqgunKdEgoZ/uwb1J6xaYHfxtz9Twlh6yzwM6Tg==} engines: {node: '>=20.16.0'} hasBin: true @@ -1477,8 +1481,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - undici-types@6.21.0: - resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} + undici-types@7.16.0: + resolution: {integrity: sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==} vite-plugin-static-copy@3.2.0: resolution: {integrity: sha512-g2k9z8B/1Bx7D4wnFjPLx9dyYGrqWMLTpwTtPHhcU+ElNZP2O4+4OsyaficiDClus0dzVhdGvoGFYMJxoXZ12Q==} @@ -1551,43 +1555,43 @@ snapshots: '@ctrl/tinycolor@4.2.0': {} - '@effectionx/bdd@0.4.3(effection@4.1.0-alpha.7)': + '@effectionx/bdd@0.4.3(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - '@effectionx/test-adapter': 0.7.2(patch_hash=5e55lwc63rmijksdakvpd5lq2m)(effection@4.1.0-alpha.7) - effection: 4.1.0-alpha.7 + '@effectionx/test-adapter': 0.7.2(patch_hash=5e55lwc63rmijksdakvpd5lq2m)(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) - '@effectionx/node@0.2.2(effection@4.1.0-alpha.7)': + '@effectionx/node@0.2.2(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - effection: 4.1.0-alpha.7 + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) - '@effectionx/process@0.7.3(effection@4.1.0-alpha.7)': + '@effectionx/process@0.7.3(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - '@effectionx/node': 0.2.2(effection@4.1.0-alpha.7) + '@effectionx/node': 0.2.2(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) cross-spawn: 7.0.6 ctrlc-windows: 2.2.0 - effection: 4.1.0-alpha.7 + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) shellwords-ts: 3.0.1 - '@effectionx/signals@0.5.1(effection@4.1.0-alpha.7)': + '@effectionx/signals@0.5.1(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - effection: 4.1.0-alpha.7 + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) immutable: 5.1.4 - '@effectionx/stream-helpers@0.8.0(effection@4.1.0-alpha.7)': + '@effectionx/stream-helpers@0.8.0(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - '@effectionx/signals': 0.5.1(effection@4.1.0-alpha.7) - '@effectionx/timebox': 0.4.1(effection@4.1.0-alpha.7) - effection: 4.1.0-alpha.7 + '@effectionx/signals': 0.5.1(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) + '@effectionx/timebox': 0.4.1(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri)) + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) immutable: 5.1.4 remeda: 2.33.6 - '@effectionx/test-adapter@0.7.2(patch_hash=5e55lwc63rmijksdakvpd5lq2m)(effection@4.1.0-alpha.7)': + '@effectionx/test-adapter@0.7.2(patch_hash=5e55lwc63rmijksdakvpd5lq2m)(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - effection: 4.1.0-alpha.7 + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) - '@effectionx/timebox@0.4.1(effection@4.1.0-alpha.7)': + '@effectionx/timebox@0.4.1(effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri))': dependencies: - effection: 4.1.0-alpha.7 + effection: 4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri) '@esbuild/aix-ppc64@0.27.3': optional: true @@ -1688,7 +1692,7 @@ snapshots: '@jest/pattern@30.0.1': dependencies: - '@types/node': 22.19.11 + '@types/node': 24.12.2 jest-regex-util: 30.0.1 '@jest/schemas@30.0.5': @@ -1701,7 +1705,7 @@ snapshots: '@jest/schemas': 30.0.5 '@types/istanbul-lib-coverage': 2.0.6 '@types/istanbul-reports': 3.0.4 - '@types/node': 22.19.11 + '@types/node': 24.12.2 '@types/yargs': 17.0.35 chalk: 4.1.2 @@ -2098,9 +2102,9 @@ snapshots: dependencies: '@types/istanbul-lib-report': 3.0.3 - '@types/node@22.19.11': + '@types/node@24.12.2': dependencies: - undici-types: 6.21.0 + undici-types: 7.16.0 '@types/react@19.2.14': dependencies: @@ -2347,7 +2351,7 @@ snapshots: detect-libc@2.1.2: {} - effection@4.1.0-alpha.7: {} + effection@4.1.0-alpha.7(patch_hash=wipyqrz763hb3akxfrnccbogri): {} esbuild@0.27.3: optionalDependencies: @@ -2414,10 +2418,10 @@ snapshots: graceful-fs@4.2.11: {} - h3@2.0.1-rc.14: + h3@2.0.1-rc.20: dependencies: - rou3: 0.7.12 - srvx: 0.11.4 + rou3: 0.8.1 + srvx: 0.11.15 has-flag@4.0.0: {} @@ -2472,7 +2476,7 @@ snapshots: jest-mock@30.2.0: dependencies: '@jest/types': 30.2.0 - '@types/node': 22.19.11 + '@types/node': 24.12.2 jest-util: 30.2.0 jest-regex-util@30.0.1: {} @@ -2480,7 +2484,7 @@ snapshots: jest-util@30.2.0: dependencies: '@jest/types': 30.2.0 - '@types/node': 22.19.11 + '@types/node': 24.12.2 chalk: 4.1.2 ci-info: 4.4.0 graceful-fs: 4.2.11 @@ -2699,7 +2703,7 @@ snapshots: '@rollup/rollup-win32-x64-msvc': 4.57.1 fsevents: 2.3.3 - rou3@0.7.12: {} + rou3@0.8.1: {} rw@1.3.3: {} @@ -2717,7 +2721,7 @@ snapshots: source-map-js@1.2.1: {} - srvx@0.11.4: {} + srvx@0.11.15: {} stack-utils@2.0.6: dependencies: @@ -2750,17 +2754,17 @@ snapshots: typescript@5.9.3: {} - undici-types@6.21.0: {} + undici-types@7.16.0: {} - vite-plugin-static-copy@3.2.0(vite@7.3.1(@types/node@22.19.11)(lightningcss@1.31.1)(tsx@4.21.0)): + vite-plugin-static-copy@3.2.0(vite@7.3.1(@types/node@24.12.2)(lightningcss@1.31.1)(tsx@4.21.0)): dependencies: chokidar: 3.6.0 p-map: 7.0.4 picocolors: 1.1.1 tinyglobby: 0.2.15 - vite: 7.3.1(@types/node@22.19.11)(lightningcss@1.31.1)(tsx@4.21.0) + vite: 7.3.1(@types/node@24.12.2)(lightningcss@1.31.1)(tsx@4.21.0) - vite@7.3.1(@types/node@22.19.11)(lightningcss@1.31.1)(tsx@4.21.0): + vite@7.3.1(@types/node@24.12.2)(lightningcss@1.31.1)(tsx@4.21.0): dependencies: esbuild: 0.27.3 fdir: 6.5.0(picomatch@4.0.3) @@ -2769,7 +2773,7 @@ snapshots: rollup: 4.57.1 tinyglobby: 0.2.15 optionalDependencies: - '@types/node': 22.19.11 + '@types/node': 24.12.2 fsevents: 2.3.3 lightningcss: 1.31.1 tsx: 4.21.0 diff --git a/tests/fs.ts b/tests/fs.ts new file mode 100644 index 0000000..de8d155 --- /dev/null +++ b/tests/fs.ts @@ -0,0 +1,57 @@ +import * as fsp from "node:fs/promises"; +import type { Dirent, Stats } from "node:fs"; +import { createApi } from "effection/experimental"; +import type { Operation } from "effection"; +import { ensure, until } from "effection"; + +export interface Fs { + stat(path: string): Operation; + lstat(path: string): Operation; + readdir(path: string): Operation; + readdirDirents(path: string): Operation; + rm(path: string, options?: { recursive?: boolean; force?: boolean }): Operation; + copyFile(src: string, dest: string): Operation; + readTextFile(path: string): Operation; + writeTextFile(path: string, content: string): Operation; +} + +const fsApi = createApi("fs", { + stat(path: string) { + return until(fsp.stat(path)); + }, + + lstat(path: string) { + return until(fsp.lstat(path)); + }, + + readdir(path: string) { + return until(fsp.readdir(path)); + }, + + readdirDirents(path: string) { + return until(fsp.readdir(path, { withFileTypes: true })); + }, + + rm(path: string, options?: { recursive?: boolean; force?: boolean }) { + return until(fsp.rm(path, options)); + }, + + copyFile(src: string, dest: string) { + return until(fsp.copyFile(src, dest)); + }, + + *readTextFile(path: string) { + yield* ensure(function* () { + yield* until(fsp.rm(path, { force: true })); + }); + + return yield* until(fsp.readFile(path, "utf-8")); + }, + + writeTextFile(path: string, content: string) { + return until(fsp.writeFile(path, content)); + }, +} satisfies Fs); + +export const fs = fsApi.operations; +export { fsApi }; diff --git a/tests/implementation.test.ts b/tests/implementation.test.ts index 039a15a..fde45c5 100644 --- a/tests/implementation.test.ts +++ b/tests/implementation.test.ts @@ -5,6 +5,7 @@ import { scope as arktypeScope } from "arktype"; import type { Method } from "../lib/types.ts"; import { createImplementation, createProtocol } from "../lib/mod.ts"; +import { player } from "../lib/implementations/player.ts"; describe("createImplementation()", () => { it("attach yields a handle with protocol and methods and invoke calls the method", function* () { @@ -51,4 +52,20 @@ describe("createImplementation()", () => { const next = yield* sub.next(); expect(next).toEqual({ done: true, value: "hello" }); }); + + it("play returns null", function* () { + const [rootScope] = createScope(); + const handle = yield* player.attach(rootScope); + + let result; + let invocation = rootScope.run(function* () { + const stream = handle.invoke({ name: "play", args: [] }); + const sub = yield* stream; + result = yield* sub.next(); + }); + + yield* invocation; + + expect(result).toEqual({ done: true, value: null }); + }); }); diff --git a/tests/inspector-process.test.ts b/tests/inspector-process.test.ts new file mode 100644 index 0000000..6a193bb --- /dev/null +++ b/tests/inspector-process.test.ts @@ -0,0 +1,482 @@ +import { beforeEach, describe, it } from "@effectionx/bdd"; +import { strict as assert } from "node:assert"; +import { createServer } from "node:net"; +import { readFile } from "node:fs/promises"; +import { join } from "node:path"; +import { call, sleep, spawn, until, withResolvers, type Yielded } from "effection"; +import { exec } from "@effectionx/process"; +import { fs } from "./fs.ts"; + +function getAvailablePort(): Promise { + return new Promise((resolve, reject) => { + const server = createServer(); + + server.once("error", (error) => { + reject(error); + }); + + server.listen(0, () => { + const address = server.address(); + if (address && typeof address === "object") { + const port = address.port; + server.close((error) => { + if (error) { + reject(error); + } else { + resolve(port); + } + }); + } else { + reject(new Error("unable to determine ephemeral port")); + } + }); + }); +} + +function* startInspector(examplePath: string, outFile: string) { + let port = yield* call(getAvailablePort); + let cliScript = join(process.cwd(), "cli", "index.ts"); + let inspectorProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "--inspect-package", + "./loader.ts", + "--inspect-port", + String(port), + "--inspect-pause", + "--inspect-record", + outFile, + examplePath, + ], + cwd: process.cwd(), + env: { ...(process.env as Record), INSPECT_PORT: String(port) }, + }); + + const ready = withResolvers(); + let readyResolved = false; + yield* spawn(function* () { + let subscription = yield* inspectorProc.stderr; + let next = yield* subscription.next(); + while (!next.done) { + const text = new TextDecoder().decode(next.value); + if (text.includes("running at http://localhost")) { + readyResolved = true; + ready.resolve(); + } + next = yield* subscription.next(); + } + if (!readyResolved) { + ready.reject(new Error("inspector exited before ready")); + } + }); + + yield* ready.operation; + return { port, cliScript, inspectorProc }; +} + +function mainDestroyed(snapshots: Array>): boolean { + let mainIds = new Set(); + let createdById = new Map(); + let destroyedIds = new Set(); + + for (let snapshot of snapshots) { + if (snapshot.type === "tree" && Array.isArray(snapshot.value)) { + for (let node of snapshot.value) { + if (node?.data?.["@effection/attributes"]?.name === "Main" && typeof node.id === "string") { + mainIds.add(node.id); + } + } + } + + if ( + snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "Main" && + typeof snapshot.id === "string" + ) { + mainIds.add(snapshot.id); + } + + if ( + snapshot.type === "created" && + typeof snapshot.id === "string" && + typeof snapshot.parentId === "string" + ) { + createdById.set(snapshot.id, snapshot.parentId); + } + + if ( + (snapshot.type === "destroying" || snapshot.type === "destroyed") && + typeof snapshot.id === "string" + ) { + destroyedIds.add(snapshot.id); + if (mainIds.has(snapshot.id)) { + return true; + } + } + } + + if (mainIds.size === 0) { + return false; + } + + let descendantIds = new Set(mainIds); + let changed = true; + while (changed) { + changed = false; + for (let [id, parentId] of createdById.entries()) { + if (!descendantIds.has(id) && descendantIds.has(parentId)) { + descendantIds.add(id); + changed = true; + } + } + } + + for (let snapshot of snapshots) { + if ( + (snapshot.type === "destroying" || snapshot.type === "destroyed") && + typeof snapshot.id === "string" && + descendantIds.has(snapshot.id) + ) { + return true; + } + } + + return false; +} + +describe("inspector child process", () => { + describe("with program through completion", () => { + let tmpDir: string; + let outFile: string; + let playFile: string; + let watchFile: string; + let port: number; + let cliScript: string; + let inspectorProc: Yielded> | undefined; + + beforeEach(function* () { + tmpDir = join(process.cwd(), "tmp"); + outFile = join(tmpDir, "out.json"); + playFile = join(tmpDir, "call-play.json"); + watchFile = join(tmpDir, "watchScopes.json"); + + let inspector = yield* startInspector("./examples/spawn-children.ts", outFile); + port = inspector.port; + cliScript = inspector.cliScript; + inspectorProc = inspector.inspectorProc; + }); + + it("starts Main and child scopes and shuts down cleanly", function* () { + let playProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "play", + "--host", + `http://localhost:${port}`, + "--out", + playFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* playProc.expect(); + + yield* inspectorProc!.join(); + + const raw = yield* fs.readTextFile(outFile); + const snapshots = JSON.parse(raw) as Array>; + const foundMain = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + ), + ); + const foundInspector = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + ), + ); + const foundChild = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + ), + ); + const finalSnapshot = snapshots[snapshots.length - 1]; + const childPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + ); + const mainPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + ); + const inspectorPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + ); + + assert(foundMain, "expected Main node"); + assert(foundInspector, "expected Inspector node"); + assert(foundChild, "expected child node during run"); + assert(mainPresentAtEnd, "expected Main to remain present in final recorded state"); + assert(inspectorPresentAtEnd, "expected Inspector to remain present in final recorded state"); + assert(!childPresentAtEnd, "expected child scopes to be gone after shutdown"); + }); + + it("captures the same shutdown state through watchScopes", function* () { + let watchTask = yield* spawn(function* () { + let watchProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "watchScopes", + "--host", + `http://localhost:${port}`, + "--out", + watchFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* watchProc.join(); + }); + + yield* sleep(250); + let playProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "play", + "--host", + `http://localhost:${port}`, + "--out", + playFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* playProc.expect(); + + yield* inspectorProc!.join(); + try { + yield* watchTask.join(); + } catch { + // watchScopes may terminate with a non-zero exit when the inspector shuts down; + // the output file should still contain the recorded events. + } + + const raw = yield* until(readFile(watchFile, "utf-8")); + const snapshots = JSON.parse(raw) as Array<{ + type: string; + value?: Array<{ data?: { [key: string]: any } }>; + contextName?: string; + contextValue?: { [key: string]: any }; + }>; + const foundMain = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "Main"), + ); + const foundInspector = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "Inspector"), + ); + const foundChild = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "child"), + ); + const foundMainDestroyed = mainDestroyed(snapshots); + + assert(foundMain, "expected Main node"); + assert(foundInspector, "expected Inspector node"); + assert(foundChild, "expected child node"); + assert(foundMainDestroyed, "expected Main scope to be destroyed in watchScopes"); + }); + }); + + describe("with forever example", () => { + let tmpDir: string; + let outFile: string; + let playFile: string; + let watchFile: string; + let port: number; + let cliScript: string; + let inspectorProc: Yielded> | undefined; + + beforeEach(function* () { + tmpDir = join(process.cwd(), "tmp"); + outFile = join(tmpDir, "out.json"); + playFile = join(tmpDir, "call-play.json"); + watchFile = join(tmpDir, "watchScopes.json"); + + let inspector = yield* startInspector("./examples/forever.ts", outFile); + port = inspector.port; + cliScript = inspector.cliScript; + inspectorProc = inspector.inspectorProc; + }); + + it("starts Main and child scopes and shuts down cleanly after SIGTERM", function* () { + let playProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "play", + "--host", + `http://localhost:${port}`, + "--out", + playFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* playProc.expect(); + + yield* sleep(150); + if (inspectorProc?.pid) { + process.kill(-inspectorProc.pid, "SIGTERM"); + } + yield* inspectorProc!.join(); + + const raw = yield* fs.readTextFile(outFile); + const snapshots = JSON.parse(raw) as Array>; + const foundMain = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + ), + ); + const foundInspector = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + ), + ); + const foundChild = snapshots.some((snapshot) => + Object.values(snapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + ), + ); + const finalSnapshot = snapshots[snapshots.length - 1]; + const childPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + ); + const mainPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + ); + const inspectorPresentAtEnd = Object.values(finalSnapshot).some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + ); + + assert(foundMain, "expected Main node"); + assert(foundInspector, "expected Inspector node"); + assert(foundChild, "expected child node during run"); + assert(mainPresentAtEnd, "expected Main to remain present in final recorded state"); + assert(inspectorPresentAtEnd, "expected Inspector to remain present in final recorded state"); + assert(!childPresentAtEnd, "expected child scopes to be gone after shutdown"); + }); + + it("captures the same shutdown state through watchScopes after SIGTERM", function* () { + let watchTask = yield* spawn(function* () { + let watchProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "watchScopes", + "--host", + `http://localhost:${port}`, + "--out", + watchFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* watchProc.join(); + }); + + yield* sleep(250); + let playProc = yield* exec(process.execPath, { + arguments: [ + cliScript, + "call", + "play", + "--host", + `http://localhost:${port}`, + "--out", + playFile, + ], + cwd: process.cwd(), + env: process.env as Record, + }); + yield* playProc.expect(); + + yield* sleep(150); + if (inspectorProc?.pid) { + process.kill(-inspectorProc.pid, "SIGTERM"); + } + yield* inspectorProc!.join(); + try { + yield* watchTask.join(); + } catch { + // watchScopes may terminate with a non-zero exit when the inspector shuts down; + // the output file should still contain the recorded events. + } + + const raw = yield* until(readFile(watchFile, "utf-8")); + const snapshots = JSON.parse(raw) as Array<{ + type: string; + value?: Array<{ data?: { [key: string]: any } }>; + contextName?: string; + contextValue?: { [key: string]: any }; + }>; + const foundMain = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "Main", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "Main"), + ); + const foundInspector = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "Inspector", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "Inspector"), + ); + const foundChild = snapshots.some( + (snapshot) => + (snapshot.type === "tree" && + snapshot.value?.some( + (node) => node.data?.["@effection/attributes"]?.name === "child", + )) || + (snapshot.type === "set" && + snapshot.contextName === "@effection/attributes" && + snapshot.contextValue?.name === "child"), + ); + const foundMainDestroyed = mainDestroyed(snapshots); + + assert(foundMain, "expected Main node"); + assert(foundInspector, "expected Inspector node"); + assert(foundChild, "expected child node"); + assert(foundMainDestroyed, "expected Main scope to be destroyed in watchScopes"); + }); + }); +}); diff --git a/tests/sse-server.test.ts b/tests/sse-server.test.ts new file mode 100644 index 0000000..15e7513 --- /dev/null +++ b/tests/sse-server.test.ts @@ -0,0 +1,180 @@ +import { describe, it } from "@effectionx/bdd"; +import { expect } from "expect"; +import { createServer } from "node:net"; +import type { Method, Handle, Inspector } from "../lib/types.ts"; +import { createProtocol } from "../lib/mod.ts"; +import { attach } from "../lib/attach.ts"; +import { createScope, call, suspend, useScope, withResolvers } from "effection"; +import { scope as arktypeScope } from "arktype"; +import { useSSEServer } from "../lib/sse-server.ts"; + +describe("useSSEServer()", () => { + describe("generic echo protocol", () => { + it("keeps an active SSE request alive long enough to flush during shutdown", function* () { + const [serverScope, destroyServerScope] = createScope(); + const [clientScope, destroyClientScope] = createScope(); + + const schema = arktypeScope({ + NoneArr: "never[]", + None: "never", + Str: "string", + }).export(); + + const protocol = createProtocol({ + echo: { + args: schema.NoneArr, + progress: schema.None, + returns: schema.Str, + }, + }); + + const requestStarted = withResolvers(); + const continueRequest = withResolvers(); + + const handle = { + protocol, + methods: {} as any, + invoke() { + return (function* () { + return { + *next() { + requestStarted.resolve(); + yield* continueRequest.operation; + return { done: true, value: "goodbye" }; + }, + }; + })(); + }, + } as unknown as Handle<{ echo: Method }>; + + const addressResolver = withResolvers(); + serverScope.run(function* () { + const address = yield* useSSEServer(handle, { port: yield* call(getAvailablePort) }); + addressResolver.resolve(address); + yield* suspend(); + }); + + const address = yield* addressResolver.operation; + + const requestTask = clientScope.run(function* () { + const response = yield* call(() => + fetch(`${address}/echo`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: "[]", + }), + ); + + const text = yield* call(async () => { + const decoder = new TextDecoder(); + const reader = response.body?.getReader(); + let result = ""; + + if (!reader) { + throw new Error("response body is missing"); + } + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + result += decoder.decode(value, { stream: true }); + } + + return result; + }); + + return { status: response.status, text }; + }); + + yield* requestStarted.operation; + continueRequest.resolve(); + yield* destroyServerScope(); + + const result = yield* requestTask; + yield* destroyClientScope(); + + expect(result.status).toBe(200); + expect(result.text).toContain("event: return"); + expect(result.text).toContain("goodbye"); + }); + }); + + describe("loader path integration", () => { + it("uses attach() and useSSEServer() together and keeps the SSE request alive on shutdown", function* () { + const schema = arktypeScope({ + NoneArr: "never[]", + None: "never", + Str: "string", + }).export(); + + const protocol = createProtocol({ + echo: { + args: schema.NoneArr, + progress: schema.None, + returns: schema.Str, + }, + }); + + const handle: Handle<{ echo: Method }> = { + protocol, + methods: { + echo: function* () { + return { + *next() { + return { done: true, value: "goodbye" }; + }, + }; + }, + } as any, + invoke({ name, args }) { + return this.methods[name](...args); + }, + }; + + const inspector: Inspector<{ echo: Method }> = { + protocol, + *attach(_scope) { + return handle; + }, + }; + + const scope = yield* useScope(); + const addressResolver = withResolvers(); + const detach = yield* attach(scope, inspector, function* (handle) { + const address = yield* useSSEServer(handle, { port: yield* call(getAvailablePort) }); + addressResolver.resolve(address); + }); + + const address = yield* addressResolver.operation; + yield* detach(); + + expect(address).toMatch(/^http:\/\/localhost:\d+$/); + }); + }); +}); + +function getAvailablePort(): Promise { + return new Promise((resolve, reject) => { + const server = createServer(); + + server.once("error", (error) => { + reject(error); + }); + + server.listen(0, () => { + const address = server.address(); + if (address && typeof address === "object") { + const port = address.port; + server.close((error) => { + if (error) { + reject(error); + } else { + resolve(port); + } + }); + } else { + reject(new Error("unable to determine ephemeral port")); + } + }); + }); +} diff --git a/tsconfig.base.json b/tsconfig.base.json index f2ee221..55f6b0e 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1,5 +1,6 @@ { "compilerOptions": { + "types": ["node"], "strict": true, "incremental": true, "composite": true,