2025-07-31 00:41:19 +02:00

131 lines
4.0 KiB
Python

from bottle import response, request, Bottle
from json import dumps, loads
import time
from db_handler import DbConnector
from functools import wraps
import jwt
from passlib.context import CryptContext
import bcrypt
# Needed because of https://github.com/pyca/bcrypt/issues/684:
# when the installed bcrypt module has no ``__about__`` attribute, provide a
# stand-in object that exposes the module's ``__version__``.
if not hasattr(bcrypt, '__about__'):
    setattr(
        bcrypt,
        '__about__',
        type('about', (object,), {'__version__': bcrypt.__version__}),
    )
def user_guard(reyection_msg: str = "Requires authentication", allow_anonymous: bool = False):
    """Build a decorator that resolves the requesting user before a handler runs.

    The username is read from the request's token cookie via
    ``username_by_token`` and looked up through ``request.db_connector``.
    The resulting user object is passed to the handler as its first
    positional argument.

    Args:
        reyection_msg: Error text placed in the JSON body of a 401 response.
        allow_anonymous: When true, requests without a resolvable user are
            still dispatched and the handler receives ``None`` as the user.

    Returns:
        A decorator for synchronous handlers.
    """
    def user_guard_decorator(fn: callable):
        # The wrapper is a plain function: the guarded handlers in this module
        # are synchronous, so the wrapper must return the response body itself
        # rather than a coroutine.
        @wraps(fn)
        def wrapper(*args, **kwargs):
            username = username_by_token(request)
            # Always bind ``user`` so the anonymous path cannot hit an
            # unbound local.
            user = request.db_connector.get_user(username) if username else None
            # A token may outlive its user record; a falsy lookup result is
            # treated like a missing token (assumes get_user returns a falsy
            # value for unknown names, as the login handler also expects).
            if not user and not allow_anonymous:
                response.status = 401
                return dumps({"error": reyection_msg})
            return fn(user if user else None, *args, **kwargs)
        return wrapper
    return user_guard_decorator


def admin_guard(reyection_msg: str = "Requires admin priveledges"):
    """Build a decorator that only lets users with the ``admin`` role through.

    Authentication is delegated to ``user_guard``; the handler receives the
    authenticated user as its first positional argument.

    Args:
        reyection_msg: Error text placed in the JSON body when the request is
            rejected, both for missing authentication and for a non-admin user.

    Returns:
        A decorator for synchronous handlers.
    """
    def admin_guard_decorator(fn: callable):
        @wraps(fn)
        @user_guard(reyection_msg)
        def wrapper(user, *args, **kwargs):
            if user.role != "admin":
                # NOTE(review): 403 would describe "authenticated but not
                # allowed" more precisely; 401 is kept so existing clients
                # see the same status as before.
                response.status = 401
                return dumps({"error": reyection_msg})
            return fn(user, *args, **kwargs)
        return wrapper
    return admin_guard_decorator
def read_keys_from_request(keys: None | list | tuple | dict = None):
    """Parse the JSON request body and optionally restrict it to given keys.

    Args:
        keys: Names that must be present in the body. When given, only these
            keys are returned. When omitted or empty, the whole parsed body
            is returned.

    Returns:
        The parsed body, restricted to ``keys`` when they are given. When no
        keys are requested and the body cannot be parsed, an empty dict.

    Raises:
        ValueError: If ``keys`` are requested and any of them is missing from
            the body. An unparsable or non-object body counts as having no
            keys at all.
    """
    try:
        data = loads(request.body.read().decode('utf-8'))
    except ValueError:
        # Covers both invalid JSON (JSONDecodeError) and invalid UTF-8
        # (UnicodeDecodeError); both are ValueError subclasses.
        data = {}

    if not keys:
        return data

    # Required keys can only be looked up in a JSON object.
    if not isinstance(data, dict):
        data = {}

    missing_keys = [key for key in keys if key not in data]
    if missing_keys:
        raise ValueError(f"Missing required keys: {', '.join(map(str, missing_keys))}")
    return {key: data[key] for key in keys}
# Password hashing context; bcrypt is the only configured scheme.
hash_context = CryptContext(schemes=["bcrypt"])
# Bottle application object on which the route decorators in this module
# register their handlers.
app = Bottle()
# NOTE(review): the signing key is hard-coded, so anyone who can read this
# file can forge tokens. Load it from configuration before deploying.
_JWT_SECRET = "secret"
_JWT_ALGORITHM = "HS256"
# Lifetime of an issued token and of the cookie that carries it.
_TOKEN_TTL_SECONDS = 60 * 20


