You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
FreeBug/backend/blueprints/profile/__init__.py

259 lines
10 KiB

from email.policy import default
from flask import Blueprint, request, jsonify, current_app, g,url_for
from sqlalchemy import select
from werkzeug.utils import secure_filename
from datetime import datetime
6 months ago
from utils.auth import auth_required, requires_role
from db.model import db
6 months ago
from db.model import User, Session, UserRole # Adjust based on your model's location
# from constants import UserRole
6 months ago
from werkzeug.security import generate_password_hash,check_password_hash
import uuid
import os
from config import *
from utils.utils import password_check_sanity,is_valid_email,InsecurePasswordException
from sqlalchemy.exc import IntegrityError
# from flask import url_for
profile = Blueprint('profile', __name__)
def allowed_file(filename):
    """Report whether ``filename`` carries an extension the app accepts.

    The text after the last dot is lower-cased and looked up in
    ``current_app.config['ALLOWED_EXTENSIONS']``. A name containing no dot is
    rejected without consulting the configuration.

    NOTE: this module defines ``allowed_file`` a second time further down;
    that later definition is the one bound to the name once the module has
    finished importing.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in current_app.config['ALLOWED_EXTENSIONS']
@profile.route('/register', methods=['POST'])
def register():
    """Handle user registration from a form-encoded POST.

    Form fields read: ``email``, ``firstName``, ``lastName``, ``username`` and
    ``password`` (all required), plus optional ``bio``, ``dob`` (ISO-format
    string) and ``role`` (integer, defaults to ``UserRole.USER.value``). An
    optional ``profile_picture`` upload is stored under ``USER_UPLOADS_DIR``;
    without an acceptable upload ``DEFAULT_PROFILE_FILE`` is used.

    Returns:
        A ``(json, status)`` pair: 201 on success, 400 for invalid input or a
        duplicate email/username, 500 for any other failure while saving.
    """
    data = request.form

    # Extract form data
    email = data.get('email')
    first_name = data.get('firstName')
    last_name = data.get('lastName')
    username = data.get('username')
    password = data.get('password')
    bio = data.get('bio', '')
    dob = data.get('dob')  # Optional field for date of birth
    is_activated = True  # New user will be activated initially
    profile_picture = request.files.get('profile_picture')

    # Required-field check runs first so the validators below never see None.
    if not all([email, first_name, last_name, username, password]):
        return jsonify({"error": "Missing required fields"}), 400

    if not is_valid_email(email):
        return jsonify({"error": "Invalid email address"}), 400

    try:
        password_check_sanity(password)
    except InsecurePasswordException as e:
        return jsonify({"error": str(e)}), 400

    # NOTE(review): the role comes straight from the request of an
    # unauthenticated caller, so anyone can self-register with any role value
    # (including ADMIN). Confirm whether this is intended.
    try:
        role = int(data.get('role', UserRole.USER.value))
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid role"}), 400

    # A malformed date is a client error, not a server crash.
    try:
        dob_value = datetime.fromisoformat(dob) if dob else datetime(2002, 1, 1)
    except ValueError:
        return jsonify({"error": "Invalid date of birth, expected ISO format"}), 400

    # Store the upload, falling back to the default image when no acceptable
    # file was provided.
    filename = DEFAULT_PROFILE_FILE
    saved_path = None
    if profile_picture and profile_picture.filename and allowed_file(profile_picture.filename):
        safe_name = secure_filename(profile_picture.filename)
        if safe_name:
            # A random prefix keeps two uploads that share an original name
            # from overwriting each other on disk.
            filename = f"{uuid.uuid4().hex}_{safe_name}"
            saved_path = os.path.join(USER_UPLOADS_DIR, filename)
            profile_picture.save(saved_path)

    hashed_password = generate_password_hash(password)

    # Generate activation key (a UUID for example)
    activation_key = str(uuid.uuid4())

    new_user = User(
        email=email,
        firstName=first_name,
        lastName=last_name,
        username=username,
        hash_password=hashed_password,
        bio=bio,
        dob=dob_value,
        pfpFilename=filename,
        role=role,
        isActivated=is_activated,
        activationKey=activation_key,
        sessions=[],
        user_badges=[],
        enrollments=[],
        quizzes=[],
        quiz_attempts=[],
        chats=[],
        notifications=[],
        publications=[],
    )

    try:
        db.session.add(new_user)
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        # The user row was not created, so do not leave its picture behind.
        if saved_path and os.path.exists(saved_path):
            try:
                os.remove(saved_path)
            except OSError:
                pass  # best-effort cleanup; the error response matters more
        if isinstance(exc, IntegrityError):
            return jsonify({"error": "User with this email or username already exists."}), 400
        return jsonify({"error": "Registration failed, please try again later."}), 500

    return jsonify({"message": "User registered successfully."}), 201
6 months ago
6 months ago
# Make a GET request to get JSON on hello world
@profile.route('/me', methods=['GET', 'PUT'])
@auth_required()
def manage_profile():
    """
    Handle GET and PUT requests for a user's profile.

    The request acts on ``g.current_user``. When that user's role equals
    ``UserRole.ADMIN`` and a ``user_id`` is supplied (form field, or query
    string), the request acts on the user with that id instead.

    GET returns ``{"profile": {...}}``. PUT updates any of ``firstName``,
    ``lastName``, ``username``, ``dob`` (ISO-format string), ``bio`` and
    ``isActivated`` (integer) that are present and non-empty in the form.

    Returns:
        A ``(json, status)`` pair: 200 on success, 400 for invalid input or a
        uniqueness conflict, 404 when the requested user does not exist,
        500 on any other failure.
    """
    current_user: User = g.current_user

    # The form field is checked first (original behaviour); the query string
    # is a fallback because GET requests normally carry no form body.
    requested_id = request.form.get('user_id') or request.args.get('user_id')
    if g.current_user.role == int(UserRole.ADMIN) and requested_id:
        try:
            target_id = uuid.UUID(requested_id)
        except ValueError:
            return jsonify({"error": "Invalid user_id"}), 400
        target_user: User = db.session.execute(
            select(User).where(User.id == target_id)
        ).scalar()
        if not target_user:
            return jsonify({'message': 'User not found'}), 404
        current_user = target_user

    if request.method == 'GET':
        profile_picture = url_for('send_file', filename=current_user.pfpFilename, _external=True)
        try:
            def iso_or_none(moment):
                # Serialise a date/datetime, tolerating unset values.
                return moment.isoformat() if moment else None

            profile_data = {
                "id": str(current_user.id),
                "email": current_user.email,
                "firstName": current_user.firstName,
                "lastName": current_user.lastName,
                "username": current_user.username,
                "dob": iso_or_none(current_user.dob),
                "joined_date": iso_or_none(current_user.joinedDate),
                "last_online": iso_or_none(current_user.lastOnline),
                "bio": current_user.bio,
                "role": current_user.role,
                "pfp_filename": current_user.pfpFilename,
                "profile_picture": profile_picture,
            }
            return jsonify({"profile": profile_data}), 200
        except Exception as e:
            return jsonify({"error": f"Failed to fetch profile. Error: {str(e)}"}), 500

    elif request.method == 'PUT':
        form = request.form
        raw_dob = form.get('dob')
        raw_activated = form.get('isActivated')

        # Parse before touching the model so a malformed value is reported as
        # a client error instead of surfacing as a 500 from the commit.
        parsed_dob = None
        if raw_dob:
            try:
                parsed_dob = datetime.fromisoformat(raw_dob)
            except ValueError:
                return jsonify({"error": "Invalid date of birth, expected ISO format"}), 400

        activated = None
        if raw_activated:
            try:
                activated = bool(int(raw_activated))
            except ValueError:
                return jsonify({"error": "isActivated must be an integer"}), 400

        try:
            # These form keys share their names with the model attributes.
            for field in ('firstName', 'lastName', 'username', 'bio'):
                value = form.get(field)
                if value:
                    setattr(current_user, field, value)
            if parsed_dob is not None:
                current_user.dob = parsed_dob
            # NOTE(review): any authenticated caller can change the activation
            # flag of their own account here - confirm that is intended.
            if activated is not None:
                current_user.isActivated = activated

            db.session.commit()
            return jsonify({"message": "Profile updated successfully."}), 200
        except IntegrityError:
            db.session.rollback()
            return jsonify({"error": "Username must be unique. Please choose another."}), 400
        except Exception as e:
            db.session.rollback()
            return jsonify({"error": f"Failed to update profile. Error: {str(e)}"}), 500
@profile.route('/update-profile-picture', methods=['PATCH'])
@auth_required()
def update_profile_picture():
    """
    Allow the logged-in user to change their profile picture.

    Expects a ``profile_picture`` file part. The file is stored under
    ``USER_UPLOADS_DIR`` as ``user_<id>_<original name>`` (sanitised), the
    user's ``pfpFilename`` is updated, and the previous picture is removed
    unless it is the default image or the very file that was just written.

    Returns:
        A ``(json, status)`` pair: 200 with the new ``profile_url`` on
        success, 400 for a missing/invalid upload, 500 if saving the change
        to the database fails.
    """
    user: User = g.current_user

    # Check if a file is attached
    if 'profile_picture' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    file = request.files['profile_picture']

    # Validate file type (an absent filename is treated like an empty one)
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": "Invalid file type"}), 400

    # Secure the filename and save the new file
    filename = secure_filename(f"user_{user.id}_{file.filename}")
    new_filepath = os.path.join(USER_UPLOADS_DIR, filename)
    previous_filename = user.pfpFilename
    file.save(new_filepath)

    # Persist the change first; the old file is only removed once the
    # database actually points at the new one.
    user.pfpFilename = filename
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Drop the orphaned upload, unless it overwrote the picture the user
        # still references.
        if filename != previous_filename and os.path.exists(new_filepath):
            os.remove(new_filepath)
        return jsonify({"error": "Failed to update profile picture"}), 500

    # Delete the old profile picture. Skip the default image, and skip the
    # case where the new upload reused the old name: the paths are then the
    # same file, and removing it would delete the picture just saved.
    if previous_filename and previous_filename not in (DEFAULT_PROFILE_FILE, filename):
        old_filepath = os.path.join(USER_UPLOADS_DIR, previous_filename)
        if os.path.exists(old_filepath):
            os.remove(old_filepath)

    # Generate the new profile URL
    profile_url = url_for('send_file', filename=user.pfpFilename, _external=True)
    return jsonify({"message": "Profile picture updated successfully", "profile_url": profile_url}), 200
def allowed_file(filename):
    """
    Tell whether ``filename`` ends in one of the accepted image extensions.

    Only the text after the last dot is considered, compared
    case-insensitively against png/jpg/jpeg/gif. A name without any dot is
    rejected.
    """
    _, dot, suffix = filename.rpartition('.')
    if not dot:
        return False
    return suffix.lower() in {'png', 'jpg', 'jpeg', 'gif'}
@profile.route('/change-password', methods=['POST'])
@auth_required()
def change_password():
    """
    Allow the logged-in user to change their password.

    Regular flow: the form must carry ``current_password``, ``new_password``
    and ``confirm_password``; the current password is verified, the two new
    values must match, and the new password must pass
    ``password_check_sanity``.

    Admin flow: when ``g.current_user`` has the ADMIN role and the form
    carries ``user_id``, the password of that user is replaced with
    ``new_password`` without asking for the current one.

    Returns:
        A ``(json, status)`` pair: 200 on success, 400 for invalid input,
        404 when the admin-targeted user does not exist, 500 if the change
        cannot be saved.
    """
    user = g.current_user
    data = request.form

    requested_id = data.get('user_id')
    if g.current_user.role == int(UserRole.ADMIN) and requested_id:
        new_password = data.get('new_password')
        # Hashing a missing password would raise; report it as bad input.
        if not new_password:
            return jsonify({"error": "new_password is required"}), 400
        try:
            target_id = uuid.UUID(requested_id)
        except ValueError:
            return jsonify({"error": "Invalid user_id"}), 400

        target_user: User = db.session.execute(
            select(User).where(User.id == target_id)
        ).scalar()
        if not target_user:
            return jsonify({'message': 'User not found'}), 404

        target_user.hash_password = generate_password_hash(new_password)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            return jsonify({"error": "Failed to change password"}), 500
        return jsonify({'message': 'Password changed successfully'}), 200

    # Validate input data
    current_password = data.get('current_password')
    new_password = data.get('new_password')
    confirm_password = data.get('confirm_password')

    if not current_password or not new_password or not confirm_password:
        return jsonify({"error": "All fields (current_password, new_password, confirm_password) are required"}), 400

    # Check if current password matches the user's existing password
    if not check_password_hash(user.hash_password, current_password):
        return jsonify({"error": "Current password is incorrect"}), 400

    # Check if new password and confirmation match
    if new_password != confirm_password:
        return jsonify({"error": "New password and confirm password do not match"}), 400

    # Enforce the password policy
    try:
        password_check_sanity(new_password)
    except InsecurePasswordException as e:
        return jsonify({"error": str(e)}), 400

    # Update the user's password
    user.hash_password = generate_password_hash(new_password)
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
        return jsonify({"error": "Failed to change password"}), 500

    return jsonify({"message": "Password updated successfully"}), 200