Mercurial > hg > rlgwebd
comparison rlgwebd.js @ 23:21de24c08aed
Implement message order correction on the server side.
In theory, rlgwebd can now correct for a client's messages arriving out of
order. I haven't found a good way of testing it yet.
author | John "Elwin" Edwards <elwin@sdf.org> |
---|---|
date | Thu, 24 May 2012 11:36:57 -0700 |
parents | 51d59a0e3b20 |
children | 9b58f8d3ea70 |
Comparison legend: equal | deleted | inserted | replaced
22:51d59a0e3b20 | 23:21de24c08aed |
---|---|
59 sessions[this.sessid] = this; | 59 sessions[this.sessid] = this; |
60 /* State for messaging. */ | 60 /* State for messaging. */ |
61 this.nsend = 0; | 61 this.nsend = 0; |
62 this.nrecv = 0; | 62 this.nrecv = 0; |
63 this.msgQ = [] | 63 this.msgQ = [] |
64 this.Qtimeout = null; | |
64 /* Set up the sizes. */ | 65 /* Set up the sizes. */ |
65 this.w = Math.floor(Number(dims[1])); | 66 this.w = Math.floor(Number(dims[1])); |
66 if (!(this.w > 0 && this.w < 256)) | 67 if (!(this.w > 0 && this.w < 256)) |
67 this.w = 80; | 68 this.w = 80; |
68 this.h = Math.floor(Number(dims[0])); | 69 this.h = Math.floor(Number(dims[0])); |
104 ss.alive = false; | 105 ss.alive = false; |
105 fs.unlink(ss.lock); | 106 fs.unlink(ss.lock); |
106 /* Wait for all the data to get collected */ | 107 /* Wait for all the data to get collected */ |
107 setTimeout(ss.cleanup, 1000); | 108 setTimeout(ss.cleanup, 1000); |
108 }); | 109 }); |
109 this.write = function (data) { | 110 this.write = function (data, n) { |
110 if (this.alive) | 111 if (!this.alive || typeof (n) != "number") { |
112 return; | |
113 } | |
114 //console.log("Got message " + n); | |
115 var oindex = n - this.nrecv; | |
116 if (oindex === 0) { | |
117 //console.log("Writing message " + n); | |
111 this.child.stdin.write(data); | 118 this.child.stdin.write(data); |
112 /* Otherwise, throw some kind of exception? */ | 119 this.nrecv++; |
120 var next; | |
121 while ((next = this.msgQ.shift()) !== undefined) { | |
122 //console.log("Writing message " + this.nrecv); | |
123 this.child.stdin.write(next); | |
124 this.nrecv++; | |
125 } | |
126 if (this.msgQ.length == 0 && this.Qtimeout) { | |
127 clearTimeout(this.Qtimeout); | |
128 this.Qtimeout = null; | |
129 } | |
130 } | |
131 else if (oindex > 0 && oindex <= 1024) { | |
132 console.log("Stashing message " + n + " at " + (oindex - 1)); | |
133 this.msgQ[oindex - 1] = data; | |
134 if (!this.Qtimeout) { | |
135 var nextn = this.nrecv + this.msgQ.length + 1; | |
136 this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn); | |
137 } | |
138 } | |
139 /* Otherwise, discard it */ | |
140 return; | |
141 }; | |
142 this.flushQ = function (session, n) { | |
143 /* Callback for when an unreceived message times out. | |
144 * n is the first empty space that will not be given up on. */ | |
145 if (!session.alive) | |
146 return; | |
147 session.nrecv++; | |
148 var next; | |
149 /* Clear the queue up to n */ | |
150 while (session.nrecv < n) { | |
151 next = session.msgQ.shift(); | |
152 if (next !== undefined) | |
153 session.child.stdin.write(next); | |
154 session.nrecv++; | |
155 } | |
156 /* Clear out anything that's ready. */ | |
157 while ((next = session.msgQ.shift()) !== undefined) { | |
158 session.child.stdin.write(next); | |
159 session.nrecv++; | |
160 } | |
161 /* Now set another timeout if necessary. */ | |
162 if (session.msgQ.length != 0) { | |
163 var nextn = session.nrecv + session.msgQ.length + 1; | |
164 session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn); | |
165 } | |
166 console.log("Flushing queue for session " + session.sessid); | |
113 }; | 167 }; |
114 this.read = function () { | 168 this.read = function () { |
115 if (this.data.length == 0) | 169 if (this.data.length == 0) |
116 return null; | 170 return null; |
117 var pos = 0; | 171 var pos = 0; |
532 sendError(res, 2, "incomplete byte"); | 586 sendError(res, 2, "incomplete byte"); |
533 return; | 587 return; |
534 } | 588 } |
535 keybuf = new Buffer(hexstr, "hex"); | 589 keybuf = new Buffer(hexstr, "hex"); |
536 /* TODO OoO correction */ | 590 /* TODO OoO correction */ |
537 cterm.write(keybuf); | 591 cterm.write(keybuf, formdata.n); |
538 } | 592 } |
539 readFeed(res, cterm); | 593 readFeed(res, cterm); |
540 } | 594 } |
541 else if (target == "/login") { | 595 else if (target == "/login") { |
542 login(req, res, formdata); | 596 login(req, res, formdata); |