2025-11-13 10:36:09 +01:00

224 lines
7.6 KiB
Python

from functools import wraps
from bottle import Bottle, request, response
from simple_chat_api.config import JWT_SECRET, hash_context
import jwt
from bottle import request, response
import time
from json import dumps, loads
from simple_chat_api.utils import read_keys_from_request
app = Bottle()
def username_by_token(request) -> str | None:
"""
Extract username from JWT token in request cookies.
Returns non on jwt.ExpiredSignatureError and jwt.InvalidTokenError
:param request: Bottle request object
:return: username or None
"""
token = request.get_cookie("oauth2")
if not token:
return None
try:
decoded = jwt.decode(token, JWT_SECRET, algorithms=["HS256"], options={"verify_sub": False})
curent_time = time.time()
if decoded.get("exp", float("inf")) + decoded.get("iat", float("inf")) < curent_time:
return None
return decoded["sub"]["user"]
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError) as e:
print(f"Token error: {e}")
return None
def user_guard(reyection_msg: str = "Requires authentication", allow_anonymous: bool = False):
def user_guard_decorator(fn: callable):
@wraps(fn)
def wrapper(*args, **kwargs):
"""
Given a bottle request, try authenticating the user by its cookies.
If authentication fails, return a 401 with the given reyection_msg.
Otherwise, call the decorated function with the user as first argument.
:param args: arguments to pass to decorated function
:param kwargs: keyword arguments to pass to decorated function
:return: decorated function result or 401 rejection
"""
username = username_by_token(request)
if not username and not allow_anonymous:
response.status = 401
return dumps({"error": reyection_msg})
if username:
user = request.db_connector.get_user(username)
return fn(user, *args, **kwargs)
return wrapper
return user_guard_decorator
def admin_guard(reyection_msg: str = "Requires admin priveledges"):
def admin_guard_decorator(fn: callable):
@wraps(fn)
@user_guard(reyection_msg)
def wrapper(user, *args, **kwargs):
"""
Check if an authenticated user has admin priveledges.
user_guard is used to ensure authentication.
"""
if user.role != "admin":
response.status = 401
return dumps({"error": reyection_msg})
return fn(user, *args, **kwargs)
return wrapper
return admin_guard_decorator
@app.route("/token", method=["POST"])
def token():
"""
Login endpoint that returns a JWT token on successful authentication.
Expects JSON body with "user" and "password" fields.
Sets JWT token as cookie "oauth2".
:return: JSON with JWT token or error message
"""
body = request.body.read()
try:
data = loads(body.decode('utf-8'))
username = data.get("user")
password = data.get("password")
except:
response.status = 400
return dumps({"error": "Invalid JSON format"})
user = request.db_connector.get_user(username)
if not user or not hash_context.verify(password, user.hash):
response.status = 401
return dumps({"error": "Invalid username or password"})
jwt_content = {
"sub": {
"user": user.name,
"role": user.role
},
"iat": time.time(),
"exp": time.time() + 60 * 20 # 20 minutes expiration
}
token = jwt.encode(jwt_content, JWT_SECRET, algorithm="HS256")
response.set_cookie("oauth2", token, max_age=60*20, path='/', samesite='lax')
response.status = 200
return dumps(jwt_content)
@app.route("/", method=["GET"])
def get_user():
"""
Get the username of the authenticated user from the JWT token.
If no valid token is present, returns 401 Unauthorized.
This is a helper endpoint with no real usecase.
"""
username = username_by_token(request)
if not username:
response.status = 401
response.content_type = 'application/json'
return dumps({"error": "Unauthorized"})
return dumps({"name": username})
@app.route("/add", method=["POST"])
@admin_guard()
def add_user(user):
"""
Add a new user to the system.
Expects JSON body with "new_user", "new_password", and optional "new_admin" fields.
Rejected by methodes of admin_guard if the requester is not an admin.
:param user: authenticated admin user object
:return: JSON message indicating success or error
"""
data = read_keys_from_request(["new_user", "new_password", "new_admin"])
role = "admin" if data.get("new_admin", False) else "user"
response.content_type = 'application/json'
try:
request.db_connector.add_user(data["new_user"], hash_context.hash(data["new_password"]), role)
response.status = 200
return dumps({"message": "User created successfully"})
except ValueError as e:
response.status = 400
return dumps({"error": str(e)})
@app.route("/changePassword", method=["POST"])
@user_guard()
def change_password(user):
"""
Request password change for the authenticated user.
Rejected by user_guard if the requester is not authenticated.
Rejected whe the old password does not match.
Expects JSON body with "old_password" and "new_password" fields.
:param user: authenticated user object
:return: JSON message indicating success or error
"""
data = read_keys_from_request(["new_password", "old_password"])
response.content_type = 'application/json'
try:
if not hash_context.verify(data["old_password"], user.hash):
response.status = 400
return dumps({"error": "Old password is incorrect"})
request.db_connector.change_password(user.name, hash_context.hash(data["new_password"]))
response.status = 200
return dumps({"message": "Password changed successfully"})
except ValueError as e:
response.status = 400
return dumps({"error": str(e)})
@app.route("/delete/<deletion_target>", method=["POST"])
@user_guard()
def delete_user(user, deletion_target: str):
"""
Delete a user from the system.
The authenticated user can delete itself or an admin can delete any user.
Expects the username of the user to delete as a URL parameter.
:param user: authenticated user object
:param deletion_target: username of the user to delete
:return: JSON message indicating success or error
"""
response.content_type = 'application/json'
is_admin = admin_guard()(lambda _: True)() == True
# Note: we could just use user.role == "admin" but we
# want to follow potential requirement changes in the decorator
if user.name != deletion_target and not is_admin:
response.status = 401
return dumps({"error": "Not permited"})
try:
request.db_connector.delete_user(deletion_target)
response.status = 200
return dumps({"message": "User deleted successfully"})
except ValueError as e:
response.status = 400
return dumps({"error": str(e)})
@app.route("/getAll", method=["GET"])
@admin_guard()
def get_all_users(_):
"""
Get a list of all users in the system.
Rejected by admin_guard if the requester is not an admin.
:return: JSON list of users with their names and roles
"""
response.content_type = 'application/json'
users = request.db_connector.get_user(None)
user_list = [{"name": u.name, "role": u.role} for u in users]
return dumps(user_list)