aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFulgen301 <tokmajigeorge@gmailc.om>2017-07-30 21:03:44 +0200
committerFulgen301 <tokmajigeorge@gmailc.om>2017-07-30 21:03:44 +0200
commit6af848ee046c660116ebe34d0053471eced4ac10 (patch)
tree6bb409f782ca319ea9518c012c220743fce6cdab
parente004eb61cd497cf208e926e0892f21c979966c81 (diff)
downloadpycrctrl-6af848ee046c660116ebe34d0053471eced4ac10.tar.gz
pycrctrl-6af848ee046c660116ebe34d0053471eced4ac10.zip
* Move pycrctrl.py out of the folder
* Modify .gitignore * Change configuration format to INI
-rw-r--r--.gitignore3
-rw-r--r--pycrctrl.py593
-rw-r--r--pycrctrl/.gitignore2
-rw-r--r--pycrctrl/pycrctrl.py618
4 files changed, 596 insertions, 620 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4e3bef2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.cpython-36.pyc
+__pycache__
+.directory
diff --git a/pycrctrl.py b/pycrctrl.py
new file mode 100644
index 0000000..06dcb92
--- /dev/null
+++ b/pycrctrl.py
@@ -0,0 +1,593 @@
+# -*- coding:utf-8 -*-
+
+#Copyright (c) 2017, George Tokmaji
+
+#Permission to use, copy, modify, and/or distribute this software for any
+#purpose with or without fee is hereby granted, provided that the above
+#copyright notice and this permission notice appear in all copies.
+#
+#THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+#WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+#MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+#ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+#WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+#ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+#OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import sys
+import os
+
+import subprocess
+import queue
+from _thread import start_new_thread
+from time import sleep
+from platform import architecture
+
+import re
+import configparser
+from io import BytesIO
+import logging
+
+import urllib.request
+import supybot.ircmsgs as ircmsgs
+
+import random
+
+import tarfile
+from enum import IntEnum
+
+from gzip import GzipFile
+
+#
+# Helpers
+#
+
+class list(list):
+ """ Extended list """
+
+ def __lshift__(self, other):
+ self.append(other)
+ return self
+
+ def __rshift__(self, other):
+ if isinstance(other, list):
+ item = self.__getitem__(len(self) - 1)
+ try:
+ other.append(item)
+ self.pop()
+ except Exception:
+ raise TypeError("Cannot pass item to {}!".format(other)) from None
+
+ # Methods
+
+ def isEmpty(self):
+ return len(self) == 0
+
+class CmdResult(IntEnum):
+ UnknownCommand = -1
+ Success = 0
+ SyntaxFail = 1
+ RightsFail = 2
+ RuntimeError = 3
+
+class Updater(object):
+ parent = None
+ __current_revision = ""
+ lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
+
+ def __init__(self, parent):
+ self.parent = parent
+ with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj:
+ self.__current_revision = fobj.read().decode("utf-8")
+ start_new_thread(self.checkForUpdates, ())
+
+ @property
+ def current_revision(self):
+ return self.__current_revision
+
+ @current_revision.setter
+ def current_revision(self, other):
+ self.__current_revision = other
+ if type(other) == str:
+ try:
+ other = other.encode("utf-8")
+ except Exception:
+ raise TypeError("Wrong datatype!") from None
+
+ with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
+ fobj.write(other)
+
+ def checkForUpdates(self):
+ while True:
+ try:
+ site = urllib.request.urlopen(self.parent.config["Addresses"]["snapshotList"]).read().decode("utf-8").split(self.parent.config["Updater"]["SplitString"])
+ site.remove(site[0])
+ site = [i.split("' title")[0] for i in site]
+
+ x = None
+ for i in site:
+ x = re.match(self.parent.config["RegExps"]["Snapshot"].format(sys.platform, self.lookuptable[architecture()[0]]), i)
+ if x:
+ rev = x.group(2)
+
+ if self.current_revision != rev:
+ self.current_revision = rev
+ self.loadNewSnapshot(x)
+
+ break
+ if not x:
+ self.parent.log.error("Updater: Update regular expression doesn't match!")
+
+ except Exception as e:
+ cerr << str(e) << endl
+ finally:
+ sleep(10)
+
+ def loadNewSnapshot(self, reg):
+ self.parent.log.info("Downloading snapshot with id {}".format(self.current_revision))
+ with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj:
+ fobj.write(urllib.request.urlopen(self.parent.config["Addresses"]["snapshotDownload"].format(reg.group(0).split("' title")[0])).read())
+
+ #extract the snapshot
+ tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2")
+ tar.extractall(path=self.parent.path)
+ self.parent.log.info("New snapshot has been extracted.")
+
+ #get the openclonk-server autobuild
+ site = json.loads(urllib.request.urlopen(self.parent.config["Addresses"]["autobuildList"]).read().decode("utf-8"))
+
+ for commit in site:
+ for build in commit["builds"]:
+ if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]):
+ for b in build["components"]:
+ reg = re.match(self.parent.config["RegExps"]["Autobuild"], str(b["path"])) #skip the engine check as the only useful one is openclonk-server
+ if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]):
+ self.parent.log.info("Downloading autobuild with id {}".format(self.current_revision))
+ buffer = BytesIO()
+ buffer.write(urllib.request.urlopen(self.parent.config["Addresses"]["autobuildDownload"].format(b["path"])).read())
+ buffer.seek(0)
+ with open(os.path.join(self.parent.path, self.parent.config["Updater"]["BinaryName"]), "wb") as fobj:
+ fobj.write(GzipFile(fileobj=buffer).read())
+
+ self.parent.log.info("New openclonk-server build has been extracted.")
+ os.chmod(os.path.join(self.parent.path, self.parent.config["Updater"]["BinaryName"]), os.stat(os.path.join(self.parent.path, self.parent.config["Updater"]["BinaryName"])).st_mode | 64)
+ return True
+
+
+
+class PyCRCtrl(object):
+ """Server control"""
+
+ clonk = None
+ thread_started = False
+ stopped = False
+ config = configparser.ConfigParser()
+
+ scenario = ""
+ commands = {}
+
+ path = None
+ config_path = None
+ scenlist = []
+ league_scenlist = []
+
+ topic = "Kein laufendes Spiel."
+
+ __state = "Lobby"
+ __ingamechat = "aktiviert"
+
+ updater = None
+ log = None
+ shutdowned = False
+
+ @property
+ def state(self):
+ return self.__state
+
+ @state.setter
+ def state(self, text):
+ if text in ["Lobby", "Lädt", "Läuft"]:
+ self.__state = text
+ self.setTopic("Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat))
+
+ @property
+ def ingamechat(self):
+ return self.__ingamechat
+
+ @ingamechat.setter
+ def ingamechat(self, text):
+ if text in ["aktiviert", "deaktiviert"]:
+ self.__ingamechat = text
+ self.state = self.state
+
+ def __init__(self, irc, path, config="pycrctrl.ini"):
+ self.irc = irc
+ self.path = path
+ self.loadConfigFile(config)
+ self.setupLog()
+ self.ingamechat = "aktiviert" if self.config["Clonk"].getboolean("Autohost") else "deaktiviert"
+ self.loadScenarioList()
+
+ self.queue = queue.Queue(5)
+ if self.config["Updater"].getboolean("Enabled"):
+ self.updater = Updater(self)
+
+ def loadScenarioList(self) -> None:
+ if self.path == None:
+ raise OSError("No path specified")
+
+ with open(os.path.join(self.path,"scenarios.lst"), "r") as fobj:
+ self.scenlist = fobj.readlines()
+
+ with open(os.path.join(self.path, "scenarios_league.lst"), "r") as fobj:
+ self.league_scenlist = fobj.readlines()
+
+ self.log.debug("Scenario lists loaded.")
+
+ def setupLog(self) -> None:
+ self.log = logging.getLogger(type(self).__name__)
+ self.log.setLevel(getattr(logging, self.config["Logging"]["Level"], logging.INFO))
+
+ ch = logging.FileHandler(
+ os.path.join(self.path, self.config["Logging"]["File"])
+ )
+ ch.setLevel(getattr(logging, self.config["Logging"]["Level"], logging.INFO))
+ ch.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s"))
+
+ self.log.addHandler(ch)
+ self.log.info("PyCRCtrl started.")
+
+ def loadConfigFile(self, config) -> None:
+ if self.path == None:
+ raise OSError("No path specified")
+
+ parser = configparser.ConfigParser()
+ self.config_path = conf = os.path.join(self.path, config)
+
+ if os.path.isdir(conf):
+ raise OSError("{} is a directory!".format(conf))
+
+ elif os.path.isfile(conf):
+ self.config.read(conf)
+
+ elif not os.path.exists(conf):
+ c = """[General]
+UseLogfile=true
+Prefix=@
+
+[Clonk]
+Engine=clonk
+Encoding=utf-8
+Commandline=/fullscreen /lobby:300 /config:config.txt /record /faircrew
+Autohost=false
+
+[IRC]
+Ingamechat=true
+
+ [Channels]
+ Parent="#clonk-SGGP"
+ Ingame="#clonk-SGGP-ingame"
+
+[Updater]
+Enabled=false
+ [Addresses]
+ snapshotList=http://openclonk.org/nightly-builds
+ snapshotDownload=http://openclonk.org/builds/nightly/snapshots/{}
+ autobuildList=https://autobuild.openclonk.org/api/v1/jobs
+ autobuildAddress=https://autobuild.openclonk.org/static/binaries/{}
+
+[RegExps]
+LobbyStart=((?:Los geht's!|Action go!)\\s*)
+Start=Start!
+PlayerJoin=^Client (.+) (?:verbunden|connected)\.\s*$
+PlayerLeave=^Client (.+) (?:entfernt|removed).*
+Shutdown=Spiel ausgewertet.*
+Autobuild=.*/openclonk-server-(.*)-(.*)-(.*)-.*
+Snapshot=openclonk-snapshot-(.*)-(.*)-{}-{}-."""
+
+ self.config.read_string(c)
+ with open(conf, "w") as fobj:
+ self.config.write(fobj)
+
+ def startClonk(self):
+ try:
+ while True:
+ if self.scenario == "":
+ if self.queue.empty() == False:
+ self.scenario = self.queue.get()
+ else:
+ self.scenario = random.choice(self.scenlist).splitlines()[0]
+
+ self.clonk = subprocess.Popen(
+ './{} {} "{}"'.format(self.config["Clonk"]["Engine"], self.config["Clonk"]["Commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario),
+ 0,
+ None,
+ subprocess.PIPE,
+ subprocess.PIPE,
+ subprocess.STDOUT,
+ shell=True,
+ cwd=self.path,
+ encoding=self.config["Clonk"]["Encoding"]
+ )
+ self.state = "Lobby"
+ self.readServerOutput()
+ if self.config["Clonk"].getboolean("Autohost") == False:
+ self.thread_started = False
+ self.setTopic("Kein laufendes Spiel.")
+ break
+
+ finally:
+ if self.clonk:
+ self.clonk.stdin.close()
+
+ return (CmdResult.Success, "")
+
+ def readServerOutput(self):
+ while True:
+ try:
+ output = self.clonk.stdout.readline()
+
+ if re.match(self.config["RegExps"]["Shutdown"], output):
+ self.clonk.stdin.close()
+ elif output == "" and self.clonk.poll() is not None:
+ if self.clonk:
+ self.clonk.stdin.close()
+ self.clonk = None
+ self.scenario = ""
+ return
+
+ elif output:
+ output = output.splitlines()[0]
+ #output = output.decode("utf-8").splitlines()[0]
+ output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):]
+
+ if output[0] == ">":
+ output = output[1:]
+
+ part = self.isMessage(output)
+ if part and part.group(0) != self.irc.nick:
+ self.log.info(output)
+ cmd = part.group(3).split(" ", 1)
+ found = False
+ x = None
+ if len(cmd) > 0:
+ for key in self.commands.keys():
+ if key == cmd[0].splitlines()[0]:
+ found = True
+ try:
+ x = self.commands[key](cmd[1].split(" "), user=part.group(1))
+ except IndexError:
+ x = self.commands[key](user=part.group(1))
+ break
+
+ if not found:
+ self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!')
+ if x:
+ if type(x) == tuple and x[1] != "":
+ self.writeToServer(x[1])
+ del x
+ if re.match(self.config["RegExps"]["LobbyStart"], output):
+ self.state = "Lädt"
+
+ elif re.match(self.config["RegExps"]["Start"], output):
+ self.state = "Läuft"
+
+ if self.irc and self.ingamechat == "aktiviert":
+ if output.find("<" + self.irc.nick + ">") == -1:
+ if self.isMessage(output) and output.find("[IRC]") == -1 and output.find(self.config["General"]["Prefix"]) == -1:
+ self.irc.reply("[Clonk]{}".format(output), to=self.config["Channels"]["Ingame"])
+
+ elif any((re.match(self.config["RegExps"]["PlayerJoin"], output), re.match(self.config["RegExps"]["PlayerLeave"], output))):
+ self.irc.reply(output, to=self.config["Channels"]["Ingame"])
+
+
+ except KeyboardInterrupt:
+ if self.clonk.stdin:
+ self.clonk.stdin.close()
+
+ except Exception as e:
+ self.log.exception(e.args[0])
+ continue
+
+ return (CmdResult.Success, "")
+
+ def doPrivmsg(self, msg):
+ if not (self.irc and self.ingamechat == "aktiviert"):
+ return
+ for channel in msg.args[0].split(","):
+ if channel == self.config["Channels"]["Ingame"] and msg.nick != self.irc.nick:
+ self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
+
+ #
+ # Commands
+ #
+
+ def host(self, scenario=None, user=None) -> str:
+ if not scenario:
+ return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
+ if hasattr(scenario, "decode"):
+ try:
+ scenario = scenario.decode(self.config["Clonk"]["Encoding"], "replace")
+ except:
+ self.log.warning("Unable to decode {}".format(scenario))
+ return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.")
+
+ scenario = scenario.splitlines()[0]
+ if scenario == "random":
+ scenario = random.choice(self.scenlist).splitlines()[0]
+
+ elif scenario not in self.scenlist:
+ return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
+
+ if self.thread_started == False:
+ self.scenario = scenario
+ self.thread_started = True
+ start_new_thread(self.startClonk,())
+ return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
+
+ if not self.queue.full():
+ self.queue.put(scenario)
+ return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
+ else:
+ return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
+
+ def start(self, time=None, user=""):
+ """Startet das Spiel."""
+ self.stopped = False
+ if time:
+ try:
+ time = int(time[0])
+ except:
+ time = 5
+ self.writeToServer("/start {}".format(time))
+ else:
+ self.writeToServer("/start")
+
+ return (CmdResult.Success, "")
+
+
+ def stop(self, prm=None, user=""):
+ """Stoppt den Countdown."""
+ def stopping(self):
+ while self.clonk and self.stopped:
+ self.writeToServer("/start 60000")
+ if self.stopped == False:
+ return
+ sleep(100)
+
+ if self.stopped == False:
+ self.stopped = True
+ start_new_thread(stopping, (self,))
+
+ return (CmdResult.Success, "")
+
+
+ def help(self, prm=None, user=""):
+ """Gibt die Hilfe aus."""
+ self.writeToServer("Verfügbare Befehle:")
+ for text, function in self.commands.items():
+ self.writeToServer("{} -- {}".format(text, function.__doc__))
+
+ return (CmdResult.Success, "")
+
+
+ def displayQueue(self, prm=None, user=""):
+ """Gibt die Warteschlange aus."""
+ self.writeToServer("Warteschlange:")
+
+ for i,scen in enumerate(self.queue.queue):
+ self.writeToServer("{}. {}".format(i+1, scen))
+
+ return (CmdResult.Success, "")
+
+
+ def list(self, prm=None, user=""):
+ """Zeigt die Szenarioliste an."""
+ self.writeToServer("Szenarienlist:\n-------------")
+ for scen in self.scenlist:
+ self.writeToServer(scen + ("(Liga)" if scen in self.league_scenlist else ""))
+
+ return (CmdResult.Success, "")
+
+ def ircCommands(self, prm=None, user=""):
+ """Enthält Befehle zur Steuerung der IRC-Funktionen."""
+ if not prm:
+ return (CmdResult.SyntaxFail, "")
+
+ if prm[0] == "ingamechat":
+ if prm[1] == "off":
+ self.ingamechat = "deaktiviert"
+ elif prm[1] == "on":
+ self.ingamechat = "aktiviert"
+
+ return (CmdResult.Success, "")
+
+ #
+ # Methods
+ #
+
+ def isMessage(self, msg):
+ return re.match(self.config["RegExps"]["Message"].format(prefix=self.config["General"]["Prefix"]), msg)
+
+ def getMessageNick(self, msg):
+ m = self.isMessage(msg)
+ if m:
+ return m.group(1)
+
+ def addCommand(self, function, text):
+ self.commands[text.split(" ")[0]] = function
+ return self
+
+ def addScenario(self, link):
+ name = ""
+ for item in link.split("/"):
+ if re.match(r"(.*)\.[oc][c4]s",item):
+ name = item
+ break
+
+ site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid!
+ with open(os.path.join(self.path, name),"wb") as fobj:
+ fobj.write(site)
+
+ try:
+ self.scenlist.index(name)
+ except Exception:
+ self.scenlist.append(name)
+ return self
+
+ def writeToServer(self, text=None) -> tuple:
+ if text == None and self.clonk == None:
+ return (CmdResult.RuntimeError, "")
+ elif type(text) != str:
+ try:
+ text = text.decode(self.config["Clonk"]["Encoding"])
+ except:
+ raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__))
+
+ if self.clonk and self.clonk.stdin:
+ self.clonk.stdin.write(text + "\n")
+ self.clonk.stdin.flush()
+ return (CmdResult.Success, "")
+
+ def setTopic(self, text=None) -> None:
+ if not self.irc:
+ return
+
+ if type(text) != str:
+ try:
+ text = text.decode(self.config["Clonk"]["Encoding"])
+ except:
+ raise TypeError("bytes or str expected, got {}".format(type(text).__name__))
+
+ if self.topic != text:
+ self.topic = text
+ channel = self.config["Channels"]["Ingame"]
+ if not channel.startswith("#"):
+ raise ValueError("Invalid channel: {} (ident: {})".format(channel, channel == "#clonk-SGGP-ingame"))
+ self.irc.sendMsg(ircmsgs.topic(self.config["Channels"]["Ingame"], text))
+
+ def shutdown(self):
+ if self.shutdowned:
+ return
+
+ self.log.info("Shutting down...")
+
+ del self.updater
+
+ if self.clonk and self.clonk.stdin and not self.clonk.stdin.closed:
+ self.clonk.stdin.close()
+
+ with open(self.config_path, "w") as fobj:
+ self.config.write(fobj)
+ self.log.debug("Config file saved.")
+
+ with open(os.path.join(self.path, "scenarios.lst"), "w") as fobj:
+ fobj.writelines(self.scenlist)
+
+ with open(os.path.join(self.path, "scenarios_league.lst"), "w") as fobj:
+ fobj.writelines(self.league_scenlist)
+
+ self.log.debug("Scenario lists saved.")
+ self.setTopic("Kein laufendes Spiel.")
+ logging.shutdown()
+ self.shutdowned = True
diff --git a/pycrctrl/.gitignore b/pycrctrl/.gitignore
deleted file mode 100644
index 86592bd..0000000
--- a/pycrctrl/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*.cpython-36.pyc
-__pycache__ \ No newline at end of file
diff --git a/pycrctrl/pycrctrl.py b/pycrctrl/pycrctrl.py
deleted file mode 100644
index 9d6866e..0000000
--- a/pycrctrl/pycrctrl.py
+++ /dev/null
@@ -1,618 +0,0 @@
-# -*- coding:utf-8 -*-
-
-#Copyright (c) 2017, George Tokmaji
-
-#Permission to use, copy, modify, and/or distribute this software for any
-#purpose with or without fee is hereby granted, provided that the above
-#copyright notice and this permission notice appear in all copies.
-#
-#THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-#WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-#MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-#ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-#WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-#ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-#OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-import sys
-
-import subprocess
-import urllib.request
-
-from _thread import start_new_thread
-import queue
-import os
-import re
-import json
-import base64
-import supybot.ircmsgs as ircmsgs
-from pickle import Pickler, Unpickler
-from time import sleep
-from platform import architecture
-from io import BytesIO
-#iostream import
-
-try:
- from iostream import *
-except ModuleNotFoundError:
- with open("iostream.py","wb") as fobj:
- fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read())
-
- from iostream import *
-
-import random
-import tarfile
-from gzip import GzipFile
-from time import sleep
-from enum import IntEnum
-
-#
-# Helpers
-#
-
-class list(list):
- """ Extended list """
-
- def __lshift__(self, other):
- self.append(other)
- return self
-
- def __rshift__(self, other):
- if isinstance(other, list):
- item = self.__getitem__(len(self) - 1)
- try:
- other.append(item)
- self.pop()
- except Exception:
- raise TypeError("Cannot pass item to {}!".format(other)) from None
-
- # Methods
-
- def isEmpty(self):
- return len(self) == 0
-
-class CmdResult(IntEnum):
- UnknownCommand = -1
- Success = 0
- SyntaxFail = 1
- RightsFail = 2
- RuntimeError = 3
-
-class Updater(object):
- parent = None
- __current_revision = ""
- lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
-
- def __init__(self, parent):
- self.parent = parent
- with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj:
- self.__current_revision = fobj.read().decode("utf-8")
- start_new_thread(self.checkForUpdates, ())
-
- @property
- def current_revision(self):
- return self.__current_revision
-
- @current_revision.setter
- def current_revision(self, other):
- self.__current_revision = other
- if type(other) == str:
- try:
- other = other.encode("utf-8")
- except Exception:
- raise TypeError("Wrong datatype!") from None
-
- with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
- fobj.write(other)
-
- def checkForUpdates(self):
- while True:
- try:
- site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/")
- site.remove(site[0])
- site = [i.split("' title")[0] for i in site]
-
- x = None
- for i in site:
- x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i)
- if x:
- rev = x.group(2)
-
- if self.current_revision != rev:
- self.current_revision = rev
- self.loadNewSnapshot(x)
-
- break
- if not x:
- cout << "Regex didn't match" << endl
-
- except Exception as e:
- cerr << str(e) << endl
- finally:
- sleep(10)
-
- def loadNewSnapshot(self, reg):
- cout << "Downloading snapshot..." << endl
- with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj:
- fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read())
-
- #extract the snapshot
- tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2")
- tar.extractall(path=self.parent.path)
- cout << "New snapshot has been extracted." << endl
-
- #get the openclonk-server autobuild
- site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8"))
-
- for commit in site:
- for build in commit["builds"]:
- if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]):
- for b in build["components"]:
- reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server
- #if reg:
- # cout << str(reg) << " "
- # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: "
- # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl
- # cout << endl
- if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]):
- cout << "Downloading openclonk-server build..." << endl
- buffer = BytesIO()
- buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read())
- buffer.seek(0)
- with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj:
- fobj.write(GzipFile(fileobj=buffer).read())
-
- cout << "New openclonk-server build has been extracted." << endl
- os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64)
- return True
-
-
-
-class PyCRCtrl(object):
- """Server control"""
-
- clonk = None
- thread_started = False
- stopped = False
- config = {}
-
- scenario = ""
- commands = {}
-
- path = str()
- scenlist = list()
- league_scenlist = list()
-
- topic = "Kein laufendes Spiel."
- league = True
-
- __state = "Lobby"
- __ingamechat = "aktiviert"
-
- updater = None
-
- def __init__(self, irc=None, path=None, config="pycrctrl.conf"):
- if sys.platform == "win32":
- raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform))
-
- self.irc = irc
- self.path = path
- self.loadConfigFile(config)
- self.loadScenarioList()
-
- self.queue = queue.Queue(5)
- if self.config["Clonk"]["Updater"]["enabled"]:
- self.updater = Updater(self)
-
- def __ostream__(self, ostream):
- return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.config["Clonk"]["IRC"]["Channels"]["ingame"], (self.scenario if self.scenario != "" else "None"))
-
- @property
- def state(self):
- return self.__state
-
- @state.setter
- def state(self, text):
- if text in ["Lobby", "Lädt", "Läuft"]:
- self.__state = text
- topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat)
- self.setTopic(topic)
-
- @property
- def ingamechat(self):
- return self.__ingamechat
-
- @ingamechat.setter
- def ingamechat(self, text):
- if text in ["aktiviert", "deaktiviert"]:
- self.__ingamechat = text
- self.state = self.state
-
- def loadScenarioList(self) -> bool:
- if self.path == None:
- return False
-
- with open(os.path.join(self.path,"scenlist"), "rb") as fobj:
- self.scenlist = Unpickler(fobj).load()
-
- with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj:
- self.league_scenlist = Unpickler(fobj).load()
-
- return True
-
-
- def loadConfigFile(self, config) -> bool:
- if self.path == None:
- return False
-
- conf = os.path.join(self.path, config)
-
- if os.path.isdir(conf):
- return False
-
- elif os.path.isfile(conf):
- self.config = self.setupConfig(json.load(open(conf, "r")))
- return True
-
- elif not os.path.exists(conf):
- c = {
- "General" : {
- "useLogfile" : True
- },
-
- "Clonk" : {
- "engine" : "clonk",
- "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"],
- "commandlinePrefix" : "/",
- "prefix" : "@",
-
- "RegExps" : {
- "lobbyStartExp" : "",
- "startExp" : "",
- "joinExp" : "",
- "leaveExp" : "",
- "shutdownExp" : ""
- },
-
- "autohost" : False,
- "IRC" : {
- "ingamechat" : True,
- "Channels" : {
- "parent" : "",
- "ingame" : ""
- }
- },
-
- "Rights" : {
- "admin" : True,
- "moderator" : True
- },
-
- "Updater" : {
- "enabled" : False,
- "RegExps" : {
- "autobuild" : "",
- "snapshot" : ""
- },
- "Addresses" : {
- "snapshotList" : "",
- "snapshotDownload" : "",
- "autobuildList" : "",
- "autobuildAddress" : ""
- }
- }
- }
- }
-
- json.dump(c, open(conf, "w"), indent=4)
-
- self.config = self.setupConfig(c)
- return True
-
- def setupConfig(self, config):
- #commandline
- sep = ":" if config["Clonk"]["commandlinePrefix"] == "/" else "="
-
- res = ""
- for entry in config["Clonk"]["commandline"]:
- if isinstance(entry, dict):
- for i in entry.items():
- res += "{}{}{}{} ".format(config["Clonk"]["commandlinePrefix"], (i[0]), sep, str(i[1]))
- else:
- res += "{}{} ".format(config["Clonk"]["commandlinePrefix"], entry)
-
- config["Clonk"]["commandline"] = res
-
- #regexps
-
- config["Clonk"]["RegExps"] = {
- i:re.compile(self.decodeRegExp(config["Clonk"]["RegExps"][i], config["Clonk"]["encoding"])) for i in config["Clonk"]["RegExps"]
- }
-
- self.ingamechat = config["Clonk"]["IRC"]["ingamechat"]
- return config
-
- def decodeRegExp(self, regexp, encoding=None):
- return base64.b64decode(regexp).decode(encoding or self.config["Clonk"]["encoding"])
-
- def host(self, scenario=None, user=None) -> str:
- if scenario == None:
- return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
- if hasattr(scenario, "decode"):
- try:
- scenario = scenario.decode(self.config["Clonk"]["encoding"], "replace")
- except:
- return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.")
-
- scenario = scenario.splitlines()[0]
- if scenario == "random":
- scenario = random.choice(self.scenlist)
-
- elif scenario not in self.scenlist:
- return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
-
- if self.thread_started == False:
- self.scenario = scenario
- self.thread_started = True
- start_new_thread(self.startClonk,())
- return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
-
- if self.queue.full() == False:
- self.queue.put(scenario)
- return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
- else:
- return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
-
- def startClonk(self):
- try:
- while True:
- if self.scenario == "":
- if self.queue.empty() == False:
- self.scenario = self.queue.get()
- else:
- self.scenario = random.choice(self.scenlist)
-
- self.clonk = subprocess.Popen(
- './{} {} "{}"'.format(self.config["Clonk"]["engine"], self.config["Clonk"]["commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario),
- 0,
- None,
- subprocess.PIPE,
- subprocess.PIPE,
- subprocess.STDOUT,
- shell=True,
- cwd=self.path,
- encoding=self.config["Clonk"]["encoding"]
- )
- self.state = "Lobby"
- self.readServerOutput()
- if self.config["Clonk"]["autohost"] == False:
- self.thread_started = False
- self.setTopic("Kein laufendes Spiel.")
- break
-
- finally:
- if self.clonk:
- self.clonk.stdin.close()
-
- return (CmdResult.Success, "")
-
- def readServerOutput(self):
- while True:
- try:
- output = self.clonk.stdout.readline()
-
- if self.config["Clonk"]["RegExps"]["shutdownExp"].match(output):
- self.clonk.stdin.close()
- elif output == "" and self.clonk.poll() is not None:
- if self.clonk:
- self.clonk.stdin.close()
- self.clonk = None
- self.scenario = ""
- return
-
- elif output:
- output = output.splitlines()[0]
- #output = output.decode("utf-8").splitlines()[0]
- output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):]
-
- if output[0] == ">":
- output = output[1:]
-
- part = re.match("<(.*)> ({})(.*)".format(self.config["Clonk"]["prefix"]), output)
- if part and part.group(0) != self.irc.nick:
- cmd = part.group(3).split(" ", 1)
- found = False
- x = None
- if len(cmd) > 0:
- for key in self.commands.keys():
- if key == cmd[0].splitlines()[0]:
- found = True
- try:
- x = self.commands[key](cmd[1].split(" "), user=part.group(1))
- except IndexError:
- x = self.commands[key](user=part.group(1))
- break
-
- if not found:
- self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!')
- if x:
- if type(x) == tuple and x[1] != "":
- self.writeToServer(x[1])
- del x
- if self.config["Clonk"]["RegExps"]["lobbyStartExp"].match(output):
- self.state = "Lädt"
-
- elif self.config["Clonk"]["RegExps"]["startExp"].match(output):
- self.state = "Läuft"
- try:
- cout << output << endl
- except (UnicodeDecodeError, UnicodeEncodeError):
- pass
-
- if self.irc and self.ingamechat == "aktiviert":
- if output.find("<" + self.irc.nick + ">") == -1:
- if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config["Clonk"]["prefix"]) == -1:
- self.irc.reply("[Clonk]{}".format(output), to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
-
- elif self.config["Clonk"]["RegExps"]["joinExp"].match(output) or self.config["Clonk"]["RegExps"]["leaveExp"].match(output):
- self.irc.reply(output, to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
-
-
- except KeyboardInterrupt:
- if self.clonk:
- self.clonk.stdin.close()
-
- except Exception as e:
- cerr << str(e) << endl
- continue
-
- return (CmdResult.Success, "")
-
- def doPrivmsg(self, msg):
- if not (self.irc and self.ingamechat == "aktiviert"):
- return
- for channel in msg.args[0].split(","):
- if channel == self.config["Clonk"]["IRC"]["Channels"]["ingame"] and msg.nick != self.irc.nick:
- self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
-
-
- def writeToServer(self, text=None):
- if text == None and self.clonk == None:
- return (CmdResult.RuntimeError, "")
- elif type(text) != str:
- try:
- text = text.decode(self.config["Clonk"]["encoding"])
- except:
- raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__))
-
- if self.clonk and self.clonk.stdin:
- self.clonk.stdin.write(text + "\n")
- self.clonk.stdin.flush()
- return (CmdResult.Success, "")
-
- def setTopic(self, text=None):
- if not self.irc:
- return False
-
- if type(text) != str:
- try:
- text = text.decode(self.config["Clonk"]["encoding"])
- except:
- raise TypeError("text must be a string!")
-
- if self.topic == text:
- return False
-
- else:
- self.topic = text
- return bool(self.irc.sendMsg(ircmsgs.topic(self.config["Clonk"]["IRC"]["Channels"]["ingame"], text)))
-
-
- def start(self, time=None, user=""):
- """Startet das Spiel."""
- self.stopped = False
- if time:
- try:
- time = int(time[0])
- except:
- time = 5
- self.writeToServer("/start {}".format(time))
- else:
- self.writeToServer("/start")
-
- return (CmdResult.Success, "")
-
-
- def stop(self, prm=None, user=""):
- """Stoppt den Countdown."""
- def stopping(self):
- while self.clonk and self.stopped:
- self.writeToServer("/start 60000")
- if self.stopped == False:
- return
- sleep(100)
-
- if self.stopped == False:
- self.stopped = True
- start_new_thread(stopping, (self,))
-
- return (CmdResult.Success, "")
-
-
- def help(self, prm=None, user=""):
- """Gibt die Hilfe aus."""
- self.writeToServer("Verfügbare Befehle:")
- for text, function in self.commands.items():
- self.writeToServer("{} -- {}".format(text, function.__doc__))
-
- return (CmdResult.Success, "")
-
-
- def displayQueue(self, prm=None, user=""):
- """Gibt die Warteschlange aus."""
- self.writeToServer("Warteschlange:")
-
- for i,scen in enumerate(self.queue.queue):
- self.writeToServer("{}. {}".format(i+1, scen))
-
- return (CmdResult.Success, "")
-
-
- def list(self, prm=None, user=""):
- """Zeigt die Szenarioliste an."""
- self.writeToServer("List:\n-------------")
- for scen in self.scenlist:
- self.writeToServer(scen)
-
- return (CmdResult.Success, "")
-
- def ircCommands(self, prm=None, user=""):
- """Enthält Befehle zur Steuerung der IRC-Funktionen."""
- if not prm:
- return (CmdResult.SyntaxFail, "")
-
- if prm[0] == "ingamechat":
- if prm[1] == "off":
- self.ingamechat = "deaktiviert"
- elif prm[1] == "on":
- self.ingamechat = "aktiviert"
-
- return (CmdResult.Success, "")
-
- def IsRunning(self) -> bool:
- return self.state != "Lobby"
-
- #
- # functions
- #
-
- def addCommand(self, function, text):
- self.commands[text.split(" ")[0]] = function
- return self
-
- def addScenario(self, link):
- name = ""
- for item in link.split("/"):
- if re.match(r"(.*)\.[oc][c4]s",item):
- name = item
- break
-
- site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid!
- with open(os.path.join(self.path, name),"wb") as fobj:
- fobj.write(site)
-
- try:
- self.scenlist.index(name)
- except Exception:
- self.scenlist.append(name)
- Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist)
- return self
-
-#
-#
-#
-def PyCRCtrlInit():
- """
- Initializes a PyCRCtrl - object. Use this function if you want to control only one object.
- """
- pass