diff rlgwebd.js @ 23:21de24c08aed

Implement message order correction on the server side. rlgwebd can now correct for the client's messages arriving in the wrong order, in theory. I haven't found a good way of testing it yet.
author John "Elwin" Edwards <elwin@sdf.org>
date Thu, 24 May 2012 11:36:57 -0700
parents 51d59a0e3b20
children 9b58f8d3ea70
line wrap: on
line diff
--- a/rlgwebd.js	Wed May 23 14:28:47 2012 -0700
+++ b/rlgwebd.js	Thu May 24 11:36:57 2012 -0700
@@ -61,6 +61,7 @@
   this.nsend = 0;
   this.nrecv = 0;
   this.msgQ = []
+  this.Qtimeout = null;
   /* Set up the sizes. */
   this.w = Math.floor(Number(dims[1]));
   if (!(this.w > 0 && this.w < 256))
@@ -106,10 +107,63 @@
     /* Wait for all the data to get collected */
     setTimeout(ss.cleanup, 1000);
   });
-  this.write = function (data) {
-    if (this.alive)
+  this.write = function (data, n) {
+    if (!this.alive || typeof (n) != "number") {
+      return;
+    }
+    //console.log("Got message " + n);
+    var oindex = n - this.nrecv;
+    if (oindex === 0) {
+      //console.log("Writing message " + n);
       this.child.stdin.write(data);
-    /* Otherwise, throw some kind of exception? */
+      this.nrecv++;
+      var next;
+      while ((next = this.msgQ.shift()) !== undefined) {
+        //console.log("Writing message " + this.nrecv);
+        this.child.stdin.write(next);
+        this.nrecv++;
+      }
+      if (this.msgQ.length == 0 && this.Qtimeout) {
+        clearTimeout(this.Qtimeout);
+        this.Qtimeout = null;
+      }
+    }
+    else if (oindex > 0 && oindex <= 1024) {
+      console.log("Stashing message " + n + " at " + (oindex - 1));
+      this.msgQ[oindex - 1] = data;
+      if (!this.Qtimeout) {
+        var nextn = this.nrecv + this.msgQ.length + 1;
+        this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn);
+      }
+    }
+    /* Otherwise, discard it */
+    return;
+  };
+  this.flushQ = function (session, n) {
+    /* Callback for when an unreceived message times out.
+     * n is the first empty space that will not be given up on. */
+    if (!session.alive)
+      return;
+    session.nrecv++;
+    var next;
+    /* Clear the queue up to n */
+    while (session.nrecv < n) {
+      next = session.msgQ.shift();
+      if (next !== undefined)
+        session.child.stdin.write(next);
+      session.nrecv++;
+    }
+    /* Clear out anything that's ready. */
+    while ((next = session.msgQ.shift()) !== undefined) {
+      session.child.stdin.write(next);
+      session.nrecv++;
+    }
+    /* Now set another timeout if necessary. */
+    if (session.msgQ.length != 0) {
+      var nextn = session.nrecv + session.msgQ.length + 1;
+      session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn);
+    }
+    console.log("Flushing queue for session " + session.sessid);
   };
   this.read = function () {
     if (this.data.length == 0)
@@ -534,7 +588,7 @@
           }
           keybuf = new Buffer(hexstr, "hex");
           /* TODO OoO correction */
-          cterm.write(keybuf);
+          cterm.write(keybuf, formdata.n);
         }
         readFeed(res, cterm);
       }