# -*- coding:utf-8 -*-
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#You should have received a copy of the GNU General Public License
#along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
#import socket
import subprocess
import urllib.request
import urllib.error
import urllib.parse
from _thread import start_new_thread
#import inspect
import queue
import os
import re
#import signal
import json
import base64
import supybot.log
import supybot.ircmsgs as ircmsgs
from pickle import Pickler, Unpickler
from PyQt5.QtCore import *
from time import sleep
from platform import architecture
from io import BytesIO
# iostream import -- presumably the source of the cout / cerr / endl names
# used further down in this file (TODO confirm).
# NOTE(review): if the module is missing, the fallback below downloads
# iostream.py into the current working directory and imports (i.e. executes)
# it without any verification. Consider vendoring the file or pinning a
# reviewed revision instead.
try:
    from iostream import *
except ModuleNotFoundError:
    with open("iostream.py","wb") as fobj:
        fobj.write(urllib.request.urlopen("https://raw.githubusercontent.com/Fulgen301/pythonprograms/master/iostream/iostream.py").read())
    from iostream import *
import random
import tarfile
from gzip import GzipFile
from time import sleep
from enum import IntEnum
#
# Helpers
#
class QString(str):
    """A ``str`` subclass that adds no behavior of its own."""
class QList(QObject):
    """List wrapper with stream-style operators.

    ``qlist << item`` appends ``item``, ``qlist >> target`` moves the last
    item into ``target`` and calling the instance returns the wrapped Python
    list. Subclasses can set ``typeclass`` to restrict what may be appended.
    """
    # Class-level fallback only; __init__ gives every instance its own list.
    list = list()
    # When set, only instances of this type are accepted by ``<<``/``append``.
    typeclass = None

    def __init__(self, *args):
        """Create the list from the positional arguments."""
        QObject.__init__(self)
        self.list = list(args)

    def __getitem__(self, name):
        """Return the item at integer index ``name``.

        Raises:
            ValueError: if ``name`` is not an integer.
            IndexError: if ``name`` is out of range.
        """
        if not isinstance(name, int):
            raise ValueError("Wrong datatype for name!")
        try:
            return self.list[name]
        except IndexError as e:
            raise IndexError(e.args[0]) from None

    def __iter__(self):
        """Iterate over the wrapped list."""
        return iter(self.list)

    def __contains__(self, item):
        """Return whether ``item`` is stored in the wrapped list."""
        return item in self.list

    def __lshift__(self, other):
        """Append ``other`` and return ``self`` so calls can be chained.

        Raises:
            ValueError: if ``typeclass`` is set and ``other`` is not an
                instance of it.
        """
        if self.typeclass and not isinstance(other, self.typeclass):
            raise ValueError("Wrong datatype")
        self.list.append(other)
        return self

    def __rshift__(self, other):
        """Move the last item into ``other`` and return ``self``.

        ``other`` must be a ``list`` or a ``QList``; any other target leaves
        this list untouched. Returning ``self`` (instead of ``None``) makes
        the operator chainable, like ``<<``.

        Raises:
            IndexError: if this list is empty.
            TypeError: if ``other`` refuses the item.
        """
        if isinstance(other, (list, QList)):
            item = self.list[-1]
            try:
                other.append(item)
            except Exception:
                raise TypeError("Cannot pass item to {}!".format(other)) from None
            # Only drop the item once the target has accepted it.
            self.list.pop()
        return self

    def __call__(self):
        """Return the wrapped Python list (not a copy)."""
        return self.list

    def __len__(self):
        return len(self.list)

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.list)

    # Methods
    def isEmpty(self):
        """Return ``True`` when the list holds no items."""
        return len(self.list) == 0

    def append(self, item):
        """Append ``item``; same checks and return value as ``<<``."""
        return self.__lshift__(item)
class QStringList(QList):
    """A ``QList`` whose ``typeclass`` restricts appended items to ``str``."""
    typeclass = str
class CmdResult(IntEnum):
    """Status codes returned as the first tuple element by command methods."""
    UnknownCommand = -1
    Success = 0
    SyntaxFail = 1      # returned for missing or malformed arguments
    RightsFail = 2      # returned when the capability check fails
    RuntimeError = 3
class Capability(IntEnum):
    """Permission levels; a larger value passes more ``>=`` capability checks."""
    Unknown = -1
    User = 0
    Admin = 1
    Moderator = 2       # numerically above Admin
