ayush 6 months ago
commit 716e5cffb4
  1. 6
      backend/app.py
  2. 235
      backend/blueprints/chat/__init__.py
  3. 333
      backend/blueprints/course/__init__.py
  4. 58
      backend/blueprints/profile/__init__.py
  5. 71
      backend/blueprints/public/__init__.py
  6. 3
      backend/config.py
  7. 7
      backend/db/model.py
  8. BIN
      backend/uploads/JRyiqfKMGnWcunvfcosPJxgBGQTjasLh.jpg
  9. BIN
      backend/uploads/izvjgAZCUwVlTZpoMdoFUnMoMoPNDkPD.pdf
  10. BIN
      backend/uploads/learn-with-us.jpg
  11. BIN
      backend/uploads/user_3e7bb78f-a0e8-42f2-bd9f-6792369e3e64_meme.jpg
  12. BIN
      backend/uploads/user_f7ab56f1-d692-409e-98ab-2a563f37e389_1638856701024.jpg

@ -18,6 +18,8 @@ from blueprints.profile import profile as profileBlueprint
from blueprints.session import session as sessionBlueprint
from blueprints.admin import admin as adminBlueprint
from blueprints.chat import chat as chatBlueprint
from blueprints.public import public_summary as publicBlueprint
from blueprints.course import course as courseBlueprint
app = Flask(__name__)
@ -28,7 +30,7 @@ CORS(app)
# r"/api/*": {"origins": "*"} # Allows CORS for all `/api/` routes
# })
# Set configuration directly on the app instance
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'}
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif','pdf'}
app.config["SQLALCHEMY_DATABASE_URI"] = DB_URI
@ -38,6 +40,8 @@ app.register_blueprint(profileBlueprint, url_prefix='/api/profile')
app.register_blueprint(sessionBlueprint,url_prefix='/api/session')
app.register_blueprint(adminBlueprint,url_prefix='/api/admin')
app.register_blueprint(chatBlueprint,url_prefix='/api/chat')
app.register_blueprint(publicBlueprint,url_prefix='/api/public')
app.register_blueprint(courseBlueprint,url_prefix='/api/course')
@app.route('/media/<string:filename>')
def send_file(filename):

