diff options
Diffstat (limited to 'pycrctrl.py')
| -rw-r--r-- | pycrctrl.py | 424 |
1 files changed, 0 insertions, 424 deletions
diff --git a/pycrctrl.py b/pycrctrl.py deleted file mode 100644 index 23d9bbc..0000000 --- a/pycrctrl.py +++ /dev/null @@ -1,424 +0,0 @@ -# -*- coding:utf-8 -*- - -#This program is free software: you can redistribute it and/or modify -#it under the terms of the GNU General Public License as published by -#the Free Software Foundation, either version 3 of the License, or -#(at your option) any later version. - -#This program is distributed in the hope that it will be useful, -#but WITHOUT ANY WARRANTY; without even the implied warranty of -#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -#GNU General Public License for more details. - -#You should have received a copy of the GNU General Public License -#along with this program. If not, see <http://www.gnu.org/licenses/>. - -import sys - -import socket -import subprocess -import urllib.request -from _thread import start_new_thread -import inspect -import queue -import traceback -import os -import PyQt5 -import re -import supybot.ircmsgs as ircmsgs - -from pickle import Unpickler -from PyQt5.QtCore import * -from iostream import * - -from random import randint -from time import sleep -from enum import IntEnum - -# -# Helpers -# - -class QString(str): - pass - -class QStringList(QObject): - """ QStringList """ - - typeclass = str - strlist = list() - - def __init__(self, *args): - QObject.__init__(self) - self.strlist = list(args) - - def __getitem__(self, name): - if type(name) != int: - raise ValueError("Wrong datatype for name!") - else: - return self.strlist[name] - - def __lshift__(self, other): - if type(other) != self.typeclass: - raise ValueError("Wrong datatype") - else: - self.strlist.append(other) - - def __call__(self): - return self.strlist - - def __len__(self): - return len(self.strlist) - - # Methods - - def isEmpty(self): - return len(self.strlist) == 0 - -class CmdResult(IntEnum): - UnknownCommand = -1 - Success = 0 - SyntaxFail = 1 - RightsFail = 2 - RuntimeError = 3 - -class PyCRCtrl: - """Server control""" - - clonk = None - thread_started = False - stopped = False - autohost = False - - scenario = "" - prefix = "@" - commands = {} - engine = "clonk" - codec = QTextCodec.codecForName("Windows-1252") - commandline = "/fullscreen /lobby:300 /config:config.txt /record /faircrew" - - path = QString() - scenlist = QStringList() - - channel = "" - topic = "Kein laufendes Spiel." - - __state = "Lobby" - - RegExps = { - "lobbyStartExp" : "((?:Los geht's!|Action go!)\s*)", - "startExp" : r"^Start!\s*$", - "joinExp" : r"^Client (.+) (?:verbunden|connected)\.\s*$", - "leaveExp" : "^Client (.+) (?:entfernt|removed)(.*)", - "shutdownExp" : r"^Internetspiel ausgewertet(.*)" - } - - def __init__(self, **kwargs): - if sys.platform == "win32": - raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform)) - - self.autohost = bool(kwargs["autohost"]) - self.irc = kwargs["irc"] - self.path = QString(kwargs["path"]) - self.channel = QString(kwargs["channel"]) - self.loadScenarioList() - self.queue = queue.Queue(5) - - def __ostream__(self, ostream): - return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.channel, (self.scenario if self.scenario != "" else "None")) - - def getState(self): - return self.__state - - def setState(self, text): - if text in ["Lobby", "Lädt", "Läuft"]: - self.__state = text - self.setTopic("Aktuelles Szenario: {} | {}".format(self.scenario, self.state)) - - state = property(getState, setState) - - def loadScenarioList(self): - if self.path == None: - return False - - with open(os.path.join(self.path,"scenlist"), "rb") as fobj: - self.scenlist.strlist = Unpickler(fobj).load() - - return True - - def host(self, scenario=None) -> str: - if scenario == None: - return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") - if type(scenario) in [bytes, QByteArray]: - try: - scenario = self.codec.toUnicode(scenario) - except: - return (CmdResult.SyntaxFail, "Unbekannter Datentyp!") - - scenario = scenario.splitlines()[0] - if scenario == "random": - scenario = self.scenlist[randint(0, len(self.scenlist)-1)] - - elif scenario not in self.scenlist(): - return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) - - if self.thread_started == False: - self.scenario = scenario - self.thread_started = True - start_new_thread(self.startClonk,()) - return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) - - if self.queue.full() == False: - self.queue.put(scenario) - return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) - else: - return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") - - def startClonk(self): - try: - while True: - if self.scenario == "": - if self.queue.empty() == False: - self.scenario = self.queue.get() - else: - self.scenario = self.scenlist[randint(0,len(self.scenlist))-1] - self.clonk = subprocess.Popen( - './{} {} "{}"'.format(self.engine, self.commandline,self.scenario), - 0, - None, - subprocess.PIPE, - subprocess.PIPE, - subprocess.STDOUT, - shell=True, - cwd=self.path - ) - self.state = "Lobby" - self.readServerOutput() - if self.autohost == False: - self.thread_started = False - self.setTopic("Kein laufendes Spiel.") - break - - finally: - if self.clonk: - self.clonk.stdin.close() - - return (CmdResult.Success, "") - - def readServerOutput(self): - while True: - try: - output = self.clonk.stdout.readline() - - if bool(re.match(self.RegExps["shutdownExp"], self.codec.toUnicode(output))): - self.clonk.stdin.close() - elif output == b"" and self.clonk.poll() is not None: - if self.clonk: - self.clonk.stdin.close() - self.clonk = None - self.scenario = "" - return - - elif output: - output = self.codec.toUnicode(output).splitlines()[0] - output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] - - if output[0] == ">": - output = output[1:] - - if output.find(self.prefix) != -1: - part = output[output.find(self.prefix)+len(self.prefix):].split(" ",1) - found = False - x = None - for key in self.commands.keys(): - if key == part[0].splitlines()[0]: - found = True - try: - x = self.commands[key](part[1]) - except IndexError: - x = self.commands[key]() - break - - if not found: - self.writeToServer('Unbekannter Befehl: "' + part[0] + '"!') - if x: - if type(x) == tuple and x[1] != "": - self.writeToServer(x[1]) - del x - - if bool(re.match(self.RegExps["lobbyStartExp"], output)): - self.state = "Lädt" - - elif bool(re.match(self.RegExps["startExp"], output)): - self.state = "Läuft" - #try: - cout << output << endl - #except: - #pass - - # RegExps are from CRSM ((c) DerTod) - if self.irc: - if output.find("<" + self.irc.nick + ">") == -1: - if bool(re.match(r"^<.*>", output) and output.find("[IRC]") == -1) and \ - bool(output.find(self.prefix) == -1): - self.irc.reply("[Clonk]{}".format(output), to=self.channel) - - elif bool(re.match(self.RegExps["joinExp"], output)) or \ - bool(re.match(self.RegExps["leaveExp"], output)): - self.irc.reply(output, to=self.channel) - - - except KeyboardInterrupt: - if self.clonk: - self.clonk.stdin.close() - - except Exception as e: - #if self.clonk and __debug__ and not self.clonk.stdin.closed: - # self.writeToServer(b"Fehler:") - tb = traceback.format_exc() - ## - for line in tb.splitlines(): - # self.writeToServer(line) - #try: - cerr << line << endl - #except: - # pass - #else: - # continue - continue - - return (CmdResult.Success, "") - - def doPrivmsg(self, msg): - if not self.irc: - return - for channel in msg.args[0].split(","): - if channel == self.channel and msg.nick != self.irc.nick: - self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) - - - def writeToServer(self, text=None): - if text == None and self.clonk == None: - return (CmdResult.RuntimeError, "") - elif type(text) not in [bytes, QByteArray]: - try: - text = self.codec.fromUnicode(text) - except: - raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray") - - if self.clonk and self.clonk.stdin: - self.clonk.stdin.write(bytes(text) + b"\n") - self.clonk.stdin.flush() - return (CmdResult.Success, "") - - def setTopic(self, text=None): - if not self.irc: - return False - - if type(text) not in [str, QString]: - try: - text = self.codec.toUnicode(text) - except: - raise TypeError("text must be a string!") - - if self.topic == text: - return False - - else: - self.topic = text - return bool(self.irc.sendMsg(ircmsgs.topic(self.channel, text))) - - def start(self, time=None): - self.stopped = False - if time: - try: - time = int(time.split(" ")[0]) - except: - time = 5 - self.writeToServer("/start {}".format(time)) - else: - self.writeToServer(b"/start") - self.clonk.stdin.flush() - - return (CmdResult.Success, "") - - - def stop(self, prm=None): - def stopping(self): - while self.clonk and self.stopped: - self.writeToServer("/start 6000") - if self.stopped == False: - return - sleep(10) - - if self.stopped == False: - self.stopped = True - start_new_thread(stopping, (self,)) - - return (CmdResult.Success, "") - - - def help(self, prm=None): - self.writeToServer("Verfügbare Befehle:") - for text, function in self.commands.items(): - self.writeToServer(text) - - return (CmdResult.Success, "") - - - def displayQueue(self, prm=None): - self.writeToServer(b"Warteschlange:") - i = 1 - for scen in self.queue.queue: - self.writeToServer("{}. {}".format(i, scen)) - i += 1 - - return (CmdResult.Success, "") - - - def list(self, prm=None): - self.writeToServer(b"List:\n-------------") - for scen in self.scenlist: - self.writeToServer(scen) - - return (CmdResult.Success, "") - - def ircCommands(self, prm=None): - if not prm: - return (CmdResult.SyntaxFail, "") - - return (CmdResult.Success, "") - - def IsRunning(self) -> bool: - return self.state != "Lobby" - - # - # functions - # - - def addCommand(self, function, text): - self.commands[text.split(" ")[0]] = function - return self - -class PyOCCtrl(PyCRCtrl): - - RegExps = { - "lobbyStartExp" : "((?:Los geht's!|Action go!)\s*)", - "startExp" : r"^Spiel gestartet.", - "joinExp" : r"^Client (.+) (?:verbunden|connected)\.\s*$", - "leaveExp" : "^Client (.+) (?:entfernt|removed)(.*)", - "shutdownExp" : r"^Spiel ausgewertet(.*)" - } - - engine = "openclonk-server" - codec = QTextCodec.codecForName("UTF-8") - commandline = "--fullscreen --lobby=300 --faircrew --record --config=config.txt" - -# -# -# -def PyCRCtrlInit(): - """ - Initializes a PyCRCtrl - object. Use this function if you want to control only one object. - """ - pass |
