diff --git a/packages/rstream/src/subs/transduce.ts b/packages/rstream/src/subs/transduce.ts index 0e0ec0f5bc..0e482a775d 100644 --- a/packages/rstream/src/subs/transduce.ts +++ b/packages/rstream/src/subs/transduce.ts @@ -1,4 +1,4 @@ -import { isReduced } from "@thi.ng/transducers"; +import { isReduced, Reduced } from "@thi.ng/transducers"; import { Subscription } from "../subscription"; import type { Reducer, Transducer } from "@thi.ng/transducers"; @@ -39,7 +39,13 @@ export const transduce = ( sub = src.subscribe( { next(x) { - const _acc = rfn[2](acc, x); + let _acc: C | Reduced; + try { + _acc = rfn[2](acc, x); + } catch (e) { + reject(e); + return; + } if (isReduced(_acc)) { resolve(_acc.deref()); } else { diff --git a/packages/rstream/src/subscription.ts b/packages/rstream/src/subscription.ts index 7799192ef4..8d62d974bb 100644 --- a/packages/rstream/src/subscription.ts +++ b/packages/rstream/src/subscription.ts @@ -9,6 +9,7 @@ import { Reducer, Transducer, unreduced, + Reduced, } from "@thi.ng/transducers"; import { CloseMode, @@ -30,7 +31,8 @@ import { nextID } from "./utils/idgen"; * Most other types in rstream, including {@link Stream}s, are * `Subscription`s and all can be: * - * - linked into directed graphs (sync or async & not necessarily DAGs) + * - connected into directed graphs (sync or async & not necessarily + * DAGs) * - transformed using transducers (incl. support for early termination) * - can have any number of subscribers (optionally each w/ their own * transducers) @@ -40,9 +42,17 @@ import { nextID } from "./utils/idgen"; * subscribers has an error handler itself * - implement the {@link @thi.ng/api#IDeref} interface * + * If a transducer is provided (via the `xform` option), all received + * values will be first processed by the transducer and only its + * transformed result(s) (if any) will be passed to downstream + * subscribers. Any uncaught errors *inside* the transducer will cause + * this subscription's error handler to be called and will stop this + * subscription from receiving any further values (by default, unless + * overridden). + * * Subscription behavior can be customized via the additional (optional) - * options arg. See `CommonOpts` and `SubscriptionOpts` for further - * details. + * options arg. See {@link CommonOpts} and {@link SubscriptionOpts} for + * further details. * * @example * ```ts @@ -244,7 +254,13 @@ export class Subscription next(x: A) { if (this.state < State.DONE) { if (this.xform) { - const acc = this.xform[2]([], x); + let acc: B[] | Reduced; + try { + acc = this.xform[2]([], x); + } catch (e) { + this.error(e); + return; + } const uacc = unreduced(acc); const n = uacc.length; for (let i = 0; i < n; i++) { @@ -260,13 +276,18 @@ export class Subscription done() { LOGGER.debug(this.id, "entering done()"); if (this.state < State.DONE) { - if (this.xform) { - const acc = this.xform[1]([]); - const uacc = unreduced(acc); - const n = uacc.length; - for (let i = 0; i < n; i++) { - this.dispatch(uacc[i]); + try { + if (this.xform) { + const acc = this.xform[1]([]); + const uacc = unreduced(acc); + const n = uacc.length; + for (let i = 0; i < n; i++) { + this.dispatch(uacc[i]); + } } + } catch (e) { + this.error(e); + return; } this.state = State.DONE; for (let s of this.subs.slice()) {