"""Course blueprint: listing, enrollment, authoring and category management.

Every view returns a JSON body built with ``jsonify``.  Views that need the
caller's identity read it from ``g.current_user``; views that are reachable
anonymously check ``g.get('is_authed')`` before touching ``g.current_user``.
"""
import math
import os
import uuid
from typing import Union

from flask import Blueprint, request, jsonify, g, url_for
from sqlalchemy import select, and_, func, distinct, or_
from sqlalchemy.exc import IntegrityError
from werkzeug.datastructures import MultiDict

# NOTE: the relative order of the project imports is kept as it was, because
# the star imports may shadow one another.
from config import DEFAULT_COURSE_COVER
from db.model import db, Course, Category, User, Chat, Enrollment
from utils.utils import random_string_generator, split_pdf_into_pages_with_text
from utils.auth import auth_required, requires_role
from constants import *
from config import *
from constants import PublishedStatus
from db.model import UserRole

course = Blueprint('course', __name__)


def _parse_uuid(raw_value):
    """Return ``uuid.UUID(raw_value)``, or ``None`` when it is empty or malformed."""
    if not raw_value:
        return None
    try:
        return uuid.UUID(raw_value)
    except ValueError:
        return None


def _is_admin() -> bool:
    """Tell whether ``g.current_user`` carries the ADMIN role."""
    return g.current_user.role == int(UserRole.ADMIN)


def _random_upload_name(original_name) -> str:
    """Build a random on-disk file name that keeps the upload's extension.

    The extension comes from the client, so it is reduced to ASCII
    alphanumerics (and capped in length) before it becomes part of a path.
    A name without a usable extension yields the bare random stem.
    """
    _, dot, extension = (original_name or '').rpartition('.')
    extension = ''.join(
        ch for ch in extension if ch.isascii() and ch.isalnum()
    )[:16]
    stem = random_string_generator(32)
    return f'{stem}.{extension}' if dot and extension else stem


def _store_upload(uploaded_file) -> str:
    """Save an uploaded file under ``USER_UPLOADS_DIR`` and return its new name."""
    stored_name = _random_upload_name(uploaded_file.filename)
    uploaded_file.save(os.path.join(USER_UPLOADS_DIR, stored_name))
    return stored_name


def _store_course_pdf(uploaded_pdf):
    """Save a course PDF and split it into per-page parts.

    Returns a ``(stored_name, total_pages)`` tuple, where ``total_pages`` is
    whatever ``split_pdf_into_pages_with_text`` returns for the saved file.
    The parts are written to ``<stored_name>_parts`` inside ``USER_UPLOADS_DIR``.
    """
    stored_name = _store_upload(uploaded_pdf)
    parts_dir = os.path.join(USER_UPLOADS_DIR, stored_name + "_parts")
    os.makedirs(parts_dir, exist_ok=True)
    total_pages = split_pdf_into_pages_with_text(
        pdf_path=os.path.join(USER_UPLOADS_DIR, stored_name),
        output_directory=parts_dir
    )
    return stored_name, total_pages


def _serialize_course(item) -> dict:
    """Return the JSON-ready summary of a course shared by the listing views."""
    return {
        'id': item.id,
        'name': item.name,
        'description': item.description,
        'isActive': item.isActive,
        'creationDate': item.creationDate,
        'coverImage': url_for('send_file', filename=item.coverImage),
        'totalEnrolled': item.totalEnrolled,
        'author': {
            'id': item.author.id,
            'firstName': item.author.firstName,
            'lastName': item.author.lastName,
            'username': item.author.username,
            'bio': item.author.bio,
            'lastOnline': item.author.lastOnline,
            'pfpFilename': url_for('send_file', filename=item.author.pfpFilename)
        },
        'category': {
            'id': item.categoryID,
            'name': item.category.name,
            'description': item.category.description
        }
    }


