Compare commits

...

7 Commits

Author SHA1 Message Date
fine4me aff6d939ca user reoutes 6 months ago
fine4me 0ba527a3fa added pdf seperation 6 months ago
fine4me d8b426fa33 made chat before and after 6 months ago
fine4me 971dbf2088 made public summary removed checkguard 6 months ago
fine4me 1ace9341b9 admin priviledge 6 months ago
fine4me 576214ce5d made changes in chat 6 months ago
PANDACUSHION d4855b9615 changes in quiz 6 months ago
  1. 184
      backend/blueprints/admin/__init__.py
  2. 153
      backend/blueprints/badge/__init__.py
  3. 188
      backend/blueprints/chat/__init__.py
  4. 109
      backend/blueprints/notification/__init__.py
  5. 1
      backend/blueprints/profile/__init__.py
  6. 71
      backend/blueprints/public/__init__.py
  7. 52
      backend/blueprints/quiz/__init__.py
  8. 5
      frontend/edu-connect/package-lock.json
  9. 1
      frontend/edu-connect/package.json
  10. 29
      pdfreader.py

@ -0,0 +1,184 @@
from utils .auth import auth_required, requires_role
from flask import Blueprint, jsonify, g,url_for
from db.model import User, Course, Enrollment, Chat, db
from sqlalchemy import select, func, desc, and_
from datetime import datetime, timedelta
from constants import UserRole
admin = Blueprint('admin', __name__)
@admin.route('/stats/users', methods=['GET'])
@auth_required()
@requires_role([UserRole.ADMIN])
def get_user_stats():
"""
Get total users and authors count.
Only accessible by admin users.
"""
try:
# Get total users
total_users = db.session.execute(
select(func.count()).select_from(User)
).scalar()
# Get authors (users who have created courses)
authors = db.session.execute(
select(func.count(func.distinct(Course.authorID)))
.select_from(Course)
).scalar()
return jsonify({
'stats': {
'totalUsers': total_users,
'totalAuthors': authors
}
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@admin.route('/stats/enrollments', methods=['GET'])
@auth_required()
@requires_role([UserRole.ADMIN])
def get_enrollment_stats():
"""
Get course enrollment and discussion statistics.
Only accessible by admin users.
"""
try:
# Get enrollment and user counts
enrollment_stats = db.session.execute(
select(
func.count(Enrollment.id).label('total_enrollments'),
func.count(func.distinct(Enrollment.userID)).label('enrolled_users')
)
.select_from(Enrollment)
).first()
# Get course-wise enrollment counts
course_stats = db.session.execute(
select(
Course.name,
func.count(Enrollment.id).label('enrollment_count')
)
.join(Course, Course.id == Enrollment.courseID)
.group_by(Course.id)
.order_by(desc('enrollment_count'))
).all()
return jsonify({
'stats': {
'totalEnrollments': enrollment_stats.total_enrollments,
'totalEnrolledUsers': enrollment_stats.enrolled_users,
'courseEnrollments': [{
'courseName': stat.name,
'enrollmentCount': stat.enrollment_count
} for stat in course_stats]
}
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@admin.route('/stats/discussions', methods=['GET'])
@auth_required()
@requires_role([UserRole.ADMIN])
def get_discussion_stats():
"""
Get chat room activity statistics.
Only accessible by admin users.
"""
try:
# Get activity for last 24 hours
twenty_four_hours_ago = datetime.now() - timedelta(hours=24)
# Get active rooms and their stats
active_rooms = db.session.execute(
select(
Course.name,
func.count(Chat.id).label('message_count'),
func.count(func.distinct(Chat.userID)).label('active_users')
)
.join(Course, Course.id == Chat.courseID)
.where(Chat.chatDate >= twenty_four_hours_ago)
.group_by(Course.id)
.order_by(desc('message_count'))
).all()
# Get total active rooms
total_active_rooms = len(active_rooms)
# Get most active room
most_active_room = None
if active_rooms:
most_active = active_rooms[0]
most_active_room = {
'name': most_active.name,
'messageCount': most_active.message_count,
'activeUsers': most_active.active_users
}
# Get total active users across all rooms
total_active_users = db.session.execute(
select(func.count(func.distinct(Chat.userID)))
.where(Chat.chatDate >= twenty_four_hours_ago)
).scalar()
return jsonify({
'stats': {
'totalActiveRooms': total_active_rooms,
'totalActiveUsers': total_active_users,
'mostActiveRoom': most_active_room,
'activeRooms': [{
'roomName': room.name,
'messageCount': room.message_count,
'activeUsers': room.active_users
} for room in active_rooms]
}
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@admin.route('/stats/userDetail', methods=['GET'])
@auth_required()
@requires_role([UserRole.ADMIN])
def get_user_details():
"""
Get detailed information for all users.
Only accessible by admin users.
"""
try:
current_user: User = g.current_user
# Get all users with basic info
users = db.session.execute(
select(User)
.order_by(desc(User.joinedDate))
).scalars()
# Format user data
user_list = [{
'id': str(user.id),
'username': user.username,
'email': user.email,
'firstName': user.firstName,
'lastName': user.lastName,
'profilePicture': url_for('send_file', filename=current_user.pfpFilename, _external=True),
'bio': user.bio,
'role': user.role,
'isActivated': user.isActivated,
'joinedDate': user.joinedDate.isoformat(),
'lastOnline': user.lastOnline.isoformat(),
'dateOfBirth': user.dob.isoformat() if user.dob else None
} for user in users]
return jsonify({
'users': user_list,
'count': len(user_list)
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -1,3 +1,152 @@
from flask import Blueprint
from flask import Blueprint, request, jsonify, g
from werkzeug.datastructures import MultiDict
import os
import uuid
from sqlalchemy import select
from config import DEFAULT_BADGE_ICON, USER_UPLOADS_DIR
from db.model import db, Badge, User
from utils.utils import random_string_generator
from utils.auth import auth_required, requires_role
from constants import UserRole
badge = Blueprint('badge', __name__)
badge = Blueprint('badge', __name__)
@badge.route('/create', methods=['POST'])
@auth_required()
@requires_role([UserRole.ADMIN])
def create_badge():
"""
Create a new badge. Only accessible by admin users.
Requires: badge_name, description
Optional: icon_image, can_claim
"""
try:
form_data: dict = request.form
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None)
# Validate required fields
try:
badge_name: str = form_data['badge_name']
if len(badge_name) > 16:
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400
except KeyError:
return jsonify({'message': 'Badge name cannot be empty'}), 400
# Handle icon upload
icon_file_name: str = DEFAULT_BADGE_ICON
if badge_uploaded_icon is not None:
icon_file_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}"
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, icon_file_name))
# Get optional fields
badge_description: str = form_data.get('description', '')
can_claim: bool = form_data.get('can_claim', 'false').lower() == 'true'
# Create new badge
new_badge: Badge = Badge(
name=badge_name,
description=badge_description,
icon=icon_file_name,
canClaim=can_claim,
user_badges=[]
)
db.session.add(new_badge)
db.session.commit()
return jsonify({
'message': 'Badge was created successfully.',
'badge': {
'id': str(new_badge.id),
'name': new_badge.name,
'description': new_badge.description,
'icon': new_badge.icon,
'canClaim': new_badge.canClaim
}
}), 200
except Exception as e:
db.session.rollback()
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@badge.route('/update/<badge_id>', methods=['PUT'])
@auth_required()
@requires_role([UserRole.ADMIN])
def update_badge(badge_id):
"""
Update an existing badge. Only accessible by admin users.
"""
try:
badge_to_update = db.session.get(Badge, uuid.UUID(badge_id))
if not badge_to_update:
return jsonify({'message': 'Badge not found'}), 404
form_data: dict = request.form
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None)
# Update icon if provided
if badge_uploaded_icon is not None:
new_icon_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}"
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, new_icon_name))
# Remove old icon if it's not the default
if badge_to_update.icon != DEFAULT_BADGE_ICON:
try:
os.remove(os.path.join(USER_UPLOADS_DIR, badge_to_update.icon))
except OSError:
pass # File might not exist
badge_to_update.icon = new_icon_name
# Update other fields if provided
if 'badge_name' in form_data:
if len(form_data['badge_name']) > 16:
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400
badge_to_update.name = form_data['badge_name']
if 'description' in form_data:
badge_to_update.description = form_data['description']
if 'can_claim' in form_data:
badge_to_update.canClaim = form_data['can_claim'].lower() == 'true'
db.session.commit()
return jsonify({
'message': 'Badge was updated successfully.',
'badge': {
'id': str(badge_to_update.id),
'name': badge_to_update.name,
'description': badge_to_update.description,
'icon': badge_to_update.icon,
'canClaim': badge_to_update.canClaim
}
}), 200
except Exception as e:
db.session.rollback()
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@badge.route('/list', methods=['GET'])
@auth_required()
def list_badges():
"""
List all badges. Accessible by all authenticated users.
"""
try:
badges = db.session.execute(
select(Badge)
.order_by(Badge.createDate.desc())
).scalars().all()
return jsonify({
'badges': [{
'id': str(badge.id),
'name': badge.name,
'description': badge.description,
'icon': badge.icon,
'canClaim': badge.canClaim,
'createDate': badge.createDate.isoformat()
} for badge in badges]
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -1,66 +1,148 @@
from flask import Blueprint, request, jsonify
from uuid import UUID
from db.model import db, User, Course, Enrollment
from flask import Blueprint, request, jsonify, g
from db.model import User, Enrollment, Chat, db
import uuid
from sqlalchemy import select, and_, desc
from datetime import datetime
chat = Blueprint('chat', __name__)
@chat.route('/course/<uuid:course_id>/users', methods=['POST'])
def get_users_assigned_to_course(course_id: UUID):
@chat.route('/<uuid:course_id>/messages', methods=['GET'])
def get_course_messages(course_id: uuid.UUID):
"""
Fetch all users assigned to a specific course.
:param course_id: ID of the course to fetch users for (UUID).
:return: JSON response with users assigned to the course.
Fetch chat messages for a specific course with optional pagination.
Only enrolled users can access the messages.
Query Parameters:
- before: UUID of the message to get older entries
- after: UUID of the message to get newer entries
- limit: Number of messages to return (default: 10)
"""
try:
# Query the course to ensure it exists
course = Course.query.get(course_id)
if not course:
return jsonify({"error": "Course not found."}), 404
# Get the list of users assigned to the course
users = User.query.filter(User.enrollments.any(course_id=course_id)).all()
# Prepare the response data
user_data = [
{
"id": user.id,
"email": user.email,
"username": user.username,
"firstName": user.firstName,
"lastName": user.lastName,
}
for user in users
]
return jsonify({"course": course.name, "users": user_data}), 200
current_user: User = g.current_user
limit = int(request.args.get('limit', 10))
before_id = request.args.get('before')
after_id = request.args.get('after')
# Verify user's enrollment
enrollment = db.session.execute(
select(Enrollment).where(
and_(
Enrollment.courseID == course_id,
Enrollment.userID == current_user.id
)
)
).scalar()
if not enrollment:
return jsonify({'message': 'You are not enrolled in this course'}), 401
# Base query
query = select(Chat).where(Chat.courseID == course_id)
# Handle pagination
if before_id:
try:
reference_message = db.session.execute(
select(Chat).where(Chat.id == uuid.UUID(before_id))
).scalar()
if not reference_message:
return jsonify({'message': 'Reference message not found'}), 404
query = query.where(Chat.chatDate < reference_message.chatDate)
except ValueError:
return jsonify({'message': 'Invalid message ID format'}), 400
elif after_id:
try:
reference_message = db.session.execute(
select(Chat).where(Chat.id == uuid.UUID(after_id))
).scalar()
if not reference_message:
return jsonify({'message': 'Reference message not found'}), 404
query = query.where(Chat.chatDate > reference_message.chatDate)
# For after queries, reverse the order later
query = query.order_by(Chat.chatDate)
except ValueError:
return jsonify({'message': 'Invalid message ID format'}), 400
else:
# Default ordering
query = query.order_by(desc(Chat.chatDate))
# Apply limit and execute query
query = query.limit(limit)
messages = list(db.session.execute(query).scalars())
# Reverse the order for 'after' queries to maintain consistency
if after_id:
messages.reverse()
# Format messages
chat_messages = [{
'id': str(msg.id),
'text': msg.textContent,
'userId': str(msg.userID),
'username': msg.user.username,
'timestamp': msg.chatDate.isoformat()
} for msg in messages]
return jsonify({
'messages': chat_messages,
'count': len(chat_messages),
'hasMore': len(chat_messages) == limit
}), 200
except Exception as e:
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@chat.route('/course/<uuid:course_id>/chat', methods=['POST'])
def join_chat(course_id: UUID):
@chat.route('/<uuid:course_id>/send', methods=['POST'])
def send_message(course_id: uuid.UUID):
"""
Allow users to join the chat only if they are enrolled in the course.
:param course_id: ID of the course (UUID).
:return: JSON response indicating whether the user can join or not.
Send a new chat message in a course.
Only enrolled users can send messages.
"""
try:
# Get the user from the request (assume user_id is passed in the request body)
user_id = request.json.get('user_id')
if not user_id:
return jsonify({"error": "User ID is required."}), 400
# Query the user and ensure they exist
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found."}), 404
# Check if the user is enrolled in the course
enrollment = Enrollment.query.filter_by(userID=user_id, courseID=course_id).first()
current_user: User = g.current_user
message_text = request.json.get('message')
if not message_text or not message_text.strip():
return jsonify({'message': 'Message content is required'}), 400
# Verify user's enrollment
enrollment = db.session.execute(
select(Enrollment).where(
and_(
Enrollment.courseID == course_id,
Enrollment.userID == current_user.id
)
)
).scalar()
if not enrollment:
return jsonify({"error": "User is not enrolled in this course."}), 403
# If enrolled, allow the user to join the chat
return jsonify({"message": f"User {user.username} is enrolled in the course and can join the chat."}), 200
return jsonify({'message': 'You are not enrolled in this course'}), 401
# Create new chat message
new_message = Chat(
textContent=message_text,
userID=current_user.id,
courseID=course_id,
chatDate=datetime.now()
)
db.session.add(new_message)
# Update last activity in enrollment
enrollment.lastActivity = datetime.now()
db.session.commit()
return jsonify({
'message': 'Message sent successfully',
'messageId': str(new_message.id),
'timestamp': new_message.chatDate.isoformat()
}), 201
except Exception as e:
return jsonify({"error": f"An error occurred: {str(e)}"}), 500
db.session.rollback()
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -1,3 +1,108 @@
from flask import Blueprint
from flask import Blueprint, jsonify, request, g
from utils.auth import auth_required
from db.model import User, Notification, db
from sqlalchemy import select, and_, desc
from datetime import datetime
import uuid
notification = Blueprint('notification', __name__)
notification = Blueprint('notification', __name__)
@notification.route('/get', methods=['GET', 'POST'])
@auth_required()
def handle_notifications():
"""
Unified endpoint for handling notifications:
GET: Fetch notifications with optional before/after pagination
POST: Mark notifications as seen
Query Parameters for GET:
- before: UUID of notification to get older entries
- after: UUID of notification to get newer entries
- limit: Number of notifications to return (default: 10)
POST Body:
{
"notificationIds": ["uuid1", "uuid2", ...]
}
"""
if request.method == 'GET':
try:
current_user: User = g.current_user
limit: int = int(request.args.get('limit', 10))
before_id = request.args.get('before')
after_id = request.args.get('after')
# Base query
query = select(Notification).where(
Notification.userID == current_user.id
)
# Handle pagination
if before_id:
try:
reference_notification = db.session.execute(
select(Notification)
.where(and_(
Notification.id == uuid.UUID(before_id),
Notification.userID == current_user.id
))
).scalar()
if not reference_notification:
return jsonify({'message': 'Reference notification not found'}), 404
query = query.where(
Notification.notifDate < reference_notification.notifDate
)
except ValueError:
return jsonify({'message': 'Invalid notification ID format'}), 400
elif after_id:
try:
reference_notification = db.session.execute(
select(Notification)
.where(and_(
Notification.id == uuid.UUID(after_id),
Notification.userID == current_user.id
))
).scalar()
if not reference_notification:
return jsonify({'message': 'Reference notification not found'}), 404
query = query.where(
Notification.notifDate > reference_notification.notifDate
)
# For after queries, we need to reverse the order later
query = query.order_by(Notification.notifDate)
except ValueError:
return jsonify({'message': 'Invalid notification ID format'}), 400
else:
# Default ordering
query = query.order_by(desc(Notification.notifDate))
# Apply limit and execute query
query = query.limit(limit)
notifications = list(db.session.execute(query).scalars())
# Reverse the order for 'after' queries to maintain consistency
if after_id:
notifications.reverse()
# Format response
notif_list = [{
'id': str(notif.id),
'type': notif.notificationType,
'data': notif.notificationData,
'seen': notif.isSeen,
'date': notif.notifDate.isoformat()
} for notif in notifications]
return jsonify({
'notifications': notif_list,
'count': len(notif_list),
'hasMore': len(notif_list) == limit
}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -132,7 +132,6 @@ def get_profile():
"pfp_filename": current_user.pfpFilename,
"profile_url":profile_url
}
return jsonify({"profile": profile_data}), 200
except Exception as e:
return jsonify({"error": f"Failed to fetch profile. Error: {str(e)}"}), 500

@ -0,0 +1,71 @@
from flask import Blueprint, jsonify
from db.model import User, Course, db
from sqlalchemy import select, func
public_summary = Blueprint('public', __name__)
@public_summary.route('/stats/total-users', methods=['GET'])
def get_total_users():
"""
Fetch total user count.
"""
try:
total_users = db.session.execute(
select(func.count()).select_from(User)
).scalar()
return jsonify({'totalUsers': total_users}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/total-authors', methods=['GET'])
def get_total_authors():
"""
Fetch total authors (users who have created courses).
"""
try:
total_authors = db.session.execute(
select(func.count(func.distinct(Course.authorID)))
.select_from(Course)
).scalar()
return jsonify({'totalAuthors': total_authors}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/total-courses', methods=['GET'])
def get_total_courses():
"""
Fetch total course count.
"""
try:
total_courses = db.session.execute(
select(func.count()).select_from(Course)
).scalar()
return jsonify({'totalCourses': total_courses}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500
@public_summary.route('/stats/subscribed-users', methods=['GET'])
def get_subscribed_users():
"""
Fetch count of users subscribed to the newsletter and are activated.
"""
try:
subscribed_users = db.session.execute(
select(func.count(User.email))
.select_from(User)
.where(User.isActivated == True)
).scalar()
return jsonify({'subscribedNewsletter': subscribed_users}), 200
except Exception as e:
return jsonify({'message': f'An error occurred: {str(e)}'}), 500

@ -0,0 +1,52 @@
from flask import Blueprint
from flask import Blueprint, request, jsonify, current_app, g
from db.model import User, Category, Quiz, Course, Enrollment, QuizAttempt
from db.model import db
import uuid
from sqlalchemy import select, and_
quiz = Blueprint('quiz', __name__)
@quiz.route('/create', methods=['POST'])
def create_quiz():
data: dict = request.form
creator_user: User = g.current_user
course_id: uuid.UUID = uuid.UUID(data.get('courseID'))
enrollment_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_id, Enrollment.userID == creator_user.id))).scalar()
if not enrollment_data:
return jsonify({'message': 'You are not enrolled in this class'}), 401
get_quiz_data :str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}'
new_quiz = Quiz(
creatorUserID = creator_user.id,
courseID = course_id,
quizJson = get_quiz_data,
quiz_attempts = []
)
db.session.add_all(new_quiz)
db.session.commit()
return jsonify({'message': 'Course was created successfully.'}), 200
@quiz.route('/attempt', methods=['POST'])
def attempt_quiz():
data: dict = request.form
attempt_user: User = g.current_user
quiz_id: uuid.UUID = uuid.UUID(data.get('quizID'))
course_data: Enrollment = db.session.execute(select(Enrollment).where(and_(Enrollment.courseID == course_data, Enrollment.userID == attempt_user.id))).scalar()
if not course_data:
return jsonify({'message': 'You are not enrolled in this class'}), 401
answerKey: str = '{"questions":[{"question":"What is the capital of France?","options":["Paris","London","Berlin","Madrid"],"answer":"Paris"},{"question":"What is 2 + 2?","options":["3","4","5","6"],"answer":"4"}]}'
#function to calculate Score
score_value = 0
new_attempt = QuizAttempt(
userID = attempt_user,
quizID = quiz_id,
answerKey = answerKey,
score = score_value
)
db.session.add_all(new_attempt)
db.session.commit()
return jsonify({'message': 'Quiz was appended sucessfully'}), 200

@ -12,6 +12,7 @@
"@radix-ui/react-slot": "^1.1.1",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"edu-connect": "file:",
"lucide-react": "^0.471.0",
"next": "14.2.23",
"react": "^18",
@ -1712,6 +1713,10 @@
"resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz",
"integrity": "sha512-I88TYZWc9XiYHRQ4/3c5rjjfgkjhLyW2luGIheGERbNQ6OY7yTybanSpDXZa8y7VUP9YmDcYa+eyq4ca7iLqWA=="
},
"node_modules/edu-connect": {
"resolved": "",
"link": true
},
"node_modules/emoji-regex": {
"version": "9.2.2",
"resolved": "https://registry.npmjs.org/emoji-regex/-/emoji-regex-9.2.2.tgz",

@ -13,6 +13,7 @@
"@radix-ui/react-slot": "^1.1.1",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"edu-connect": "file:",
"lucide-react": "^0.471.0",
"next": "14.2.23",
"react": "^18",

@ -0,0 +1,29 @@
import os
from PyPDF2 import PdfReader, PdfWriter
PATH = 'pdfcontent'
if not os.path.exists(PATH):
os.makedirs(PATH)
def getPdfFile(filepath):
with open(filepath, 'rb') as pdf_file:
reader = PdfReader(pdf_file)
for page_num in range(len(reader.pages)):
page = reader.pages[page_num]
text = page.extract_text()
# Save as text file
if text: # Ensure there's text on the page before saving
filename = os.path.splitext(os.path.basename(filepath))[0]
output_txt_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.txt")
with open(output_txt_filename, 'w', encoding='utf-8') as output_file:
output_file.write(text)
print(f"Page {page_num + 1} extracted and saved as {output_txt_filename}")
# Save as PDF file
writer = PdfWriter()
writer.add_page(page)
output_pdf_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.pdf")
with open(output_pdf_filename, 'wb') as output_pdf_file:
writer.write(output_pdf_file)
print(f"Page {page_num + 1} extracted and saved as {output_pdf_filename}")
Loading…
Cancel
Save