Compare commits
7 Commits
main
...
manzilchec
Author | SHA1 | Date |
---|---|---|
|
aff6d939ca | 6 months ago |
|
0ba527a3fa | 6 months ago |
|
d8b426fa33 | 6 months ago |
|
971dbf2088 | 6 months ago |
|
1ace9341b9 | 6 months ago |
|
576214ce5d | 6 months ago |
|
d4855b9615 | 6 months ago |
@ -0,0 +1,184 @@ |
|||||||
|
from utils .auth import auth_required, requires_role |
||||||
|
from flask import Blueprint, jsonify, g,url_for |
||||||
|
from db.model import User, Course, Enrollment, Chat, db |
||||||
|
from sqlalchemy import select, func, desc, and_ |
||||||
|
from datetime import datetime, timedelta |
||||||
|
from constants import UserRole |
||||||
|
|
||||||
|
admin = Blueprint('admin', __name__) |
||||||
|
|
||||||
|
@admin.route('/stats/users', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def get_user_stats(): |
||||||
|
""" |
||||||
|
Get total users and authors count. |
||||||
|
Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
# Get total users |
||||||
|
total_users = db.session.execute( |
||||||
|
select(func.count()).select_from(User) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
# Get authors (users who have created courses) |
||||||
|
authors = db.session.execute( |
||||||
|
select(func.count(func.distinct(Course.authorID))) |
||||||
|
.select_from(Course) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'stats': { |
||||||
|
'totalUsers': total_users, |
||||||
|
'totalAuthors': authors |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@admin.route('/stats/enrollments', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def get_enrollment_stats(): |
||||||
|
""" |
||||||
|
Get course enrollment and discussion statistics. |
||||||
|
Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
# Get enrollment and user counts |
||||||
|
enrollment_stats = db.session.execute( |
||||||
|
select( |
||||||
|
func.count(Enrollment.id).label('total_enrollments'), |
||||||
|
func.count(func.distinct(Enrollment.userID)).label('enrolled_users') |
||||||
|
) |
||||||
|
.select_from(Enrollment) |
||||||
|
).first() |
||||||
|
|
||||||
|
# Get course-wise enrollment counts |
||||||
|
course_stats = db.session.execute( |
||||||
|
select( |
||||||
|
Course.name, |
||||||
|
func.count(Enrollment.id).label('enrollment_count') |
||||||
|
) |
||||||
|
.join(Course, Course.id == Enrollment.courseID) |
||||||
|
.group_by(Course.id) |
||||||
|
.order_by(desc('enrollment_count')) |
||||||
|
).all() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'stats': { |
||||||
|
'totalEnrollments': enrollment_stats.total_enrollments, |
||||||
|
'totalEnrolledUsers': enrollment_stats.enrolled_users, |
||||||
|
'courseEnrollments': [{ |
||||||
|
'courseName': stat.name, |
||||||
|
'enrollmentCount': stat.enrollment_count |
||||||
|
} for stat in course_stats] |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@admin.route('/stats/discussions', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def get_discussion_stats(): |
||||||
|
""" |
||||||
|
Get chat room activity statistics. |
||||||
|
Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
# Get activity for last 24 hours |
||||||
|
twenty_four_hours_ago = datetime.now() - timedelta(hours=24) |
||||||
|
|
||||||
|
# Get active rooms and their stats |
||||||
|
active_rooms = db.session.execute( |
||||||
|
select( |
||||||
|
Course.name, |
||||||
|
func.count(Chat.id).label('message_count'), |
||||||
|
func.count(func.distinct(Chat.userID)).label('active_users') |
||||||
|
) |
||||||
|
.join(Course, Course.id == Chat.courseID) |
||||||
|
.where(Chat.chatDate >= twenty_four_hours_ago) |
||||||
|
.group_by(Course.id) |
||||||
|
.order_by(desc('message_count')) |
||||||
|
).all() |
||||||
|
|
||||||
|
# Get total active rooms |
||||||
|
total_active_rooms = len(active_rooms) |
||||||
|
|
||||||
|
# Get most active room |
||||||
|
most_active_room = None |
||||||
|
if active_rooms: |
||||||
|
most_active = active_rooms[0] |
||||||
|
most_active_room = { |
||||||
|
'name': most_active.name, |
||||||
|
'messageCount': most_active.message_count, |
||||||
|
'activeUsers': most_active.active_users |
||||||
|
} |
||||||
|
|
||||||
|
# Get total active users across all rooms |
||||||
|
total_active_users = db.session.execute( |
||||||
|
select(func.count(func.distinct(Chat.userID))) |
||||||
|
.where(Chat.chatDate >= twenty_four_hours_ago) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'stats': { |
||||||
|
'totalActiveRooms': total_active_rooms, |
||||||
|
'totalActiveUsers': total_active_users, |
||||||
|
'mostActiveRoom': most_active_room, |
||||||
|
'activeRooms': [{ |
||||||
|
'roomName': room.name, |
||||||
|
'messageCount': room.message_count, |
||||||
|
'activeUsers': room.active_users |
||||||
|
} for room in active_rooms] |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@admin.route('/stats/userDetail', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def get_user_details(): |
||||||
|
""" |
||||||
|
Get detailed information for all users. |
||||||
|
Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
current_user: User = g.current_user |
||||||
|
|
||||||
|
# Get all users with basic info |
||||||
|
users = db.session.execute( |
||||||
|
select(User) |
||||||
|
.order_by(desc(User.joinedDate)) |
||||||
|
).scalars() |
||||||
|
|
||||||
|
# Format user data |
||||||
|
user_list = [{ |
||||||
|
'id': str(user.id), |
||||||
|
'username': user.username, |
||||||
|
'email': user.email, |
||||||
|
'firstName': user.firstName, |
||||||
|
'lastName': user.lastName, |
||||||
|
'profilePicture': url_for('send_file', filename=current_user.pfpFilename, _external=True), |
||||||
|
'bio': user.bio, |
||||||
|
'role': user.role, |
||||||
|
'isActivated': user.isActivated, |
||||||
|
'joinedDate': user.joinedDate.isoformat(), |
||||||
|
'lastOnline': user.lastOnline.isoformat(), |
||||||
|
'dateOfBirth': user.dob.isoformat() if user.dob else None |
||||||
|
} for user in users] |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'users': user_list, |
||||||
|
'count': len(user_list) |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,3 +1,152 @@ |
|||||||
from flask import Blueprint |
from flask import Blueprint, request, jsonify, g |
||||||
|
from werkzeug.datastructures import MultiDict |
||||||
|
import os |
||||||
|
import uuid |
||||||
|
from sqlalchemy import select |
||||||
|
from config import DEFAULT_BADGE_ICON, USER_UPLOADS_DIR |
||||||
|
from db.model import db, Badge, User |
||||||
|
from utils.utils import random_string_generator |
||||||
|
from utils.auth import auth_required, requires_role |
||||||
|
from constants import UserRole |
||||||
|
|
||||||
badge = Blueprint('badge', __name__) |
badge = Blueprint('badge', __name__) |
||||||
|
|
||||||
|
@badge.route('/create', methods=['POST']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def create_badge(): |
||||||
|
""" |
||||||
|
Create a new badge. Only accessible by admin users. |
||||||
|
Requires: badge_name, description |
||||||
|
Optional: icon_image, can_claim |
||||||
|
""" |
||||||
|
try: |
||||||
|
form_data: dict = request.form |
||||||
|
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||||
|
|
||||||
|
# Validate required fields |
||||||
|
try: |
||||||
|
badge_name: str = form_data['badge_name'] |
||||||
|
if len(badge_name) > 16: |
||||||
|
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||||
|
except KeyError: |
||||||
|
return jsonify({'message': 'Badge name cannot be empty'}), 400 |
||||||
|
|
||||||
|
# Handle icon upload |
||||||
|
icon_file_name: str = DEFAULT_BADGE_ICON |
||||||
|
if badge_uploaded_icon is not None: |
||||||
|
icon_file_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||||
|
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, icon_file_name)) |
||||||
|
|
||||||
|
# Get optional fields |
||||||
|
badge_description: str = form_data.get('description', '') |
||||||
|
can_claim: bool = form_data.get('can_claim', 'false').lower() == 'true' |
||||||
|
|
||||||
|
# Create new badge |
||||||
|
new_badge: Badge = Badge( |
||||||
|
name=badge_name, |
||||||
|
description=badge_description, |
||||||
|
icon=icon_file_name, |
||||||
|
canClaim=can_claim, |
||||||
|
user_badges=[] |
||||||
|
) |
||||||
|
|
||||||
|
db.session.add(new_badge) |
||||||
|
db.session.commit() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'message': 'Badge was created successfully.', |
||||||
|
'badge': { |
||||||
|
'id': str(new_badge.id), |
||||||
|
'name': new_badge.name, |
||||||
|
'description': new_badge.description, |
||||||
|
'icon': new_badge.icon, |
||||||
|
'canClaim': new_badge.canClaim |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
db.session.rollback() |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@badge.route('/update/<badge_id>', methods=['PUT']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def update_badge(badge_id): |
||||||
|
""" |
||||||
|
Update an existing badge. Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
badge_to_update = db.session.get(Badge, uuid.UUID(badge_id)) |
||||||
|
if not badge_to_update: |
||||||
|
return jsonify({'message': 'Badge not found'}), 404 |
||||||
|
|
||||||
|
form_data: dict = request.form |
||||||
|
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||||
|
|
||||||
|
# Update icon if provided |
||||||
|
if badge_uploaded_icon is not None: |
||||||
|
new_icon_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||||
|
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, new_icon_name)) |
||||||
|
# Remove old icon if it's not the default |
||||||
|
if badge_to_update.icon != DEFAULT_BADGE_ICON: |
||||||
|
try: |
||||||
|
os.remove(os.path.join(USER_UPLOADS_DIR, badge_to_update.icon)) |
||||||
|
except OSError: |
||||||
|
pass # File might not exist |
||||||
|
badge_to_update.icon = new_icon_name |
||||||
|
|
||||||
|
# Update other fields if provided |
||||||
|
if 'badge_name' in form_data: |
||||||
|
if len(form_data['badge_name']) > 16: |
||||||
|
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||||
|
badge_to_update.name = form_data['badge_name'] |
||||||
|
|
||||||
|
if 'description' in form_data: |
||||||
|
badge_to_update.description = form_data['description'] |
||||||
|
|
||||||
|
if 'can_claim' in form_data: |
||||||
|
badge_to_update.canClaim = form_data['can_claim'].lower() == 'true' |
||||||
|
|
||||||
|
db.session.commit() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'message': 'Badge was updated successfully.', |
||||||
|
'badge': { |
||||||
|
'id': str(badge_to_update.id), |
||||||
|
'name': badge_to_update.name, |
||||||
|
'description': badge_to_update.description, |
||||||
|
'icon': badge_to_update.icon, |
||||||
|
'canClaim': badge_to_update.canClaim |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
db.session.rollback() |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@badge.route('/list', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
def list_badges(): |
||||||
|
""" |
||||||
|
List all badges. Accessible by all authenticated users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
badges = db.session.execute( |
||||||
|
select(Badge) |
||||||
|
.order_by(Badge.createDate.desc()) |
||||||
|
).scalars().all() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'badges': [{ |
||||||
|
'id': str(badge.id), |
||||||
|
'name': badge.name, |
||||||
|
'description': badge.description, |
||||||
|
'icon': badge.icon, |
||||||
|
'canClaim': badge.canClaim, |
||||||
|
'createDate': badge.createDate.isoformat() |
||||||
|
} for badge in badges] |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,66 +1,148 @@ |
|||||||
from flask import Blueprint, request, jsonify |
from flask import Blueprint, request, jsonify, g |
||||||
from uuid import UUID |
from db.model import User, Enrollment, Chat, db |
||||||
from db.model import db, User, Course, Enrollment |
import uuid |
||||||
|
from sqlalchemy import select, and_, desc |
||||||
|
from datetime import datetime |
||||||
|
|
||||||
chat = Blueprint('chat', __name__) |
chat = Blueprint('chat', __name__) |
||||||
|
|
||||||
@chat.route('/course/<uuid:course_id>/users', methods=['POST']) |
@chat.route('/<uuid:course_id>/messages', methods=['GET']) |
||||||
def get_users_assigned_to_course(course_id: UUID): |
def get_course_messages(course_id: uuid.UUID): |
||||||
""" |
""" |
||||||
Fetch all users assigned to a specific course. |
Fetch chat messages for a specific course with optional pagination. |
||||||
:param course_id: ID of the course to fetch users for (UUID). |
Only enrolled users can access the messages. |
||||||
:return: JSON response with users assigned to the course. |
|
||||||
|
Query Parameters: |
||||||
|
- before: UUID of the message to get older entries |
||||||
|
- after: UUID of the message to get newer entries |
||||||
|
- limit: Number of messages to return (default: 10) |
||||||
""" |
""" |
||||||
try: |
try: |
||||||
# Query the course to ensure it exists |
current_user: User = g.current_user |
||||||
course = Course.query.get(course_id) |
limit = int(request.args.get('limit', 10)) |
||||||
if not course: |
before_id = request.args.get('before') |
||||||
return jsonify({"error": "Course not found."}), 404 |
after_id = request.args.get('after') |
||||||
|
|
||||||
# Get the list of users assigned to the course |
# Verify user's enrollment |
||||||
users = User.query.filter(User.enrollments.any(course_id=course_id)).all() |
enrollment = db.session.execute( |
||||||
|
select(Enrollment).where( |
||||||
# Prepare the response data |
and_( |
||||||
user_data = [ |
Enrollment.courseID == course_id, |
||||||
{ |
Enrollment.userID == current_user.id |
||||||
"id": user.id, |
) |
||||||
"email": user.email, |
) |
||||||
"username": user.username, |
).scalar() |
||||||
"firstName": user.firstName, |
|
||||||
"lastName": user.lastName, |
if not enrollment: |
||||||
} |
return jsonify({'message': 'You are not enrolled in this course'}), 401 |
||||||
for user in users |
|
||||||
] |
# Base query |
||||||
|
query = select(Chat).where(Chat.courseID == course_id) |
||||||
return jsonify({"course": course.name, "users": user_data}), 200 |
|
||||||
except Exception as e: |
# Handle pagination |
||||||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500 |
if before_id: |
||||||
|
try: |
||||||
|
reference_message = db.session.execute( |
||||||
|
select(Chat).where(Chat.id == uuid.UUID(before_id)) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
if not reference_message: |
||||||
|
return jsonify({'message': 'Reference message not found'}), 404 |
||||||
|
|
||||||
|
query = query.where(Chat.chatDate < reference_message.chatDate) |
||||||
|
except ValueError: |
||||||
|
return jsonify({'message': 'Invalid message ID format'}), 400 |
||||||
|
elif after_id: |
||||||
|
try: |
||||||
|
reference_message = db.session.execute( |
||||||
|
select(Chat).where(Chat.id == uuid.UUID(after_id)) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
if not reference_message: |
||||||
|
return jsonify({'message': 'Reference message not found'}), 404 |
||||||
|
|
||||||
|
query = query.where(Chat.chatDate > reference_message.chatDate) |
||||||
|
# For after queries, reverse the order later |
||||||
|
query = query.order_by(Chat.chatDate) |
||||||
|
except ValueError: |
||||||
|
return jsonify({'message': 'Invalid message ID format'}), 400 |
||||||
|
else: |
||||||
|
# Default ordering |
||||||
|
query = query.order_by(desc(Chat.chatDate)) |
||||||
|
|
||||||
|
# Apply limit and execute query |
||||||
|
query = query.limit(limit) |
||||||
|
messages = list(db.session.execute(query).scalars()) |
||||||
|
|
||||||
@chat.route('/course/<uuid:course_id>/chat', methods=['POST']) |
# Reverse the order for 'after' queries to maintain consistency |
||||||
def join_chat(course_id: UUID): |
if after_id: |
||||||
|
messages.reverse() |
||||||
|
|
||||||
|
# Format messages |
||||||
|
chat_messages = [{ |
||||||
|
'id': str(msg.id), |
||||||
|
'text': msg.textContent, |
||||||
|
'userId': str(msg.userID), |
||||||
|
'username': msg.user.username, |
||||||
|
'timestamp': msg.chatDate.isoformat() |
||||||
|
} for msg in messages] |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'messages': chat_messages, |
||||||
|
'count': len(chat_messages), |
||||||
|
'hasMore': len(chat_messages) == limit |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@chat.route('/<uuid:course_id>/send', methods=['POST']) |
||||||
|
def send_message(course_id: uuid.UUID): |
||||||
""" |
""" |
||||||
Allow users to join the chat only if they are enrolled in the course. |
Send a new chat message in a course. |
||||||
:param course_id: ID of the course (UUID). |
Only enrolled users can send messages. |
||||||
:return: JSON response indicating whether the user can join or not. |
|
||||||
""" |
""" |
||||||
try: |
try: |
||||||
# Get the user from the request (assume user_id is passed in the request body) |
current_user: User = g.current_user |
||||||
user_id = request.json.get('user_id') |
message_text = request.json.get('message') |
||||||
if not user_id: |
|
||||||
return jsonify({"error": "User ID is required."}), 400 |
if not message_text or not message_text.strip(): |
||||||
|
return jsonify({'message': 'Message content is required'}), 400 |
||||||
# Query the user and ensure they exist |
|
||||||
user = User.query.get(user_id) |
# Verify user's enrollment |
||||||
if not user: |
enrollment = db.session.execute( |
||||||
return jsonify({"error": "User not found."}), 404 |
select(Enrollment).where( |
||||||
|
and_( |
||||||
# Check if the user is enrolled in the course |
Enrollment.courseID == course_id, |
||||||
enrollment = Enrollment.query.filter_by(userID=user_id, courseID=course_id).first() |
Enrollment.userID == current_user.id |
||||||
|
) |
||||||
|
) |
||||||
|
).scalar() |
||||||
|
|
||||||
if not enrollment: |
if not enrollment: |
||||||
return jsonify({"error": "User is not enrolled in this course."}), 403 |
return jsonify({'message': 'You are not enrolled in this course'}), 401 |
||||||
|
|
||||||
|
# Create new chat message |
||||||
|
new_message = Chat( |
||||||
|
textContent=message_text, |
||||||
|
userID=current_user.id, |
||||||
|
courseID=course_id, |
||||||
|
chatDate=datetime.now() |
||||||
|
) |
||||||
|
|
||||||
|
db.session.add(new_message) |
||||||
|
|
||||||
|
# Update last activity in enrollment |
||||||
|
enrollment.lastActivity = datetime.now() |
||||||
|
|
||||||
|
db.session.commit() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'message': 'Message sent successfully', |
||||||
|
'messageId': str(new_message.id), |
||||||
|
'timestamp': new_message.chatDate.isoformat() |
||||||
|
}), 201 |
||||||
|
|
||||||
# If enrolled, allow the user to join the chat |
|
||||||
return jsonify({"message": f"User {user.username} is enrolled in the course and can join the chat."}), 200 |
|
||||||
except Exception as e: |
except Exception as e: |
||||||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500 |
db.session.rollback() |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,3 +1,108 @@ |
|||||||
from flask import Blueprint |
from flask import Blueprint, jsonify, request, g |
||||||
|
from utils.auth import auth_required |
||||||
|
from db.model import User, Notification, db |
||||||
|
from sqlalchemy import select, and_, desc |
||||||
|
from datetime import datetime |
||||||
|
import uuid |
||||||
|
|
||||||
notification = Blueprint('notification', __name__) |
notification = Blueprint('notification', __name__) |
||||||
|
|
||||||
|
@notification.route('/get', methods=['GET', 'POST']) |
||||||
|
@auth_required() |
||||||
|
def handle_notifications(): |
||||||
|
""" |
||||||
|
Unified endpoint for handling notifications: |
||||||
|
GET: Fetch notifications with optional before/after pagination |
||||||
|
POST: Mark notifications as seen |
||||||
|
|
||||||
|
Query Parameters for GET: |
||||||
|
- before: UUID of notification to get older entries |
||||||
|
- after: UUID of notification to get newer entries |
||||||
|
- limit: Number of notifications to return (default: 10) |
||||||
|
|
||||||
|
POST Body: |
||||||
|
{ |
||||||
|
"notificationIds": ["uuid1", "uuid2", ...] |
||||||
|
} |
||||||
|
""" |
||||||
|
if request.method == 'GET': |
||||||
|
try: |
||||||
|
current_user: User = g.current_user |
||||||
|
limit: int = int(request.args.get('limit', 10)) |
||||||
|
before_id = request.args.get('before') |
||||||
|
after_id = request.args.get('after') |
||||||
|
|
||||||
|
# Base query |
||||||
|
query = select(Notification).where( |
||||||
|
Notification.userID == current_user.id |
||||||
|
) |
||||||
|
|
||||||
|
# Handle pagination |
||||||
|
if before_id: |
||||||
|
try: |
||||||
|
reference_notification = db.session.execute( |
||||||
|
select(Notification) |
||||||
|
.where(and_( |
||||||
|
Notification.id == uuid.UUID(before_id), |
||||||
|
Notification.userID == current_user.id |
||||||
|
)) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
if not reference_notification: |
||||||
|
return jsonify({'message': 'Reference notification not found'}), 404 |
||||||
|
|
||||||
|
query = query.where( |
||||||
|
Notification.notifDate < reference_notification.notifDate |
||||||
|
) |
||||||
|
except ValueError: |
||||||
|
return jsonify({'message': 'Invalid notification ID format'}), 400 |
||||||
|
|
||||||
|
elif after_id: |
||||||
|
try: |
||||||
|
reference_notification = db.session.execute( |
||||||
|
select(Notification) |
||||||
|
.where(and_( |
||||||
|
Notification.id == uuid.UUID(after_id), |
||||||
|
Notification.userID == current_user.id |
||||||
|
)) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
if not reference_notification: |
||||||
|
return jsonify({'message': 'Reference notification not found'}), 404 |
||||||
|
|
||||||
|
query = query.where( |
||||||
|
Notification.notifDate > reference_notification.notifDate |
||||||
|
) |
||||||
|
# For after queries, we need to reverse the order later |
||||||
|
query = query.order_by(Notification.notifDate) |
||||||
|
except ValueError: |
||||||
|
return jsonify({'message': 'Invalid notification ID format'}), 400 |
||||||
|
else: |
||||||
|
# Default ordering |
||||||
|
query = query.order_by(desc(Notification.notifDate)) |
||||||
|
|
||||||
|
# Apply limit and execute query |
||||||
|
query = query.limit(limit) |
||||||
|
notifications = list(db.session.execute(query).scalars()) |
||||||
|
|
||||||
|
# Reverse the order for 'after' queries to maintain consistency |
||||||
|
if after_id: |
||||||
|
notifications.reverse() |
||||||
|
|
||||||
|
# Format response |
||||||
|
notif_list = [{ |
||||||
|
'id': str(notif.id), |
||||||
|
'type': notif.notificationType, |
||||||
|
'data': notif.notificationData, |
||||||
|
'seen': notif.isSeen, |
||||||
|
'date': notif.notifDate.isoformat() |
||||||
|
} for notif in notifications] |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'notifications': notif_list, |
||||||
|
'count': len(notif_list), |
||||||
|
'hasMore': len(notif_list) == limit |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -0,0 +1,71 @@ |
|||||||
|
from flask import Blueprint, jsonify |
||||||
|
from db.model import User, Course, db |
||||||
|
from sqlalchemy import select, func |
||||||
|
|
||||||
|
public_summary = Blueprint('public', __name__) |
||||||
|
|
||||||
|
@public_summary.route('/stats/total-users', methods=['GET']) |
||||||
|
def get_total_users(): |
||||||
|
""" |
||||||
|
Fetch total user count. |
||||||
|
""" |
||||||
|
try: |
||||||
|
total_users = db.session.execute( |
||||||
|
select(func.count()).select_from(User) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({'totalUsers': total_users}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
|
||||||
|
@public_summary.route('/stats/total-authors', methods=['GET']) |
||||||
|
def get_total_authors(): |
||||||
|
""" |
||||||
|
Fetch total authors (users who have created courses). |
||||||
|
""" |
||||||
|
try: |
||||||
|
total_authors = db.session.execute( |
||||||
|
select(func.count(func.distinct(Course.authorID))) |
||||||
|
.select_from(Course) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({'totalAuthors': total_authors}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
|
||||||
|
@public_summary.route('/stats/total-courses', methods=['GET']) |
||||||
|
def get_total_courses(): |
||||||
|
""" |
||||||
|
Fetch total course count. |
||||||
|
""" |
||||||
|
try: |
||||||
|
total_courses = db.session.execute( |
||||||
|
select(func.count()).select_from(Course) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({'totalCourses': total_courses}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
|
||||||
|
@public_summary.route('/stats/subscribed-users', methods=['GET']) |
||||||
|
def get_subscribed_users(): |
||||||
|
""" |
||||||
|
Fetch count of users subscribed to the newsletter and are activated. |
||||||
|
""" |
||||||
|
try: |
||||||
|
subscribed_users = db.session.execute( |
||||||
|
select(func.count(User.email)) |
||||||
|
.select_from(User) |
||||||
|
.where(User.isActivated == True) |
||||||
|
).scalar() |
||||||
|
|
||||||
|
return jsonify({'subscribedNewsletter': subscribed_users}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -0,0 +1,52 @@ |
|||||||
|
from flask import Blueprint |
||||||
|
from flask import Blueprint, request, jsonify, current_app, g |
||||||
|
from db.model import User, Category, Quiz, Course, Enrollment, QuizAttempt |
||||||
|
from db.model import db |
||||||
|
import uuid |
||||||
|
from sqlalchemy import select, and_ |
||||||
|
|
||||||
|
quiz = Blueprint('quiz', __name__) |
||||||
|
|
||||||
|
|
||||||
|
@quiz.route('/create', methods=['POST']) |
||||||
|
def create_quiz(): |
||||||
|
data: dict = request.form |
||||||
|
creator_user: User = g.current_user |
||||||
|
course_id: uuid.UUID = uuid.UUID(data.get('courseID')) |
||||||
|
enrollment_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_id, Enrollment.userID == creator_user.id))).scalar() |
||||||
|
if not enrollment_data: |
||||||
|
return jsonify({'message': 'You are not enrolled in this class'}), 401 |
||||||
|
|
||||||
|
get_quiz_data :str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}' |
||||||
|
new_quiz = Quiz( |
||||||
|
creatorUserID = creator_user.id, |
||||||
|
courseID = course_id, |
||||||
|
quizJson = get_quiz_data, |
||||||
|
quiz_attempts = [] |
||||||
|
) |
||||||
|
db.session.add_all(new_quiz) |
||||||
|
db.session.commit() |
||||||
|
return jsonify({'message': 'Course was created successfully.'}), 200 |
||||||
|
|
||||||
|
@quiz.route('/attempt', methods=['POST']) |
||||||
|
def attempt_quiz(): |
||||||
|
data: dict = request.form |
||||||
|
attempt_user: User = g.current_user |
||||||
|
quiz_id: uuid.UUID = uuid.UUID(data.get('quizID')) |
||||||
|
course_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_data, Enrollment.userID == attempt_user.id))).scalar() |
||||||
|
if not course_data: |
||||||
|
return jsonify({'message': 'You are not enrolled in this class'}), 401 |
||||||
|
|
||||||
|
answerKey: str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}' |
||||||
|
#function to calculate Score |
||||||
|
score_value = 0 |
||||||
|
|
||||||
|
new_attempt = QuizAttempt( |
||||||
|
userID = attempt_user, |
||||||
|
quizID = quiz_id, |
||||||
|
answerKey = answerKey, |
||||||
|
score = score_value |
||||||
|
) |
||||||
|
db.session.add_all(new_attempt) |
||||||
|
db.session.commit() |
||||||
|
return jsonify({'message': 'Quiz was appended sucessfully'}), 200 |
@ -0,0 +1,29 @@ |
|||||||
|
import os |
||||||
|
from PyPDF2 import PdfReader, PdfWriter |
||||||
|
|
||||||
|
PATH = 'pdfcontent' |
||||||
|
if not os.path.exists(PATH): |
||||||
|
os.makedirs(PATH) |
||||||
|
|
||||||
|
def getPdfFile(filepath): |
||||||
|
with open(filepath, 'rb') as pdf_file: |
||||||
|
reader = PdfReader(pdf_file) |
||||||
|
for page_num in range(len(reader.pages)): |
||||||
|
page = reader.pages[page_num] |
||||||
|
text = page.extract_text() |
||||||
|
|
||||||
|
# Save as text file |
||||||
|
if text: # Ensure there's text on the page before saving |
||||||
|
filename = os.path.splitext(os.path.basename(filepath))[0] |
||||||
|
output_txt_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.txt") |
||||||
|
with open(output_txt_filename, 'w', encoding='utf-8') as output_file: |
||||||
|
output_file.write(text) |
||||||
|
print(f"Page {page_num + 1} extracted and saved as {output_txt_filename}") |
||||||
|
|
||||||
|
# Save as PDF file |
||||||
|
writer = PdfWriter() |
||||||
|
writer.add_page(page) |
||||||
|
output_pdf_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.pdf") |
||||||
|
with open(output_pdf_filename, 'wb') as output_pdf_file: |
||||||
|
writer.write(output_pdf_file) |
||||||
|
print(f"Page {page_num + 1} extracted and saved as {output_pdf_filename}") |
Loading…
Reference in new issue