Skip to content

Commit

Permalink
fix(webtransport): add proper framing
Browse files Browse the repository at this point in the history
WebTransport being a stream-based protocol, the chunking boundaries are
not necessarily preserved. That's why we need a header indicating the
type of the payload (plain text or binary) and its length.

See also: socketio/engine.io@a306db0
  • Loading branch information
darrachequesne committed Aug 1, 2023
1 parent 500085d commit d55c39e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 63 deletions.
65 changes: 23 additions & 42 deletions lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,14 @@
import { Transport } from "../transport.js";
import { nextTick } from "./websocket-constructor.js";
import {
encodePacketToBinary,
decodePacketFromBinary,
Packet,
createPacketDecoderStream,
createPacketEncoderStream,
} from "engine.io-parser";
import debugModule from "debug"; // debug()

const debug = debugModule("engine.io-client:webtransport"); // debug()

function shouldIncludeBinaryHeader(packet, encoded) {
// 48 === "0".charCodeAt(0) (OPEN packet type)
// 54 === "6".charCodeAt(0) (NOOP packet type)
return (
packet.type === "message" &&
typeof packet.data !== "string" &&
encoded[0] >= 48 &&
encoded[0] <= 54
);
}

export class WT extends Transport {
private transport: any;
private writer: any;
Expand Down Expand Up @@ -52,10 +41,16 @@ export class WT extends Transport {
// note: we could have used async/await, but that would require some additional polyfills
this.transport.ready.then(() => {
this.transport.createBidirectionalStream().then((stream) => {
const reader = stream.readable.getReader();
this.writer = stream.writable.getWriter();
const decoderStream = createPacketDecoderStream(
Number.MAX_SAFE_INTEGER,
// TODO expose binarytype
"arraybuffer"
);
const reader = stream.readable.pipeThrough(decoderStream).getReader();

let binaryFlag;
const encoderStream = createPacketEncoderStream();
encoderStream.readable.pipeTo(stream.writable);
this.writer = encoderStream.writable.getWriter();

const read = () => {
reader
Expand All @@ -66,15 +61,7 @@ export class WT extends Transport {
return;
}
debug("received chunk: %o", value);
if (!binaryFlag && value.byteLength === 1 && value[0] === 54) {
binaryFlag = true;
} else {
// TODO expose binarytype
this.onPacket(
decodePacketFromBinary(value, binaryFlag, "arraybuffer")
);
binaryFlag = false;
}
this.onPacket(value);
read();
})
.catch((err) => {
Expand All @@ -84,10 +71,11 @@ export class WT extends Transport {

read();

const handshake = this.query.sid ? `0{"sid":"${this.query.sid}"}` : "0";
this.writer
.write(new TextEncoder().encode(handshake))
.then(() => this.onOpen());
const packet: Packet = { type: "open" };
if (this.query.sid) {
packet.data = `{"sid":"${this.query.sid}"}`;
}
this.writer.write(packet).then(() => this.onOpen());
});
});
}
Expand All @@ -99,20 +87,13 @@ export class WT extends Transport {
const packet = packets[i];
const lastPacket = i === packets.length - 1;

encodePacketToBinary(packet, (data) => {
if (shouldIncludeBinaryHeader(packet, data)) {
debug("writing binary header");
this.writer.write(Uint8Array.of(54));
this.writer.write(packet).then(() => {
if (lastPacket) {
nextTick(() => {
this.writable = true;
this.emitReserved("drain");
}, this.setTimeoutFn);
}
debug("writing chunk: %o", data);
this.writer.write(data).then(() => {
if (lastPacket) {
nextTick(() => {
this.writable = true;
this.emitReserved("drain");
}, this.setTimeoutFn);
}
});
});
}
}
Expand Down
38 changes: 19 additions & 19 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"dependencies": {
"@socket.io/component-emitter": "~3.1.0",
"debug": "~4.3.1",
"engine.io-parser": "~5.1.0",
"engine.io-parser": "~5.2.1",
"ws": "~8.11.0",
"xmlhttprequest-ssl": "~2.0.0"
},
Expand All @@ -63,7 +63,7 @@
"@types/sinonjs__fake-timers": "^6.0.3",
"babel-loader": "^8.2.2",
"blob": "0.0.5",
"engine.io": "^6.5.0-alpha.1",
"engine.io": "^6.5.2-alpha.1",
"expect.js": "^0.3.1",
"express": "^4.17.1",
"mocha": "^10.2.0",
Expand Down
2 changes: 2 additions & 0 deletions test/webtransport.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { Server } from "engine.io";
import { Socket } from "../build/esm-debug/index.js";
import { generateWebTransportCertificate } from "./util-wt.mjs";
import { createServer } from "http";
import { TransformStream } from "stream/web";

if (typeof window === "undefined") {
global.WebTransport = WebTransport;
global.TransformStream = TransformStream;
}

async function setup(opts, cb) {
Expand Down

0 comments on commit d55c39e

Please sign in to comment.