comparison rlgwebd.js @ 55:96815eae4ebe

RLG-Web: make multiple watchers possible. Split the TermSession class into the new TermSession, which handles the PTY, and two client classes, Player and Watcher, which handle HTTP sessions. This allows multiple watchers per game, among other improvements.
author John "Elwin" Edwards <elwin@sdf.org>
date Mon, 18 Jun 2012 13:43:51 -0700
parents 423ef87ddc9b
children 31bb3cf4f25f
comparison
equal deleted inserted replaced
54:de01aafd4dd6 55:96815eae4ebe
5 var http = require('http'); 5 var http = require('http');
6 var net = require('net'); 6 var net = require('net');
7 var url = require('url'); 7 var url = require('url');
8 var path = require('path'); 8 var path = require('path');
9 var fs = require('fs'); 9 var fs = require('fs');
10 var events = require('events');
10 var child_process = require('child_process'); 11 var child_process = require('child_process');
11 var daemon = require(path.join(localModules, "daemon")); 12 var daemon = require(path.join(localModules, "daemon"));
12 13
13 /* Configuration variables */ 14 /* Configuration variables */
14 // These first two files are NOT in the chroot. 15 // These first two files are NOT in the chroot.
50 }; 51 };
51 52
52 /* Global state */ 53 /* Global state */
53 var logins = {}; 54 var logins = {};
54 var sessions = {}; 55 var sessions = {};
56 var clients = {};
55 var allowlogin = true; 57 var allowlogin = true;
56 58 var nextsession = 0;
57 /* Constructor for TermSessions. Note that it opens the terminal and 59
58 * adds itself to the sessions dict. It currently assumes the user has 60 /* Constructor. A TermSession handles a pty and the game running on it.
59 * been authenticated. 61 * game: (String) Name of the game to launch.
62 * lkey: (String, key) The user's id, a key into logins.
63 * dims: (Array [Number, Number]) Height and width of the pty.
64 * handlers: (Object) Key-value pairs, event names and functions to
65 * install to handle them.
66 * Events:
67 * "open": Emitted on startup. Parameters: success (Boolean)
68 * "data": Data generated by child. Parameters: buf (Buffer)
69 * "exit": Child terminated. Parameters: exitcode, signal
60 */ 70 */
61 /* TODO take a callback, or emit success/err events. */ 71 function TermSession(game, lkey, dims, handlers) {
62 function TermSession(game, user, dims, lkey) { 72 var ss = this;
63 /* First make sure starting the game will work. */ 73 /* Subclass EventEmitter to do the hard work. */
74 events.EventEmitter.call(this);
75 for (var evname in handlers)
76 this.on(evname, handlers[evname]);
77 /* Don't launch anything that's not a real game. */
64 if (game in games) { 78 if (game in games) {
65 this.game = games[game]; 79 this.game = games[game];
66 } 80 }
67 else { 81 else {
68 // TODO: throw an exception instead 82 this.emit('open', false);
69 return null; 83 return;
70 } 84 }
71 this.player = String(user); 85 if (lkey in logins) {
72 this.key = lkey; 86 this.key = lkey;
73 /* This order seems to best avoid race conditions... */ 87 this.pname = logins[lkey].name;
74 this.alive = false; 88 }
75 // A kludge until TermSession is rewritten to handle real watching. 89 else {
76 this.sendq = false; 90 this.emit('open', false);
77 this.sessid = randkey(2); 91 return;
78 while (this.sessid in sessions) {
79 this.sessid = randkey(2);
80 } 92 }
81 /* Grab a spot in the sessions table. */ 93 /* Grab a spot in the sessions table. */
94 this.sessid = nextsession++;
82 sessions[this.sessid] = this; 95 sessions[this.sessid] = this;
83 /* State for messaging. */
84 this.nsend = 0;
85 this.nrecv = 0;
86 this.msgQ = []
87 this.Qtimeout = null;
88 /* Set up the sizes. */ 96 /* Set up the sizes. */
89 this.w = Math.floor(Number(dims[1])); 97 this.w = Math.floor(Number(dims[1]));
90 if (!(this.w > 0 && this.w < 256)) 98 if (!(this.w > 0 && this.w < 256))
91 this.w = 80; 99 this.w = 80;
92 this.h = Math.floor(Number(dims[0])); 100 this.h = Math.floor(Number(dims[0]));
96 var childenv = {}; 104 var childenv = {};
97 for (var key in process.env) { 105 for (var key in process.env) {
98 childenv[key] = process.env[key]; 106 childenv[key] = process.env[key];
99 } 107 }
100 childenv["PTYHELPER"] = String(this.h) + "x" + String(this.w); 108 childenv["PTYHELPER"] = String(this.h) + "x" + String(this.w);
101 /* TODO handle tty-opening errors */ 109 args = [this.game.path, "-n", this.pname];
102 /* TODO make argument-finding into a method */
103 args = [this.game.path, "-n", user.toString()];
104 this.child = child_process.spawn("/bin/ptyhelper", args, {"env": childenv}); 110 this.child = child_process.spawn("/bin/ptyhelper", args, {"env": childenv});
105 var ss = this; 111 this.emit('open', true);
106 this.alive = true;
107 this.data = [];
108 /* Set up the lockfile and ttyrec */ 112 /* Set up the lockfile and ttyrec */
109 var ts = timestamp(); 113 var ts = timestamp();
110 var progressdir = "/dgldir/inprogress-" + this.game.uname; 114 var progressdir = "/dgldir/inprogress-" + this.game.uname;
111 this.lock = path.join(progressdir, this.player + ":node:" + ts + ".ttyrec"); 115 this.lock = path.join(progressdir, this.pname + ":node:" + ts + ".ttyrec");
112 var lmsg = this.child.pid.toString() + '\n' + this.w + '\n' + this.h + '\n'; 116 var lmsg = this.child.pid.toString() + '\n' + this.w + '\n' + this.h + '\n';
113 fs.writeFile(this.lock, lmsg, "utf8"); 117 fs.writeFile(this.lock, lmsg, "utf8");
114 var ttyrec = path.join("/dgldir/ttyrec", this.player, this.game.uname, 118 var ttyrec = path.join("/dgldir/ttyrec", this.pname, this.game.uname,
115 ts + ".ttyrec"); 119 ts + ".ttyrec");
116 this.record = fs.createWriteStream(ttyrec, { mode: 0664 }); 120 this.record = fs.createWriteStream(ttyrec, { mode: 0664 });
121 logins[lkey].sessions.push(this.sessid);
122 tslog("%s playing %s (index %d, pid %d)", this.pname, this.game.uname,
123 this.sessid, this.child.pid);
117 /* END setup */ 124 /* END setup */
118 function ttyrec_chunk(buf) { 125 function ttyrec_chunk(buf) {
119 var ts = new Date(); 126 var ts = new Date();
120 var chunk = new Buffer(buf.length + 12); 127 var chunk = new Buffer(buf.length + 12);
121 /* TTYREC headers */ 128 /* TTYREC headers */
122 chunk.writeUInt32LE(Math.floor(ts.getTime() / 1000), 0); 129 chunk.writeUInt32LE(Math.floor(ts.getTime() / 1000), 0);
123 chunk.writeUInt32LE(1000 * (ts.getTime() % 1000), 4); 130 chunk.writeUInt32LE(1000 * (ts.getTime() % 1000), 4);
124 chunk.writeUInt32LE(buf.length, 8); 131 chunk.writeUInt32LE(buf.length, 8);
125 buf.copy(chunk, 12); 132 buf.copy(chunk, 12);
126 ss.data.push(chunk);
127 ss.record.write(chunk); 133 ss.record.write(chunk);
134 ss.emit('data', buf);
128 } 135 }
129 this.child.stdout.on("data", ttyrec_chunk); 136 this.child.stdout.on("data", ttyrec_chunk);
130 this.child.stderr.on("data", ttyrec_chunk); 137 this.child.stderr.on("data", ttyrec_chunk);
138 this.write = function(data) {
139 this.child.stdin.write(data);
140 };
131 this.child.on("exit", function (code, signal) { 141 this.child.on("exit", function (code, signal) {
132 ss.exitcode = (code != null ? code : 255); 142 fs.unlink(ss.lock);
143 ss.record.end();
144 ss.emit('exit', code, signal);
145 var id = ss.sessid;
146 delete sessions[id];
147 tslog("Session %s ended.", id);
148 });
149 this.close = function () {
150 this.child.kill('SIGHUP');
151 };
152 }
153 TermSession.prototype = new events.EventEmitter();
154
155 function Watcher(session) {
156 var ss = this; // that
157 this.session = session;
158 this.alive = true;
159 /* State for messaging. */
160 this.nsend = 0;
161 this.sendQ = [];
162 /* Get a place in the table. */
163 this.id = randkey(2);
164 while (this.id in clients) {
165 this.id = randkey(2);
166 }
167 clients[this.id] = this;
168 function dataH(buf) {
169 var reply = {};
170 reply.t = "d";
171 reply.n = ss.nsend++;
172 reply.d = buf.toString("hex");
173 ss.sendQ.push(reply);
174 }
175 function exitH(code, signal) {
133 ss.alive = false; 176 ss.alive = false;
134 fs.unlink(ss.lock); 177 ss.sendQ.push({"t": "q"});
135 /* Wait for all the data to get collected */ 178 }
136 setTimeout(ss.cleanup, 1000); 179 session.on('data', dataH);
137 }); 180 session.on('exit', exitH);
181 this.read = function() {
182 /* Returns an array of all outstanding messages, empty if none. */
183 var temp = this.sendQ;
184 this.sendQ = [];
185 /* Clean up if finished. */
186 if (!this.alive) {
187 delete clients[this.id];
188 }
189 return temp;
190 };
191 this.quit = function() {
192 this.session.removeListener('data', dataH);
193 this.session.removeListener('exit', exitH);
194 delete clients[this.id];
195 };
196 }
197
198 function Player(gamename, lkey, dims, callback) {
199 var ss = this;
200 this.alive = false;
201 /* State for messaging. */
202 this.nsend = 0;
203 this.nrecv = 0;
204 this.sendQ = [];
205 this.recvQ = []
206 this.Qtimeout = null;
207 /* Get a place in the table. */
208 this.id = randkey(2);
209 while (this.id in clients) {
210 this.id = randkey(2);
211 }
212 clients[this.id] = this;
213
214 this.read = function() {
215 var temp = this.sendQ;
216 this.sendQ = [];
217 /* Clean up if finished. */
218 if (!this.alive) {
219 clearTimeout(this.Qtimeout);
220 delete clients[this.id];
221 }
222 return temp;
223 };
138 this.write = function (data, n) { 224 this.write = function (data, n) {
139 if (!this.alive || typeof (n) != "number") { 225 if (!this.alive || typeof (n) != "number") {
140 return; 226 return;
141 } 227 }
142 //console.log("Got message " + n);
143 var oindex = n - this.nrecv; 228 var oindex = n - this.nrecv;
144 if (oindex === 0) { 229 if (oindex === 0) {
145 //console.log("Writing message " + n); 230 this.session.write(data);
146 this.child.stdin.write(data);
147 this.nrecv++; 231 this.nrecv++;
148 var next; 232 var next;
149 while ((next = this.msgQ.shift()) !== undefined) { 233 while ((next = this.recvQ.shift()) !== undefined) {
150 //console.log("Writing message " + this.nrecv); 234 this.session.write(next);
151 this.child.stdin.write(next);
152 this.nrecv++; 235 this.nrecv++;
153 } 236 }
154 if (this.msgQ.length == 0 && this.Qtimeout) { 237 if (this.recvQ.length == 0 && this.Qtimeout) {
155 clearTimeout(this.Qtimeout); 238 clearTimeout(this.Qtimeout);
156 this.Qtimeout = null; 239 this.Qtimeout = null;
157 } 240 }
158 } 241 }
159 else if (oindex > 0 && oindex <= 1024) { 242 else if (oindex > 0 && oindex <= 1024) {
160 tslog("Stashing message %d at %d", n, oindex - 1); 243 tslog("Client %s: Stashing message %d at %d", this.id, n, oindex - 1);
161 this.msgQ[oindex - 1] = data; 244 this.recvQ[oindex - 1] = data;
162 if (!this.Qtimeout) { 245 if (!this.Qtimeout) {
163 var nextn = this.nrecv + this.msgQ.length + 1; 246 var nextn = this.nrecv + this.recvQ.length + 1;
164 this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn); 247 this.Qtimeout = setTimeout(this.flushQ, 30000, this, nextn);
165 } 248 }
166 } 249 }
167 /* Otherwise, discard it */ 250 /* Otherwise, discard it */
168 return; 251 return;
169 }; 252 };
170 this.flushQ = function (session, n) { 253 this.flushQ = function (client, n) {
171 /* Callback for when an unreceived message times out. 254 /* Callback for when an unreceived message times out.
172 * n is the first empty space that will not be given up on. */ 255 * n is the first empty space that will not be given up on. */
173 if (!session.alive) 256 if (!client.alive || client.nrecv >= n)
174 return; 257 return;
175 session.nrecv++; 258 client.nrecv++;
176 var next; 259 var next;
177 /* Clear the queue up to n */ 260 /* Clear the queue up to n */
178 while (session.nrecv < n) { 261 while (client.nrecv < n) {
179 next = session.msgQ.shift(); 262 next = client.recvQ.shift();
180 if (next !== undefined) 263 if (next !== undefined)
181 session.child.stdin.write(next); 264 client.session.write(next);
182 session.nrecv++; 265 client.nrecv++;
183 } 266 }
184 /* Clear out anything that's ready. */ 267 /* Clear out anything that's ready. */
185 while ((next = session.msgQ.shift()) !== undefined) { 268 while ((next = client.recvQ.shift()) !== undefined) {
186 session.child.stdin.write(next); 269 client.session.write(next);
187 session.nrecv++; 270 client.nrecv++;
188 } 271 }
189 /* Now set another timeout if necessary. */ 272 /* Now set another timeout if necessary. */
190 if (session.msgQ.length != 0) { 273 if (client.recvQ.length != 0) {
191 var nextn = session.nrecv + session.msgQ.length + 1; 274 var nextn = client.nrecv + client.recvQ.length + 1;
192 session.Qtimeout = setTimeout(session.flushQ, 30000, session, nextn); 275 client.Qtimeout = setTimeout(client.flushQ, 30000, client, nextn);
193 } 276 }
194 tslog("Flushing queue for session %s", session.sessid); 277 tslog("Flushing queue for player %s", player.id);
195 }; 278 };
196 this.read = function () { 279 this.quit = function() {
197 if (this.data.length == 0) 280 if (this.alive)
198 return null; 281 this.session.close();
199 var pos = 0;
200 var i = 0;
201 for (i = 0; i < this.data.length; i++)
202 pos += this.data[i].length - 12;
203 var nbuf = new Buffer(pos);
204 var tptr;
205 pos = 0;
206 while (this.data.length > 0) {
207 tptr = this.data.shift();
208 tptr.copy(nbuf, pos, 12);
209 pos += tptr.length - 12;
210 }
211 return nbuf;
212 }; 282 };
213 this.close = function () { 283 function openH(success) {
214 if (this.alive) 284 if (success) {
215 this.child.kill('SIGHUP'); 285 ss.alive = true;
216 }; 286 }
217 this.cleanup = function () { 287 callback(ss, success);
218 /* Call this when the child is dead. */ 288 }
219 if (ss.alive) 289 function dataH(chunk) {
220 return; 290 var reply = {};
221 ss.record.end(); 291 reply.t = "d";
222 /* Give the client a chance to read any leftover data. */ 292 reply.n = ss.nsend++;
223 if (ss.data.length > 0 || !ss.sendq) 293 reply.d = chunk.toString("hex");
224 setTimeout(ss.remove, 8000); 294 ss.sendQ.push(reply);
225 else 295 }
226 ss.remove(); 296 function exitH(code, signal) {
227 }; 297 ss.alive = false;
228 this.remove = function () { 298 ss.sendQ.push({"t": "q"});
229 var id = ss.sessid; 299 }
230 delete sessions[id]; 300 var handlers = {'open': openH, 'data': dataH, 'exit': exitH};
231 tslog("Session %s removed.", id); 301 this.session = new TermSession(gamename, lkey, dims, handlers);
232 }; 302 }
233 } 303
234 304 /* Some functions which check whether a player is currently playing or
305 * has a saved game. Maybe someday they will provide information on
306 * the game. */
235 function checkprogress(user, game, callback, args) { 307 function checkprogress(user, game, callback, args) {
236 var progressdir = "/dgldir/inprogress-" + game.uname; 308 var progressdir = "/dgldir/inprogress-" + game.uname;
237 fs.readdir(progressdir, function(err, files) { 309 fs.readdir(progressdir, function(err, files) {
238 if (err) { 310 if (err) {
239 args.unshift(err, null); 311 args.unshift(err, null);
357 } 429 }
358 430
359 function reaper() { 431 function reaper() {
360 var now = new Date(); 432 var now = new Date();
361 function reapcheck(session) { 433 function reapcheck(session) {
362 if (!session.alive)
363 return;
364 fs.fstat(session.record.fd, function (err, stats) { 434 fs.fstat(session.record.fd, function (err, stats) {
365 if (!err && now - stats.mtime > playtimeout) { 435 if (!err && now - stats.mtime > playtimeout) {
366 tslog("Reaping %s", session.sessid); 436 tslog("Reaping session %s", session.sessid);
367 /* Dissociate it with its login name. */ 437 /* Dissociate it with its login name. */
368 var sn = logins[session.key].sessions.indexOf(session.sessid); 438 var sn = logins[session.key].sessions.indexOf(session.sessid);
369 if (sn >= 0) { 439 if (sn >= 0) {
370 logins[session.key].sessions.splice(sn, 1); 440 logins[session.key].sessions.splice(sn, 1);
371 if (now - logins[session.key].ts > playtimeout) 441 if (now - logins[session.key].ts > playtimeout)
401 expired.push(targarray[i]); 471 expired.push(targarray[i]);
402 } 472 }
403 if (expired.length > 0) { 473 if (expired.length > 0) {