"""Profile blueprint: registration, profile lookup, avatar and password updates."""

import os
import uuid
from datetime import datetime
# NOTE(review): `default` is not referenced anywhere in the visible code; kept
# because other parts of the project may rely on it being importable from here.
from email.policy import default  # noqa: F401

from flask import Blueprint, request, jsonify, current_app, g, url_for
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename

from utils.auth import auth_required, requires_role  # noqa: F401
from db.model import db
from db.model import User, Session, UserRole  # noqa: F401
# USER_UPLOADS_DIR and DEFAULT_PROFILE_FILE are not imported explicitly anywhere,
# so they presumably come from this wildcard import.
from config import *  # noqa: F401,F403
from utils.utils import password_check_sanity, is_valid_email, InsecurePasswordException

profile = Blueprint('profile', __name__)

# Extensions accepted for profile pictures (compared lower-cased, without the dot).
_ALLOWED_IMAGE_EXTENSIONS = frozenset({'png', 'jpg', 'jpeg', 'gif'})


def allowed_file(filename):
    """Return True if *filename* ends in an allowed image extension.

    An empty or missing filename, or one without a dot, is rejected.
    """
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in _ALLOWED_IMAGE_EXTENSIONS


def _unique_upload_name(original_name, prefix=''):
    """Build a collision-free storage name that keeps only the file extension.

    Must be called only after ``allowed_file(original_name)`` returned True,
    so the extension is known to exist and to be on the allow-list. Nothing
    else from the client-supplied name ends up in the stored filename.
    """
    extension = original_name.rsplit('.', 1)[1].lower()
    return secure_filename(f"{prefix}{uuid.uuid4().hex}.{extension}")


def _remove_upload(filename):
    """Best-effort removal of a stored upload; the default avatar is never removed."""
    if not filename or filename == DEFAULT_PROFILE_FILE:
        return
    path = os.path.join(USER_UPLOADS_DIR, filename)
    try:
        os.remove(path)
    except FileNotFoundError:
        # Already gone: nothing to clean up.
        pass
    except OSError:
        current_app.logger.warning("Could not remove upload %s", path, exc_info=True)


def _profile_url(filename):
    """Return the external URL for a profile picture, or "" if it cannot be built."""
    try:
        return url_for('send_file', filename=filename, _external=True)
    except Exception:
        # The 'send_file' endpoint is registered outside this module; a missing
        # route must not break the profile endpoints.
        current_app.logger.warning(
            "Could not build profile picture URL for %r", filename, exc_info=True
        )
        return ""


def _iso_or_none(value):
    """Return ``value.isoformat()``, or None when the value is missing."""
    return value.isoformat() if value is not None else None


@profile.route('/register', methods=['POST'])
def register():
    """Handle user registration from multipart/form data.

    Reads ``email``, ``firstName``, ``lastName``, ``username`` and ``password``
    (required) plus ``bio``, ``dob``, ``role`` and a ``profile_picture`` file
    (optional).

    Returns:
        201 on success; 400 for missing or invalid input or a duplicate
        email/username; 500 if the user could not be stored.
    """
    data = request.form

    email = data.get('email')
    first_name = data.get('firstName')
    last_name = data.get('lastName')
    username = data.get('username')
    password = data.get('password')
    bio = data.get('bio', '')
    dob_raw = data.get('dob')  # Optional date of birth, ISO 8601
    profile_picture = request.files.get('profile_picture')
    is_activated = True  # New users are activated immediately

    # Check presence first so the validators below never receive None.
    if not all([email, first_name, last_name, username, password]):
        return jsonify({"error": "Missing required fields"}), 400

    if not is_valid_email(email):
        return jsonify({"error": "Invalid email address"}), 400

    try:
        password_check_sanity(password)
    except InsecurePasswordException as e:
        return jsonify({"error": str(e)}), 400

    # NOTE(review): the role is taken from the unauthenticated request, so a
    # client can ask for any role value. Confirm this is intended; otherwise
    # force UserRole.USER.value here.
    try:
        role = int(data.get('role', UserRole.USER.value))
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid role"}), 400

    if dob_raw:
        try:
            dob = datetime.fromisoformat(dob_raw)
        except ValueError:
            return jsonify({"error": "Invalid date of birth, expected ISO 8601 format"}), 400
    else:
        dob = datetime(2002, 1, 1)  # Placeholder used when no date of birth is given

    # Store the picture under a unique name so uploads from different users
    # cannot overwrite each other; fall back to the default image otherwise.
    filename = DEFAULT_PROFILE_FILE
    if profile_picture and allowed_file(profile_picture.filename):
        filename = _unique_upload_name(profile_picture.filename)
        profile_picture.save(os.path.join(USER_UPLOADS_DIR, filename))

    new_user = User(
        email=email,
        firstName=first_name,
        lastName=last_name,
        username=username,
        hash_password=generate_password_hash(password),
        bio=bio,
        dob=dob,
        pfpFilename=filename,
        role=role,
        isActivated=is_activated,
        activationKey=str(uuid.uuid4()),
        sessions=[],
        user_badges=[],
        enrollments=[],
        quizzes=[],
        quiz_attempts=[],
        chats=[],
        notifications=[],
        publications=[],
    )

    try:
        db.session.add(new_user)
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        _remove_upload(filename)  # Do not leave an orphaned picture behind
        return jsonify({"error": "User with this email or username already exists."}), 400
    except Exception:
        db.session.rollback()
        _remove_upload(filename)
        current_app.logger.exception("User registration failed")
        return jsonify({"error": "Registration failed, please try again later."}), 500

    return jsonify({"message": "User registered successfully."}), 201


