diff --git a/README.md b/README.md
index d72f934..c2e6382 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ Costa Rica
 [![GitHub](https://img.shields.io/badge/--181717?logo=github&logoColor=ffffff)](https://github.com/) [brown9804](https://github.com/brown9804)
 
-Last updated: 2025-07-25
+Last updated: 2025-07-29
 
 -----------------------------
@@ -17,6 +17,8 @@ Last updated: 2025-07-25
 - Table structure and text are extracted using Azure Document Intelligence (Layout model).
 - Visual selection cues are detected using Azure AI Vision or image preprocessing.
 - Visual indicators are mapped to structured data, returning only the selected values in a clean JSON format.
+- Advanced semantic understanding is provided by Azure OpenAI to analyze document content and context.
+- Multiple file formats are supported, including PDFs and various image formats.
 - The logic is abstracted to support multiple layout variations, so the system adapts easily to new document formats and selection styles.
 
 > [!IMPORTANT]
@@ -65,11 +67,14 @@ Last updated: 2025-07-25
-> How to extract layout elements from PDFs stored in an Azure Storage Account, process them using Azure Document Intelligence, and store the results in Cosmos DB for further analysis.
+> `How can you extract layout, text, visual, and other elements` from `PDFs` stored in an Azure Storage Account, process them using Azure AI services, and `store the results` in Cosmos DB for `further analysis`? This solution is `designed to accelerate the process` of building your own implementation. Please feel free to use any of the provided references; I'm happy to contribute. Once this solution is deployed:
 >
-> 1. Upload your PDFs to an Azure Blob Storage container.
-> 2. An Azure Function is triggered by the upload, which calls the Azure Document Intelligence Layout API to analyze the document structure.
-> 3. The extracted layout data (such as tables, checkboxes, and text) is parsed and subsequently stored in a Cosmos DB database, ensuring a seamless and automated workflow from document upload to data storage.
+> 1. Upload your documents: Just `drop your PDFs or images into an Azure Storage container` and the system takes over from there.
+> 2. Automated intelligent processing: Behind the scenes, `Azure Functions orchestrates a powerful AI workflow`:
+>    - Document Intelligence pulls out tables, text, and form data
+>    - AI Vision spots visual cues like checkmarks and highlights
+>    - Azure OpenAI understands what the document actually means
+> 3. Centralized information management: `All extracted data is stored in Cosmos DB`, organized and accessible. The system `adapts to different document layouts without requiring custom code for each format`.
 
 > [!NOTE]
 > Advantages of Document Intelligence for organizations handling large volumes of documents:
@@ -447,7 +452,7 @@ Last updated: 2025-07-25
-  Total views<br>
-
-  Refresh Date: 2025-07-25
-
+  Total views<br>
+
+  Refresh Date: 2025-07-29
+
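Before wiring the full trigger below end-to-end, the core Document Intelligence call it relies on can be exercised in isolation. A minimal local sketch, assuming the same `FORM_RECOGNIZER_ENDPOINT`/`FORM_RECOGNIZER_KEY` variables the Function reads and a hypothetical `sample.pdf` on disk:

```python
# Standalone check of the prebuilt-layout analysis the Function app performs.
# Assumes FORM_RECOGNIZER_ENDPOINT and FORM_RECOGNIZER_KEY are exported and
# that sample.pdf is any local test document (hypothetical placeholder).
import os

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

client = DocumentAnalysisClient(
    endpoint=os.environ["FORM_RECOGNIZER_ENDPOINT"],
    credential=AzureKeyCredential(os.environ["FORM_RECOGNIZER_KEY"]),
)

with open("sample.pdf", "rb") as f:
    poller = client.begin_analyze_document(model_id="prebuilt-layout", document=f.read())
result = poller.result()

# tables, styles, and selection_marks are typed as optional in the SDK,
# so guard with `or []` before counting or iterating.
print(f"pages={len(result.pages)}, tables={len(result.tables or [])}, styles={len(result.styles or [])}")
for page in result.pages:
    for mark in page.selection_marks or []:
        print(f"page {page.page_number}: selection mark '{mark.state}' ({mark.confidence:.2f})")
```
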
diff --git a/src/function_app.py b/src/function_app.py index 69e73ad..f2fc626 100644 --- a/src/function_app.py +++ b/src/function_app.py @@ -1,506 +1,226 @@ +""" +Modular PDF Layout Extraction with Azure AI Document Intelligence +Supporting Multiple Document Versions with Visual Selection Cues + +This Azure Function provides comprehensive PDF analysis using Azure's built-in capabilities: +1. Azure Document Intelligence for structured extraction (primary PDF processing) +2. Azure AI Vision for image analysis (complementary visual processing) +3. Azure OpenAI for semantic analysis and document understanding +4. Native Azure cloud processing without external dependencies + +Modular Architecture: +- Separate module files for different functional areas +- Easier to code, debug, and maintain +- Clear separation of concerns +""" + +# IMPORTS AND SETUP import logging import azure.functions as func -from azure.ai.formrecognizer import DocumentAnalysisClient, AnalyzeResult -from azure.core.credentials import AzureKeyCredential -from azure.cosmos import CosmosClient, PartitionKey, exceptions -from azure.identity import DefaultAzureCredential -import os -import uuid -import json -from datetime import datetime import time -from typing import List, Dict, Any, Optional -from PIL import Image +import traceback +import os +from typing import Dict, Any, List, Optional, Union from io import BytesIO -import requests # For REST API to Vision -from pdf2image import convert_from_bytes # For PDF to image conversion +from datetime import datetime +# Import functions from modules +from modules.clients.azure_clients import ( + initialize_form_recognizer_client, + initialize_openai_client, + get_vision_api_config +) +from modules.processors.document_intelligence import ( + analyze_pdf, + extract_layout_data +) +from modules.processors.vision_processing import ( + analyze_image_with_vision, + process_image_file +) +from modules.processors.llm_processing import ( + analyze_content_with_llm, + prepare_content_for_llm +) +from modules.output.display_manager import ( + display_complete_vision_output, + display_complete_llm_output, + display_final_concatenated_output +) +from modules.storage.cosmos_manager import ( + initialize_cosmos_client, + create_database_if_not_exists, + create_container_if_not_exists, + prepare_document_for_storage, + store_document +) +from modules.utils.file_helpers import generate_document_id, get_file_info +from modules.utils.validation import validate_required_env_vars +from modules.utils.logging_helpers import log_processing_step +from modules.utils.time_helpers import calculate_processing_time + +# Initialize the function app app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION) -## DEFINITIONS -def initialize_form_recognizer_client() -> DocumentAnalysisClient: - endpoint = os.getenv("FORM_RECOGNIZER_ENDPOINT") - key = os.getenv("FORM_RECOGNIZER_KEY") - if not isinstance(key, str): - raise ValueError("FORM_RECOGNIZER_KEY must be a string") - logging.info(f"Form Recognizer endpoint: {endpoint}") - return DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key)) - -def read_pdf_content(myblob: func.InputStream) -> bytes: - logging.info(f"Reading PDF content from blob: {myblob.name}") - return myblob.read() - -def analyze_pdf(form_recognizer_client: DocumentAnalysisClient, pdf_bytes: bytes) -> AnalyzeResult: - logging.info("Starting PDF layout analysis.") - poller = form_recognizer_client.begin_analyze_document( - model_id="prebuilt-layout", - document=pdf_bytes - ) 
- logging.info("PDF layout analysis in progress.") - result = poller.result() - logging.info("PDF layout analysis completed.") - num_pages = len(result.pages) if hasattr(result, "pages") and isinstance(result.pages, list) else 0 - num_tables = len(result.tables) if hasattr(result, "tables") and isinstance(result.tables, list) else 0 - num_styles = len(result.styles) if hasattr(result, "styles") and result.styles is not None else 0 - logging.info(f"Document has {num_pages} page(s), {num_tables} table(s), and {num_styles} style(s).") - return result - -def extract_layout_data(result: AnalyzeResult, visual_cues: Optional[List[Dict[str, Any]]] = None, source_file: str = "unknown") -> Dict[str, Any]: - logging.info("Extracting layout data from analysis result.") - - layout_data = { - "id": str(uuid.uuid4()), - "metadata": { - "processed_at": datetime.utcnow().isoformat(), - "source_file": source_file, - "pages_count": len(result.pages) if hasattr(result, "pages") else 0, - "tables_count": len(result.tables) if hasattr(result, "tables") else 0, - "visual_cues_count": len(visual_cues) if visual_cues else 0 - }, - "pages": [] - } - visual_cues = visual_cues or [] # List of dicts with visual cue info per cell - - # Log styles - if hasattr(result, "styles") and result.styles: - for idx, style in enumerate(result.styles): - content_type = "handwritten" if style.is_handwritten else "no handwritten" - logging.info(f"Document contains {content_type} content") - - # Process each page - for page in result.pages: - logging.info(f"--- Page {page.page_number} ---") - page_data = { - "page_number": page.page_number, - "lines": [line.content for line in page.lines], - "tables": [], - "selection_marks": [ - {"state": mark.state, "confidence": mark.confidence} - for mark in page.selection_marks - ] if hasattr(page, 'selection_marks') and page.selection_marks else [] - } - - # Log extracted lines - for line_idx, line in enumerate(page.lines): - logging.info(f"Line {line_idx}: '{line.content}'") - - # Log selection marks - if hasattr(page, 'selection_marks') and page.selection_marks: - for selection_mark in page.selection_marks: - logging.info( - f"Selection mark is '{selection_mark.state}' with confidence {selection_mark.confidence}" - ) - - # Extract tables - page_tables = [ - table for table in result.tables - if any(region.page_number == page.page_number for region in table.bounding_regions) - ] if hasattr(result, 'tables') and result.tables else [] - - for table_index, table in enumerate(page_tables): - logging.info(f"Table {table_index}: {table.row_count} rows, {table.column_count} columns") - - table_data = { - "row_count": table.row_count, - "column_count": table.column_count, - "cells": [] - } - - for cell in table.cells: - content = cell.content.strip() - # Find matching visual cue for this cell (if any) - cue = next((vc for vc in visual_cues if vc.get("page_number") == page.page_number and vc.get("row_index") == cell.row_index and vc.get("column_index") == cell.column_index), None) - cell_info = { - "row_index": cell.row_index, - "column_index": cell.column_index, - "content": content, - "visual_cue": cue["cue_type"] if cue else None - } - table_data["cells"].append(cell_info) - logging.info(f"Cell[{cell.row_index}][{cell.column_index}]: '{content}', visual_cue: {cell_info['visual_cue']}") - - page_data["tables"].append(table_data) - - layout_data["pages"].append(page_data) - - try: - preview = json.dumps(layout_data, indent=2) - logging.info("Structured layout data preview:\n" + preview) - except 
Exception as e: - logging.warning(f"Could not serialize layout data for preview: {e}") - - return layout_data - -def save_layout_data_to_cosmos(layout_data: Dict[str, Any]) -> None: - try: - endpoint = os.getenv("COSMOS_DB_ENDPOINT") - key = os.getenv("COSMOS_DB_KEY") - aad_credentials = DefaultAzureCredential() - client = CosmosClient(endpoint, credential=aad_credentials, consistency_level='Session') - logging.info("Successfully connected to Cosmos DB using AAD default credential") - except Exception as e: - logging.error(f"Error connecting to Cosmos DB: {e}") - return +# MAIN AZURE FUNCTION +@app.blob_trigger(arg_name="myblob", path="pdfinvoices/{name}", + connection="invoicecontosostorage_STORAGE") +def BlobTriggerPDFsMultiLayoutsAIDocIntelligence(myblob: func.InputStream) -> None: + """ + Blob trigger Azure Function for comprehensive PDF document analysis + Processes PDF files using Azure Document Intelligence, AI Vision, and OpenAI + """ + start_time = datetime.now() - database_name = "ContosoDBDocIntellig" - container_name = "Layouts" - try: - database = client.create_database_if_not_exists(database_name) - logging.info(f"Database '{database_name}' does not exist. Creating it.") - except exceptions.CosmosResourceExistsError: - database = client.get_database_client(database_name) - logging.info(f"Database '{database_name}' already exists.") - - database.read() - logging.info(f"Reading into '{database_name}' DB") - - try: - container = database.create_container( - id=container_name, - partition_key=PartitionKey(path="/id"), - offer_throughput=400 - ) - logging.info(f"Container '{container_name}' does not exist. Creating it.") - except exceptions.CosmosResourceExistsError: - container = database.get_container_client(container_name) - logging.info(f"Container '{container_name}' already exists.") - except exceptions.CosmosHttpResponseError: - raise - - container.read() - logging.info(f"Reading into '{container}' container") + # Get blob information + blob_name = myblob.name + file_content = myblob.read() + + log_processing_step("Starting Document Analysis", f"Processing blob: {blob_name}") + + # Validate required environment variables + required_env_vars = [ + "FORM_RECOGNIZER_ENDPOINT", + "FORM_RECOGNIZER_KEY", + "AZURE_OPENAI_ENDPOINT", + "AZURE_OPENAI_KEY", + "AZURE_OPENAI_GPT4_DEPLOYMENT", + "VISION_API_ENDPOINT", + "VISION_API_KEY" + ] + validate_required_env_vars(required_env_vars) + + # Generate unique document ID + document_id = generate_document_id() + + # Extract filename from blob path + original_filename = blob_name.split('/')[-1] if '/' in blob_name else blob_name + + log_processing_step("File Processing", f"Processing file: {original_filename}") + + # Initialize Azure clients + log_processing_step("Client Initialization", "Setting up Azure service clients") + + # Initialize Form Recognizer client + form_recognizer_client = initialize_form_recognizer_client() - try: - response = container.upsert_item(layout_data) - logging.info(f"Saved processed layout data to Cosmos DB. 
Response: {response}") - except Exception as e: - logging.error(f"Error inserting item into Cosmos DB: {e}") + # Initialize OpenAI client + openai_client = initialize_openai_client() -def call_vision_api(image_bytes: bytes, subscription_key: str, endpoint: str, max_retries: int = 3) -> Dict[str, Any]: - vision_url = endpoint + "/vision/v3.2/analyze" - headers = { - 'Ocp-Apim-Subscription-Key': subscription_key, - 'Content-Type': 'application/octet-stream' - } - params = { - 'visualFeatures': 'Objects,Color,Text', # Added Text feature for better text detection - 'language': 'en', - 'model-version': 'latest' - } - - for attempt in range(max_retries): + # Get Vision API configuration + vision_config = get_vision_api_config() + + # DOCUMENT INTELLIGENCE PROCESSING + log_processing_step("Document Intelligence Analysis", "Analyzing PDF with Azure Document Intelligence") + + # Analyze PDF with Document Intelligence + document_result = analyze_pdf(form_recognizer_client, file_content) + + # Extract layout data + layout_data = extract_layout_data(document_result) + + # Add document ID and filename to layout data + layout_data["document_id"] = document_id + layout_data["filename"] = original_filename + + log_processing_step("Document Intelligence Complete", f"Extracted {len(layout_data.get('pages', []))} pages") + + # AI VISION PROCESSING + log_processing_step("AI Vision Analysis", "Processing with Azure AI Vision") + try: - response = requests.post(vision_url, headers=headers, params=params, data=image_bytes) - response.raise_for_status() - return response.json() - except requests.exceptions.HTTPError as http_err: - if hasattr(http_err, 'response') and http_err.response.status_code == 429: # Too Many Requests - if attempt < max_retries - 1: - retry_after = int(http_err.response.headers.get('Retry-After', 1)) - logging.warning(f"Rate limit hit, waiting {retry_after} seconds...") - time.sleep(retry_after) - continue - logging.error(f"HTTP error occurred: {http_err}") - raise - except Exception as err: - logging.error(f"Error calling Vision API: {err}") - if attempt < max_retries - 1: - time.sleep(2 ** attempt) # Exponential backoff - continue - raise - - raise Exception("Max retries exceeded for Vision API call") - -def extract_visual_cues_from_vision(vision_result: Dict[str, Any], page_number: int) -> List[Dict[str, Any]]: - """ - Extract visual cues from Azure Vision API results with enhanced detection capabilities. 
- Detects: checkboxes, filled areas, handwritten text, signatures, tables, and form elements - - Args: - vision_result: The response from Azure Vision API - page_number: Current page being processed + # Process with AI Vision for additional insights + vision_analysis = analyze_image_with_vision(file_content, vision_config) + + # Display complete Vision output + display_complete_vision_output(vision_analysis, "- Azure AI Vision Analysis") + + # Add vision analysis to layout data + layout_data["vision_analysis"] = vision_analysis + + except Exception as e: + logging.warning(f"Vision analysis failed (continuing without it): {e}") + layout_data["vision_analysis_error"] = str(e) - Returns: - List of detected visual cues with their properties and confidence scores - """ - cues: List[Dict[str, Any]] = [] - - if not vision_result: - logging.warning(f"Empty vision result for page {page_number}") - return cues - - # Enhanced object detection with better classification - if 'objects' in vision_result: - for obj in vision_result['objects']: - if 'rectangle' in obj: - rect = obj['rectangle'] - x, y = rect.get('x', 0), rect.get('y', 0) - w, h = rect.get('w', 0), rect.get('h', 0) - confidence = obj.get('confidence', 0.0) + # LLM SEMANTIC ANALYSIS + log_processing_step("LLM Semantic Analysis", "Analyzing content with Azure OpenAI") + + try: + # Prepare content for LLM analysis + prepared_content = prepare_content_for_llm(layout_data, "pdf") + + # Analyze with LLM + llm_analysis = analyze_content_with_llm( + openai_client, + prepared_content, + deployment_name=os.getenv("AZURE_OPENAI_GPT4_DEPLOYMENT") + ) + + # Display complete LLM output + display_complete_llm_output(llm_analysis) + + # Add LLM analysis to layout data + layout_data["llm_analysis"] = llm_analysis + + except Exception as e: + logging.warning(f"LLM analysis failed (continuing without it): {e}") + layout_data["llm_analysis_error"] = str(e) + + # FINAL OUTPUT DISPLAY + log_processing_step("Final Output Generation", "Displaying complete processing results") + + # Display the final concatenated output with all processing results + display_final_concatenated_output(layout_data) + + # OPTIONAL: STORE IN COSMOS DB + cosmos_endpoint = os.getenv("COSMOS_DB_ENDPOINT") + cosmos_key = os.getenv("COSMOS_DB_KEY") + + if cosmos_endpoint and cosmos_key: + try: + log_processing_step("Data Storage", "Storing results in Cosmos DB") - # Improved checkbox detection with confidence threshold - if 0.8 <= w/h <= 1.2 and 10 <= w <= 50 and 10 <= h <= 50 and confidence > 0.6: - cues.append({ - "page_number": page_number, - "x": x, - "y": y, - "width": w, - "height": h, - "cue_type": "checkbox", - "confidence": confidence, - "metadata": { - "aspect_ratio": w/h, - "area": w * h - } - }) + # Initialize Cosmos client and containers + cosmos_client = initialize_cosmos_client(cosmos_endpoint, cosmos_key) + database = create_database_if_not_exists(cosmos_client, "DocumentAnalysisDB") + container = create_container_if_not_exists(database, "ProcessedDocuments") - # Detect possible table structures - elif w > 100 and h > 100 and 'table' in obj.get('tags', []): - cues.append({ - "page_number": page_number, - "x": x, - "y": y, - "width": w, - "height": h, - "cue_type": "table", - "confidence": confidence - }) - - # Enhanced color analysis for form elements - if 'color' in vision_result: - color_info = vision_result['color'] - dominant_colors = color_info.get('dominantColors', []) - for color in dominant_colors: - color_lower = color.lower() - if color_lower in ['gray', 'grey']: -
cues.append({ - "page_number": page_number, - "cue_type": "filled_area", - "color": color_lower, - "confidence": color_info.get('dominantColorConfidence', 0.0), - "metadata": { - "color_scheme": color_info.get('accentColor'), - "is_black_and_white": color_info.get('isBWImg', False) - } - }) - - # Enhanced text analysis with better handwriting and signature detection - if 'text' in vision_result: - for text_result in vision_result.get('text', {}).get('lines', []): - content = text_result.get('content', '').strip() - confidence = text_result.get('confidence', 0.0) - - if text_result.get('isHandwritten', False): - cue_type = "signature" if _is_likely_signature(content) else "handwritten" - cues.append({ - "page_number": page_number, - "text": content, - "cue_type": cue_type, - "confidence": confidence, - "metadata": { - "length": len(content), - "position": text_result.get('boundingBox', {}), - "detected_language": text_result.get('language', 'unknown') - } - }) - - # Log what we found - if cues: - logging.info(f"Found {len(cues)} visual cues on page {page_number}: {[c['cue_type'] for c in cues]}") - else: - logging.info(f"No visual cues detected on page {page_number}") - - return cues - -def _is_likely_signature(text: str) -> bool: - """ - Detect if the given text is likely to be a signature based on heuristics. - - Args: - text: The text content to analyze + # Prepare and store document + document_for_storage = prepare_document_for_storage(layout_data, original_filename) + stored_doc = store_document(container, document_for_storage) + + layout_data["storage_info"] = { + "stored": True, + "document_id": stored_doc["id"], + "timestamp": stored_doc["timestamp"] + } + + except Exception as e: + logging.warning(f"Storage failed (continuing without it): {e}") + layout_data["storage_error"] = str(e) - Returns: - bool: True if the text matches signature patterns - """ - # Common signature indicators - signature_indicators = [ - lambda t: len(t.split()) <= 3, # Most signatures are 1-3 words - lambda t: any(c.isalpha() for c in t), # Contains letters - lambda t: len(t) < 50, # Not too long - lambda t: not t.isupper(), # Not all uppercase (unlikely for signatures) - lambda t: not any(c.isdigit() for c in t) # Usually no numbers in signatures - ] - - return all(indicator(text) for indicator in signature_indicators) - -def convert_pdf_to_images(pdf_bytes: bytes) -> List[Image.Image]: - images = convert_from_bytes(pdf_bytes) - return images - -def extract_skill_selections_from_table(table_data): - """ - Given a table_data dict (as in your layout_data['pages'][x]['tables'][y]), - returns a list of dicts: [{"skill": ..., "selected": ...}, ...] - Assumes first column is skill name, columns 2-7 are options 0-5. - """ - skills = [] - for row in range(table_data["row_count"]): - skill_name = None - selected = None - for cell in table_data["cells"]: - if cell["row_index"] == row: - col = cell["column_index"] - content = cell["content"].replace("\n", " ").strip() - # First column is skill name - if col == 0: - skill_name = content - # Columns 2-7 are options 0-5 - elif 2 <= col <= 7: - if ":selected:" in content: - selected = col - 2 # 0-based - if skill_name and selected is not None: - skills.append({"skill": skill_name, "selected": selected}) - return skills - -def infer_table_title(table_data, page_lines): - """ - Try to infer the table title by looking for text above the table or in the first row/merged cells. 
- page_lines: list of all lines on the page (in order) - """ - # Find the minimum row_index in the table (should be 0) - min_row = min(cell["row_index"] for cell in table_data["cells"]) - # Get all cells in the first row - first_row_cells = [cell for cell in table_data["cells"] if cell["row_index"] == min_row] - # If any cell in the first row spans all columns, treat as title - for cell in first_row_cells: - if cell.get("column_span", 1) == table_data["column_count"] and cell["content"].strip(): - return cell["content"].strip() - # Otherwise, look for a line above the first row that is not in the table - # Find the topmost cell's content - top_cell_content = None - if first_row_cells: - top_cell_content = first_row_cells[0]["content"].strip() - # Try to find a line above the table that is not the top cell content - if page_lines and top_cell_content: - for idx, line in enumerate(page_lines): - if line.strip() == top_cell_content and idx > 0: - # Return the previous line as the title - prev_line = page_lines[idx-1].strip() - if prev_line: - return prev_line - # Fallback: use the top cell content if not empty - if top_cell_content: - return top_cell_content - return "Unknown Table" - -@app.blob_trigger(arg_name="myblob", path="pdfinvoices/{name}", - connection="invoicecontosostorage_STORAGE") -def BlobTriggerContosoPDFLayoutsDocIntelligence(myblob: func.InputStream) -> None: - logging.info(f"Python blob trigger function processed blob\n" - f"Name: {myblob.name}\n" - f"Blob Size: {myblob.length} bytes") - - try: - form_recognizer_client = initialize_form_recognizer_client() - pdf_bytes = read_pdf_content(myblob) - logging.info("Successfully read PDF content from blob.") - except Exception as e: - logging.error(f"Error reading PDF: {e}") - return - - try: - result = analyze_pdf(form_recognizer_client, pdf_bytes) - logging.info("Successfully analyzed PDF using Document Intelligence.") - except Exception as e: - logging.error(f"Error analyzing PDF: {e}") - return - - # --- Step: Convert PDF to image and call Azure AI Vision --- - visual_cues = [] - try: - # Validate Vision API credentials - vision_key = os.getenv("VISION_API_KEY") - vision_endpoint = os.getenv("VISION_API_ENDPOINT") + # CALCULATE PROCESSING TIME + end_time = datetime.now() + processing_time_info = calculate_processing_time(start_time, end_time) + + layout_data["processing_time"] = processing_time_info + + log_processing_step( + "Processing Complete", + f"Total time: {processing_time_info['duration_formatted']}" + ) + + logging.info(f"Successfully processed blob: {blob_name}") - if not vision_key or not vision_endpoint: - logging.warning("Vision API credentials not configured - skipping visual cue detection") - else: - images = convert_pdf_to_images(pdf_bytes) - if not images: - logging.warning("No images extracted from PDF") - else: - for page_num, image in enumerate(images, start=1): - img_bytes_io = BytesIO() - image.save(img_bytes_io, format='JPEG') - img_bytes = img_bytes_io.getvalue() - vision_result = call_vision_api(img_bytes, vision_key, vision_endpoint) - cues = extract_visual_cues_from_vision(vision_result, page_num) - visual_cues.extend(cues) - logging.info(f"Visual cues extracted: {visual_cues}") - except Exception as e: - logging.error(f"Error processing visual cues with AI Vision: {e}") - # Continue processing without visual cues - - try: - layout_data = extract_layout_data(result, visual_cues, myblob.name) - logging.info("Successfully extracted and merged layout data.") - except Exception as e: - 
logging.error(f"Error extracting layout data: {e}") - return - - try: - save_layout_data_to_cosmos(layout_data) - logging.info("Successfully saved layout data to Cosmos DB.") except Exception as e: - logging.error(f"Error saving layout data to Cosmos DB: {e}") - - # For each table, infer the title, create both DataFrame-like and summary JSON, log both, and save only the summary JSON - for page in layout_data["pages"]: - page_lines = page.get("lines", []) - for table in page["tables"]: - # --- Table Title Inference --- - table_title = infer_table_title(table, page_lines) - - # --- DataFrame-like JSON --- - # Build a 2D array of cell contents - df_like = [[None for _ in range(table["column_count"])] for _ in range(table["row_count"]) ] - for cell in table["cells"]: - r, c = cell["row_index"], cell["column_index"] - df_like[r][c] = cell["content"].strip() - df_json = { - "table_title": table_title, - "data": df_like - } - - # --- Pretty-print table as grid --- - def pretty_print_table(table_title, df_like): - # Find max width for each column - if not df_like or not df_like[0]: - return "(Empty table)" - col_widths = [max(len(str(row[c])) if row[c] is not None else 0 for row in df_like) for c in range(len(df_like[0]))] - lines = [] - lines.append(f"Table: {table_title}") - border = "+" + "+".join("-" * (w+2) for w in col_widths) + "+" - lines.append(border) - for i, row in enumerate(df_like): - row_str = "|" + "|".join(f" {str(cell) if cell is not None else '' :<{col_widths[j]}} " for j, cell in enumerate(row)) + "|" - lines.append(row_str) - lines.append(border) - return "\n".join(lines) - - pretty_table_str = pretty_print_table(table_title, df_like) - logging.info(f"\n{pretty_table_str}") - - # --- Summary JSON --- - skill_selections = extract_skill_selections_from_table(table) - summary = { - "table_title": table_title, - "skills": skill_selections - } - - # Log both outputs for user inspection - logging.info(f"Table DataFrame-like JSON: {json.dumps(df_json, indent=2)}") - logging.info(f"Table summary JSON: {json.dumps(summary, indent=2)}") - # Only save the summary JSON if needed (e.g., to Cosmos DB or elsewhere) - # (Current implementation saves only the main layout_data to Cosmos DB) + # Define a default blob name in case we fail early + blob_info = "unknown blob" + + # Only use blob_name if it was defined before the error + if 'blob_name' in dir(): # safer than using locals() + blob_info = f"blob {blob_name}" + + logging.error(f"Document analysis failed for {blob_info}: {e}") + logging.error(f"Traceback: {traceback.format_exc()}") + raise diff --git a/src/modules/clients/azure_clients.py b/src/modules/clients/azure_clients.py new file mode 100644 index 0000000..af3f1a2 --- /dev/null +++ b/src/modules/clients/azure_clients.py @@ -0,0 +1,69 @@ +""" +Client Manager Module +Handles initialization of all Azure service clients +""" + +import os +import logging +from azure.ai.formrecognizer import DocumentAnalysisClient +from azure.core.credentials import AzureKeyCredential +from azure.identity import DefaultAzureCredential +from openai import AzureOpenAI + + +def initialize_form_recognizer_client(): + """Initialize Azure Document Intelligence client""" + endpoint = os.getenv("FORM_RECOGNIZER_ENDPOINT") + key = os.getenv("FORM_RECOGNIZER_KEY") + + if not isinstance(key, str): + raise ValueError("FORM_RECOGNIZER_KEY must be a string") + + logging.info(f"Form Recognizer endpoint: {endpoint}") + return DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key)) + + +def 
initialize_openai_client(): + """Initialize the Azure OpenAI client for LLM processing""" + endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + key = os.getenv("AZURE_OPENAI_KEY") + api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview") + + if not endpoint or not key: + logging.warning("Azure OpenAI configuration missing or incomplete") + return None + + try: + client = AzureOpenAI( + azure_endpoint=endpoint, + api_key=key, + api_version=api_version + ) + logging.info(f"Azure OpenAI client initialized with API version: {api_version}") + return client + except Exception as e: + logging.error(f"Failed to initialize Azure OpenAI client: {e}") + return None + + +def get_vision_api_config(): + """Get the Vision API configuration from environment variables""" + key = os.getenv("VISION_API_KEY") + endpoint = os.getenv("VISION_API_ENDPOINT") + + supported_versions = ["2024-04-01", "2024-02-01-preview", "2023-10-01"] + configured_version = os.getenv("VISION_API_VERSION", "2024-04-01") + + config = { + "key": key, + "endpoint": endpoint, + "version": configured_version, + "fallback_versions": supported_versions + } + + if key and endpoint: + logging.info(f"Vision API configuration loaded (API version: {configured_version})") + else: + logging.warning("Vision API configuration missing or incomplete") + + return config diff --git a/src/modules/output/display_manager.py b/src/modules/output/display_manager.py new file mode 100644 index 0000000..5ba06ea --- /dev/null +++ b/src/modules/output/display_manager.py @@ -0,0 +1,98 @@ +""" +Output Displayer Module +Handles comprehensive output display for all processing results +""" + +import logging +import json + + +def display_complete_vision_output(vision_result, processing_stage=""): + """Display complete AI Vision analysis output""" + logging.info("=" * 80) + logging.info(f"== COMPLETE AI VISION ANALYSIS OUTPUT {processing_stage} ==") + logging.info("=" * 80) + try: + complete_vision_output = json.dumps(vision_result, indent=2, ensure_ascii=False) + logging.info(f"Full AI Vision Analysis Results:\n{complete_vision_output}") + except Exception as e: + logging.warning(f"Could not display complete Vision analysis: {e}") + logging.info(f"Vision Analysis (string format): {str(vision_result)}") + logging.info("=" * 80) + + +def display_complete_llm_output(llm_result): + """Display complete LLM analysis output""" + logging.info("=" * 80) + logging.info("== COMPLETE LLM ANALYSIS OUTPUT ==") + logging.info("=" * 80) + try: + complete_llm_output = json.dumps(llm_result, indent=2, ensure_ascii=False) + logging.info(f"Full LLM Analysis Results:\n{complete_llm_output}") + except Exception as e: + logging.warning(f"Could not display complete LLM analysis: {e}") + logging.info(f"LLM Analysis (string format): {str(llm_result)}") + logging.info("=" * 80) + + +def display_final_concatenated_output(layout_data): + """Display the final concatenated output with all processing results""" + logging.info("=" * 80) + logging.info("== FINAL CONCATENATED PDF INFORMATION OUTPUT ==") + logging.info("== ALL PROCESSING RESULTS COMBINED ==") + logging.info("=" * 80) + + try: + final_complete_output = json.dumps(layout_data, indent=2, ensure_ascii=False) + logging.info("COMPLETE FINAL OUTPUT (All AI Processing Results):") + logging.info(final_complete_output) + except Exception as e: + logging.warning(f"Could not display complete final output as JSON: {e}") + _display_structured_fallback(layout_data) + + logging.info("=" * 80) + logging.info("== END OF COMPLETE PDF 
INFORMATION ==") + logging.info("=" * 80) + + +def _display_structured_fallback(layout_data): + """Fallback structured display when JSON fails""" + logging.info("COMPLETE FINAL OUTPUT (Structured Display):") + logging.info(f"Document ID: {layout_data.get('id', 'Unknown')}") + logging.info(f"File Type: {layout_data.get('file_type', 'Unknown')}") + logging.info(f"Original Filename: {layout_data.get('original_filename', 'Unknown')}") + + # Display pages information + if 'pages' in layout_data: + logging.info(f"Number of Pages: {len(layout_data['pages'])}") + for page_idx, page in enumerate(layout_data['pages']): + logging.info(f"--- PAGE {page_idx + 1} ---") + + if 'lines' in page: + logging.info(f"Text Lines ({len(page['lines'])}):") + for line in page['lines']: + logging.info(f" {line}") + + if 'tables' in page: + logging.info(f"Tables ({len(page['tables'])}):") + for table_idx, table in enumerate(page['tables']): + logging.info(f" Table {table_idx + 1}: {table.get('row_count', 0)} rows × {table.get('column_count', 0)} columns") + if 'cells' in table: + for cell in table['cells']: + logging.info(f" [R{cell.get('row_index', 0)},C{cell.get('column_index', 0)}]: {cell.get('content', '')}") + + # Display Vision Analysis if available + if 'vision_analysis' in layout_data: + logging.info("--- AI VISION ANALYSIS ---") + for key, value in layout_data['vision_analysis'].items(): + logging.info(f" {key}: {value}") + + # Display LLM Analysis if available + if 'llm_analysis' in layout_data: + logging.info("--- LLM ANALYSIS ---") + llm_data = layout_data['llm_analysis'] + if isinstance(llm_data, dict): + for key, value in llm_data.items(): + logging.info(f" {key}: {value}") + else: + logging.info(f" {llm_data}") diff --git a/src/modules/processors/document_intelligence.py b/src/modules/processors/document_intelligence.py new file mode 100644 index 0000000..3dc97c6 --- /dev/null +++ b/src/modules/processors/document_intelligence.py @@ -0,0 +1,90 @@ +""" +Document Processor Module +Handles Document Intelligence processing and data extraction +""" + +import logging +import uuid + + +def analyze_pdf(form_recognizer_client, pdf_bytes): + """Analyze PDF using Azure Document Intelligence""" + logging.info("Starting PDF layout analysis.") + poller = form_recognizer_client.begin_analyze_document( + model_id="prebuilt-layout", + document=pdf_bytes + ) + logging.info("PDF layout analysis in progress.") + result = poller.result() + logging.info("PDF layout analysis completed.") + logging.info(f"Document has {len(result.pages)} page(s), {len(result.tables)} table(s), and {len(result.styles)} style(s).") + return result + + +def extract_layout_data(result): + """Extract structured data from Document Intelligence results""" + logging.info("Extracting layout data from analysis result.") + + layout_data = { + "id": str(uuid.uuid4()), + "pages": [] + } + + # Log styles + for idx, style in enumerate(result.styles): + content_type = "handwritten" if style.is_handwritten else "no handwritten" + logging.info(f"Document contains {content_type} content") + + # Process each page + for page in result.pages: + logging.info(f"--- Page {page.page_number} ---") + page_data = { + "page_number": page.page_number, + "lines": [line.content for line in page.lines], + "tables": [], + "selection_marks": [ + {"state": mark.state, "confidence": mark.confidence} + for mark in page.selection_marks + ] + } + + # Log extracted lines + for line_idx, line in enumerate(page.lines): + logging.info(f"Line {line_idx}: '{line.content}'") + + # Log 
selection marks + for selection_mark in page.selection_marks: + logging.info( + f"Selection mark is '{selection_mark.state}' with confidence {selection_mark.confidence}" + ) + + # Extract tables + page_tables = [ + table for table in result.tables + if any(region.page_number == page.page_number for region in table.bounding_regions) + ] + + for table_index, table in enumerate(page_tables): + logging.info(f"Table {table_index}: {table.row_count} rows, {table.column_count} columns") + + table_data = { + "row_count": table.row_count, + "column_count": table.column_count, + "cells": [] + } + + for cell in table.cells: + cell_data = { + "row_index": cell.row_index, + "column_index": cell.column_index, + "content": cell.content, + "row_span": cell.row_span, + "column_span": cell.column_span + } + table_data["cells"].append(cell_data) + + page_data["tables"].append(table_data) + + layout_data["pages"].append(page_data) + + return layout_data diff --git a/src/modules/processors/llm_processing.py b/src/modules/processors/llm_processing.py new file mode 100644 index 0000000..cffd055 --- /dev/null +++ b/src/modules/processors/llm_processing.py @@ -0,0 +1,96 @@ +""" +LLM Analyzer Module +Handles Azure OpenAI LLM processing +""" + +import logging +import json +import os + + +def analyze_content_with_llm(client, content_text, deployment_name=None, images=None, prompt=None): + """Process content using Azure OpenAI with or without images""" + if not client: + logging.warning("No Azure OpenAI client available, skipping LLM analysis") + return None + + try: + if not prompt: + prompt = """You are an expert document analyzer. Analyze the provided content and extract key information. + Identify: + 1. Document type (invoice, form, report, etc.) + 2. Key entities (people, companies, places) + 3. Important dates and amounts + 4. Main purpose of the document + 5. Any notable observations + + Format your response as a structured JSON with these sections. 
+ """ + + # Use the provided deployment or fall back to environment variable + deployment_id = deployment_name or os.getenv("AZURE_OPENAI_GPT4_DEPLOYMENT", "gpt-4") + messages = [{"role": "system", "content": prompt}] + + # Add text content + messages.append({"role": "user", "content": content_text[:8000]}) + + # Add image content if available + if images and len(images) > 0: + content_items = [{"type": "text", "text": "Analyze this document:"}] + + for i, img_base64 in enumerate(images[:5]): + content_items.append({ + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"} + }) + + messages.append({"role": "user", "content": content_items}) + + logging.info(f"Calling Azure OpenAI with deployment: {deployment_id}") + response = client.chat.completions.create( + model=deployment_id, + messages=messages, + max_tokens=1024, + temperature=0.0 + ) + + result_text = response.choices[0].message.content + + # Try to parse JSON response + try: + if "```json" in result_text and "```" in result_text.split("```json", 1)[1]: + json_str = result_text.split("```json", 1)[1].split("```", 1)[0] + result = json.loads(json_str) + else: + result = json.loads(result_text) + except json.JSONDecodeError: + result = {"analysis": result_text} + + logging.info("Successfully received and processed LLM response") + return result + + except Exception as e: + logging.error(f"Error in LLM processing: {e}") + return {"error": str(e)} + + +def prepare_content_for_llm(layout_data, file_format): + """Prepare content text from layout data for LLM processing""" + content_text = "" + + if file_format == 'pdf': + for page in layout_data['pages']: + content_text += f"\n--- PAGE {page['page_number']} ---\n" + content_text += "\n".join(page['lines']) + + for i, table in enumerate(page['tables']): + content_text += f"\n--- TABLE {i+1} ---\n" + for cell in table['cells']: + content_text += f"[Row {cell['row_index']}, Col {cell['column_index']}]: {cell['content']}\n" + elif file_format == 'image': + content_text = f"Image caption: {layout_data['vision_analysis']['caption']}\n" + content_text += "Extracted text:\n" + for line in layout_data['pages'][0]['lines']: + content_text += f"{line}\n" + + return content_text diff --git a/src/modules/processors/vision_processing.py b/src/modules/processors/vision_processing.py new file mode 100644 index 0000000..b859251 --- /dev/null +++ b/src/modules/processors/vision_processing.py @@ -0,0 +1,99 @@ +""" +Vision Analyzer Module +Handles Azure AI Vision API processing +""" + +import logging +import uuid +import time +import requests +from io import BytesIO + + +def analyze_image_with_vision(image_bytes, vision_config, request_id=None): + """Analyze an image using Azure AI Vision API""" + if not vision_config.get("endpoint") or not vision_config.get("key"): + logging.warning("Vision API configuration is missing, skipping vision analysis") + return None + + req_id = request_id or str(uuid.uuid4())[:8] + logging.info(f"[Vision-{req_id}] Starting image analysis with Azure AI Vision") + + vision_endpoint = vision_config.get("endpoint") + vision_key = vision_config.get("key") + current_version = vision_config.get("version", "2024-04-01") + + try: + # Build API URL based on version + if current_version.startswith("2024"): + analyze_url = f"{vision_endpoint}/computervision/imageanalysis:analyze?api-version={current_version}&features=caption,read" + else: + analyze_url = 
f"{vision_endpoint}/computervision/imageanalysis:analyze?api-version={current_version}&features=caption,read" + + headers = { + 'Content-Type': 'application/octet-stream', + 'Ocp-Apim-Subscription-Key': vision_key, + 'x-ms-client-request-id': req_id + } + + logging.info(f"[Vision-{req_id}] Making request to: {analyze_url}") + + start_time = time.time() + response = requests.post(analyze_url, headers=headers, data=image_bytes, timeout=30) + api_latency = time.time() - start_time + + logging.info(f"[Vision-{req_id}] Response received in {api_latency:.2f}s with status {response.status_code}") + + response.raise_for_status() + result = response.json() + + # Add tracking information + result['request_id'] = req_id + result['api_version_used'] = current_version + + logging.info(f"[Vision-{req_id}] Successfully processed with API version {current_version}") + return result + + except Exception as e: + logging.error(f"[Vision-{req_id}] Vision API error: {str(e)}") + return { + "error": "Vision API failed", + "details": str(e), + "api_version": current_version + } + + +def process_image_file(pdf_bytes, vision_config, invocation_id): + """Process image files using Vision API""" + vision_result = analyze_image_with_vision(pdf_bytes, vision_config, request_id=invocation_id) + + if vision_result and 'error' not in vision_result: + # Extract text lines from Vision API response + text_lines = [] + if 'read' in vision_result and 'blocks' in vision_result['read']: + for block in vision_result['read']['blocks']: + if 'lines' in block: + for line in block['lines']: + if 'text' in line: + text_lines.append(line['text']) + + layout_data = { + "id": str(uuid.uuid4()), + "file_type": "image", + "pages": [{ + "page_number": 1, + "lines": text_lines, + "tables": [], + "selection_marks": [] + }], + "vision_analysis": { + "caption": vision_result.get("caption", {}).get("text", ""), + "confidence": vision_result.get("caption", {}).get("confidence", 0), + "api_version": vision_config.get("version", "unknown") + } + } + + return layout_data, vision_result + else: + logging.error(f"[Job-{invocation_id}] Vision API processing failed for image") + return None, None diff --git a/src/modules/storage/cosmos_manager.py b/src/modules/storage/cosmos_manager.py new file mode 100644 index 0000000..8ff042a --- /dev/null +++ b/src/modules/storage/cosmos_manager.py @@ -0,0 +1,130 @@ +""" +Storage Manager Module +Handles data persistence operations with Azure Cosmos DB +""" + +import logging +import json +from datetime import datetime +import azure.cosmos.cosmos_client as cosmos_client +import azure.cosmos.exceptions as exceptions + + +def initialize_cosmos_client(endpoint, key): + """Initialize and return a Cosmos DB client""" + return cosmos_client.CosmosClient(endpoint, key) + + +def create_database_if_not_exists(client, database_name): + """Create database if it doesn't exist""" + try: + database = client.create_database_if_not_exists(id=database_name) + logging.info(f"Database '{database_name}' ready") + return database + except exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to create/access database: {e}") + raise + + +def create_container_if_not_exists(database, container_name, partition_key_path="/id"): + """Create container if it doesn't exist""" + try: + container = database.create_container_if_not_exists( + id=container_name, + partition_key={"paths": [partition_key_path], "kind": "Hash"}, + offer_throughput=400 + ) + logging.info(f"Container '{container_name}' ready") + return container + except 
exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to create/access container: {e}") + raise + + +def prepare_document_for_storage(layout_data, original_filename=None): + """Prepare the layout data for storage with metadata""" + document = { + "id": layout_data.get("id", f"doc_{int(datetime.now().timestamp())}"), + "timestamp": datetime.now().isoformat(), + "original_filename": original_filename or layout_data.get("original_filename", "unknown"), + "file_type": layout_data.get("file_type", "pdf"), + "processing_status": "completed", + "content": layout_data + } + + # Ensure all nested data is JSON serializable + try: + json.dumps(document) + except (TypeError, ValueError) as e: + logging.warning(f"Document contains non-serializable data: {e}") + document["content"] = str(layout_data) + document["serialization_issue"] = str(e) + + return document + + +def store_document(container, document): + """Store document in Cosmos DB container""" + try: + stored_item = container.create_item(body=document) + logging.info(f"Document stored successfully with ID: {stored_item['id']}") + return stored_item + except exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to store document: {e}") + raise + + +def retrieve_document(container, document_id, partition_key=None): + """Retrieve document from Cosmos DB container""" + try: + if partition_key is None: + partition_key = document_id + + item = container.read_item(item=document_id, partition_key=partition_key) + logging.info(f"Document retrieved successfully: {document_id}") + return item + except exceptions.CosmosResourceNotFoundError: + logging.warning(f"Document not found: {document_id}") + return None + except exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to retrieve document: {e}") + raise + + +def query_documents(container, query, parameters=None): + """Query documents from Cosmos DB container""" + try: + items = list(container.query_items( + query=query, + parameters=parameters or [], + enable_cross_partition_query=True + )) + logging.info(f"Query returned {len(items)} documents") + return items + except exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to query documents: {e}") + raise + + +def update_document(container, document_id, updates, partition_key=None): + """Update an existing document in Cosmos DB""" + try: + if partition_key is None: + partition_key = document_id + + # First retrieve the existing document + existing_doc = retrieve_document(container, document_id, partition_key) + if not existing_doc: + raise ValueError(f"Document {document_id} not found for update") + + # Apply updates + existing_doc.update(updates) + existing_doc["last_updated"] = datetime.now().isoformat() + + # Replace the document + updated_item = container.replace_item(item=document_id, body=existing_doc) + logging.info(f"Document updated successfully: {document_id}") + return updated_item + except exceptions.CosmosHttpResponseError as e: + logging.error(f"Failed to update document: {e}") + raise diff --git a/src/modules/utils/data_helpers.py b/src/modules/utils/data_helpers.py new file mode 100644 index 0000000..544c3e6 --- /dev/null +++ b/src/modules/utils/data_helpers.py @@ -0,0 +1,24 @@ +""" +Data Helper Functions +Utilities for data manipulation and processing +""" + +import logging + + +def safe_get_nested_value(dictionary, keys, default=None): + """Safely get nested dictionary value using dot notation keys""" + try: + value = dictionary + for key in keys: + value = value[key] + return value + 
except (KeyError, TypeError): + return default + + +def truncate_text(text, max_length=100, suffix="..."): + """Truncate text to specified length with suffix""" + if len(text) <= max_length: + return text + return text[:max_length - len(suffix)] + suffix diff --git a/src/modules/utils/file_helpers.py b/src/modules/utils/file_helpers.py new file mode 100644 index 0000000..c3b70ca --- /dev/null +++ b/src/modules/utils/file_helpers.py @@ -0,0 +1,94 @@ +""" +File Helper Functions +Utilities for file operations and encoding +""" + +import logging +import base64 +import uuid +import os +import mimetypes +from datetime import datetime + + +def generate_document_id(): + """Generate a unique document ID""" + return str(uuid.uuid4()) + + +def encode_file_to_base64(file_path): + """Encode a file to base64 string""" + try: + with open(file_path, "rb") as file: + encoded_string = base64.b64encode(file.read()).decode('utf-8') + return encoded_string + except Exception as e: + logging.error(f"Failed to encode file to base64: {e}") + raise + + +def decode_base64_to_bytes(base64_string): + """Decode base64 string to bytes""" + try: + return base64.b64decode(base64_string) + except Exception as e: + logging.error(f"Failed to decode base64 string: {e}") + raise + + +def get_file_info(file_path): + """Get file information including size, type, and timestamps""" + try: + stat_info = os.stat(file_path) + mime_type, _ = mimetypes.guess_type(file_path) + + return { + "filename": os.path.basename(file_path), + "size_bytes": stat_info.st_size, + "mime_type": mime_type, + "created": datetime.fromtimestamp(stat_info.st_ctime).isoformat(), + "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(), + "accessed": datetime.fromtimestamp(stat_info.st_atime).isoformat() + } + except Exception as e: + logging.error(f"Failed to get file info: {e}") + raise + + +def validate_file_type(file_path, allowed_extensions=None): + """Validate file type based on extension""" + if allowed_extensions is None: + allowed_extensions = ['.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff'] + + file_extension = os.path.splitext(file_path)[1].lower() + is_valid = file_extension in allowed_extensions + + if not is_valid: + logging.warning(f"File type {file_extension} not in allowed types: {allowed_extensions}") + + return is_valid + + +def sanitize_filename(filename): + """Sanitize filename by removing invalid characters""" + invalid_chars = '<>:"/\\|?*' + sanitized = filename + for char in invalid_chars: + sanitized = sanitized.replace(char, '_') + return sanitized + + +def cleanup_temp_files(file_paths): + """Clean up temporary files""" + cleaned_count = 0 + for file_path in file_paths: + try: + if os.path.exists(file_path): + os.remove(file_path) + cleaned_count += 1 + logging.info(f"Cleaned up temp file: {file_path}") + except Exception as e: + logging.warning(f"Failed to clean up temp file {file_path}: {e}") + + logging.info(f"Cleaned up {cleaned_count} temporary files") + return cleaned_count diff --git a/src/modules/utils/logging_helpers.py b/src/modules/utils/logging_helpers.py new file mode 100644 index 0000000..7fcf257 --- /dev/null +++ b/src/modules/utils/logging_helpers.py @@ -0,0 +1,25 @@ +""" +Logging Helper Functions +Utilities for consistent logging and formatting +""" + +import logging +from datetime import datetime + + +def log_processing_step(step_name, details=None): + """Log a processing step with consistent formatting""" + separator = "-" * 50 + logging.info(separator) + logging.info(f"PROCESSING STEP: 
{step_name}") + if details: + logging.info(f"Details: {details}") + logging.info(f"Timestamp: {format_timestamp()}") + logging.info(separator) + + +def format_timestamp(timestamp=None): + """Format timestamp for logging and display""" + if timestamp is None: + timestamp = datetime.now() + return timestamp.strftime("%Y-%m-%d %H:%M:%S") diff --git a/src/modules/utils/time_helpers.py b/src/modules/utils/time_helpers.py new file mode 100644 index 0000000..a64004d --- /dev/null +++ b/src/modules/utils/time_helpers.py @@ -0,0 +1,20 @@ +""" +Time Helper Functions +Utilities for time calculations and processing +""" + +from datetime import datetime + + +def calculate_processing_time(start_time, end_time=None): + """Calculate processing time duration""" + if end_time is None: + end_time = datetime.now() + + duration = end_time - start_time + return { + "duration_seconds": duration.total_seconds(), + "duration_formatted": str(duration), + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat() + } diff --git a/src/modules/utils/validation.py b/src/modules/utils/validation.py new file mode 100644 index 0000000..4b44039 --- /dev/null +++ b/src/modules/utils/validation.py @@ -0,0 +1,23 @@ +""" +Validation Helper Functions +Environment validation and configuration checking +""" + +import os +import logging + + +def validate_required_env_vars(required_vars): + """Validate that required environment variables are set""" + missing_vars = [] + for var in required_vars: + if not os.getenv(var): + missing_vars.append(var) + + if missing_vars: + error_msg = f"Missing required environment variables: {missing_vars}" + logging.error(error_msg) + raise ValueError(error_msg) + + logging.info("All required environment variables are set") + return True diff --git a/src/requirements.txt b/src/requirements.txt index 0c757b4..2933fe7 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -2,11 +2,23 @@ # The Python Worker is managed by Azure Functions platform # Manually managing azure-functions-worker may cause unexpected issues -azure-functions -azure-ai-formrecognizer -azure-core -azure-cosmos==4.3.0 -azure-identity==1.7.0 -Pillow==10.0.1 -pdf2image==1.16.3 -requests==2.31.0 +# Core Azure Functions dependencies +azure-functions>=1.18.0,<2.0.0 + +# Azure AI and Document Processing - Essential +azure-ai-formrecognizer>=3.3.0,<4.0.0 +azure-core>=1.29.0,<2.0.0 +azure-cosmos>=4.3.0,<5.0.0 +azure-identity>=1.15.0,<2.0.0 + +# HTTP requests - Essential +requests>=2.31.0,<3.0.0 + +# Image Processing +Pillow>=10.0.1,<11.0.0 + +# Azure OpenAI for LLM processing +openai>=1.3.0,<2.0.0 + +# Essential utilities +python-dateutil>=2.8.0,<3.0.0 diff --git a/terraform-infrastructure/README.md b/terraform-infrastructure/README.md index 754b370..e1235fb 100644 --- a/terraform-infrastructure/README.md +++ b/terraform-infrastructure/README.md @@ -109,7 +109,7 @@ graph TD;
-  Total views<br>
-
-  Refresh Date: 2025-07-25
-
+  Total views<br>
+
+  Refresh Date: 2025-07-29
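Once runs are stored, the `cosmos_manager` helpers introduced in this PR can be reused outside the Function, for example to spot-check results from a script. A sketch assuming the same `COSMOS_DB_ENDPOINT`/`COSMOS_DB_KEY` variables and the `DocumentAnalysisDB`/`ProcessedDocuments` names used above, run from `src/` so `modules` is importable:

```python
# Inspect documents written by the Function, reusing this PR's storage module.
# Assumes COSMOS_DB_ENDPOINT and COSMOS_DB_KEY point at the same account the
# Function writes to; field names match prepare_document_for_storage().
import os

from modules.storage.cosmos_manager import (
    initialize_cosmos_client,
    create_database_if_not_exists,
    create_container_if_not_exists,
    query_documents,
)

client = initialize_cosmos_client(os.environ["COSMOS_DB_ENDPOINT"], os.environ["COSMOS_DB_KEY"])
database = create_database_if_not_exists(client, "DocumentAnalysisDB")
container = create_container_if_not_exists(database, "ProcessedDocuments")

# Cross-partition query for completed PDF runs, newest first.
docs = query_documents(
    container,
    "SELECT c.id, c.original_filename, c.timestamp FROM c "
    "WHERE c.file_type = @ft AND c.processing_status = @status "
    "ORDER BY c.timestamp DESC",
    parameters=[
        {"name": "@ft", "value": "pdf"},
        {"name": "@status", "value": "completed"},
    ],
)
for doc in docs:
    print(doc["id"], doc["original_filename"], doc["timestamp"])
```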