Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
.vscode
tmp

/deno.lock
node_modules/
Expand Down
30 changes: 16 additions & 14 deletions cli/commands/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ export function* call(config: CallType) {
name: name as any,
args: argsList,
});
let next = yield* subscription.next();
// log progress values and collect everything, including final return
while (!next.done) {
results.push(next.value);
yield* log.info(JSON.stringify(next.value));
next = yield* subscription.next();
}

if (out) {
try {
yield* until(writeFile(out, JSON.stringify(results, null, 2)));
} catch (e) {
let msg = e instanceof Error ? e.message : String(e);
yield* log.error(`failed to write ${out}: ${msg}`);
try {
let next = yield* subscription.next();
// log progress values and collect everything, including final return
while (!next.done) {
results.push(next.value);
yield* log.info(JSON.stringify(next.value));
next = yield* subscription.next();
}
} finally {
if (out) {
try {
yield* until(writeFile(out, JSON.stringify(results, null, 2)));
} catch (e) {
let msg = e instanceof Error ? e.message : String(e);
yield* log.error(`failed to write ${out}: ${msg}`);
}
}
}
}
3 changes: 2 additions & 1 deletion cli/commands/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function run(config: RunType["config"], passthroughArgs: string[]) {
}
});

yield* spawn(function* () {
const recordTask = yield* spawn(function* () {
yield* ready.operation;
if (config.inspectRecord) {
yield* recordNodeMapToFile(host, config.inspectRecord, config.protocol);
Expand All @@ -56,6 +56,7 @@ export function run(config: RunType["config"], passthroughArgs: string[]) {
});

let status = yield* child.join();
yield* recordTask;

yield* provide(status.code);
} finally {
Expand Down
23 changes: 23 additions & 0 deletions debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
set -euo pipefail
PORT=$(node -e "const net = require('node:net'); const server = net.createServer(); server.listen(0, () => { console.log(server.address().port); server.close(); });")
pnpm cli --inspect-port "$PORT" --inspect-package ./loader.ts --inspect-pause --inspect-record tmp/out.json ./examples/forever.ts >tmp/inspector.log 2>tmp/inspector.err &
INSPECT_PID=$!
sleep 2
set +e
pnpm cli call watchScopes --host http://localhost:$PORT --out tmp/watchScopes.json >tmp/watchScopes.log 2>tmp/watchScopes.err &
WATCH_PID=$!
pnpm cli call recordNodeMap --host http://localhost:$PORT --out tmp/recordNodeMap.json >tmp/recordNodeMap.log 2>tmp/recordNodeMap.err &
RECORD_PID=$!
set -e
sleep 1
pnpm cli call play --host http://localhost:$PORT --out tmp/call-play.json >tmp/call-play.log 2>tmp/call-play.err
sleep 5
kill -INT "$INSPECT_PID" 2>/dev/null || true
sleep 1
pkill -TERM -P "$INSPECT_PID" 2>/dev/null || true
wait $INSPECT_PID 2>/dev/null || true
wait $WATCH_PID 2>/dev/null || true
wait $RECORD_PID 2>/dev/null || true

# pnpm cli --inspect-package ./loader.ts --inspect-pause --inspect-record tmp/out.json ./examples/forever.ts
40 changes: 34 additions & 6 deletions examples/forever.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,56 @@
import { main, suspend, spawn, sleep, useAttributes, type Task } from "effection";

await main(function* () {
yield* useAttributes({ name: "Main" });

try {
yield* spawn(function* () {
yield* useAttributes({ name: "supervisor", role: "watcher" });
yield* sleep(1000);
return "supervisor done";
});

let background = yield* spawn(function* () {
yield* useAttributes({ name: "background", purpose: "heartbeat" });
let beats = 0;
while (true) {
yield* sleep(200);
beats++;
yield* useAttributes({ name: "background", beats });
}
});

let tasks: Task<string>[] = [];
for (let i = 1; i <= 5; i++) {
for (let i = 1; i <= 15; i++) {
let task = yield* spawn(function* () {
yield* useAttributes({ name: "child", number: i });
yield* useAttributes({ name: "child", number: i, category: i % 2 === 0 ? "even" : "odd" });

let helper = yield* spawn(function* () {
yield* useAttributes({ name: "helper", child: i });
yield* sleep(40 * i);
return `helper ${i} done`;
});

yield* sleep(50 * i);
yield* helper.halt();
console.log(`child ${i} done`);
return `child ${i} done`;
});

tasks.push(task);
}

// let some tasks start up
yield* sleep(150);

for (let t of tasks) {
yield* t.halt();
}
yield* background.halt();

console.log("all children halted, but we will never finish...");
console.log("some children halted, but we will never finish...");
// never finish; keeps the loader running which keeps the UI server up during tests
// but also simulates a long-running process that we can attach to
// when this receives a SIGTERM, it will push every effect into the `finally` block
// and start to shut all of the children down, we don't need to call `.halt()` on anything
// but we have halted a few of the children simply to see some variable movement in our tree
yield* suspend();
} finally {
console.log("shutting down, forever was a fallacy");
Expand Down
43 changes: 33 additions & 10 deletions examples/spawn-children.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
import { main, sleep, spawn, useAttributes, type Task } from "effection";

await main(function* () {
let tasks: Task<string>[] = [];
for (let i = 1; i <= 10; i++) {
let task = yield* spawn(function* () {
yield* useAttributes({ name: "child", number: i });
let delay = Math.random() * 1000;
yield* sleep(delay);
return `${i} is done`;
let groupTasks: Task<string>[] = [];
for (let group of ["alpha", "beta"]) {
let groupTask = yield* spawn(function* () {
yield* useAttributes({ name: "group", label: group });
let childTasks: Task<string>[] = [];

for (let i = 1; i <= 5; i++) {
let childTask = yield* spawn(function* () {
yield* useAttributes({ name: "child", group, number: i });

let worker = yield* spawn(function* () {
yield* useAttributes({ name: "worker", stage: "inner", number: i });
yield* sleep(Math.random() * 300);
return `worker ${group}-${i} done`;
});

yield* sleep(Math.random() * 120);
yield* worker.halt();
return `${group}-${i} done`;
});

childTasks.push(childTask);
}

for (let child of childTasks) {
yield* sleep(Math.random() * 200);
yield* child.halt();
}

return `${group} done`;
});
tasks.push(task);

groupTasks.push(groupTask);
}

for (let task of tasks) {
yield* sleep(Math.random() * 200);
for (let task of groupTasks) {
yield* task.halt();
}

Expand Down
1 change: 1 addition & 0 deletions lib/implementations/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const player = createImplementation(protocol, function* (root) {
yield* setContext({ status: "playing" });
yield* cxt.resume();
}
return null;
}),

watchPlayerState: () =>
Expand Down
4 changes: 1 addition & 3 deletions lib/implementations/scope.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { pipe } from "remeda";
import { createSubject } from "@effectionx/stream-helpers";
import { AttributesContext, getLabels } from "../labels.ts";
import { updateNodeMap } from "../update-node-map.ts";
import { truncate } from "../truncate.ts";

const Id = createContext<string>("@effectionx/inspector.id", "global");
const Children = createContext<Set<Scope>>("@effection/scope.children", new Set());
Expand Down Expand Up @@ -78,7 +77,7 @@ export const scope = createImplementation(protocol, function* (root) {

return {
watchScopes: () =>
pipe(stream, createSubject<ScopeEvent>({ type: "tree", value: readTree(root) }), truncate()),
pipe(stream, createSubject<ScopeEvent>({ type: "tree", value: readTree(root) })),
getScopes: op(function* () {
return readTree(root);
}),
Expand All @@ -87,7 +86,6 @@ export const scope = createImplementation(protocol, function* (root) {
stream,
createSubject<ScopeEvent>({ type: "tree", value: readTree(root) }),
updateNodeMap({}),
truncate(),
),
};
}) as Inspector<typeof protocol.methods>;
Expand Down
22 changes: 22 additions & 0 deletions lib/maybe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export type Maybe<T> =
| {
readonly exists: false;
}
| {
readonly exists: true;
readonly value: T;
};

export function Just(): Maybe<void>;
export function Just<T>(value: T): Maybe<T>;
export function Just<T>(value?: T): Maybe<T | undefined> {
if (typeof value === "undefined") {
return { exists: true } as Maybe<T>;
} else {
return { exists: true, value };
}
}

export function Nothing<T = void>(): Maybe<T> {
return { exists: false };
}
2 changes: 1 addition & 1 deletion lib/protocols/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ export const protocol = createProtocol({
play: {
args: type("never[]"),
progress: type("never"),
returns: type("undefined"),
returns: type("null"),
},
});
4 changes: 3 additions & 1 deletion lib/sse-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ export function createSSEClient<M extends Methods>(
}

let { type, data } = next.value;
let parsed = JSON.parse(data);
// check so we don't throw on our parse of the data,
// but we will throw in validation instead
let parsed = data === "undefined" ? undefined : JSON.parse(data);
if (type === "yield") {
return {
done: false,
Expand Down
Loading
Loading