You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
97 lines
3.1 KiB
97 lines
3.1 KiB
from email.policy import default
|
|
from flask import Blueprint, request, jsonify, current_app, g
|
|
from werkzeug.utils import secure_filename
|
|
from datetime import datetime
|
|
from utils.auth import auth_required, requires_role
|
|
from db.model import db
|
|
from db.model import User, Session, UserRole # Adjust based on your model's location
|
|
# from constants import UserRole
|
|
from werkzeug.security import generate_password_hash,check_password_hash
|
|
import uuid
|
|
import os
|
|
from config import *
|
|
from utils.utils import password_check_sanity,is_valid_email,InsecurePasswordException
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy import select, and_
|
|
|
|
# Blueprint that groups the session endpoints defined in this module
# ('/create' for login and '/destroy' for logout). The URL prefix, if any,
# is decided wherever this blueprint is registered on the app.
session = Blueprint('session', __name__)
|
|
|
|
@session.route('/create', methods=['POST'])
def login():
    """
    Handle user login.

    Reads ``email`` and ``password`` from the form-encoded request body,
    verifies them against the stored password hash and, on success, persists
    a new ``Session`` row whose key is returned to the client.

    Returns:
        A ``(response, status)`` tuple:

        * 200 -- JSON with ``message``, ``session_key`` and ``user_id``.
        * 400 -- ``email`` or ``password`` is missing or empty.
        * 401 -- no user has that email, or the password does not match.
        * 500 -- the new session could not be written to the database.
    """
    # Credentials are read from form fields (request.form), not from JSON.
    data = request.form
    email = data.get('email')
    password = data.get('password')
    user_agent = request.headers.get('User-Agent', 'Unknown')

    # Validate required fields
    if not email or not password:
        return jsonify({"error": "email and password are required"}), 400

    # Look the user up by email. Unknown email and wrong password share one
    # message so the response does not reveal which of the two was wrong.
    user = User.query.filter_by(email=email).first()
    if user is None or not check_password_hash(user.hash_password, password):
        return jsonify({"error": "Invalid email or password"}), 401

    # Read the id now: it is needed for the response and for the error log,
    # and reading it after a commit/rollback could trigger another query.
    user_id = user.id

    # Take one timestamp so creationDate and lastUsed are identical on a
    # brand-new session instead of differing by a few microseconds.
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
    # kept here because switching to an aware datetime would change what is
    # stored -- confirm the column type before migrating.
    now = datetime.utcnow()
    session_key = str(uuid.uuid4())  # Generate a unique session key
    new_session = Session(
        userID=user_id,
        user=user,  # Pass the user object here
        key=session_key,
        ua=user_agent,
        creationDate=now,
        lastUsed=now,
        isValid=True,
    )

    try:
        db.session.add(new_session)
        db.session.commit()
    except Exception:
        # Leave the DB session usable for later requests and record the
        # cause; the client only gets a generic message.
        db.session.rollback()
        current_app.logger.exception(
            "Could not persist login session for user %s", user_id
        )
        return jsonify({"error": "Login failed, please try again later."}), 500

    return jsonify({
        "message": "Login successful",
        "session_key": session_key,
        "user_id": str(user_id),
    }), 200
|
|
|
|
|
|
@session.route('/destroy', methods=['POST'])
@auth_required()
def logout():
    """
    Handle user logout by invalidating the session.

    Without a body, the session attached to the current request
    (``g.current_session``) is invalidated. If the request carries a JSON
    object with a ``session_key`` field, that session is invalidated instead,
    provided it is still valid and belongs to ``g.current_user``.

    Returns:
        A ``(response, status)`` tuple:

        * 200 -- the session was marked invalid.
        * 401 -- ``session_key`` was given but matches no valid session of
          the current user.
        * 500 -- the change could not be committed.
    """
    # The body is optional. silent=True returns None instead of raising when
    # the body is missing, is not JSON or is malformed. A body that parses to
    # something other than an object (null, list, ...) is treated as absent,
    # so the .get() below cannot fail.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    target_session_key = payload.get('session_key')
    target_session = g.current_session

    if target_session_key is not None:
        # Only sessions owned by the caller can be targeted; another user's
        # key behaves exactly like an unknown key.
        target_session = db.session.execute(
            select(Session).where(and_(
                Session.key == target_session_key,
                Session.isValid == True,  # noqa: E712 -- builds a SQL expression
                Session.userID == g.current_user.id,
            ))
        ).scalar()
        if target_session is None:
            return jsonify({'message': 'The session key is invalid or does not belong to current user.'}), 401

    target_session.isValid = False
    try:
        db.session.commit()
    except Exception:
        # Same handling as login: roll back, log the cause, answer with a
        # generic error.
        db.session.rollback()
        current_app.logger.exception("Could not invalidate session")
        return jsonify({"error": "Logout failed, please try again later."}), 500

    return jsonify({'message': 'Session invalidated'}), 200
|
|
|