Implement quiz API and badge API

main
Casu Al Snek 6 months ago
parent 716e5cffb4
commit 607902fd48
  1. 13
      backend/app.py
  2. 49
      backend/blueprints/badge/__init__.py
  3. 43
      backend/blueprints/course/__init__.py
  4. 226
      backend/blueprints/quiz/__init__.py
  5. 1
      backend/config.py
  6. 9
      backend/db/model.py
  7. 37
      backend/utils/utils.py

@ -20,6 +20,8 @@ from blueprints.admin import admin as adminBlueprint
from blueprints.chat import chat as chatBlueprint from blueprints.chat import chat as chatBlueprint
from blueprints.public import public_summary as publicBlueprint from blueprints.public import public_summary as publicBlueprint
from blueprints.course import course as courseBlueprint from blueprints.course import course as courseBlueprint
from blueprints.badge import badge_route as badgeBlueprint
from blueprints.quiz import quiz as quizBlueprint
app = Flask(__name__) app = Flask(__name__)
@ -39,7 +41,11 @@ db.init_app(app)
app.register_blueprint(profileBlueprint, url_prefix='/api/profile') app.register_blueprint(profileBlueprint, url_prefix='/api/profile')
app.register_blueprint(sessionBlueprint,url_prefix='/api/session') app.register_blueprint(sessionBlueprint,url_prefix='/api/session')
app.register_blueprint(adminBlueprint,url_prefix='/api/admin') app.register_blueprint(adminBlueprint,url_prefix='/api/admin')
app.register_blueprint(badgeBlueprint, url_prefix='/api/badge')
# TODO: Register Notif API
app.register_blueprint(chatBlueprint,url_prefix='/api/chat') app.register_blueprint(chatBlueprint,url_prefix='/api/chat')
app.register_blueprint(quizBlueprint,url_prefix='/api/quiz')
app.register_blueprint(publicBlueprint,url_prefix='/api/public') app.register_blueprint(publicBlueprint,url_prefix='/api/public')
app.register_blueprint(courseBlueprint,url_prefix='/api/course') app.register_blueprint(courseBlueprint,url_prefix='/api/course')
@ -47,6 +53,13 @@ app.register_blueprint(courseBlueprint,url_prefix='/api/course')
def send_file(filename): def send_file(filename):
return send_from_directory(USER_UPLOADS_DIR, filename) return send_from_directory(USER_UPLOADS_DIR, filename)
@app.route('/courseSegment/<string:filename/<int:page>/<string:dtype>>')
def get_pdf_file_as_pages(filename: str, page: int, dtype: str):
if dtype == 'txt':
return send_from_directory(os.path.join(USER_UPLOADS_DIR, filename+'_parts'), f"{page}.txt")
else:
return send_from_directory(os.path.join(USER_UPLOADS_DIR, filename+'_parts'), f"{page}.pdf")
@app.route('/', methods=['GET', 'POST']) @app.route('/', methods=['GET', 'POST'])
def homepage(): def homepage():
return {'message': 'Welcome back !'}, 200 return {'message': 'Welcome back !'}, 200

@ -1,3 +1,48 @@
from flask import Blueprint from flask import Blueprint, url_for, jsonify, g
from utils.auth import auth_required
from db.model import db, Badge, UserBadge
from sqlalchemy import select
badge_route = Blueprint('badge', __name__)
badge = Blueprint('badge', __name__) @badge_route.route('/listAllBadges')
def all_badges():
badges: list[Badge] = db.session.execute(select(Badge)).scalars()
data: list = []
for bgd in badges:
data.append({
'id': bgd.id,
'name': bgd.name,
'description': bgd.description,
'createDate': bgd.createDate,
'icon': url_for('send_file', filename=bgd.icon)
'canClaim': bgd.canClaim
})
return jsonify({
'count': len(data),
'data': data
})
@badge_route.route('/myBadges')
@auth_required()
def my_badges():
user_badges: list[UserBadge] = db.session.execute(select(UserBadge).where(
UserBadge.userID == g.current_user.id
)).scalars()
data: list = []
for ub in user_badges:
bgd = ub.badge
data.append({
'id': ub.id,
'badgeID': bgd.id,
'userID': ub.userID,
'name': bgd.name,
'description': bgd.description,
'createDate': bgd.createDate,
'icon': url_for('send_file', filename=bgd.icon)
'canClaim': bgd.canClaim,
'claimedDate': ub.claimedDate,
})
return jsonify({
'count': len(data),
'data': data
})

