# Copyright (c) 2018, George Tokmaji # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import os, hashlib, base64, jwt, json from .database import * from bottle import install, HTTPResponse, route, request, default_app, post from bottle_jwt import JWTProviderPlugin, jwt_auth_required, JWTProvider def calculateUserHash(username : str, password : str) -> object: return hashlib.sha512(hashlib.sha512(username.encode("utf-8")).digest() + hashlib.sha512(password.encode("utf-8")).digest()) class AuthBackend(object): def authenticate_user(self, username, password): if username is None or password is None: raise HTTPResponse("Username or password missing", 400) session = DBSession() try: user = session.query(User).filter_by(name=username, hash=calculateUserHash(username, password).hexdigest()).one() except db.orm.exc.NoResultFound: return return {"id" : str(user.id), "name" : user.name, "email" : user.email} def get_user(self, user_id): session = DBSession() user = session.query(User).get(user_id) if user: return {"id" : str(user.id), "name" : user.name, "email" : user.email} def custom_create_token(self, user, ttl=None): user_id = json.dumps(user[self.id_field]).encode("utf-8") payload = { "sub" : base64.b64encode(user_id).decode("utf-8"), "name" : user["name"], "email" : user["email"] } if self.ttl: payload["exp"] = datetime.utcnow() + datetime.timedelta(seconds=ttl) if ttl else self.expires return jwt.encode(payload, self.secret, algorithm=self.algorithm), payload["exp"] JWTProvider.create_token = custom_create_token install(JWTProviderPlugin( keyword="jwt", auth_endpoint="/api/auth", backend=AuthBackend(), fields=("username", "password"), secret=os.environ["PARRY_SECRET"], ttl=30 * 60 )) def auth_basic(f): def checkAuth(*args, **kwargs): session = DBSession() try: User.query.filter_by(name=request.forms["username"], hash=calculateUserHash(request.forms["username"], request.forms["password"]).hexdigest()).first() except db.orm.exc.NoResultFound: return HTTPResponse(status=401) del request.forms["password"] return f(*args, **kwargs) return checkAuth def get_user(session : DBSession): try: return session.query(User).filter_by(name=request.get_user()["name"]).one() except db.orm.exc.NoResultFound: raise HTTPResponse(status=401)