@profile.route('/me', methods=['GET'])
@auth_required()
def get_profile():
    """Return the profile details of the currently logged-in user.

    The user is read from ``g.current_user`` (presumably set by
    ``auth_required``). Date fields that are unset are returned as null.

    Returns:
        200 with ``{"profile": {...}}``, or 500 if the data could not be read.
    """
    current_user: User = g.current_user

    try:
        profile_data = {
            "id": str(current_user.id),
            "email": current_user.email,
            "first_name": current_user.firstName,
            "last_name": current_user.lastName,
            "username": current_user.username,
            "dob": _iso_or_none(current_user.dob),
            "joined_date": _iso_or_none(current_user.joinedDate),
            "last_online": _iso_or_none(current_user.lastOnline),
            "bio": current_user.bio,
            "role": current_user.role,
            "pfp_filename": current_user.pfpFilename,
            "profile_url": _profile_url(current_user.pfpFilename),
        }
    except Exception:
        # Log the details server-side; do not expose internals to the client.
        current_app.logger.exception("Failed to fetch profile")
        return jsonify({"error": "Failed to fetch profile."}), 500

    return jsonify({"profile": profile_data}), 200


@profile.route('/update-profile-picture', methods=['PATCH'])
@auth_required()
def update_profile_picture():
    """Replace the logged-in user's profile picture.

    Expects a ``profile_picture`` file with an allowed image extension.

    Returns:
        200 with the new picture URL; 400 for a missing or invalid file;
        500 if the change could not be stored.
    """
    user: User = g.current_user

    file = request.files.get('profile_picture')
    if file is None:
        return jsonify({"error": "No file uploaded"}), 400
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": "Invalid file type"}), 400

    # A unique name guarantees the new file never shares a path with the old
    # one, so removing the old picture can never delete the fresh upload.
    new_filename = _unique_upload_name(file.filename, prefix=f"user_{user.id}_")
    file.save(os.path.join(USER_UPLOADS_DIR, new_filename))

    previous_filename = user.pfpFilename
    user.pfpFilename = new_filename
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
        _remove_upload(new_filename)
        current_app.logger.exception("Failed to update profile picture")
        return jsonify({"error": "Failed to update profile picture, please try again later."}), 500

    # Only drop the old picture once the new one is safely recorded.
    _remove_upload(previous_filename)

    return jsonify({
        "message": "Profile picture updated successfully",
        "profile_url": _profile_url(new_filename),
    }), 200


@profile.route('/change-password', methods=['POST'])
@auth_required()
def change_password():
    """Change the logged-in user's password.

    Expects form fields ``current_password``, ``new_password`` and
    ``confirm_password``.

    Returns:
        200 on success; 400 if a field is missing, the current password is
        wrong, the new passwords differ, or the new password is rejected by
        ``password_check_sanity``; 500 if the change could not be stored.
    """
    user = g.current_user
    data = request.form

    current_password = data.get('current_password')
    new_password = data.get('new_password')
    confirm_password = data.get('confirm_password')

    if not current_password or not new_password or not confirm_password:
        return jsonify({"error": "All fields (current_password, new_password, confirm_password) are required"}), 400

    if not check_password_hash(user.hash_password, current_password):
        return jsonify({"error": "Current password is incorrect"}), 400

    if new_password != confirm_password:
        return jsonify({"error": "New password and confirm password do not match"}), 400

    try:
        password_check_sanity(new_password)
    except InsecurePasswordException as e:
        return jsonify({"error": str(e)}), 400

    user.hash_password = generate_password_hash(new_password)
    try:
        db.session.commit()
    except Exception:
        # Leave the session usable for subsequent requests.
        db.session.rollback()
        current_app.logger.exception("Failed to update password")
        return jsonify({"error": "Password update failed, please try again later."}), 500

    return jsonify({"message": "Password updated successfully"}), 200