Merge pull request #565 from kanaka/bug/dynamic-rq-resize

Resize Receive Queue to Fit Incoming Messages
This commit is contained in:
Solly Ross 2016-01-05 14:28:01 -05:00
commit 7e4475fa92
2 changed files with 61 additions and 17 deletions

View File

@ -66,6 +66,12 @@ function Websock() {
(function () {
"use strict";
// this has performance issues in some versions Chromium, and
// doesn't gain a tremendous amount of performance increase in Firefox
// at the moment. It may be valuable to turn it on in the future.
var ENABLE_COPYWITHIN = false;
var MAX_RQ_GROW_SIZE = 40 * 1024 * 1024; // 40 MiB
var typedArrayToString = (function () {
// This is only for PhantomJS, which doesn't like apply-ing
@ -340,9 +346,49 @@ function Websock() {
return new Uint8Array(this._sQ.buffer, 0, this._sQlen);
},
_expand_compact_rQ: function (min_fit) {
var resizeNeeded = min_fit || this._rQlen - this._rQi > this._rQbufferSize / 2;
if (resizeNeeded) {
if (!min_fit) {
// just double the size if we need to do compaction
this._rQbufferSize *= 2;
} else {
// otherwise, make sure we satisy rQlen - rQi + min_fit < rQbufferSize / 8
this._rQbufferSize = (this._rQlen - this._rQi + min_fit) * 8;
}
}
// we don't want to grow unboundedly
if (this._rQbufferSize > MAX_RQ_GROW_SIZE) {
this._rQbufferSize = MAX_RQ_GROW_SIZE;
if (this._rQbufferSize - this._rQlen - this._rQi < min_fit) {
throw new Exception("Receive Queue buffer exceeded " + MAX_RQ_GROW_SIZE + " bytes, and the new message could not fit");
}
}
if (resizeNeeded) {
var old_rQbuffer = this._rQ.buffer;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
} else {
if (ENABLE_COPYWITHIN) {
this._rQ.copyWithin(0, this._rQi);
} else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi));
}
}
this._rQlen = this._rQlen - this._rQi;
this._rQi = 0;
},
_decode_message: function (data) {
// push arraybuffer values onto the end
var u8 = new Uint8Array(data);
if (u8.length > this._rQbufferSize - this._rQlen) {
this._expand_compact_rQ(u8.length);
}
this._rQ.set(u8, this._rQlen);
this._rQlen += u8.length;
},
@ -357,23 +403,7 @@ function Websock() {
this._rQlen = 0;
this._rQi = 0;
} else if (this._rQlen > this._rQmax) {
if (this._rQlen - this._rQi > 0.5 * this._rQbufferSize) {
var old_rQbuffer = this._rQ.buffer;
this._rQbufferSize *= 2;
this._rQmax = this._rQbufferSize / 8;
this._rQ = new Uint8Array(this._rQbufferSize);
this._rQ.set(new Uint8Array(old_rQbuffer, this._rQi));
} else {
if (this._rQ.copyWithin) {
// Firefox only, ATM
this._rQ.copyWithin(0, this._rQi);
} else {
this._rQ.set(new Uint8Array(this._rQ.buffer, this._rQi));
}
}
this._rQlen = this._rQlen - this._rQi;
this._rQi = 0;
this._expand_compact_rQ();
}
} else {
Util.Debug("Ignoring empty message");

View File

@ -404,6 +404,20 @@ describe('Websock', function() {
expect(sock.get_rQi()).to.equal(0);
});
it('should automatically resize the receive queue if the incoming message is too large', function () {
sock._rQ = new Uint8Array(20);
sock._rQlen = 0;
sock.set_rQi(0);
sock._rQbufferSize = 20;
sock._rQmax = 2;
var msg = { data: new Uint8Array(30).buffer };
sock._mode = 'binary';
sock._recv_message(msg);
expect(sock._rQlen).to.equal(30);
expect(sock.get_rQi()).to.equal(0);
expect(sock._rQ.length).to.equal(240); // keep the invariant that rQbufferSize / 8 >= rQlen
});
it('should call the error event handler on an exception', function () {
sock._eventHandlers.error = sinon.spy();
sock._eventHandlers.message = sinon.stub().throws();