class Updater(object):
parent = None
__current_revision = ""
lookuptable = {"64bit" : "amd64", "32bit" : "i386"}
def __init__(self, parent):
    """Bind the updater to ``parent``, load the stored revision, start polling.

    The revision id is read from ``snapshot.id`` inside ``parent.path`` and
    decoded as UTF-8; ``checkForUpdates`` is then started on a new thread.
    """
    self.parent = parent
    snapshot_file = os.path.join(self.parent.path, "snapshot.id")
    with open(snapshot_file, "rb") as handle:
        raw_id = handle.read()
        self.__current_revision = raw_id.decode("utf-8")
    start_new_thread(self.checkForUpdates, ())
@property
def current_revision(self):
    """Revision id of the snapshot that is currently recorded."""
    return self.__current_revision
@current_revision.setter
def current_revision(self, other):
    """Store a new revision id and persist it to ``snapshot.id``.

    ``other`` may be text (written UTF-8 encoded) or any bytes-like object
    (written as-is). The value is validated *before* the file is opened, so
    an unsupported value neither truncates ``snapshot.id`` nor changes the
    revision held in memory.

    Raises:
        TypeError: if ``other`` is neither text nor bytes-like.
    """
    if isinstance(other, str):
        try:
            payload = other.encode("utf-8")
        except UnicodeError:
            raise TypeError("Wrong datatype!") from None
    else:
        try:
            payload = bytes(memoryview(other))
        except TypeError:
            raise TypeError("Wrong datatype!") from None
    with open(os.path.join(self.parent.path, "snapshot.id"), "wb") as fobj:
        fobj.write(payload)
    # Only update the in-memory value once the file has been written.
    self.__current_revision = other
