Skip to content

Commit

Permalink
feat(rstream): add CloseMode enum, update StreamMerge, StreamSync & opts
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Mar 3, 2019
1 parent c414423 commit f0d53b4
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 16 deletions.
11 changes: 10 additions & 1 deletion packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ export const enum State {
DISABLED // TODO currently unused
}

/**
* Closing behavior for `StreamMerge` and `StreamSync`.
*/
export const enum CloseMode {
NEVER,
FIRST,
LAST
}

/**
* Reverse lookup for `State` enums
*/
Expand Down Expand Up @@ -50,4 +59,4 @@ export interface IStream<T> extends ISubscriber<T> {
export type StreamCancel = () => void;
export type StreamSource<T> = (sub: Stream<T>) => StreamCancel;

export let DEBUG = false;
export let DEBUG = true;
29 changes: 22 additions & 7 deletions packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { IID } from "@thi.ng/api";
import { Transducer } from "@thi.ng/transducers";
import { ISubscribable, State } from "./api";
import { CloseMode, ISubscribable, State } from "./api";
import { Subscription } from "./subscription";
import { closeMode } from "./utils/close";
import { nextID } from "./utils/idgen";

export interface StreamMergeOpts<A, B> extends IID<string> {
Expand All @@ -18,16 +19,23 @@ export interface StreamMergeOpts<A, B> extends IID<string> {
* exhausted. Set to false to keep the instance alive, regardless of
* inputs.
*/
close: boolean;

/**
* If false or `CloseMode.NEVER`, StreamMerge stays active even if
* all inputs are done. If true (default) or `CloseMode.LAST`, the
* StreamMerge closes when the last input is done. If
* `CloseMode.FIRST`, the instance closes when the first input is
* done.
*/
close: boolean | CloseMode;
}

/**
* Returns a new `StreamMerge` instance, a subscription type consuming
* inputs from multiple inputs and passing received values on to any
* subscribers. Input streams can be added and removed dynamically. By
* default, `StreamMerge` calls `done()` when the last active input is
* done, but this behavior can be overridden via the `close` option (set
* it to `false`).
* done, but this behavior can be overridden via the `close` option.
*
* ```
* merge({
Expand Down Expand Up @@ -66,19 +74,23 @@ export interface StreamMergeOpts<A, B> extends IID<string> {
* // ["a", 3]
* // ["b", 30]
* ```
*
* @see StreamMergeOpts
*
* @param opts
*/
export const merge = <A, B>(opts?: Partial<StreamMergeOpts<A, B>>) =>
new StreamMerge(opts);

export class StreamMerge<A, B> extends Subscription<A, B> {
sources: Map<ISubscribable<A>, Subscription<A, any>>;
autoClose: boolean;
closeMode: CloseMode;

constructor(opts?: Partial<StreamMergeOpts<A, B>>) {
opts = opts || {};
super(null, opts.xform, null, opts.id || `streammerge-${nextID()}`);
this.sources = new Map();
this.autoClose = opts.close !== false;
this.closeMode = closeMode(opts.close);
if (opts.src) {
this.addAll(opts.src);
}
Expand Down Expand Up @@ -159,7 +171,10 @@ export class StreamMerge<A, B> extends Subscription<A, B> {

protected markDone(src: ISubscribable<A>) {
this.remove(src);
if (this.autoClose && !this.sources.size) {
if (
this.closeMode === CloseMode.FIRST ||
(this.closeMode === CloseMode.LAST && !this.sources.size)
) {
this.done();
}
}
Expand Down
30 changes: 22 additions & 8 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import {
partitionSync,
Transducer
} from "@thi.ng/transducers";
import { ISubscribable, State } from "./api";
import { CloseMode, ISubscribable, State } from "./api";
import { Subscription } from "./subscription";
import { closeMode } from "./utils/close";
import { nextID } from "./utils/idgen";

export interface StreamSyncOpts<A, B> extends IID<string> {
Expand Down Expand Up @@ -43,9 +44,13 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
*/
all: boolean;
/**
* If false, StreamSync stays active even if all inputs are done.
* If false or `CloseMode.NEVER`, StreamSync stays active even if
* all inputs are done. If true (default) or `CloseMode.LAST`, the
* StreamSync closes when the last input is done. If
* `CloseMode.FIRST`, the instance closes when the first input is
* done.
*/
close: boolean;
close: boolean | CloseMode;
}

/**
Expand All @@ -68,8 +73,7 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
*
* Any done inputs are automatically removed. By default, `StreamSync`
* calls `done()` when the last active input is done, but this behavior
* can be overridden via the `close` constructor option (set to
* `false`).
* can be overridden via the `close` constructor option.
*
* ```ts
* const a = rs.stream();
Expand All @@ -91,6 +95,10 @@ export interface StreamSyncOpts<A, B> extends IID<string> {
* The synchronization is done via the `partitionSync()` transducer from
* the @thi.ng/transducers package. See this function's docs for further
* details.
*
* @see StreamSyncOpts
*
* @param opts
*/
export const sync = <A, B>(opts: Partial<StreamSyncOpts<A, B>>) =>
new StreamSync(opts);
Expand All @@ -117,7 +125,10 @@ export class StreamSync<A, B> extends Subscription<A, B> {
* these IDs are used to label inputs in result tuple
*/
sourceIDs: Set<string>;
autoClose: boolean;
/**
* closing behavior
*/
closeMode: CloseMode;

constructor(opts: Partial<StreamSyncOpts<A, B>>) {
let srcIDs = new Set<string>();
Expand All @@ -139,7 +150,7 @@ export class StreamSync<A, B> extends Subscription<A, B> {
this.invRealSourceIDs = new Map();
this.idSources = new Map();
this.sourceIDs = srcIDs;
this.autoClose = opts.close !== false;
this.closeMode = closeMode(opts.close);
if (opts.src) {
this.addAll(opts.src);
}
Expand Down Expand Up @@ -263,7 +274,10 @@ export class StreamSync<A, B> extends Subscription<A, B> {

protected markDone(src: ISubscribable<A>) {
this.remove(src);
if (this.autoClose && !this.sources.size) {
if (
this.closeMode === CloseMode.FIRST ||
(this.closeMode === CloseMode.LAST && !this.sources.size)
) {
this.done();
}
}
Expand Down
8 changes: 8 additions & 0 deletions packages/rstream/src/utils/close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { CloseMode } from "../api";

export const closeMode = (close: boolean | CloseMode) =>
close === true || close === undefined
? CloseMode.LAST
: close === false
? CloseMode.NEVER
: close;

0 comments on commit f0d53b4

Please sign in to comment.