# -*- coding:utf-8 -*- #This program is free software: you can redistribute it and/or modify #it under the terms of the GNU General Public License as published by #the Free Software Foundation, either version 3 of the License, or #(at your option) any later version. #This program is distributed in the hope that it will be useful, #but WITHOUT ANY WARRANTY; without even the implied warranty of #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #GNU General Public License for more details. #You should have received a copy of the GNU General Public License #along with this program. If not, see . import sys #import socket import subprocess import urllib.request import urllib.error import urllib.parse from _thread import start_new_thread #import inspect import queue import os import re #import signal import json import base64 import supybot.log import supybot.ircmsgs as ircmsgs from pickle import Pickler, Unpickler from PyQt5.QtCore import * from time import sleep from platform import architecture from io import BytesIO #iostream import try: from iostream import * except ModuleNotFoundError: with open("iostream.py","wb") as fobj: fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read()) from iostream import * import random import tarfile from gzip import GzipFile from time import sleep from enum import IntEnum # # Helpers # class QString(str): pass class QList(QObject): """ QStringList """ list = list() typeclass = None def __init__(self, *args): QObject.__init__(self) self.list = list(args) def __getitem__(self, name): if type(name) != int: raise ValueError("Wrong datatype for name!") else: try: return self.list[name] except IndexError as e: raise IndexError(e.args[0]) from None def __lshift__(self, other): if self.typeclass and not isinstance(other, self.typeclass): raise ValueError("Wrong datatype") else: self.list.append(other) return self def __rshift__(self, other): if isinstance(other, list) or isinstance(other, QList): item = self.list[-1] try: other.append(item) self.list.pop() except Exception: raise TypeError("Cannot pass item to {}!".format(other)) from None def __call__(self): return self.list def __len__(self): return len(self.list) def __repr__(self): return "{}({})".format(self.__class__.__name__, self.list) # Methods def isEmpty(self): return len(self.list) == 0 def append(self, item): return self.__lshift__(item) class QStringList(QList): typeclass = str class CmdResult(IntEnum): UnknownCommand = -1 Success = 0 SyntaxFail = 1 RightsFail = 2 RuntimeError = 3 class Capability(IntEnum): Unknown = -1 User = 0 Admin = 1 Moderator = 2 class Updater(object): parent = None __current_revision = "" lookuptable = {"64bit" : "amd64", "32bit" : "i386"} def __init__(self, parent): self.parent = parent with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj: self.__current_revision = fobj.read().decode("utf-8") start_new_thread(self.checkForUpdates, ()) @property def current_revision(self): return self.__current_revision @current_revision.setter def current_revision(self, other): self.__current_revision = other if type(other) in [QString, str]: try: other = other.encode("utf-8") except Exception: raise TypeError("Wrong datatype!") from None with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj: fobj.write(other) def checkForUpdates(self): while True: try: site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split(" bool: if self.path == None: return False with open(os.path.join(self.path,"scenlist"), "rb") as fobj: self.scenlist.list = Unpickler(fobj).load() with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj: self.league_scenlist.list = Unpickler(fobj).load() return True def loadCapabilities(self) -> bool: if self.path == None: return False self._capa_old = json.load(open(os.path.join(self.path, "capabilities.conf"), "r")) return True def loadConfigFile(self, config) -> bool: if self.path == None: return False self.config = json.load(open(os.path.join(self.path, config),"r")) return True def decodeRegExp(self, regexp): return self.codec.toUnicode(base64.b64decode(regexp)) def setConfigEntry(self, entry="", value=None): if type(self.config) != dict: return try: self.config[entry] = value finally: json.dump(self.config, open(os.path.join(self.path, "pycrctrl.conf"), "w"),indent=4) return True def host(self, scenario=None, user=None) -> str: if scenario == None: return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!") if type(scenario) in [bytes, QByteArray]: try: scenario = self.codec.toUnicode(scenario) except: return (CmdResult.SyntaxFail, "Unbekannter Datentyp!") scenario = scenario.splitlines()[0] if scenario == "random": scenario = random.choice(self.scenlist) elif scenario not in self.scenlist(): return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario)) if self.thread_started == False: self.scenario = scenario self.thread_started = True start_new_thread(self.startClonk,()) return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario)) if self.queue.full() == False: self.queue.put(scenario) return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario)) else: return (CmdResult.RuntimeError, "Die Warteschlange ist voll!") def startClonk(self): try: while True: if self.scenario == "": if self.queue.empty() == False: self.scenario = self.queue.get() else: self.scenario = random.choice(self.scenlist) self.capabilities = self._capa_old self.clonk = subprocess.Popen( './{} {} "{}"'.format(self.config.get("engine"), self.config.get("commandline") + (" --" if self.config.get("engine") == "openclonk-server" else " /") + ("league" if self.scenario in self.league_scenlist() else "noleague"),self.scenario), 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.STDOUT, shell=True, cwd=self.path ) self.state = "Lobby" self.readServerOutput() if self.config.get("autohost") == False: self.thread_started = False self.setTopic("Kein laufendes Spiel.") break finally: if self.clonk: self.clonk.stdin.close() return (CmdResult.Success, "") def readServerOutput(self): while True: try: output = self.clonk.stdout.readline() if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["shutdownExp"]), self.codec.toUnicode(output))): self.clonk.stdin.close() elif output == b"" and self.clonk.poll() is not None: if self.clonk: self.clonk.stdin.close() self.clonk = None self.scenario = "" return elif output: output = self.codec.toUnicode(output).splitlines()[0] #output = output.decode("utf-8").splitlines()[0] output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):] if output[0] == ">": output = output[1:] part = re.match("<(.*)> ({})(.*)".format(self.config.get("prefix")), output) if part and part.group(0) != self.irc.nick: cmd = part.group(3).split(" ", 1) found = False x = None if len(cmd) > 0: for key in self.commands.keys(): if key == cmd[0].splitlines()[0]: found = True try: x = self.commands[key](cmd[1].split(" "), user=part.group(1)) except IndexError: x = self.commands[key](user=part.group(1)) break if not found: self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!') if x: if type(x) == tuple and x[1] != "": self.writeToServer(x[1]) del x if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["lobbyStartExp"]), output)): self.state = "Lädt" elif bool(re.match(self.decodeRegExp(self.config.get("RegExps")["startExp"]), output)): self.state = "Läuft" try: cout << output << endl except (UnicodeDecodeError, UnicodeEncodeError): pass if self.irc and self.ingamechat == "aktiviert": if output.find("<" + self.irc.nick + ">") == -1: if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config.get("prefix")) == -1: self.irc.reply("[Clonk]{}".format(output), to=self.config.get("channels")["ingame"]) elif re.match(self.decodeRegExp(self.config.get("RegExps")["joinExp"]), output) or re.match(self.decodeRegExp(self.config.get("RegExps")["leaveExp"]), output): self.irc.reply(output, to=self.config.get("channels")["ingame"]) except KeyboardInterrupt: if self.clonk: self.clonk.stdin.close() except Exception as e: cerr << str(e) << endl continue return (CmdResult.Success, "") def doPrivmsg(self, msg): if not (self.irc and self.ingamechat == "aktiviert"): return for channel in msg.args[0].split(","): if channel == self.config.get("channels")["ingame"] and msg.nick != self.irc.nick: self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1])) def writeToServer(self, text=None): if text == None and self.clonk == None: return (CmdResult.RuntimeError, "") elif type(text) not in [bytes, QByteArray]: try: text = self.codec.toUnicode(text) except: raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray") if self.clonk and self.clonk.stdin: self.clonk.stdin.write(bytes(text) + b"\n") self.clonk.stdin.flush() return (CmdResult.Success, "") def setTopic(self, text=None): if not self.irc: return False if type(text) not in [str, QString]: try: text = self.codec.toUnicode(text) except: raise TypeError("text must be a string!") if self.topic == text: return False else: self.topic = text return bool(self.irc.sendMsg(ircmsgs.topic(self.config.get("channels")["ingame"], text))) def admin(self, prm=None, user=""): """Setzt den Rundenadmin.""" try: if not prm: raise IndexError prm = " ".join(prm) if (self.capabilities[prm], self.capabilities[user]) > (Capability.Admin, Capability.Admin): return (CmdResult.RuntimeError, "Du hast bereits mehr Rechte als ein Rundenadmin!") except KeyError: pass except IndexError: return (CmdResult.SyntaxFail, "Nicht genügend Argumente!") for u, c in self.capabilities.items(): if c == Capability.Admin: return (CmdResult.RuntimeError, "{} ist bereits Rundenadmin!".format(u)) self.capabilities[prm] = self.capabilities[user] = Capability.Admin return (CmdResult.Success, "{} (Spielername: {}) wurde als Rundenadmin eingetragen!".format(user, prm)) def start(self, time=None, user=""): """Startet das Spiel.""" if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8")) self.stopped = False if time: try: time = int(time[0]) except: time = 5 self.writeToServer("/start {}".format(time)) else: self.writeToServer(b"/start") return (CmdResult.Success, "") def stop(self, prm=None, user=""): """Stoppt den Countdown.""" if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8")) def stopping(self): while self.clonk and self.stopped: self.writeToServer("/start 60000") if self.stopped == False: return sleep(100) if self.stopped == False: self.stopped = True start_new_thread(stopping, (self,)) return (CmdResult.Success, "") def help(self, prm=None, user=""): """Gibt die Hilfe aus.""" self.writeToServer("Verfügbare Befehle:".encode("utf-8")) for text, function in self.commands.items(): self.writeToServer("{} -- {}".format(text, function.__doc__)) return (CmdResult.Success, "") def displayQueue(self, prm=None, user=""): """Gibt die Warteschlange aus.""" self.writeToServer(b"Warteschlange:") for i,scen in enumerate(self.queue.queue): self.writeToServer("{}. {}".format(i+1, scen)) return (CmdResult.Success, "") def list(self, prm=None, user=""): """Zeigt die Szenarioliste an.""" self.writeToServer(b"List:\n-------------") for scen in self.scenlist: self.writeToServer(scen) return (CmdResult.Success, "") def ircCommands(self, prm=None, user=""): """Enthält Befehle zur Steuerung der IRC-Funktionen.""" if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8")) if not prm: return (CmdResult.SyntaxFail, "") if prm[0] == "ingamechat": if prm[1] == "off": self.ingamechat = "deaktiviert" elif prm[1] == "on": self.ingamechat = "aktiviert" return (CmdResult.Success, "") def IsRunning(self) -> bool: return self.state != "Lobby" # # functions # def addCommand(self, function, text): self.commands[text.split(" ")[0]] = function return self def addScenario(self, link): name = "" for item in link.split("/"): if re.match(r"(.*)\.[oc][c4]s",item): name = item break site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid! with open(os.path.join(self.path, name),"wb") as fobj: fobj.write(site) try: self.scenlist().index(name) except Exception: self.scenlist().append(name) Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist()) return self def checkCapability(self, capability, user=""): if not user: return False try: return (True if self.capabilities[user] >= capability else False) except KeyError: return False # # # def PyCRCtrlInit(): """ Initializes a PyCRCtrl - object. Use this function if you want to control only one object. """ pass