Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions signals/array.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { timebox } from "@effectionx/timebox";
import { describe, it } from "@effectionx/vitest";
import { expect } from "expect";
import { race, sleep, spawn, withResolvers } from "effection";
import { sleep, spawn, withResolvers } from "effection";

import { createArraySignal } from "./array.ts";

Expand Down Expand Up @@ -37,15 +38,9 @@ describe("array signal", () => {

array.set([1, 2, 3]);

const result = yield* race([
subscription.next(),
(function* () {
yield* sleep(1);
return "sleep won; update not received";
})(),
]);
const result = yield* timebox(10, () => subscription.next());

expect(result).toEqual("sleep won; update not received");
expect(result.timeout).toEqual(true);
resolve();
});

Expand Down
12 changes: 10 additions & 2 deletions signals/package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
{
"name": "@effectionx/signals",
"description": "Reactive signals and computed values for Effection operations",
"version": "0.5.3",
"keywords": ["reactivity"],
"version": "0.6.0",
"keywords": [
"effection",
"effectionx",
"reactivity",
"signals",
"computed",
"state"
],
"type": "module",
"main": "./dist/mod.js",
"types": "./dist/mod.d.ts",
Expand Down Expand Up @@ -31,6 +38,7 @@
},
"sideEffects": false,
"devDependencies": {
"@effectionx/timebox": "workspace:*",
"@effectionx/vitest": "workspace:*",
"effection": "^4"
}
Expand Down
75 changes: 31 additions & 44 deletions signals/set.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createSignal, type Operation, resource } from "effection";
import type { ValueSignal } from "./types.ts";
import { type Operation, resource } from "effection";
import { is, Set } from "immutable";
import type { ValueSignal } from "./types.ts";
import { createValueSignal } from "./value.ts";