@course.route('/listAll')
def list_all_courses():
    """List courses with optional filtering, sorting and pagination.

    Query parameters:
        limit (int, default 10): page size, must be at least 1.
        offset (int, default 0): rows to skip, must not be negative.
        category_uuid (UUID, optional): restrict to one category.
        search_q (str, optional): substring matched against the course name,
            the course description and the author's first name.
        sort_by (str, optional): one of ``date_asc``, ``date_desc``,
            ``name_asc``, ``name_desc``, ``students_desc``, ``students_asc``;
            any other value leaves the result unsorted.
        show_pending (0/1, default 0): only honoured for an authenticated
            admin, who then gets PENDING courses instead of APPROVED ones.

    Returns 400 for malformed parameters, otherwise 200 with ``total_pages``,
    ``current_offset``, ``limit`` and ``data``.
    """
    try:
        limit = int(request.args.get('limit', 10))
        offset = int(request.args.get('offset', 0))
        show_pending = bool(int(request.args.get('show_pending', 0)))
    except ValueError:
        return jsonify({'message': 'Invalid "limit", "offset" or "show_pending" value'}), 400
    # limit is used as a divisor below, so zero must be rejected up front.
    if limit < 1 or offset < 0:
        return jsonify({'message': '"limit" must be >= 1 and "offset" must be >= 0'}), 400

    category_uuid = None
    if request.args.get('category_uuid') is not None:
        category_uuid = _parse_uuid(request.args.get('category_uuid'))
        if category_uuid is None:
            return jsonify({'message': 'Invalid "category_uuid"'}), 400

    search_q: str = request.args.get('search_q', '').strip()
    sort_by: str = request.args.get('sort_by', '').strip()

    query = select(Course)
    if search_q != '':
        pattern = f'%{search_q}%'
        # The author must be joined explicitly; filtering on User without a
        # join would cross-join every course with every user.
        query = query.join(User, Course.authorID == User.id).where(
            or_(
                Course.name.like(pattern),
                Course.description.like(pattern),
                User.firstName.like(pattern)
            )
        )
    if category_uuid is not None:
        query = query.where(Course.categoryID == category_uuid)

    # Anonymous and non-admin callers only ever see approved courses.
    wants_pending = bool(show_pending and g.get('is_authed') and _is_admin())
    visible_status = PublishedStatus.PENDING if wants_pending else PublishedStatus.APPROVED
    query = query.where(Course.publishedStatus == int(visible_status))

    # Count the filtered rows before pagination is applied.
    total_rows = db.session.execute(
        select(func.count()).select_from(query.subquery())
    ).scalar()
    total_pages: int = math.ceil(total_rows / limit)

    orderings = {
        'date_asc': Course.creationDate.asc(),
        'date_desc': Course.creationDate.desc(),
        'name_asc': Course.name.asc(),
        'name_desc': Course.name.desc(),
        'students_desc': Course.totalEnrolled.desc(),
        'students_asc': Course.totalEnrolled.asc(),
    }
    if sort_by in orderings:
        query = query.order_by(orderings[sort_by])
    query = query.limit(limit).offset(offset)

    courses = db.session.execute(query).scalars()
    course_list: list[dict] = [_serialize_course(item) for item in courses]

    return jsonify({
        'total_pages': total_pages,
        'current_offset': offset,
        'limit': limit,
        'data': course_list,
    })


