# HG changeset patch # User John "Elwin" Edwards # Date 1340052231 25200 # Node ID 96815eae4ebe1412631737bbedfdef120f48565f # Parent de01aafd4dd6865d4c549aa2139a80967d7f0531 RLG-Web: make multiple watchers possible. Split the TermSession class into the new TermSession, which handles the PTY, and client classes, which handle HTTP sessions. These are Player and Watcher. This allows multiple watchers per game, and other improvements. diff -r de01aafd4dd6 -r 96815eae4ebe rlgterm.js --- a/rlgterm.js Fri Jun 15 16:23:57 2012 -0700 +++ b/rlgterm.js Mon Jun 18 13:43:51 2012 -0700 @@ -151,60 +151,61 @@ * All non-special responseTexts should be handed directly to this function. */ function processMsg(msg) { - var msgDict; + var msgDicts; var havedata = null; // eventual return value try { - msgDict = JSON.parse(msg); + msgDicts = JSON.parse(msg); } catch (e) { if (e instanceof SyntaxError) return null; } - if (!msgDict.t) - return null; - else if (msgDict.t == "E") { - if (msgDict.c == 1 || msgDict.c == 6 || msgDict.c == 7) { - gameover(); - if (msgDict.c == 1) { - logout(); + if (msgDicts.length === 0) + return false; + for (var j = 0; j < msgDicts.length; j++) { + if (!msgDicts[j].t) + continue; + else if (msgDicts[j].t == "E") { + if (msgDicts[j].c == 1 || msgDicts[j].c == 6 || msgDicts[j].c == 7) { + gameover(); + if (msgDicts[j].c == 1) { + logout(); + } } + debug(1, "Server error: " + msgDicts[j].s); } - debug(1, "Server error: " + msgDict.s); - } - else if (msgDict.t == "n") { - havedata = false; - } - // A data message - else if (msgDict.t == "d"){ - if (msgDict.n === nrecv) { - writeData(msgDict.d); - nrecv++; - /* Process anything in the queue that's now ready. */ - var next; - while ((next = msgQ.shift()) !== undefined) { - writeData(next.d); + // A data message + else if (msgDicts[j].t == "d") { + if (msgDicts[j].n === nrecv) { + writeData(msgDicts[j].d); nrecv++; + /* Process anything in the queue that's now ready. */ + var next; + while ((next = msgQ.shift()) !== undefined) { + writeData(next.d); + nrecv++; + } } + else if (msgDicts[j].n > nrecv) { + /* The current message comes after one still missing. Queue this one + * for later use. */ + debug(1, "Got packet " + msgDicts[j].n + ", expected " + nrecv); + msgQ[msgDicts[j].n - nrecv - 1] = msgDicts[j]; + } + else { + /* This message's number was encountered previously. */ + debug(1, "Discarding packet " + msgDicts[j].n + ", expected " + nrecv); + } + havedata = true; } - else if (msgDict.n > nrecv) { - /* The current message comes after one still missing. Queue this one - * for later use. */ - debug(1, "Got packet " + msgDict.n + ", expected " + nrecv); - msgQ[msgDict.n - nrecv - 1] = msgDict; + else if (msgDicts[j].t == "T") { + setTitle(msgDicts[j].d); + } + else if (msgDicts[j].t == "q") { + gameover(); } else { - /* This message's number was encountered previously. */ - debug(1, "Discarding packet " + msgDict.n + ", expected " + nrecv); + debug(1, "Unrecognized server message " + msg); } - havedata = true; - } - else if (msgDict.t == "T") { - setTitle(msgDict.d); - } - else if (msgDict.t == "q") { - gameover(); - } - else { - debug(1, "Unrecognized server message " + msg); } return havedata; } @@ -488,7 +489,7 @@ if (req.readyState != 4 || req.status != 200) return; var reply = JSON.parse(req.responseText); - if (reply.t == 'l') { + if (reply.t == 's') { /* Success */ termemu.sessid = reply.id; termemu.resize(reply.h, reply.w); diff -r de01aafd4dd6 -r 96815eae4ebe rlgwebd.js --- a/rlgwebd.js Fri Jun 15 16:23:57 2012 -0700 +++ b/rlgwebd.js Mon Jun 18 13:43:51 2012 -0700 @@ -7,6 +7,7 @@ var url = require('url'); var path = require('path'); var fs = require('fs'); +var events = require('events'); var child_process = require('child_process'); var daemon = require(path.join(localModules, "daemon")); @@ -52,39 +53,46 @@ /* Global state */ var logins = {}; var sessions = {}; +var clients = {}; var allowlogin = true; +var nextsession = 0; -/* Constructor for TermSessions. Note that it opens the terminal and - * adds itself to the sessions dict. It currently assumes the user has - * been authenticated. +/* Constructor. A TermSession handles a pty and the game running on it. + * game: (String) Name of the game to launch. + * lkey: (String, key) The user's id, a key into logins. + * dims: (Array [Number, Number]) Height and width of the pty. + * handlers: (Object) Key-value pairs, event names and functions to + * install to handle them. + * Events: + * "open": Emitted on startup. Parameters: success (Boolean) + * "data": Data generated by child. Parameters: buf (Buffer) + * "exit": Child terminated. Parameters: exitcode, signal */ -/* TODO take a callback, or emit success/err events. */ -function TermSession(game, user, dims, lkey) { - /* First make sure starting the game will work. */ +function TermSession(game, lkey, dims, handlers) { + var ss = this; + /* Subclass EventEmitter to do the hard work. */ + events.EventEmitter.call(this); + for (var evname in handlers) + this.on(evname, handlers[evname]); + /* Don't launch anything that's not a real game. */ if (game in games) { this.game = games[game]; } else { - // TODO: throw an exception instead - return null; + this.emit('open', false); + return; } - this.player = String(user); - this.key = lkey; - /* This order seems to best avoid race conditions... */ - this.alive = false; - // A kludge until TermSession is rewritten to handle real watching. - this.sendq = false; - this.sessid = randkey(2); - while (this.sessid in sessions) { - this.sessid = randkey(2); + if (lkey in logins) { + this.key = lkey; + this.pname = logins[lkey].name; + } + else { + this.emit('open', false); + return; } /* Grab a spot in the sessions table. */ + this.sessid = nextsession++; sessions[this.sessid] = this; - /* State for messaging. */ - this.nsend = 0; - this.nrecv = 0; - this.msgQ = [] - this.Qtimeout = null; /* Set up the sizes. */ this.w = Math.floor(Number(dims[1])); if (!(this.w > 0 && this.w < 256)) @@ -98,22 +106,21 @@ childenv[key] = process.env[key]; } childenv["PTYHELPER"] = String(this.h) + "x" + String(this.w); - /* TODO handle tty-opening errors */ - /* TODO make argument-finding into a method */ - args = [this.game.path, "-n", user.toString()]; + args = [this.game.path, "-n", this.pname]; this.child = child_process.spawn("/bin/ptyhelper", args, {"env": childenv}); - var ss = this; - this.alive = true; - this.data = []; + this.emit('open', true); /* Set up the lockfile and ttyrec */ var ts = timestamp(); var progressdir = "/dgldir/inprogress-" + this.game.uname; - this.lock = path.join(progressdir, this.player + ":node:" + ts + ".ttyrec"); + this.lock = path.join(progressdir, this.pname + ":node:" + ts + ".ttyrec"); var lmsg = this.child.pid.toString() + '\n' + this.w + '\n' + this.h + '\n'; fs.writeFile(this.lock, lmsg, "utf8"); - var ttyrec = path.join("/dgldir/ttyrec", this.player, this.game.uname, + var ttyrec = path.join("/dgldir/ttyrec", this.pname, this.game.uname, ts + ".ttyrec"); this.record = fs.createWriteStream(ttyrec, { mode: 0664 }); + logins[lkey].sessions.push(this.sessid); + tslog("%s playing %s (index %d, pid %d)", this.pname, this.game.uname, + this.sessid, this.child.pid); /* END setup */ function ttyrec_chunk(buf) { var ts = new Date(); @@ -123,115 +130,180 @@ chunk.writeUInt32LE(1000 * (ts.getTime() % 1000), 4); chunk.writeUInt32LE(buf.length, 8); buf.copy(chunk, 12); - ss.data.push(chunk); ss.record.write(chunk); + ss.emit('data', buf); } this.child.stdout.on("data", ttyrec_chunk); this.child.stderr.on("data", ttyrec_chunk); + this.write = function(data) { + this.child.stdin.write(data); + }; this.child.on("exit", function (code, signal) { - ss.exitcode = (code != null ? code : 255); - ss.alive = false; fs.unlink(ss.lock); - /* Wait for all the data to get collected */ - setTimeout(ss.cleanup, 1000); + ss.record.end(); + ss.emit('exit', code, signal); + var id = ss.sessid; + delete sessions[id]; + tslog("Session %s ended.", id); }); + this.close = function () { + this.child.kill('SIGHUP'); + }; +} +TermSession.prototype = new events.EventEmitter(); + +function Watcher(session) { + var ss = this; // that + this.session = session; + this.alive = true; + /* State for messaging. */ + this.nsend = 0; + this.sendQ = []; + /* Get a place in the table. */ + this.id = randkey(2); + while (this.id in clients) { + this.id = randkey(2); + } + clients[this.id] = this; + function dataH(buf) { + var reply = {}; + reply.t = "d"; + reply.n = ss.nsend++; + reply.d = buf.toString("hex"); + ss.sendQ.push(reply); + } + function exitH(code, signal) { + ss.alive = false; + ss.sendQ.push({"t": "q"}); + } + session.on('data', dataH); + session.on('exit', exitH); + this.read = function() { + /* Returns an array of all outstanding messages, empty if none. */ + var temp = this.sendQ; + this.sendQ = []; + /* Clean up if finished. */ + if (!this.alive) { + delete clients[this.id]; + } + return temp; + }; + this.quit = function() { + this.session.removeListener('data', dataH); + this.session.removeListener('exit', exitH); + delete clients[this.id]; + }; +} + +function Player(gamename, lkey, dims, callback) { + var ss = this; + this.alive = false; + /* State for messaging. */ + this.nsend = 0; + this.nrecv = 0; + this.sendQ = []; + this.recvQ = [] + this.Qtimeout = null; + /* Get a place in the table. */ + this.id = randkey(2); + while (this.id in clients) { + this.id = randkey(2); + } + clients[this.id] = this; + + this.read = function() { + var temp = this.sendQ; + this.sendQ = []; + /* Clean up if finished. */ + if (!this.alive) { + clearTimeout(this.Qtimeout); + delete clients[this.id]; + } + return temp; + }; this.write = function (data, n) { if (!this.alive || typeof (n) != "number") { return; } - //console.log("Got message " + n); var oindex = n - this.nrecv; if (oindex === 0) { - //console.log("Writing message " + n); - this.child.stdin.write(data); + this.session.write(data); this.nrecv++; var next; - while ((next = this.msgQ.shift()) !== undefined) { - //console.log("Writing message " + this.nrecv); - this.child.stdin.write(next); + while ((next = this.recvQ.shift()) !== undefined) { + this.session.write(next); this.nrecv++; } - if (this.msgQ.length == 0 && this.Qtimeout) { + if (this.recvQ.length == 0 && this.Qtimeout) { clearTimeout(this.Qtimeout); this.Qtimeout = null; } } else if (oindex > 0 && oindex <= 1024) { - tslog("Stashing message %d at %d", n, oindex - 1); - this.msgQ[oindex - 1] = data; + tslog("Client %s: Stashing message %d at %d", this.id, n, oindex - 1); + this.recvQ[oindex - 1] = data; if (!this.Qtimeout) { - var nextn = this.nrecv + this.msgQ.length + 1; + var nextn = this.nrecv + this.recvQ.length + 1; this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn); } } /* Otherwise, discard it */ return; }; - this.flushQ = function (session, n) { + this.flushQ = function (client, n) { /* Callback for when an unreceived message times out. * n is the first empty space that will not be given up on. */ - if (!session.alive) + if (!client.alive || client.nrecv >= n) return; - session.nrecv++; + client.nrecv++; var next; /* Clear the queue up to n */ - while (session.nrecv < n) { - next = session.msgQ.shift(); + while (client.nrecv < n) { + next = client.recvQ.shift(); if (next !== undefined) - session.child.stdin.write(next); - session.nrecv++; + client.session.write(next); + client.nrecv++; } /* Clear out anything that's ready. */ - while ((next = session.msgQ.shift()) !== undefined) { - session.child.stdin.write(next); - session.nrecv++; + while ((next = client.recvQ.shift()) !== undefined) { + client.session.write(next); + client.nrecv++; } /* Now set another timeout if necessary. */ - if (session.msgQ.length != 0) { - var nextn = session.nrecv + session.msgQ.length + 1; - session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn); + if (client.recvQ.length != 0) { + var nextn = client.nrecv + client.recvQ.length + 1; + client.Qtimeout = setTimeout(client.flushQ, 30000, client, nextn); } - tslog("Flushing queue for session %s", session.sessid); + tslog("Flushing queue for player %s", player.id); }; - this.read = function () { - if (this.data.length == 0) - return null; - var pos = 0; - var i = 0; - for (i = 0; i < this.data.length; i++) - pos += this.data[i].length - 12; - var nbuf = new Buffer(pos); - var tptr; - pos = 0; - while (this.data.length > 0) { - tptr = this.data.shift(); - tptr.copy(nbuf, pos, 12); - pos += tptr.length - 12; + this.quit = function() { + if (this.alive) + this.session.close(); + }; + function openH(success) { + if (success) { + ss.alive = true; } - return nbuf; - }; - this.close = function () { - if (this.alive) - this.child.kill('SIGHUP'); - }; - this.cleanup = function () { - /* Call this when the child is dead. */ - if (ss.alive) - return; - ss.record.end(); - /* Give the client a chance to read any leftover data. */ - if (ss.data.length > 0 || !ss.sendq) - setTimeout(ss.remove, 8000); - else - ss.remove(); - }; - this.remove = function () { - var id = ss.sessid; - delete sessions[id]; - tslog("Session %s removed.", id); - }; + callback(ss, success); + } + function dataH(chunk) { + var reply = {}; + reply.t = "d"; + reply.n = ss.nsend++; + reply.d = chunk.toString("hex"); + ss.sendQ.push(reply); + } + function exitH(code, signal) { + ss.alive = false; + ss.sendQ.push({"t": "q"}); + } + var handlers = {'open': openH, 'data': dataH, 'exit': exitH}; + this.session = new TermSession(gamename, lkey, dims, handlers); } +/* Some functions which check whether a player is currently playing or + * has a saved game. Maybe someday they will provide information on + * the game. */ function checkprogress(user, game, callback, args) { var progressdir = "/dgldir/inprogress-" + game.uname; fs.readdir(progressdir, function(err, files) { @@ -359,11 +431,9 @@ function reaper() { var now = new Date(); function reapcheck(session) { - if (!session.alive) - return; fs.fstat(session.record.fd, function (err, stats) { if (!err && now - stats.mtime > playtimeout) { - tslog("Reaping %s", session.sessid); + tslog("Reaping session %s", session.sessid); /* Dissociate it with its login name. */ var sn = logins[session.key].sessions.indexOf(session.sessid); if (sn >= 0) { @@ -403,7 +473,7 @@ if (expired.length > 0) { logins[lkey].ts = new Date(now); for (var j = 0; j < expired.length; j++) { - targarray.splice(targarray.indexOf(expired[j], 1)); + targarray.splice(targarray.indexOf(expired[j]), 1); } } } @@ -412,15 +482,15 @@ function login(req, res, formdata) { if (!allowlogin) { - sendError(res, 6, null); + sendError(res, 6, null, false); return; } if (!("name" in formdata)) { - sendError(res, 2, "Username not given."); + sendError(res, 2, "Username not given.", false); return; } else if (!("pw" in formdata)) { - sendError(res, 2, "Password not given."); + sendError(res, 2, "Password not given.", false); return; } var username = String(formdata["name"]); @@ -478,6 +548,7 @@ } var username = logins[lkey].name; var gname = formdata["game"]; + // If dims are not given or invalid, the constructor will handle it. var dims = [formdata["h"], formdata["w"]]; if (!(gname in games)) { sendError(res, 2, "No such game: " + gname); @@ -492,23 +563,41 @@ return; } // Game starting has been approved. - var nsession = new TermSession(gname, username, dims, lkey); - if (nsession) { - res.writeHead(200, {'Content-Type': 'application/json'}); - var reply = {"t": "l", "id": nsession.sessid, "w": nsession.w, "h": - nsession.h}; - res.write(JSON.stringify(reply)); - res.end(); - tslog("%s playing %s (key %s, pid %d)", username, gname, - nsession.sessid, nsession.child.pid); - logins[lkey].sessions.push(nsession.sessid); - } - else { - sendError(res, 5, "Failed to open TTY"); - tslog("Unable to allocate TTY for %s", gname); - } + var respondlaunch = function(nclient, success) { + if (success) { + res.writeHead(200, {'Content-Type': 'application/json'}); + var reply = {"t": "s", "id": nclient.id, "w": nclient.w, "h": + nclient.h}; + res.write(JSON.stringify(reply)); + res.end(); + } + else { + sendError(res, 5, "Failed to open TTY"); + tslog("Unable to allocate TTY for %s", gname); + } + }; + new Player(gname, lkey, dims, respondlaunch); + }; + checkprogress(username, games[gname], launch, []); +} + +function watch(req, res, formdata) { + if (!("n" in formdata)) { + sendError(res, 2, "Game number not given"); + return; } - checkprogress(username, games[gname], launch, []); + var gamenumber = Number(formdata["n"]); + if (!(gamenumber in sessions)) { + sendError(res, 7); + return; + } + var session = sessions[gamenumber]; + var watch = new Watcher(session); + var reply = {"t": "w", "id": watch.id, "w": session.w, "h": session.h}; + res.writeHead(200, {'Content-Type': 'application/json'}); + res.write(JSON.stringify(reply)); + res.end(); + tslog("Game %d is being watched (key %s)", gamenumber, watch.id); } /* Sets things up for a new user, like dgamelaunch's commands[register] */ @@ -574,27 +663,27 @@ return; } -function endgame(term, res) { - if (!term.alive) { - sendError(res, 7, null); +/* Ends the game, obviously. Less obviously, stops watching the game if + * the client is a Watcher instead of a Player. */ +function endgame(client, res) { + if (!client.alive) { + sendError(res, 7, null, true); return; } - term.close(); - var resheaders = {'Content-Type': 'application/json'}; - res.writeHead(200, resheaders); - res.write(JSON.stringify({"t": "q"})); - res.end(); - term.sendq = true; + client.quit(); + // Give things some time to happen. + if (client instanceof Player) + setTimeout(readFeed, 200, client, res); return; } -function findTermSession(formdata) { +function findClient(formdata, playersOnly) { if (typeof(formdata) != "object") return null; if ("id" in formdata) { - var sessid = formdata["id"]; - if (sessid in sessions) { - return sessions[sessid]; + var id = formdata["id"]; + if (id in clients && (!playersOnly || clients[id] instanceof Player)) { + return clients[id]; } } return null; @@ -648,47 +737,29 @@ return; } -function readFeed(res, term) { - if (term) { - var reply = {}; - var result = term.read(); - if (result == null) { - if (term.alive) - reply.t = "n"; - else { - if (allowlogin) { - reply.t = "q"; - term.sendq = true; - } - else { - sendError(res, 6, null); - return; - } - } - } - else { - reply.t = "d"; - reply.n = term.nsend++; - reply.d = result.toString("hex"); - } - res.writeHead(200, { "Content-Type": "application/json" }); - res.write(JSON.stringify(reply)); - res.end(); +function readFeed(client, res) { + if (!client) { + sendError(res, 7, null, true); + return; } - else { - sendError(res, 7, null); + var msgs = client.read(); + if (!allowlogin && !msgs.length) { + sendError(res, 6, null, true); + return; } + res.writeHead(200, { "Content-Type": "application/json" }); + res.write(JSON.stringify(msgs)); + res.end(); } function statusmsg(req, res) { var reply = {"s": allowlogin, "g": []}; for (var sessid in sessions) { - if (sessions[sessid].alive) { - var gamedesc = {}; - gamedesc["p"] = sessions[sessid].player; - gamedesc["g"] = sessions[sessid].game.name; - reply["g"].push(gamedesc); - } + var gamedesc = {}; + gamedesc["n"] = sessid; + gamedesc["p"] = sessions[sessid].pname; + gamedesc["g"] = sessions[sessid].game.name; + reply["g"].push(gamedesc); } res.writeHead(200, { "Content-Type": "application/json" }); if (req.method != 'HEAD') @@ -723,7 +794,7 @@ "Login failed", "Already playing", "Game launch failed", "Server shutting down", "Game not in progress" ]; -function sendError(res, ecode, msg) { +function sendError(res, ecode, msg, box) { res.writeHead(200, { "Content-Type": "application/json" }); var edict = {"t": "E"}; if (!(ecode < errorcodes.length && ecode > 0)) @@ -732,10 +803,14 @@ edict["s"] = errorcodes[ecode]; if (msg) edict["s"] += ": " + msg; - res.write(JSON.stringify(edict)); + if (box) + res.write(JSON.stringify([edict])); + else + res.write(JSON.stringify(edict)); res.end(); } +// TODO new-objects done to here function webHandler(req, res) { /* default headers for the response */ var resheaders = {'Content-Type': 'text/html'}; @@ -753,30 +828,33 @@ function respond() { formdata = getMsg(reqbody); var target = url.parse(req.url).pathname; - var cterm = findTermSession(formdata); /* First figure out if the client is POSTing to a command interface. */ if (req.method == 'POST') { if (target == '/feed') { - if (!cterm) { - sendError(res, 7, null); + var client = findClient(formdata, false); + if (!client) { + sendError(res, 7, null, true); return; } if (formdata.t == "q") { /* The client wants to terminate the process. */ - endgame(cterm, res); + endgame(client, res); } else if (formdata.t == "d" && typeof(formdata.d) == "string") { + if (!(client instanceof Player)) { + sendError(res, 7, "Watching", true); + return; + } /* process the keys */ hexstr = formdata.d.replace(/[^0-9a-f]/gi, ""); if (hexstr.length % 2 != 0) { - sendError(res, 2, "incomplete byte"); + sendError(res, 2, "incomplete byte", true); return; } keybuf = new Buffer(hexstr, "hex"); - /* TODO OoO correction */ - cterm.write(keybuf, formdata.n); + client.write(keybuf, formdata.n); } - readFeed(res, cterm); + readFeed(client, res); } else if (target == "/login") { login(req, res, formdata); @@ -787,6 +865,9 @@ else if (target == "/play") { startgame(req, res, formdata); } + else if (target == "/watch") { + watch(req, res, formdata); + } else { res.writeHead(405, resheaders); res.end(); @@ -797,13 +878,10 @@ if (req.method == 'HEAD') { res.writeHead(200, {"Content-Type": "application/json"}); res.end(); - return; } - if (!cterm) { - sendError(res, 7, null); - return; - } - readFeed(res, cterm); + else + sendError(res, 7, null, true); + return; } else if (target == '/status') { statusmsg(req, res); @@ -841,14 +919,13 @@ for (var sessid in sessions) { sessions[sessid].close(); } - setTimeout(shutdown, 10000); + setTimeout(shutdown, 2000); } } process.on("exit", function () { for (var sessid in sessions) { - if (sessions[sessid].alive) - sessions[sessid].child.kill('SIGHUP'); + sessions[sessid].child.kill('SIGHUP'); } tslog("Quitting..."); return;