diff options
Diffstat (limited to 'pycrctrl.py')
| -rw-r--r-- | pycrctrl.py | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/pycrctrl.py b/pycrctrl.py new file mode 100644 index 0000000..6204f5d --- /dev/null +++ b/pycrctrl.py @@ -0,0 +1,476 @@ +# -*- coding:utf-8 -*- + +#This program is free software: you can redistribute it and/or modify +#it under the terms of the GNU General Public License as published by +#the Free Software Foundation, either version 3 of the License, or +#(at your option) any later version. + +#This program is distributed in the hope that it will be useful, +#but WITHOUT ANY WARRANTY; without even the implied warranty of +#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +#GNU General Public License for more details. + +#You should have received a copy of the GNU General Public License +#along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys + +if sys.version_info < (3,6): + class ModuleNotFoundError(ImportError): + pass + +import socket +import subprocess +import urllib.request +import urllib.error +import urllib.parse + +from _thread import start_new_thread +import inspect +import queue +import traceback +import os +import re +import signal +import json +import base64 +import supybot.ircmsgs as ircmsgs + +from pickle import Pickler, Unpickler +from PyQt5.QtCore import * +try: + from iostream import * +except ModuleNotFoundError: + with open("iostream.py","wb") as fobj: + fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read()) + + from iostream import * + +import random +from time import sleep +from enum import IntEnum + +# +# Helpers +# + +class QString(str): + pass + +class QStringList(QObject): + """ QStringList """ + + typeclass = str + strlist = list() + + def __init__(self, *args): + QObject.__init__(self) + self.strlist = list(args) + + def __getitem__(self, name): + if type(name) != int: + raise ValueError("Wrong datatype for name!") + else: + return self.strlist[name] + + def __lshift__(self, other): + if type(other) != self.typeclass: + raise ValueError("Wrong datatype") + else: + self.strlist.append(other) + + def __call__(self): + return self.strlist + + def __len__(self): + return len(self.strlist) + + # Methods + + def isEmpty(self): + return len(self.strlist) == 0 + +class CmdResult(IntEnum): + UnknownCommand = -1 + Success = 0 + SyntaxFail = 1 + RightsFail = 2 + RuntimeError = 3 + +class PyCRCtrl(object): + """Server control""" + + clonk = None + thread_started = False + stopped = False + config = {} + + scenario = "" + #prefix = "@" + commands = {} + #engine = "clonk" + codec = None + #commandline = "/fullscreen /lobby:300 /config:config.txt /record /faircrew" + + #path = QString() + scenlist = QStringList() + league_scenlist = QStringList() + + #channels = {} + topic = "Kein laufendes Spiel." + league = True + + __state = "Lobby" + + #RegExps = { + # "lobbyStartExp" : "((?:Los geht's!|Action go!)\s*)", + # "startExp" : r"^Start!\s*$", + # "joinExp" : r"^Client (.+) (?:verbunden|connected)\.\s*$", + # "leaveExp" : "^Client (.+) (?:entfernt|removed)(.*)", + # "shutdownExp" : r"^Internetspiel ausgewertet(.*)" + # } + + def __init__(self, irc=None, path=None): + if sys.platform == "win32": + raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform)) + + self.irc = irc + self.path = path + self.loadConfigFile() + self.loadScenarioList() + self.codec = codec = QTextCodec.codecForName(self.config.get("encoding")) + self.queue = queue.Queue(5) + + def __ostream__(self, ostream): + return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.channels["ingame"], (self.scenario if self.scenario != "" else "None")) + + @property + def state(self): + return self.__state + + @state.setter + def state(self, text): + if text in ["Lobby", "Lädt", "Läuft"]: + self.__state = text + self.setTopic("Aktuelles Szenario: {} | {}".format(self.scenario, self.state)) + + def loadScenarioList(self) -> bool: + if self.path == None: + return False + + with open(os.path.join(self.path,"scenlist"), "rb") as fobj: + self.scenlist.strlist = Unpickler(fobj).load() + + with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj: + self.league_scenlist.strlist = Unpickler(fobj).load() + + return True + + def loadConfigFile(self): + if self.path == None: + return False + + self.config = json.load(open(os.path.join(self.path, "pycrctrl.conf"),"r")) + return True + + def decodeRegExp(self, regexp): + return self.codec.toUnicode(base64.b64decode(regexp)) + + def setConfigEntry(self, entry="", value=None): + if type(self.config) != dict: + return + + try: + self.config[entry] = value + finally: + json.dump(self.config, open(os.path.join(self.path, "pycrctrl.conf"), "w"),indent=4) + + return True + + def host(self, scenario=None) -> str: + if scenario == None: + return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") + if type(scenario) in [bytes, QByteArray]: + try: + scenario = self.codec.toUnicode(scenario) + except: + return (CmdResult.SyntaxFail, "Unbekannter Datentyp!") + + scenario = scenario.splitlines()[0] + if scenario == "random": + scenario = random.choice(self.scenlist) + + elif scenario not in self.scenlist(): + return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) + + if self.thread_started == False: + self.scenario = scenario + self.thread_started = True + start_new_thread(self.startClonk,()) + return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) + + if self.queue.full() == False: + self.queue.put(scenario) + return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) + else: + return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") + + def startClonk(self): + try: + while True: + if self.scenario == "": + if self.queue.empty() == False: + self.scenario = self.queue.get() + else: + self.scenario = random.choice(self.scenlist) + self.clonk = subprocess.Popen( + './{} {} "{}"'.format(self.config.get("engine"), self.config.get("commandline") + (" --" if type(self).__name__ == "PyOCCtrl" else " /") + ("league" if self.scenario in self.league_scenlist() else "noleague"),self.scenario), + 0, + None, + subprocess.PIPE, + subprocess.PIPE, + subprocess.STDOUT, + shell=True, + cwd=self.path + ) + self.state = "Lobby" + self.readServerOutput() + if self.config.get("autohost") == False: + self.thread_started = False + self.setTopic("Kein laufendes Spiel.") + break + + finally: + if self.clonk: + self.clonk.stdin.close() + + return (CmdResult.Success, "") + + def readServerOutput(self): + while True: + try: + output = self.clonk.stdout.readline() + + if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["shutdownExp"]), self.codec.toUnicode(output))): + self.clonk.stdin.close() + elif output == b"" and self.clonk.poll() is not None: + if self.clonk: + self.clonk.stdin.close() + self.clonk = None + self.scenario = "" + return + + elif output: + output = self.codec.toUnicode(output).splitlines()[0] + output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] + + if output[0] == ">": + output = output[1:] + if output.find(self.config.get("prefix")) != -1 and output.find("<{}>".format(self.irc.nick)) == -1: + part = output[output.find(self.config.get("prefix"))+len(self.config.get("prefix")):].split(" ",1) + found = False + x = None + if len(part) > 0: + for key in self.commands.keys(): + if key == part[0].splitlines()[0]: + found = True + try: + x = self.commands[key](part[1]) + except IndexError: + x = self.commands[key]() + break + + if not found: + self.writeToServer('Unbekannter Befehl: "' + part[0] + '"!') + if x: + if type(x) == tuple and x[1] != "": + self.writeToServer(x[1]) + del x + if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["lobbyStartExp"]), output)): + self.state = "Lädt" + + elif bool(re.match(self.decodeRegExp(self.config.get("RegExps")["startExp"]), output)): + self.state = "Läuft" + try: + cout << output << endl + except (UnicodeDecodeError, UnicodeEncodeError): + pass + + if self.irc: + if output.find("<" + self.irc.nick + ">") == -1: + if bool(re.match(r"^<.*>", output) and output.find("[IRC]") == -1) and \ + bool(output.find(self.config.get("prefix")) == -1): + self.irc.reply("[Clonk]{}".format(output), to=self.config.get("channels")["ingame"]) + + elif bool(re.match(self.decodeRegExp(self.config.get("RegExps")["joinExp"]), output)) or \ + bool(re.match(self.decodeRegExp(self.config.get("RegExps")["leaveExp"]), output)): + self.irc.reply(output, to=self.config.get("channels")["ingame"]) + + + except KeyboardInterrupt: + if self.clonk: + self.clonk.stdin.close() + + except Exception as e: +#ifdef DEBUG + if self.clonk and __debug__ and not self.clonk.stdin.closed: + self.writeToServer(b"Fehler:") +#endif + tb = traceback.format_exc() + for line in tb.splitlines(): +#ifdef DEBUG + self.writeToServer(line) +#endif + try: + cerr << line << endl + except: + pass + continue + + return (CmdResult.Success, "") + + def doPrivmsg(self, msg): + if not self.irc: + return + for channel in msg.args[0].split(","): + if channel == self.config.get("channels")["ingame"] and msg.nick != self.irc.nick: + self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) + + + def writeToServer(self, text=None): + if text == None and self.clonk == None: + return (CmdResult.RuntimeError, "") + elif type(text) not in [bytes, QByteArray]: + try: + text = self.codec.fromUnicode(text) + except: + raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray") + + if self.clonk and self.clonk.stdin: + self.clonk.stdin.write(bytes(text) + b"\n") + self.clonk.stdin.flush() + return (CmdResult.Success, "") + + def setTopic(self, text=None): + if not self.irc: + return False + + if type(text) not in [str, QString]: + try: + text = self.codec.toUnicode(text) + except: + raise TypeError("text must be a string!") + + if self.topic == text: + return False + + else: + self.topic = text + return bool(self.irc.sendMsg(ircmsgs.topic(self.config.get("channels")["ingame"], text))) + + def start(self, time=None): + self.stopped = False + if time: + try: + time = int(time.split(" ")[0]) + except: + time = 5 + self.writeToServer("/start {}".format(time)) + else: + self.writeToServer(b"/start") + + return (CmdResult.Success, "") + + + def stop(self, prm=None): + def stopping(self): + while self.clonk and self.stopped: + self.writeToServer("/start 60000") + if self.stopped == False: + return + sleep(100) + + if self.stopped == False: + self.stopped = True + start_new_thread(stopping, (self,)) + + return (CmdResult.Success, "") + + + def help(self, prm=None): + self.writeToServer("Verfügbare Befehle:") + for text, function in self.commands.items(): + self.writeToServer(text) + + return (CmdResult.Success, "") + + + def displayQueue(self, prm=None): + self.writeToServer(b"Warteschlange:") + + for i,scen in enumerate(self.queue.queue): + self.writeToServer("{}. {}".format(i+1, scen)) + + return (CmdResult.Success, "") + + + def list(self, prm=None): + self.writeToServer(b"List:\n-------------") + for scen in self.scenlist: + self.writeToServer(scen) + + return (CmdResult.Success, "") + + def ircCommands(self, prm=None): + if not prm: + return (CmdResult.SyntaxFail, "") + + return (CmdResult.Success, "") + + def IsRunning(self) -> bool: + return self.state != "Lobby" + + # + # functions + # + + def addCommand(self, function, text): + self.commands[text.split(" ")[0]] = function + return self + + def addScenario(self, link): + name = "" + for item in link.split("/"): + if re.match(r"(.*)\.[oc][c4]s",item): + name = item + break + + site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid! + with open(os.path.join(self.path, name),"wb") as fobj: + fobj.write(site) + + try: + self.scenlist().index(name) + except Exception: + self.scenlist().append(name) + Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist()) + return self + + + +class PyOCCtrl(PyCRCtrl): + + + + engine = "openclonk-server" + codec = QTextCodec.codecForName("UTF-8") + commandline = "--fullscreen --lobby=300 --faircrew --record --config=config.txt" + +# +# +# +def PyCRCtrlInit(): + """ + Initializes a PyCRCtrl - object. Use this function if you want to control only one object. + """ + pass |