# GET is kept for existing clients; POST is accepted as well because this
# endpoint changes state and reads its input from the form body.
@course.route('/enroll', methods=['GET', 'POST'])
@auth_required()
def enroll_user():
    """Enroll the current user in the course given by form field ``course_uuid``.

    Returns 400 when the id is missing or malformed, 404 when the course does
    not exist, 409 when inserting the enrollment raises ``IntegrityError``
    (reported to the client as "already enrolled"), and 200 on success.
    """
    if not request.form.get('course_uuid'):
        return jsonify({'message': 'Missing required parameter "course_uuid" '}), 400

    course_uuid = _parse_uuid(request.form.get('course_uuid'))
    if course_uuid is None:
        return jsonify({'message': 'Invalid "course_uuid"'}), 400

    selected_course: Course = db.session.execute(
        select(Course).where(Course.id == course_uuid)
    ).scalar()
    if not selected_course:
        return jsonify({'message': 'Course not found'}), 404

    new_enroll: Enrollment = Enrollment(
        userID=g.current_user.id,
        courseID=course_uuid
    )
    try:
        selected_course.totalEnrolled = selected_course.totalEnrolled + 1
        db.session.add(new_enroll)
        db.session.commit()
    except IntegrityError:
        # The failed flush leaves the session unusable until it is rolled
        # back; this also discards the counter increment made above.
        db.session.rollback()
        return jsonify({'message': 'Already enrolled to this course'}), 409

    return jsonify({'message': 'Enrollment successful'}), 200


@course.route('/createCourse', methods=['POST'])
@auth_required()
def create_course():
    """Create a course owned by the current user, in PENDING state.

    Form fields:
        course_name (required), course_description (optional),
        category_uuid (required UUID of an existing category),
        page_for_community (optional int, default 1).
    Files:
        cover_image (optional; ``DEFAULT_COURSE_COVER`` is used otherwise),
        course_pdf (optional; split into per-page parts when present).

    All form fields are validated before any upload is written to disk, so a
    rejected request leaves no files behind.  Returns 400 for invalid input,
    404 for an unknown category and 200 on success.
    """
    form_data = request.form

    course_name: str = form_data.get('course_name', '').strip()
    if not course_name:
        return jsonify({'message': 'Course name cannot be empty'}), 400

    category_id = _parse_uuid(form_data.get('category_uuid'))
    if category_id is None:
        return jsonify({'message': 'Missing or invalid parameter "category_uuid" '}), 400

    try:
        page_for_community: int = int(form_data.get('page_for_community', 1))
    except ValueError:
        return jsonify({'message': 'Invalid "page_for_community"'}), 400

    category: Category = db.session.execute(
        select(Category).where(Category.id == category_id)
    ).scalar()
    if not category:
        return jsonify({'message': 'Category not found'}), 404

    course_description: str = form_data.get('course_description', '')

    cover_file_name: str = DEFAULT_COURSE_COVER
    pdf_file_name: str = ''
    pdf_total_pages: int = 1

    uploaded_cover = request.files.get('cover_image', None)
    if uploaded_cover is not None:
        cover_file_name = _store_upload(uploaded_cover)

    uploaded_pdf = request.files.get('course_pdf', None)
    if uploaded_pdf is not None:
        pdf_file_name, pdf_total_pages = _store_course_pdf(uploaded_pdf)

    new_course: Course = Course(
        name=course_name,
        categoryID=category_id,
        authorID=g.current_user.id,
        category=category,
        author=g.current_user,
        description=course_description,
        isActive=True,
        pageForCommunity=page_for_community,
        publishedStatus=int(PublishedStatus.PENDING),
        coverImage=cover_file_name,
        serverFilename=pdf_file_name,
        totalPages=pdf_total_pages,
        enrollments=[],
        quizzes=[],
        chats=[]
    )
    # TODO: Add a welcome chat for this course
    db.session.add(new_course)
    db.session.commit()

    return jsonify({'message': 'Course was created successfully.'}), 200