/**
* A signal that represents a Set.
Expand Down Expand Up @@ -41,50 +42,36 @@ export function createSetSignal<T>(
initial: Array<T> = [],
): Operation<SetSignal<T>> {
return resource(function* (provide) {
const signal = createSignal<Set<T>, void>();
const signal = yield* createValueSignal(Set.of<T>(...initial), {
equals: is,
});

const ref = { current: Set.of<T>(...initial) };

function set(value: Iterable<T>) {
if (is(ref.current, value)) {
return ref.current;
}
ref.current = Set.of<T>(...value);
signal.send(ref.current);
return ref.current;
function set(value: Iterable<T>): Set<T> {
return signal.set(Set.of<T>(...value));
}

try {
yield* provide({
[Symbol.iterator]: signal[Symbol.iterator],
set,
update(updater) {
return set(updater(ref.current));
},
add(item) {
ref.current = ref.current.add(item);
signal.send(ref.current);
return ref.current;
},
difference(items) {
ref.current = ref.current.subtract(items);
signal.send(ref.current);
return ref.current;
},
delete(item) {
if (ref.current.has(item)) {
ref.current = ref.current.delete(item);
signal.send(ref.current);
return true;
}
return false;
},
valueOf() {
return ref.current.toSet();
},
});
} finally {
signal.close();
}
yield* provide({
[Symbol.iterator]: signal[Symbol.iterator],
set,
update(updater) {
return set(updater(signal.valueOf().toSet()));
},
add(item) {
return signal.set(signal.valueOf().add(item));
},
difference(items) {
return signal.set(signal.valueOf().subtract(items));
},
delete(item) {
if (signal.valueOf().has(item)) {
signal.set(signal.valueOf().delete(item));
return true;
}
return false;
},
valueOf() {
return signal.valueOf().toSet();
},
});
});
}
4 changes: 3 additions & 1 deletion signals/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import type { Stream } from "effection";

/**
* A signal is a stream with set, update, and valueOf methods.
* Subscribing to a signal will yield the current value of the signal.
* Subscribing to a signal yields values as they change.
* Use {@link createSubject} from `@effectionx/stream-helpers` to replay
* the latest value to new subscribers.
*/
export interface ValueSignal<T> extends Stream<T, void> {
/**
Expand Down
81 changes: 63 additions & 18 deletions signals/value.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import {
createChannel,
each,
race,
sleep,
spawn,
withResolvers,
} from "effection";
import { timebox } from "@effectionx/timebox";
import { createChannel, each, spawn, withResolvers } from "effection";
import { describe, it } from "@effectionx/vitest";
import { expect } from "expect";
import { createValueSignal } from "./value.ts";
Expand Down Expand Up @@ -43,21 +37,72 @@ describe("value", () => {
yield* spawn(function* () {
signal.set(true);

let next = yield* race([
subscription.next(),
(function* () {
yield* sleep(1);
return "sleep won; update not received";
})(),
]);
let next = yield* timebox(10, () => subscription.next());

expect(next).toEqual("sleep won; update not received");
expect(next.timeout).toEqual(true);

signal.set(false);

next = yield* subscription.next();
const updated = yield* subscription.next();

expect(next.value).toEqual(false);
expect(updated.value).toEqual(false);
resolve();
});

yield* operation;
});
it("does not emit when setting NaN to NaN", function* () {
expect.assertions(2);
const signal = yield* createValueSignal(Number.NaN);

const { resolve, operation } = withResolvers<void>();

const updates = createChannel<number>();
const subscription = yield* updates;

yield* spawn(function* () {
for (const update of yield* each(signal)) {
yield* updates.send(update);
yield* each.next();
}
});

yield* spawn(function* () {
signal.set(Number.NaN);

const result = yield* timebox(10, () => subscription.next());

expect(result.timeout).toEqual(true);
expect(Number.isNaN(signal.valueOf())).toEqual(true);
resolve();
});

yield* operation;
});
it("treats -0 and +0 as different values", function* () {
expect.assertions(3);
const signal = yield* createValueSignal(-0);

const { resolve, operation } = withResolvers<void>();

const updates = createChannel<number>();
const subscription = yield* updates;

yield* spawn(function* () {
for (const update of yield* each(signal)) {
yield* updates.send(update);
yield* each.next();
}
});

yield* spawn(function* () {
signal.set(0);

const next = yield* subscription.next();

expect(Object.is(next.value, 0)).toEqual(true);
expect(Object.is(next.value, -0)).toEqual(false);
expect(Object.is(signal.valueOf(), 0)).toEqual(true);
resolve();
});

Expand Down
29 changes: 26 additions & 3 deletions signals/value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,37 @@ import { createSignal, type Operation, resource } from "effection";

import type { ValueSignal } from "./types.ts";

export function createValueSignal<T>(initial: T): Operation<ValueSignal<T>> {
/**
* Configuration for {@link createValueSignal}.
*/
export interface CreateValueSignalOptions<T> {
/**
* Returns `true` when two values should be treated as unchanged.
*
* Defaults to `Object.is`.
*/
equals?: (current: T, next: T) => boolean;
}

/**
* Creates a value-backed signal with configurable equality semantics.
*
* @param initial - Initial signal value.
* @param options - Equality overrides.
* @returns A value signal resource.
*/
export function createValueSignal<T>(
initial: T,
options: CreateValueSignalOptions<T> = {},
): Operation<ValueSignal<T>> {
return resource(function* (provide) {
const signal = createSignal<T, void>();
const equals = options.equals ?? Object.is;

const ref = { current: initial };

function set(value: T) {
if (value !== ref.current) {
function set(value: T): T {
if (!equals(ref.current, value)) {
ref.current = value;

signal.send(ref.current);
Expand Down
27 changes: 9 additions & 18 deletions stream-helpers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,19 @@ Subject helper converts any stream into a multicast stream that replays the
latest value to new subscribers. It's analogous to
[RxJS BehaviorSubject](https://www.learnrxjs.io/learn-rxjs/subjects/behaviorsubject).

Applying the subject to a stream returns a resource. Yielding that resource
starts an internal drain that actively tracks the upstream, so late subscribers
always receive the most recent value — even if no other subscriber has pulled
it.

```typescript
import { createSubject } from "@effectionx/stream-helpers";
import { createChannel, spawn } from "effection";
import { createChannel } from "effection";

function* example() {
const subject = createSubject<number>();
const channel = createChannel<number, void>();
const downstream = subject(channel);
const downstream = yield* subject(channel);

// First subscriber
const sub1 = yield* downstream;
Expand All @@ -392,22 +397,8 @@ function* example() {
}
```

Use it with a pipe operator to convert any stream into a behavior subject:

```typescript
import { createSubject, map } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

const subject = createSubject<string>();

const stream = pipe(
source,
map(function* (x) {
return x.toString();
}),
subject,
);
```
> **Note:** `createSubject()` returns a resource, so it must be yielded with
> `yield*`. It cannot be used directly as a pipe transformer.

### Passthrough Tracker

Expand Down
2 changes: 1 addition & 1 deletion stream-helpers/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@effectionx/stream-helpers",
"description": "Type-safe stream operators like filter, map, reduce, and forEach",
"version": "0.8.2",
"version": "0.9.0",
"keywords": ["streams"],
"type": "module",
"main": "./dist/mod.js",
Expand Down
Loading
Loading