parent
d8b426fa33
commit
0ba527a3fa
@ -1,3 +1,152 @@ |
|||||||
from flask import Blueprint |
from flask import Blueprint, request, jsonify, g |
||||||
|
from werkzeug.datastructures import MultiDict |
||||||
|
import os |
||||||
|
import uuid |
||||||
|
from sqlalchemy import select |
||||||
|
from config import DEFAULT_BADGE_ICON, USER_UPLOADS_DIR |
||||||
|
from db.model import db, Badge, User |
||||||
|
from utils.utils import random_string_generator |
||||||
|
from utils.auth import auth_required, requires_role |
||||||
|
from constants import UserRole |
||||||
|
|
||||||
badge = Blueprint('badge', __name__) |
badge = Blueprint('badge', __name__) |
||||||
|
|
||||||
|
@badge.route('/create', methods=['POST']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def create_badge(): |
||||||
|
""" |
||||||
|
Create a new badge. Only accessible by admin users. |
||||||
|
Requires: badge_name, description |
||||||
|
Optional: icon_image, can_claim |
||||||
|
""" |
||||||
|
try: |
||||||
|
form_data: dict = request.form |
||||||
|
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||||
|
|
||||||
|
# Validate required fields |
||||||
|
try: |
||||||
|
badge_name: str = form_data['badge_name'] |
||||||
|
if len(badge_name) > 16: |
||||||
|
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||||
|
except KeyError: |
||||||
|
return jsonify({'message': 'Badge name cannot be empty'}), 400 |
||||||
|
|
||||||
|
# Handle icon upload |
||||||
|
icon_file_name: str = DEFAULT_BADGE_ICON |
||||||
|
if badge_uploaded_icon is not None: |
||||||
|
icon_file_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||||
|
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, icon_file_name)) |
||||||
|
|
||||||
|
# Get optional fields |
||||||
|
badge_description: str = form_data.get('description', '') |
||||||
|
can_claim: bool = form_data.get('can_claim', 'false').lower() == 'true' |
||||||
|
|
||||||
|
# Create new badge |
||||||
|
new_badge: Badge = Badge( |
||||||
|
name=badge_name, |
||||||
|
description=badge_description, |
||||||
|
icon=icon_file_name, |
||||||
|
canClaim=can_claim, |
||||||
|
user_badges=[] |
||||||
|
) |
||||||
|
|
||||||
|
db.session.add(new_badge) |
||||||
|
db.session.commit() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'message': 'Badge was created successfully.', |
||||||
|
'badge': { |
||||||
|
'id': str(new_badge.id), |
||||||
|
'name': new_badge.name, |
||||||
|
'description': new_badge.description, |
||||||
|
'icon': new_badge.icon, |
||||||
|
'canClaim': new_badge.canClaim |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
db.session.rollback() |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@badge.route('/update/<badge_id>', methods=['PUT']) |
||||||
|
@auth_required() |
||||||
|
@requires_role([UserRole.ADMIN]) |
||||||
|
def update_badge(badge_id): |
||||||
|
""" |
||||||
|
Update an existing badge. Only accessible by admin users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
badge_to_update = db.session.get(Badge, uuid.UUID(badge_id)) |
||||||
|
if not badge_to_update: |
||||||
|
return jsonify({'message': 'Badge not found'}), 404 |
||||||
|
|
||||||
|
form_data: dict = request.form |
||||||
|
badge_uploaded_icon: MultiDict|None = request.files.get('icon_image', None) |
||||||
|
|
||||||
|
# Update icon if provided |
||||||
|
if badge_uploaded_icon is not None: |
||||||
|
new_icon_name = f"{random_string_generator(32)}.{badge_uploaded_icon.filename.split('.')[-1]}" |
||||||
|
badge_uploaded_icon.save(os.path.join(USER_UPLOADS_DIR, new_icon_name)) |
||||||
|
# Remove old icon if it's not the default |
||||||
|
if badge_to_update.icon != DEFAULT_BADGE_ICON: |
||||||
|
try: |
||||||
|
os.remove(os.path.join(USER_UPLOADS_DIR, badge_to_update.icon)) |
||||||
|
except OSError: |
||||||
|
pass # File might not exist |
||||||
|
badge_to_update.icon = new_icon_name |
||||||
|
|
||||||
|
# Update other fields if provided |
||||||
|
if 'badge_name' in form_data: |
||||||
|
if len(form_data['badge_name']) > 16: |
||||||
|
return jsonify({'message': 'Badge name cannot exceed 16 characters'}), 400 |
||||||
|
badge_to_update.name = form_data['badge_name'] |
||||||
|
|
||||||
|
if 'description' in form_data: |
||||||
|
badge_to_update.description = form_data['description'] |
||||||
|
|
||||||
|
if 'can_claim' in form_data: |
||||||
|
badge_to_update.canClaim = form_data['can_claim'].lower() == 'true' |
||||||
|
|
||||||
|
db.session.commit() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'message': 'Badge was updated successfully.', |
||||||
|
'badge': { |
||||||
|
'id': str(badge_to_update.id), |
||||||
|
'name': badge_to_update.name, |
||||||
|
'description': badge_to_update.description, |
||||||
|
'icon': badge_to_update.icon, |
||||||
|
'canClaim': badge_to_update.canClaim |
||||||
|
} |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
db.session.rollback() |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
||||||
|
|
||||||
|
@badge.route('/list', methods=['GET']) |
||||||
|
@auth_required() |
||||||
|
def list_badges(): |
||||||
|
""" |
||||||
|
List all badges. Accessible by all authenticated users. |
||||||
|
""" |
||||||
|
try: |
||||||
|
badges = db.session.execute( |
||||||
|
select(Badge) |
||||||
|
.order_by(Badge.createDate.desc()) |
||||||
|
).scalars().all() |
||||||
|
|
||||||
|
return jsonify({ |
||||||
|
'badges': [{ |
||||||
|
'id': str(badge.id), |
||||||
|
'name': badge.name, |
||||||
|
'description': badge.description, |
||||||
|
'icon': badge.icon, |
||||||
|
'canClaim': badge.canClaim, |
||||||
|
'createDate': badge.createDate.isoformat() |
||||||
|
} for badge in badges] |
||||||
|
}), 200 |
||||||
|
|
||||||
|
except Exception as e: |
||||||
|
return jsonify({'message': f'An error occurred: {str(e)}'}), 500 |
@ -0,0 +1,29 @@ |
|||||||
|
import os |
||||||
|
from PyPDF2 import PdfReader, PdfWriter |
||||||
|
|
||||||
|
PATH = 'pdfcontent' |
||||||
|
if not os.path.exists(PATH): |
||||||
|
os.makedirs(PATH) |
||||||
|
|
||||||
|
def getPdfFile(filepath): |
||||||
|
with open(filepath, 'rb') as pdf_file: |
||||||
|
reader = PdfReader(pdf_file) |
||||||
|
for page_num in range(len(reader.pages)): |
||||||
|
page = reader.pages[page_num] |
||||||
|
text = page.extract_text() |
||||||
|
|
||||||
|
# Save as text file |
||||||
|
if text: # Ensure there's text on the page before saving |
||||||
|
filename = os.path.splitext(os.path.basename(filepath))[0] |
||||||
|
output_txt_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.txt") |
||||||
|
with open(output_txt_filename, 'w', encoding='utf-8') as output_file: |
||||||
|
output_file.write(text) |
||||||
|
print(f"Page {page_num + 1} extracted and saved as {output_txt_filename}") |
||||||
|
|
||||||
|
# Save as PDF file |
||||||
|
writer = PdfWriter() |
||||||
|
writer.add_page(page) |
||||||
|
output_pdf_filename = os.path.join(PATH, f"{filename}_page{page_num + 1}.pdf") |
||||||
|
with open(output_pdf_filename, 'wb') as output_pdf_file: |
||||||
|
writer.write(output_pdf_file) |
||||||
|
print(f"Page {page_num + 1} extracted and saved as {output_pdf_filename}") |
Loading…
Reference in new issue