From ccef89f55651e083fc2e818a251d09ff0bf9e75a Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Tue, 30 May 2023 20:32:31 +0200 Subject: [PATCH] Implicitly flush Websock if needed Callers shouldn't have to deal with the internal buffering limits of Websock, so implicitly flush the buffer if more room is needed. --- core/rfb.js | 19 +------ core/websock.js | 23 ++++++++- tests/test.websock.js | 116 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 19 deletions(-) diff --git a/core/rfb.js b/core/rfb.js index da95a386..fb9df0b9 100644 --- a/core/rfb.js +++ b/core/rfb.js @@ -3047,23 +3047,8 @@ RFB.messages = { } sock.sQpush32(length); - - // We have to keep track of from where in the data we begin creating the - // buffer for the flush in the next iteration. - let dataOffset = 0; - - let remaining = data.length; - while (remaining > 0) { - - let flushSize = Math.min(remaining, (sock._sQbufferSize - sock._sQlen)); - - sock.sQpushBytes(data.subarray(dataOffset, dataOffset + flushSize)); - sock.flush(); - - remaining -= flushSize; - dataOffset += flushSize; - } - + sock.sQpushBytes(data); + sock.flush(); }, setDesktopSize(sock, width, height, id, flags) { diff --git a/core/websock.js b/core/websock.js index e8e0390c..21327c31 100644 --- a/core/websock.js +++ b/core/websock.js @@ -176,15 +176,18 @@ export default class Websock { // Send Queue sQpush8(num) { + this._sQensureSpace(1); this._sQ[this._sQlen++] = num; } sQpush16(num) { + this._sQensureSpace(2); this._sQ[this._sQlen++] = (num >> 8) & 0xff; this._sQ[this._sQlen++] = (num >> 0) & 0xff; } sQpush32(num) { + this._sQensureSpace(4); this._sQ[this._sQlen++] = (num >> 24) & 0xff; this._sQ[this._sQlen++] = (num >> 16) & 0xff; this._sQ[this._sQlen++] = (num >> 8) & 0xff; @@ -197,8 +200,18 @@ export default class Websock { } sQpushBytes(bytes) { - this._sQ.set(bytes, this._sQlen); - this._sQlen += bytes.length; + for (let offset = 0;offset < bytes.length;) { + this._sQensureSpace(1); + + let chunkSize = this._sQbufferSize - this._sQlen; + if (chunkSize > bytes.length - offset) { + chunkSize = bytes.length - offset; + } + + this._sQ.set(bytes.subarray(offset, chunkSize), this._sQlen); + this._sQlen += chunkSize; + offset += chunkSize; + } } flush() { @@ -208,6 +221,12 @@ export default class Websock { } } + _sQensureSpace(bytes) { + if (this._sQbufferSize - this._sQlen < bytes) { + this.flush(); + } + } + // Event Handlers off(evt) { this._eventHandlers[evt] = () => {}; diff --git a/tests/test.websock.js b/tests/test.websock.js index b7fdd7d6..dc361b74 100644 --- a/tests/test.websock.js +++ b/tests/test.websock.js @@ -150,6 +150,8 @@ describe('Websock', function () { describe('Send queue methods', function () { let sock; + const bufferSize = 10 * 1024; + beforeEach(function () { let websock = new FakeWebSocket(); websock._open(); @@ -167,6 +169,18 @@ describe('Websock', function () { sock.sQpush8(42); expect(sock).to.have.sent(new Uint8Array([])); }); + it('should implicitly flush if the queue is full', function () { + for (let i = 0;i <= bufferSize;i++) { + sock.sQpush8(42); + } + + let expected = []; + for (let i = 0;i < bufferSize;i++) { + expected.push(42); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); }); describe('sQpush16()', function () { @@ -179,6 +193,19 @@ describe('Websock', function () { sock.sQpush16(420); expect(sock).to.have.sent(new Uint8Array([])); }); + it('should implicitly flush if the queue is full', function () { + for (let i = 0;i <= bufferSize/2;i++) { + sock.sQpush16(420); + } + + let expected = []; + for (let i = 0;i < bufferSize/2;i++) { + expected.push(1); + expected.push(164); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); }); describe('sQpush32()', function () { @@ -191,6 +218,21 @@ describe('Websock', function () { sock.sQpush32(420420); expect(sock).to.have.sent(new Uint8Array([])); }); + it('should implicitly flush if the queue is full', function () { + for (let i = 0;i <= bufferSize/4;i++) { + sock.sQpush32(420420); + } + + let expected = []; + for (let i = 0;i < bufferSize/4;i++) { + expected.push(0); + expected.push(6); + expected.push(106); + expected.push(68); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); }); describe('sQpushString()', function () { @@ -203,6 +245,41 @@ describe('Websock', function () { sock.sQpushString('\x12\x34\x56\x78\x90'); expect(sock).to.have.sent(new Uint8Array([])); }); + it('should implicitly flush if the queue is full', function () { + for (let i = 0;i <= bufferSize/5;i++) { + sock.sQpushString('\x12\x34\x56\x78\x90'); + } + + let expected = []; + for (let i = 0;i < bufferSize/5;i++) { + expected.push(0x12); + expected.push(0x34); + expected.push(0x56); + expected.push(0x78); + expected.push(0x90); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); + it('should implicitly split a large buffer', function () { + let str = ''; + for (let i = 0;i <= bufferSize/5;i++) { + str += '\x12\x34\x56\x78\x90'; + } + + sock.sQpushString(str); + + let expected = []; + for (let i = 0;i < bufferSize/5;i++) { + expected.push(0x12); + expected.push(0x34); + expected.push(0x56); + expected.push(0x78); + expected.push(0x90); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); }); describe('sQpushBytes()', function () { @@ -215,6 +292,45 @@ describe('Websock', function () { sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90])); expect(sock).to.have.sent(new Uint8Array([])); }); + it('should implicitly flush if the queue is full', function () { + for (let i = 0;i <= bufferSize/5;i++) { + sock.sQpushBytes(new Uint8Array([0x12, 0x34, 0x56, 0x78, 0x90])); + } + + let expected = []; + for (let i = 0;i < bufferSize/5;i++) { + expected.push(0x12); + expected.push(0x34); + expected.push(0x56); + expected.push(0x78); + expected.push(0x90); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); + it('should implicitly split a large buffer', function () { + let buffer = []; + for (let i = 0;i <= bufferSize/5;i++) { + buffer.push(0x12); + buffer.push(0x34); + buffer.push(0x56); + buffer.push(0x78); + buffer.push(0x90); + } + + sock.sQpushBytes(new Uint8Array(buffer)); + + let expected = []; + for (let i = 0;i < bufferSize/5;i++) { + expected.push(0x12); + expected.push(0x34); + expected.push(0x56); + expected.push(0x78); + expected.push(0x90); + } + + expect(sock).to.have.sent(new Uint8Array(expected)); + }); }); describe('flush', function () {