From 555ba342482efdf7a12a778a51846d1864cc8402 Mon Sep 17 00:00:00 2001 From: Joseph <162514273+josephletobar@users.noreply.github.com> Date: Mon, 11 Aug 2025 16:47:05 -0700 Subject: [PATCH] feat: introduce celery tasks --- app/main.py | 48 +++++++++++++++++++++++++------------------- app/tasks.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 6 ++++-- 3 files changed, 84 insertions(+), 22 deletions(-) create mode 100644 app/tasks.py diff --git a/app/main.py b/app/main.py index 68541dd..a5b7314 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,6 @@ import os import time import pandas as pd -import glob import re from reportlab.lib.pagesizes import letter @@ -9,12 +8,18 @@ from reportlab.platypus import SimpleDocTemplate, Paragraph from nlp.gpt_utils import order_files -from generation.generate_notes import create_notes -from extraction.file_utils import handle_image, handle_pdf, get_file_type, handle_video, clear_output, split_text, move_file +from extraction.file_utils import get_file_type, clear_output, split_text, move_file from nlp.topic_modelling import topic_model -from extraction.outline import create_outline -from extraction.line_embed import line_sort -from nlp.embedding_utils import embed_sections +from tasks import ( + handle_image_task, + handle_pdf_task, + handle_video_task, + create_outline_task, + line_sort_task, + embed_sections_task, + create_notes_task, +) +from celery import group from config import NOTE_INPUTS_DIR, RAW_TEXT, SECTIONS, COMPLETED_NOTES_FILE, PREVIOUS_INPUTS @@ -84,38 +89,40 @@ def main(): # Work with the available files time.sleep(3) + processing_tasks = [] for file in files: file_path, file_type = get_file_type(file) valid_video_types = {"MP4", "AVI", "MKV", "MOV", "WMV", "FLV", "WEBM", "MPEG", "MPG", "OGV", "3GP", "MTS"} valid_image_types = {"PNG", "JPEG", "JPG", "BMP", "GIF", "TIFF", "WEBP"} try: - # For Images if file_type in valid_image_types: - handle_image(file, file_path) - # For PDFs + processing_tasks.append(handle_image_task.s(file, file_path)) elif file_type == 'PDF': - handle_pdf(file, file_path) - # For videos + processing_tasks.append(handle_pdf_task.s(file, file_path)) elif file_type in valid_video_types: - handle_video(file, file_path) - + processing_tasks.append(handle_video_task.s(file, file_path)) except Exception as e: print(f"Invalid file {e}") + + if processing_tasks: + group(processing_tasks).apply_async().get() + time.sleep(3) - # Creata an outline from the raw text and return a df with info - df= create_outline(RAW_TEXT) + # Create an outline from the raw text and return a df with info + df_records = create_outline_task.delay(RAW_TEXT).get() + df = pd.DataFrame(df_records) section_embeddings = { row["filename"]: row["embedding"] - for _, row in df.iterrows() + for row in df_records } # Sort lines into their respective sections - line_sort(RAW_TEXT, section_embeddings) + line_sort_task.delay(RAW_TEXT, section_embeddings).get() # Embed the new sorted sections - embed_sections(SECTIONS) + embed_sections_task.delay(SECTIONS).get() stop_timer.set() # Signal the timer thread to stop (processing done) timer_thread.join() # Wait for the timer thread to finish @@ -131,8 +138,9 @@ def main(): ) # Create notes on chunks - for section in sections: - create_notes(section, df) # Send topics GPT to make notes on + note_tasks = [create_notes_task.s(section, df_records) for section in sections] + if note_tasks: + group(note_tasks).apply_async().get() print("--------------") print("Notes Completed") diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 0000000..996a919 --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,52 @@ +import os +import pandas as pd +from celery import Celery +from extraction.file_utils import handle_image, handle_pdf, handle_video +from extraction.outline import create_outline +from extraction.line_embed import line_sort +from nlp.embedding_utils import embed_sections +from generation.generate_notes import create_notes + + +celery = Celery( + 'tasks', + broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), + backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0') +) + + +@celery.task +def handle_image_task(file, file_path): + return handle_image(file, file_path) + + +@celery.task +def handle_pdf_task(file, file_path): + return handle_pdf(file, file_path) + + +@celery.task +def handle_video_task(file, file_path): + return handle_video(file, file_path) + + +@celery.task +def create_outline_task(raw_text_path): + df = create_outline(raw_text_path) + return df.to_dict(orient='records') + + +@celery.task +def line_sort_task(raw_text_path, section_embeddings): + return line_sort(raw_text_path, section_embeddings) + + +@celery.task +def embed_sections_task(sections_dir): + return embed_sections(sections_dir) + + +@celery.task +def create_notes_task(section, df_records): + df = pd.DataFrame(df_records) + return create_notes(section, df) diff --git a/requirements.txt b/requirements.txt index 45e639b..a047c7b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 +15,7 @@ pytesseract Pillow PyMuPDF vosk -markdown -pdf2image \ No newline at end of file +markdown +pdf2image +celery +redis