diff options
| author | Fulgen301 <tokmajigeorge@gmail.com> | 2017-07-11 13:25:22 +0200 |
|---|---|---|
| committer | Fulgen301 <tokmajigeorge@gmail.com> | 2017-07-11 13:25:22 +0200 |
| commit | e004eb61cd497cf208e926e0892f21c979966c81 (patch) | |
| tree | 62979e25f46bf8a95c5aed3dd6474f14e0d52c18 | |
| parent | 27cf3b77c1c78bfb7d56a86e740d28f22aef527b (diff) | |
| download | pycrctrl-e004eb61cd497cf208e926e0892f21c979966c81.tar.gz pycrctrl-e004eb61cd497cf208e926e0892f21c979966c81.zip | |
Replace spaces with tabs
| -rw-r--r-- | pycrctrl/pycrctrl.py | 1118 |
1 files changed, 559 insertions, 559 deletions
diff --git a/pycrctrl/pycrctrl.py b/pycrctrl/pycrctrl.py index 584341f..9d6866e 100644 --- a/pycrctrl/pycrctrl.py +++ b/pycrctrl/pycrctrl.py @@ -33,12 +33,12 @@ from io import BytesIO #iostream import try: - from iostream import * + from iostream import * except ModuleNotFoundError: - with open("iostream.py","wb") as fobj: - fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read()) - - from iostream import * + with open("iostream.py","wb") as fobj: + fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read()) + + from iostream import * import random import tarfile @@ -51,568 +51,568 @@ from enum import IntEnum # class list(list): - """ Extended list """ - - def __lshift__(self, other): - self.append(other) - return self + """ Extended list """ + + def __lshift__(self, other): + self.append(other) + return self - def __rshift__(self, other): - if isinstance(other, list): - item = self.__getitem__(len(self) - 1) - try: - other.append(item) - self.pop() - except Exception: - raise TypeError("Cannot pass item to {}!".format(other)) from None - - # Methods - - def isEmpty(self): - return len(self) == 0 + def __rshift__(self, other): + if isinstance(other, list): + item = self.__getitem__(len(self) - 1) + try: + other.append(item) + self.pop() + except Exception: + raise TypeError("Cannot pass item to {}!".format(other)) from None + + # Methods + + def isEmpty(self): + return len(self) == 0 class CmdResult(IntEnum): - UnknownCommand = -1 - Success = 0 - SyntaxFail = 1 - RightsFail = 2 - RuntimeError = 3 + UnknownCommand = -1 + Success = 0 + SyntaxFail = 1 + RightsFail = 2 + RuntimeError = 3 class Updater(object): - parent = None - __current_revision = "" - lookuptable = {"64bit" : "amd64", "32bit" : "i386"} - - def __init__(self, parent): - self.parent = parent - with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj: - self.__current_revision = fobj.read().decode("utf-8") - start_new_thread(self.checkForUpdates, ()) - - @property - def current_revision(self): - return self.__current_revision - - @current_revision.setter - def current_revision(self, other): - self.__current_revision = other - if type(other) == str: - try: - other = other.encode("utf-8") - except Exception: - raise TypeError("Wrong datatype!") from None - - with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj: - fobj.write(other) - - def checkForUpdates(self): - while True: - try: - site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/") - site.remove(site[0]) - site = [i.split("' title")[0] for i in site] - - x = None - for i in site: - x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i) - if x: - rev = x.group(2) - - if self.current_revision != rev: - self.current_revision = rev - self.loadNewSnapshot(x) - - break - if not x: - cout << "Regex didn't match" << endl - - except Exception as e: - cerr << str(e) << endl - finally: - sleep(10) - - def loadNewSnapshot(self, reg): - cout << "Downloading snapshot..." << endl - with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj: - fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read()) - - #extract the snapshot - tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2") - tar.extractall(path=self.parent.path) - cout << "New snapshot has been extracted." << endl - - #get the openclonk-server autobuild - site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8")) - - for commit in site: - for build in commit["builds"]: - if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]): - for b in build["components"]: - reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server - #if reg: - # cout << str(reg) << " " - # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: " - # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl - # cout << endl - if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]): - cout << "Downloading openclonk-server build..." << endl - buffer = BytesIO() - buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read()) - buffer.seek(0) - with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj: - fobj.write(GzipFile(fileobj=buffer).read()) - - cout << "New openclonk-server build has been extracted." << endl - os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64) - return True - - + parent = None + __current_revision = "" + lookuptable = {"64bit" : "amd64", "32bit" : "i386"} + + def __init__(self, parent): + self.parent = parent + with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj: + self.__current_revision = fobj.read().decode("utf-8") + start_new_thread(self.checkForUpdates, ()) + + @property + def current_revision(self): + return self.__current_revision + + @current_revision.setter + def current_revision(self, other): + self.__current_revision = other + if type(other) == str: + try: + other = other.encode("utf-8") + except Exception: + raise TypeError("Wrong datatype!") from None + + with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj: + fobj.write(other) + + def checkForUpdates(self): + while True: + try: + site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/") + site.remove(site[0]) + site = [i.split("' title")[0] for i in site] + + x = None + for i in site: + x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i) + if x: + rev = x.group(2) + + if self.current_revision != rev: + self.current_revision = rev + self.loadNewSnapshot(x) + + break + if not x: + cout << "Regex didn't match" << endl + + except Exception as e: + cerr << str(e) << endl + finally: + sleep(10) + + def loadNewSnapshot(self, reg): + cout << "Downloading snapshot..." << endl + with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj: + fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read()) + + #extract the snapshot + tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2") + tar.extractall(path=self.parent.path) + cout << "New snapshot has been extracted." << endl + + #get the openclonk-server autobuild + site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8")) + + for commit in site: + for build in commit["builds"]: + if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]): + for b in build["components"]: + reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server + #if reg: + # cout << str(reg) << " " + # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: " + # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl + # cout << endl + if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]): + cout << "Downloading openclonk-server build..." << endl + buffer = BytesIO() + buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read()) + buffer.seek(0) + with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj: + fobj.write(GzipFile(fileobj=buffer).read()) + + cout << "New openclonk-server build has been extracted." << endl + os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64) + return True + + class PyCRCtrl(object): - """Server control""" - - clonk = None - thread_started = False - stopped = False - config = {} - - scenario = "" - commands = {} - - path = str() - scenlist = list() - league_scenlist = list() - - topic = "Kein laufendes Spiel." - league = True - - __state = "Lobby" - __ingamechat = "aktiviert" - - updater = None - - def __init__(self, irc=None, path=None, config="pycrctrl.conf"): - if sys.platform == "win32": - raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform)) - - self.irc = irc - self.path = path - self.loadConfigFile(config) - self.loadScenarioList() - - self.queue = queue.Queue(5) - if self.config["Clonk"]["Updater"]["enabled"]: - self.updater = Updater(self) - - def __ostream__(self, ostream): - return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.config["Clonk"]["IRC"]["Channels"]["ingame"], (self.scenario if self.scenario != "" else "None")) - - @property - def state(self): - return self.__state - - @state.setter - def state(self, text): - if text in ["Lobby", "Lädt", "Läuft"]: - self.__state = text - topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat) - self.setTopic(topic) - - @property - def ingamechat(self): - return self.__ingamechat - - @ingamechat.setter - def ingamechat(self, text): - if text in ["aktiviert", "deaktiviert"]: - self.__ingamechat = text - self.state = self.state - - def loadScenarioList(self) -> bool: - if self.path == None: - return False - - with open(os.path.join(self.path,"scenlist"), "rb") as fobj: - self.scenlist = Unpickler(fobj).load() - - with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj: - self.league_scenlist = Unpickler(fobj).load() - - return True - - - def loadConfigFile(self, config) -> bool: - if self.path == None: - return False - - conf = os.path.join(self.path, config) - - if os.path.isdir(conf): - return False - - elif os.path.isfile(conf): - self.config = self.setupConfig(json.load(open(conf, "r"))) - return True - - elif not os.path.exists(conf): - c = { - "General" : { - "useLogfile" : True - }, - - "Clonk" : { - "engine" : "clonk", - "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"], - "commandlinePrefix" : "/", - "prefix" : "@", - - "RegExps" : { - "lobbyStartExp" : "", - "startExp" : "", - "joinExp" : "", - "leaveExp" : "", - "shutdownExp" : "" - }, - - "autohost" : False, - "IRC" : { - "ingamechat" : True, - "Channels" : { - "parent" : "", - "ingame" : "" - } - }, - - "Rights" : { - "admin" : True, - "moderator" : True - }, - - "Updater" : { - "enabled" : False, - "RegExps" : { - "autobuild" : "", - "snapshot" : "" - }, - "Addresses" : { - "snapshotList" : "", - "snapshotDownload" : "", - "autobuildList" : "", - "autobuildAddress" : "" - } - } - } - } - - json.dump(c, open(conf, "w"), indent=4) - - self.config = self.setupConfig(c) - return True - - def setupConfig(self, config): - #commandline - sep = ":" if config["Clonk"]["commandlinePrefix"] == "/" else "=" - - res = "" - for entry in config["Clonk"]["commandline"]: - if isinstance(entry, dict): - for i in entry.items(): - res += "{}{}{}{} ".format(config["Clonk"]["commandlinePrefix"], (i[0]), sep, str(i[1])) - else: - res += "{}{} ".format(config["Clonk"]["commandlinePrefix"], entry) - - config["Clonk"]["commandline"] = res - - #regexps - - config["Clonk"]["RegExps"] = { - i:re.compile(self.decodeRegExp(config["Clonk"]["RegExps"][i], config["Clonk"]["encoding"])) for i in config["Clonk"]["RegExps"] - } - - self.ingamechat = config["Clonk"]["IRC"]["ingamechat"] - return config - - def decodeRegExp(self, regexp, encoding=None): - return base64.b64decode(regexp).decode(encoding or self.config["Clonk"]["encoding"]) - - def host(self, scenario=None, user=None) -> str: - if scenario == None: - return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") - if hasattr(scenario, "decode"): - try: - scenario = scenario.decode(self.config["Clonk"]["encoding"], "replace") - except: - return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.") - - scenario = scenario.splitlines()[0] - if scenario == "random": - scenario = random.choice(self.scenlist) - - elif scenario not in self.scenlist: - return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) - - if self.thread_started == False: - self.scenario = scenario - self.thread_started = True - start_new_thread(self.startClonk,()) - return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) - - if self.queue.full() == False: - self.queue.put(scenario) - return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) - else: - return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") - - def startClonk(self): - try: - while True: - if self.scenario == "": - if self.queue.empty() == False: - self.scenario = self.queue.get() - else: - self.scenario = random.choice(self.scenlist) - - self.clonk = subprocess.Popen( - './{} {} "{}"'.format(self.config["Clonk"]["engine"], self.config["Clonk"]["commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario), - 0, - None, - subprocess.PIPE, - subprocess.PIPE, - subprocess.STDOUT, - shell=True, - cwd=self.path, - encoding=self.config["Clonk"]["encoding"] - ) - self.state = "Lobby" - self.readServerOutput() - if self.config["Clonk"]["autohost"] == False: - self.thread_started = False - self.setTopic("Kein laufendes Spiel.") - break - - finally: - if self.clonk: - self.clonk.stdin.close() - - return (CmdResult.Success, "") - - def readServerOutput(self): - while True: - try: - output = self.clonk.stdout.readline() - - if self.config["Clonk"]["RegExps"]["shutdownExp"].match(output): - self.clonk.stdin.close() - elif output == "" and self.clonk.poll() is not None: - if self.clonk: - self.clonk.stdin.close() - self.clonk = None - self.scenario = "" - return - - elif output: - output = output.splitlines()[0] - #output = output.decode("utf-8").splitlines()[0] - output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] - - if output[0] == ">": - output = output[1:] - - part = re.match("<(.*)> ({})(.*)".format(self.config["Clonk"]["prefix"]), output) - if part and part.group(0) != self.irc.nick: - cmd = part.group(3).split(" ", 1) - found = False - x = None - if len(cmd) > 0: - for key in self.commands.keys(): - if key == cmd[0].splitlines()[0]: - found = True - try: - x = self.commands[key](cmd[1].split(" "), user=part.group(1)) - except IndexError: - x = self.commands[key](user=part.group(1)) - break - - if not found: - self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!') - if x: - if type(x) == tuple and x[1] != "": - self.writeToServer(x[1]) - del x - if self.config["Clonk"]["RegExps"]["lobbyStartExp"].match(output): - self.state = "Lädt" - - elif self.config["Clonk"]["RegExps"]["startExp"].match(output): - self.state = "Läuft" - try: - cout << output << endl - except (UnicodeDecodeError, UnicodeEncodeError): - pass - - if self.irc and self.ingamechat == "aktiviert": - if output.find("<" + self.irc.nick + ">") == -1: - if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config["Clonk"]["prefix"]) == -1: - self.irc.reply("[Clonk]{}".format(output), to=self.config["Clonk"]["IRC"]["Channels"]["ingame"]) - - elif self.config["Clonk"]["RegExps"]["joinExp"].match(output) or self.config["Clonk"]["RegExps"]["leaveExp"].match(output): - self.irc.reply(output, to=self.config["Clonk"]["IRC"]["Channels"]["ingame"]) - - - except KeyboardInterrupt: - if self.clonk: - self.clonk.stdin.close() - - except Exception as e: - cerr << str(e) << endl - continue - - return (CmdResult.Success, "") - - def doPrivmsg(self, msg): - if not (self.irc and self.ingamechat == "aktiviert"): - return - for channel in msg.args[0].split(","): - if channel == self.config["Clonk"]["IRC"]["Channels"]["ingame"] and msg.nick != self.irc.nick: - self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) - - - def writeToServer(self, text=None): - if text == None and self.clonk == None: - return (CmdResult.RuntimeError, "") - elif type(text) != str: - try: - text = text.decode(self.config["Clonk"]["encoding"]) - except: - raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__)) - - if self.clonk and self.clonk.stdin: - self.clonk.stdin.write(text + "\n") - self.clonk.stdin.flush() - return (CmdResult.Success, "") - - def setTopic(self, text=None): - if not self.irc: - return False - - if type(text) != str: - try: - text = text.decode(self.config["Clonk"]["encoding"]) - except: - raise TypeError("text must be a string!") - - if self.topic == text: - return False - - else: - self.topic = text - return bool(self.irc.sendMsg(ircmsgs.topic(self.config["Clonk"]["IRC"]["Channels"]["ingame"], text))) - - - def start(self, time=None, user=""): - """Startet das Spiel.""" - self.stopped = False - if time: - try: - time = int(time[0]) - except: - time = 5 - self.writeToServer("/start {}".format(time)) - else: - self.writeToServer("/start") - - return (CmdResult.Success, "") - - - def stop(self, prm=None, user=""): - """Stoppt den Countdown.""" - def stopping(self): - while self.clonk and self.stopped: - self.writeToServer("/start 60000") - if self.stopped == False: - return - sleep(100) - - if self.stopped == False: - self.stopped = True - start_new_thread(stopping, (self,)) - - return (CmdResult.Success, "") - - - def help(self, prm=None, user=""): - """Gibt die Hilfe aus.""" - self.writeToServer("Verfügbare Befehle:") - for text, function in self.commands.items(): - self.writeToServer("{} -- {}".format(text, function.__doc__)) - - return (CmdResult.Success, "") - - - def displayQueue(self, prm=None, user=""): - """Gibt die Warteschlange aus.""" - self.writeToServer("Warteschlange:") - - for i,scen in enumerate(self.queue.queue): - self.writeToServer("{}. {}".format(i+1, scen)) - - return (CmdResult.Success, "") - - - def list(self, prm=None, user=""): - """Zeigt die Szenarioliste an.""" - self.writeToServer("List:\n-------------") - for scen in self.scenlist: - self.writeToServer(scen) - - return (CmdResult.Success, "") - - def ircCommands(self, prm=None, user=""): - """Enthält Befehle zur Steuerung der IRC-Funktionen.""" - if not prm: - return (CmdResult.SyntaxFail, "") - - if prm[0] == "ingamechat": - if prm[1] == "off": - self.ingamechat = "deaktiviert" - elif prm[1] == "on": - self.ingamechat = "aktiviert" - - return (CmdResult.Success, "") - - def IsRunning(self) -> bool: - return self.state != "Lobby" - - # - # functions - # - - def addCommand(self, function, text): - self.commands[text.split(" ")[0]] = function - return self - - def addScenario(self, link): - name = "" - for item in link.split("/"): - if re.match(r"(.*)\.[oc][c4]s",item): - name = item - break - - site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid! - with open(os.path.join(self.path, name),"wb") as fobj: - fobj.write(site) - - try: - self.scenlist.index(name) - except Exception: - self.scenlist.append(name) - Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist) - return self - + """Server control""" + + clonk = None + thread_started = False + stopped = False + config = {} + + scenario = "" + commands = {} + + path = str() + scenlist = list() + league_scenlist = list() + + topic = "Kein laufendes Spiel." + league = True + + __state = "Lobby" + __ingamechat = "aktiviert" + + updater = None + + def __init__(self, irc=None, path=None, config="pycrctrl.conf"): + if sys.platform == "win32": + raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform)) + + self.irc = irc + self.path = path + self.loadConfigFile(config) + self.loadScenarioList() + + self.queue = queue.Queue(5) + if self.config["Clonk"]["Updater"]["enabled"]: + self.updater = Updater(self) + + def __ostream__(self, ostream): + return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.config["Clonk"]["IRC"]["Channels"]["ingame"], (self.scenario if self.scenario != "" else "None")) + + @property + def state(self): + return self.__state + + @state.setter + def state(self, text): + if text in ["Lobby", "Lädt", "Läuft"]: + self.__state = text + topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat) + self.setTopic(topic) + + @property + def ingamechat(self): + return self.__ingamechat + + @ingamechat.setter + def ingamechat(self, text): + if text in ["aktiviert", "deaktiviert"]: + self.__ingamechat = text + self.state = self.state + + def loadScenarioList(self) -> bool: + if self.path == None: + return False + + with open(os.path.join(self.path,"scenlist"), "rb") as fobj: + self.scenlist = Unpickler(fobj).load() + + with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj: + self.league_scenlist = Unpickler(fobj).load() + + return True + + + def loadConfigFile(self, config) -> bool: + if self.path == None: + return False + + conf = os.path.join(self.path, config) + + if os.path.isdir(conf): + return False + + elif os.path.isfile(conf): + self.config = self.setupConfig(json.load(open(conf, "r"))) + return True + + elif not os.path.exists(conf): + c = { + "General" : { + "useLogfile" : True + }, + + "Clonk" : { + "engine" : "clonk", + "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"], + "commandlinePrefix" : "/", + "prefix" : "@", + + "RegExps" : { + "lobbyStartExp" : "", + "startExp" : "", + "joinExp" : "", + "leaveExp" : "", + "shutdownExp" : "" + }, + + "autohost" : False, + "IRC" : { + "ingamechat" : True, + "Channels" : { + "parent" : "", + "ingame" : "" + } + }, + + "Rights" : { + "admin" : True, + "moderator" : True + }, + + "Updater" : { + "enabled" : False, + "RegExps" : { + "autobuild" : "", + "snapshot" : "" + }, + "Addresses" : { + "snapshotList" : "", + "snapshotDownload" : "", + "autobuildList" : "", + "autobuildAddress" : "" + } + } + } + } + + json.dump(c, open(conf, "w"), indent=4) + + self.config = self.setupConfig(c) + return True + + def setupConfig(self, config): + #commandline + sep = ":" if config["Clonk"]["commandlinePrefix"] == "/" else "=" + + res = "" + for entry in config["Clonk"]["commandline"]: + if isinstance(entry, dict): + for i in entry.items(): + res += "{}{}{}{} ".format(config["Clonk"]["commandlinePrefix"], (i[0]), sep, str(i[1])) + else: + res += "{}{} ".format(config["Clonk"]["commandlinePrefix"], entry) + + config["Clonk"]["commandline"] = res + + #regexps + + config["Clonk"]["RegExps"] = { + i:re.compile(self.decodeRegExp(config["Clonk"]["RegExps"][i], config["Clonk"]["encoding"])) for i in config["Clonk"]["RegExps"] + } + + self.ingamechat = config["Clonk"]["IRC"]["ingamechat"] + return config + + def decodeRegExp(self, regexp, encoding=None): + return base64.b64decode(regexp).decode(encoding or self.config["Clonk"]["encoding"]) + + def host(self, scenario=None, user=None) -> str: + if scenario == None: + return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") + if hasattr(scenario, "decode"): + try: + scenario = scenario.decode(self.config["Clonk"]["encoding"], "replace") + except: + return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.") + + scenario = scenario.splitlines()[0] + if scenario == "random": + scenario = random.choice(self.scenlist) + + elif scenario not in self.scenlist: + return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) + + if self.thread_started == False: + self.scenario = scenario + self.thread_started = True + start_new_thread(self.startClonk,()) + return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) + + if self.queue.full() == False: + self.queue.put(scenario) + return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) + else: + return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") + + def startClonk(self): + try: + while True: + if self.scenario == "": + if self.queue.empty() == False: + self.scenario = self.queue.get() + else: + self.scenario = random.choice(self.scenlist) + + self.clonk = subprocess.Popen( + './{} {} "{}"'.format(self.config["Clonk"]["engine"], self.config["Clonk"]["commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario), + 0, + None, + subprocess.PIPE, + subprocess.PIPE, + subprocess.STDOUT, + shell=True, + cwd=self.path, + encoding=self.config["Clonk"]["encoding"] + ) + self.state = "Lobby" + self.readServerOutput() + if self.config["Clonk"]["autohost"] == False: + self.thread_started = False + self.setTopic("Kein laufendes Spiel.") + break + + finally: + if self.clonk: + self.clonk.stdin.close() + + return (CmdResult.Success, "") + + def readServerOutput(self): + while True: + try: + output = self.clonk.stdout.readline() + + if self.config["Clonk"]["RegExps"]["shutdownExp"].match(output): + self.clonk.stdin.close() + elif output == "" and self.clonk.poll() is not None: + if self.clonk: + self.clonk.stdin.close() + self.clonk = None + self.scenario = "" + return + + elif output: + output = output.splitlines()[0] + #output = output.decode("utf-8").splitlines()[0] + output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] + + if output[0] == ">": + output = output[1:] + + part = re.match("<(.*)> ({})(.*)".format(self.config["Clonk"]["prefix"]), output) + if part and part.group(0) != self.irc.nick: + cmd = part.group(3).split(" ", 1) + found = False + x = None + if len(cmd) > 0: + for key in self.commands.keys(): + if key == cmd[0].splitlines()[0]: + found = True + try: + x = self.commands[key](cmd[1].split(" "), user=part.group(1)) + except IndexError: + x = self.commands[key](user=part.group(1)) + break + + if not found: + self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!') + if x: + if type(x) == tuple and x[1] != "": + self.writeToServer(x[1]) + del x + if self.config["Clonk"]["RegExps"]["lobbyStartExp"].match(output): + self.state = "Lädt" + + elif self.config["Clonk"]["RegExps"]["startExp"].match(output): + self.state = "Läuft" + try: + cout << output << endl + except (UnicodeDecodeError, UnicodeEncodeError): + pass + + if self.irc and self.ingamechat == "aktiviert": + if output.find("<" + self.irc.nick + ">") == -1: + if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config["Clonk"]["prefix"]) == -1: + self.irc.reply("[Clonk]{}".format(output), to=self.config["Clonk"]["IRC"]["Channels"]["ingame"]) + + elif self.config["Clonk"]["RegExps"]["joinExp"].match(output) or self.config["Clonk"]["RegExps"]["leaveExp"].match(output): + self.irc.reply(output, to=self.config["Clonk"]["IRC"]["Channels"]["ingame"]) + + + except KeyboardInterrupt: + if self.clonk: + self.clonk.stdin.close() + + except Exception as e: + cerr << str(e) << endl + continue + + return (CmdResult.Success, "") + + def doPrivmsg(self, msg): + if not (self.irc and self.ingamechat == "aktiviert"): + return + for channel in msg.args[0].split(","): + if channel == self.config["Clonk"]["IRC"]["Channels"]["ingame"] and msg.nick != self.irc.nick: + self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) + + + def writeToServer(self, text=None): + if text == None and self.clonk == None: + return (CmdResult.RuntimeError, "") + elif type(text) != str: + try: + text = text.decode(self.config["Clonk"]["encoding"]) + except: + raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__)) + + if self.clonk and self.clonk.stdin: + self.clonk.stdin.write(text + "\n") + self.clonk.stdin.flush() + return (CmdResult.Success, "") + + def setTopic(self, text=None): + if not self.irc: + return False + + if type(text) != str: + try: + text = text.decode(self.config["Clonk"]["encoding"]) + except: + raise TypeError("text must be a string!") + + if self.topic == text: + return False + + else: + self.topic = text + return bool(self.irc.sendMsg(ircmsgs.topic(self.config["Clonk"]["IRC"]["Channels"]["ingame"], text))) + + + def start(self, time=None, user=""): + """Startet das Spiel.""" + self.stopped = False + if time: + try: + time = int(time[0]) + except: + time = 5 + self.writeToServer("/start {}".format(time)) + else: + self.writeToServer("/start") + + return (CmdResult.Success, "") + + + def stop(self, prm=None, user=""): + """Stoppt den Countdown.""" + def stopping(self): + while self.clonk and self.stopped: + self.writeToServer("/start 60000") + if self.stopped == False: + return + sleep(100) + + if self.stopped == False: + self.stopped = True + start_new_thread(stopping, (self,)) + + return (CmdResult.Success, "") + + + def help(self, prm=None, user=""): + """Gibt die Hilfe aus.""" + self.writeToServer("Verfügbare Befehle:") + for text, function in self.commands.items(): + self.writeToServer("{} -- {}".format(text, function.__doc__)) + + return (CmdResult.Success, "") + + + def displayQueue(self, prm=None, user=""): + """Gibt die Warteschlange aus.""" + self.writeToServer("Warteschlange:") + + for i,scen in enumerate(self.queue.queue): + self.writeToServer("{}. {}".format(i+1, scen)) + + return (CmdResult.Success, "") + + + def list(self, prm=None, user=""): + """Zeigt die Szenarioliste an.""" + self.writeToServer("List:\n-------------") + for scen in self.scenlist: + self.writeToServer(scen) + + return (CmdResult.Success, "") + + def ircCommands(self, prm=None, user=""): + """Enthält Befehle zur Steuerung der IRC-Funktionen.""" + if not prm: + return (CmdResult.SyntaxFail, "") + + if prm[0] == "ingamechat": + if prm[1] == "off": + self.ingamechat = "deaktiviert" + elif prm[1] == "on": + self.ingamechat = "aktiviert" + + return (CmdResult.Success, "") + + def IsRunning(self) -> bool: + return self.state != "Lobby" + + # + # functions + # + + def addCommand(self, function, text): + self.commands[text.split(" ")[0]] = function + return self + + def addScenario(self, link): + name = "" + for item in link.split("/"): + if re.match(r"(.*)\.[oc][c4]s",item): + name = item + break + + site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid! + with open(os.path.join(self.path, name),"wb") as fobj: + fobj.write(site) + + try: + self.scenlist.index(name) + except Exception: + self.scenlist.append(name) + Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist) + return self + # # # def PyCRCtrlInit(): - """ - Initializes a PyCRCtrl - object. Use this function if you want to control only one object. - """ - pass + """ + Initializes a PyCRCtrl - object. Use this function if you want to control only one object. + """ + pass |