def checkForUpdates(self):
while True:
try:
site = urllib.request.urlopen("http://openclonk.org/nightly-builds").read().decode("utf-8").split(" bool:
if self.path == None:
return False
with open(os.path.join(self.path,"scenlist"), "rb") as fobj:
self.scenlist.list = Unpickler(fobj).load()
with open(os.path.join(self.path, "scenlist.league"), "rb") as fobj:
self.league_scenlist.list = Unpickler(fobj).load()
return True
def loadCapabilities(self) -> bool:
    """Load the saved capability table from ``capabilities.conf``.

    The parsed JSON is stored in ``self._capa_old``.

    Returns:
        ``False`` when ``self.path`` is unset, ``True`` after a successful
        load. Errors from opening or parsing the file propagate.
    """
    if self.path is None:
        return False
    # Context manager so the handle is closed even if parsing fails.
    with open(os.path.join(self.path, "capabilities.conf"), "r") as fobj:
        self._capa_old = json.load(fobj)
    return True
def loadConfigFile(self, config) -> bool:
    """Load the JSON configuration file ``config`` from ``self.path``.

    The parsed JSON is stored in ``self.config``.

    Args:
        config: file name of the configuration, relative to ``self.path``.

    Returns:
        ``False`` when ``self.path`` is unset, ``True`` after a successful
        load. Errors from opening or parsing the file propagate.
    """
    if self.path is None:
        return False
    # Context manager so the handle is closed even if parsing fails.
    with open(os.path.join(self.path, config), "r") as fobj:
        self.config = json.load(fobj)
    return True
def decodeRegExp(self, regexp):
    """Base64-decode ``regexp`` and convert the bytes to text via ``self.codec``."""
    raw_pattern = base64.b64decode(regexp)
    return self.codec.toUnicode(raw_pattern)
def setConfigEntry(self, entry="", value=None):
    """Set one configuration entry and write the whole config back to disk.

    The configuration is always written to ``pycrctrl.conf`` inside
    ``self.path`` as indented JSON.

    Args:
        entry: key to set in ``self.config``.
        value: new value for that key.

    Returns:
        ``True`` after the file has been written, ``None`` when
        ``self.config`` is not a dict (nothing is changed or written).
    """
    if not isinstance(self.config, dict):
        return None
    self.config[entry] = value
    # Context manager so the data is flushed and the handle closed right away.
    with open(os.path.join(self.path, "pycrctrl.conf"), "w") as fobj:
        json.dump(self.config, fobj, indent=4)
    return True
def host(self, scenario=None, user=None) -> tuple:
    # Host a scenario now, or queue it when a game is already running.
    #
    # (Kept as comments rather than a docstring: the ``help`` command prints
    # ``__doc__`` of every registered command to the players.)
    #
    # scenario: scenario file name, or "random" for a random pick from the
    #           scenario list. Accepts str, bytes-like data and the argument
    #           list that the in-game command dispatcher passes.
    # user:     name of the requesting user (not used here).
    #
    # Returns a ``(CmdResult, message)`` tuple.
    if scenario is None:
        return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
    if isinstance(scenario, (list, tuple)):
        # The dispatcher splits arguments on spaces; re-join them so names
        # that contain spaces survive.
        scenario = " ".join(scenario)
    if type(scenario) in [bytes, QByteArray]:
        try:
            scenario = bytes(scenario).decode("utf-8")
        except (UnicodeError, TypeError, ValueError):
            return (CmdResult.SyntaxFail, "Unbekannter Datentyp!")
    name_lines = scenario.splitlines()
    if not name_lines:
        # Empty name: previously an IndexError escaped from here.
        return (CmdResult.SyntaxFail, "Bitte gib einen Szenarionamen an!")
    scenario = name_lines[0]
    if scenario == "random":
        scenario = random.choice(self.scenlist())
    elif scenario not in self.scenlist():
        return (CmdResult.SyntaxFail,'Szenario "{}" wurde nicht gefunden!'.format(scenario))
    if self.thread_started == False:
        # No game loop yet: start one with this scenario.
        self.scenario = scenario
        self.thread_started = True
        start_new_thread(self.startClonk,())
        return (CmdResult.Success, 'Szenario "{}" wird jetzt gehostet.'.format(scenario))
    if not self.queue.full():
        self.queue.put(scenario)
        return (CmdResult.Success, 'Szenario "{}" wurde der Warteschlange hinzugefügt.'.format(scenario))
    return (CmdResult.RuntimeError, "Die Warteschlange ist voll!")
def startClonk(self):
    """Run game rounds one after another until auto-hosting is disabled.

    For every round the scenario is taken from ``self.scenario``, else from
    the queue, else picked at random. The capability table is reset from the
    saved one, the server process is started with piped stdin/stdout, and
    ``readServerOutput`` blocks until that process has ended.

    Returns:
        ``(CmdResult.Success, "")`` once the loop has been left.
    """
    try:
        while True:
            if self.scenario == "":
                if not self.queue.empty():
                    self.scenario = self.queue.get()
                else:
                    self.scenario = random.choice(self.scenlist)
            # Work on a copy: ``admin`` mutates ``self.capabilities`` in place,
            # and aliasing the saved table would carry round admins over into
            # every following round.
            self.capabilities = dict(self._capa_old)
            engine = self.config.get("engine")
            switch = " --" if engine == "openclonk-server" else " /"
            league = "league" if self.scenario in self.league_scenlist() else "noleague"
            options = self.config.get("commandline") + switch + league
            # NOTE(review): the command line is built as a string and run with
            # shell=True; the scenario name ends up inside double quotes, so
            # it must never come from unvalidated input.
            self.clonk = subprocess.Popen(
                './{} {} "{}"'.format(engine, options, self.scenario),
                bufsize=0,
                executable=None,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=True,
                cwd=self.path
            )
            self.state = "Lobby"
            self.readServerOutput()
            if self.config.get("autohost") == False:
                self.thread_started = False
                self.setTopic("Kein laufendes Spiel.")
                break
    finally:
        # Make sure a still-running server sees EOF on its stdin.
        if self.clonk:
            self.clonk.stdin.close()
    return (CmdResult.Success, "")
def readServerOutput(self):
    """Process the server's output line by line until the process has ended.

    Each line is checked against the shutdown pattern, stripped of its log
    prefix, scanned for chat commands (``<user> <prefix><command> [args]``),
    used to track the game state, echoed to ``cout`` and, when the in-game
    chat relay is active, forwarded to the IRC channel.

    Returns:
        ``(CmdResult.Success, "")`` after the server process has exited.
    """
    def pattern(name):
        # Regular expressions are stored base64-encoded in the config.
        return self.decodeRegExp(self.config.get("RegExps")[name])

    while True:
        try:
            output = self.clonk.stdout.readline()
            if bool(re.match(pattern("shutdownExp"), output.decode("utf-8"))):
                self.clonk.stdin.close()
            elif output == b"" and self.clonk.poll() is not None:
                # EOF and the process is gone: clean up and stop reading.
                if self.clonk:
                    self.clonk.stdin.close()
                    self.clonk = None
                    self.scenario = ""
                return (CmdResult.Success, "")
            elif output:
                #output = self.codec.toUnicode(output).splitlines()[0]
                output = output.decode("utf-8").splitlines()[0]
                # Drop everything up to and including the first "] ".
                marker = output.find("] ")
                if marker != -1:
                    output = output[marker + len("] "):]
                # startswith() is safe on blank lines, unlike output[0].
                if output.startswith(">"):
                    output = output[1:]
                # NOTE(review): the prefix is inserted into the pattern
                # unescaped -- confirm whether it is meant to be a regex.
                part = re.match("<(.*)> ({})(.*)".format(self.config.get("prefix")), output)
                # Ignore commands issued under the bot's own nick. group(1) is
                # the speaker (group(0) would be the whole match), and IRC may
                # not be connected at all.
                own_nick = self.irc.nick if self.irc else None
                if part and part.group(1) != own_nick:
                    cmd = part.group(3).split(" ", 1)
                    handler = self.commands.get(cmd[0])
                    if handler is None:
                        self.writeToServer('Unbekannter Befehl: "' + cmd[0] + '"!')
                    else:
                        # Choose the call form from the input instead of
                        # catching IndexError, which would also re-run a
                        # command that raised IndexError internally.
                        if len(cmd) > 1:
                            x = handler(cmd[1].split(" "), user=part.group(1))
                        else:
                            x = handler(user=part.group(1))
                        if isinstance(x, tuple) and len(x) > 1 and x[1] != "":
                            self.writeToServer(x[1])
                if bool(re.match(pattern("lobbyStartExp"), output)):
                    self.state = "Lädt"
                elif bool(re.match(pattern("startExp"), output)):
                    self.state = "Läuft"
                try:
                    cout << output << endl
                except (UnicodeDecodeError, UnicodeEncodeError):
                    pass
                if self.irc and self.ingamechat == "aktiviert":
                    if output.find("<" + self.irc.nick + ">") == -1:
                        if re.match(r"^<.*>", output) and output.find("[IRC]") == -1 and output.find(self.config.get("prefix")) == -1:
                            # Player chat that is neither relayed IRC text nor a command.
                            self.irc.reply("[Clonk]{}".format(output), to=self.config.get("channels")["ingame"])
                        elif re.match(pattern("joinExp"), output) or re.match(pattern("leaveExp"), output):
                            self.irc.reply(output, to=self.config.get("channels")["ingame"])
        except KeyboardInterrupt:
            if self.clonk:
                self.clonk.stdin.close()
        except Exception as e:
            # Best effort: report the problem and keep reading.
            cerr << str(e) << endl
            continue
def doPrivmsg(self, msg):
    """Forward an IRC message from the in-game channel to the game server.

    Does nothing unless IRC is available and the in-game chat relay is set
    to "aktiviert". Messages sent under the bot's own nick are skipped.
    """
    relay_enabled = self.irc and self.ingamechat == "aktiviert"
    if not relay_enabled:
        return
    for target in msg.args[0].split(","):
        if target != self.config.get("channels")["ingame"]:
            continue
        if msg.nick == self.irc.nick:
            continue
        self.writeToServer("[IRC]<{}> {}".format(msg.nick, msg.args[1]))
def writeToServer(self, text=None):
    """Send one line of text to the server's stdin.

    Args:
        text: ``bytes``/``QByteArray`` are written as-is, anything else is
            UTF-8 encoded through its ``encode`` method first.

    Returns:
        ``(CmdResult.Success, "")`` when the line was written and flushed,
        ``(CmdResult.RuntimeError, "")`` when no server is running or its
        stdin is unavailable or already closed.

    Raises:
        IOError: if ``text`` cannot be encoded.
    """
    if self.clonk is None:
        # Nothing to write to; do not report success for a dropped message.
        return (CmdResult.RuntimeError, "")
    if type(text) not in [bytes, QByteArray]:
        try:
            text = text.encode("utf-8")
        except Exception:
            raise IOError("Cannot write anything else to the server except the following data types: QString, bytes, str, QByteArray") from None
    stdin = self.clonk.stdin
    # stdin is closed once the shutdown line has been seen; writing to it
    # afterwards would raise ValueError.
    if not stdin or stdin.closed:
        return (CmdResult.RuntimeError, "")
    stdin.write(bytes(text) + b"\n")
    stdin.flush()
    return (CmdResult.Success, "")
def setTopic(self, text=None):
    """Set the topic of the in-game IRC channel.

    Args:
        text: new topic; values that are not ``str``/``QString`` are
            converted with ``self.codec.toUnicode``.

    Returns:
        ``False`` when IRC is unavailable or the topic is unchanged,
        otherwise the truth value of ``self.irc.sendMsg(...)``.

    Raises:
        TypeError: if ``text`` cannot be converted to a string.
    """
    if not self.irc:
        return False
    if type(text) not in [str, QString]:
        try:
            text = self.codec.toUnicode(text)
        except Exception as err:
            # Keep the cause, and let KeyboardInterrupt/SystemExit through.
            raise TypeError("text must be a string!") from err
    if self.topic == text:
        return False
    self.topic = text
    return bool(self.irc.sendMsg(ircmsgs.topic(self.config.get("channels")["ingame"], text)))
def admin(self, prm=None, user=""):
    """Setzt den Rundenadmin."""
    # Register ``user`` (with the player name given in ``prm``) as round admin.
    # The docstring above is shown to players by the ``help`` command, so the
    # English documentation lives in comments.
    #
    # prm:  list of words forming the player name.
    # user: name of the requesting user.
    #
    # Returns a ``(CmdResult, message)`` tuple.
    if not prm:
        return (CmdResult.SyntaxFail, "Nicht genügend Argumente!")
    prm = " ".join(prm)
    # Refuse when either name already ranks above Admin. Each name is checked
    # on its own (a tuple comparison would be lexicographic), and unknown
    # names count as ``Capability.Unknown`` instead of skipping the check.
    highest = max(
        self.capabilities.get(prm, Capability.Unknown),
        self.capabilities.get(user, Capability.Unknown),
    )
    if highest > Capability.Admin:
        return (CmdResult.RuntimeError, "Du hast bereits mehr Rechte als ein Rundenadmin!")
    # Only one round admin at a time.
    for u, c in self.capabilities.items():
        if c == Capability.Admin:
            return (CmdResult.RuntimeError, "{} ist bereits Rundenadmin!".format(u))
    self.capabilities[prm] = self.capabilities[user] = Capability.Admin
    return (CmdResult.Success, "{} (Spielername: {}) wurde als Rundenadmin eingetragen!".format(user, prm))
def start(self, time=None, user=""):
    """Startet das Spiel."""
    # Start the game, optionally after a countdown. The docstring above is
    # shown to players by the ``help`` command, so the English documentation
    # lives in comments.
    #
    # time: argument list whose first element is the countdown value; a value
    #       that is not a valid integer falls back to 5.
    # user: name of the requesting user; needs Admin capability.
    #
    # Returns a ``(CmdResult, message)`` tuple.
    if not self.checkCapability(Capability.Admin, user):
        return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))
    self.stopped = False
    if time:
        try:
            time = int(time[0])
        except (ValueError, TypeError, IndexError, KeyError):
            # Only conversion problems select the default; a bare except
            # would also swallow KeyboardInterrupt/SystemExit.
            time = 5
        self.writeToServer("/start {}".format(time))
    else:
        self.writeToServer(b"/start")
    return (CmdResult.Success, "")
