Skip to content

Commit

Permalink
feat(rstream): add error handling for transducer phase
Browse files Browse the repository at this point in the history
- update Subscription.next()
- update transduce()
  • Loading branch information
postspectacular committed Jun 8, 2020
1 parent dc37a19 commit 609424e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
10 changes: 8 additions & 2 deletions packages/rstream/src/subs/transduce.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isReduced } from "@thi.ng/transducers";
import { isReduced, Reduced } from "@thi.ng/transducers";
import { Subscription } from "../subscription";
import type { Reducer, Transducer } from "@thi.ng/transducers";

Expand Down Expand Up @@ -39,7 +39,13 @@ export const transduce = <A, B, C>(
sub = src.subscribe(
{
next(x) {
const _acc = rfn[2](acc, x);
let _acc: C | Reduced<C>;
try {
_acc = rfn[2](acc, x);
} catch (e) {
reject(e);
return;
}
if (isReduced(_acc)) {
resolve(_acc.deref());
} else {
Expand Down
41 changes: 31 additions & 10 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
Reducer,
Transducer,
unreduced,
Reduced,
} from "@thi.ng/transducers";
import {
CloseMode,
Expand All @@ -30,7 +31,8 @@ import { nextID } from "./utils/idgen";
* Most other types in rstream, including {@link Stream}s, are
* `Subscription`s and all can be:
*
* - linked into directed graphs (sync or async & not necessarily DAGs)
* - connected into directed graphs (sync or async & not necessarily
* DAGs)
* - transformed using transducers (incl. support for early termination)
* - can have any number of subscribers (optionally each w/ their own
* transducers)
Expand All @@ -40,9 +42,17 @@ import { nextID } from "./utils/idgen";
* subscribers has an error handler itself
* - implement the {@link @thi.ng/api#IDeref} interface
*
* If a transducer is provided (via the `xform` option), all received
* values will be first processed by the transducer and only its
* transformed result(s) (if any) will be passed to downstream
* subscribers. Any uncaught errors *inside* the transducer will cause
* this subscription's error handler to be called and will stop this
* subscription from receiving any further values (by default, unless
* overridden).
*
* Subscription behavior can be customized via the additional (optional)
* options arg. See `CommonOpts` and `SubscriptionOpts` for further
* details.
* options arg. See {@link CommonOpts} and {@link SubscriptionOpts} for
* further details.
*
* @example
* ```ts
Expand Down Expand Up @@ -244,7 +254,13 @@ export class Subscription<A, B>
next(x: A) {
if (this.state < State.DONE) {
if (this.xform) {
const acc = this.xform[2]([], x);
let acc: B[] | Reduced<B[]>;
try {
acc = this.xform[2]([], x);
} catch (e) {
this.error(e);
return;
}
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
Expand All @@ -260,13 +276,18 @@ export class Subscription<A, B>
done() {
LOGGER.debug(this.id, "entering done()");
if (this.state < State.DONE) {
if (this.xform) {
const acc = this.xform[1]([]);
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
try {
if (this.xform) {
const acc = this.xform[1]([]);
const uacc = unreduced(acc);
const n = uacc.length;
for (let i = 0; i < n; i++) {
this.dispatch(uacc[i]);
}
}
} catch (e) {
this.error(e);
return;
}
this.state = State.DONE;
for (let s of this.subs.slice()) {
Expand Down

0 comments on commit 609424e

Please sign in to comment.