# NOTE(review): 'UPDATE' is not a standard HTTP verb; it is kept because
# existing clients may rely on it.
@course.route('/update', methods=['UPDATE', 'DELETE'])
@auth_required()
def update_course():
    """Update or delete a course; allowed for its author and for admins.

    Form fields:
        course_id (required UUID); for updates also the optional
        course_name, course_description, category_uuid, isActive (int) and
        published_status (int, admin only).
    Files (updates only): cover_image, course_pdf.

    Non-admins cannot see BANNED courses here.  Every input is validated
    before the course is modified.  An update made by a non-admin resets the
    course to PENDING.  Returns 400 for invalid input, 401 when the caller may
    not make the change, 404 when the course is not found, 200 on success.
    """
    form_data = request.form

    course_id = _parse_uuid(form_data.get('course_id'))
    if course_id is None:
        return jsonify({'message': 'Missing or invalid parameter "course_id" '}), 400

    is_admin = _is_admin()
    conditions = [Course.id == course_id]
    if not is_admin:
        conditions.append(Course.publishedStatus != int(PublishedStatus.BANNED))
    selected_course: Union[Course, None] = db.session.execute(
        select(Course).where(and_(*conditions))
    ).scalar()
    if not selected_course:
        return jsonify({'message': 'The course could not be found'}), 404

    if not is_admin and selected_course.authorID != g.current_user.id:
        return jsonify({'message': 'Unauthorized for this change'}), 401

    if request.method == 'DELETE':
        db.session.delete(selected_course)
        db.session.commit()
        return jsonify({'message': 'Course was deleted successfully'}), 200

    # ---- validate everything first, so a rejection leaves nothing changed ----
    new_status = None
    if form_data.get('published_status'):
        if not is_admin:
            return jsonify({'message': 'Unauthorized'}), 401
        valid_states: list[int] = [
            int(state) for state in (
                PublishedStatus.APPROVED, PublishedStatus.PENDING,
                PublishedStatus.DECLINED, PublishedStatus.REVOKED,
                PublishedStatus.BANNED, PublishedStatus.DRAFT
            )
        ]
        try:
            new_status = int(form_data.get('published_status'))
        except ValueError:
            return jsonify({'message': 'Invalid state to update'}), 400
        if new_status not in valid_states:
            return jsonify({'message': 'Invalid state to update'}), 400

    new_category_id = None
    if form_data.get('category_uuid'):
        new_category_id = _parse_uuid(form_data.get('category_uuid'))
        if new_category_id is None:
            return jsonify({'message': 'Invalid "category_uuid"'}), 400

    new_is_active = None
    if form_data.get('isActive'):
        try:
            new_is_active = bool(int(form_data.get('isActive')))
        except ValueError:
            return jsonify({'message': 'Invalid "isActive"'}), 400

    # ---- apply ----
    if form_data.get('course_name'):
        selected_course.name = form_data.get('course_name')
    if form_data.get('course_description'):
        selected_course.description = form_data.get('course_description')
    if new_category_id is not None:
        selected_course.categoryID = new_category_id
    if new_is_active is not None:
        selected_course.isActive = new_is_active
    if new_status is not None:
        selected_course.publishedStatus = new_status

    if request.files.get('cover_image'):
        selected_course.coverImage = _store_upload(request.files.get('cover_image'))

    if request.files.get('course_pdf'):
        # A replaced PDF is split the same way create_course does, so the
        # per-page parts and the page count match the new file.
        pdf_file_name, pdf_total_pages = _store_course_pdf(request.files.get('course_pdf'))
        selected_course.serverFilename = pdf_file_name
        selected_course.totalPages = pdf_total_pages

    if not is_admin:
        # Author edits go back through review.
        selected_course.publishedStatus = int(PublishedStatus.PENDING)

    db.session.commit()
    return jsonify({'message': 'Course info updated'}), 200


