diff rlgwebd.js @ 55:96815eae4ebe

RLG-Web: make multiple watchers possible. Split the TermSession class into the new TermSession, which handles the PTY, and client classes, which handle HTTP sessions. These are Player and Watcher. This allows multiple watchers per game, and other improvements.
author John "Elwin" Edwards <elwin@sdf.org>
date Mon, 18 Jun 2012 13:43:51 -0700
parents 423ef87ddc9b
children 31bb3cf4f25f
line wrap: on
line diff
--- a/rlgwebd.js	Fri Jun 15 16:23:57 2012 -0700
+++ b/rlgwebd.js	Mon Jun 18 13:43:51 2012 -0700
@@ -7,6 +7,7 @@
 var url = require('url');
 var path = require('path');
 var fs = require('fs');
+var events = require('events');
 var child_process = require('child_process');
 var daemon = require(path.join(localModules, "daemon"));
 
@@ -52,39 +53,46 @@
 /* Global state */
 var logins = {};
 var sessions = {};
+var clients = {};
 var allowlogin = true;
+var nextsession = 0;
 
-/* Constructor for TermSessions.  Note that it opens the terminal and 
- * adds itself to the sessions dict. It currently assumes the user has
- * been authenticated.
+/* Constructor.  A TermSession handles a pty and the game running on it.
+ *   game: (String) Name of the game to launch.
+ *   lkey: (String, key) The user's id, a key into logins.
+ *   dims: (Array [Number, Number]) Height and width of the pty.
+ *   handlers: (Object) Key-value pairs, event names and functions to
+ *           install to handle them.
+ *  Events:
+ *   "open": Emitted on startup.  Parameters: success (Boolean)
+ *   "data": Data generated by child. Parameters: buf (Buffer)
+ *   "exit": Child terminated. Parameters: exitcode, signal
  */
