Skip to content

Commit

Permalink
feat(rstream): add emitLast metastream option
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Jun 14, 2020
1 parent 5110d50 commit 1073735
Showing 1 changed file with 25 additions and 5 deletions.
30 changes: 25 additions & 5 deletions packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ import { CloseMode, CommonOpts, State } from "./api";
import { Subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";

export interface MetaStreamOpts extends CommonOpts {
/**
* If true, emits the last received value from the metastream's
* current child stream (if any) when the metastream's parent is
* calling `.done()`.
*
* @defaultValue false
*/
emitLast: boolean;
}

/**
* Returns a {@link Subscription} which transforms each incoming value
* into a new {@link Stream}, subscribes to it (via an hidden / internal
Expand Down Expand Up @@ -87,20 +98,24 @@ import { optsWithID } from "./utils/idgen";
*/
export const metaStream = <A, B>(
factory: Fn<A, Subscription<B, B>>,
opts?: Partial<CommonOpts>
opts?: Partial<MetaStreamOpts>
) => new MetaStream(factory, opts);

export class MetaStream<A, B> extends Subscription<A, B> {
factory: Fn<A, Subscription<B, B>>;
stream?: Subscription<B, B>;
sub?: Subscription<B, B>;
emitLast: boolean;
doneRequested: boolean;

constructor(
factory: Fn<A, Subscription<B, B>>,
opts?: Partial<CommonOpts>
opts: Partial<MetaStreamOpts> = {}
) {
super(undefined, optsWithID("metastram", opts));
this.factory = factory;
this.emitLast = opts.emitLast === true;
this.doneRequested = false;
}

next(x: A) {
Expand All @@ -114,6 +129,7 @@ export class MetaStream<A, B> extends Subscription<A, B> {
this.sub = this.stream.subscribe({
next: (x) => {
stream === this.stream && super.dispatch(x);
this.doneRequested && this.done();
},
done: () => {
this.stream!.unsubscribe(this.sub);
Expand All @@ -130,10 +146,14 @@ export class MetaStream<A, B> extends Subscription<A, B> {
}

done() {
if (this.stream) {
this.detach(true);
if (this.emitLast && !this.doneRequested) {
this.doneRequested = true;
} else {
if (this.stream) {
this.detach(true);
}
this.closeIn !== CloseMode.NEVER && super.done();
}
this.closeIn !== CloseMode.NEVER && super.done();
}

unsubscribe(sub?: Subscription<B, any>) {
Expand Down

0 comments on commit 1073735

Please sign in to comment.