def stop(self, prm=None, user=""):
    """Stoppt den Countdown."""
    # Hold the start countdown back. The docstring above is shown to players
    # by the ``help`` command, so the English documentation lives in comments.
    # A background thread keeps sending "/start 60000" for as long as the
    # server exists and ``self.stopped`` is set.
    if not self.checkCapability(Capability.Admin, user):
        return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))

    def hold_countdown():
        while self.clonk and self.stopped:
            self.writeToServer("/start 60000")
            if self.stopped == False:
                break
            sleep(100)

    if self.stopped == False:
        # Not held back yet: set the flag and start the refresher thread.
        self.stopped = True
        start_new_thread(hold_countdown, ())
    return (CmdResult.Success, "")
def help(self, prm=None, user=""):
    """Gibt die Hilfe aus."""
    # Write one line per registered command ("<name> -- <docstring>") to the
    # server. The docstring above is itself part of that output.
    header = "Verfügbare Befehle:".encode("utf-8")
    self.writeToServer(header)
    for name, handler in self.commands.items():
        line = "{} -- {}".format(name, handler.__doc__)
        self.writeToServer(line)
    return (CmdResult.Success, "")
def displayQueue(self, prm=None, user=""):
    """Gibt die Warteschlange aus."""
    # Write the queued scenarios to the server as a list numbered from 1.
    # The docstring above is shown to players by the ``help`` command.
    self.writeToServer(b"Warteschlange:")
    for position, queued in enumerate(self.queue.queue, start=1):
        self.writeToServer("{}. {}".format(position, queued))
    return (CmdResult.Success, "")
