comparison rlgwebd.js @ 23:21de24c08aed

Implement message order correction on the server side. rlgwebd can now correct for the client's messages arriving in the wrong order, in theory. I haven't found a good way of testing it yet.
author John "Elwin" Edwards <elwin@sdf.org>
date Thu, 24 May 2012 11:36:57 -0700
parents 51d59a0e3b20
children 9b58f8d3ea70
comparison
equal deleted inserted replaced
22:51d59a0e3b20 23:21de24c08aed
59 sessions[this.sessid] = this; 59 sessions[this.sessid] = this;
60 /* State for messaging. */ 60 /* State for messaging. */
61 this.nsend = 0; 61 this.nsend = 0;
62 this.nrecv = 0; 62 this.nrecv = 0;
63 this.msgQ = [] 63 this.msgQ = []
64 this.Qtimeout = null;
64 /* Set up the sizes. */ 65 /* Set up the sizes. */
65 this.w = Math.floor(Number(dims[1])); 66 this.w = Math.floor(Number(dims[1]));
66 if (!(this.w > 0 && this.w < 256)) 67 if (!(this.w > 0 && this.w < 256))
67 this.w = 80; 68 this.w = 80;
68 this.h = Math.floor(Number(dims[0])); 69 this.h = Math.floor(Number(dims[0]));
104 ss.alive = false; 105 ss.alive = false;
105 fs.unlink(ss.lock); 106 fs.unlink(ss.lock);
106 /* Wait for all the data to get collected */ 107 /* Wait for all the data to get collected */
107 setTimeout(ss.cleanup, 1000); 108 setTimeout(ss.cleanup, 1000);
108 }); 109 });
109 this.write = function (data) { 110 this.write = function (data, n) {
110 if (this.alive) 111 if (!this.alive || typeof (n) != "number") {
112 return;
113 }
114 //console.log("Got message " + n);
115 var oindex = n - this.nrecv;
116 if (oindex === 0) {
117 //console.log("Writing message " + n);
111 this.child.stdin.write(data); 118 this.child.stdin.write(data);
112 /* Otherwise, throw some kind of exception? */ 119 this.nrecv++;
120 var next;
121 while ((next = this.msgQ.shift()) !== undefined) {
122 //console.log("Writing message " + this.nrecv);
123 this.child.stdin.write(next);
124 this.nrecv++;
125 }
126 if (this.msgQ.length == 0 && this.Qtimeout) {
127 clearTimeout(this.Qtimeout);
128 this.Qtimeout = null;
129 }
130 }
131 else if (oindex > 0 && oindex <= 1024) {
132 console.log("Stashing message " + n + " at " + (oindex - 1));
133 this.msgQ[oindex - 1] = data;
134 if (!this.Qtimeout) {
135 var nextn = this.nrecv + this.msgQ.length + 1;
136 this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn);
137 }
138 }
139 /* Otherwise, discard it */
140 return;
141 };
142 this.flushQ = function (session, n) {
143 /* Callback for when an unreceived message times out.
144 * n is the first empty space that will not be given up on. */
145 if (!session.alive)
146 return;
147 session.nrecv++;
148 var next;
149 /* Clear the queue up to n */
150 while (session.nrecv < n) {
151 next = session.msgQ.shift();
152 if (next !== undefined)
153 session.child.stdin.write(next);
154 session.nrecv++;
155 }
156 /* Clear out anything that's ready. */
157 while ((next = session.msgQ.shift()) !== undefined) {
158 session.child.stdin.write(next);
159 session.nrecv++;
160 }
161 /* Now set another timeout if necessary. */
162 if (session.msgQ.length != 0) {
163 var nextn = session.nrecv + session.msgQ.length + 1;
164 session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn);
165 }
166 console.log("Flushing queue for session " + session.sessid);
113 }; 167 };
114 this.read = function () { 168 this.read = function () {
115 if (this.data.length == 0) 169 if (this.data.length == 0)
116 return null; 170 return null;
117 var pos = 0; 171 var pos = 0;
532 sendError(res, 2, "incomplete byte"); 586 sendError(res, 2, "incomplete byte");
533 return; 587 return;
534 } 588 }
535 keybuf = new Buffer(hexstr, "hex"); 589 keybuf = new Buffer(hexstr, "hex");
536 /* TODO OoO correction */ 590 /* TODO OoO correction */
537 cterm.write(keybuf); 591 cterm.write(keybuf, formdata.n);
538 } 592 }
539 readFeed(res, cterm); 593 readFeed(res, cterm);
540 } 594 }
541 else if (target == "/login") { 595 else if (target == "/login") {
542 login(req, res, formdata); 596 login(req, res, formdata);