Skip to content

Commit

Permalink
refactor(rstream): update types/generics
Browse files Browse the repository at this point in the history
- remove default generics from ISubscription due to inference troubles
- add ITransformable.map()
- fix unsubscribe() arg types in various classes
- update tests (generics only)
  • Loading branch information
postspectacular committed Mar 12, 2021
1 parent 36bff90 commit c982288
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 17 deletions.
8 changes: 6 additions & 2 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export interface ISubscriber<T> {
/**
* Internal use only. Do not use.
*/
__owner?: ISubscription<any>;
__owner?: ISubscription<any, any>;
[id: string]: any;
}

Expand Down Expand Up @@ -210,9 +210,13 @@ export interface ITransformable<B> {
transform<C>(
opts: WithTransform<B, C> & Partial<WithErrorHandlerOpts>
): ISubscription<B, C>;
map<C>(
fn: Fn<B, C>,
opts?: Partial<WithErrorHandlerOpts>
): ISubscription<B, C>;
}

export interface ISubscription<A = any, B = A>
export interface ISubscription<A, B>
extends IDeref<B | undefined>,
ISubscriber<A>,
ISubscribable<B>,
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export class MetaStream<A, B> extends Subscription<A, B> {
}
}

unsubscribe(sub?: Subscription<B, any>) {
unsubscribe(sub?: ISubscription<B, any>) {
if (this.stream && (!sub || this.subs.length === 1)) {
this.detach(!sub);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class StreamMerge<A, B> extends Subscription<A, B> {
return ok;
}

unsubscribe(sub?: Subscription<B, any>) {
unsubscribe(sub?: ISubscription<B, any>) {
if (!sub) {
for (let s of this.sources.values()) {
s.unsubscribe();
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ export class StreamSync<
return <A>res;
}

unsubscribe(sub?: Subscription<B, any>) {
unsubscribe(sub?: ISubscription<B, any>) {
if (!sub) {
LOGGER.debug(this.id, "unsub sources");
for (let s of this.sources.values()) {
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/subs/asidechain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export abstract class ASidechain<A, S, B> extends Subscription<A, B> {
super(undefined, opts);
}

unsubscribe(sub?: Subscription<any, any>) {
unsubscribe(sub?: ISubscription<B, any>) {
const res = super.unsubscribe(sub);
if (!sub || !this.subs.length) {
this.sideSub.unsubscribe();
Expand Down
6 changes: 3 additions & 3 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
closeIn: CloseMode;
closeOut: CloseMode;
parent?: ISubscription<any, A>;
__owner?: ISubscription<any>;
__owner?: ISubscription<any, any>;

protected xform?: Reducer<B[], A>;
protected cacheLast: boolean;
Expand Down Expand Up @@ -228,7 +228,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
return this.transform(map(fn), opts || {});
}

unsubscribe(sub?: Partial<ISubscription<B>>) {
unsubscribe(sub?: ISubscription<B, any>) {
return sub ? this.unsubscribeChild(sub) : this.unsubscribeSelf();
}

Expand All @@ -240,7 +240,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
return true;
}

protected unsubscribeChild(sub: Partial<ISubscription<B>>) {
protected unsubscribeChild(sub: ISubscription<B, any>) {
LOGGER.debug(this.id, "unsub child", sub.id);
const idx = this.subs.indexOf(sub);
if (idx >= 0) {
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/test/from-iterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe("fromIterable", function () {
});

it("finishes", (_done) => {
let sub: ISubscription = src.subscribe({
let sub: ISubscription<any, any> = src.subscribe({
next() {},
done() {
assert.strictEqual(src.getState(), State.DONE, "src not done");
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe("Subscription", function () {
});
let state2 = State.IDLE;
let err: any;
let sub2: ISubscription;
let sub2: ISubscription<any, any>;
sub2 = src.subscribe({
next() {
throw 1;
Expand Down
16 changes: 10 additions & 6 deletions packages/rstream/test/utils.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import * as assert from "assert";
import { ISubscription, State } from "../src";

export const assertState = (x: ISubscription, state: State) =>
export const assertState = (x: ISubscription<any, any>, state: State) =>
assert.strictEqual(x.getState(), state, `${x.id} != ${State[state]}`);

export const assertIdle = (x: ISubscription) => assertState(x, State.IDLE);
export const assertIdle = (x: ISubscription<any, any>) =>
assertState(x, State.IDLE);

export const assertActive = (x: ISubscription) => assertState(x, State.ACTIVE);
export const assertActive = (x: ISubscription<any, any>) =>
assertState(x, State.ACTIVE);

export const assertDone = (x: ISubscription) => assertState(x, State.DONE);
export const assertDone = (x: ISubscription<any, any>) =>
assertState(x, State.DONE);

export const assertUnsub = (x: ISubscription) =>
export const assertUnsub = (x: ISubscription<any, any>) =>
assertState(x, State.UNSUBSCRIBED);

export const assertError = (x: ISubscription) => assertState(x, State.ERROR);
export const assertError = (x: ISubscription<any, any>) =>
assertState(x, State.ERROR);

0 comments on commit c982288

Please sign in to comment.