|
|
|
@ -1,9 +1,14 @@ |
|
|
|
|
from functools import wraps |
|
|
|
|
from flask import request, jsonify |
|
|
|
|
from sqlalchemy import select |
|
|
|
|
from sqlalchemy import select, and_ |
|
|
|
|
from ..db.model import User, Session, db |
|
|
|
|
from ..constants import UserRole |
|
|
|
|
from typing import Union |
|
|
|
|
|
|
|
|
|
def requires_role(roles=[]): |
|
|
|
|
def requires_role(roles: Union[None, UserRole] = None): |
|
|
|
|
if roles is None: |
|
|
|
|
roles = [UserRole.USER] |
|
|
|
|
roles = [int(r) for r in roles] |
|
|
|
|
def decorator(f): |
|
|
|
|
@wraps(f) |
|
|
|
|
def decorated_function(*args, **kwargs): |
|
|
|
@ -14,26 +19,19 @@ def requires_role(roles=[]): |
|
|
|
|
session_key = auth_header.split(' ')[1] |
|
|
|
|
except IndexError: |
|
|
|
|
return jsonify({'error': 'Invalid authorization header format'}), 401 |
|
|
|
|
|
|
|
|
|
session = db.session.execute( |
|
|
|
|
|
|
|
|
|
) |
|
|
|
|
session: Session = db.session.execute( |
|
|
|
|
select(User).where(and_(Session.key == session_key, Session.isValid == True)) |
|
|
|
|
).scalar() |
|
|
|
|
if not session: |
|
|
|
|
return jsonify({'error': 'Invalid or expired session'}), 401 |
|
|
|
|
user = User.query.get(session.userID) |
|
|
|
|
user = session.user |
|
|
|
|
if not user: |
|
|
|
|
return jsonify({'error': 'User not found'}), 401 |
|
|
|
|
|
|
|
|
|
return jsonify({'error': 'User not found for the Access token'}), 401 |
|
|
|
|
# If no roles specified, allow access |
|
|
|
|
if not roles: |
|
|
|
|
return f(*args, **kwargs) |
|
|
|
|
|
|
|
|
|
# Check if user has any of the required roles |
|
|
|
|
if user.role in roles: |
|
|
|
|
return f(*args, **kwargs) |
|
|
|
|
|
|
|
|
|
return jsonify({'error': 'Insufficient permissions'}), 403 |
|
|
|
|
|
|
|
|
|
return jsonify({'error': 'Not authorized'}), 403 |
|
|
|
|
return decorated_function |
|
|
|
|
|
|
|
|
|
return decorator |