Merge pull request #1281 from jalfd/optimize-receive-buffer

Optimize receive buffer
This commit is contained in:
Samuel Mannehed 2019-09-25 13:26:02 +02:00 committed by GitHub
commit c51a77c2eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 22 deletions

View File

@ -27,7 +27,6 @@ export default class Websock {
this._rQi = 0; // Receive queue index this._rQi = 0; // Receive queue index
this._rQlen = 0; // Next write position in the receive queue this._rQlen = 0; // Next write position in the receive queue
this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB) this._rQbufferSize = 1024 * 1024 * 4; // Receive queue buffer size (4 MiB)
this._rQmax = this._rQbufferSize / 8;
// called in init: this._rQ = new Uint8Array(this._rQbufferSize); // called in init: this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ = null; // Receive queue this._rQ = null; // Receive queue
@ -226,15 +225,15 @@ export default class Websock {
} }
_expand_compact_rQ(min_fit) { _expand_compact_rQ(min_fit) {
const resizeNeeded = min_fit || this.rQlen > this._rQbufferSize / 2; // if we're using less than 1/8th of the buffer even with the incoming bytes, compact in place
// instead of resizing
const required_buffer_size = (this._rQlen - this._rQi + min_fit) * 8;
const resizeNeeded = this._rQbufferSize < required_buffer_size;
if (resizeNeeded) { if (resizeNeeded) {
if (!min_fit) { // Make sure we always *at least* double the buffer size, and have at least space for 8x
// just double the size if we need to do compaction // the current amount of data
this._rQbufferSize *= 2; this._rQbufferSize = Math.max(this._rQbufferSize * 2, required_buffer_size);
} else {
// otherwise, make sure we satisy rQlen - rQi + min_fit < rQbufferSize / 8
this._rQbufferSize = (this.rQlen + min_fit) * 8;
}
} }
// we don't want to grow unboundedly // we don't want to grow unboundedly
@ -247,14 +246,13 @@ export default class Websock {
if (resizeNeeded) { if (resizeNeeded) {
const old_rQbuffer = this._rQ.buffer; const old_rQbuffer = this._rQ.buffer;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize); this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi)); this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi, this._rQlen - this._rQi));
} else { } else {
if (ENABLE_COPYWITHIN) { if (ENABLE_COPYWITHIN) {
this._rQ.copyWithin(0, this._rQi); this._rQ.copyWithin(0, this._rQi, this._rQlen);
} else { } else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi)); this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi, this._rQlen - this._rQi));
} }
} }
@ -280,8 +278,6 @@ export default class Websock {
if (this._rQlen == this._rQi) { if (this._rQlen == this._rQi) {
this._rQlen = 0; this._rQlen = 0;
this._rQi = 0; this._rQi = 0;
} else if (this._rQlen > this._rQmax) {
this._expand_compact_rQ();
} }
} else { } else {
Log.Debug("Ignoring empty message"); Log.Debug("Ignoring empty message");

View File

@ -384,26 +384,35 @@ describe('Websock', function () {
expect(sock._eventHandlers.message).not.to.have.been.called; expect(sock._eventHandlers.message).not.to.have.been.called;
}); });
it('should compact the receive queue', function () { it('should compact the receive queue when a message handler empties it', function () {
// NB(sross): while this is an internal implementation detail, it's important to sock._eventHandlers.message = () => { sock.rQi = sock._rQlen; };
// test, otherwise the receive queue could become very large very quickly
sock._rQ = new Uint8Array([0, 1, 2, 3, 4, 5, 0, 0, 0, 0]); sock._rQ = new Uint8Array([0, 1, 2, 3, 4, 5, 0, 0, 0, 0]);
sock._rQlen = 6; sock._rQlen = 6;
sock.rQi = 6; sock.rQi = 6;
sock._rQmax = 3;
const msg = { data: new Uint8Array([1, 2, 3]).buffer }; const msg = { data: new Uint8Array([1, 2, 3]).buffer };
sock._mode = 'binary'; sock._mode = 'binary';
sock._recv_message(msg); sock._recv_message(msg);
expect(sock._rQlen).to.equal(3); expect(sock._rQlen).to.equal(0);
expect(sock.rQi).to.equal(0); expect(sock.rQi).to.equal(0);
}); });
it('should automatically resize the receive queue if the incoming message is too large', function () { it('should compact the receive queue when we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQbufferSize = 20;
sock._rQlen = 20;
sock.rQi = 10;
const msg = { data: new Uint8Array([1, 2]).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(12);
expect(sock.rQi).to.equal(0);
});
it('should automatically resize the receive queue if the incoming message is larger than the buffer', function () {
sock._rQ = new Uint8Array(20); sock._rQ = new Uint8Array(20);
sock._rQlen = 0; sock._rQlen = 0;
sock.rQi = 0; sock.rQi = 0;
sock._rQbufferSize = 20; sock._rQbufferSize = 20;
sock._rQmax = 2;
const msg = { data: new Uint8Array(30).buffer }; const msg = { data: new Uint8Array(30).buffer };
sock._mode = 'binary'; sock._mode = 'binary';
sock._recv_message(msg); sock._recv_message(msg);
@ -411,6 +420,19 @@ describe('Websock', function () {
expect(sock.rQi).to.equal(0); expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen
}); });
it('should automatically resize the receive queue if the incoming message is larger than 1/8th of the buffer and we reach the end of the buffer', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 16;
sock.rQi = 16;
sock._rQbufferSize = 20;
const msg = { data: new Uint8Array(6).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(6);
expect(sock.rQi).to.equal(0);
expect(sock._rQ.length).to.equal(48);
});
}); });
describe('Data encoding', function () { describe('Data encoding', function () {