Skip to content

Commit

Permalink
feat(rstream): update/rename fromView()
Browse files Browse the repository at this point in the history
BREAKING CHANGE: update/rename fromView(), add fromViewUnsafe()

- fromView() now performs type checking on given path and/or view transform
- existing `fromView()` renamed to `fromViewUnsafe()` (this is in line w/ related
  updates in thi.ng/atom & thi.ng/paths pkgs)
  • Loading branch information
postspectacular committed Mar 25, 2020
1 parent 67b7921 commit f5df4ab
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 47 deletions.
4 changes: 2 additions & 2 deletions packages/rstream/src/from/atom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface FromAtomOpts<T> extends CommonOpts {
* emitted on the stream. If `emitFirst` is true (default), also emits
* atom's current value when first subscriber attaches to stream.
*
* Also see {@link fromView}
* Also see {@link fromView}, {@link fromViewUnsafe}
*
* @example
* ```ts
Expand All @@ -59,7 +59,7 @@ export const fromAtom = <T>(
opts = optsWithID("atom", <FromAtomOpts<T>>{
emitFirst: true,
changed: (a, b) => a !== b,
...opts
...opts,
});
return new Stream<T>((stream) => {
atom.addWatch(stream.id, (_, prev, curr) => {
Expand Down
143 changes: 121 additions & 22 deletions packages/rstream/src/from/view.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,75 @@
import { View } from "@thi.ng/atom";
import { Stream } from "../stream";
import { optsWithID } from "../utils/idgen";
import type { Fn, Path, Predicate2 } from "@thi.ng/api";
import type {
DeepPath,
Fn,
Path,
Path0,
Path1,
Path2,
Path3,
Path4,
Path5,
Path6,
Path7,
Path8,
PathVal1,
PathVal2,
PathVal3,
PathVal4,
PathVal5,
PathVal6,
PathVal7,
PathVal8,
Predicate2,
} from "@thi.ng/api";
import type { ReadonlyAtom } from "@thi.ng/atom";
import type { CommonOpts } from "../api";

export interface FromViewOpts<T> extends Partial<CommonOpts> {
path: Path;
tx?: Fn<any, T>;
equiv?: Predicate2<any>;
export interface FromViewOpts<P, A, B> extends Partial<CommonOpts> {
path: P;
tx?: Fn<A, B>;
equiv?: Predicate2<A>;
}

export type FromViewUnsafeOpts<T> = FromViewOpts<Path, any, T>;

/**
* Similar to {@link fromAtom}, but creates an eager derived view for a
* nested value in atom / cursor and yields stream of its value changes.
*
* @remarks
* Views are readonly and more lightweight versions of
* {@link @thi.ng/atom#Cursor | cursors}. The view checks for value
* changes with given `equiv` predicate ({@link @thi.ng/equiv#equiv} by
* default). If the predicate returns a falsy result, the new value is
* emitted on the stream. The first value emitted is always the
* (possibly transformed) current value at the stream's start time (i.e.
* when the first subscriber attaches).
* changes with given `equiv` predicate (default:
* {@link @thi.ng/equiv#equiv}). If the predicate returns a falsy result
* (i.e. the new value), the new value is emitted on the stream. The
* first value emitted is always the (possibly transformed) current
* value at the stream's start time (i.e. when the first subscriber
* attaches).
*
* If the `tx` option is given, the raw value is first passed to this
* transformer function and its result emitted on the stream instead.
*
* When the stream is cancelled the view is destroyed as well.
*
* Also see {@link @thi.ng/atom#defView}, {@link @thi.ng/atom#defViewUnsafe}
*
* @example
* ```ts
* db = new Atom({ a: 1, b: { c: 2 }});
* const db = defAtom<any>({ a: 1, b: { c: 2 }});
*
* fromView(
* fromViewUnsafe(
* db,
* {
* path: "b.c",
* tx: (x) => x != null ? x : "n/a"
* }).subscribe(trace("view:"))
* tx: (x) => x != null ? String(x) : "n/a"
* }
* ).subscribe(trace("view:"))
* // view: 2
*
* db.swapIn("b.c", (x: number) => x + 1);
* db.swapIn(["b","c"], (x: number) => x + 1);
* // view: 3
*
* db.reset({ a: 10 });
Expand All @@ -51,15 +79,86 @@ export interface FromViewOpts<T> extends Partial<CommonOpts> {
* @param atom -
* @param opts -
*/
export const fromView = <T>(
export const fromViewUnsafe = <T>(
atom: ReadonlyAtom<any>,
opts: FromViewUnsafeOpts<T>
): Stream<T extends undefined ? any : T> => fromView(atom, <any>opts);

/**
* Type checked version of {@link fromViewUnsafe}. Only the first 8 path
* levels are type checked.
*
* @remarks
* Stream value type is inferred from target path or (if given), the
* result type of the optional view transformer (`tx` option).
*
* Also see {@link @thi.ng/atom#defView},
* {@link @thi.ng/atom#defViewUnsafe}
*
* @param parent -
* @param opts -
*/
export function fromView<T, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path0, T, R>
): Stream<R extends undefined ? T : R>;
export function fromView<T, A, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path1<T, A>, PathVal1<T, A>, R>
): Stream<R extends undefined ? PathVal1<T, A> : R>;
export function fromView<T, A, B, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path2<T, A, B>, PathVal2<T, A, B>, R>
): Stream<R extends undefined ? PathVal2<T, A, B> : R>;
export function fromView<T, A, B, C, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path3<T, A, B, C>, PathVal3<T, A, B, C>, R>
): Stream<R extends undefined ? PathVal3<T, A, B, C> : R>;
export function fromView<T, A, B, C, D, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path4<T, A, B, C, D>, PathVal4<T, A, B, C, D>, R>
): Stream<R extends undefined ? PathVal4<T, A, B, C, D> : R>;
export function fromView<T, A, B, C, D, E, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<Path5<T, A, B, C, D, E>, PathVal5<T, A, B, C, D, E>, R>
): Stream<R extends undefined ? PathVal5<T, A, B, C, D, E> : R>;
export function fromView<T, A, B, C, D, E, F, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<
Path6<T, A, B, C, D, E, F>,
PathVal6<T, A, B, C, D, E, F>,
R
>
): Stream<R extends undefined ? PathVal6<T, A, B, C, D, E, F> : R>;
export function fromView<T, A, B, C, D, E, F, G, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<
Path7<T, A, B, C, D, E, F, G>,
PathVal7<T, A, B, C, D, E, F, G>,
R
>
): Stream<R extends undefined ? PathVal7<T, A, B, C, D, E, F, G> : R>;
export function fromView<T, A, B, C, D, E, F, G, H, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<
Path8<T, A, B, C, D, E, F, G, H>,
PathVal8<T, A, B, C, D, E, F, G, H>,
R
>
): Stream<R extends undefined ? PathVal8<T, A, B, C, D, E, F, G, H> : R>;
export function fromView<T, A, B, C, D, E, F, G, H, R = undefined>(
parent: ReadonlyAtom<T>,
opts: FromViewOpts<DeepPath<T, A, B, C, D, E, F, G, H>, any, R>
): Stream<R extends undefined ? any : R>;
export function fromView(
atom: ReadonlyAtom<any>,
opts: FromViewOpts<T>
): Stream<T> => {
opts = <FromViewOpts<T>>optsWithID("view", opts);
return new Stream<T>((stream) => {
opts: FromViewOpts<Path, any, any>
): Stream<any> {
opts = <FromViewUnsafeOpts<any>>optsWithID("view", opts);
return new Stream((stream) => {
let isActive = true;
const tx = opts.tx;
const view = new View<T>(
const view = new View(
atom,
opts.path,
tx
Expand All @@ -73,4 +172,4 @@ export const fromView = <T>(
view.release();
};
});
};
}
46 changes: 23 additions & 23 deletions packages/rstream/test/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
take
} from "@thi.ng/transducers";
import * as assert from "assert";
import { TIMEOUT } from "./config";
import {
CloseMode,
fromInterval,
Expand All @@ -16,9 +17,8 @@ import {
State,
stream,
sync,
transduce
transduce,
} from "../src";
import { TIMEOUT } from "./config";

describe("StreamSync", () => {
function adder() {
Expand All @@ -38,34 +38,34 @@ describe("StreamSync", () => {
let a1buf, a2buf;
const db = new Atom<any>({
a1: { ins: { a: 1, b: 2 } },
a2: { ins: { b: 10 } }
a2: { ins: { b: 10 } },
});
const a1 = sync({
src: [
(a = fromView(db, { path: "a1.ins.a" })),
(b = fromView(db, { path: "a1.ins.b" }))
(a = fromView(db, { path: ["a1", "ins", "a"] })),
(b = fromView(db, { path: ["a1", "ins", "b"] })),
],
xform: adder()
xform: adder(),
});
const a1res = a1.subscribe({
next(x) {
a1buf = x;
},
done() {
a1done = true;
}
},
});
const a2 = sync({
src: <any>[a1, (c = fromView(db, { path: "a2.ins.b" }))],
xform: adder()
src: <any>[a1, (c = fromView(db, { path: ["a2", "ins", "b"] }))],
xform: adder(),
});
const res = a2.subscribe({
next(x) {
a2buf = x;
},
done() {
a2done = true;
}
},
});
assert.equal(a1buf, 3);
assert.equal(a2buf, 13);
Expand Down Expand Up @@ -97,7 +97,7 @@ describe("StreamSync", () => {
const src = {
a: stream(),
b: stream(),
c: stream()
c: stream(),
};
const res: any[] = [];
const main = sync({ src, mergeOnly: true }).subscribe({
Expand All @@ -107,10 +107,10 @@ describe("StreamSync", () => {
{ c: 1 },
{ c: 1, b: 2 },
{ c: 1, b: 2, a: 3 },
{ c: 1, b: 2, a: 4 }
{ c: 1, b: 2, a: 4 },
]);
done();
}
},
});

src.c.next(1);
Expand All @@ -124,12 +124,12 @@ describe("StreamSync", () => {
const src = {
a: stream(),
b: stream(),
c: stream()
c: stream(),
};
const res: any[] = [];
const main = sync({
src,
mergeOnly: true
mergeOnly: true,
})
.transform(
// ensure `a` & `b` are present
Expand All @@ -140,10 +140,10 @@ describe("StreamSync", () => {
done: () => {
assert.deepEqual(res, [
{ c: 1, b: 2, a: 3 },
{ c: 1, b: 2, a: 4 }
{ c: 1, b: 2, a: 4 },
]);
done();
}
},
});

src.c.next(1);
Expand All @@ -162,8 +162,8 @@ describe("StreamSync", () => {
src: {
t: fromInterval(5),
a: fromPromise(delayed("aa", 20)),
b: fromPromise(delayed("bb", 40))
}
b: fromPromise(delayed("bb", 40)),
},
}),
comp(
take(1),
Expand All @@ -180,18 +180,18 @@ describe("StreamSync", () => {
const main = sync<number, any>({
src: [
fromIterable([1, 2, 3], { delay: TIMEOUT, id: "a" }),
fromIterable([1, 2, 3, 4], { delay: TIMEOUT, id: "b" })
fromIterable([1, 2, 3, 4], { delay: TIMEOUT, id: "b" }),
],
closeIn: CloseMode.NEVER,
closeOut: CloseMode.NEVER,
reset: true
reset: true,
});

const acc: any[] = [];
const sub = main.subscribe({
next(x) {
acc.push(x);
}
},
});

setTimeout(() => sub.unsubscribe(), 3.5 * TIMEOUT);
Expand All @@ -200,7 +200,7 @@ describe("StreamSync", () => {
assert.deepEqual(acc, [
{ a: 1, b: 1 },
{ a: 2, b: 2 },
{ a: 3, b: 3 }
{ a: 3, b: 3 },
]);
done();
}, 5 * TIMEOUT);
Expand Down

0 comments on commit f5df4ab

Please sign in to comment.