aboutsummaryrefslogtreecommitdiffstats
path: root/pycrctrl.py
diff options
context:
space:
mode:
authorFulgen301 <tokmajigeorge@gmail.com>2017-03-18 11:53:23 +0100
committerFulgen301 <tokmajigeorge@gmail.com>2017-07-11 11:10:31 +0200
commit5cd0eccd91dbfcb344029606e83dfd0ce355dac2 (patch)
treeaa26ad6ee663d095dfa111a806a9041c381148e5 /pycrctrl.py
parent6f4e06c596f717ad91f69c9b7a7b18395e14d5b0 (diff)
downloadpycrctrl-5cd0eccd91dbfcb344029606e83dfd0ce355dac2.tar.gz
pycrctrl-5cd0eccd91dbfcb344029606e83dfd0ce355dac2.zip
Add basic plugin functionality; move pycrctrl into a folder
Diffstat (limited to 'pycrctrl.py')
-rw-r--r--pycrctrl.py711
1 files changed, 0 insertions, 711 deletions
diff --git a/pycrctrl.py b/pycrctrl.py
deleted file mode 100644
index 02e2595..0000000
--- a/pycrctrl.py
+++ /dev/null
@@ -1,711 +0,0 @@
-# -*- coding:utf-8 -*-
-
-#This program is free software: you can redistribute it and/or modify
-#it under the terms of the GNU General Public License as published by
-#the Free Software Foundation, either version 3 of the License, or
-#(at your option) any later version.
-
-#This program is distributed in the hope that it will be useful,
-#but WITHOUT ANY WARRANTY; without even the implied warranty of
-#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-#GNU General Public License for more details.
-
-#You should have received a copy of the GNU General Public License
-#along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import sys
-
-#import socket
-import subprocess
-import urllib.request
-import urllib.error
-import urllib.parse
-
-from _thread import start_new_thread
-#import inspect
-import queue
-import os
-import re
-#import signal
-import json
-import base64
-import supybot.log
-import supybot.ircmsgs as ircmsgs
-
-from pickle import Pickler, Unpickler
-from PyQt5.QtCore import *
-from time import sleep
-from platform import architecture
-from io import BytesIO
-#iostream import
-
-try:
- from iostream import *
-except ModuleNotFoundError:
- with open("iostream.py","wb") as fobj:
- fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read())
-
- from iostream import *
-
-import random
-import tarfile
-from gzip import GzipFile
-from time import sleep
-from enum import IntEnum
-
-#
-# Helpers
-#
-
-class QString(str):
- pass
-
-class QList(QObject):
- """ QStringList """
-
- list = list()
- typeclass = None
-
- def __init__(self, *args):
- QObject.__init__(self)
- self.list = list(args)
-
- def __getitem__(self, name):
- if type(name) != int:
- raise ValueError("Wrong datatype for name!")
- else:
- try:
- return self.list[name]
- except IndexError as e:
- raise IndexError(e.args[0]) from None
-
- def __lshift__(self, other):
- if self.typeclass and not isinstance(other, self.typeclass):
- raise ValueError("Wrong datatype")
- else:
- self.list.append(other)
- return self
-
- def __rshift__(self, other):
- if isinstance(other, list) or isinstance(other, QList):
- item = self.list[-1]
- try:
- other.append(item)
- self.list.pop()
- except Exception:
- raise TypeError("Cannot pass item to {}!".format(other)) from None
-
-
- def __call__(self):
- return self.list
-
- def __len__(self):
- return len(self.list)
-
- def __repr__(self):
- return "{}({})".format(self.__class__.__name__, self.list)
-
- # Methods
-
- def isEmpty(self):
- return len(self.list) == 0
-
- def append(self, item):
- return self.__lshift__(item)
-
-class QStringList(QList):
- typeclass = str
-
-class CmdResult(IntEnum):
- UnknownCommand = -1
- Success = 0
- SyntaxFail = 1
- RightsFail = 2
- RuntimeError = 3
-
-class Capability(IntEnum):
- Unknown = -1
- User = 0
- Admin = 1
- Moderator = 2
-
-class Updater(object):
- parent = None
- __current_revision = ""
- lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
-
- def __init__(self, parent):
- self.parent = parent
- with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj:
- self.__current_revision = fobj.read().decode("utf-8")
- start_new_thread(self.checkForUpdates, ())
-
- @property
- def current_revision(self):
- return self.__current_revision
-
- @current_revision.setter
- def current_revision(self, other):
- self.__current_revision = other
- if type(other) in [QString, str]:
- try:
- other = other.encode("utf-8")
- except Exception:
- raise TypeError("Wrong datatype!") from None
-
- with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
- fobj.write(other)
-
- def checkForUpdates(self):
- while True:
- try:
- site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/")
- site.remove(site[0])
- site = [i.split("' title")[0] for i in site]
-
- x = None
- for i in site:
- x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i)
- if x:
- rev = x.group(2)
-
- if self.current_revision != rev:
- self.current_revision = rev
- self.loadNewSnapshot(x)
-
- break
- if not x:
- cout << "Regex didn't match" << endl
-
- except Exception as e:
- supybot.log.exception(str(e))
- finally:
- sleep(10)
-
- def loadNewSnapshot(self, reg):
- supybot.log.info("Downloading snapshot...")
- with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj:
- fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read())
-
- #extract the snapshot
- tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2")
- tar.extractall(path=self.parent.path)
- supybot.log.info("New snapshot has been extracted.")
-
- #get the openclonk-server autobuild
- site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8"))
-
- for commit in site:
- for build in commit["builds"]:
- if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]):
- for b in build["components"]:
- reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server
- #if reg:
- # cout << str(reg) << " "
- # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: "
- # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl
- # cout << endl
- if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]):
- supybot.log.info("Downloading openclonk-server build...")
- buffer = BytesIO()
- buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read())
- buffer.seek(0)
- with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj:
- fobj.write(GzipFile(fileobj=buffer).read())
-
- supybot.log.info("New openclonk-server build has been extracted.")
- os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64)
- return True
-
-
-
-class PyCRCtrl(object):
- """Server control"""
-
- clonk = None
- thread_started = False
- stopped = False
- config = {}
-
- scenario = ""
- commands = {}
- codec = None
-
- path = QString()
- scenlist = QStringList()
- league_scenlist = QStringList()
-
- topic = "Kein laufendes Spiel."
- league = True
-
- __state = "Lobby"
- __ingamechat = "aktiviert"
-
- capabilities = {}
- _capa_old = {}
-
- updater = None
-
- def __init__(self, irc=None, path=None, config="pycrctrl.conf"):
- if sys.platform == "win32":
- raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform))
-
- self.irc = irc
- self.path = path
- self.loadConfigFile(config)
- self.loadCapabilities()
- self.loadScenarioList()
- self.codec = QTextCodec.codecForName(self.config.get("encoding"))
-
- self.queue = queue.Queue(5)
- if self.config.get("engine") == "openclonk-server":
- self.updater = Updater(self)
-
- def __ostream__(self, ostream):
- return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.channels["ingame"], (self.scenario if self.scenario != "" else "None"))
-
- @property
- def state(self):
- return self.__state
-
- @state.setter
- def state(self, text):
- if text in ["Lobby", "Lädt", "Läuft"]:
- self.__state = text
- topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat)
- self.setTopic(topic)
-
- @property
- def ingamechat(self):
- return self.__ingamechat
-
- @ingamechat.setter
- def ingamechat(self, text):
- if text in ["aktiviert", "deaktiviert"]:
- self.__ingamechat = text
- self.state = self.state
-
- def loadScenarioList(self) -> bool:
- if self.path == None:
- return False
-
- with open(os.path.join(self.path,"scenlist"), "rb") as fobj:
- self.scenlist.list = Unpickler(fobj).load()
-
- with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj:
- self.league_scenlist.list = Unpickler(fobj).load()
-
- return True
-
- def loadCapabilities(self) -> bool:
- if self.path == None:
- return False
-
- self._capa_old = json.load(open(os.path.join(self.path, "capabilities.conf"), "r"))
- return True
-
-
- def loadConfigFile(self, config) -> bool:
- if self.path == None:
- return False
-
- config = os.path.join(self.path, config)
-
- if os.path.isdir(config):
- return False
-
- elif os.path.isfile(config):
- self.config = json.load(open(os.path.join(self.path, config),"r"))
- return True
-
- elif not os.path.exists(config):
- c = {
- "General" : {
- "Directories" : {
- "plugins" : []
- },
- "useLogfile" : True,
- },
-
- "Clonk" : {
- "engine" : "clonk",
- "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"],
- "commandlinePrefix" : "/",
- "prefix" : "@",
-
- "Channels" : {
- "parent" : "",
- "ingame" : ""
- },
-
- "RegExps" : {
- "lobbyStartExp" : "",
- "startExp" : "",
- "joinExp" : "",
- "leaveExp" : "",
- "shutdownExp" : ""
- },
-
- "autohost" : False,
- "IRC" : {
- "ingamechat" : True
- },
-
- "Rights" : {
- "admin" : True,
- "moderator" : True
- }
- },
-
- "Updater" : {
- "enabled" : False,
- "Addresses" : {
- "snapshotList" : "",
- "snapshotDownload" : "",
- "autobuildList" : "",
- "autobuildAddress" : ""
- }
- }
- }
- }
-
- json.dump(c, open(config, "w"))
- return c
-
-
- def decodeRegExp(self, regexp):
- return self.codec.toUnicode(base64.b64decode(regexp))
-
- def setConfigEntry(self, entry="", value=None):
- if type(self.config) != dict:
- return
-
- try:
- self.config[entry] = value
- finally:
- json.dump(self.config, open(os.path.join(self.path, "pycrctrl.conf"), "w"),indent=4)
-
- return True
-
- def host(self, scenario=None, user=None) -> str:
- if scenario == None:
- return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
- if type(scenario) in [bytes, QByteArray]:
- try:
- scenario = self.codec.toUnicode(scenario)
- except:
- return (CmdResult.SyntaxFail, "Unbekannter Datentyp!")
-
- scenario = scenario.splitlines()[0]
- if scenario == "random":
- scenario = random.choice(self.scenlist)
-
- elif scenario not in self.scenlist():
- return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
-
- if self.thread_started == False:
- self.scenario = scenario
- self.thread_started = True
- start_new_thread(self.startClonk,())
- return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
-
- if self.queue.full() == False:
- self.queue.put(scenario)
- return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
- else:
- return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
-
- def startClonk(self):
- try:
- while True:
- if self.scenario == "":
- if self.queue.empty() == False:
- self.scenario = self.queue.get()
- else:
- self.scenario = random.choice(self.scenlist)
-
- self.capabilities = self._capa_old
- self.clonk = subprocess.Popen(
- './{} {} "{}"'.format(self.config.get("engine"), self.config.get("commandline") + (" --" if self.config.get("engine") == "openclonk-server" else " /") + ("league" if self.scenario in self.league_scenlist() else "noleague"),self.scenario),
- 0,
- None,
- subprocess.PIPE,
- subprocess.PIPE,
- subprocess.STDOUT,
- shell=True,
- cwd=self.path
- )
- self.state = "Lobby"
- self.readServerOutput()
- if self.config.get("autohost") == False:
- self.thread_started = False
- self.setTopic("Kein laufendes Spiel.")
- break
-
- finally:
- if self.clonk:
- self.clonk.stdin.close()
-
- return (CmdResult.Success, "")
-
- def readServerOutput(self):
- while True:
- try:
- output = self.clonk.stdout.readline()
-
- if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["shutdownExp"]), self.codec.toUnicode(output))):
- self.clonk.stdin.close()
- elif output == b"" and self.clonk.poll() is not None:
- if self.clonk:
- self.clonk.stdin.close()
- self.clonk = None
- self.scenario = ""
- return
-
- elif output:
- output = self.codec.toUnicode(output).splitlines()[0]
- #output = output.decode("utf-8").splitlines()[0]
- output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):]
-
- if output[0] == ">":
- output = output[1:]
-
- part = re.match("<(.*)> ({})(.*)".format(self.config.get("prefix")), output)
- if part and part.group(0) != self.irc.nick:
- cmd = part.group(3).split(" ", 1)
- found = False
- x = None
- if len(cmd) > 0:
- for key in self.commands.keys():
- if key == cmd[0].splitlines()[0]:
- found = True
- try:
- x = self.commands[key](cmd[1].split(" "), user=part.group(1))
- except IndexError:
- x = self.commands[key](user=part.group(1))
- break
-
- if not found:
- self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!')
- if x:
- if type(x) == tuple and x[1] != "":
- self.writeToServer(x[1])
- del x
- if bool(re.match(self.decodeRegExp(self.config.get("RegExps")["lobbyStartExp"]), output)):
- self.state = "Lädt"
-
- elif bool(re.match(self.decodeRegExp(self.config.get("RegExps")["startExp"]), output)):
- self.state = "Läuft"
- try:
- cout << output << endl
- except (UnicodeDecodeError, UnicodeEncodeError):
- pass
-
- if self.irc and self.ingamechat == "aktiviert":
- if output.find("<" + self.irc.nick + ">") == -1:
- if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config.get("prefix")) == -1:
- self.irc.reply("[Clonk]{}".format(output), to=self.config.get("channels")["ingame"])
-
- elif re.match(self.decodeRegExp(self.config.get("RegExps")["joinExp"]), output) or re.match(self.decodeRegExp(self.config.get("RegExps")["leaveExp"]), output):
- self.irc.reply(output, to=self.config.get("channels")["ingame"])
-
-
- except KeyboardInterrupt:
- if self.clonk:
- self.clonk.stdin.close()
-
- except Exception as e:
- cerr << str(e) << endl
- continue
-
- return (CmdResult.Success, "")
-
- def doPrivmsg(self, msg):
- if not (self.irc and self.ingamechat == "aktiviert"):
- return
- for channel in msg.args[0].split(","):
- if channel == self.config.get("channels")["ingame"] and msg.nick != self.irc.nick:
- self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
-
-
- def writeToServer(self, text=None):
- if text == None and self.clonk == None:
- return (CmdResult.RuntimeError, "")
- elif type(text) not in [bytes, QByteArray]:
- try:
- text = self.codec.toUnicode(text)
- except:
- raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray")
-
- if self.clonk and self.clonk.stdin:
- self.clonk.stdin.write(bytes(text) + b"\n")
- self.clonk.stdin.flush()
- return (CmdResult.Success, "")
-
- def setTopic(self, text=None):
- if not self.irc:
- return False
-
- if type(text) not in [str, QString]:
- try:
- text = self.codec.toUnicode(text)
- except:
- raise TypeError("text must be a string!")
-
- if self.topic == text:
- return False
-
- else:
- self.topic = text
- return bool(self.irc.sendMsg(ircmsgs.topic(self.config.get("channels")["ingame"], text)))
-
- def admin(self, prm=None, user=""):
- """Setzt den Rundenadmin."""
- try:
- if not prm:
- raise IndexError
- prm = " ".join(prm)
-
- if (self.capabilities[prm], self.capabilities[user]) > (Capability.Admin, Capability.Admin):
- return (CmdResult.RuntimeError, "Du hast bereits mehr Rechte als ein Rundenadmin!")
-
-
- except KeyError:
- pass
-
- except IndexError:
- return (CmdResult.SyntaxFail, "Nicht genügend Argumente!")
-
- for u, c in self.capabilities.items():
- if c == Capability.Admin:
- return (CmdResult.RuntimeError, "{} ist bereits Rundenadmin!".format(u))
-
-
- self.capabilities[prm] = self.capabilities[user] = Capability.Admin
- return (CmdResult.Success, "{} (Spielername: {}) wurde als Rundenadmin eingetragen!".format(user, prm))
-
-
- def start(self, time=None, user=""):
- """Startet das Spiel."""
- if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))
- self.stopped = False
- if time:
- try:
- time = int(time[0])
- except:
- time = 5
- self.writeToServer("/start {}".format(time))
- else:
- self.writeToServer(b"/start")
-
- return (CmdResult.Success, "")
-
-
- def stop(self, prm=None, user=""):
- """Stoppt den Countdown."""
- if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))
- def stopping(self):
- while self.clonk and self.stopped:
- self.writeToServer("/start 60000")
- if self.stopped == False:
- return
- sleep(100)
-
- if self.stopped == False:
- self.stopped = True
- start_new_thread(stopping, (self,))
-
- return (CmdResult.Success, "")
-
-
- def help(self, prm=None, user=""):
- """Gibt die Hilfe aus."""
- self.writeToServer("Verfügbare Befehle:".encode("utf-8"))
- for text, function in self.commands.items():
- self.writeToServer("{} -- {}".format(text, function.__doc__))
-
- return (CmdResult.Success, "")
-
-
- def displayQueue(self, prm=None, user=""):
- """Gibt die Warteschlange aus."""
- self.writeToServer(b"Warteschlange:")
-
- for i,scen in enumerate(self.queue.queue):
- self.writeToServer("{}. {}".format(i+1, scen))
-
- return (CmdResult.Success, "")
-
-
- def list(self, prm=None, user=""):
- """Zeigt die Szenarioliste an."""
- self.writeToServer(b"List:\n-------------")
- for scen in self.scenlist:
- self.writeToServer(scen)
-
- return (CmdResult.Success, "")
-
- def ircCommands(self, prm=None, user=""):
- """Enthält Befehle zur Steuerung der IRC-Funktionen."""
- if not self.checkCapability(Capability.Admin, user): return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))
- if not prm:
- return (CmdResult.SyntaxFail, "")
-
- if prm[0] == "ingamechat":
- if prm[1] == "off":
- self.ingamechat = "deaktiviert"
- elif prm[1] == "on":
- self.ingamechat = "aktiviert"
-
- return (CmdResult.Success, "")
-
- def IsRunning(self) -> bool:
- return self.state != "Lobby"
-
- #
- # functions
- #
-
- def addCommand(self, function, text):
- self.commands[text.split(" ")[0]] = function
- return self
-
- def addScenario(self, link):
- name = ""
- for item in link.split("/"):
- if re.match(r"(.*)\.[oc][c4]s",item):
- name = item
- break
-
- site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid!
- with open(os.path.join(self.path, name),"wb") as fobj:
- fobj.write(site)
-
- try:
- self.scenlist().index(name)
- except Exception:
- self.scenlist().append(name)
- Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist())
- return self
-
- def checkCapability(self, capability, user=""):
- if not user:
- return False
-
- try:
- return (True if self.capabilities[user] >= capability else False)
-
- except KeyError:
- return False
-
- def registerHook(self, hook, function, replace=False):
- pass
-
-#
-#
-#
-def PyCRCtrlInit():
- """
- Initializes a PyCRCtrl - object. Use this function if you want to control only one object.
- """
- pass