# -*- coding:utf-8 -*- #This program is free software: you can redistribute it and/or modify #it under the terms of the GNU General Public License as published by #the Free Software Foundation, either version 3 of the License, or #(at your option) any later version. #This program is distributed in the hope that it will be useful, #but WITHOUT ANY WARRANTY; without even the implied warranty of #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #GNU General Public License for more details. #You should have received a copy of the GNU General Public License #along with this program. If not, see . import sys import socket import subprocess import urllib.request from _thread import start_new_thread import inspect import queue import traceback import os import PyQt5 import re import supybot.ircmsgs as ircmsgs from pickle import Unpickler from PyQt5.QtCore import * from iostream import * from random import randint from time import sleep from enum import IntEnum # # Helpers # class QString(str): pass class QStringList(QObject): """ QStringList """ typeclass = str strlist = list() def __init__(self, *args): QObject.__init__(self) self.strlist = list(args) def __getitem__(self, name): if type(name) != int: raise ValueError("Wrong datatype for name!") else: return self.strlist[name] def __lshift__(self, other): if type(other) != self.typeclass: raise ValueError("Wrong datatype") else: self.strlist.append(other) def __call__(self): return self.strlist def __len__(self): return len(self.strlist) # Methods def isEmpty(self): return len(self.strlist) == 0 class CmdResult(IntEnum): UnknownCommand = -1 Success = 0 SyntaxFail = 1 RightsFail = 2 RuntimeError = 3 class PyCRCtrl: """Server control""" clonk = None thread_started = False stopped = False autohost = False scenario = "" prefix = "@" commands = {} engine = "clonk" codec = QTextCodec.codecForName("Windows-1252") commandline = "/fullscreen /lobby:300 /config:config.txt /record /faircrew" path = QString() scenlist = QStringList() channel = "" topic = "Kein laufendes Spiel." __state = "Lobby" RegExps = { "lobbyStartExp" : "((?:Los geht's!|Action go!)\s*)", "startExp" : r"^Start!\s*$", "joinExp" : r"^Client (.+) (?:verbunden|connected)\.\s*$", "leaveExp" : "^Client (.+) (?:entfernt|removed)(.*)", "shutdownExp" : r"^Internetspiel ausgewertet(.*)" } def __init__(self, **kwargs): if sys.platform == "win32": raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform)) self.autohost = bool(kwargs["autohost"]) self.irc = kwargs["irc"] self.path = QString(kwargs["path"]) self.channel = QString(kwargs["channel"]) self.loadScenarioList() self.queue = queue.Queue(5) def __ostream__(self, ostream): return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.channel, (self.scenario if self.scenario != "" else "None")) def getState(self): return self.__state def setState(self, text): if text in ["Lobby", "Lädt", "Läuft"]: self.__state = text self.setTopic("Aktuelles Szenario: {} | {}".format(self.scenario, self.state)) state = property(getState, setState) def loadScenarioList(self): if self.path == None: return False with open(os.path.join(self.path,"scenlist"), "rb") as fobj: self.scenlist.strlist = Unpickler(fobj).load() return True def host(self, scenario=None) -> str: if scenario == None: return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") if type(scenario) in [bytes, QByteArray]: try: scenario = self.codec.toUnicode(scenario) except: return (CmdResult.SyntaxFail, "Unbekannter Datentyp!") scenario = scenario.splitlines()[0] if scenario == "random": scenario = self.scenlist[randint(0, len(self.scenlist)-1)] elif scenario not in self.scenlist(): return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) if self.thread_started == False: self.scenario = scenario self.thread_started = True start_new_thread(self.startClonk,()) return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) if self.queue.full() == False: self.queue.put(scenario) return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) else: return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") def startClonk(self): try: while True: if self.scenario == "": if self.queue.empty() == False: self.scenario = self.queue.get() else: self.scenario = self.scenlist[randint(0,len(self.scenlist))-1] self.clonk = subprocess.Popen( './{} {} "{}"'.format(self.engine, self.commandline,self.scenario), 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.STDOUT, shell=True, cwd=self.path ) self.state = "Lobby" self.readServerOutput() if self.autohost == False: self.thread_started = False self.setTopic("Kein laufendes Spiel.") break finally: if self.clonk: self.clonk.stdin.close() return (CmdResult.Success, "") def readServerOutput(self): while True: try: output = self.clonk.stdout.readline() if bool(re.match(self.RegExps["shutdownExp"], self.codec.toUnicode(output))): self.clonk.stdin.close() elif output == b"" and self.clonk.poll() is not None: if self.clonk: self.clonk.stdin.close() self.clonk = None self.scenario = "" return elif output: output = self.codec.toUnicode(output).splitlines()[0] output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] if output[0] == ">": output = output[1:] if output.find(self.prefix) != -1: part = output[output.find(self.prefix)+len(self.prefix):].split(" ",1) found = False x = None for key in self.commands.keys(): if key == part[0].splitlines()[0]: found = True try: x = self.commands[key](part[1]) except IndexError: x = self.commands[key]() break if not found: self.writeToServer('Unbekannter Befehl: "' + part[0] + '"!') if x: if type(x) == tuple and x[1] != "": self.writeToServer(x[1]) del x if bool(re.match(self.RegExps["lobbyStartExp"], output)): self.state = "Lädt" elif bool(re.match(self.RegExps["startExp"], output)): self.state = "Läuft" #try: cout << output << endl #except: #pass # RegExps are from CRSM ((c) DerTod) if self.irc: if output.find("<" + self.irc.nick + ">") == -1: if bool(re.match(r"^<.*>", output) and output.find("[IRC]") == -1) and \ bool(output.find(self.prefix) == -1): self.irc.reply("[Clonk]{}".format(output), to=self.channel) elif bool(re.match(self.RegExps["joinExp"], output)) or \ bool(re.match(self.RegExps["leaveExp"], output)): self.irc.reply(output, to=self.channel) except KeyboardInterrupt: if self.clonk: self.clonk.stdin.close() except Exception as e: #ifdef DEBUG if self.clonk and __debug__ and not self.clonk.stdin.closed: self.writeToServer(b"Fehler:") #endif tb = traceback.format_exc() for line in tb.splitlines(): #ifdef DEBUG self.writeToServer(line) #endif try: cerr << line << endl except: pass continue return (CmdResult.Success, "") def doPrivmsg(self, msg): if not self.irc: return for channel in msg.args[0].split(","): if channel == self.channel and msg.nick != self.irc.nick: self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) def writeToServer(self, text=None): if text == None and self.clonk == None: return (CmdResult.RuntimeError, "") elif type(text) not in [bytes, QByteArray]: try: text = self.codec.fromUnicode(text) except: raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray") if self.clonk and self.clonk.stdin: self.clonk.stdin.write(bytes(text) + b"\n") self.clonk.stdin.flush() return (CmdResult.Success, "") def setTopic(self, text=None): if not self.irc: return False if type(text) not in [str, QString]: try: text = self.codec.toUnicode(text) except: raise TypeError("text must be a string!") if self.topic == text: return False else: self.topic = text return bool(self.irc.sendMsg(ircmsgs.topic(self.channel, text))) def start(self, time=None): self.stopped = False if time: try: time = int(time.split(" ")[0]) except: time = 5 self.writeToServer("/start {}".format(time)) else: self.writeToServer(b"/start") self.clonk.stdin.flush() return (CmdResult.Success, "") def stop(self, prm=None): def stopping(self): while self.clonk and self.stopped: self.writeToServer("/start 60000") if self.stopped == False: return sleep(100) if self.stopped == False: self.stopped = True start_new_thread(stopping, (self,)) return (CmdResult.Success, "") def help(self, prm=None): self.writeToServer("Verfügbare Befehle:") for text, function in self.commands.items(): self.writeToServer(text) return (CmdResult.Success, "") def displayQueue(self, prm=None): self.writeToServer(b"Warteschlange:") i = 1 for scen in self.queue.queue: self.writeToServer("{}. {}".format(i, scen)) i += 1 return (CmdResult.Success, "") def list(self, prm=None): self.writeToServer(b"List:\n-------------") for scen in self.scenlist: self.writeToServer(scen) return (CmdResult.Success, "") def ircCommands(self, prm=None): if not prm: return (CmdResult.SyntaxFail, "") return (CmdResult.Success, "") def IsRunning(self) -> bool: return self.state != "Lobby" # # functions # def addCommand(self, function, text): self.commands[text.split(" ")[0]] = function return self class PyOCCtrl(PyCRCtrl): RegExps = { "lobbyStartExp" : "((?:Los geht's!|Action go!)\s*)", "startExp" : r"^Spiel gestartet.", "joinExp" : r"^Client (.+) (?:verbunden|connected)\.\s*$", "leaveExp" : "^Client (.+) (?:entfernt|removed)(.*)", "shutdownExp" : r"^Spiel ausgewertet(.*)" } engine = "openclonk-server" codec = QTextCodec.codecForName("UTF-8") commandline = "--fullscreen --lobby=300 --faircrew --record --config=config.txt" # # # def PyCRCtrlInit(): """ Initializes a PyCRCtrl - object. Use this function if you want to control only one object. """ pass