Skip to content

Commit

Permalink
feat(rstream): add .transform() error handler opt (#276)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: update ISubscribable contract, remove transducer
only version of `.subscribe()`, MUST provide dummy sub w/ transducer
OR (better) use `.transform()` instead (which also more clearly
communicates intention...)

- another breaking change side effect for `.subscribe()`:
  `next()` handlers MUST be provided now in first arg (child sub), this
   is NOT because they're mandatory now, but TS won't be able to
   correctly infer arg types when using `Partial<ISubscriber<T>>`
- add `DUMMY` subscriber constant w/ empty `next() {}`
- simplify internal `.subscribe()` logic
- add `WithErrorHandlerOpts` interface
- update `.transform()` & `.map()`: add error handling support
  • Loading branch information
postspectacular committed Mar 5, 2021
1 parent 594d806 commit 22c6f7c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 52 deletions.
15 changes: 10 additions & 5 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export interface TransformableOpts<A, B> extends CommonOpts {
xform: Transducer<A, B>;
}

export interface WithErrorHandlerOpts extends CommonOpts {
/**
* Optional error handler to use for this
*/
error: Fn<any, void>;
}

export interface SubscriptionOpts<A, B> extends TransformableOpts<A, B> {
/**
* Parent stream / subscription.
Expand All @@ -96,18 +103,14 @@ export interface ISubscriber<T> {

export interface ISubscribable<T> extends IDeref<T | undefined>, IID<string> {
subscribe(
sub: Partial<ISubscriber<T>>,
sub: ISubscriber<T>,
opts?: Partial<CommonOpts>
): Subscription<T, T>;
subscribe<C>(
sub: Partial<ISubscriber<T>>,
xform: Transducer<T, C>,
opts?: Partial<CommonOpts>
): Subscription<T, C>;
subscribe<C>(
xform: Transducer<T, C>,
opts?: Partial<CommonOpts>
): Subscription<T, C>;
subscribe<C>(sub: Subscription<T, C>): Subscription<T, C>;
unsubscribe(sub?: Partial<ISubscriber<T>>): boolean;
getState(): State;
Expand Down Expand Up @@ -152,3 +155,5 @@ export type StreamSource<T> = (sub: Stream<T>) => StreamCancel | void;
export let LOGGER = NULL_LOGGER;

export const setLogger = (logger: ILogger) => (LOGGER = logger);

export const DUMMY: ISubscriber<any> = { next() {} };
10 changes: 3 additions & 7 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,22 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
}

subscribe(
sub: Partial<ISubscriber<T>>,
sub: ISubscriber<T>,
opts?: Partial<CommonOpts>
): Subscription<T, T>;
subscribe<C>(sub: Subscription<T, C>): Subscription<T, C>;
subscribe<C>(
xform: Transducer<T, C>,
opts?: Partial<CommonOpts>
): Subscription<T, C>;
subscribe<C>(
sub: Partial<ISubscriber<C>>,
xform: Transducer<T, C>,
opts?: Partial<CommonOpts>
): Subscription<T, C>;
subscribe(...args: any[]): any {
const wrapped = super.subscribe.apply(this, <any>args);
const sub = super.subscribe.apply(this, <any>args);
if (!this._inited) {
this._cancel = (this.src && this.src(this)) || (() => void 0);
this._inited = true;
}
return wrapped;
return sub;
}

unsubscribe(sub?: Subscription<T, any>) {
Expand Down
83 changes: 43 additions & 40 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Fn, IDeref, NULL_LOGGER, SEMAPHORE } from "@thi.ng/api";
import { peek } from "@thi.ng/arrays";
import { implementsFunction, isFunction, isPlainObject } from "@thi.ng/checks";
import { implementsFunction, isPlainObject } from "@thi.ng/checks";
import { illegalArity, illegalState } from "@thi.ng/errors";
import {
comp,
Expand All @@ -21,8 +21,9 @@ import {
LOGGER,
State,
SubscriptionOpts,
WithErrorHandlerOpts,
} from "./api";
import { nextID } from "./utils/idgen";
import { nextID, optsWithID } from "./utils/idgen";

/**
* Creates a new {@link Subscription} instance, the fundamental datatype
Expand Down Expand Up @@ -128,54 +129,44 @@ export class Subscription<A, B>

/**
* Creates new child subscription with given subscriber and/or
* transducer and optional subscription ID.
* transducer and options.
*/
subscribe(
sub: Partial<ISubscriber<B>>,
sub: ISubscriber<B>,
opts?: Partial<CommonOpts>
): Subscription<B, B>;
subscribe<C>(sub: Subscription<B, C>): Subscription<B, C>;
subscribe<C>(
xform: Transducer<B, C>,
opts?: Partial<CommonOpts>
): Subscription<B, C>;
subscribe<C>(
sub: Partial<ISubscriber<C>>,
xform: Transducer<B, C>,
opts?: Partial<CommonOpts>
): Subscription<B, C>;
subscribe(...args: any[]): any {
this.ensureState();
let sub: Subscription<any, any> | undefined;
let sub: ISubscriber<any> = args[0];
!peek(args) && args.pop();
const opts: Partial<SubscriptionOpts<any, any>> =
args.length > 1 && isPlainObject(peek(args))
? { ...args.pop() }
: {};
switch (args.length) {
case 1:
if (isFunction(args[0])) {
opts.xform = args[0];
!opts.id && (opts.id = `xform-${nextID()}`);
} else {
sub = args[0];
}
break;
case 2:
sub = args[0];
opts.xform = args[1];
break;
default:
illegalArity(args.length);
}
if (implementsFunction(sub!, "subscribe") && !opts.xform) {
sub!.parent = this;
let $sub: Subscription<any, any>;
if (implementsFunction(sub, "subscribe") && !opts.xform) {
$sub = <Subscription<any, any>>sub;
$sub.parent = this;
} else {
// FIXME inherit options from this sub or defaults?
sub = subscription<B, B>(sub, { parent: this, ...opts });
$sub = subscription<B, B>(sub, { parent: this, ...opts });
}
this.last !== SEMAPHORE && sub!.next(this.last);
return this.addWrapped(sub!);
this.last !== SEMAPHORE && $sub.next(this.last);
return this.addWrapped($sub);
}

/**
Expand All @@ -200,21 +191,30 @@ export class Subscription<A, B>
*
* Shorthand for `subscribe(comp(xf1, xf2,...), id)`
*/
transform<C>(
a: Transducer<B, C>,
opts?: Partial<CommonOpts>
): Subscription<B, C>;
// prettier-ignore
transform<C, D>(a: Transducer<B, C>, b: Transducer<C, D>, opts?: Partial<CommonOpts>): Subscription<B, D>;
transform<C>(a: Transducer<B, C>, opts?: Partial<WithErrorHandlerOpts>): Subscription<B, C>;
// prettier-ignore
transform<C, D>(a: Transducer<B, C>, b: Transducer<C, D>, opts?: Partial<WithErrorHandlerOpts>): Subscription<B, D>;
// prettier-ignore
transform<C, D, E>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, opts?: Partial<CommonOpts>): Subscription<B, E>;
transform<C, D, E>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, opts?: Partial<WithErrorHandlerOpts>): Subscription<B, E>;
// prettier-ignore
transform<C, D, E, F>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, d: Transducer<E, F>, opts?: Partial<CommonOpts>): Subscription<B, F>;
transform(...xf: any[]) {
const n = xf.length - 1;
return isPlainObject(xf[n])
? this.subscribe((<any>comp)(...xf.slice(0, n)), xf[n])
: this.subscribe((<any>comp)(...xf));
transform<C, D, E, F>(a: Transducer<B, C>, b: Transducer<C, D>, c: Transducer<D, E>, d: Transducer<E, F>, opts?: Partial<WithErrorHandlerOpts>): Subscription<B, F>;
transform(...args: any[]) {
let sub: Partial<ISubscriber<B>> | undefined;
let opts: Partial<SubscriptionOpts<any, any>>;
const n = args.length - 1;
if (isPlainObject(args[n])) {
opts = optsWithID(`xform`, {
...args[n],
// @ts-ignore
xform: comp(...args.slice(0, n)),
});
sub = { error: (<WithErrorHandlerOpts>opts).error };
} else {
// @ts-ignore
opts = { xform: comp(...args) };
}
return this.subscribe(<any>sub, opts);
}

/**
Expand All @@ -225,8 +225,11 @@ export class Subscription<A, B>
* @param fn
* @param opts
*/
map<C>(fn: Fn<B, C>, opts?: Partial<CommonOpts>): Subscription<B, C> {
return this.subscribe(map(fn), opts);
map<C>(
fn: Fn<B, C>,
opts?: Partial<WithErrorHandlerOpts>
): Subscription<B, C> {
return this.transform(map(fn), opts);
}

/**
Expand Down Expand Up @@ -343,18 +346,18 @@ export class Subscription<A, B>
}
}

protected addWrapped(wrapped: Subscription<any, any>) {
this.subs.push(wrapped);
protected addWrapped(sub: Subscription<any, any>) {
this.subs.push(sub);
this.state = State.ACTIVE;
return wrapped;
return sub;
}

protected dispatch(x: B) {
// LOGGER.debug(this.id, "dispatch", x);
this.cacheLast && (this.last = x);
const subs = this.subs;
let n = subs.length;
let s: ISubscriber<B>;
let s: Partial<ISubscriber<B>>;
if (n === 1) {
s = subs[0];
try {
Expand Down

0 comments on commit 22c6f7c

Please sign in to comment.