-/* TODO take a callback, or emit success/err events. */
-function TermSession(game, user, dims, lkey) {
-  /* First make sure starting the game will work. */
+function TermSession(game, lkey, dims, handlers) {
+  var ss = this;
+  /* Subclass EventEmitter to do the hard work. */
+  events.EventEmitter.call(this);
+  for (var evname in handlers)
+    this.on(evname, handlers[evname]);
+  /* Don't launch anything that's not a real game. */
   if (game in games) {
     this.game = games[game];
   }
   else {
-    // TODO: throw an exception instead
-    return null;
+    this.emit('open', false);
+    return;
   }
-  this.player = String(user);
-  this.key = lkey;
-  /* This order seems to best avoid race conditions... */
-  this.alive = false;
-  // A kludge until TermSession is rewritten to handle real watching.
-  this.sendq = false;
-  this.sessid = randkey(2);
-  while (this.sessid in sessions) {
-    this.sessid = randkey(2);
+  if (lkey in logins) {
+    this.key = lkey;
+    this.pname = logins[lkey].name;
+  }
+  else {
+    this.emit('open', false);
+    return;
   }
   /* Grab a spot in the sessions table. */
+  this.sessid = nextsession++;
   sessions[this.sessid] = this;
-  /* State for messaging. */
-  this.nsend = 0;
-  this.nrecv = 0;
-  this.msgQ = []
-  this.Qtimeout = null;
   /* Set up the sizes. */
   this.w = Math.floor(Number(dims[1]));
   if (!(this.w > 0 && this.w < 256))
@@ -98,22 +106,21 @@
     childenv[key] = process.env[key];
   }
   childenv["PTYHELPER"] = String(this.h) + "x" + String(this.w);
-  /* TODO handle tty-opening errors */
-  /* TODO make argument-finding into a method */
-  args = [this.game.path, "-n", user.toString()];
+  args = [this.game.path, "-n", this.pname];
   this.child = child_process.spawn("/bin/ptyhelper", args, {"env": childenv});
-  var ss = this;
-  this.alive = true;
-  this.data = [];
+  this.emit('open', true);
   /* Set up the lockfile and ttyrec */
   var ts = timestamp();
   var progressdir = "/dgldir/inprogress-" + this.game.uname;
-  this.lock = path.join(progressdir, this.player + ":node:" + ts + ".ttyrec");
+  this.lock = path.join(progressdir, this.pname + ":node:" + ts + ".ttyrec");
   var lmsg = this.child.pid.toString() + '\n' + this.w + '\n' + this.h + '\n'; 
   fs.writeFile(this.lock, lmsg, "utf8"); 
-  var ttyrec = path.join("/dgldir/ttyrec", this.player, this.game.uname, 
+  var ttyrec = path.join("/dgldir/ttyrec", this.pname, this.game.uname, 
                ts + ".ttyrec");
   this.record = fs.createWriteStream(ttyrec, { mode: 0664 });
+  logins[lkey].sessions.push(this.sessid);
+  tslog("%s playing %s (index %d, pid %d)", this.pname, this.game.uname, 
+            this.sessid, this.child.pid);
   /* END setup */
   function ttyrec_chunk(buf) {
     var ts = new Date();
@@ -123,115 +130,180 @@
     chunk.writeUInt32LE(1000 * (ts.getTime() % 1000), 4);
     chunk.writeUInt32LE(buf.length, 8);
     buf.copy(chunk, 12);
-    ss.data.push(chunk);
     ss.record.write(chunk);
+    ss.emit('data', buf);
   }
   this.child.stdout.on("data", ttyrec_chunk);
   this.child.stderr.on("data", ttyrec_chunk);
+  this.write = function(data) {
+    this.child.stdin.write(data);
+  };
   this.child.on("exit", function (code, signal) {
-    ss.exitcode = (code != null ? code : 255);
-    ss.alive = false;
     fs.unlink(ss.lock);
-    /* Wait for all the data to get collected */
-    setTimeout(ss.cleanup, 1000);
+    ss.record.end();
+    ss.emit('exit', code, signal);
+    var id = ss.sessid;
+    delete sessions[id];
+    tslog("Session %s ended.", id);
   });
+  this.close = function () {
+    this.child.kill('SIGHUP');
+  };
+}
+TermSession.prototype = new events.EventEmitter();
+
+function Watcher(session) {
+  var ss = this; // that
+  this.session = session;
+  this.alive = true;
+  /* State for messaging. */
+  this.nsend = 0;
+  this.sendQ = [];
+  /* Get a place in the table. */
+  this.id = randkey(2);
+  while (this.id in clients) {
+    this.id = randkey(2);
+  }
+  clients[this.id] = this;
+  function dataH(buf) {
+    var reply = {};
+    reply.t = "d";
+    reply.n = ss.nsend++;
+    reply.d = buf.toString("hex");
+    ss.sendQ.push(reply);
+  }
+  function exitH(code, signal) {
+    ss.alive = false;
+    ss.sendQ.push({"t": "q"});
+  }
+  session.on('data', dataH);
+  session.on('exit', exitH);
+  this.read = function() {
+    /* Returns an array of all outstanding messages, empty if none. */
+    var temp = this.sendQ;
+    this.sendQ = [];
+    /* Clean up if finished. */
+    if (!this.alive) {
+      delete clients[this.id];
+    }
+    return temp;
+  };
+  this.quit = function() {
+    this.session.removeListener('data', dataH);
+    this.session.removeListener('exit', exitH);
+    delete clients[this.id];
+  };
+}
+
+function Player(gamename, lkey, dims, callback) {
+  var ss = this;
+  this.alive = false;
+  /* State for messaging. */
+  this.nsend = 0;
+  this.nrecv = 0;
+  this.sendQ = [];
+  this.recvQ = []
+  this.Qtimeout = null;
+  /* Get a place in the table. */
+  this.id = randkey(2);
+  while (this.id in clients) {
+    this.id = randkey(2);
+  }
+  clients[this.id] = this;
+
+  this.read = function() {
+    var temp = this.sendQ;
+    this.sendQ = [];
+    /* Clean up if finished. */
+    if (!this.alive) {
+      clearTimeout(this.Qtimeout);
+      delete clients[this.id];
+    }
+    return temp;
+  };
   this.write = function (data, n) {
     if (!this.alive || typeof (n) != "number") {
       return;
     }
-    //console.log("Got message " + n);
     var oindex = n - this.nrecv;
     if (oindex === 0) {
-      //console.log("Writing message " + n);
-      this.child.stdin.write(data);
+      this.session.write(data);
       this.nrecv++;
       var next;
-      while ((next = this.msgQ.shift()) !== undefined) {
-        //console.log("Writing message " + this.nrecv);
-        this.child.stdin.write(next);
+      while ((next = this.recvQ.shift()) !== undefined) {
+        this.session.write(next);
         this.nrecv++;
       }
-      if (this.msgQ.length == 0 && this.Qtimeout) {
+      if (this.recvQ.length == 0 && this.Qtimeout) {
         clearTimeout(this.Qtimeout);
         this.Qtimeout = null;
       }
     }
     else if (oindex > 0 && oindex <= 1024) {
-      tslog("Stashing message %d at %d", n, oindex - 1);
-      this.msgQ[oindex - 1] = data;
+      tslog("Client %s: Stashing message %d at %d", this.id,  n, oindex - 1);
+      this.recvQ[oindex - 1] = data;
       if (!this.Qtimeout) {
-        var nextn = this.nrecv + this.msgQ.length + 1;
+        var nextn = this.nrecv + this.recvQ.length + 1;
         this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn);
       }
     }
     /* Otherwise, discard it */
     return;
   };
-  this.flushQ = function (session, n) {
+  this.flushQ = function (client, n) {
     /* Callback for when an unreceived message times out.
      * n is the first empty space that will not be given up on. */
-    if (!session.alive)
+    if (!client.alive || client.nrecv >= n)
       return;
-    session.nrecv++;
+    client.nrecv++;
     var next;
     /* Clear the queue up to n */
-    while (session.nrecv < n) {
-      next = session.msgQ.shift();
+    while (client.nrecv < n) {
+      next = client.recvQ.shift();
       if (next !== undefined)
-        session.child.stdin.write(next);
-      session.nrecv++;
+        client.session.write(next);
+      client.nrecv++;
     }
     /* Clear out anything that's ready. */
-    while ((next = session.msgQ.shift()) !== undefined) {
-      session.child.stdin.write(next);
-      session.nrecv++;
+    while ((next = client.recvQ.shift()) !== undefined) {
+      client.session.write(next);
+      client.nrecv++;
     }
     /* Now set another timeout if necessary. */
-    if (session.msgQ.length != 0) {
-      var nextn = session.nrecv + session.msgQ.length + 1;
-      session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn);
+    if (client.recvQ.length != 0) {
+      var nextn = client.nrecv + client.recvQ.length + 1;
+      client.Qtimeout = setTimeout(client.flushQ, 30000, client, nextn);
     }
-    tslog("Flushing queue for session %s", session.sessid);
+    tslog("Flushing queue for player %s", player.id);
   };
-  this.read = function () {
-    if (this.data.length == 0)
-      return null;
-    var pos = 0;
-    var i = 0;
-    for (i = 0; i < this.data.length; i++)
-      pos += this.data[i].length - 12;
-    var nbuf = new Buffer(pos);
-    var tptr;
-    pos = 0;
-    while (this.data.length > 0) {
-      tptr = this.data.shift();
-      tptr.copy(nbuf, pos, 12);
-      pos += tptr.length - 12;
+  this.quit = function() {
+    if (this.alive)
+      this.session.close();
+  };
+  function openH(success) {
+    if (success) {
+      ss.alive = true;
     }
-    return nbuf;
-  };
-  this.close = function () {
-    if (this.alive)
-      this.child.kill('SIGHUP');
-  };
-  this.cleanup = function () {
-    /* Call this when the child is dead. */
-    if (ss.alive)
-      return;
-    ss.record.end();
-    /* Give the client a chance to read any leftover data. */
-    if (ss.data.length > 0 || !ss.sendq)
-      setTimeout(ss.remove, 8000);
-    else
-      ss.remove();
-  };
-  this.remove = function () {
-    var id = ss.sessid;
-    delete sessions[id];
-    tslog("Session %s removed.", id);
-  };
+    callback(ss, success);
+  }
+  function dataH(chunk) {
+    var reply = {};
+    reply.t = "d";
+    reply.n = ss.nsend++;
+    reply.d = chunk.toString("hex");
+    ss.sendQ.push(reply);
+  }
+  function exitH(code, signal) {
+    ss.alive = false;
+    ss.sendQ.push({"t": "q"});
+  }
+  var handlers = {'open': openH, 'data': dataH, 'exit': exitH};
+  this.session = new TermSession(gamename, lkey, dims, handlers);
 }
 
+/* Some functions which check whether a player is currently playing or 
+ * has a saved game.  Maybe someday they will provide information on 
+ * the game. */
 function checkprogress(user, game, callback, args) {