# Copyright (c) 2018, George Tokmaji # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import sys import os, re, json, math import requests, hashlib from bottle import route, run, Bottle, request, static_file, response, hook, HTTPResponse, JSONPlugin, install import threading os.chdir(os.path.dirname(__file__)) from .database import * BLOCKSIZE = 1024 ** 2 locks = { "io" : threading.Lock() } def with_lock(k): def wrapper(f): def func(*args, **kwargs): with locks[k]: return f(*args, **kwargs) return func return wrapper def calculateHashForResource(resource : requests.Response) -> object: hashobj = hashlib.sha1() calculateHashForFile(resource.raw, hashobj) assert(resource.raw.tell()) if "content-length" not in resource.headers: resource.headers["Content-Length"] = resource.raw.tell() return hashobj def calculateHashForFile(file, hashobj : object = None) -> object: if hashobj is None: hashobj = hashlib.sha1() while True: block = file.read(BLOCKSIZE) if not block: break hashobj.update(block) return hashobj def notAllowed(): raise HTTPResponse(f"Cannot {request.method} {request.path}") @hook('after_request') def enable_cors(): response.headers["Access-Control-Allow-Origin"] = "*" # Auth def calculateUserHash(username : str, password : str) -> object: return hashlib.sha512(hashlib.sha512(username.encode("utf-8")).digest() + hashlib.sha512(password.encode("utf-8")).digest()) def auth_basic(f): def checkAuth(*args, **kwargs): session = DBSession() try: User.query.filter_by(name=requests.forms["username"], hash=calculateUserHash(request.forms["username"], request.forms["password"]).hexdigest()).first() except db.orm.exc.NoResultFound: return HTTPResponse(status=401) del request.forms["password"] return f(*args, **kwargs) return checkAuth class ParryEncoder(json.JSONEncoder): _default = json.JSONEncoder.default def default(self, obj): if isinstance(obj, ObjectId): return str(obj) return self._default(obj) install(JSONPlugin(json_dumps=lambda s: json.dumps(s, cls=ParryEncoder)))