@course.route('/info/<course_uuid>')
def course_info(course_uuid):
    """Return the details of one course.

    A course that is inactive or not APPROVED is only visible to its author
    and to admins; everyone else (including anonymous callers) gets 404.
    Enrolled users receive URLs for every page; other callers receive a
    preview of three pages (one page when the course has fewer than three).

    Returns 400 for a malformed id, 404 when the course is missing or hidden,
    200 with the course payload otherwise.
    """
    course_id = _parse_uuid(course_uuid)
    if course_id is None:
        return jsonify({'message': 'Invalid course id'}), 400

    selected_course: Course = db.session.execute(
        select(Course).where(Course.id == course_id)
    ).scalar()
    if not selected_course:
        return jsonify({'message': 'The course does not exist'}), 404

    is_authed = bool(g.get("is_authed"))

    # Only allow owner or admin to query info for a course that is not
    # published or is pending.
    is_public = selected_course.isActive and \
        selected_course.publishedStatus == int(PublishedStatus.APPROVED)
    if not is_public:
        may_view = is_authed and (
            _is_admin() or g.current_user.id == selected_course.authorID
        )
        if not may_view:
            return jsonify({'message': 'The course does not exist.'}), 404

    self_enrollment_record: Union[None, Enrollment] = None
    self_enrollment_data: dict = {}
    if is_authed:
        # .scalar() is required: the bare Result object is always truthy and
        # has none of the Enrollment attributes read below.
        self_enrollment_record = db.session.execute(
            select(Enrollment).where(
                and_(
                    Enrollment.courseID == selected_course.id,
                    Enrollment.userID == g.current_user.id
                )
            )
        ).scalar()
        if self_enrollment_record:
            self_enrollment_data = {
                'lastActivity': self_enrollment_record.lastActivity,
                'currentPage': self_enrollment_record.currentPage,
                'maxPage': self_enrollment_record.maxPage,
                'joinedDate': self_enrollment_record.joinedDate,
                'userID': self_enrollment_record.userID
            }

    # Total enrolled users, distinct users chatting about the course, and the
    # total number of chat rows.
    summary_user: dict = {
        'totalEnrolled': db.session.execute(
            select(func.count(Enrollment.id)).where(Enrollment.courseID == course_id)
        ).scalar(),
        'usersInChat': db.session.execute(
            select(func.count(distinct(Chat.userID))).select_from(Chat).where(Chat.courseID == course_id)
        ).scalar(),
        'totalChats': db.session.execute(
            select(func.count()).select_from(Chat).where(Chat.courseID == course_id)
        ).scalar()
    }

    if self_enrollment_record:
        visible_page_count = selected_course.totalPages
    elif selected_course.totalPages < 3:
        visible_page_count = 1
    else:
        visible_page_count = 3
    pages: list = [
        url_for('get_pdf_file_as_pages', filename=selected_course.serverFilename,
                page=page_number, dtype='pdf')
        for page_number in range(1, visible_page_count + 1)
    ]

    return jsonify({
        'message': 'successful',
        'data': {
            'id': selected_course.id,
            'courseName': selected_course.name,
            'courseDescription': selected_course.description,
            'isActive': selected_course.isActive,
            'publishedStatus': selected_course.publishedStatus,
            'creationDate': selected_course.creationDate,  # TODO: Format to particular structure
            'coverImage': url_for('send_file', filename=selected_course.coverImage),
            'serverFilename': url_for('send_file', filename=selected_course.serverFilename),
            'totalPages': selected_course.totalPages,
            'author': {
                'id': selected_course.authorID,
                'username': selected_course.author.username,
                'firstName': selected_course.author.firstName,
                'lastName': selected_course.author.lastName,
                'pfpFilename': url_for('send_file', filename=selected_course.author.pfpFilename),
                'bio': selected_course.author.bio
            },
            'selfEnrollment': {
                'isEnrolled': self_enrollment_record is not None,
                'data': self_enrollment_data
            },
            'enrollmentSummary': summary_user
        }
    }), 200


@course.route('/getCategories', methods=['GET'])
def get_categories():
    """Return every category as a JSON list."""
    categories = db.session.execute(select(Category)).scalars()
    cat_list: list[dict] = [
        {
            'id': category.id,
            'name': category.name,
            'description': category.description,
            'isActive': category.isActive,
            'creationDate': category.creationDate
        }
        for category in categories
    ]
    return jsonify(cat_list), 200