@ -1,122 +1,51 @@
import uuid
from flask import Blueprint, request, jsonify,g
from uuid import UUID
from db.model import db, User, Course, Enrollment,Chat
from utils.auth import auth_required
import requests
from sqlalchemy import desc
from config import SPAM_SCORE_THRESHOLD, AI_SPAM_SERVICES_MICROSERVICE
from sqlalchemy import desc, select, and_
chat = Blueprint('chat', __name__)
SPAM_DETECTION_URL = "http://localhost:5000/test-spam"
@chat.route('/course/<uuid:course_id>/users', methods=['POST'])
def get_users_assigned_to_course(course_id: UUID):
"""
Fetch all users assigned to a specific course.
:param course_id: ID of the course to fetch users for (UUID).
:return: JSON response with users assigned to the course.
"""
try:
# Query the course to ensure it exists
course = Course.query.get(course_id)
if not course:
return jsonify({"error": "Course not found."}), 404
# Get the list of users assigned to the course
users = User.query.filter(User.enrollments.any(course_id=course_id)).all()
# Prepare the response data
user_data = [
{
"id": user.id,
"email": user.email,
"username": user.username,
"firstName": user.firstName,
"lastName": user.lastName,
}
for user in users
]
return jsonify({"course": course.name, "users": user_data}), 200
except Exception as e:
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
@chat.route('/course/<uuid:course_id>/chat', methods=['POST'])
def join_chat(course_id: UUID):
"""
Allow users to join the chat only if they are enrolled in the course.
:param course_id: ID of the course (UUID).
:return: JSON response indicating whether the user can join or not.
"""
try:
# Get the user from the request (assume user_id is passed in the request body)
user_id = request.json.get('user_id')
if not user_id:
return jsonify({"error": "User ID is required."}), 400
# Query the user and ensure they exist
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found."}), 404
# Check if the user is enrolled in the course
enrollment = Enrollment.query.filter_by(userID=user_id, courseID=course_id).first()
if not enrollment:
return jsonify({"error": "User is not enrolled in this course."}), 403
# If enrolled, allow the user to join the chat
return jsonify({"message": f"User {user.username} is enrolled in the course and can join the chat."}), 200
except Exception as e:
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
@chat.route("/send-message", methods=["POST"])
@chat.route("/send", methods=["POST"])
@auth_required()
def create_chat():
"""
Create a chat message for a specific course.
"""
current_user = g.current_user # Fetch the logged-in user
data = request.get_json()
# Validate the payload
course_id = data.get("course_id")
message = data.get("message")
if not course_id or not message:
return jsonify({"error": "Course ID and message are required."}), 400
current_user: User = g.current_user # Fetch the logged-in user
data = request.form
if not data.get("course_id"):
return jsonify({"error": "Unknown Course"}), 400
if not data.get("message", "").strip():
return jsonify({'message': 'Silently rejected blank message'}), 200
try:
# Call the spam detection service
spam_response = requests.post(SPAM_DETECTION_URL, json={"test_message": message})
# Check if the user is enrolled in the course
enrollment_record: Enrollment = db.session.execute(
select(Enrollment).where(and_(
Enrollment.courseID == data.get("course_id"),
Enrollment.userID == current_user.id)
)
).scalar()
if not enrollment_record:
return jsonify({"error": "You are not enrolled in this course."}), 403
if enrollment_record.currentPage < enrollment_record.course.pageForCommunity:
return jsonify({
'message': 'You have not met the requirements to enter the chat.'
}), 403
# Create and save the chat message
spam_response = requests.post(AI_SPAM_SERVICES_MICROSERVICE, json={"test_message": data.get("message").strip()})
if spam_response.status_code != 200:
return jsonify({"error": "Failed to check message for spam."}), 500
spam_score = int(spam_response.json().get("spam_score", 0))
if spam_score > 6:
if spam_score > SPAM_SCORE_THRESHOLD:
return jsonify({"error": "This message contains suspicious links or is vulnerable."}), 400
# Verify the course exists
course = db.session.query(Course).filter_by(id=course_id).first()
if not course:
return jsonify({"error": "Invalid course ID."}), 404
# Check if the user is enrolled in the course
is_enrolled = db.session.query(Course).join(Enrollment).filter(
Enrollment.userID == current_user.id,
Enrollment.courseID == course_id
).first()
if not is_enrolled:
return jsonify({"error": "You are not enrolled in this course."}), 403
# Create and save the chat message
new_chat = Chat(
textContent=message,
textContent=data.get("message").strip(),
userID=current_user.id,
courseID=course_id,
user=g.current_user,
course=enrollment_record.course,
courseID=data.get("course_id"),
)
db.session.add(new_chat)
db.session.commit()
@ -128,56 +57,58 @@ def create_chat():
@chat.route("/get", methods=["GET"])
@auth_required()
def get_chat_history():
"""
Fetch chat history for a course.
"""
current_user = g.current_user # Logged-in user
course_id = request.args.get("course_id")
limit = int(request.args.get("limit", 20)) # Default to 20 messages
offset = int(request.args.get("offset", 0)) # Default to no offset (latest chats)
if not course_id:
return jsonify({"error": "Course ID is required."}), 400
def get_messages():
try:
# Verify the course exists
course = db.session.query(Course).filter_by(id=course_id).first()
if not course:
return jsonify({"error": "Invalid course ID."}), 404
# Check if the user is enrolled in the course
is_enrolled = db.session.query(Course).join(Enrollment).filter(
Enrollment.userID == current_user.id,
Enrollment.courseID == course_id
).first()
if not is_enrolled:
return jsonify({"error": "You are not enrolled in this course."}), 403
# Fetch the latest chat messages with limit and offset
chats = (
db.session.query(Chat)
.filter_by(courseID=course_id)
.order_by(desc(Chat.chatDate))
.offset(offset)
.limit(limit)
.all()
course_id: uuid.UUID = uuid.UUID(request.form.get('course_id'))
current_user: User = g.current_user
limit = int(request.form.get('limit', 10))
before_id = request.form.get('before')
after_id = request.form.get('after')
# Verify user's enrollment
enrollment = db.session.execute(
select(Enrollment).where(
and_(Enrollment.courseID == course_id, Enrollment.userID == current_user.id)
)
# Format the chat messages
chat_history = [
{
"id": str(chat.id),
"textContent": chat.textContent,
"userID": str(chat.userID),
"chatDate": chat.chatDate.isoformat(),
"is_mine": chat.userID == current_user.id,
}
for chat in chats
]
return jsonify({"chats": chat_history}), 200
).scalar()
if not enrollment:
return jsonify({"error": "You are not enrolled in this course."}), 403
query = select(Chat).where(Chat.courseID == course_id)
if before_id:
try:
reference_message: Chat = db.session.execute(
select(Chat).where(Chat.id == uuid.UUID(before_id))
).scalar()
if not reference_message:
return jsonify({'message': 'Reference message not found'}), 404
query = query.order_by(Chat.chatDate.desc()).where(Chat.chatDate < reference_message.chatDate)
except ValueError:
return jsonify({'message': 'Invalid message ID format'}), 400
elif after_id:
try:
reference_message = db.session.execute(
select(Chat).where(Chat.id == uuid.UUID(after_id))
).scalar()
if not reference_message:
return jsonify({'message': 'Reference message not found'}), 404
query = query.order_by(Chat.chatDate.asc()).where(Chat.chatDate > reference_message.chatDate)
except ValueError:
return jsonify({'message': 'Invalid message ID format'}), 400
else:
query = query.order_by(Chat.chatDate.desc())
query = query.limit(limit)
messages = db.session.execute(query).scalars()
chat_messages = [{
'id': str(msg.id),
'text': msg.textContent,
'userId': str(msg.userID),
'username': msg.user.username,
'timestamp': msg.chatDate.isoformat(),
'isSelf': int(msg.userID == g.current_user.id)
} for msg in messages]
return jsonify({
'messages': chat_messages,
'count': len(chat_messages),
}), 200
except Exception as e:
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -1,19 +1,121 @@
from flask import Blueprint, request, jsonify, g
from sqlalchemy import select, and_
from flask import Blueprint, request, jsonify, g, url_for
from sqlalchemy import select, and_, func, distinct, or_
from sqlalchemy.exc import IntegrityError
from werkzeug.datastructures import MultiDict
import os
import uuid
import math
from config import DEFAULT_COURSE_COVER
from db.model import db, Course, Category, User, Chat
from db.model import db, Course, Category, User, Chat, Enrollment
from utils.utils import random_string_generator
from utils.auth import auth_required, requires_role
from constants import *
from config import *
from constants import PublishedStatus
from typing import Union
from db.model import UserRole
course = Blueprint('course', __name__)
@course.route('/create', methods=['POST'])
@course.route('/listAll')
def list_all_courses():
limit: int = int(request.args.get('limit', 10))
offset: int = int(request.args.get('offset', 0))
category_uuid: str = request.args.get('category_uuid')
search_q: str = request.args.get('search_q', '').strip()
sort_by: str = request.args.get('sort_by', '').strip()
show_pending: bool = bool(int(request.args.get('show_pending', 0)))
available_sorts = ['date_asc', 'date_desc', 'name_asc', 'name_desc', 'students_desc', 'students_asc']
if category_uuid is not None:
category_uuid: uuid.UUID = uuid.UUID(request.args.get('category_uuid'))
# Build the query as required
query: select = select(Course)
if search_q != '':
query = query.where(or_(Course.name.like(f'%{search_q}%'), Course.description.like(f'%{search_q}%'),
User.firstName.like(f'%{search_q}%')))
if category_uuid is not None:
query = query.where(Course.categoryID == category_uuid)
if g.get('is_authed'):
if show_pending and g.current_user.role == int(UserRole.ADMIN):
query = query.where(Course.publishedStatus == int(PublishedStatus.PENDING))
else:
query = query.where(Course.publishedStatus == int(PublishedStatus.APPROVED))
#total_pages_for_offset: int = db.session.execute(func.count(Course.id).select_from(Course)).scalar()/limit
total_pages_for_offset: int = db.session.execute(
select(func.count()).select_from(query.subquery())
).scalar() / limit
query = query.limit(limit).offset(offset)
total_pages: int = math.ceil(total_pages_for_offset)
if sort_by in available_sorts:
if sort_by == 'date_asc':
query = query.order_by(Course.creationDate.asc())
elif sort_by == 'date_desc':
query = query.order_by(Course.creationDate.desc())
elif sort_by == 'name_asc':
query = query.order_by(Course.name.asc())
elif sort_by == 'name_desc':
query = query.order_by(Course.name.desc())
elif sort_by == 'students_desc':
query = query.order_by(Course.totalEnrolled.desc())
elif sort_by == 'students_asc':
query = query.order_by(Course.totalEnrolled.asc())
courses: list[Course] = db.session.execute(query).scalars()
course_list: list[dict] = []
for item in courses:
course_list.append(
{
'id': item.id,
'name': item.name,
'description': item.description,
'isActive': item.isActive,
'creationDate': item.creationDate,
'coverImage': url_for('send_file', filename=item.coverImage),
'totalEnrolled': item.totalEnrolled,
'author': {
'id': item.author.id,
'firstName': item.author.firstName,
'lastName': item.author.lastName,
'username': item.author.username,
'bio': item.author.bio,
'lastOnline': item.author.lastOnline,
'pfpFilename': url_for('send_file', filename=item.author.pfpFilename)
},
'category': {
'id': item.categoryID,
'name': item.category.name,
'description': item.category.description
}
})
return jsonify({
'total_pages': total_pages,
'current_offset': offset,
'limit': limit,
'data': course_list,
})
@course.route('/enroll')
@auth_required()
def enroll_user():
if not request.form.get('course_uuid'):
return jsonify({'message': 'Missing required parameter "course_uuid" '}), 400
course_uuid: uuid.UUID = uuid.UUID(request.form.get('course_uuid'))
selected_course: Course = db.session.execute(select(Course).where(Course.id == course_uuid)).scalar()
if not selected_course:
return jsonify({'message': 'Course not found'}), 404
new_enroll: Enrollment = Enrollment(
userID=g.current_user.id,
courseID=course_uuid
)
try:
selected_course.totalEnrolled = selected_course.totalEnrolled + 1
db.session.add(new_enroll)
db.session.commit()
except IntegrityError:
return jsonify({'message': 'Already enrolled to this course'})
return jsonify({'message': 'Enrollment successful'}), 200
@course.route('/createCourse', methods=['POST'])
@auth_required()
def create_course():
form_data: dict = request.form
@ -22,27 +124,30 @@ def create_course():
cover_file_name: str = DEFAULT_COURSE_COVER
pdf_file_name: str = ''
if course_uploaded_cover_image is not None:
cover_file_name: str = random_string_generator(32)+course_uploaded_cover_image.filename.split('.')[-1]
cover_file_name: str = random_string_generator(32)+"."+course_uploaded_cover_image.filename.split('.')[-1]
course_uploaded_cover_image.save(os.path.join(USER_UPLOADS_DIR, cover_file_name))
if course_uploaded_pdf is not None:
pdf_file_name: str = random_string_generator(32) + course_uploaded_pdf.filename.split('.')[-1]
pdf_file_name: str = random_string_generator(32) +"."+ course_uploaded_pdf.filename.split('.')[-1]
course_uploaded_pdf.save(os.path.join(USER_UPLOADS_DIR, pdf_file_name))
published_status: PublishedStatus = PublishedStatus.DRAFT
published_status: PublishedStatus = PublishedStatus.PENDING
try:
course_name: str = form_data['course_name']
except KeyError:
return jsonify({'message': 'Course name cannot be empty'}), 401
course_description: str = form_data.get('course_description', '')
category_id: uuid.UUID = uuid.UUID(form_data['category_uuid'])
pages_required_for_community: int = int(form_data['community_unlock_at_pages']) # TODO: Add this field to model
page_for_community: int = int(form_data.get('page_for_community', 1)) # TODO: Add this field to model
catgory: Category = db.session.execute(select(Category).where(Category.id == category_id)).scalar()
# author: User = db.session.execute(select(User).where(User.id == g.current_user.id)).scalar()
new_course: Course = Course(
name=course_name,
categoryID=category_id,
authorID=g.current_user.id,
category=catgory,
author=g.current_user,
description=course_description,
isActive=True,
pageForCommunity=page_for_community,
publishedStatus=int(published_status),
coverImage=cover_file_name,
serverFilename=pdf_file_name,
@ -52,7 +157,7 @@ def create_course():
)
# chat: Chat = Chat(courseID=new_course.id) TODO: Add a welcome chat for this course
db.session.add_all(new_course)
db.session.add(new_course)
db.session.commit()
return jsonify({'message': 'Course was created successfully.'}), 200
@ -124,4 +229,210 @@ def update_course():
@course.route('/info/<string:course_uuid>')
def course_info(course_uuid):
course_uuid: uuid.UUID = uuid.UUID(course_uuid)
selected_course: Course = db.session.execute(select(Course).where(and_(Course.id == course_uuid))).scalar()
if not selected_course:
return jsonify({'message': 'The course does not exist'}), 404
# Only allow owner or admin to query info for course that is not published or is pending
if not selected_course.isActive or selected_course.publishedStatus != int(PublishedStatus.APPROVED):
if g.get("is_authed"):
if g.current_user.role == int(UserRole.ADMIN) or g.current_user.id == selected_course.authorID:
pass
else:
return jsonify({'message': 'The course does not exist.'}), 404
self_enrollment_record: Union[None, Enrollment] = None
self_enrollment_data: dict = {}
if g.get("is_authed"):
self_enrollment_record: Enrollment = db.session.execute(
select(Enrollment).where(
and_(
Enrollment.courseID == selected_course.id, Enrollment.userID == g.current_user.id
)
)
)
if self_enrollment_record:
self_enrollment_data = {
'lastActivity': self_enrollment_record.lastActivity,
'currentPage': self_enrollment_record.currentPage,
'maxPage': self_enrollment_record.maxPage,
'joinedDate': self_enrollment_record.joinedDate,
'userID': self_enrollment_record.userID
}
# Get total enrolled user and total unique user chatting about the course and put it in dict
summary_user: dict = {
'totalEnrolled': db.session.execute(
select(func.count(Enrollment.id)).where(Enrollment.courseID == course_uuid)
).scalar(),
'usersInChat': db.session.execute(
select(func.count(distinct(Chat.userID))).select_from(Chat).where(Chat.courseID == course_uuid)
).scalar(),
'totalChats': db.session.execute(
select(func.count()).select_from(Chat).where(Chat.courseID == course_uuid)
).scalar()
}
jsonify({
'message': 'successful',
'data': {
'id': selected_course.id,
'courseName': selected_course.name,
'courseDescription': selected_course.description,
'isActive': selected_course.isActive,
'publishedStatus': selected_course.publishedStatus,
'creationDate': selected_course.creationDate, # TODO: Format to particular structure
'coverImage': url_for('send_file', filename=selected_course.coverImage),
'serverFilename': url_for('send_file', filename=selected_course.serverFilename),
'totalPages': 100,
'author': {
'id': selected_course.authorID,
'username': selected_course.author.username,
'firstName': selected_course.author.firstName,
'lastName': selected_course.author.lastName,
'pfpFilename': url_for('send_file', filename=selected_course.author.pfpFilename),
'bio': selected_course.author.bio
},
'selfEnrollment': {
'isEnrolled': self_enrollment_record is not None,
'data': self_enrollment_data
},
'enrollmentSummary': summary_user
}
}), 200
@course.route('/getCategories', methods=['GET'])
def get_categories():
categories: list[Category] = db.session.execute(select(Category)).scalars()
cat_list: list[dict] = []
for category in categories:
cat_list.append(
{
'id': category.id,
'name': category.name,
'description': category.description,
'isActive': category.isActive,
'creationDate': category.creationDate
}
)
return jsonify(cat_list), 200
@course.route('/createCategory', methods=['Post'])
@auth_required()
@requires_role([UserRole.ADMIN])
def create_category():
try:
new_cat: Category = Category(
name=request.form['name'],
description=request.form.get('description'),
isActive=bool(int(request.form.get('isActive'))),
courses=[]
)
except KeyError:
return jsonify({'message': 'Missing required parameter "name" '}), 400
db.session.add(new_cat)
db.session.commit()
return jsonify({'message': 'Category created'}), 201
@course.route('/updateCategory', methods=['POST', 'DELETE'])
@auth_required()
@requires_role([UserRole.ADMIN])
def update_category():
form_data: dict = request.form
try:
category_id: uuid.UUID = uuid.UUID(form_data['category_id'])
except KeyError:
return jsonify({'message': 'Missing required parameter "category_id" '}), 400
selected_category: Category = db.session.execute(select(Category).where(Category.id == category_id)).scalar()
if not selected_category:
return jsonify({'message': 'Category not found'}), 404
if request.method == 'DELETE':
db.session.delete(selected_category)
db.session.commit()
return jsonify({'message': 'Category deleted'}), 200
else:
if form_data.get('name'):
selected_category.name = form_data.get('name')
if form_data.get('description'):
selected_category.description = form_data.get('description')
if form_data.get('isActive'):
selected_category.isActive = bool(int(form_data.get('isActive')))
db.session.commit()
return jsonify({'message': 'Category updated'}), 200
@course.route('/enrolled')
@auth_required()
def enrolled_courses():
enrollments: Course = db.session.execute(
select(Enrollment).where(
and_(Enrollment.userID == g.current_user.id, Course.publishedStatus == int(PublishedStatus.APPROVED))
)
).scalars()
enrolled_list: list[dict] = []
for enroll_row in enrollments:
item = enroll_row.course
enrolled_list.append(
{
'id': enroll_row.id,
'course_id': enroll_row.courseID,
'lastActivity': enroll_row.lastActivity,
'currentPage': enroll_row.currentPage,
'maxPage': enroll_row.maxPage,
'joinedDate': enroll_row.joinedDate,
'userID': enroll_row.userID,
'name': item.name,
'description': item.description,
'isActive': item.isActive,
'creationDate': item.creationDate,
'coverImage': url_for('send_file', filename=item.coverImage),
'totalEnrolled': item.totalEnrolled,
'author': {
'id': item.author.id,
'firstName': item.author.firstName,
'lastName': item.author.lastName,
'username': item.author.username,
'bio': item.author.bio,
'lastOnline': item.author.lastOnline,
'pfpFilename': url_for('send_file', filename=item.author.pfpFilename)
},
'category': {
'id': item.categoryID,
'name': item.category.name,
'description': item.category.description
}
})
return jsonify(enrolled_list), 200
@course.route('/myCourses')
@auth_required()
def enrolled_courses():
courses: Course = db.session.execute(select(Course).where(Course.authorID == g.current_user.id)
).scalars()
course_list: list[dict] = []
for item in courses:
course_list.append(
{
'id': item.id,
'name': item.name,
'description': item.description,
'isActive': item.isActive,
'publishedStatus': item.publishedStatus,
'creationDate': item.creationDate,
'coverImage': url_for('send_file', filename=item.coverImage),
'totalEnrolled': item.totalEnrolled,
'author': {
'id': item.author.id,
'firstName': item.author.firstName,
'lastName': item.author.lastName,
'username': item.author.username,
'bio': item.author.bio,
'lastOnline': item.author.lastOnline,
'pfpFilename': url_for('send_file', filename=item.author.pfpFilename)
},
'category': {
'id': item.categoryID,
'name': item.category.name,
'description': item.category.description
}
})
return jsonify(course_list), 200

@ -1,5 +1,6 @@
from email.policy import default
from flask import Blueprint, request, jsonify, current_app, g,url_for
from sqlalchemy import select
from werkzeug.utils import secure_filename
from datetime import datetime
from utils.auth import auth_required, requires_role
@ -15,7 +16,6 @@ from sqlalchemy.exc import IntegrityError
# from flask import url_for
profile = Blueprint('profile', __name__)
# Function to check allowed file extensions
def allowed_file(filename):
"""Check if the uploaded file has an allowed extension."""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in current_app.config['ALLOWED_EXTENSIONS']
@ -110,13 +110,16 @@ def manage_profile():
Handle GET and PUT requests for the logged-in user's profile.
"""
current_user: User = g.current_user
if g.current_user.role == int(UserRole.ADMIN) and request.form.get('user_id'):
target_user: User = db.session.execute(
select(User).where(User.id == uuid.uuid(request.form.get('user_id')))
).scalar()
if not target_user:
return jsonify({'message': 'User not found'}), 404
else:
current_user = target_user
if request.method == 'GET':
try:
profile_picture = url_for('send_file', filename=current_user.pfpFilename, _external=True)
except:
profile_picture = ""
try:
# Construct the user profile data
profile_data = {
@ -133,24 +136,18 @@ def manage_profile():
"pfp_filename": current_user.pfpFilename,
"profile_picture": profile_picture,
}
return jsonify({"profile": profile_data}), 200
except Exception as e:
return jsonify({"error": f"Failed to fetch profile. Error: {str(e)}"}), 500
elif request.method == 'PUT':
# Update the user's profile using form data
try:
# email = request.form.get('email')
first_name = request.form.get('first_name')
last_name = request.form.get('last_name')
first_name = request.form.get('firstName')
last_name = request.form.get('lastName')
username = request.form.get('username')
dob = request.form.get('dob')
bio = request.form.get('bio')
# Update fields if provided
# if email:
# current_user.email = email
is_activated = request.form.get('isActivated')
if first_name:
current_user.firstName = first_name
if last_name:
@ -161,10 +158,9 @@ def manage_profile():
current_user.dob = dob # Ensure the date format is validated
if bio:
current_user.bio = bio
# Commit changes to the database
if is_activated:
current_user.isActivated = bool(int(is_activated))
db.session.commit()
return jsonify({"message": "Profile updated successfully."}), 200
except IntegrityError:
db.session.rollback()
@ -174,7 +170,6 @@ def manage_profile():
return jsonify({"error": f"Failed to update profile. Error: {str(e)}"}), 500
@profile.route('/update-profile-picture', methods=['PATCH'])
@auth_required()
def update_profile_picture():
@ -194,22 +189,18 @@ def update_profile_picture():
return jsonify({"error": "No selected file"}), 400
if not allowed_file(file.filename):
return jsonify({"error": "Invalid file type"}), 400
# Secure the filename and save the new file
filename = secure_filename(f"user_{user.id}_{file.filename}")
new_filepath = os.path.join(USER_UPLOADS_DIR, filename)
file.save(new_filepath)
# Delete the old profile picture (if it's not the default)
if user.pfpFilename != DEFAULT_PROFILE_FILE:
old_filepath = os.path.join(USER_UPLOADS_DIR, user.pfpFilename)
if os.path.exists(old_filepath):
os.remove(old_filepath)
# Update the user's profile picture
user.pfpFilename = filename
db.session.commit()
# Generate the new profile URL
profile_url = url_for('send_file',filename=user.pfpFilename,_external=True)
@ -233,36 +224,35 @@ def change_password():
user = g.current_user
data = request.form
if g.current_user.role == int(UserRole.ADMIN) and request.form.get('user_id'):
new_password = data.get('new_password')
target_user: User = db.session.execute(
select(User).where(User.id == uuid.uuid(request.form.get('user_id')))
).scalar()
if not target_user:
return jsonify({'message': 'User not found'}), 404
target_user.hash_password = generate_password_hash(new_password)
db.session.commit()
return jsonify({'message': 'Password changed successfully'}), 200
# Validate input data
current_password = data.get('current_password')
new_password = data.get('new_password')
confirm_password = data.get('confirm_password')
if not current_password or not new_password or not confirm_password:
return jsonify({"error": "All fields (current_password, new_password, confirm_password) are required"}), 400
# Check if current password matches the user's existing password
if not check_password_hash(user.hash_password, current_password):
return jsonify({"error": "Current password is incorrect"}), 400
# Check if new password and confirmation match
if new_password != confirm_password:
return jsonify({"error": "New password and confirm password do not match"}), 400
# Check for password complexity (optional)
# Validate password
try:
password_check_sanity(new_password)
except InsecurePasswordException as e:
return jsonify({"error": str(e)}), 400
# Update the user's password
user.hash_password = generate_password_hash(new_password)
db.session.commit()
return jsonify({"message": "Password updated successfully"}), 200
# @profile.route('/hello')
# @auth_required()
# @requires_role([UserRole.ADMIN])

@ -0,0 +1,71 @@
from flask import Blueprint, jsonify
from db.model import User, Course, db
from sqlalchemy import select, func
public_summary = Blueprint('public', __name__)
@public_summary.route('/stats/total-users', methods=['GET'])
def get_total_users():
"""
Fetch total user count.
"""
try:
total_users = db.session.execute(
select(func.count()).select_from(User)
).scalar()
return jsonify({'totalUsers': total_users}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/total-authors', methods=['GET'])
def get_total_authors():
"""
Fetch total authors (users who have created courses).
"""
try:
total_authors = db.session.execute(
select(func.count(func.distinct(Course.authorID)))
.select_from(Course)
).scalar()
return jsonify({'totalAuthors': total_authors}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/total-courses', methods=['GET'])
def get_total_courses():
"""
Fetch total course count.
"""
try:
total_courses = db.session.execute(
select(func.count()).select_from(Course)
).scalar()
return jsonify({'totalCourses': total_courses}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/subscribed-users', methods=['GET'])
def get_subscribed_users():
"""
Fetch count of users subscribed to the newsletter and are activated.
"""
try:
subscribed_users = db.session.execute(
select(func.count(User.email))
.select_from(User)
.where(User.isActivated == True)
).scalar()
return jsonify({'subscribedNewsletter': subscribed_users}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -20,6 +20,9 @@ DISABLE_PASSWORD_SANITY_CHECKS: bool = False
PROJECT_ROOT: os.path = os.path.dirname(os.path.abspath(__file__))
USER_UPLOADS_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, "uploads"))
SPAM_SCORE_THRESHOLD: int = 6
AI_SPAM_SERVICES_MICROSERVICE: str ='http://localhost:5000/test-spam'
DB_URI: str = f"{DB_ENGINE}://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
ACTIVATE_ACCOUNTS_ON_SIGNUP: bool = True

@ -1,6 +1,6 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, MappedAsDataclass
from sqlalchemy import types, text, String, DateTime, func, Boolean, ForeignKey, SmallInteger
from sqlalchemy import types, text, String, DateTime, func, Boolean, ForeignKey, SmallInteger, Integer
from datetime import datetime
import uuid
from typing import List
@ -73,8 +73,11 @@ class Course(db.Model):
authorID: Mapped[uuid.UUID] = mapped_column(ForeignKey("user.id"))
author: Mapped["User"] = relationship(back_populates="publications")
description: Mapped[str] = mapped_column(String(1024), nullable=False, default='')
pageForCommunity: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
totalPages: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
totalEnrolled: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
isActive: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
publishedStatus: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=PublishedStatus.DRAFT)
publishedStatus: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=int(PublishedStatus.PENDING))
creationDate: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=func.now())
coverImage: Mapped[str] = mapped_column(String(256), nullable=False, default=DEFAULT_COURSE_COVER)
serverFilename: Mapped[str] = mapped_column(String(256), nullable=False, default='')

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 301 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.9 MiB

Loading…
Cancel
Save