Skip to content

Commit

Permalink
refactor(rstream): fix #201, extract ASidechain
Browse files Browse the repository at this point in the history
- extract abstract ASidechain base class
- update SidechainPartition/Toggle classes
  • Loading branch information
postspectacular committed Feb 10, 2020
1 parent 8ee2139 commit b88fa04
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 41 deletions.
1 change: 1 addition & 0 deletions packages/rstream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from "./from/raf";
export * from "./from/view";
export * from "./from/worker";

export * from "./subs/asidechain";
export * from "./subs/bisect";
export * from "./subs/post-worker";
export * from "./subs/resolve";
Expand Down
27 changes: 27 additions & 0 deletions packages/rstream/src/subs/asidechain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { CommonOpts } from "../api";
import { Subscription } from "../subscription";

/**
* Abstract base class for sidechained subscription types (e.g.
* {@link sidechainPartition}, {@link sidechainToggle}).
*/
export abstract class ASidechain<A, S, B> extends Subscription<A, B> {
sideSub!: Subscription<any, S>;

constructor(opts?: Partial<CommonOpts>) {
super(undefined, opts);
}

unsubscribe(sub?: Subscription<any, any>) {
const res = super.unsubscribe(sub);
if (!sub || !this.subs.length) {
this.sideSub.unsubscribe();
}
return res;
}

done() {
this.sideSub.unsubscribe();
super.done();
}
}
27 changes: 7 additions & 20 deletions packages/rstream/src/subs/sidechain-partition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Predicate } from "@thi.ng/api";
import { CommonOpts, ISubscribable, State } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";
import { ASidechain } from "./asidechain";

export interface SidechainPartitionOpts<T> extends CommonOpts {
pred: Predicate<T>;
Expand Down Expand Up @@ -39,16 +40,15 @@ export const sidechainPartition = <A, B>(
opts?: Partial<SidechainPartitionOpts<B>>
): Subscription<A, A[]> => new SidechainPartition<A, B>(side, opts);

export class SidechainPartition<A, B> extends Subscription<A, A[]> {
sideSub: Subscription<B, B>;
buf: A[];
export class SidechainPartition<T, S> extends ASidechain<T, S, T[]> {
buf: T[];

constructor(
side: ISubscribable<B>,
opts?: Partial<SidechainPartitionOpts<B>>
side: ISubscribable<S>,
opts?: Partial<SidechainPartitionOpts<S>>
) {
opts = optsWithID("sidepart", opts);
super(undefined, opts);
super(opts);
this.buf = [];
const pred = opts.pred || (() => true);
const $this = this;
Expand All @@ -69,22 +69,9 @@ export class SidechainPartition<A, B> extends Subscription<A, A[]> {
});
}

unsubscribe(sub?: Subscription<any, any>) {
const res = super.unsubscribe(sub);
if (!sub || !this.subs.length) {
this.sideSub.unsubscribe();
}
return res;
}

next(x: A) {
next(x: T) {
if (this.state < State.DONE) {
this.buf.push(x);
}
}

done() {
this.sideSub.unsubscribe();
super.done();
}
}
29 changes: 8 additions & 21 deletions packages/rstream/src/subs/sidechain-toggle.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Predicate } from "@thi.ng/api";
import { CommonOpts, ISubscribable } from "../api";
import { CommonOpts, ISubscribable, State } from "../api";
import { Subscription } from "../subscription";
import { optsWithID } from "../utils/idgen";
import { ASidechain } from "./asidechain";

export interface SidechainToggleOpts<T> extends CommonOpts {
pred: Predicate<T>;
Expand Down Expand Up @@ -41,16 +42,15 @@ export const sidechainToggle = <A, B>(
opts?: Partial<SidechainToggleOpts<B>>
): Subscription<A, A> => new SidechainToggle(side, opts);

export class SidechainToggle<A, B> extends Subscription<A, A> {
sideSub: Subscription<B, B>;
export class SidechainToggle<T, S> extends ASidechain<T, S, T> {
isActive: boolean;

constructor(
side: ISubscribable<B>,
opts?: Partial<SidechainToggleOpts<B>>
side: ISubscribable<S>,
opts?: Partial<SidechainToggleOpts<S>>
) {
opts = optsWithID("sidetoggle", opts);
super(undefined, opts);
super(opts);
this.isActive = !!opts.initial;
const pred = opts.pred || (() => true);
const $this = this;
Expand All @@ -66,22 +66,9 @@ export class SidechainToggle<A, B> extends Subscription<A, A> {
});
}

unsubscribe(sub?: Subscription<any, any>) {
const res = super.unsubscribe(sub);
if (!sub || !this.subs.length) {
this.sideSub.unsubscribe();
}
return res;
}

next(x: A) {
if (this.isActive) {
next(x: T) {
if (this.isActive && this.state < State.DONE) {
super.next(x);
}
}

done() {
super.done();
this.sideSub.unsubscribe();
}
}

0 comments on commit b88fa04

Please sign in to comment.