Compare commits
7 Commits
main
...
manzilchec
Author | SHA1 | Date |
---|---|---|
|
aff6d939ca | 6 months ago |
|
0ba527a3fa | 6 months ago |
|
d8b426fa33 | 6 months ago |
|
971dbf2088 | 6 months ago |
|
1ace9341b9 | 6 months ago |
|
576214ce5d | 6 months ago |
|
d4855b9615 | 6 months ago |
@ -0,0 +1,184 @@ |
||||
from utils .auth import auth_required, requires_role |
||||
from flask import Blueprint, jsonify, g,url_for |
||||
from db.model import User, Course, Enrollment, Chat, db |
||||
from sqlalchemy import select, func, desc, and_ |
||||
from datetime import datetime, timedelta |
||||
from constants import UserRole |
||||
|
||||
admin = Blueprint('admin', __name__) |
||||
|
||||
@admin.route('/stats/users', methods=['GET']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def get_user_stats(): |
||||
""" |
||||
Get total users and authors count. |
||||
Only accessible by admin users. |
||||
""" |
||||
try: |
||||
# Get total users |
||||
total_users = db.session.execute( |
||||
select(func.count()).select_from(User) |
||||
).scalar() |
||||
|
||||
# Get authors (users who have created courses) |
||||
authors = db.session.execute( |
||||
select(func.count(func.distinct(Course.authorID))) |
||||
.select_from(Course) |
||||
).scalar() |
||||
|
||||
return jsonify({ |
||||
'stats': { |
||||
'totalUsers': total_users, |
||||
'totalAuthors': authors |
||||
} |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
@admin.route('/stats/enrollments', methods=['GET']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def get_enrollment_stats(): |
||||
""" |
||||
Get course enrollment and discussion statistics. |
||||
Only accessible by admin users. |
||||
""" |
||||
try: |
||||
# Get enrollment and user counts |
||||
enrollment_stats = db.session.execute( |
||||
select( |
||||
func.count(Enrollment.id).label('total_enrollments'), |
||||
func.count(func.distinct(Enrollment.userID)).label('enrolled_users') |
||||
) |
||||
.select_from(Enrollment) |
||||
).first() |
||||
|
||||
# Get course-wise enrollment counts |
||||
course_stats = db.session.execute( |
||||
select( |
||||
Course.name, |
||||
func.count(Enrollment.id).label('enrollment_count') |
||||
) |
||||
.join(Course, Course.id == Enrollment.courseID) |
||||
.group_by(Course.id) |
||||
.order_by(desc('enrollment_count')) |
||||
).all() |
||||
|
||||
return jsonify({ |
||||
'stats': { |
||||
'totalEnrollments': enrollment_stats.total_enrollments, |
||||
'totalEnrolledUsers': enrollment_stats.enrolled_users, |
||||
'courseEnrollments': [{ |
||||
'courseName': stat.name, |
||||
'enrollmentCount': stat.enrollment_count |
||||
} for stat in course_stats] |
||||
} |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
@admin.route('/stats/discussions', methods=['GET']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def get_discussion_stats(): |
||||
""" |
||||
Get chat room activity statistics. |
||||
Only accessible by admin users. |
||||
""" |
||||
try: |
||||
# Get activity for last 24 hours |
||||
twenty_four_hours_ago = datetime.now() - timedelta(hours=24) |
||||
|
||||
# Get active rooms and their stats |
||||
active_rooms = db.session.execute( |
||||
select( |
||||
Course.name, |
||||
func.count(Chat.id).label('message_count'), |
||||
func.count(func.distinct(Chat.userID)).label('active_users') |
||||
) |
||||
.join(Course, Course.id == Chat.courseID) |
||||
.where(Chat.chatDate >= twenty_four_hours_ago) |
||||
.group_by(Course.id) |
||||
.order_by(desc('message_count')) |
||||
).all() |
||||
|
||||
# Get total active rooms |
||||
total_active_rooms = len(active_rooms) |
||||
|
||||
# Get most active room |
||||
most_active_room = None |
||||
if active_rooms: |
||||
most_active = active_rooms[0] |
||||
most_active_room = { |
||||
'name': most_active.name, |
||||
'messageCount': most_active.message_count, |
||||
'activeUsers': most_active.active_users |
||||
} |
||||
|
||||
# Get total active users across all rooms |
||||
total_active_users = db.session.execute( |
||||
select(func.count(func.distinct(Chat.userID))) |
||||
.where(Chat.chatDate >= twenty_four_hours_ago) |
||||
).scalar() |
||||
|
||||
return jsonify({ |
||||
'stats': { |
||||
'totalActiveRooms': total_active_rooms, |
||||
'totalActiveUsers': total_active_users, |
||||
'mostActiveRoom': most_active_room, |
||||
'activeRooms': [{ |
||||
'roomName': room.name, |
||||
'messageCount': room.message_count, |
||||
'activeUsers': room.active_users |
||||
} for room in active_rooms] |
||||
} |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
|
||||
|
||||
@admin.route('/stats/userDetail', methods=['GET']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def get_user_details(): |
||||
""" |
||||
Get detailed information for all users. |
||||
Only accessible by admin users. |
||||
""" |
||||
try: |
||||
current_user: User = g.current_user |
||||
|
||||
# Get all users with basic info |
||||
users = db.session.execute( |
||||
select(User) |
||||
.order_by(desc(User.joinedDate)) |
||||
).scalars() |
||||
|
||||
# Format user data |
||||
user_list = [{ |
||||
'id': str(user.id), |
||||
'username': user.username, |
||||
'email': user.email, |
||||
'firstName': user.firstName, |
||||
'lastName': user.lastName, |
||||
'profilePicture': url_for('send_file', filename=current_user.pfpFilename, _external=True), |
||||
'bio': user.bio, |
||||
'role': user.role, |
||||
'isActivated': user.isActivated, |
||||
'joinedDate': user.joinedDate.isoformat(), |
||||
'lastOnline': user.lastOnline.isoformat(), |
||||
'dateOfBirth': user.dob.isoformat() if user.dob else None |
||||
} for user in users] |
||||
|
||||
return jsonify({ |
||||
'users': user_list, |
||||
'count': len(user_list) |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,3 +1,152 @@ |
||||
from flask import Blueprint |
||||
from flask import Blueprint, request, jsonify, g |
||||
from werkzeug.datastructures import MultiDict |
||||
import os |
||||
import uuid |
||||
from sqlalchemy import select |
||||
from config import DEFAULT_BADGE_ICON, USER_UPLOADS_DIR |
||||
from db.model import db, Badge, User |
||||
from utils.utils import random_string_generator |
||||
from utils.auth import auth_required, requires_role |
||||
from constants import UserRole |
||||
|
||||
badge = Blueprint('badge', __name__) |
||||
badge = Blueprint('badge', __name__) |
||||
|
||||
@badge.route('/create', methods=['POST']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def create_badge(): |
||||
""" |
||||
Create a new badge. Only accessible by admin users. |
||||
Requires: badge_name, description |
||||
Optional: icon_image, can_claim |
||||
""" |
||||
try: |
||||
form_data: dict = request.form |
||||
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||
|
||||
# Validate required fields |
||||
try: |
||||
badge_name: str = form_data['badge_name'] |
||||
if len(badge_name) > 16: |
||||
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||
except KeyError: |
||||
return jsonify({'message': 'Badge name cannot be empty'}), 400 |
||||
|
||||
# Handle icon upload |
||||
icon_file_name: str = DEFAULT_BADGE_ICON |
||||
if badge_uploaded_icon is not None: |
||||
icon_file_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, icon_file_name)) |
||||
|
||||
# Get optional fields |
||||
badge_description: str = form_data.get('description', '') |
||||
can_claim: bool = form_data.get('can_claim', 'false').lower() == 'true' |
||||
|
||||
# Create new badge |
||||
new_badge: Badge = Badge( |
||||
name=badge_name, |
||||
description=badge_description, |
||||
icon=icon_file_name, |
||||
canClaim=can_claim, |
||||
user_badges=[] |
||||
) |
||||
|
||||
db.session.add(new_badge) |
||||
db.session.commit() |
||||
|
||||
return jsonify({ |
||||
'message': 'Badge was created successfully.', |
||||
'badge': { |
||||
'id': str(new_badge.id), |
||||
'name': new_badge.name, |
||||
'description': new_badge.description, |
||||
'icon': new_badge.icon, |
||||
'canClaim': new_badge.canClaim |
||||
} |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
db.session.rollback() |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
@badge.route('/update/<badge_id>', methods=['PUT']) |
||||
@auth_required() |
||||
@requires_role([UserRole.ADMIN]) |
||||
def update_badge(badge_id): |
||||
""" |
||||
Update an existing badge. Only accessible by admin users. |
||||
""" |
||||
try: |
||||
badge_to_update = db.session.get(Badge, uuid.UUID(badge_id)) |
||||
if not badge_to_update: |
||||
return jsonify({'message': 'Badge not found'}), 404 |
||||
|
||||
form_data: dict = request.form |
||||
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||
|
||||
# Update icon if provided |
||||
if badge_uploaded_icon is not None: |
||||
new_icon_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, new_icon_name)) |
||||
# Remove old icon if it's not the default |
||||
if badge_to_update.icon != DEFAULT_BADGE_ICON: |
||||
try: |
||||
os.remove(os.path.join(USER_UPLOADS_DIR, badge_to_update.icon)) |
||||
except OSError: |
||||
pass # File might not exist |
||||
badge_to_update.icon = new_icon_name |
||||
|
||||
# Update other fields if provided |
||||
if 'badge_name' in form_data: |
||||
if len(form_data['badge_name']) > 16: |
||||
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||
badge_to_update.name = form_data['badge_name'] |
||||
|
||||
if 'description' in form_data: |
||||
badge_to_update.description = form_data['description'] |
||||
|
||||
if 'can_claim' in form_data: |
||||
badge_to_update.canClaim = form_data['can_claim'].lower() == 'true' |
||||
|
||||
db.session.commit() |
||||
|
||||
return jsonify({ |
||||
'message': 'Badge was updated successfully.', |
||||
'badge': { |
||||
'id': str(badge_to_update.id), |
||||
'name': badge_to_update.name, |
||||
'description': badge_to_update.description, |
||||
'icon': badge_to_update.icon, |
||||
'canClaim': badge_to_update.canClaim |
||||
} |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
db.session.rollback() |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
@badge.route('/list', methods=['GET']) |
||||
@auth_required() |
||||
def list_badges(): |
||||
""" |
||||
List all badges. Accessible by all authenticated users. |
||||
""" |
||||
try: |
||||
badges = db.session.execute( |
||||
select(Badge) |
||||
.order_by(Badge.createDate.desc()) |
||||
).scalars().all() |
||||
|
||||
return jsonify({ |
||||
'badges': [{ |
||||
'id': str(badge.id), |
||||
'name': badge.name, |
||||
'description': badge.description, |
||||
'icon': badge.icon, |
||||
'canClaim': badge.canClaim, |
||||
'createDate': badge.createDate.isoformat() |
||||
} for badge in badges] |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,66 +1,148 @@ |
||||
from flask import Blueprint, request, jsonify |
||||
from uuid import UUID |
||||
from db.model import db, User, Course, Enrollment |
||||
from flask import Blueprint, request, jsonify, g |
||||
from db.model import User, Enrollment, Chat, db |
||||
import uuid |
||||
from sqlalchemy import select, and_, desc |
||||
from datetime import datetime |
||||
|
||||
chat = Blueprint('chat', __name__) |
||||
|
||||
@chat.route('/course/<uuid:course_id>/users', methods=['POST']) |
||||
def get_users_assigned_to_course(course_id: UUID): |
||||
@chat.route('/<uuid:course_id>/messages', methods=['GET']) |
||||
def get_course_messages(course_id: uuid.UUID): |
||||
""" |
||||
Fetch all users assigned to a specific course. |
||||
:param course_id: ID of the course to fetch users for (UUID). |
||||
:return: JSON response with users assigned to the course. |
||||
Fetch chat messages for a specific course with optional pagination. |
||||
Only enrolled users can access the messages. |
||||
|
||||
Query Parameters: |
||||
- before: UUID of the message to get older entries |
||||
- after: UUID of the message to get newer entries |
||||
- limit: Number of messages to return (default: 10) |
||||
""" |
||||
try: |
||||
# Query the course to ensure it exists |
||||
course = Course.query.get(course_id) |
||||
if not course: |
||||
return jsonify({"error": "Course not found."}), 404 |
||||
|
||||
# Get the list of users assigned to the course |
||||
users = User.query.filter(User.enrollments.any(course_id=course_id)).all() |
||||
|
||||
# Prepare the response data |
||||
user_data = [ |
||||
{ |
||||
"id": user.id, |
||||
"email": user.email, |
||||
"username": user.username, |
||||
"firstName": user.firstName, |
||||
"lastName": user.lastName, |
||||
} |
||||
for user in users |
||||
] |
||||
|
||||
return jsonify({"course": course.name, "users": user_data}), 200 |
||||
current_user: User = g.current_user |
||||
limit = int(request.args.get('limit', 10)) |
||||
before_id = request.args.get('before') |
||||
after_id = request.args.get('after') |
||||
|
||||
# Verify user's enrollment |
||||
enrollment = db.session.execute( |
||||
select(Enrollment).where( |
||||
and_( |
||||
Enrollment.courseID == course_id, |
||||
Enrollment.userID == current_user.id |
||||
) |
||||
) |
||||
).scalar() |
||||
|
||||
if not enrollment: |
||||
return jsonify({'message': 'You are not enrolled in this course'}), 401 |
||||
|
||||
# Base query |
||||
query = select(Chat).where(Chat.courseID == course_id) |
||||
|
||||
# Handle pagination |
||||
if before_id: |
||||
try: |
||||
reference_message = db.session.execute( |
||||
select(Chat).where(Chat.id == uuid.UUID(before_id)) |
||||
).scalar() |
||||
|
||||
if not reference_message: |
||||
return jsonify({'message': 'Reference message not found'}), 404 |
||||
|
||||
query = query.where(Chat.chatDate < reference_message.chatDate) |
||||
except ValueError: |
||||
return jsonify({'message': 'Invalid message ID format'}), 400 |
||||
elif after_id: |
||||
try: |
||||
reference_message = db.session.execute( |
||||
select(Chat).where(Chat.id == uuid.UUID(after_id)) |
||||
).scalar() |
||||
|
||||
if not reference_message: |
||||
return jsonify({'message': 'Reference message not found'}), 404 |
||||
|
||||
query = query.where(Chat.chatDate > reference_message.chatDate) |
||||
# For after queries, reverse the order later |
||||
query = query.order_by(Chat.chatDate) |
||||
except ValueError: |
||||
return jsonify({'message': 'Invalid message ID format'}), 400 |
||||
else: |
||||
# Default ordering |
||||
query = query.order_by(desc(Chat.chatDate)) |
||||
|
||||
# Apply limit and execute query |
||||
query = query.limit(limit) |
||||
messages = list(db.session.execute(query).scalars()) |
||||
|
||||
# Reverse the order for 'after' queries to maintain consistency |
||||
if after_id: |
||||
messages.reverse() |
||||
|
||||
# Format messages |
||||
chat_messages = [{ |
||||
'id': str(msg.id), |
||||
'text': msg.textContent, |
||||
'userId': str(msg.userID), |
||||
'username': msg.user.username, |
||||
'timestamp': msg.chatDate.isoformat() |
||||
} for msg in messages] |
||||
|
||||
return jsonify({ |
||||
'messages': chat_messages, |
||||
'count': len(chat_messages), |
||||
'hasMore': len(chat_messages) == limit |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500 |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
|
||||
@chat.route('/course/<uuid:course_id>/chat', methods=['POST']) |
||||
def join_chat(course_id: UUID): |
||||
@chat.route('/<uuid:course_id>/send', methods=['POST']) |
||||
def send_message(course_id: uuid.UUID): |
||||
""" |
||||
Allow users to join the chat only if they are enrolled in the course. |
||||
:param course_id: ID of the course (UUID). |
||||
:return: JSON response indicating whether the user can join or not. |
||||
Send a new chat message in a course. |
||||
Only enrolled users can send messages. |
||||
""" |
||||
try: |
||||
# Get the user from the request (assume user_id is passed in the request body) |
||||
user_id = request.json.get('user_id') |
||||
if not user_id: |
||||
return jsonify({"error": "User ID is required."}), 400 |
||||
|
||||
# Query the user and ensure they exist |
||||
user = User.query.get(user_id) |
||||
if not user: |
||||
return jsonify({"error": "User not found."}), 404 |
||||
|
||||
# Check if the user is enrolled in the course |
||||
enrollment = Enrollment.query.filter_by(userID=user_id, courseID=course_id).first() |
||||
current_user: User = g.current_user |
||||
message_text = request.json.get('message') |
||||
|
||||
if not message_text or not message_text.strip(): |
||||
return jsonify({'message': 'Message content is required'}), 400 |
||||
|
||||
# Verify user's enrollment |
||||
enrollment = db.session.execute( |
||||
select(Enrollment).where( |
||||
and_( |
||||
Enrollment.courseID == course_id, |
||||
Enrollment.userID == current_user.id |
||||
) |
||||
) |
||||
).scalar() |
||||
|
||||
if not enrollment: |
||||
return jsonify({"error": "User is not enrolled in this course."}), 403 |
||||
|
||||
# If enrolled, allow the user to join the chat |
||||
return jsonify({"message": f"User {user.username} is enrolled in the course and can join the chat."}), 200 |
||||
return jsonify({'message': 'You are not enrolled in this course'}), 401 |
||||
|
||||
# Create new chat message |
||||
new_message = Chat( |
||||
textContent=message_text, |
||||
userID=current_user.id, |
||||
courseID=course_id, |
||||
chatDate=datetime.now() |
||||
) |
||||
|
||||
db.session.add(new_message) |
||||
|
||||
# Update last activity in enrollment |
||||
enrollment.lastActivity = datetime.now() |
||||
|
||||
db.session.commit() |
||||
|
||||
return jsonify({ |
||||
'message': 'Message sent successfully', |
||||
'messageId': str(new_message.id), |
||||
'timestamp': new_message.chatDate.isoformat() |
||||
}), 201 |
||||
|
||||
except Exception as e: |
||||
return jsonify({"error": f"An error occurred: {str(e)}"}), 500 |
||||
db.session.rollback() |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -1,3 +1,108 @@ |
||||
from flask import Blueprint |
||||
from flask import Blueprint, jsonify, request, g |
||||
from utils.auth import auth_required |
||||
from db.model import User, Notification, db |
||||
from sqlalchemy import select, and_, desc |
||||
from datetime import datetime |
||||
import uuid |
||||
|
||||
notification = Blueprint('notification', __name__) |
||||
notification = Blueprint('notification', __name__) |
||||
|
||||
@notification.route('/get', methods=['GET', 'POST']) |
||||
@auth_required() |
||||
def handle_notifications(): |
||||
""" |
||||
Unified endpoint for handling notifications: |
||||
GET: Fetch notifications with optional before/after pagination |
||||
POST: Mark notifications as seen |
||||
|
||||
Query Parameters for GET: |
||||
- before: UUID of notification to get older entries |
||||
- after: UUID of notification to get newer entries |
||||
- limit: Number of notifications to return (default: 10) |
||||
|
||||
POST Body: |
||||
{ |
||||
"notificationIds": ["uuid1", "uuid2", ...] |
||||
} |
||||
""" |
||||
if request.method == 'GET': |
||||
try: |
||||
current_user: User = g.current_user |
||||
limit: int = int(request.args.get('limit', 10)) |
||||
before_id = request.args.get('before') |
||||
after_id = request.args.get('after') |
||||
|
||||
# Base query |
||||
query = select(Notification).where( |
||||
Notification.userID == current_user.id |
||||
) |
||||
|
||||
# Handle pagination |
||||
if before_id: |
||||
try: |
||||
reference_notification = db.session.execute( |
||||
select(Notification) |
||||
.where(and_( |
||||
Notification.id == uuid.UUID(before_id), |
||||
Notification.userID == current_user.id |
||||
)) |
||||
).scalar() |
||||
|
||||
if not reference_notification: |
||||
return jsonify({'message': 'Reference notification not found'}), 404 |
||||
|
||||
query = query.where( |
||||
Notification.notifDate < reference_notification.notifDate |
||||
) |
||||
except ValueError: |
||||
return jsonify({'message': 'Invalid notification ID format'}), 400 |
||||
|
||||
elif after_id: |
||||
try: |
||||
reference_notification = db.session.execute( |
||||
select(Notification) |
||||
.where(and_( |
||||
Notification.id == uuid.UUID(after_id), |
||||
Notification.userID == current_user.id |
||||
)) |
||||
).scalar() |
||||
|
||||
if not reference_notification: |
||||
return jsonify({'message': 'Reference notification not found'}), 404 |
||||
|
||||
query = query.where( |
||||
Notification.notifDate > reference_notification.notifDate |
||||
) |
||||
# For after queries, we need to reverse the order later |
||||
query = query.order_by(Notification.notifDate) |
||||
except ValueError: |
||||
return jsonify({'message': 'Invalid notification ID format'}), 400 |
||||
else: |
||||
# Default ordering |
||||
query = query.order_by(desc(Notification.notifDate)) |
||||
|
||||
# Apply limit and execute query |
||||
query = query.limit(limit) |
||||
notifications = list(db.session.execute(query).scalars()) |
||||
|
||||
# Reverse the order for 'after' queries to maintain consistency |
||||
if after_id: |
||||
notifications.reverse() |
||||
|
||||
# Format response |
||||
notif_list = [{ |
||||
'id': str(notif.id), |
||||
'type': notif.notificationType, |
||||
'data': notif.notificationData, |
||||
'seen': notif.isSeen, |
||||
'date': notif.notifDate.isoformat() |
||||
} for notif in notifications] |
||||
|
||||
return jsonify({ |
||||
'notifications': notif_list, |
||||
'count': len(notif_list), |
||||
'hasMore': len(notif_list) == limit |
||||
}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -0,0 +1,71 @@ |
||||
from flask import Blueprint, jsonify |
||||
from db.model import User, Course, db |
||||
from sqlalchemy import select, func |
||||
|
||||
public_summary = Blueprint('public', __name__) |
||||
|
||||
@public_summary.route('/stats/total-users', methods=['GET']) |
||||
def get_total_users(): |
||||
""" |
||||
Fetch total user count. |
||||
""" |
||||
try: |
||||
total_users = db.session.execute( |
||||
select(func.count()).select_from(User) |
||||
).scalar() |
||||
|
||||
return jsonify({'totalUsers': total_users}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
|
||||
@public_summary.route('/stats/total-authors', methods=['GET']) |
||||
def get_total_authors(): |
||||
""" |
||||
Fetch total authors (users who have created courses). |
||||
""" |
||||
try: |
||||
total_authors = db.session.execute( |
||||
select(func.count(func.distinct(Course.authorID))) |
||||
.select_from(Course) |
||||
).scalar() |
||||
|
||||
return jsonify({'totalAuthors': total_authors}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
|
||||
@public_summary.route('/stats/total-courses', methods=['GET']) |
||||
def get_total_courses(): |
||||
""" |
||||
Fetch total course count. |
||||
""" |
||||
try: |
||||
total_courses = db.session.execute( |
||||
select(func.count()).select_from(Course) |
||||
).scalar() |
||||
|
||||
return jsonify({'totalCourses': total_courses}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||
|
||||
|
||||
@public_summary.route('/stats/subscribed-users', methods=['GET']) |
||||
def get_subscribed_users(): |
||||
""" |
||||
Fetch count of users subscribed to the newsletter and are activated. |
||||
""" |
||||
try: |
||||
subscribed_users = db.session.execute( |
||||
select(func.count(User.email)) |
||||
.select_from(User) |
||||
.where(User.isActivated == True) |
||||
).scalar() |
||||
|
||||
return jsonify({'subscribedNewsletter': subscribed_users}), 200 |
||||
|
||||
except Exception as e: |
||||
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -0,0 +1,52 @@ |
||||
from flask import Blueprint |
||||
from flask import Blueprint, request, jsonify, current_app, g |
||||
from db.model import User, Category, Quiz, Course, Enrollment, QuizAttempt |
||||
from db.model import db |
||||
import uuid |
||||
from sqlalchemy import select, and_ |
||||
|
||||
quiz = Blueprint('quiz', __name__) |
||||
|
||||
|
||||
@quiz.route('/create', methods=['POST']) |
||||
def create_quiz(): |
||||
data: dict = request.form |
||||
creator_user: User = g.current_user |
||||
course_id: uuid.UUID = uuid.UUID(data.get('courseID')) |
||||
enrollment_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_id, Enrollment.userID == creator_user.id))).scalar() |
||||
if not enrollment_data: |
||||
return jsonify({'message': 'You are not enrolled in this class'}), 401 |
||||
|
||||
get_quiz_data :str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}' |
||||
new_quiz = Quiz( |
||||
creatorUserID = creator_user.id, |
||||
courseID = course_id, |
||||
quizJson = get_quiz_data, |
||||
quiz_attempts = [] |
||||
) |
||||
db.session.add_all(new_quiz) |
||||
db.session.commit() |
||||
return jsonify({'message': 'Course was created successfully.'}), 200 |
||||
|
||||
@quiz.route('/attempt', methods=['POST']) |
||||
def attempt_quiz(): |
||||
data: dict = request.form |
||||
attempt_user: User = g.current_user |
||||
quiz_id: uuid.UUID = uuid.UUID(data.get('quizID')) |
||||
course_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_data, Enrollment.userID == attempt_user.id))).scalar() |
||||
if not course_data: |
||||
return jsonify({'message': 'You are not enrolled in this class'}), 401 |
||||
|
||||
answerKey: str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}' |
||||
#function to calculate Score |
||||
score_value = 0 |
||||
|
||||
new_attempt = QuizAttempt( |
||||
userID = attempt_user, |
||||
quizID = quiz_id, |
||||
answerKey = answerKey, |
||||
score = score_value |
||||
) |
||||
db.session.add_all(new_attempt) |
||||
db.session.commit() |
||||
return jsonify({'message': 'Quiz was appended sucessfully'}), 200 |
@ -0,0 +1,29 @@ |
||||
import os |
||||
from PyPDF2 import PdfReader, PdfWriter |
||||
|
||||
PATH = 'pdfcontent' |
||||
if not os.path.exists(PATH): |
||||
os.makedirs(PATH) |
||||
|
||||
def getPdfFile(filepath): |
||||
with open(filepath, 'rb') as pdf_file: |
||||
reader = PdfReader(pdf_file) |
||||
for page_num in range(len(reader.pages)): |
||||
page = reader.pages[page_num] |
||||
text = page.extract_text() |
||||
|
||||
# Save as text file |
||||
if text: # Ensure there's text on the page before saving |
||||
filename = os.path.splitext(os.path.basename(filepath))[0] |
||||
output_txt_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.txt") |
||||
with open(output_txt_filename, 'w', encoding='utf-8') as output_file: |
||||
output_file.write(text) |
||||
print(f"Page {page_num + 1} extracted and saved as {output_txt_filename}") |
||||
|
||||
# Save as PDF file |
||||
writer = PdfWriter() |
||||
writer.add_page(page) |
||||
output_pdf_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.pdf") |
||||
with open(output_pdf_filename, 'wb') as output_pdf_file: |
||||
writer.write(output_pdf_file) |
||||
print(f"Page {page_num + 1} extracted and saved as {output_pdf_filename}") |
Loading…
Reference in new issue