@course.route('/createCategory', methods=['Post'])
@auth_required()
@requires_role([UserRole.ADMIN])
def create_category():
    """Create a category (admin only).

    Form fields: ``name`` (required), ``description`` (optional) and
    ``isActive`` (optional int; the category is active when it is omitted).
    Returns 400 for a missing name or a non-integer ``isActive``, else 201.
    """
    category_name = request.form.get('name')
    if not category_name:
        return jsonify({'message': 'Missing required parameter "name" '}), 400

    try:
        is_active = bool(int(request.form.get('isActive', 1)))
    except ValueError:
        return jsonify({'message': 'Invalid "isActive"'}), 400

    new_cat: Category = Category(
        name=category_name,
        description=request.form.get('description'),
        isActive=is_active,
        courses=[]
    )
    db.session.add(new_cat)
    db.session.commit()
    return jsonify({'message': 'Category created'}), 201


@course.route('/updateCategory', methods=['POST', 'DELETE'])
@auth_required()
@requires_role([UserRole.ADMIN])
def update_category():
    """Update (POST) or delete (DELETE) a category (admin only).

    Form fields: ``category_id`` (required UUID); for updates also the
    optional ``name``, ``description`` and ``isActive`` (int).
    Returns 400 for invalid input, 404 for an unknown category, else 200.
    """
    form_data = request.form

    if 'category_id' not in form_data:
        return jsonify({'message': 'Missing required parameter "category_id" '}), 400
    category_id = _parse_uuid(form_data['category_id'])
    if category_id is None:
        return jsonify({'message': 'Invalid "category_id"'}), 400

    selected_category: Category = db.session.execute(
        select(Category).where(Category.id == category_id)
    ).scalar()
    if not selected_category:
        return jsonify({'message': 'Category not found'}), 404

    if request.method == 'DELETE':
        db.session.delete(selected_category)
        db.session.commit()
        return jsonify({'message': 'Category deleted'}), 200

    # Validate before mutating so a rejected request changes nothing.
    new_is_active = None
    if form_data.get('isActive'):
        try:
            new_is_active = bool(int(form_data.get('isActive')))
        except ValueError:
            return jsonify({'message': 'Invalid "isActive"'}), 400

    if form_data.get('name'):
        selected_category.name = form_data.get('name')
    if form_data.get('description'):
        selected_category.description = form_data.get('description')
    if new_is_active is not None:
        selected_category.isActive = new_is_active
    db.session.commit()
    return jsonify({'message': 'Category updated'}), 200


@course.route('/enrolled')
@auth_required()
def enrolled_courses():
    """List the current user's enrollments in APPROVED courses.

    Each entry merges the enrollment's own fields with the course summary;
    ``id`` is the enrollment id and ``course_id`` the course id.
    """
    # The join ties each enrollment to its own course; filtering on Course
    # without it would pair every enrollment with every approved course.
    enrollments = db.session.execute(
        select(Enrollment)
        .join(Course, Enrollment.courseID == Course.id)
        .where(
            and_(
                Enrollment.userID == g.current_user.id,
                Course.publishedStatus == int(PublishedStatus.APPROVED)
            )
        )
    ).scalars()

    enrolled_list: list[dict] = []
    for enroll_row in enrollments:
        entry = _serialize_course(enroll_row.course)
        # Enrollment fields override the course's 'id'.
        entry.update({
            'id': enroll_row.id,
            'course_id': enroll_row.courseID,
            'lastActivity': enroll_row.lastActivity,
            'currentPage': enroll_row.currentPage,
            'maxPage': enroll_row.maxPage,
            'joinedDate': enroll_row.joinedDate,
            'userID': enroll_row.userID,
        })
        enrolled_list.append(entry)

    return jsonify(enrolled_list), 200


@course.route('/myCourses')
@auth_required()
def my_courses():
    """List every course authored by the current user, whatever its status."""
    courses = db.session.execute(
        select(Course).where(Course.authorID == g.current_user.id)
    ).scalars()

    course_list: list[dict] = []
    for item in courses:
        entry = _serialize_course(item)
        entry['publishedStatus'] = item.publishedStatus
        course_list.append(entry)

    return jsonify(course_list), 200