Mercurial > hg > rlgwebd
diff rlgwebd.js @ 23:21de24c08aed
Implement message order correction on the server side.
rlgwebd can now correct for the client's messages arriving in the wrong
order, in theory. I haven't found a good way of testing it yet.
author | John "Elwin" Edwards <elwin@sdf.org> |
---|---|
date | Thu, 24 May 2012 11:36:57 -0700 |
parents | 51d59a0e3b20 |
children | 9b58f8d3ea70 |
line wrap: on
line diff
--- a/rlgwebd.js Wed May 23 14:28:47 2012 -0700 +++ b/rlgwebd.js Thu May 24 11:36:57 2012 -0700 @@ -61,6 +61,7 @@ this.nsend = 0; this.nrecv = 0; this.msgQ = [] + this.Qtimeout = null; /* Set up the sizes. */ this.w = Math.floor(Number(dims[1])); if (!(this.w > 0 && this.w < 256)) @@ -106,10 +107,63 @@ /* Wait for all the data to get collected */ setTimeout(ss.cleanup, 1000); }); - this.write = function (data) { - if (this.alive) + this.write = function (data, n) { + if (!this.alive || typeof (n) != "number") { + return; + } + //console.log("Got message " + n); + var oindex = n - this.nrecv; + if (oindex === 0) { + //console.log("Writing message " + n); this.child.stdin.write(data); - /* Otherwise, throw some kind of exception? */ + this.nrecv++; + var next; + while ((next = this.msgQ.shift()) !== undefined) { + //console.log("Writing message " + this.nrecv); + this.child.stdin.write(next); + this.nrecv++; + } + if (this.msgQ.length == 0 && this.Qtimeout) { + clearTimeout(this.Qtimeout); + this.Qtimeout = null; + } + } + else if (oindex > 0 && oindex <= 1024) { + console.log("Stashing message " + n + " at " + (oindex - 1)); + this.msgQ[oindex - 1] = data; + if (!this.Qtimeout) { + var nextn = this.nrecv + this.msgQ.length + 1; + this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn); + } + } + /* Otherwise, discard it */ + return; + }; + this.flushQ = function (session, n) { + /* Callback for when an unreceived message times out. + * n is the first empty space that will not be given up on. */ + if (!session.alive) + return; + session.nrecv++; + var next; + /* Clear the queue up to n */ + while (session.nrecv < n) { + next = session.msgQ.shift(); + if (next !== undefined) + session.child.stdin.write(next); + session.nrecv++; + } + /* Clear out anything that's ready. */ + while ((next = session.msgQ.shift()) !== undefined) { + session.child.stdin.write(next); + session.nrecv++; + } + /* Now set another timeout if necessary. */ + if (session.msgQ.length != 0) { + var nextn = session.nrecv + session.msgQ.length + 1; + session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn); + } + console.log("Flushing queue for session " + session.sessid); }; this.read = function () { if (this.data.length == 0) @@ -534,7 +588,7 @@ } keybuf = new Buffer(hexstr, "hex"); /* TODO OoO correction */ - cterm.write(keybuf); + cterm.write(keybuf, formdata.n); } readFeed(res, cterm); }