118 lines
3.5 KiB
Python
118 lines
3.5 KiB
Python
from functools import wraps
|
|
from bottle import Bottle, request, response
|
|
from config import JWT_SECRET
|
|
from config import hash_context
|
|
|
|
|
|
import jwt
|
|
from bottle import request, response
|
|
|
|
|
|
import time
|
|
from json import dumps, loads
|
|
|
|
from utils import read_keys_from_request
|
|
|
|
# Bottle application instance; the @app.route decorators in this module register their handlers on it.
app = Bottle()
|
|
|
|
|
|
|
|
def user_guard(reyection_msg: str = "Requires authentication", allow_anonymous: bool = False):
    """Build a decorator that resolves the calling user before running a handler.

    The wrapped handler is invoked as ``fn(user, *args, **kwargs)``, where
    ``user`` is the result of ``request.db_connector.get_user`` for the
    username found in the request's token.

    Args:
        reyection_msg: Text placed under the ``"error"`` key of the 401 body.
            The spelling is part of the keyword interface and is kept as is.
        allow_anonymous: When true, a request without a resolvable user still
            reaches the handler, which then receives ``None`` as ``user``.

    Returns:
        A decorator for request handlers.
    """
    def user_guard_decorator(fn: callable):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            username = username_by_token(request)
            user = request.db_connector.get_user(username) if username else None

            # token() treats a falsy get_user() result as "no such user";
            # apply the same rule here so a token that outlived its account
            # is handled like a missing token instead of leaking a falsy user.
            if not user:
                if not allow_anonymous:
                    response.status = 401
                    response.content_type = 'application/json'
                    return dumps({"error": reyection_msg})
                # Anonymous access: the handler must still run, with no user.
                user = None

            return fn(user, *args, **kwargs)

        return wrapper

    return user_guard_decorator
|
|
|
|
def admin_guard(reyection_msg: str = "Requires admin priveledges"):
    """Build a decorator that only lets users with the ``"admin"`` role through.

    Authentication is delegated to ``user_guard`` (called with the same
    message); the role check runs afterwards on the user it supplies.

    Args:
        reyection_msg: Text placed under the ``"error"`` key of the rejection
            body. The spelling is part of the keyword interface and is kept.

    Returns:
        A decorator for handlers with the signature ``fn(user, *args, **kwargs)``.
    """
    def admin_guard_decorator(fn: callable):
        @wraps(fn)
        @user_guard(reyection_msg)
        def wrapper(user, *args, **kwargs):
            # getattr() keeps a missing user record (None) from raising
            # AttributeError; such a request is rejected like any non-admin.
            if getattr(user, "role", None) != "admin":
                # NOTE(review): 403 is the conventional status for an
                # authenticated but unauthorized caller; 401 is kept so
                # existing clients see the same status as before.
                response.status = 401
                response.content_type = 'application/json'
                return dumps({"error": reyection_msg})
            return fn(user, *args, **kwargs)

        return wrapper

    return admin_guard_decorator
|
|
|
|
|
|
@app.route("/token", method=["POST"])
def token():
    """Authenticate a user from a JSON body and issue a JWT cookie.

    The request body must be a JSON object with ``"user"`` and ``"password"``
    keys. On success the signed token is stored in the ``oauth2`` cookie and
    the JSON-encoded claims are returned with status 200.

    Returns:
        A JSON string: the claims on success, or an ``{"error": ...}`` object
        with status 400 (unparseable body) or 401 (bad credentials).
    """
    response.content_type = 'application/json'

    body = request.body.read()
    try:
        data = loads(body.decode('utf-8'))
        username = data.get("user")
        password = data.get("password")
    except (ValueError, AttributeError, RecursionError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError;
        # AttributeError covers valid JSON that is not an object (no .get);
        # RecursionError covers pathologically nested input.
        response.status = 400
        return dumps({"error": "Invalid JSON format"})

    # Missing or non-string credentials can never match. Reject them before
    # they reach the database lookup or the hash verifier.
    credentials_ok = isinstance(username, str) and isinstance(password, str)
    user = request.db_connector.get_user(username) if credentials_ok else None
    if not user or not hash_context.verify(password, user.hash):
        response.status = 401
        return dumps({"error": "Invalid username or password"})

    ttl_seconds = 60 * 20  # 20 minutes expiration
    now = time.time()  # read once so that exp is exactly iat + ttl_seconds
    jwt_content = {
        # "sub" carries a dict here; username_by_token() decodes with
        # verify_sub disabled to accept it.
        "sub": {
            "user": user.name,
            "role": user.role,
        },
        "iat": now,
        "exp": now + ttl_seconds,
    }
    encoded = jwt.encode(jwt_content, JWT_SECRET, algorithm="HS256")
    # NOTE(review): the cookie is set without httponly/secure flags; confirm
    # whether client-side script needs to read it before tightening this.
    response.set_cookie("oauth2", encoded, max_age=ttl_seconds, path='/', samesite='lax')
    response.status = 200
    return dumps(jwt_content)
|
|
|
|
|
|
def username_by_token(request) -> str | None:
|
|
token = request.get_cookie("oauth2")
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
decoded = jwt.decode(token, JWT_SECRET, algorithms=["HS256"], options={"verify_sub": False})
|
|
curent_time = time.time()
|
|
if decoded.get("exp", float("inf")) + decoded.get("iat", float("inf")) < curent_time:
|
|
return None
|
|
return decoded["sub"]["user"]
|
|
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError) as e:
|
|
print(f"Token error: {e}")
|
|
return None
|
|
|
|
|
|
@app.route("/", method=["GET"])
def get_user():
    """Report the username carried by the caller's token.

    Returns:
        The JSON string ``{"name": <username>}``, or ``{"error": "Unauthorized"}``
        with status 401 when ``username_by_token`` yields no username.
    """
    # Both outcomes are JSON, so declare the content type once for both paths.
    response.content_type = 'application/json'

    username = username_by_token(request)
    if not username:
        response.status = 401
        return dumps({"error": "Unauthorized"})

    return dumps({"name": username})
|
|
|
|
@app.route("/add", method=["POST"])
@admin_guard()
def add_user(user):
    """Create a new account; reachable only through ``admin_guard``.

    Reads ``new_user``, ``new_password`` and ``new_admin`` from the request
    via ``read_keys_from_request``, hashes the password and stores the user
    through ``request.db_connector.add_user``.

    Args:
        user: The authenticated caller supplied by ``admin_guard`` (unused here).

    Returns:
        A JSON string: a success message with status 200, or an
        ``{"error": ...}`` object with status 400 when a required field is
        missing or the connector raises ``ValueError``.
    """
    data = read_keys_from_request(["new_user", "new_password", "new_admin"])
    response.content_type = 'application/json'

    # Validate up front so a missing field yields a 400 instead of a KeyError
    # or an attempt to hash None.
    new_user = data.get("new_user")
    new_password = data.get("new_password")
    if not new_user or not new_password:
        response.status = 400
        return dumps({"error": "new_user and new_password are required"})

    # NOTE(review): this is a plain truthiness test; if the helper does not
    # coerce types, a string such as "false" would grant the admin role.
    role = "admin" if data.get("new_admin", False) else "user"

    try:
        request.db_connector.add_user(new_user, hash_context.hash(new_password), role)
    except ValueError as e:
        response.status = 400
        return dumps({"error": str(e)})

    response.status = 200
    return dumps({"message": "User created successfully"})
|
|
|
|
|