Skip to content

Commit

Permalink
feat(rstream): update defWorker() arg types
Browse files Browse the repository at this point in the history
- add `WorkerSource` type alias
- add support for zero-arg fns returning a worker instance
  - helps v. much with Vite's `xxx?worker` imports
  - see /examples/mandelbrot for usage
  • Loading branch information
postspectacular committed Nov 19, 2021
1 parent b2bdd11 commit e07521e
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 8 deletions.
2 changes: 2 additions & 0 deletions packages/rstream/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,5 @@ export interface IStream<T> extends ISubscriber<T> {

export type StreamCancel = () => void;
export type StreamSource<T> = (sub: Stream<T>) => StreamCancel | void;

export type WorkerSource = Worker | Blob | Fn0<Worker> | string;
7 changes: 6 additions & 1 deletion packages/rstream/src/defworker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { isFunction } from "@thi.ng/checks/is-function";
import type { WorkerSource } from "./api.js";

export const defInlineWorker = (src: string) =>
defWorker(new Blob([src], { type: "text/javascript" }));

export const defWorker = (worker: Worker | string | Blob) =>
export const defWorker = (worker: WorkerSource) =>
worker instanceof Worker
? worker
: isFunction(worker)
? worker()
: new Worker(
worker instanceof Blob ? URL.createObjectURL(worker) : worker
);
9 changes: 7 additions & 2 deletions packages/rstream/src/forkjoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import { map } from "@thi.ng/transducers/map";
import { mapcat } from "@thi.ng/transducers/mapcat";
import { range } from "@thi.ng/transducers/range";
import { transduce } from "@thi.ng/transducers/transduce";
import type { CommonOpts, ISubscribable, ITransformable } from "./api.js";
import type {
CommonOpts,
ISubscribable,
ITransformable,
WorkerSource,
} from "./api.js";
import type { Subscription } from "./subscription.js";
import { sync } from "./sync.js";
import { tunnel } from "./tunnel.js";
Expand Down Expand Up @@ -47,7 +52,7 @@ export interface ForkJoinOpts<IN, MSG, RES, OUT> extends Partial<CommonOpts> {
* string. In the latter two cases, a worker is created
* automatically using `makeWorker()`.
*/
worker: string | Worker | Blob;
worker: WorkerSource;
/**
* Optional max number of workers to use. Defaults to
* `navigator.hardwareConcurrency` (or if unavailable, then 4 as
Expand Down
6 changes: 3 additions & 3 deletions packages/rstream/src/tunnel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Fn } from "@thi.ng/api";
import type { Fn, Fn0 } from "@thi.ng/api";
import { State } from "./api.js";
import { __nextID } from "./idgen.js";
import { LOGGER } from "./logger.js";
Expand All @@ -10,7 +10,7 @@ export interface TunnelOpts<A> {
* Tunnelled worker instance, source blob or script URL.
* If `interrupt` is enabled, the worker MUST be given as blob or URL.
*/
src: Worker | Blob | string;
src: Worker | Blob | string | Fn0<Worker>;
/**
* Max. number of worker instances to use. Only useful if
* `interrupt` is disabled. If more than one worker is used,
Expand Down Expand Up @@ -69,7 +69,7 @@ export const tunnel = <A, B>(opts: TunnelOpts<A>) => new Tunnel<A, B>(opts);
*/
export class Tunnel<A, B> extends Subscription<A, B> {
workers: Worker[];
src: Worker | Blob | string;
src: Worker | Blob | string | Fn0<Worker>;
transferables?: Fn<A, any[]>;
terminate: number;
interrupt: boolean;
Expand Down
4 changes: 2 additions & 2 deletions packages/rstream/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { WithErrorHandlerOpts } from "./api.js";
import type { WithErrorHandlerOpts, WorkerSource } from "./api.js";
import { defWorker } from "./defworker.js";
import { __optsWithID } from "./idgen.js";
import { LOGGER } from "./logger.js";
Expand Down Expand Up @@ -38,7 +38,7 @@ export interface FromWorkerOpts extends WithErrorHandlerOpts {
* @param opts -
*/
export const fromWorker = <T>(
worker: Worker | Blob | string,
worker: WorkerSource,
opts?: Partial<FromWorkerOpts>
) => {
const _worker = defWorker(worker);
Expand Down

0 comments on commit e07521e

Please sign in to comment.