def list(self, prm=None, user=""):
    """Zeigt die Szenarioliste an."""
    # Write a header followed by every entry of the scenario list to the
    # server. The docstring above is shown to players by the ``help`` command.
    header = b"List:\n-------------"
    self.writeToServer(header)
    for entry in self.scenlist:
        self.writeToServer(entry)
    return (CmdResult.Success, "")
def ircCommands(self, prm=None, user=""):
    """Enthält Befehle zur Steuerung der IRC-Funktionen."""
    # Control the IRC features. The docstring above is shown to players by
    # the ``help`` command, so the English documentation lives in comments.
    #
    # prm:  argument list; "ingamechat on" / "ingamechat off" switch the
    #       chat relay between game and IRC.
    # user: name of the requesting user; needs Admin capability.
    #
    # Returns a ``(CmdResult, message)`` tuple.
    if not self.checkCapability(Capability.Admin, user):
        return (CmdResult.RightsFail, "Du hast nicht die nötige Berechtigung.".encode("utf-8"))
    if not prm:
        return (CmdResult.SyntaxFail, "")
    if prm[0] == "ingamechat":
        if len(prm) < 2:
            # "ingamechat" without on/off used to raise IndexError.
            return (CmdResult.SyntaxFail, "")
        if prm[1] == "off":
            self.ingamechat = "deaktiviert"
        elif prm[1] == "on":
            self.ingamechat = "aktiviert"
    return (CmdResult.Success, "")
