Mercurial > hg > rlgwebd
comparison rlgwebd.js @ 23:21de24c08aed
Implement message order correction on the server side.
rlgwebd can now correct for the client's messages arriving in the wrong
order, in theory. I haven't found a good way of testing it yet.
| author | John "Elwin" Edwards <elwin@sdf.org> |
|---|---|
| date | Thu, 24 May 2012 11:36:57 -0700 |
| parents | 51d59a0e3b20 |
| children | 9b58f8d3ea70 |
comparison
equal
deleted
inserted
replaced
| 22:51d59a0e3b20 | 23:21de24c08aed |
|---|---|
| 59 sessions[this.sessid] = this; | 59 sessions[this.sessid] = this; |
| 60 /* State for messaging. */ | 60 /* State for messaging. */ |
| 61 this.nsend = 0; | 61 this.nsend = 0; |
| 62 this.nrecv = 0; | 62 this.nrecv = 0; |
| 63 this.msgQ = [] | 63 this.msgQ = [] |
| | 64 this.Qtimeout = null; |
| 64 /* Set up the sizes. */ | 65 /* Set up the sizes. */ |
| 65 this.w = Math.floor(Number(dims[1])); | 66 this.w = Math.floor(Number(dims[1])); |
| 66 if (!(this.w > 0 && this.w < 256)) | 67 if (!(this.w > 0 && this.w < 256)) |
| 67 this.w = 80; | 68 this.w = 80; |
| 68 this.h = Math.floor(Number(dims[0])); | 69 this.h = Math.floor(Number(dims[0])); |
| 104 ss.alive = false; | 105 ss.alive = false; |
| 105 fs.unlink(ss.lock); | 106 fs.unlink(ss.lock); |
| 106 /* Wait for all the data to get collected */ | 107 /* Wait for all the data to get collected */ |
| 107 setTimeout(ss.cleanup, 1000); | 108 setTimeout(ss.cleanup, 1000); |
| 108 }); | 109 }); |
| 109 this.write = function (data) { | 110 this.write = function (data, n) { |
| 110 if (this.alive) | 111 if (!this.alive \|\| typeof (n) != "number") { |
| | 112 return; |
| | 113 } |
| | 114 //console.log("Got message " + n); |
| | 115 var oindex = n - this.nrecv; |
| | 116 if (oindex === 0) { |
| | 117 //console.log("Writing message " + n); |
| 111 this.child.stdin.write(data); | 118 this.child.stdin.write(data); |
| 112 /* Otherwise, throw some kind of exception? */ | 119 this.nrecv++; |
| | 120 var next; |
| | 121 while ((next = this.msgQ.shift()) !== undefined) { |
| | 122 //console.log("Writing message " + this.nrecv); |
| | 123 this.child.stdin.write(next); |
| | 124 this.nrecv++; |
| | 125 } |
| | 126 if (this.msgQ.length == 0 && this.Qtimeout) { |
| | 127 clearTimeout(this.Qtimeout); |
| | 128 this.Qtimeout = null; |
| | 129 } |
| | 130 } |
| | 131 else if (oindex > 0 && oindex <= 1024) { |
| | 132 console.log("Stashing message " + n + " at " + (oindex - 1)); |
| | 133 this.msgQ[oindex - 1] = data; |
| | 134 if (!this.Qtimeout) { |
| | 135 var nextn = this.nrecv + this.msgQ.length + 1; |
| | 136 this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn); |
| | 137 } |
| | 138 } |
| | 139 /* Otherwise, discard it */ |
| | 140 return; |
| | 141 }; |
| | 142 this.flushQ = function (session, n) { |
| | 143 /* Callback for when an unreceived message times out. |
| | 144 * n is the first empty space that will not be given up on. */ |
| | 145 if (!session.alive) |
| | 146 return; |
| | 147 session.nrecv++; |
| | 148 var next; |
| | 149 /* Clear the queue up to n */ |
| | 150 while (session.nrecv < n) { |
| | 151 next = session.msgQ.shift(); |
| | 152 if (next !== undefined) |
| | 153 session.child.stdin.write(next); |
| | 154 session.nrecv++; |
| | 155 } |
| | 156 /* Clear out anything that's ready. */ |
| | 157 while ((next = session.msgQ.shift()) !== undefined) { |
| | 158 session.child.stdin.write(next); |
| | 159 session.nrecv++; |
| | 160 } |
| | 161 /* Now set another timeout if necessary. */ |
| | 162 if (session.msgQ.length != 0) { |
| | 163 var nextn = session.nrecv + session.msgQ.length + 1; |
| | 164 session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn); |
| | 165 } |
| | 166 console.log("Flushing queue for session " + session.sessid); |
| 113 }; | 167 }; |
| 114 this.read = function () { | 168 this.read = function () { |
| 115 if (this.data.length == 0) | 169 if (this.data.length == 0) |
| 116 return null; | 170 return null; |
| 117 var pos = 0; | 171 var pos = 0; |
| 532 sendError(res, 2, "incomplete byte"); | 586 sendError(res, 2, "incomplete byte"); |
| 533 return; | 587 return; |
| 534 } | 588 } |
| 535 keybuf = new Buffer(hexstr, "hex"); | 589 keybuf = new Buffer(hexstr, "hex"); |
| 536 /* TODO OoO correction */ | 590 /* TODO OoO correction */ |
| 537 cterm.write(keybuf); | 591 cterm.write(keybuf, formdata.n); |
| 538 } | 592 } |
| 539 readFeed(res, cterm); | 593 readFeed(res, cterm); |
| 540 } | 594 } |
| 541 else if (target == "/login") { | 595 else if (target == "/login") { |
| 542 login(req, res, formdata); | 596 login(req, res, formdata); |