@app.route("/auth/token", method=["POST"])
def token():
    """Authenticate a user and issue a signed JWT in the ``oauth2`` cookie.

    Expects a JSON object body with the string fields ``user`` and
    ``password``.

    Returns:
        JSON text. On success (status 200) the token claims: ``sub`` (the
        user name), ``iat`` and ``exp`` (both absolute Unix timestamps).
        Otherwise an ``error`` object with status 400 for a malformed body
        or 401 for wrong credentials.
    """
    try:
        data = loads(request.body.read().decode('utf-8'))
    except ValueError:
        # Invalid JSON and invalid UTF-8 are both ValueError subclasses.
        data = None
    if not isinstance(data, dict):
        response.status = 400
        return dumps({"error": "Invalid JSON format"})

    username = data.get("user")
    password = data.get("password")
    # Reject missing or non-string credentials before they reach the hash
    # verification.
    if not isinstance(username, str) or not isinstance(password, str):
        response.status = 400
        return dumps({"error": "Missing user or password"})

    user = request.db_connector.get_user(username)
    if not user or not hash_context.verify(password, user.hash):
        response.status = 401
        return dumps({"error": "Invalid username or password"})

    issued_at = int(time.time())
    jwt_content = {
        # "sub" is a plain string, as the JWT specification requires.
        "sub": user.name,
        "iat": issued_at,
        # "exp" is an absolute timestamp; jwt.decode compares it against the
        # current time, so a bare duration would always count as expired.
        "exp": issued_at + _TOKEN_TTL_SECONDS,
    }
    encoded = jwt.encode(jwt_content, _JWT_SECRET, algorithm=_JWT_ALGORITHM)
    response.set_cookie("oauth2", encoded, max_age=_TOKEN_TTL_SECONDS, path='/')
    response.status = 200
    return dumps(jwt_content)


def username_by_token(request) -> str | None:
    """Return the user name carried by the request's ``oauth2`` token cookie.

    Args:
        request: Object providing ``get_cookie``; the handlers in this module
            pass Bottle's ``request``.

    Returns:
        The token's ``sub`` claim, or ``None`` when the cookie is absent, the
        token fails validation (bad signature, expired, malformed), or the
        claim is not a non-empty string.
    """
    encoded = request.get_cookie("oauth2")
    if not encoded:
        return None
    try:
        # Signature and expiry are both checked by jwt.decode.
        decoded = jwt.decode(encoded, _JWT_SECRET, algorithms=[_JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass of InvalidTokenError.
        return None
    subject = decoded.get("sub")
    if isinstance(subject, str) and subject:
        return subject
    return None
@app.route("/user", method=["GET"])
def get_user():
    """Report the name of the user identified by the request's token.

    Returns:
        JSON text with the user's ``name``, or an ``error`` object together
        with status 401 when no user name can be derived from the request.
    """
    current_name = username_by_token(request)
    if current_name:
        return dumps({"name": current_name})
    response.status = 401
    return dumps({"error": "Unauthorized"})
@app.route("/user_add", method=["POST"])
@admin_guard()
def add_user(user):
    """Create a new user from the JSON request body; guarded by ``admin_guard``.

    Expects a JSON object with the non-empty string fields ``new_user`` and
    ``new_password``. The password is hashed with ``hash_context`` before it
    is handed to the database connector.

    Args:
        user: The authenticated user supplied by the guard (not used here).

    Returns:
        JSON text with the created user's ``name``, or an ``error`` object
        with status 400 when the body is missing or malformed.
    """
    try:
        data = read_keys_from_request(["new_user", "new_password"])
    except ValueError as err:
        response.status = 400
        return dumps({"error": str(err)})

    new_user = data.get("new_user")
    new_password = data.get("new_password")
    if not (isinstance(new_user, str) and new_user
            and isinstance(new_password, str) and new_password):
        response.status = 400
        return dumps({"error": "new_user and new_password must be non-empty strings"})

    # Store the hash of the password, not the hashing function itself.
    request.db_connector.add_user(new_user, hash_context.hash(new_password))
    return dumps({"name": new_user})
def initialize_app():
    """Create the database connector and expose it to every request.

    Registers a ``before_request`` hook on ``app`` that stores the connector
    on ``request.db_connector``, which is where the handlers look it up.

    Returns:
        The module-level ``app``.
    """
    db = DbConnector("sqlite:///./data/db.sqlite")

    @app.hook('before_request')
    def attach_resources():
        # Only the connector is attached; handlers that need the request
        # body read it themselves, so it is not read here.
        request.db_connector = db

    return app
if __name__ == "__main__":
    # Script entry point: initialize_app() returns the module-level app, so
    # the server is started directly on its result.
    initialize_app().run(host='localhost', port=8080, debug=True)