Skip to content

Commit

Permalink
fix(rstream): update MetaStream unsub handling
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Mar 3, 2019
1 parent f0d53b4 commit b2e6e6f
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions packages/rstream/src/metastream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Fn } from "@thi.ng/api";
import { State } from "./api";
import { Subscription } from "./subscription";
import { nextID } from "./utils/idgen";

Expand Down Expand Up @@ -86,34 +87,47 @@ export class MetaStream<A, B> extends Subscription<A, B> {
}

next(x: A) {
if (this.stream) {
this.stream.unsubscribe(this.sub);
}
let stream = this.factory(x);
if (stream) {
this.stream = stream;
this.sub = this.stream.subscribe({
next: (x) => {
stream === this.stream && super.dispatch(x);
},
done: () => {
this.stream.unsubscribe(this.sub);
if (stream === this.stream) {
this.stream = null;
this.sub = null;
}
},
error: (e) => super.error(e)
});
if (this.state < State.DONE) {
if (this.stream) {
this.stream.unsubscribe(this.sub);
}
let stream = this.factory(x);
if (stream) {
this.stream = stream;
this.sub = this.stream.subscribe({
next: (x) => {
stream === this.stream && super.dispatch(x);
},
done: () => {
this.stream.unsubscribe(this.sub);
if (stream === this.stream) {
this.stream = null;
this.sub = null;
}
},
error: (e) => super.error(e)
});
}
}
}

done() {
if (this.stream) {
this.stream.unsubscribe(this.sub);
delete this.stream;
delete this.sub;
this.detach();
}
super.done();
}

unsubscribe(sub?: Subscription<B, any>) {
if (this.stream && (!sub || this.subs.length === 1)) {
this.detach();
}
return super.unsubscribe();
}

protected detach() {
this.stream.unsubscribe(this.sub);
delete this.stream;
delete this.sub;
}
}

0 comments on commit b2e6e6f

Please sign in to comment.