aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFulgen301 <tokmajigeorge@gmail.com>2017-07-11 13:25:22 +0200
committerFulgen301 <tokmajigeorge@gmail.com>2017-07-11 13:25:22 +0200
commite004eb61cd497cf208e926e0892f21c979966c81 (patch)
tree62979e25f46bf8a95c5aed3dd6474f14e0d52c18
parent27cf3b77c1c78bfb7d56a86e740d28f22aef527b (diff)
downloadpycrctrl-e004eb61cd497cf208e926e0892f21c979966c81.tar.gz
pycrctrl-e004eb61cd497cf208e926e0892f21c979966c81.zip
Replace spaces with tabs
-rw-r--r--pycrctrl/pycrctrl.py1118
1 files changed, 559 insertions, 559 deletions
diff --git a/pycrctrl/pycrctrl.py b/pycrctrl/pycrctrl.py
index 584341f..9d6866e 100644
--- a/pycrctrl/pycrctrl.py
+++ b/pycrctrl/pycrctrl.py
@@ -33,12 +33,12 @@ from io import BytesIO
#iostream import
try:
- from iostream import *
+ from iostream import *
except ModuleNotFoundError:
- with open("iostream.py","wb") as fobj:
- fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read())
-
- from iostream import *
+ with open("iostream.py","wb") as fobj:
+ fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read())
+
+ from iostream import *
import random
import tarfile
@@ -51,568 +51,568 @@ from enum import IntEnum
#
class list(list):
- """ Extended list """
-
- def __lshift__(self, other):
- self.append(other)
- return self
+ """ Extended list """
+
+ def __lshift__(self, other):
+ self.append(other)
+ return self
- def __rshift__(self, other):
- if isinstance(other, list):
- item = self.__getitem__(len(self) - 1)
- try:
- other.append(item)
- self.pop()
- except Exception:
- raise TypeError("Cannot pass item to {}!".format(other)) from None
-
- # Methods
-
- def isEmpty(self):
- return len(self) == 0
+ def __rshift__(self, other):
+ if isinstance(other, list):
+ item = self.__getitem__(len(self) - 1)
+ try:
+ other.append(item)
+ self.pop()
+ except Exception:
+ raise TypeError("Cannot pass item to {}!".format(other)) from None
+
+ # Methods
+
+ def isEmpty(self):
+ return len(self) == 0
class CmdResult(IntEnum):
- UnknownCommand = -1
- Success = 0
- SyntaxFail = 1
- RightsFail = 2
- RuntimeError = 3
+ UnknownCommand = -1
+ Success = 0
+ SyntaxFail = 1
+ RightsFail = 2
+ RuntimeError = 3
class Updater(object):
- parent = None
- __current_revision = ""
- lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
-
- def __init__(self, parent):
- self.parent = parent
- with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj:
- self.__current_revision = fobj.read().decode("utf-8")
- start_new_thread(self.checkForUpdates, ())
-
- @property
- def current_revision(self):
- return self.__current_revision
-
- @current_revision.setter
- def current_revision(self, other):
- self.__current_revision = other
- if type(other) == str:
- try:
- other = other.encode("utf-8")
- except Exception:
- raise TypeError("Wrong datatype!") from None
-
- with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
- fobj.write(other)
-
- def checkForUpdates(self):
- while True:
- try:
- site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/")
- site.remove(site[0])
- site = [i.split("' title")[0] for i in site]
-
- x = None
- for i in site:
- x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i)
- if x:
- rev = x.group(2)
-
- if self.current_revision != rev:
- self.current_revision = rev
- self.loadNewSnapshot(x)
-
- break
- if not x:
- cout << "Regex didn't match" << endl
-
- except Exception as e:
- cerr << str(e) << endl
- finally:
- sleep(10)
-
- def loadNewSnapshot(self, reg):
- cout << "Downloading snapshot..." << endl
- with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj:
- fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read())
-
- #extract the snapshot
- tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2")
- tar.extractall(path=self.parent.path)
- cout << "New snapshot has been extracted." << endl
-
- #get the openclonk-server autobuild
- site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8"))
-
- for commit in site:
- for build in commit["builds"]:
- if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]):
- for b in build["components"]:
- reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server
- #if reg:
- # cout << str(reg) << " "
- # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: "
- # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl
- # cout << endl
- if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]):
- cout << "Downloading openclonk-server build..." << endl
- buffer = BytesIO()
- buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read())
- buffer.seek(0)
- with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj:
- fobj.write(GzipFile(fileobj=buffer).read())
-
- cout << "New openclonk-server build has been extracted." << endl
- os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64)
- return True
-
-
+ parent = None
+ __current_revision = ""
+ lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
+
+ def __init__(self, parent):
+ self.parent = parent
+ with open(os.path.join(self.parent.path, "snapshot.id"), "rb") as fobj:
+ self.__current_revision = fobj.read().decode("utf-8")
+ start_new_thread(self.checkForUpdates, ())
+
+ @property
+ def current_revision(self):
+ return self.__current_revision
+
+ @current_revision.setter
+ def current_revision(self, other):
+ self.__current_revision = other
+ if type(other) == str:
+ try:
+ other = other.encode("utf-8")
+ except Exception:
+ raise TypeError("Wrong datatype!") from None
+
+ with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
+ fobj.write(other)
+
+ def checkForUpdates(self):
+ while True:
+ try:
+ site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split("<a href='/builds/nightly/snapshots/")
+ site.remove(site[0])
+ site = [i.split("' title")[0] for i in site]
+
+ x = None
+ for i in site:
+ x = re.match(r"openclonk-snapshot-(.*)-(.*)-{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), i)
+ if x:
+ rev = x.group(2)
+
+ if self.current_revision != rev:
+ self.current_revision = rev
+ self.loadNewSnapshot(x)
+
+ break
+ if not x:
+ cout << "Regex didn't match" << endl
+
+ except Exception as e:
+ cerr << str(e) << endl
+ finally:
+ sleep(10)
+
+ def loadNewSnapshot(self, reg):
+ cout << "Downloading snapshot..." << endl
+ with open(os.path.join(self.parent.path, "snapshot"), "wb") as fobj:
+ fobj.write(urllib.request.urlopen("http://openclonk.org/builds/nightly/snapshots/{}".format(reg.group(0).split("' title")[0])).read())
+
+ #extract the snapshot
+ tar = tarfile.open(os.path.join(self.parent.path, "snapshot"), mode="r:bz2")
+ tar.extractall(path=self.parent.path)
+ cout << "New snapshot has been extracted." << endl
+
+ #get the openclonk-server autobuild
+ site = json.loads(urllib.request.urlopen("https://autobuild.openclonk.org/api/v1/jobs").read().decode("utf-8"))
+
+ for commit in site:
+ for build in commit["builds"]:
+ if re.match(r"{}-{}-.*".format(sys.platform, self.lookuptable[architecture()[0]]), build["platform"]["triplet"]):
+ for b in build["components"]:
+ reg = re.match(r".*/openclonk-server-(.*)-(.*)-(.*)-.*", str(b["path"])) #skip the engine check as the only useful one is openclonk-server
+ #if reg:
+ # cout << str(reg) << " "
+ # cout << reg.group(1) << " " << reg.group(2) << " " << reg.group(3) << " " << " :: "
+ # cout << self.current_revision << sys.platform << self.lookuptable[architecture()[0]] << endl
+ # cout << endl
+ if reg and (reg.group(1), reg.group(2), reg.group(3)) == (self.current_revision[:-3], sys.platform, self.lookuptable[architecture()[0]]):
+ cout << "Downloading openclonk-server build..." << endl
+ buffer = BytesIO()
+ buffer.write(urllib.request.urlopen("https://autobuild.openclonk.org/static/binaries/{}".format(b["path"])).read())
+ buffer.seek(0)
+ with open(os.path.join(self.parent.path, "openclonk-server"), "wb") as fobj:
+ fobj.write(GzipFile(fileobj=buffer).read())
+
+ cout << "New openclonk-server build has been extracted." << endl
+ os.chmod(os.path.join(self.parent.path, "openclonk-server"), os.stat(os.path.join(self.parent.path, "openclonk-server")).st_mode | 64)
+ return True
+
+
class PyCRCtrl(object):
- """Server control"""
-
- clonk = None
- thread_started = False
- stopped = False
- config = {}
-
- scenario = ""
- commands = {}
-
- path = str()
- scenlist = list()
- league_scenlist = list()
-
- topic = "Kein laufendes Spiel."
- league = True
-
- __state = "Lobby"
- __ingamechat = "aktiviert"
-
- updater = None
-
- def __init__(self, irc=None, path=None, config="pycrctrl.conf"):
- if sys.platform == "win32":
- raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform))
-
- self.irc = irc
- self.path = path
- self.loadConfigFile(config)
- self.loadScenarioList()
-
- self.queue = queue.Queue(5)
- if self.config["Clonk"]["Updater"]["enabled"]:
- self.updater = Updater(self)
-
- def __ostream__(self, ostream):
- return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.config["Clonk"]["IRC"]["Channels"]["ingame"], (self.scenario if self.scenario != "" else "None"))
-
- @property
- def state(self):
- return self.__state
-
- @state.setter
- def state(self, text):
- if text in ["Lobby", "Lädt", "Läuft"]:
- self.__state = text
- topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat)
- self.setTopic(topic)
-
- @property
- def ingamechat(self):
- return self.__ingamechat
-
- @ingamechat.setter
- def ingamechat(self, text):
- if text in ["aktiviert", "deaktiviert"]:
- self.__ingamechat = text
- self.state = self.state
-
- def loadScenarioList(self) -> bool:
- if self.path == None:
- return False
-
- with open(os.path.join(self.path,"scenlist"), "rb") as fobj:
- self.scenlist = Unpickler(fobj).load()
-
- with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj:
- self.league_scenlist = Unpickler(fobj).load()
-
- return True
-
-
- def loadConfigFile(self, config) -> bool:
- if self.path == None:
- return False
-
- conf = os.path.join(self.path, config)
-
- if os.path.isdir(conf):
- return False
-
- elif os.path.isfile(conf):
- self.config = self.setupConfig(json.load(open(conf, "r")))
- return True
-
- elif not os.path.exists(conf):
- c = {
- "General" : {
- "useLogfile" : True
- },
-
- "Clonk" : {
- "engine" : "clonk",
- "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"],
- "commandlinePrefix" : "/",
- "prefix" : "@",
-
- "RegExps" : {
- "lobbyStartExp" : "",
- "startExp" : "",
- "joinExp" : "",
- "leaveExp" : "",
- "shutdownExp" : ""
- },
-
- "autohost" : False,
- "IRC" : {
- "ingamechat" : True,
- "Channels" : {
- "parent" : "",
- "ingame" : ""
- }
- },
-
- "Rights" : {
- "admin" : True,
- "moderator" : True
- },
-
- "Updater" : {
- "enabled" : False,
- "RegExps" : {
- "autobuild" : "",
- "snapshot" : ""
- },
- "Addresses" : {
- "snapshotList" : "",
- "snapshotDownload" : "",
- "autobuildList" : "",
- "autobuildAddress" : ""
- }
- }
- }
- }
-
- json.dump(c, open(conf, "w"), indent=4)
-
- self.config = self.setupConfig(c)
- return True
-
- def setupConfig(self, config):
- #commandline
- sep = ":" if config["Clonk"]["commandlinePrefix"] == "/" else "="
-
- res = ""
- for entry in config["Clonk"]["commandline"]:
- if isinstance(entry, dict):
- for i in entry.items():
- res += "{}{}{}{} ".format(config["Clonk"]["commandlinePrefix"], (i[0]), sep, str(i[1]))
- else:
- res += "{}{} ".format(config["Clonk"]["commandlinePrefix"], entry)
-
- config["Clonk"]["commandline"] = res
-
- #regexps
-
- config["Clonk"]["RegExps"] = {
- i:re.compile(self.decodeRegExp(config["Clonk"]["RegExps"][i], config["Clonk"]["encoding"])) for i in config["Clonk"]["RegExps"]
- }
-
- self.ingamechat = config["Clonk"]["IRC"]["ingamechat"]
- return config
-
- def decodeRegExp(self, regexp, encoding=None):
- return base64.b64decode(regexp).decode(encoding or self.config["Clonk"]["encoding"])
-
- def host(self, scenario=None, user=None) -> str:
- if scenario == None:
- return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
- if hasattr(scenario, "decode"):
- try:
- scenario = scenario.decode(self.config["Clonk"]["encoding"], "replace")
- except:
- return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.")
-
- scenario = scenario.splitlines()[0]
- if scenario == "random":
- scenario = random.choice(self.scenlist)
-
- elif scenario not in self.scenlist:
- return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
-
- if self.thread_started == False:
- self.scenario = scenario
- self.thread_started = True
- start_new_thread(self.startClonk,())
- return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
-
- if self.queue.full() == False:
- self.queue.put(scenario)
- return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
- else:
- return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
-
- def startClonk(self):
- try:
- while True:
- if self.scenario == "":
- if self.queue.empty() == False:
- self.scenario = self.queue.get()
- else:
- self.scenario = random.choice(self.scenlist)
-
- self.clonk = subprocess.Popen(
- './{} {} "{}"'.format(self.config["Clonk"]["engine"], self.config["Clonk"]["commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario),
- 0,
- None,
- subprocess.PIPE,
- subprocess.PIPE,
- subprocess.STDOUT,
- shell=True,
- cwd=self.path,
- encoding=self.config["Clonk"]["encoding"]
- )
- self.state = "Lobby"
- self.readServerOutput()
- if self.config["Clonk"]["autohost"] == False:
- self.thread_started = False
- self.setTopic("Kein laufendes Spiel.")
- break
-
- finally:
- if self.clonk:
- self.clonk.stdin.close()
-
- return (CmdResult.Success, "")
-
- def readServerOutput(self):
- while True:
- try:
- output = self.clonk.stdout.readline()
-
- if self.config["Clonk"]["RegExps"]["shutdownExp"].match(output):
- self.clonk.stdin.close()
- elif output == "" and self.clonk.poll() is not None:
- if self.clonk:
- self.clonk.stdin.close()
- self.clonk = None
- self.scenario = ""
- return
-
- elif output:
- output = output.splitlines()[0]
- #output = output.decode("utf-8").splitlines()[0]
- output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):]
-
- if output[0] == ">":
- output = output[1:]
-
- part = re.match("<(.*)> ({})(.*)".format(self.config["Clonk"]["prefix"]), output)
- if part and part.group(0) != self.irc.nick:
- cmd = part.group(3).split(" ", 1)
- found = False
- x = None
- if len(cmd) > 0:
- for key in self.commands.keys():
- if key == cmd[0].splitlines()[0]:
- found = True
- try:
- x = self.commands[key](cmd[1].split(" "), user=part.group(1))
- except IndexError:
- x = self.commands[key](user=part.group(1))
- break
-
- if not found:
- self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!')
- if x:
- if type(x) == tuple and x[1] != "":
- self.writeToServer(x[1])
- del x
- if self.config["Clonk"]["RegExps"]["lobbyStartExp"].match(output):
- self.state = "Lädt"
-
- elif self.config["Clonk"]["RegExps"]["startExp"].match(output):
- self.state = "Läuft"
- try:
- cout << output << endl
- except (UnicodeDecodeError, UnicodeEncodeError):
- pass
-
- if self.irc and self.ingamechat == "aktiviert":
- if output.find("<" + self.irc.nick + ">") == -1:
- if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config["Clonk"]["prefix"]) == -1:
- self.irc.reply("[Clonk]{}".format(output), to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
-
- elif self.config["Clonk"]["RegExps"]["joinExp"].match(output) or self.config["Clonk"]["RegExps"]["leaveExp"].match(output):
- self.irc.reply(output, to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
-
-
- except KeyboardInterrupt:
- if self.clonk:
- self.clonk.stdin.close()
-
- except Exception as e:
- cerr << str(e) << endl
- continue
-
- return (CmdResult.Success, "")
-
- def doPrivmsg(self, msg):
- if not (self.irc and self.ingamechat == "aktiviert"):
- return
- for channel in msg.args[0].split(","):
- if channel == self.config["Clonk"]["IRC"]["Channels"]["ingame"] and msg.nick != self.irc.nick:
- self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
-
-
- def writeToServer(self, text=None):
- if text == None and self.clonk == None:
- return (CmdResult.RuntimeError, "")
- elif type(text) != str:
- try:
- text = text.decode(self.config["Clonk"]["encoding"])
- except:
- raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__))
-
- if self.clonk and self.clonk.stdin:
- self.clonk.stdin.write(text + "\n")
- self.clonk.stdin.flush()
- return (CmdResult.Success, "")
-
- def setTopic(self, text=None):
- if not self.irc:
- return False
-
- if type(text) != str:
- try:
- text = text.decode(self.config["Clonk"]["encoding"])
- except:
- raise TypeError("text must be a string!")
-
- if self.topic == text:
- return False
-
- else:
- self.topic = text
- return bool(self.irc.sendMsg(ircmsgs.topic(self.config["Clonk"]["IRC"]["Channels"]["ingame"], text)))
-
-
- def start(self, time=None, user=""):
- """Startet das Spiel."""
- self.stopped = False
- if time:
- try:
- time = int(time[0])
- except:
- time = 5
- self.writeToServer("/start {}".format(time))
- else:
- self.writeToServer("/start")
-
- return (CmdResult.Success, "")
-
-
- def stop(self, prm=None, user=""):
- """Stoppt den Countdown."""
- def stopping(self):
- while self.clonk and self.stopped:
- self.writeToServer("/start 60000")
- if self.stopped == False:
- return
- sleep(100)
-
- if self.stopped == False:
- self.stopped = True
- start_new_thread(stopping, (self,))
-
- return (CmdResult.Success, "")
-
-
- def help(self, prm=None, user=""):
- """Gibt die Hilfe aus."""
- self.writeToServer("Verfügbare Befehle:")
- for text, function in self.commands.items():
- self.writeToServer("{} -- {}".format(text, function.__doc__))
-
- return (CmdResult.Success, "")
-
-
- def displayQueue(self, prm=None, user=""):
- """Gibt die Warteschlange aus."""
- self.writeToServer("Warteschlange:")
-
- for i,scen in enumerate(self.queue.queue):
- self.writeToServer("{}. {}".format(i+1, scen))
-
- return (CmdResult.Success, "")
-
-
- def list(self, prm=None, user=""):
- """Zeigt die Szenarioliste an."""
- self.writeToServer("List:\n-------------")
- for scen in self.scenlist:
- self.writeToServer(scen)
-
- return (CmdResult.Success, "")
-
- def ircCommands(self, prm=None, user=""):
- """Enthält Befehle zur Steuerung der IRC-Funktionen."""
- if not prm:
- return (CmdResult.SyntaxFail, "")
-
- if prm[0] == "ingamechat":
- if prm[1] == "off":
- self.ingamechat = "deaktiviert"
- elif prm[1] == "on":
- self.ingamechat = "aktiviert"
-
- return (CmdResult.Success, "")
-
- def IsRunning(self) -> bool:
- return self.state != "Lobby"
-
- #
- # functions
- #
-
- def addCommand(self, function, text):
- self.commands[text.split(" ")[0]] = function
- return self
-
- def addScenario(self, link):
- name = ""
- for item in link.split("/"):
- if re.match(r"(.*)\.[oc][c4]s",item):
- name = item
- break
-
- site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid!
- with open(os.path.join(self.path, name),"wb") as fobj:
- fobj.write(site)
-
- try:
- self.scenlist.index(name)
- except Exception:
- self.scenlist.append(name)
- Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist)
- return self
-
+ """Server control"""
+
+ clonk = None
+ thread_started = False
+ stopped = False
+ config = {}
+
+ scenario = ""
+ commands = {}
+
+ path = str()
+ scenlist = list()
+ league_scenlist = list()
+
+ topic = "Kein laufendes Spiel."
+ league = True
+
+ __state = "Lobby"
+ __ingamechat = "aktiviert"
+
+ updater = None
+
+ def __init__(self, irc=None, path=None, config="pycrctrl.conf"):
+ if sys.platform == "win32":
+ raise NotImplementedError("{} wird nicht unterstützt!".format(sys.platform))
+
+ self.irc = irc
+ self.path = path
+ self.loadConfigFile(config)
+ self.loadScenarioList()
+
+ self.queue = queue.Queue(5)
+ if self.config["Clonk"]["Updater"]["enabled"]:
+ self.updater = Updater(self)
+
+ def __ostream__(self, ostream):
+ return "PyCRCtrl: commandline: {}, channel: {}, scenario: {}".format(self.commandline, self.config["Clonk"]["IRC"]["Channels"]["ingame"], (self.scenario if self.scenario != "" else "None"))
+
+ @property
+ def state(self):
+ return self.__state
+
+ @state.setter
+ def state(self, text):
+ if text in ["Lobby", "Lädt", "Läuft"]:
+ self.__state = text
+ topic = "Aktuelles Szenario: {} | {}{} | Ingamechat ist {}.".format(self.scenario, self.state, (" | Liga" if self.scenario in self.league_scenlist else ""), self.ingamechat)
+ self.setTopic(topic)
+
+ @property
+ def ingamechat(self):
+ return self.__ingamechat
+
+ @ingamechat.setter
+ def ingamechat(self, text):
+ if text in ["aktiviert", "deaktiviert"]:
+ self.__ingamechat = text
+ self.state = self.state
+
+ def loadScenarioList(self) -> bool:
+ if self.path == None:
+ return False
+
+ with open(os.path.join(self.path,"scenlist"), "rb") as fobj:
+ self.scenlist = Unpickler(fobj).load()
+
+ with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj:
+ self.league_scenlist = Unpickler(fobj).load()
+
+ return True
+
+
+ def loadConfigFile(self, config) -> bool:
+ if self.path == None:
+ return False
+
+ conf = os.path.join(self.path, config)
+
+ if os.path.isdir(conf):
+ return False
+
+ elif os.path.isfile(conf):
+ self.config = self.setupConfig(json.load(open(conf, "r")))
+ return True
+
+ elif not os.path.exists(conf):
+ c = {
+ "General" : {
+ "useLogfile" : True
+ },
+
+ "Clonk" : {
+ "engine" : "clonk",
+ "commandline" : ["fullscreen", ["lobby",300], ["config","config.txt"], "record", "faircrew"],
+ "commandlinePrefix" : "/",
+ "prefix" : "@",
+
+ "RegExps" : {
+ "lobbyStartExp" : "",
+ "startExp" : "",
+ "joinExp" : "",
+ "leaveExp" : "",
+ "shutdownExp" : ""
+ },
+
+ "autohost" : False,
+ "IRC" : {
+ "ingamechat" : True,
+ "Channels" : {
+ "parent" : "",
+ "ingame" : ""
+ }
+ },
+
+ "Rights" : {
+ "admin" : True,
+ "moderator" : True
+ },
+
+ "Updater" : {
+ "enabled" : False,
+ "RegExps" : {
+ "autobuild" : "",
+ "snapshot" : ""
+ },
+ "Addresses" : {
+ "snapshotList" : "",
+ "snapshotDownload" : "",
+ "autobuildList" : "",
+ "autobuildAddress" : ""
+ }
+ }
+ }
+ }
+
+ json.dump(c, open(conf, "w"), indent=4)
+
+ self.config = self.setupConfig(c)
+ return True
+
+ def setupConfig(self, config):
+ #commandline
+ sep = ":" if config["Clonk"]["commandlinePrefix"] == "/" else "="
+
+ res = ""
+ for entry in config["Clonk"]["commandline"]:
+ if isinstance(entry, dict):
+ for i in entry.items():
+ res += "{}{}{}{} ".format(config["Clonk"]["commandlinePrefix"], (i[0]), sep, str(i[1]))
+ else:
+ res += "{}{} ".format(config["Clonk"]["commandlinePrefix"], entry)
+
+ config["Clonk"]["commandline"] = res
+
+ #regexps
+
+ config["Clonk"]["RegExps"] = {
+ i:re.compile(self.decodeRegExp(config["Clonk"]["RegExps"][i], config["Clonk"]["encoding"])) for i in config["Clonk"]["RegExps"]
+ }
+
+ self.ingamechat = config["Clonk"]["IRC"]["ingamechat"]
+ return config
+
+ def decodeRegExp(self, regexp, encoding=None):
+ return base64.b64decode(regexp).decode(encoding or self.config["Clonk"]["encoding"])
+
+ def host(self, scenario=None, user=None) -> str:
+ if scenario == None:
+ return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
+ if hasattr(scenario, "decode"):
+ try:
+ scenario = scenario.decode(self.config["Clonk"]["encoding"], "replace")
+ except:
+ return (CmdResult.RuntimeError, "Dekodierfehler. Bitte kontaktiere den Hoster dieses Servers.")
+
+ scenario = scenario.splitlines()[0]
+ if scenario == "random":
+ scenario = random.choice(self.scenlist)
+
+ elif scenario not in self.scenlist:
+ return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
+
+ if self.thread_started == False:
+ self.scenario = scenario
+ self.thread_started = True
+ start_new_thread(self.startClonk,())
+ return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
+
+ if self.queue.full() == False:
+ self.queue.put(scenario)
+ return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
+ else:
+ return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
+
+ def startClonk(self):
+ try:
+ while True:
+ if self.scenario == "":
+ if self.queue.empty() == False:
+ self.scenario = self.queue.get()
+ else:
+ self.scenario = random.choice(self.scenlist)
+
+ self.clonk = subprocess.Popen(
+ './{} {} "{}"'.format(self.config["Clonk"]["engine"], self.config["Clonk"]["commandline"] + " " + self.config["Clonk"]["commandlinePrefix"] + ("league" if self.scenario in self.league_scenlist else "noleague"), self.scenario),
+ 0,
+ None,
+ subprocess.PIPE,
+ subprocess.PIPE,
+ subprocess.STDOUT,
+ shell=True,
+ cwd=self.path,
+ encoding=self.config["Clonk"]["encoding"]
+ )
+ self.state = "Lobby"
+ self.readServerOutput()
+ if self.config["Clonk"]["autohost"] == False:
+ self.thread_started = False
+ self.setTopic("Kein laufendes Spiel.")
+ break
+
+ finally:
+ if self.clonk:
+ self.clonk.stdin.close()
+
+ return (CmdResult.Success, "")
+
+ def readServerOutput(self):
+ while True:
+ try:
+ output = self.clonk.stdout.readline()
+
+ if self.config["Clonk"]["RegExps"]["shutdownExp"].match(output):
+ self.clonk.stdin.close()
+ elif output == "" and self.clonk.poll() is not None:
+ if self.clonk:
+ self.clonk.stdin.close()
+ self.clonk = None
+ self.scenario = ""
+ return
+
+ elif output:
+ output = output.splitlines()[0]
+ #output = output.decode("utf-8").splitlines()[0]
+ output = output[(output.find("] ") if output.find("] ") != -1 else -2)+len("] "):]
+
+ if output[0] == ">":
+ output = output[1:]
+
+ part = re.match("<(.*)> ({})(.*)".format(self.config["Clonk"]["prefix"]), output)
+ if part and part.group(0) != self.irc.nick:
+ cmd = part.group(3).split(" ", 1)
+ found = False
+ x = None
+ if len(cmd) > 0:
+ for key in self.commands.keys():
+ if key == cmd[0].splitlines()[0]:
+ found = True
+ try:
+ x = self.commands[key](cmd[1].split(" "), user=part.group(1))
+ except IndexError:
+ x = self.commands[key](user=part.group(1))
+ break
+
+ if not found:
+ self.writeToServer('Unbekannter Befehl: "' + part.group(2) + '"!')
+ if x:
+ if type(x) == tuple and x[1] != "":
+ self.writeToServer(x[1])
+ del x
+ if self.config["Clonk"]["RegExps"]["lobbyStartExp"].match(output):
+ self.state = "Lädt"
+
+ elif self.config["Clonk"]["RegExps"]["startExp"].match(output):
+ self.state = "Läuft"
+ try:
+ cout << output << endl
+ except (UnicodeDecodeError, UnicodeEncodeError):
+ pass
+
+ if self.irc and self.ingamechat == "aktiviert":
+ if output.find("<" + self.irc.nick + ">") == -1:
+ if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config["Clonk"]["prefix"]) == -1:
+ self.irc.reply("[Clonk]{}".format(output), to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
+
+ elif self.config["Clonk"]["RegExps"]["joinExp"].match(output) or self.config["Clonk"]["RegExps"]["leaveExp"].match(output):
+ self.irc.reply(output, to=self.config["Clonk"]["IRC"]["Channels"]["ingame"])
+
+
+ except KeyboardInterrupt:
+ if self.clonk:
+ self.clonk.stdin.close()
+
+ except Exception as e:
+ cerr << str(e) << endl
+ continue
+
+ return (CmdResult.Success, "")
+
+ def doPrivmsg(self, msg):
+ if not (self.irc and self.ingamechat == "aktiviert"):
+ return
+ for channel in msg.args[0].split(","):
+ if channel == self.config["Clonk"]["IRC"]["Channels"]["ingame"] and msg.nick != self.irc.nick:
+ self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
+
+
+ def writeToServer(self, text=None):
+ if text == None and self.clonk == None:
+ return (CmdResult.RuntimeError, "")
+ elif type(text) != str:
+ try:
+ text = text.decode(self.config["Clonk"]["encoding"])
+ except:
+ raise IOError("Cannot write anything else to the server except the following data types: bytes, str (got {})".format(type(text).__name__))
+
+ if self.clonk and self.clonk.stdin:
+ self.clonk.stdin.write(text + "\n")
+ self.clonk.stdin.flush()
+ return (CmdResult.Success, "")
+
+ def setTopic(self, text=None):
+ if not self.irc:
+ return False
+
+ if type(text) != str:
+ try:
+ text = text.decode(self.config["Clonk"]["encoding"])
+ except:
+ raise TypeError("text must be a string!")
+
+ if self.topic == text:
+ return False
+
+ else:
+ self.topic = text
+ return bool(self.irc.sendMsg(ircmsgs.topic(self.config["Clonk"]["IRC"]["Channels"]["ingame"], text)))
+
+
+ def start(self, time=None, user=""):
+ """Startet das Spiel."""
+ self.stopped = False
+ if time:
+ try:
+ time = int(time[0])
+ except:
+ time = 5
+ self.writeToServer("/start {}".format(time))
+ else:
+ self.writeToServer("/start")
+
+ return (CmdResult.Success, "")
+
+
+ def stop(self, prm=None, user=""):
+ """Stoppt den Countdown."""
+ def stopping(self):
+ while self.clonk and self.stopped:
+ self.writeToServer("/start 60000")
+ if self.stopped == False:
+ return
+ sleep(100)
+
+ if self.stopped == False:
+ self.stopped = True
+ start_new_thread(stopping, (self,))
+
+ return (CmdResult.Success, "")
+
+
+ def help(self, prm=None, user=""):
+ """Gibt die Hilfe aus."""
+ self.writeToServer("Verfügbare Befehle:")
+ for text, function in self.commands.items():
+ self.writeToServer("{} -- {}".format(text, function.__doc__))
+
+ return (CmdResult.Success, "")
+
+
+ def displayQueue(self, prm=None, user=""):
+ """Gibt die Warteschlange aus."""
+ self.writeToServer("Warteschlange:")
+
+ for i,scen in enumerate(self.queue.queue):
+ self.writeToServer("{}. {}".format(i+1, scen))
+
+ return (CmdResult.Success, "")
+
+
+ def list(self, prm=None, user=""):
+ """Zeigt die Szenarioliste an."""
+ self.writeToServer("List:\n-------------")
+ for scen in self.scenlist:
+ self.writeToServer(scen)
+
+ return (CmdResult.Success, "")
+
+ def ircCommands(self, prm=None, user=""):
+ """Enthält Befehle zur Steuerung der IRC-Funktionen."""
+ if not prm:
+ return (CmdResult.SyntaxFail, "")
+
+ if prm[0] == "ingamechat":
+ if prm[1] == "off":
+ self.ingamechat = "deaktiviert"
+ elif prm[1] == "on":
+ self.ingamechat = "aktiviert"
+
+ return (CmdResult.Success, "")
+
+ def IsRunning(self) -> bool:
+ return self.state != "Lobby"
+
+ #
+ # functions
+ #
+
+ def addCommand(self, function, text):
+ self.commands[text.split(" ")[0]] = function
+ return self
+
+ def addScenario(self, link):
+ name = ""
+ for item in link.split("/"):
+ if re.match(r"(.*)\.[oc][c4]s",item):
+ name = item
+ break
+
+ site = urllib.request.urlopen(link).read() #WARNING: Raises an error if the link is invalid!
+ with open(os.path.join(self.path, name),"wb") as fobj:
+ fobj.write(site)
+
+ try:
+ self.scenlist.index(name)
+ except Exception:
+ self.scenlist.append(name)
+ Pickler(open(os.path.join(self.path, "scenlist"), "wb")).dump(self.scenlist)
+ return self
+
#
#
#
def PyCRCtrlInit():
- """
- Initializes a PyCRCtrl - object. Use this function if you want to control only one object.
- """
- pass
+ """
+ Initializes a PyCRCtrl - object. Use this function if you want to control only one object.
+ """
+ pass