|
|
|
from functools import wraps
|
|
|
|
from flask import request, jsonify
|
|
|
|
from sqlalchemy import select, and_
|
|
|
|
from ..db.model import User, Session, db
|
|
|
|
from ..constants import UserRole
|
|
|
|
from typing import Union
|
|
|
|
|
|
|
|
def requires_role(roles: Union[None, UserRole] = None):
    """Build a decorator that restricts a Flask view to users with given roles.

    The wrapped view only runs when the request carries an ``Authorization``
    header whose second whitespace-separated field equals the ``key`` of a
    ``Session`` row with ``isValid`` set, that session has a ``user``, and
    ``user.role`` is one of the allowed roles.

    Args:
        roles: A single role or an iterable of roles. Every entry is
            converted with ``int()`` before being compared to ``user.role``.
            ``None`` (the default) is treated as ``[UserRole.USER]``. An
            empty iterable lets any user with a valid session through.

    Returns:
        A decorator. The decorated view returns the original view's result
        when access is granted, otherwise a ``(json_response, status)`` pair:
        401 when the header is missing or malformed, when no valid session
        matches the key, or when the session has no user; 403 when the
        user's role is not allowed.
    """
    if roles is None:
        roles = [UserRole.USER]
    elif isinstance(roles, (UserRole, int)):
        # A lone role is not iterable; wrap it so the normalisation below
        # works for both `requires_role(X)` and `requires_role([X, Y])`.
        roles = [roles]

    # Normalise once, at decoration time, so each request only does a lookup.
    allowed_roles = tuple(int(role) for role in roles)

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return jsonify({'error': 'No authorization header sent'}), 401

            # Expected shape: "<scheme> <session key>". Splitting on runs of
            # whitespace means a doubled space or a trailing space can never
            # produce an empty session key that would be sent to the database.
            header_fields = auth_header.split()
            if len(header_fields) < 2:
                return jsonify({'error': 'Invalid authorization header format'}), 401
            session_key = header_fields[1]

            session: Session = db.session.execute(
                select(Session).where(
                    # `== True` is intentional: it builds a SQL expression.
                    and_(Session.key == session_key, Session.isValid == True)  # noqa: E712
                )
            ).scalar()
            if not session:
                return jsonify({'error': 'Invalid or expired session'}), 401

            user = session.user
            if not user:
                return jsonify({'error': 'User not found for the Access token'}), 401

            # An empty role list means "any authenticated user".
            if not allowed_roles or user.role in allowed_roles:
                return f(*args, **kwargs)

            return jsonify({'error': 'Not authorized'}), 403

        return decorated_function

    return decorator
|