Mercurial > hg > rlgallery-misc
view py/rlgall.py @ 70:277b126877c8
Add a Windows build link to the download page.
author | John "Elwin" Edwards |
---|---|
date | Mon, 25 Jan 2016 21:00:19 -0500 |
parents | 67bcca6e3cb1 |
children | 2cca66b3e262 |
line wrap: on
line source
# rlgall.py # Module for the Roguelike Gallery, using a postgres database # Requires Python 3.3 import re import os import psycopg2 from datetime import datetime import pytz import html # Configuration logdir = "/var/dgl/var/games/roguelike/" webdir = "/var/www/lighttpd/scoring/" ppagename = webdir + "players/{0}.html" hpagename = webdir + "highscores.html" # HTML fragments for templating phead = """<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd"> <html><head> <title>{0}</title> <link rel="stylesheet" href="/scoring/scores.css" type="text/css"> </head> """ ptop = """<body> <h1>Yendor Guild</h1> """ navtop = '<div class="nav"><a href="/">rlgallery.org</a> -> {0}</div>\n' navscore = '<div class="nav"><a href="/">rlgallery.org</a> -> \ <a href="/scoring/">Scores</a> -> {0}</div>\n' navplayer = '<div class="nav"><a href="/">rlgallery.org</a> -> \ <a href="/scoring/">Scores</a> -> <a href="/scoring/players/">Players</a> \ -> {0}</div>' pti = '<h2>{0}</h2>\n' secthead = '<h3>{0}</h3>\n' tblhead = '<div class="stable">\n' rowstart = '<div class="sentry">\n' rowend = '</div>\n' cell = ' <span class="sdata">{0}</span>\n' rcell = ' <span class="sdatar">{0}</span>\n' hcell = ' <span class="shdata">{0}</span>\n' tblend = '</div>\n' pend = """<div class="foot"> <a href="/">rlgallery.org</a> <a href="/recent.cgi">Recent Games</a> <a href="/scoring/high.cgi">High Scores</a> </div> </body></html> """ # This would be more useful if we had to do translation headerbook = {"endt":"End time", "score":"Score", "name":"Name", "xl":"XL", "fate":"Fate", "rank":"Rank", "game":"Game", "class":"Class", "depth":"Depth"} # Queries for the games table offselstr = "SELECT offbytes FROM games WHERE gname = %s;" newoffstr = "UPDATE games SET offbytes = %s WHERE gname = %s;" def getconn(): "Returns a database connection, or None if the connection fails." try: conn = psycopg2.connect("dbname=rlg") except psycopg2.OperationalError: return None return conn def recnameToTS(filename): pattern = "%Y-%m-%d.%H:%M:%S.ttyrec" try: dt = datetime.strptime(filename, pattern).replace(tzinfo=pytz.utc) return dt except ValueError: return None def playerlink(name): "Returns a link to a player's page" escname = html.escape(name) lstr = '<a href="/scoring/players/' + escname + '.html">' + escname + '</a>' return lstr def linktoArchive(entry): "Takes an entry dict and returns a link to the ttyrec archivist." lstr = '<a href="/archive.cgi?name={0};game={1};time={2}">{3}</a>' linktext = entry["endt"].strftime("%Y/%m/%d %H:%M:%S") stamp = int(entry["endt"].timestamp()) escname = html.escape(entry["name"]) return lstr.format(escname, entry["game"].uname, stamp, linktext) def maketablerow(cells, isheader=None): "Takes a list of strings and returns a HTML table row with each string \ in its own cell. isheader will make them header cells, obviously." if isheader: celler = hcell else: celler = cell rowstr = rowstart for entry in cells: rowstr = rowstr + celler.format(entry) rowstr = rowstr + rowend return rowstr def printTable(entries, fields, of): "Takes a list of entry dicts and a list of field strings and writes a \ HTML table to of." of.write(tblhead) clist = [] for field in fields: if field in headerbook: clist.append(headerbook[field]) else: clist.append("{0}".format(field)) of.write(maketablerow(clist, True)) rnum = 0 for entry in entries: clist = [] for field in fields: if field in entry: thing = entry[field] if field == "game": clist.append((thing.name, cell)) elif field == "xl" or field == "score" or field == "rank": # Numerics clist.append((str(thing), rcell)) elif field == "depth" or field == "maxdepth": clist.append((str(thing), cell)) elif field == "name": clist.append((playerlink(thing), cell)) elif field == "fate": clist.append((thing, cell)) elif field == "endt": clist.append((linktoArchive(entry), cell)) else: clist.append(("{0}".format(thing), cell)) else: clist.append(("N/A", cell)) rowstr = rowstart + "".join([ t.format(s) for (s, t) in clist ]) + rowend of.write(rowstr) of.write(tblend) return class Game: def __init__(self, name, uname): pass def logtoDict(self, entry): "Processes a log entry string, returning a dict." ndict = {"game": self} entrylist = entry.strip().split(self.logdelim, len(self.logspec) - 1) for item, value in zip(self.logspec, entrylist): if self.sqltypes[item] == "int": ndict[item] = int(value) if self.sqltypes[item] == "bool": ndict[item] = bool(int(value)) elif self.sqltypes[item] == "timestamptz": ndict[item] = datetime.fromtimestamp(int(value), pytz.utc) else: ndict[item] = value return ndict def getEntryDicts(self, entfile, entlist): "Reads logfile entries from entfile, interprets them according to the \ instructions in self.logspec/logdelim, and adds them as dicts to entlist." while True: nextentry = entfile.readline() if not nextentry: break if nextentry[-1] != '\n': break entlist.append(self.logtoDict(nextentry)) return def loadnew(self): conn = getconn() if conn == None: return [] cur = conn.cursor() # Get the previous offset cur.execute(offselstr, [self.uname]) offset = cur.fetchone()[0] newlist = [] try: scr = open(self.scores, encoding="utf-8") scr.seek(offset) self.getEntryDicts(scr, newlist) except IOError: noffset = offset # Can't read anything, assume no new games else: noffset = scr.tell() scr.close() cur.execute(newoffstr, [noffset, self.uname]) # The new players must already be added to the players table. updatenames = set([ e["name"] for e in newlist ]) self.postprocess(newlist) self.putIntoDB(newlist, conn) cur.close() conn.close() return updatenames def postprocess(self, gamelist): "Default postprocessing function: adds start time and ttyrec list" names = set([ e["name"] for e in gamelist ]) conn = getconn() if conn == None: return [] cur = conn.cursor() for nameF in names: # Get all this player's games ordered by time itsEntries = [ entry for entry in gamelist if entry["name"] == nameF ] itsEntries.sort(key=lambda e: e["endt"]) # Find the end time of the latest game already in the db tquery = "SELECT endt FROM {0} WHERE name = %s ORDER BY endt DESC LIMIT 1;".format(self.uname) cur.execute(tquery, [nameF]) result = cur.fetchone() if result: prev = result[0] else: prev = datetime.fromtimestamp(0, pytz.utc); ttyrecdir = "/var/dgl/dgldir/ttyrec/{0}/{1}/".format(nameF, self.uname) allfilekeys = [ (recnameToTS(f), f) for f in os.listdir(ttyrecdir) ] vfilekeys = [ e for e in allfilekeys if e[0] > prev ] vfilekeys.sort(key=lambda e: e[0]) # Now determine startt and ttyrecs for each game for i in range(0, len(itsEntries)): if i == 0: lowlim = prev else: lowlim = itsEntries[i-1]["endt"] hilim = itsEntries[i]["endt"] recs = [ k[1] for k in vfilekeys if lowlim <= k[0] < hilim ] if len(recs) == 0: # There inexplicably are no files. TODO log an error. itsEntries[i]["startt"] = lowlim itsEntries[i]["ttyrecs"] = [] else: itsEntries[i]["startt"] = recnameToTS(recs[0]) itsEntries[i]["ttyrecs"] = recs cur.close() conn.close() def putIntoDB(self, dictlist, conn): cur = conn.cursor() cur.executemany(self.insertq, [ d for d in dictlist if d["game"] == self ]) conn.commit() cur.close() return def tablerecent(self, of): "Prints the most recent games from the logfile, NOT the database." newest = [] try: scr = open(self.scores, encoding="utf-8") except FileNotFoundError: pass else: # Text streams don't support random seeking. try: scr.buffer.seek(self.lookback, 2) except OSError: scr.buffer.seek(0) # The file wasn't that long, start at the beginning if scr.buffer.tell() != 0: scr.buffer.readline() # Throw away the incomplete line self.getEntryDicts(scr, newest) newest.reverse() scr.close() of.write(secthead.format(self.name)) if not newest: of.write("<div>No one has braved this dungeon yet.</div>\n") else: printTable(newest, self.fields, of) return def getHigh(self, n=10, offset=0, inittime=None, finaltime=None): "Gets the n highest scores (starting at offset) from the database, \ returning a list of dicts." qfields = [] for f in self.rankfields: if f == "rank": qfields.append("rank(*) OVER (ORDER BY score DESC)") else: qfields.append(f) qstr = "SELECT " + ", ".join(qfields) + " FROM {0}".format(self.uname) qvals = [] if isinstance(inittime, datetime): qstr += " WHERE endt >= %s" qvals.append(inittime) if isinstance(finaltime, datetime): if finaltime < inittime: return [] qstr += " AND endt < %s" qvals.append(finaltime) elif isinstance(finaltime, datetime): qstr += " WHERE endt < %s" qvals.append(finaltime) try: n = int(n) except (ValueError, TypeError): return [] if n <= 0: return [] qstr += " LIMIT %s" qvals.append(n) try: offset = int(offset) except (ValueError, TypeError): return [] if offset > 0: qstr += " OFFSET %s" qvals.append(offset) qstr += ";" conn = psycopg2.connect("dbname=rlg") cur = conn.cursor() cur.execute(qstr, qvals) cols = [ col.name for col in cur.description ] data = [ dict(zip(cols, row)) for row in cur.fetchall() ] cur.close() conn.close() for d in data: d["game"] = self return data def getXLCounts(self, nmax=15): "Returns a list of (xlevel, gamecount) pairs." lquery = "SELECT count(*) FROM {0} WHERE xl = %s;".format(self.uname) mquery = "SELECT count(*) FROM {0} WHERE xl >= %s;".format(self.uname) try: nmax = int(nmax) except (ValueError, TypeError): return [] if nmax <= 0: return [] xlevels = range(1, nmax) results = [] conn = psycopg2.connect("dbname=rlg") cur = conn.cursor() for xl in xlevels: cur.execute(lquery, [xl]) results.append((xl, cur.fetchone()[0])) cur.execute(mquery, [nmax]) results.append((nmax, cur.fetchone()[0])) cur.close() conn.close() return results def getDepthCounts(self, nmax=30): "Returns a list of (maxdepth, gamecount) pairs." dqry = "SELECT count(*) FROM {0} WHERE maxdepth = %s;".format(self.uname) mqry = "SELECT count(*) FROM {0} WHERE maxdepth >= %s;".format(self.uname) try: nmax = int(nmax) except (ValueError, TypeError): return [] if nmax <= 0: return [] depths = range(1, nmax) results = [] conn = psycopg2.connect("dbname=rlg") cur = conn.cursor() for lev in depths: cur.execute(dqry, [lev]) results.append((lev, cur.fetchone()[0])) cur.execute(mqry, [nmax]) results.append((nmax, cur.fetchone()[0])) cur.close() conn.close() return results def getScoreCounts(self, blocks=10, size=1000): "Returns a list of (minscore, gamecount) pairs." sqry = "SELECT count(*) FROM {0} WHERE score >= %s AND score < %s;".format(self.uname) mqry = "SELECT count(*) FROM {0} WHERE score >= %s;".format(self.uname) try: blocks = int(blocks) size = int(size) except (ValueError, TypeError): return [] if blocks <= 0 or size <= 0: return [] marks = range(blocks - 1) results = [] conn = psycopg2.connect("dbname=rlg") cur = conn.cursor() for m in marks: cur.execute(sqry, [m * size, (m + 1) * size]) results.append((m * size, cur.fetchone()[0])) cur.execute(mqry, [(blocks - 1) * size]) results.append(((blocks - 1) * size, cur.fetchone()[0])) cur.close() conn.close() return results # End Game class definition class RogueGame(Game): def __init__(self, name, uname, suffix): self.name = name self.uname = uname self.scores = logdir + uname + ".log" self.logspec = ["endt", "score", "name", "xl", "fate"] self.sqltypes = {"endt": "timestamptz", "score": "int", "name": "text", "xl": "int", "fate": "text", "ttyrecs": "text ARRAY", "startt": "timestamptz", "depth": "int", "maxdepth": "int"} self.logdelim = " " self.lookback = -1500 # Construct the insert query fields = self.sqltypes.keys() colspec = ", ".join(fields) valspec = ", ".join([ "%({})s".format(c) for c in fields ]) self.insertq = "INSERT INTO {0} ({1}) VALUES ({2});".format(self.uname, colspec, valspec) # Class variables, used by some methods fields = ["name", "score", "xl", "fate", "endt"] rankfields = ["rank", "score", "name", "xl", "fate", "endt"] pfields = ["score", "xl", "fate", "endt"] def getRecent(self, n=20): "Gets the n most recent games out of the database, returning a list \ of dicts." try: n = int(n) except (ValueError, TypeError): return [] if n <= 0: return [] dictlist = [] conn = psycopg2.connect("dbname=rlg") cur = conn.cursor() qstr = "SELECT endt, score, name, xl, fate, startt FROM {0} ORDER BY endt DESC \ LIMIT %s;".format(self.uname) cur.execute(qstr, [n]) for record in cur: # This should be less hardcodish ndict = {"game": self} ndict["endt"] = record[0] ndict["score"] = record[1] ndict["name"] = record[2] ndict["xl"] = record[3] ndict["fate"] = record[4] ndict["startt"] = record[5] dictlist.append(ndict) cur.close() conn.close() return dictlist def getPlayer(self, player): "Gets all player's games from the database." qstr = "SELECT endt, score, xl, fate, startt FROM " + self.uname + " WHERE name = %s;" conn = getconn() if conn == None: return [] cur = conn.cursor() entries = [] cur.execute(qstr, [player]) for record in cur: ndict = {"game": self} ndict["name"] = player ndict["endt"] = record[0] ndict["score"] = record[1] ndict["xl"] = record[2] ndict["fate"] = record[3] ndict["startt"] = record[4] entries.append(ndict) cur.close() conn.close() return entries def putIntoDB(self, dictlist, conn): "Add the entries in dictlist to the database through connection conn, \ which needs the INSERT privilege." cur = conn.cursor() cur.executemany(self.insertq, [ d for d in dictlist if d["game"] == self ]) conn.commit() cur.close() return def postprocess(self, gamelist): lre = re.compile("^(quit|killed by (.*)) on level ([0-9]*)( \\[max ([0-9]*)\\] with the Amulet)?$") wre = re.compile("^escaped with the Amulet \\[deepest level: ([0-9]*)\\]$") for d in gamelist: m = lre.match(d["fate"]) if m: d["depth"] = int(m.group(3)) if m.group(4): d["maxdepth"] = int(m.group(5)) else: d["maxdepth"] = d["depth"] else: m = wre.match(d["fate"]) if m: d["depth"] = 0 d["maxdepth"] = int(m.group(1)) else: # Something went wrong d["depth"] = -1 d["maxdepth"] = -1 Game.postprocess(self, gamelist) # End RogueGame class definition class ARogueGame(Game): def __init__(self, name, uname, suffix): self.name = name self.uname = uname self.scores = logdir + uname + ".log" self.logspec = ["endt", "score", "name", "xl", "class", "depth", "maxdepth", "quest", "hadquest", "fate"] self.sqltypes = {"endt": "timestamptz", "score": "int", "name": "varchar(20)", "xl": "int", "class": "text", "depth": "int", "maxdepth": "int", "quest": "int", "hadquest": "bool", "fate": "text", "ttyrecs": "text ARRAY", "startt": "timestamptz"} self.logdelim = " " self.lookback = -1800 # Construct the insert query fields = self.sqltypes.keys() colspec = ", ".join(fields) valspec = ", ".join([ "%({})s".format(c) for c in fields ]) self.insertq = "INSERT INTO {0} ({1}) VALUES ({2});".format(self.uname, colspec, valspec) # Class variables fields = ["name", "score", "class", "xl", "fate", "depth", "endt"] rankfields = ["rank", "score", "name", "class", "xl", "fate", "depth", "endt"] pfields = ["score", "class", "xl", "fate", "depth", "endt"] def postprocess(self, gamelist): "Enforces consistent capitalization of the class title." for d in gamelist: d["class"] = d["class"].capitalize() Game.postprocess(self, gamelist) def getRecent(self, n=20): return [] def getPlayer(self, player): "Gets all player's games from the database." qstr = "SELECT endt, score, xl, class, fate, depth FROM " + self.uname + " WHERE name = %s;" conn = getconn() if conn == None: return [] cur = conn.cursor() entries = [] cur.execute(qstr, [player]) for record in cur: ndict = {"game": self} ndict["name"] = player ndict["endt"] = record[0] ndict["score"] = record[1] ndict["xl"] = record[2] ndict["class"] = record[3] ndict["fate"] = record[4] ndict["depth"] = record[5] entries.append(ndict) cur.close() conn.close() return entries rogue3 = RogueGame("Rogue V3", "rogue3", "r3") rogue4 = RogueGame("Rogue V4", "rogue4", "r4") rogue5 = RogueGame("Rogue V5", "rogue5", "r5") srogue = RogueGame("Super-Rogue", "srogue", "sr") arogue5 = ARogueGame("Advanced Rogue 5", "arogue5", "ar5") arogue7 = ARogueGame("Advanced Rogue 7", "arogue7", "ar7") xrogue = ARogueGame("XRogue", "xrogue", "xr") gamelist = [rogue3, rogue4, rogue5, srogue, arogue5, arogue7, xrogue] def playerpage(pname): "Generate a player's HTML page" # Write the beginning of the page ppagefi = open(ppagename.format(pname), "w", encoding="utf-8") cleanpname = html.escape(pname) ppagefi.write(phead.format(cleanpname)) ppagefi.write(ptop) ppagefi.write(navplayer.format(cleanpname)) ppagefi.write(pti.format("Results for " + cleanpname)) for game in gamelist: ppagefi.write(secthead.format(game.name)) entries = game.getPlayer(pname) if not entries: ppagefi.write("<div>" + cleanpname + " has not yet completed an " + "expedition in this dungeon.</div>\n") else: entries.sort(key=lambda e: e["endt"]) printTable(entries, game.pfields, ppagefi) scoresum = 0 for entry in entries: scoresum += int(entry["score"]) avgscr = scoresum // len(entries) ppagefi.write("<p>Average score: {0}</p>\n".format(avgscr)) # That should settle it. Print the end; then stop. ppagefi.write(pend) ppagefi.close() return def highpage(): # open the page and print the beginning highpfi = open(hpagename, "w", encoding="utf-8") highpfi.write(phead.format("High Scores")) highpfi.write(ptop) highpfi.write(navscore.format("High Scores")) highpfi.write(pti.format("List of High Scores")) for game in gamelist: highpfi.write(secthead.format(game.name)) entries = game.getHigh(40) if not entries: highpfi.write("<div>No one has braved this dungeon yet.</div>\n") else: printTable(entries, game.rankfields, highpfi) # That should finish it. highpfi.write(pend) highpfi.close() return