@ -7,7 +7,7 @@ import uuid
import math import math
from config import DEFAULT_COURSE_COVER from config import DEFAULT_COURSE_COVER
from db.model import db, Course, Category, User, Chat, Enrollment from db.model import db, Course, Category, User, Chat, Enrollment
from utils.utils import random_string_generator from ...utils.utils import random_string_generator, split_pdf_into_pages_with_text
from utils.auth import auth_required, requires_role from utils.auth import auth_required, requires_role
from constants import * from constants import *
from config import * from config import *
@ -123,12 +123,21 @@ def create_course():
course_uploaded_pdf: MultiDict|None = request.files.get('course_pdf', None) course_uploaded_pdf: MultiDict|None = request.files.get('course_pdf', None)
cover_file_name: str = DEFAULT_COURSE_COVER cover_file_name: str = DEFAULT_COURSE_COVER
pdf_file_name: str = '' pdf_file_name: str = ''
pdf_total_pages: int = 1
if course_uploaded_cover_image is not None: if course_uploaded_cover_image is not None:
cover_file_name: str = random_string_generator(32)+"."+course_uploaded_cover_image.filename.split('.')[-1] cover_file_name: str = random_string_generator(32)+"."+course_uploaded_cover_image.filename.split('.')[-1]
course_uploaded_cover_image.save(os.path.join(USER_UPLOADS_DIR, cover_file_name)) course_uploaded_cover_image.save(os.path.join(USER_UPLOADS_DIR, cover_file_name))
if course_uploaded_pdf is not None: if course_uploaded_pdf is not None:
pdf_file_name: str = random_string_generator(32) +"."+ course_uploaded_pdf.filename.split('.')[-1] pdf_file_name: str = random_string_generator(32) +"."+ course_uploaded_pdf.filename.split('.')[-1]
course_uploaded_pdf.save(os.path.join(USER_UPLOADS_DIR, pdf_file_name)) course_uploaded_pdf.save(os.path.join(USER_UPLOADS_DIR, pdf_file_name))
pdf_parts_root_dir = os.path.join(USER_UPLOADS_DIR, pdf_file_name + "_parts")
os.makedirs(pdf_parts_root_dir, exist_ok=True)
pdf_total_pages = split_pdf_into_pages_with_text(
pdf_path=os.path.join(USER_UPLOADS_DIR, pdf_file_name),
output_directory=pdf_parts_root_dir
)
published_status: PublishedStatus = PublishedStatus.PENDING published_status: PublishedStatus = PublishedStatus.PENDING
try: try:
course_name: str = form_data['course_name'] course_name: str = form_data['course_name']
@ -137,13 +146,13 @@ def create_course():
course_description: str = form_data.get('course_description', '') course_description: str = form_data.get('course_description', '')
category_id: uuid.UUID = uuid.UUID(form_data['category_uuid']) category_id: uuid.UUID = uuid.UUID(form_data['category_uuid'])
page_for_community: int = int(form_data.get('page_for_community', 1)) # TODO: Add this field to model page_for_community: int = int(form_data.get('page_for_community', 1)) # TODO: Add this field to model
catgory: Category = db.session.execute(select(Category).where(Category.id == category_id)).scalar() category: Category = db.session.execute(select(Category).where(Category.id == category_id)).scalar()
# author: User = db.session.execute(select(User).where(User.id == g.current_user.id)).scalar() # author: User = db.session.execute(select(User).where(User.id == g.current_user.id)).scalar()
new_course: Course = Course( new_course: Course = Course(
name=course_name, name=course_name,
categoryID=category_id, categoryID=category_id,
authorID=g.current_user.id, authorID=g.current_user.id,
category=catgory, category=category,
author=g.current_user, author=g.current_user,
description=course_description, description=course_description,
isActive=True, isActive=True,
@ -151,6 +160,7 @@ def create_course():
publishedStatus=int(published_status), publishedStatus=int(published_status),
coverImage=cover_file_name, coverImage=cover_file_name,
serverFilename=pdf_file_name, serverFilename=pdf_file_name,
totalPages=pdf_total_pages,
enrollments=[], enrollments=[],
quizzes=[], quizzes=[],
chats=[] chats=[]
@ -272,7 +282,32 @@ def course_info(course_uuid):
select(func.count()).select_from(Chat).where(Chat.courseID == course_uuid) select(func.count()).select_from(Chat).where(Chat.courseID == course_uuid)
).scalar() ).scalar()
} }
jsonify({ pages: list = []
if self_enrollment_record:
for i in range(selected_course.totalPages):
pages.append(
url_for('get_pdf_file_as_pages',
filename=selected_course.serverFilename,
page=i + 1,
dtype='pdf')
)
else:
if selected_course.totalPages < 3:
pages.append(
url_for('get_pdf_file_as_pages',
filename=selected_course.serverFilename,
page=1,
dtype='pdf')
)
else:
for i in range(3):
pages.append(
url_for('get_pdf_file_as_pages',
filename=selected_course.serverFilename,
page=i+1,
dtype='pdf')
)
return jsonify({
'message': 'successful', 'message': 'successful',
'data': { 'data': {
'id': selected_course.id, 'id': selected_course.id,

@ -0,0 +1,226 @@
import json
import os
import uuid
import requests
from flask import Blueprint, request, jsonify, g, url_for
from uuid import UUID
from ...db.model import db, User, Course, Enrollment,Chat, Quiz, QuizAttempt
from utils.auth import auth_required
import requests
from config import SPAM_SCORE_THRESHOLD, AI_SPAM_SERVICES_MICROSERVICE, USER_UPLOADS_DIR, AI_QUIZ_SERVICES_MICROSERVICE
from sqlalchemy import desc, select, and_
quiz = Blueprint('chat', __name__)
@quiz.route('/generate')
@auth_required()
def generate_quiz():
try:
course_id: uuid.UUID = uuid.UUID(request.form['course_id'])
current_page: int = int(request.form['page'])
except KeyError:
return jsonify({'message': 'course_id and page must be specified'}), 401
enrollment_record: Enrollment = db.session.execute(
select(Enrollment).where(and_(
Enrollment.courseID == course_id,
Enrollment.userID == g.current_user.id)
)
).scalar()
if not enrollment_record:
return jsonify({"error": "You are not enrolled in this course."}), 403
if current_page > enrollment_record.course.totalPages or current_page < 1:
return jsonify({
'message': 'Page range out of bound. No such page'
}), 404
# Everything is alright, now get the text in current page and generate quiz
current_page_text: str = ''
with open(
os.path.join(
USER_UPLOADS_DIR,
enrollment_record.course.serverFilename+"_parts",
f"{current_page}.txt")
) as f:
current_page_text = f.read()
quiz_data_resp = requests.post(AI_QUIZ_SERVICES_MICROSERVICE, json={"string_message": current_page_text})
if quiz_data_resp.status_code != 200:
return jsonify({"error": "Failed to make quiz request."}), 500
quiz_data = quiz_data_resp.json()
# Insert the quiz into table
rows: list[Quiz] = []
for quiz_item in quiz_data['questions']:
rows.append(
Quiz(
creatorUserID=g.current_user.id,
creatorUser=g.current_user,
quiz_attempts=[],
courseID=course_id,
course=enrollment_record.course,
quizQuestion=quiz_item['question'],
quizAnswers=json.dumps(quiz_item['options']),
quizCorrectAnswer=quiz_item['correct_answer']
)
)
db.session.add_all(rows)
db.session.commit()
return jsonify({'message': 'quizzes were generated for the current page'})
@quiz.route('/get/personalIncomplete')
@auth_required()
def get_incomplete_quiz():
try:
course_id: uuid.UUID = uuid.UUID(request.args['course_id'])
except KeyError:
return jsonify({'message': 'course_id must be specified'}), 401
quiz_rows: list[Quiz] = db.session.execute(select(Quiz).where(
and_(Quiz.creatorUserID == g.current_user.id, Quiz.creatorHasAttempted == False)
)).scalars()
data: list = []
for quiz_row in quiz_rows:
data.append(
{
'id': quiz_row.id,
'isActive': quiz_row.isActive,
'creationDate': quiz_row.creationDate,
'quizAnswers': quiz_row.quizAnswers,
'quizQuestion': quiz_row.quizQuestion,
'course': {
'id': quiz_row.course.id,
'name': quiz_row.course.name,
'description': quiz_row.course.description
},
'creator': {
'id': quiz_row.creatorUserID,
'firstName': quiz_row.creatorUser.firstName,
'lastName': quiz_row.creatorUser.lastName,
'username': quiz_row.creatorUser.username,
'pfpFilename': url_for('send_file', filename=quiz_row.creatorUser.pfpFilename)
}
}
)
return jsonify({
'count': len(data),
'data': data
}), 200
@quiz.route('/get/allComplete')
@auth_required()
def get_incomplete_quiz():
try:
course_id: uuid.UUID = uuid.UUID(request.args['course_id'])
except KeyError:
return jsonify({'message': 'course_id must be specified'}), 401
quiz_attempts: list[QuizAttempt] = db.session.execute(
select(QuizAttempt).where(and_(
QuizAttempt.userID == g.current_user.id,
Course.id == course_id
))).scalars() # IF THIS DOES NOT WORK, ADD COURSE IF TO QUIZ_ATTEMPT TABLE ITSELF
completes: list = []
for attempt in quiz_attempts:
quiz_row: Quiz = attempt.quiz
completes.append(
{
'id': attempt.id,
'quizID': quiz_row.id,
'isActive': quiz_row.isActive,
'creationDate': quiz_row.creationDate,
'quizAnswers': quiz_row.quizAnswers,
'quizQuestion': quiz_row.quizQuestion,
'userAnswer': attempt.userAnswer,
'quizCorrectAnswer': quiz_row.quizCorrectAnswer,
'isCorrect': attempt.isCorrect,
'course': {
'id': quiz_row.course.id,
'name': quiz_row.course.name,
'description': quiz_row.course.description
},
'creator': {
'id': quiz_row.creatorUserID,
'firstName': quiz_row.creatorUser.firstName,
'lastName': quiz_row.creatorUser.lastName,
'username': quiz_row.creatorUser.username,
'pfpFilename': url_for('send_file', filename=quiz_row.creatorUser.pfpFilename)
}
}
)
return jsonify({
'count': len(completes),
'data': completes
}), 200
@quiz.route('/submit')
@auth_required()
def get_incomplete_quiz():
try:
answer: str = request.form['answer'].strip()
quiz_id: uuid.UUID = uuid.UUID(request.form['course_id'])
except KeyError:
return jsonify({'message': 'course_id and answer must be specified'}), 401
quiz_already_attempted: QuizAttempt = db.session.execute(select(QuizAttempt).where(
and_(QuizAttempt.quizID == quiz_id, QuizAttempt.userID == g.current_user.id )
)).scalar()
if quiz_already_attempted:
return jsonify({'message': 'Already attempted this quiz'}), 401
quiz_row: Quiz = db.session.execute(select(Quiz).where(Quiz.id == quiz_id)).scalar()
if not quiz_row:
return jsonify({'message': 'Quiz does not exist'}), 404
valid_answers: list = json.loads(quiz_row.quizAnswers)
is_correct: bool = False
if answer not in valid_answers:
return jsonify({'message': 'No such choice of answer given'}), 404
if answer == quiz_row.quizCorrectAnswer:
is_correct = True
new_attempt: QuizAttempt = QuizAttempt(
userID=g.current_user.id,
user=g.current_user,
quizID=quiz_id,
quiz=quiz_row,
userAnswer=answer,
isCorrect=int(is_correct)
)
db.session.add(new_attempt)
if Quiz.creatorUser.id == g.current_user.id:
Quiz.creatorHasAttempted = True
db.session.commit()
return jsonify({
'message': 'Answer submitted',
'isCorrect': is_correct,
'attemptID': new_attempt.id,
'quizID': quiz_row.id,
'quizAnswers': quiz_row.quizAnswers,
'quizQuestion': quiz_row.quizQuestion,
'quizCorrectAnswer': quiz_row.quizCorrectAnswer,
'userAnswer': answer
})
@quiz.route('/quizData')
@auth_required()
def get_quiz_info():
try:
quiz_id: uuid.UUID = uuid.UUID(request.args['quiz_id'])
except KeyError:
return jsonify({'message': 'quiz_id must be specified'}), 401
quiz_row: Quiz = db.session.execute(select(Quiz).where(Quiz.id == quiz_id)).scalar()
if not quiz_row:
return jsonify({'message': 'Quiz does not exist'}), 404
return jsonify({
'id': quiz_row.id,
'isActive': quiz_row.isActive,
'creationDate': quiz_row.creationDate,
'quizAnswers': quiz_row.quizAnswers,
'quizQuestion': quiz_row.quizQuestion,
'course': {
'id': quiz_row.course.id,
'name': quiz_row.course.name,
'description': quiz_row.course.description
},
'creator': {
'id': quiz_row.creatorUserID,
'firstName': quiz_row.creatorUser.firstName,
'lastName': quiz_row.creatorUser.lastName,
'username': quiz_row.creatorUser.username,
'pfpFilename': url_for('send_file', filename=quiz_row.creatorUser.pfpFilename)
}
}), 200

@ -22,6 +22,7 @@ USER_UPLOADS_DIR: str = os.path.abspath(os.path.join(PROJECT_ROOT, "uploads"))
SPAM_SCORE_THRESHOLD: int = 6 SPAM_SCORE_THRESHOLD: int = 6
AI_SPAM_SERVICES_MICROSERVICE: str ='http://localhost:5000/test-spam' AI_SPAM_SERVICES_MICROSERVICE: str ='http://localhost:5000/test-spam'
AI_QUIZ_SERVICES_MICROSERVICE: str = 'http://localhost:5000/generate-questions'
DB_URI: str = f"{DB_ENGINE}://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" DB_URI: str = f"{DB_ENGINE}://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
ACTIVATE_ACCOUNTS_ON_SIGNUP: bool = True ACTIVATE_ACCOUNTS_ON_SIGNUP: bool = True

@ -105,9 +105,12 @@ class Quiz(db.Model):
quiz_attempts: Mapped[List["QuizAttempt"]] = relationship(back_populates="quiz", cascade="all, delete-orphan") quiz_attempts: Mapped[List["QuizAttempt"]] = relationship(back_populates="quiz", cascade="all, delete-orphan")
courseID: Mapped[uuid.UUID] = mapped_column(ForeignKey("course.id")) courseID: Mapped[uuid.UUID] = mapped_column(ForeignKey("course.id"))
course: Mapped["Course"] = relationship(back_populates="quizzes") course: Mapped["Course"] = relationship(back_populates="quizzes")
quizJson: Mapped[str] = mapped_column(String, nullable=False) quizQuestion: Mapped[str] = mapped_column(String, nullable=False)
quizAnswers: Mapped[str] = mapped_column(String, nullable=False)
quizCorrectAnswer: Mapped[str] = mapped_column(String, nullable=False)
creationDate: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=func.now()) creationDate: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=func.now())
isActive: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) isActive: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
creatorHasAttempted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
class QuizAttempt(db.Model): class QuizAttempt(db.Model):
__tablename__ = 'quiz_attempts' __tablename__ = 'quiz_attempts'
@ -117,8 +120,8 @@ class QuizAttempt(db.Model):
user: Mapped["User"] = relationship(back_populates="quiz_attempts") user: Mapped["User"] = relationship(back_populates="quiz_attempts")
quizID: Mapped[uuid.UUID] = mapped_column(ForeignKey("quiz.id")) quizID: Mapped[uuid.UUID] = mapped_column(ForeignKey("quiz.id"))
quiz: Mapped["Quiz"] = relationship(back_populates="quiz_attempts") quiz: Mapped["Quiz"] = relationship(back_populates="quiz_attempts")
answerKey: Mapped[str] = mapped_column(String, nullable=False) userAnswer: Mapped[str] = mapped_column(String, nullable=False)
score: Mapped[int] = mapped_column(default=0, nullable=False) isCorrect: Mapped[int] = mapped_column(default=0, nullable=False)
attemptDate: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=func.now()) attemptDate: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=func.now())
class Chat(db.Model): class Chat(db.Model):

@ -2,12 +2,10 @@ import string
import hashlib import hashlib
import random import random
import os import os
from PyPDF2 import PdfReader from PyPDF2 import PdfReader, PdfWriter
from config import * from config import *
import re import re
FILE_NAME = 'manjil.pdf'
FILE_PATH = os.path.join(os.getcwd(), FILE_NAME)
def random_string_generator(string_length: int) -> str: def random_string_generator(string_length: int) -> str:
letters = string.ascii_letters letters = string.ascii_letters
@ -17,15 +15,6 @@ def random_string_generator(string_length: int) -> str:
def hash_string(string_value: str) ->str: def hash_string(string_value: str) ->str:
return hashlib.sha256(string_value.encode('utf-8')).hexdigest() return hashlib.sha256(string_value.encode('utf-8')).hexdigest()
def read_pdf_human_readable(file_path: str) -> list[str]:
pdf_page_text_contents: list = []
reader: PdfReader = PdfReader(file_path)
for i, page in enumerate(reader.pages):
text: str = page.extract_text()
if text:
pdf_page_text_contents.append(text.strip())
return pdf_page_text_contents
def is_valid_email(email): def is_valid_email(email):
pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
if re.match(pattern, email): if re.match(pattern, email):
@ -63,3 +52,27 @@ def password_check_sanity(passwd: str) -> bool:
class InsecurePasswordException(Exception): class InsecurePasswordException(Exception):
pass pass
def split_pdf_into_pages_with_text(pdf_path: str, output_directory: str) -> int:
with open(pdf_path, 'rb') as pdf_file:
reader = PdfReader(pdf_file)
page_counter = 1
for page_num in range(len(reader.pages)):
page = reader.pages[page_num]
text = page.extract_text()
if text is None:
text = ''
output_txt_filename = os.path.join(output_directory, f"{page_counter}.txt")
with open(output_txt_filename, 'w', encoding='utf-8') as output_file:
output_file.write(text)
# Save as PDF file
writer = PdfWriter()
writer.add_page(page)
output_pdf_filename = os.path.join(output_directory, f"{page_counter}.pdf")
with open(output_pdf_filename, 'wb') as output_pdf_file:
writer.write(output_pdf_file)
if len(reader.pages) == page_counter:
return len(reader.pages)
page_counter += 1
Loading…
Cancel
Save