"""Session endpoints: create a session (login) and invalidate one (logout)."""

# NOTE: import order is intentionally left as it was so that any names
# exported by ``from config import *`` shadow / are shadowed exactly as before.
from email.policy import default
from flask import Blueprint, request, jsonify, current_app, g
from werkzeug.utils import secure_filename
from datetime import datetime
from utils.auth import auth_required, requires_role
from db.model import db
from db.model import User, Session, UserRole  # Adjust based on your model's location
from werkzeug.security import generate_password_hash, check_password_hash
import uuid
import os
from config import *  # noqa: F401,F403
from utils.utils import password_check_sanity, is_valid_email, InsecurePasswordException
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select, and_

session = Blueprint('session', __name__)


@session.route('/create', methods=['POST'])
def login():
    """Handle user login.

    Reads ``email`` and ``password`` from the submitted form data, verifies
    them against the stored password hash and, on success, persists a new
    ``Session`` row for the user.

    Returns:
        A ``(json, status)`` pair:

        * 200 with ``message``, ``session_key`` and ``user_id`` on success.
        * 400 when ``email`` or ``password`` is missing or empty.
        * 401 when the user does not exist or the password does not match
          (same message in both cases so callers cannot tell which failed).
        * 500 when the session could not be stored.
    """
    data = request.form  # credentials are read from form fields, not a JSON body

    email = data.get('email')
    password = data.get('password')
    user_agent = request.headers.get('User-Agent', 'Unknown')

    # Validate required fields
    if not email or not password:
        return jsonify({"error": "email and password are required"}), 400

    # Find the user by email
    user = User.query.filter_by(email=email).first()
    if not user:
        return jsonify({"error": "Invalid email or password"}), 401

    # Verify the password
    if not check_password_hash(user.hash_password, password):
        return jsonify({"error": "Invalid email or password"}), 401

    # Create a new session. The timestamp is taken once so that creationDate
    # and lastUsed are exactly equal for a fresh session.
    # NOTE(review): naive UTC is kept to match what was stored before;
    # confirm the column type before switching to timezone-aware datetimes.
    session_key = str(uuid.uuid4())  # Generate a unique session key
    now = datetime.utcnow()
    new_session = Session(
        userID=user.id,
        user=user,  # Pass the user object here
        key=session_key,
        ua=user_agent,
        creationDate=now,
        lastUsed=now,
        isValid=True,
    )

    try:
        db.session.add(new_session)
        db.session.commit()
    except Exception:
        # Request boundary: undo the failed transaction, record the cause for
        # operators, and answer with a generic error instead of leaking details.
        db.session.rollback()
        current_app.logger.exception("Failed to persist new session")
        return jsonify({"error": "Login failed, please try again later."}), 500

    return jsonify({
        "message": "Login successful",
        "session_key": session_key,
        "user_id": str(user.id)
    }), 200


@session.route('/destroy', methods=['POST'])
@auth_required()
def logout():
    """Handle user logout by invalidating a session.

    Without a body (or with a body that is not a JSON object) the session in
    ``g.current_session`` is invalidated. When the JSON body carries a
    ``session_key``, that session is invalidated instead, provided it is
    still valid and its ``userID`` matches ``g.current_user.id``.

    ``g.current_session`` and ``g.current_user`` are presumably populated by
    ``auth_required`` - confirm against ``utils.auth``.

    Returns:
        A ``(json, status)`` pair:

        * 200 once the session has been marked invalid.
        * 401 when no matching session could be found.
        * 500 when the change could not be committed.
    """
    # silent=True yields None for a missing, non-JSON or malformed body instead
    # of raising; anything that is not a JSON object is treated as "no body".
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    target_session_key = payload.get('session_key')
    target_session = g.current_session

    if target_session_key is not None:
        # Only sessions owned by the authenticated user may be targeted.
        target_session = db.session.execute(
            select(Session).where(and_(
                Session.key == target_session_key,
                Session.isValid == True,  # noqa: E712 - SQL expression, not a Python comparison
                Session.userID == g.current_user.id
            ))
        ).scalar()

    if target_session is None:
        return jsonify({'message': 'The session key is invalid or does not belong to current user.'}), 401

    target_session.isValid = False
    try:
        db.session.commit()
    except Exception:
        # Mirror login(): roll back, log, and return a generic failure.
        db.session.rollback()
        current_app.logger.exception("Failed to invalidate session")
        return jsonify({"error": "Logout failed, please try again later."}), 500

    return jsonify({'message': 'Session invalidated'}), 200