Stop exposing Websock queue length

Callers should be using rQwait() to ensure sufficient data is present,
and not poke around in the internal buffering.
This commit is contained in:
Pierre Ossman 2023-05-14 18:56:19 +02:00
parent 0180bc81c1
commit 55ffe8fc51
4 changed files with 50 additions and 71 deletions

View File

@ -24,38 +24,34 @@ export default class RawDecoder {
const pixelSize = depth == 8 ? 1 : 4; const pixelSize = depth == 8 ? 1 : 4;
const bytesPerLine = width * pixelSize; const bytesPerLine = width * pixelSize;
if (sock.rQwait("RAW", bytesPerLine)) { while (this._lines > 0) {
return false; if (sock.rQwait("RAW", bytesPerLine)) {
} return false;
const curY = y + (height - this._lines);
const currHeight = Math.min(this._lines,
Math.floor(sock.rQlen / bytesPerLine));
const pixels = width * currHeight;
let data = sock.rQshiftBytes(currHeight * bytesPerLine);
// Convert data if needed
if (depth == 8) {
const newdata = new Uint8Array(pixels * 4);
for (let i = 0; i < pixels; i++) {
newdata[i * 4 + 0] = ((data[i] >> 0) & 0x3) * 255 / 3;
newdata[i * 4 + 1] = ((data[i] >> 2) & 0x3) * 255 / 3;
newdata[i * 4 + 2] = ((data[i] >> 4) & 0x3) * 255 / 3;
newdata[i * 4 + 3] = 255;
} }
data = newdata;
}
// Max sure the image is fully opaque const curY = y + (height - this._lines);
for (let i = 0; i < pixels; i++) {
data[i * 4 + 3] = 255;
}
display.blitImage(x, curY, width, currHeight, data, 0); let data = sock.rQshiftBytes(bytesPerLine);
this._lines -= currHeight;
if (this._lines > 0) { // Convert data if needed
return false; if (depth == 8) {
const newdata = new Uint8Array(width * 4);
for (let i = 0; i < width; i++) {
newdata[i * 4 + 0] = ((data[i] >> 0) & 0x3) * 255 / 3;
newdata[i * 4 + 1] = ((data[i] >> 2) & 0x3) * 255 / 3;
newdata[i * 4 + 2] = ((data[i] >> 4) & 0x3) * 255 / 3;
newdata[i * 4 + 3] = 255;
}
data = newdata;
}
// Max sure the image is fully opaque
for (let i = 0; i < width; i++) {
data[i * 4 + 3] = 255;
}
display.blitImage(x, curY, width, 1, data, 0);
this._lines--;
} }
return true; return true;

View File

@ -958,7 +958,7 @@ export default class RFB extends EventTargetMixin {
} }
_handleMessage() { _handleMessage() {
if (this._sock.rQlen === 0) { if (this._sock.rQwait("message", 1)) {
Log.Warn("handleMessage called on an empty receive queue"); Log.Warn("handleMessage called on an empty receive queue");
return; return;
} }
@ -975,7 +975,7 @@ export default class RFB extends EventTargetMixin {
if (!this._normalMsg()) { if (!this._normalMsg()) {
break; break;
} }
if (this._sock.rQlen === 0) { if (this._sock.rQwait("message", 1)) {
break; break;
} }
} }
@ -2473,7 +2473,7 @@ export default class RFB extends EventTargetMixin {
.then(() => { .then(() => {
this._flushing = false; this._flushing = false;
// Resume processing // Resume processing
if (this._sock.rQlen > 0) { if (!this._sock.rQwait("message", 1)) {
this._handleMessage(); this._handleMessage();
} }
}); });

View File

@ -95,10 +95,6 @@ export default class Websock {
} }
// Receive Queue // Receive Queue
get rQlen() {
return this._rQlen - this._rQi;
}
rQpeek8() { rQpeek8() {
return this._rQ[this._rQi]; return this._rQ[this._rQi];
} }
@ -129,7 +125,7 @@ export default class Websock {
} }
rQshiftStr(len) { rQshiftStr(len) {
if (typeof(len) === 'undefined') { len = this.rQlen; } if (typeof(len) === 'undefined') { len = this._rQlen - this._rQi; }
let str = ""; let str = "";
// Handle large arrays in steps to avoid long strings on the stack // Handle large arrays in steps to avoid long strings on the stack
for (let i = 0; i < len; i += 4096) { for (let i = 0; i < len; i += 4096) {
@ -140,20 +136,20 @@ export default class Websock {
} }
rQshiftBytes(len) { rQshiftBytes(len) {
if (typeof(len) === 'undefined') { len = this.rQlen; } if (typeof(len) === 'undefined') { len = this._rQlen - this._rQi; }
this._rQi += len; this._rQi += len;
return new Uint8Array(this._rQ.buffer, this._rQi - len, len); return new Uint8Array(this._rQ.buffer, this._rQi - len, len);
} }
rQshiftTo(target, len) { rQshiftTo(target, len) {
if (len === undefined) { len = this.rQlen; } if (len === undefined) { len = this._rQlen - this._rQi; }
// TODO: make this just use set with views when using a ArrayBuffer to store the rQ // TODO: make this just use set with views when using a ArrayBuffer to store the rQ
target.set(new Uint8Array(this._rQ.buffer, this._rQi, len)); target.set(new Uint8Array(this._rQ.buffer, this._rQi, len));
this._rQi += len; this._rQi += len;
} }
rQpeekBytes(len) { rQpeekBytes(len) {
if (typeof(len) === 'undefined') { len = this.rQlen; } if (typeof(len) === 'undefined') { len = this._rQlen - this._rQi; }
return new Uint8Array(this._rQ.buffer, this._rQi, len); return new Uint8Array(this._rQ.buffer, this._rQi, len);
} }
@ -161,7 +157,7 @@ export default class Websock {
// to be available in the receive queue. Return true if we need to // to be available in the receive queue. Return true if we need to
// wait (and possibly print a debug message), otherwise false. // wait (and possibly print a debug message), otherwise false.
rQwait(msg, num, goback) { rQwait(msg, num, goback) {
if (this.rQlen < num) { if (this._rQlen - this._rQi < num) {
if (goback) { if (goback) {
if (this._rQi < goback) { if (this._rQi < goback) {
throw new Error("rQwait cannot backup " + goback + " bytes"); throw new Error("rQwait cannot backup " + goback + " bytes");
@ -294,7 +290,7 @@ export default class Websock {
// we don't want to grow unboundedly // we don't want to grow unboundedly
if (this._rQbufferSize > MAX_RQ_GROW_SIZE) { if (this._rQbufferSize > MAX_RQ_GROW_SIZE) {
this._rQbufferSize = MAX_RQ_GROW_SIZE; this._rQbufferSize = MAX_RQ_GROW_SIZE;
if (this._rQbufferSize - this.rQlen < minFit) { if (this._rQbufferSize - (this._rQlen - this._rQi) < minFit) {
throw new Error("Receive Queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit"); throw new Error("Receive Queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit");
} }
} }
@ -323,7 +319,7 @@ export default class Websock {
_recvMessage(e) { _recvMessage(e) {
this._DecodeMessage(e.data); this._DecodeMessage(e.data);
if (this.rQlen > 0) { if (this._rQlen - this._rQi > 0) {
this._eventHandlers.message(); this._eventHandlers.message();
if (this._rQlen == this._rQi) { if (this._rQlen == this._rQi) {
// All data has now been processed, this means we // All data has now been processed, this means we

View File

@ -17,56 +17,43 @@ describe('Websock', function () {
sock._rQ.set(RQ_TEMPLATE); sock._rQ.set(RQ_TEMPLATE);
sock._rQlen = RQ_TEMPLATE.length; sock._rQlen = RQ_TEMPLATE.length;
}); });
describe('rQlen', function () {
it('should return the length of the receive queue', function () {
sock._rQi = 0;
expect(sock.rQlen).to.equal(RQ_TEMPLATE.length);
});
it("should return the proper length if we read some from the receive queue", function () {
sock._rQi = 1;
expect(sock.rQlen).to.equal(RQ_TEMPLATE.length - 1);
});
});
describe('rQpeek8', function () { describe('rQpeek8', function () {
it('should peek at the next byte without poping it off the queue', function () { it('should peek at the next byte without poping it off the queue', function () {
const befLen = sock.rQlen; const befLen = sock._rQlen - sock._rQi;
const peek = sock.rQpeek8(); const peek = sock.rQpeek8();
expect(sock.rQpeek8()).to.equal(peek); expect(sock.rQpeek8()).to.equal(peek);
expect(sock.rQlen).to.equal(befLen); expect(sock._rQlen - sock._rQi).to.equal(befLen);
}); });
}); });
describe('rQshift8()', function () { describe('rQshift8()', function () {
it('should pop a single byte from the receive queue', function () { it('should pop a single byte from the receive queue', function () {
const peek = sock.rQpeek8(); const peek = sock.rQpeek8();
const befLen = sock.rQlen; const befLen = sock._rQlen - sock._rQi;
expect(sock.rQshift8()).to.equal(peek); expect(sock.rQshift8()).to.equal(peek);
expect(sock.rQlen).to.equal(befLen - 1); expect(sock._rQlen - sock._rQi).to.equal(befLen - 1);
}); });
}); });
describe('rQshift16()', function () { describe('rQshift16()', function () {
it('should pop two bytes from the receive queue and return a single number', function () { it('should pop two bytes from the receive queue and return a single number', function () {
const befLen = sock.rQlen; const befLen = sock._rQlen - sock._rQi;
const expected = (RQ_TEMPLATE[0] << 8) + RQ_TEMPLATE[1]; const expected = (RQ_TEMPLATE[0] << 8) + RQ_TEMPLATE[1];
expect(sock.rQshift16()).to.equal(expected); expect(sock.rQshift16()).to.equal(expected);
expect(sock.rQlen).to.equal(befLen - 2); expect(sock._rQlen - sock._rQi).to.equal(befLen - 2);
}); });
}); });
describe('rQshift32()', function () { describe('rQshift32()', function () {
it('should pop four bytes from the receive queue and return a single number', function () { it('should pop four bytes from the receive queue and return a single number', function () {
const befLen = sock.rQlen; const befLen = sock._rQlen - sock._rQi;
const expected = (RQ_TEMPLATE[0] << 24) + const expected = (RQ_TEMPLATE[0] << 24) +
(RQ_TEMPLATE[1] << 16) + (RQ_TEMPLATE[1] << 16) +
(RQ_TEMPLATE[2] << 8) + (RQ_TEMPLATE[2] << 8) +
RQ_TEMPLATE[3]; RQ_TEMPLATE[3];
expect(sock.rQshift32()).to.equal(expected); expect(sock.rQshift32()).to.equal(expected);
expect(sock.rQlen).to.equal(befLen - 4); expect(sock._rQlen - sock._rQi).to.equal(befLen - 4);
}); });
}); });
@ -77,12 +64,12 @@ describe('Websock', function () {
const shifted = sock.rQshiftStr(3); const shifted = sock.rQshiftStr(3);
expect(shifted).to.be.a('string'); expect(shifted).to.be.a('string');
expect(shifted).to.equal(String.fromCharCode.apply(null, Array.prototype.slice.call(new Uint8Array(RQ_TEMPLATE.buffer, befRQi, 3)))); expect(shifted).to.equal(String.fromCharCode.apply(null, Array.prototype.slice.call(new Uint8Array(RQ_TEMPLATE.buffer, befRQi, 3))));
expect(sock.rQlen).to.equal(befLen - 3); expect(sock._rQlen - sock._rQi).to.equal(befLen - 3);
}); });
it('should shift the entire rest of the queue off if no length is given', function () { it('should shift the entire rest of the queue off if no length is given', function () {
sock.rQshiftStr(); sock.rQshiftStr();
expect(sock.rQlen).to.equal(0); expect(sock._rQlen - sock._rQi).to.equal(0);
}); });
it('should be able to handle very large strings', function () { it('should be able to handle very large strings', function () {
@ -106,7 +93,7 @@ describe('Websock', function () {
const shifted = sock.rQshiftStr(); const shifted = sock.rQshiftStr();
expect(shifted).to.be.equal(expected); expect(shifted).to.be.equal(expected);
expect(sock.rQlen).to.equal(0); expect(sock._rQlen - sock._rQi).to.equal(0);
}); });
}); });
@ -117,12 +104,12 @@ describe('Websock', function () {
const shifted = sock.rQshiftBytes(3); const shifted = sock.rQshiftBytes(3);
expect(shifted).to.be.an.instanceof(Uint8Array); expect(shifted).to.be.an.instanceof(Uint8Array);
expect(shifted).to.array.equal(new Uint8Array(RQ_TEMPLATE.buffer, befRQi, 3)); expect(shifted).to.array.equal(new Uint8Array(RQ_TEMPLATE.buffer, befRQi, 3));
expect(sock.rQlen).to.equal(befLen - 3); expect(sock._rQlen - sock._rQi).to.equal(befLen - 3);
}); });
it('should shift the entire rest of the queue off if no length is given', function () { it('should shift the entire rest of the queue off if no length is given', function () {
sock.rQshiftBytes(); sock.rQshiftBytes();
expect(sock.rQlen).to.equal(0); expect(sock._rQlen - sock._rQi).to.equal(0);
}); });
}); });
@ -132,9 +119,9 @@ describe('Websock', function () {
}); });
it('should not modify the receive queue', function () { it('should not modify the receive queue', function () {
const befLen = sock.rQlen; const befLen = sock._rQlen - sock._rQi;
sock.rQpeekBytes(2); sock.rQpeekBytes(2);
expect(sock.rQlen).to.equal(befLen); expect(sock._rQlen - sock._rQi).to.equal(befLen);
}); });
it('should return an array containing the requested bytes of the receive queue', function () { it('should return an array containing the requested bytes of the receive queue', function () {