def IsRunning(self) -> bool:
    """Return ``True`` unless the tracked state is "Lobby"."""
    in_lobby = self.state == "Lobby"
    return not in_lobby
#
# functions
#
def addCommand(self, function, text):
    """Register ``function`` under the first space-separated word of ``text``.

    Returns ``self`` so registrations can be chained.
    """
    keyword = text.split(" ")[0]
    self.commands[keyword] = function
    return self
def addScenario(self, link):
    """Download a scenario file and add it to the scenario list.

    The file name is the first path segment of ``link`` that looks like a
    scenario file (``*.ocs`` / ``*.c4s`` style extension). The download is
    stored in ``self.path`` and the scenario list is pickled to the
    ``scenlist`` file there.

    Args:
        link: URL of the scenario file.

    Returns:
        ``self``, so calls can be chained.

    Raises:
        ValueError: if ``link`` contains no scenario file name.
        urllib.error.URLError: if the download fails.
    """
    name = ""
    for item in link.split("/"):
        if re.match(r"(.*)\.[oc][c4]s",item):
            name = item
            break
    if not name:
        # Without a name the target would be the directory itself; fail
        # before anything is downloaded.
        raise ValueError("No scenario file name found in {!r}".format(link))
    # WARNING: Raises an error if the link is invalid!
    with urllib.request.urlopen(link) as response:
        site = response.read()
    with open(os.path.join(self.path, name),"wb") as fobj:
        fobj.write(site)
    scenarios = self.scenlist()
    if name not in scenarios:
        scenarios.append(name)
    # Context manager so the pickle is flushed and the handle closed.
    with open(os.path.join(self.path, "scenlist"), "wb") as fobj:
        Pickler(fobj).dump(scenarios)
    return self
def checkCapability(self, capability, user=""):
    """Return whether ``user`` holds at least ``capability``.

    An empty user name or a user without an entry in ``self.capabilities``
    never passes the check.
    """
    if not user:
        return False
    try:
        granted = self.capabilities[user]
    except KeyError:
        return False
    if granted >= capability:
        return True
    return False
#
#
#
def PyCRCtrlInit():
    """
    Initialize a PyCRCtrl object; meant for controlling a single object only.

    Currently a placeholder: it does nothing and returns ``None``.
    """
    return None