Implicitly flush Websock if needed

Callers shouldn't have to deal with the internal buffering limits of
Websock, so implicitly flush the buffer if more room is needed.
This commit is contained in:
Pierre Ossman 2023-05-30 20:32:31 +02:00
parent f8b65f9fe1
commit ccef89f556
3 changed files with 139 additions and 19 deletions

View File

@ -3047,23 +3047,8 @@ RFB.messages = {
} }
sock.sQpush32(length); sock.sQpush32(length);
sock.sQpushBytes(data);
// We have to keep track of from where in the data we begin creating the
// buffer for the flush in the next iteration.
let dataOffset = 0;
let remaining = data.length;
while (remaining > 0) {
let flushSize = Math.min(remaining, (sock._sQbufferSize - sock._sQlen));
sock.sQpushBytes(data.subarray(dataOffset, dataOffset + flushSize));
sock.flush(); sock.flush();
remaining -= flushSize;
dataOffset += flushSize;
}
}, },
setDesktopSize(sock, width, height, id, flags) { setDesktopSize(sock, width, height, id, flags) {

View File

@ -176,15 +176,18 @@ export default class Websock {
// Send Queue // Send Queue
sQpush8(num) { sQpush8(num) {
this._sQensureSpace(1);
this._sQ[this._sQlen++] = num; this._sQ[this._sQlen++] = num;
} }
sQpush16(num) { sQpush16(num) {
this._sQensureSpace(2);
this._sQ[this._sQlen++] = (num >> 8) & 0xff; this._sQ[this._sQlen++] = (num >> 8) & 0xff;
this._sQ[this._sQlen++] = (num >> 0) & 0xff; this._sQ[this._sQlen++] = (num >> 0) & 0xff;
} }
sQpush32(num) { sQpush32(num) {
this._sQensureSpace(4);
this._sQ[this._sQlen++] = (num >> 24) & 0xff; this._sQ[this._sQlen++] = (num >> 24) & 0xff;
this._sQ[this._sQlen++] = (num >> 16) & 0xff; this._sQ[this._sQlen++] = (num >> 16) & 0xff;
this._sQ[this._sQlen++] = (num >> 8) & 0xff; this._sQ[this._sQlen++] = (num >> 8) & 0xff;
@ -197,8 +200,18 @@ export default class Websock {
} }
sQpushBytes(bytes) { sQpushBytes(bytes) {
this._sQ.set(bytes, this._sQlen); for (let offset = 0;offset < bytes.length;) {
this._sQlen += bytes.length; this._sQensureSpace(1);
let chunkSize = this._sQbufferSize - this._sQlen;
if (chunkSize > bytes.length - offset) {
chunkSize = bytes.length - offset;
}
this._sQ.set(bytes.subarray(offset, chunkSize), this._sQlen);
this._sQlen += chunkSize;
offset += chunkSize;
}
} }
flush() { flush() {
@ -208,6 +221,12 @@ export default class Websock {
} }
} }
_sQensureSpace(bytes) {
if (this._sQbufferSize - this._sQlen < bytes) {
this.flush();
}
}
// Event Handlers // Event Handlers
off(evt) { off(evt) {
this._eventHandlers[evt] = () => {}; this._eventHandlers[evt] = () => {};

View File

@ -150,6 +150,8 @@ describe('Websock', function () {
describe('Send queue methods', function () { describe('Send queue methods', function () {
let sock; let sock;
const bufferSize = 10 * 1024;
beforeEach(function () { beforeEach(function () {
let websock = new FakeWebSocket(); let websock = new FakeWebSocket();
websock._open(); websock._open();
@ -167,6 +169,18 @@ describe('Websock', function () {
sock.sQpush8(42); sock.sQpush8(42);
expect(sock).to.have.sent(new Uint8Array([])); expect(sock).to.have.sent(new Uint8Array([]));
}); });
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize;i++) {
sock.sQpush8(42);
}
let expected = [];
for (let i = 0;i < bufferSize;i++) {
expected.push(42);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
}); });
describe('sQpush16()', function () { describe('sQpush16()', function () {
@ -179,6 +193,19 @@ describe('Websock', function () {
sock.sQpush16(420); sock.sQpush16(420);
expect(sock).to.have.sent(new Uint8Array([])); expect(sock).to.have.sent(new Uint8Array([]));
}); });
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/2;i++) {
sock.sQpush16(420);
}
let expected = [];
for (let i = 0;i < bufferSize/2;i++) {
expected.push(1);
expected.push(164);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
}); });
describe('sQpush32()', function () { describe('sQpush32()', function () {
@ -191,6 +218,21 @@ describe('Websock', function () {
sock.sQpush32(420420); sock.sQpush32(420420);
expect(sock).to.have.sent(new Uint8Array([])); expect(sock).to.have.sent(new Uint8Array([]));
}); });
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/4;i++) {
sock.sQpush32(420420);
}
let expected = [];
for (let i = 0;i < bufferSize/4;i++) {
expected.push(0);
expected.push(6);
expected.push(106);
expected.push(68);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
}); });
describe('sQpushString()', function () { describe('sQpushString()', function () {
@ -203,6 +245,41 @@ describe('Websock', function () {
sock.sQpushString('\x12\x34\x56\x78\x90'); sock.sQpushString('\x12\x34\x56\x78\x90');
expect(sock).to.have.sent(new Uint8Array([])); expect(sock).to.have.sent(new Uint8Array([]));
}); });
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/5;i++) {
sock.sQpushString('\x12\x34\x56\x78\x90');
}
let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
it('should implicitly split a large buffer', function () {
let str = '';
for (let i = 0;i <= bufferSize/5;i++) {
str += '\x12\x34\x56\x78\x90';
}
sock.sQpushString(str);
let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
}); });
describe('sQpushBytes()', function () { describe('sQpushBytes()', function () {
@ -215,6 +292,45 @@ describe('Websock', function () {
sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90])); sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90]));
expect(sock).to.have.sent(new Uint8Array([])); expect(sock).to.have.sent(new Uint8Array([]));
}); });
it('should implicitly flush if the queue is full', function () {
for (let i = 0;i <= bufferSize/5;i++) {
sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90]));
}
let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
it('should implicitly split a large buffer', function () {
let buffer = [];
for (let i = 0;i <= bufferSize/5;i++) {
buffer.push(0x12);
buffer.push(0x34);
buffer.push(0x56);
buffer.push(0x78);
buffer.push(0x90);
}
sock.sQpushBytes(new Uint8Array(buffer));
let expected = [];
for (let i = 0;i < bufferSize/5;i++) {
expected.push(0x12);
expected.push(0x34);
expected.push(0x56);
expected.push(0x78);
expected.push(0x90);
}
expect(sock).to.have.sent(new Uint8Array(expected));
});
}); });
describe('flush', function () { describe('flush', function () {