diff --git a/README.md b/README.md
index d72f934..c2e6382 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ Costa Rica
[](https://github.com/)
[brown9804](https://github.com/brown9804)
-Last updated: 2025-07-25
+Last updated: 2025-07-29
-----------------------------
@@ -17,6 +17,8 @@ Last updated: 2025-07-25
- Table structure and text are extracted using Azure Document Intelligence (Layout model).
- Visual selection cues are detected using Azure AI Vision or image preprocessing.
- Visual indicators are mapped to structured data, returning only the selected values in a clean JSON format.
+- Advanced semantic understanding is provided by Azure OpenAI to analyze document content and context.
+- Multiple file formats are supported, including PDFs and various image formats.
- The logic is abstracted to support multiple layout variations, so the system adapts easily to new document formats and selection styles.
> [!IMPORTANT]
@@ -65,11 +67,14 @@ Last updated: 2025-07-25
-> How to extract layout elements from PDFs stored in an Azure Storage Account, process them using Azure Document Intelligence, and store the results in Cosmos DB for further analysis.
+> How can you extract layout, text, visual, and other elements from PDFs stored in an Azure Storage Account, process them using Azure AI services, and store the results in Cosmos DB for further analysis? This solution is designed to accelerate building your own implementation, so feel free to reuse any of the provided references. Once this solution is deployed:
>
-> 1. Upload your PDFs to an Azure Blob Storage container.
-> 2. An Azure Function is triggered by the upload, which calls the Azure Document Intelligence Layout API to analyze the document structure.
-> 3. The extracted layout data (such as tables, checkboxes, and text) is parsed and subsequently stored in a Cosmos DB database, ensuring a seamless and automated workflow from document upload to data storage.
+> 1. Upload your documents: drop your PDFs or images into an Azure Storage container, and the system takes over from there.
+> 2. Automated intelligent processing: behind the scenes, Azure Functions orchestrates the AI workflow:
+> - Document Intelligence pulls out tables, text, and form data
+> - AI Vision spots visual cues like checkmarks and highlights
+> - Azure OpenAI interprets what the document actually means
+> 3. Centralized information management: all extracted data is stored in Cosmos DB, organized and accessible. The system adapts to different document layouts without requiring custom code for each format (see the storage sketch below).
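+>
+> As a rough sketch (field names taken from this repo's `prepare_document_for_storage` helper; values are illustrative only), each processed file lands in Cosmos DB as a document shaped like:
+>
+> ```json
+> {
+>   "id": "<generated-uuid>",
+>   "timestamp": "2025-07-29T12:00:00",
+>   "original_filename": "sample.pdf",
+>   "file_type": "pdf",
+>   "processing_status": "completed",
+>   "content": { "pages": ["..."] }
+> }
+> ```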
> [!NOTE]
> Advantages of Document Intelligence for organizations handling large volumes of documents:
@@ -447,7 +452,7 @@ Last updated: 2025-07-25
-
-Refresh Date: 2025-07-25
+
+Refresh Date: 2025-07-29
diff --git a/src/function_app.py b/src/function_app.py
index 69e73ad..f2fc626 100644
--- a/src/function_app.py
+++ b/src/function_app.py
@@ -1,506 +1,226 @@
+"""
+Modular PDF Layout Extraction with Azure AI Document Intelligence
+Supporting Multiple Document Versions with Visual Selection Cues
+
+This Azure Function provides comprehensive PDF analysis using Azure's built-in capabilities:
+1. Azure Document Intelligence for structured extraction (primary PDF processing)
+2. Azure AI Vision for image analysis (complementary visual processing)
+3. Azure OpenAI for semantic analysis and document understanding
+4. Native Azure cloud processing without external dependencies
+
+Modular Architecture:
+- Separate module files for different functional areas
+- Easier to code, debug, and maintain
+- Clear separation of concerns
+"""
+
+# IMPORTS AND SETUP
import logging
import azure.functions as func
-from azure.ai.formrecognizer import DocumentAnalysisClient, AnalyzeResult
-from azure.core.credentials import AzureKeyCredential
-from azure.cosmos import CosmosClient, PartitionKey, exceptions
-from azure.identity import DefaultAzureCredential
-import os
-import uuid
-import json
-from datetime import datetime
import time
-from typing import List, Dict, Any, Optional
-from PIL import Image
+import traceback
+import os
+from typing import Dict, Any, List, Optional, Union
from io import BytesIO
-import requests # For REST API to Vision
-from pdf2image import convert_from_bytes # For PDF to image conversion
+from datetime import datetime
+# Import functions from modules
+from modules.clients.azure_clients import (
+ initialize_form_recognizer_client,
+ initialize_openai_client,
+ get_vision_api_config
+)
+from modules.processors.document_intelligence import (
+ analyze_pdf,
+ extract_layout_data
+)
+from modules.processors.vision_processing import (
+ analyze_image_with_vision,
+ process_image_file
+)
+from modules.processors.llm_processing import (
+ analyze_content_with_llm,
+ prepare_content_for_llm
+)
+from modules.output.display_manager import (
+ display_complete_vision_output,
+ display_complete_llm_output,
+ display_final_concatenated_output
+)
+from modules.storage.cosmos_manager import (
+ initialize_cosmos_client,
+ create_database_if_not_exists,
+ create_container_if_not_exists,
+ prepare_document_for_storage,
+ store_document
+)
+from modules.utils.file_helpers import generate_document_id, get_file_info
+from modules.utils.validation import validate_required_env_vars
+from modules.utils.logging_helpers import log_processing_step
+from modules.utils.time_helpers import calculate_processing_time
+
+# Initialize the function app
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
-## DEFINITIONS
-def initialize_form_recognizer_client() -> DocumentAnalysisClient:
- endpoint = os.getenv("FORM_RECOGNIZER_ENDPOINT")
- key = os.getenv("FORM_RECOGNIZER_KEY")
- if not isinstance(key, str):
- raise ValueError("FORM_RECOGNIZER_KEY must be a string")
- logging.info(f"Form Recognizer endpoint: {endpoint}")
- return DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
-
-def read_pdf_content(myblob: func.InputStream) -> bytes:
- logging.info(f"Reading PDF content from blob: {myblob.name}")
- return myblob.read()
-
-def analyze_pdf(form_recognizer_client: DocumentAnalysisClient, pdf_bytes: bytes) -> AnalyzeResult:
- logging.info("Starting PDF layout analysis.")
- poller = form_recognizer_client.begin_analyze_document(
- model_id="prebuilt-layout",
- document=pdf_bytes
- )
- logging.info("PDF layout analysis in progress.")
- result = poller.result()
- logging.info("PDF layout analysis completed.")
- num_pages = len(result.pages) if hasattr(result, "pages") and isinstance(result.pages, list) else 0
- num_tables = len(result.tables) if hasattr(result, "tables") and isinstance(result.tables, list) else 0
- num_styles = len(result.styles) if hasattr(result, "styles") and result.styles is not None else 0
- logging.info(f"Document has {num_pages} page(s), {num_tables} table(s), and {num_styles} style(s).")
- return result
-
-def extract_layout_data(result: AnalyzeResult, visual_cues: Optional[List[Dict[str, Any]]] = None, source_file: str = "unknown") -> Dict[str, Any]:
- logging.info("Extracting layout data from analysis result.")
-
- layout_data = {
- "id": str(uuid.uuid4()),
- "metadata": {
- "processed_at": datetime.utcnow().isoformat(),
- "source_file": source_file,
- "pages_count": len(result.pages) if hasattr(result, "pages") else 0,
- "tables_count": len(result.tables) if hasattr(result, "tables") else 0,
- "visual_cues_count": len(visual_cues) if visual_cues else 0
- },
- "pages": []
- }
- visual_cues = visual_cues or [] # List of dicts with visual cue info per cell
-
- # Log styles
- if hasattr(result, "styles") and result.styles:
- for idx, style in enumerate(result.styles):
- content_type = "handwritten" if style.is_handwritten else "no handwritten"
- logging.info(f"Document contains {content_type} content")
-
- # Process each page
- for page in result.pages:
- logging.info(f"--- Page {page.page_number} ---")
- page_data = {
- "page_number": page.page_number,
- "lines": [line.content for line in page.lines],
- "tables": [],
- "selection_marks": [
- {"state": mark.state, "confidence": mark.confidence}
- for mark in page.selection_marks
- ] if hasattr(page, 'selection_marks') and page.selection_marks else []
- }
-
- # Log extracted lines
- for line_idx, line in enumerate(page.lines):
- logging.info(f"Line {line_idx}: '{line.content}'")
-
- # Log selection marks
- if hasattr(page, 'selection_marks') and page.selection_marks:
- for selection_mark in page.selection_marks:
- logging.info(
- f"Selection mark is '{selection_mark.state}' with confidence {selection_mark.confidence}"
- )
-
- # Extract tables
- page_tables = [
- table for table in result.tables
- if any(region.page_number == page.page_number for region in table.bounding_regions)
- ] if hasattr(result, 'tables') and result.tables else []
-
- for table_index, table in enumerate(page_tables):
- logging.info(f"Table {table_index}: {table.row_count} rows, {table.column_count} columns")
-
- table_data = {
- "row_count": table.row_count,
- "column_count": table.column_count,
- "cells": []
- }
-
- for cell in table.cells:
- content = cell.content.strip()
- # Find matching visual cue for this cell (if any)
- cue = next((vc for vc in visual_cues if vc.get("page_number") == page.page_number and vc.get("row_index") == cell.row_index and vc.get("column_index") == cell.column_index), None)
- cell_info = {
- "row_index": cell.row_index,
- "column_index": cell.column_index,
- "content": content,
- "visual_cue": cue["cue_type"] if cue else None
- }
- table_data["cells"].append(cell_info)
- logging.info(f"Cell[{cell.row_index}][{cell.column_index}]: '{content}', visual_cue: {cell_info['visual_cue']}")
-
- page_data["tables"].append(table_data)
-
- layout_data["pages"].append(page_data)
-
- try:
- preview = json.dumps(layout_data, indent=2)
- logging.info("Structured layout data preview:\n" + preview)
- except Exception as e:
- logging.warning(f"Could not serialize layout data for preview: {e}")
-
- return layout_data
-
-def save_layout_data_to_cosmos(layout_data: Dict[str, Any]) -> None:
- try:
- endpoint = os.getenv("COSMOS_DB_ENDPOINT")
- key = os.getenv("COSMOS_DB_KEY")
- aad_credentials = DefaultAzureCredential()
- client = CosmosClient(endpoint, credential=aad_credentials, consistency_level='Session')
- logging.info("Successfully connected to Cosmos DB using AAD default credential")
- except Exception as e:
- logging.error(f"Error connecting to Cosmos DB: {e}")
- return
+# MAIN AZURE FUNCTION
+@app.blob_trigger(arg_name="myblob", path="pdfinvoices/{name}",
+ connection="invoicecontosostorage_STORAGE")
+def BlobTriggerPDFsMultiLayoutsAIDocIntelligence(myblob: func.InputStream) -> None:
+ """
+ Blob trigger Azure Function for comprehensive PDF document analysis
+ Processes PDF files using Azure Document Intelligence, AI Vision, and OpenAI
+ """
+ start_time = datetime.now()
- database_name = "ContosoDBDocIntellig"
- container_name = "Layouts"
-
try:
- database = client.create_database_if_not_exists(database_name)
- logging.info(f"Database '{database_name}' does not exist. Creating it.")
- except exceptions.CosmosResourceExistsError:
- database = client.get_database_client(database_name)
- logging.info(f"Database '{database_name}' already exists.")
-
- database.read()
- logging.info(f"Reading into '{database_name}' DB")
-
- try:
- container = database.create_container(
- id=container_name,
- partition_key=PartitionKey(path="/id"),
- offer_throughput=400
- )
- logging.info(f"Container '{container_name}' does not exist. Creating it.")
- except exceptions.CosmosResourceExistsError:
- container = database.get_container_client(container_name)
- logging.info(f"Container '{container_name}' already exists.")
- except exceptions.CosmosHttpResponseError:
- raise
-
- container.read()
- logging.info(f"Reading into '{container}' container")
+ # Get blob information
+ blob_name = myblob.name
+ file_content = myblob.read()
+
+ log_processing_step("Starting Document Analysis", f"Processing blob: {blob_name}")
+
+ # Validate required environment variables
+ required_env_vars = [
+ "FORM_RECOGNIZER_ENDPOINT",
+ "FORM_RECOGNIZER_KEY",
+ "AZURE_OPENAI_ENDPOINT",
+ "AZURE_OPENAI_KEY",
+ "AZURE_OPENAI_GPT4_DEPLOYMENT",
+ "VISION_API_ENDPOINT",
+ "VISION_API_KEY"
+ ]
+ validate_required_env_vars(required_env_vars)
+
+ # Generate unique document ID
+ document_id = generate_document_id()
+
+ # Extract filename from blob path
+ original_filename = blob_name.split('/')[-1] if '/' in blob_name else blob_name
+
+ log_processing_step("File Processing", f"Processing file: {original_filename}")
+
+ # Initialize Azure clients
+ log_processing_step("Client Initialization", "Setting up Azure service clients")
+
+ # Initialize Form Recognizer client
+ form_recognizer_client = initialize_form_recognizer_client()
- try:
- response = container.upsert_item(layout_data)
- logging.info(f"Saved processed layout data to Cosmos DB. Response: {response}")
- except Exception as e:
- logging.error(f"Error inserting item into Cosmos DB: {e}")
+ # Initialize OpenAI client
+ openai_client = initialize_openai_client()
-def call_vision_api(image_bytes: bytes, subscription_key: str, endpoint: str, max_retries: int = 3) -> Dict[str, Any]:
- vision_url = endpoint + "/vision/v3.2/analyze"
- headers = {
- 'Ocp-Apim-Subscription-Key': subscription_key,
- 'Content-Type': 'application/octet-stream'
- }
- params = {
- 'visualFeatures': 'Objects,Color,Text', # Added Text feature for better text detection
- 'language': 'en',
- 'model-version': 'latest'
- }
-
- for attempt in range(max_retries):
+ # Get Vision API configuration
+ vision_config = get_vision_api_config()
+
+ # DOCUMENT INTELLIGENCE PROCESSING
+ log_processing_step("Document Intelligence Analysis", "Analyzing PDF with Azure Document Intelligence")
+
+ # Analyze PDF with Document Intelligence
+ document_result = analyze_pdf(form_recognizer_client, file_content)
+
+ # Extract layout data
+ layout_data = extract_layout_data(document_result)
+
+ # Add document ID and filename to layout data
+ layout_data["document_id"] = document_id
+ layout_data["filename"] = original_filename
+
+ log_processing_step("Document Intelligence Complete", f"Extracted {len(layout_data.get('pages', []))} pages")
+
+ # AI VISION PROCESSING
+ log_processing_step("AI Vision Analysis", "Processing with Azure AI Vision")
+
try:
- response = requests.post(vision_url, headers=headers, params=params, data=image_bytes)
- response.raise_for_status()
- return response.json()
- except requests.exceptions.HTTPError as http_err:
- if hasattr(http_err, 'response') and http_err.response.status_code == 429: # Too Many Requests
- if attempt < max_retries - 1:
- retry_after = int(http_err.response.headers.get('Retry-After', 1))
- logging.warning(f"Rate limit hit, waiting {retry_after} seconds...")
- time.sleep(retry_after)
- continue
- logging.error(f"HTTP error occurred: {http_err}")
- raise
- except Exception as err:
- logging.error(f"Error calling Vision API: {err}")
- if attempt < max_retries - 1:
- time.sleep(2 ** attempt) # Exponential backoff
- continue
- raise
-
- raise Exception("Max retries exceeded for Vision API call")
-
-def extract_visual_cues_from_vision(vision_result: Dict[str, Any], page_number: int) -> List[Dict[str, Any]]:
- """
- Extract visual cues from Azure Vision API results with enhanced detection capabilities.
- Detects: checkboxes, filled areas, handwritten text, signatures, tables, and form elements
-
- Args:
- vision_result: The response from Azure Vision API
- page_number: Current page being processed
+ # Process with AI Vision for additional insights
+ vision_analysis = analyze_image_with_vision(file_content, vision_config)
+
+ # Display complete Vision output
+ display_complete_vision_output(vision_analysis, "- Azure AI Vision Analysis")
+
+ # Add vision analysis to layout data
+ layout_data["vision_analysis"] = vision_analysis
+
+ except Exception as e:
+ logging.warning(f"Vision analysis failed (continuing without it): {e}")
+ layout_data["vision_analysis_error"] = str(e)
- Returns:
- List of detected visual cues with their properties and confidence scores
- """
- cues: List[Dict[str, Any]] = []
-
- if not vision_result:
- logging.warning(f"Empty vision result for page {page_number}")
- return cues
-
- # Enhanced object detection with better classification
- if 'objects' in vision_result:
- for obj in vision_result['objects']:
- if 'rectangle' in obj:
- rect = obj['rectangle']
- x, y = rect.get('x', 0), rect.get('y', 0)
- w, h = rect.get('w', 0), rect.get('h', 0)
- confidence = obj.get('confidence', 0.0)
+ # LLM SEMANTIC ANALYSIS
+ log_processing_step("LLM Semantic Analysis", "Analyzing content with Azure OpenAI")
+
+ try:
+ # Prepare content for LLM analysis
+ prepared_content = prepare_content_for_llm(layout_data, "pdf")
+
+ # Analyze with LLM
+ llm_analysis = analyze_content_with_llm(
+ openai_client,
+ prepared_content,
+ deployment_name=os.getenv("AZURE_OPENAI_GPT4_DEPLOYMENT")
+ )
+
+ # Display complete LLM output
+ display_complete_llm_output(llm_analysis)
+
+ # Add LLM analysis to layout data
+ layout_data["llm_analysis"] = llm_analysis
+
+ except Exception as e:
+ logging.warning(f"LLM analysis failed (continuing without it): {e}")
+ layout_data["llm_analysis_error"] = str(e)
+
+ # FINAL OUTPUT DISPLAY
+ log_processing_step("Final Output Generation", "Displaying complete processing results")
+
+ # Display the final concatenated output with all processing results
+ display_final_concatenated_output(layout_data)
+
+ # OPTIONAL: STORE IN COSMOS DB
+ cosmos_endpoint = os.getenv("COSMOS_DB_ENDPOINT")
+ cosmos_key = os.getenv("COSMOS_DB_KEY")
+
+ if cosmos_endpoint and cosmos_key:
+ try:
+ log_processing_step("Data Storage", "Storing results in Cosmos DB")
- # Improved checkbox detection with confidence threshold
- if 0.8 <= w/h <= 1.2 and 10 <= w <= 50 and 10 <= h <= 50 and confidence > 0.6:
- cues.append({
- "page_number": page_number,
- "x": x,
- "y": y,
- "width": w,
- "height": h,
- "cue_type": "checkbox",
- "confidence": confidence,
- "metadata": {
- "aspect_ratio": w/h,
- "area": w * h
- }
- })
+ # Initialize Cosmos client and containers
+ cosmos_client = initialize_cosmos_client(cosmos_endpoint, cosmos_key)
+ database = create_database_if_not_exists(cosmos_client, "DocumentAnalysisDB")
+ container = create_container_if_not_exists(database, "ProcessedDocuments")
- # Detect possible table structures
- elif w > 100 and h > 100 and 'table' in obj.get('tags', []):
- cues.append({
- "page_number": page_number,
- "x": x,
- "y": y,
- "width": w,
- "height": h,
- "cue_type": "table",
- "confidence": confidence
- })
-
- # Enhanced color analysis for form elements
- if 'color' in vision_result:
- color_info = vision_result['color']
- dominant_colors = color_info.get('dominantColors', [])
- for color in dominant_colors:
- color_lower = color.lower()
- if color_lower in ['gray', 'grey']:
- cues.append({
- "page_number": page_number,
- "cue_type": "filled_area",
- "color": color_lower,
- "confidence": color_info.get('dominantColorConfidence', 0.0),
- "metadata": {
- "color_scheme": color_info.get('accentColor'),
- "is_black_and_white": color_info.get('isBWImg', False)
- }
- })
-
- # Enhanced text analysis with better handwriting and signature detection
- if 'text' in vision_result:
- for text_result in vision_result.get('text', {}).get('lines', []):
- content = text_result.get('content', '').strip()
- confidence = text_result.get('confidence', 0.0)
-
- if text_result.get('isHandwritten', False):
- cue_type = "signature" if _is_likely_signature(content) else "handwritten"
- cues.append({
- "page_number": page_number,
- "text": content,
- "cue_type": cue_type,
- "confidence": confidence,
- "metadata": {
- "length": len(content),
- "position": text_result.get('boundingBox', {}),
- "detected_language": text_result.get('language', 'unknown')
- }
- })
-
- # Log what we found
- if cues:
- logging.info(f"Found {len(cues)} visual cues on page {page_number}: {[c['cue_type'] for c in cues]}")
- else:
- logging.info(f"No visual cues detected on page {page_number}")
-
- return cues
-
-def _is_likely_signature(text: str) -> bool:
- """
- Detect if the given text is likely to be a signature based on heuristics.
-
- Args:
- text: The text content to analyze
+ # Prepare and store document
+ document_for_storage = prepare_document_for_storage(layout_data, original_filename)
+ stored_doc = store_document(container, document_for_storage)
+
+ layout_data["storage_info"] = {
+ "stored": True,
+ "document_id": stored_doc["id"],
+ "timestamp": stored_doc["timestamp"]
+ }
+
+ except Exception as e:
+ logging.warning(f"Storage failed (continuing without it): {e}")
+ layout_data["storage_error"] = str(e)
- Returns:
- bool: True if the text matches signature patterns
- """
- # Common signature indicators
- signature_indicators = [
- lambda t: len(t.split()) <= 3, # Most signatures are 1-3 words
- lambda t: any(c.isalpha() for c in t), # Contains letters
- lambda t: len(t) < 50, # Not too long
- lambda t: not t.isupper(), # Not all uppercase (unlikely for signatures)
- lambda t: not any(c.isdigit() for c in t) # Usually no numbers in signatures
- ]
-
- return all(indicator(text) for indicator in signature_indicators)
-
-def convert_pdf_to_images(pdf_bytes: bytes) -> List[Image.Image]:
- images = convert_from_bytes(pdf_bytes)
- return images
-
-def extract_skill_selections_from_table(table_data):
- """
- Given a table_data dict (as in your layout_data['pages'][x]['tables'][y]),
- returns a list of dicts: [{"skill": ..., "selected": ...}, ...]
- Assumes first column is skill name, columns 2-7 are options 0-5.
- """
- skills = []
- for row in range(table_data["row_count"]):
- skill_name = None
- selected = None
- for cell in table_data["cells"]:
- if cell["row_index"] == row:
- col = cell["column_index"]
- content = cell["content"].replace("\n", " ").strip()
- # First column is skill name
- if col == 0:
- skill_name = content
- # Columns 2-7 are options 0-5
- elif 2 <= col <= 7:
- if ":selected:" in content:
- selected = col - 2 # 0-based
- if skill_name and selected is not None:
- skills.append({"skill": skill_name, "selected": selected})
- return skills
-
-def infer_table_title(table_data, page_lines):
- """
- Try to infer the table title by looking for text above the table or in the first row/merged cells.
- page_lines: list of all lines on the page (in order)
- """
- # Find the minimum row_index in the table (should be 0)
- min_row = min(cell["row_index"] for cell in table_data["cells"])
- # Get all cells in the first row
- first_row_cells = [cell for cell in table_data["cells"] if cell["row_index"] == min_row]
- # If any cell in the first row spans all columns, treat as title
- for cell in first_row_cells:
- if cell.get("column_span", 1) == table_data["column_count"] and cell["content"].strip():
- return cell["content"].strip()
- # Otherwise, look for a line above the first row that is not in the table
- # Find the topmost cell's content
- top_cell_content = None
- if first_row_cells:
- top_cell_content = first_row_cells[0]["content"].strip()
- # Try to find a line above the table that is not the top cell content
- if page_lines and top_cell_content:
- for idx, line in enumerate(page_lines):
- if line.strip() == top_cell_content and idx > 0:
- # Return the previous line as the title
- prev_line = page_lines[idx-1].strip()
- if prev_line:
- return prev_line
- # Fallback: use the top cell content if not empty
- if top_cell_content:
- return top_cell_content
- return "Unknown Table"
-
-@app.blob_trigger(arg_name="myblob", path="pdfinvoices/{name}",
- connection="invoicecontosostorage_STORAGE")
-def BlobTriggerContosoPDFLayoutsDocIntelligence(myblob: func.InputStream) -> None:
- logging.info(f"Python blob trigger function processed blob\n"
- f"Name: {myblob.name}\n"
- f"Blob Size: {myblob.length} bytes")
-
- try:
- form_recognizer_client = initialize_form_recognizer_client()
- pdf_bytes = read_pdf_content(myblob)
- logging.info("Successfully read PDF content from blob.")
- except Exception as e:
- logging.error(f"Error reading PDF: {e}")
- return
-
- try:
- result = analyze_pdf(form_recognizer_client, pdf_bytes)
- logging.info("Successfully analyzed PDF using Document Intelligence.")
- except Exception as e:
- logging.error(f"Error analyzing PDF: {e}")
- return
-
- # --- Step: Convert PDF to image and call Azure AI Vision ---
- visual_cues = []
- try:
- # Validate Vision API credentials
- vision_key = os.getenv("VISION_API_KEY")
- vision_endpoint = os.getenv("VISION_API_ENDPOINT")
+ # CALCULATE PROCESSING TIME
+ end_time = datetime.now()
+ processing_time_info = calculate_processing_time(start_time, end_time)
+
+ layout_data["processing_time"] = processing_time_info
+
+ log_processing_step(
+ "Processing Complete",
+ f"Total time: {processing_time_info['duration_formatted']}"
+ )
+
+ logging.info(f"Successfully processed blob: {blob_name}")
- if not vision_key or not vision_endpoint:
- logging.warning("Vision API credentials not configured - skipping visual cue detection")
- else:
- images = convert_pdf_to_images(pdf_bytes)
- if not images:
- logging.warning("No images extracted from PDF")
- else:
- for page_num, image in enumerate(images, start=1):
- img_bytes_io = BytesIO()
- image.save(img_bytes_io, format='JPEG')
- img_bytes = img_bytes_io.getvalue()
- vision_result = call_vision_api(img_bytes, vision_key, vision_endpoint)
- cues = extract_visual_cues_from_vision(vision_result, page_num)
- visual_cues.extend(cues)
- logging.info(f"Visual cues extracted: {visual_cues}")
- except Exception as e:
- logging.error(f"Error processing visual cues with AI Vision: {e}")
- # Continue processing without visual cues
-
- try:
- layout_data = extract_layout_data(result, visual_cues, myblob.name)
- logging.info("Successfully extracted and merged layout data.")
- except Exception as e:
- logging.error(f"Error extracting layout data: {e}")
- return
-
- try:
- save_layout_data_to_cosmos(layout_data)
- logging.info("Successfully saved layout data to Cosmos DB.")
except Exception as e:
- logging.error(f"Error saving layout data to Cosmos DB: {e}")
-
- # For each table, infer the title, create both DataFrame-like and summary JSON, log both, and save only the summary JSON
- for page in layout_data["pages"]:
- page_lines = page.get("lines", [])
- for table in page["tables"]:
- # --- Table Title Inference ---
- table_title = infer_table_title(table, page_lines)
-
- # --- DataFrame-like JSON ---
- # Build a 2D array of cell contents
- df_like = [[None for _ in range(table["column_count"])] for _ in range(table["row_count"]) ]
- for cell in table["cells"]:
- r, c = cell["row_index"], cell["column_index"]
- df_like[r][c] = cell["content"].strip()
- df_json = {
- "table_title": table_title,
- "data": df_like
- }
-
- # --- Pretty-print table as grid ---
- def pretty_print_table(table_title, df_like):
- # Find max width for each column
- if not df_like or not df_like[0]:
- return "(Empty table)"
- col_widths = [max(len(str(row[c])) if row[c] is not None else 0 for row in df_like) for c in range(len(df_like[0]))]
- lines = []
- lines.append(f"Table: {table_title}")
- border = "+" + "+".join("-" * (w+2) for w in col_widths) + "+"
- lines.append(border)
- for i, row in enumerate(df_like):
- row_str = "|" + "|".join(f" {str(cell) if cell is not None else '' :<{col_widths[j]}} " for j, cell in enumerate(row)) + "|"
- lines.append(row_str)
- lines.append(border)
- return "\n".join(lines)
-
- pretty_table_str = pretty_print_table(table_title, df_like)
- logging.info(f"\n{pretty_table_str}")
-
- # --- Summary JSON ---
- skill_selections = extract_skill_selections_from_table(table)
- summary = {
- "table_title": table_title,
- "skills": skill_selections
- }
-
- # Log both outputs for user inspection
- logging.info(f"Table DataFrame-like JSON: {json.dumps(df_json, indent=2)}")
- logging.info(f"Table summary JSON: {json.dumps(summary, indent=2)}")
- # Only save the summary JSON if needed (e.g., to Cosmos DB or elsewhere)
- # (Current implementation saves only the main layout_data to Cosmos DB)
+ # Define a default blob name in case we fail early
+ blob_info = "unknown blob"
+
+ # Only use blob_name if it was defined before the error
+ if 'blob_name' in locals():
+ blob_info = f"blob {blob_name}"
+
+ logging.error(f"Document analysis failed for {blob_info}: {e}")
+ logging.error(f"Traceback: {traceback.format_exc()}")
+ raise
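+
+
+# Local smoke test (sketch, not part of the deployed function): runs the same
+# Document Intelligence steps against a PDF on disk. Assumes the env vars
+# validated above are set and that "sample.pdf" (a hypothetical file) exists.
+if __name__ == "__main__":
+    with open("sample.pdf", "rb") as f:
+        pdf_bytes = f.read()
+    di_client = initialize_form_recognizer_client()
+    di_result = analyze_pdf(di_client, pdf_bytes)
+    data = extract_layout_data(di_result)
+    print(f"Extracted {len(data['pages'])} page(s) from sample.pdf")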
diff --git a/src/modules/clients/azure_clients.py b/src/modules/clients/azure_clients.py
new file mode 100644
index 0000000..af3f1a2
--- /dev/null
+++ b/src/modules/clients/azure_clients.py
@@ -0,0 +1,69 @@
+"""
+Client Manager Module
+Handles initialization of all Azure service clients
+"""
+
+import os
+import logging
+from azure.ai.formrecognizer import DocumentAnalysisClient
+from azure.core.credentials import AzureKeyCredential
+from azure.identity import DefaultAzureCredential
+from openai import AzureOpenAI
+
+
+def initialize_form_recognizer_client():
+ """Initialize Azure Document Intelligence client"""
+ endpoint = os.getenv("FORM_RECOGNIZER_ENDPOINT")
+ key = os.getenv("FORM_RECOGNIZER_KEY")
+
+ if not isinstance(key, str):
+ raise ValueError("FORM_RECOGNIZER_KEY must be a string")
+
+ logging.info(f"Form Recognizer endpoint: {endpoint}")
+ return DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))
+
+
+def initialize_openai_client():
+ """Initialize the Azure OpenAI client for LLM processing"""
+ endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
+ key = os.getenv("AZURE_OPENAI_KEY")
+ api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")
+
+ if not endpoint or not key:
+ logging.warning("Azure OpenAI configuration missing or incomplete")
+ return None
+
+ try:
+ client = AzureOpenAI(
+ azure_endpoint=endpoint,
+ api_key=key,
+ api_version=api_version
+ )
+ logging.info(f"Azure OpenAI client initialized with API version: {api_version}")
+ return client
+ except Exception as e:
+ logging.error(f"Failed to initialize Azure OpenAI client: {e}")
+ return None
+
+
+def get_vision_api_config():
+ """Get the Vision API configuration from environment variables"""
+ key = os.getenv("VISION_API_KEY")
+ endpoint = os.getenv("VISION_API_ENDPOINT")
+
+ supported_versions = ["2024-04-01", "2024-02-01-preview", "2023-10-01"]
+ configured_version = os.getenv("VISION_API_VERSION", "2024-04-01")
+
+ config = {
+ "key": key,
+ "endpoint": endpoint,
+ "version": configured_version,
+ "fallback_versions": supported_versions
+ }
+
+ if key and endpoint:
+ logging.info(f"Vision API configuration loaded (API version: {configured_version})")
+ else:
+ logging.warning("Vision API configuration missing or incomplete")
+
+ return config
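+
+
+# Usage sketch (assumes the FORM_RECOGNIZER_*, AZURE_OPENAI_*, and VISION_API_*
+# env vars are set; initialize_openai_client returns None when OpenAI is not configured):
+#
+#   di_client = initialize_form_recognizer_client()
+#   openai_client = initialize_openai_client()
+#   vision_config = get_vision_api_config()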
diff --git a/src/modules/output/display_manager.py b/src/modules/output/display_manager.py
new file mode 100644
index 0000000..5ba06ea
--- /dev/null
+++ b/src/modules/output/display_manager.py
@@ -0,0 +1,98 @@
+"""
+Output Displayer Module
+Handles comprehensive output display for all processing results
+"""
+
+import logging
+import json
+
+
+def display_complete_vision_output(vision_result, processing_stage=""):
+ """Display complete AI Vision analysis output"""
+ logging.info("=" * 80)
+ logging.info(f"== COMPLETE AI VISION ANALYSIS OUTPUT {processing_stage} ==")
+ logging.info("=" * 80)
+ try:
+ complete_vision_output = json.dumps(vision_result, indent=2, ensure_ascii=False)
+ logging.info(f"Full AI Vision Analysis Results:\n{complete_vision_output}")
+ except Exception as e:
+ logging.warning(f"Could not display complete Vision analysis: {e}")
+ logging.info(f"Vision Analysis (string format): {str(vision_result)}")
+ logging.info("=" * 80)
+
+
+def display_complete_llm_output(llm_result):
+ """Display complete LLM analysis output"""
+ logging.info("=" * 80)
+ logging.info("== COMPLETE LLM ANALYSIS OUTPUT ==")
+ logging.info("=" * 80)
+ try:
+ complete_llm_output = json.dumps(llm_result, indent=2, ensure_ascii=False)
+ logging.info(f"Full LLM Analysis Results:\n{complete_llm_output}")
+ except Exception as e:
+ logging.warning(f"Could not display complete LLM analysis: {e}")
+ logging.info(f"LLM Analysis (string format): {str(llm_result)}")
+ logging.info("=" * 80)
+
+
+def display_final_concatenated_output(layout_data):
+ """Display the final concatenated output with all processing results"""
+ logging.info("=" * 80)
+ logging.info("== FINAL CONCATENATED PDF INFORMATION OUTPUT ==")
+ logging.info("== ALL PROCESSING RESULTS COMBINED ==")
+ logging.info("=" * 80)
+
+ try:
+ final_complete_output = json.dumps(layout_data, indent=2, ensure_ascii=False)
+ logging.info("COMPLETE FINAL OUTPUT (All AI Processing Results):")
+ logging.info(final_complete_output)
+ except Exception as e:
+ logging.warning(f"Could not display complete final output as JSON: {e}")
+ _display_structured_fallback(layout_data)
+
+ logging.info("=" * 80)
+ logging.info("== END OF COMPLETE PDF INFORMATION ==")
+ logging.info("=" * 80)
+
+
+def _display_structured_fallback(layout_data):
+ """Fallback structured display when JSON fails"""
+ logging.info("COMPLETE FINAL OUTPUT (Structured Display):")
+ logging.info(f"Document ID: {layout_data.get('id', 'Unknown')}")
+ logging.info(f"File Type: {layout_data.get('file_type', 'Unknown')}")
+ logging.info(f"Original Filename: {layout_data.get('original_filename', 'Unknown')}")
+
+ # Display pages information
+ if 'pages' in layout_data:
+ logging.info(f"Number of Pages: {len(layout_data['pages'])}")
+ for page_idx, page in enumerate(layout_data['pages']):
+ logging.info(f"--- PAGE {page_idx + 1} ---")
+
+ if 'lines' in page:
+ logging.info(f"Text Lines ({len(page['lines'])}):")
+ for line in page['lines']:
+ logging.info(f" {line}")
+
+ if 'tables' in page:
+ logging.info(f"Tables ({len(page['tables'])}):")
+ for table_idx, table in enumerate(page['tables']):
+ logging.info(f" Table {table_idx + 1}: {table.get('row_count', 0)} rows × {table.get('column_count', 0)} columns")
+ if 'cells' in table:
+ for cell in table['cells']:
+ logging.info(f" [R{cell.get('row_index', 0)},C{cell.get('column_index', 0)}]: {cell.get('content', '')}")
+
+ # Display Vision Analysis if available
+ if 'vision_analysis' in layout_data:
+ logging.info("--- AI VISION ANALYSIS ---")
+ for key, value in layout_data['vision_analysis'].items():
+ logging.info(f" {key}: {value}")
+
+ # Display LLM Analysis if available
+ if 'llm_analysis' in layout_data:
+ logging.info("--- LLM ANALYSIS ---")
+ llm_data = layout_data['llm_analysis']
+ if isinstance(llm_data, dict):
+ for key, value in llm_data.items():
+ logging.info(f" {key}: {value}")
+ else:
+ logging.info(f" {llm_data}")
diff --git a/src/modules/processors/document_intelligence.py b/src/modules/processors/document_intelligence.py
new file mode 100644
index 0000000..3dc97c6
--- /dev/null
+++ b/src/modules/processors/document_intelligence.py
@@ -0,0 +1,90 @@
+"""
+Document Processor Module
+Handles Document Intelligence processing and data extraction
+"""
+
+import logging
+import uuid
+
+
+def analyze_pdf(form_recognizer_client, pdf_bytes):
+ """Analyze PDF using Azure Document Intelligence"""
+ logging.info("Starting PDF layout analysis.")
+ poller = form_recognizer_client.begin_analyze_document(
+ model_id="prebuilt-layout",
+ document=pdf_bytes
+ )
+ logging.info("PDF layout analysis in progress.")
+ result = poller.result()
+ logging.info("PDF layout analysis completed.")
+ logging.info(f"Document has {len(result.pages)} page(s), {len(result.tables)} table(s), and {len(result.styles)} style(s).")
+ return result
+
+
+def extract_layout_data(result):
+ """Extract structured data from Document Intelligence results"""
+ logging.info("Extracting layout data from analysis result.")
+
+ layout_data = {
+ "id": str(uuid.uuid4()),
+ "pages": []
+ }
+
+ # Log styles
+ for style in (result.styles or []):
+ content_type = "handwritten" if style.is_handwritten else "printed"
+ logging.info(f"Document contains {content_type} content")
+
+ # Process each page
+ for page in result.pages:
+ logging.info(f"--- Page {page.page_number} ---")
+ page_data = {
+ "page_number": page.page_number,
+ "lines": [line.content for line in page.lines],
+ "tables": [],
+ "selection_marks": [
+ {"state": mark.state, "confidence": mark.confidence}
+ for mark in (page.selection_marks or [])
+ ]
+ }
+
+ # Log extracted lines
+ for line_idx, line in enumerate(page.lines):
+ logging.info(f"Line {line_idx}: '{line.content}'")
+
+ # Log selection marks
+ for selection_mark in (page.selection_marks or []):
+ logging.info(
+ f"Selection mark is '{selection_mark.state}' with confidence {selection_mark.confidence}"
+ )
+
+ # Extract tables
+ page_tables = [
+ table for table in (result.tables or [])
+ if any(region.page_number == page.page_number for region in table.bounding_regions)
+ ]
+
+ for table_index, table in enumerate(page_tables):
+ logging.info(f"Table {table_index}: {table.row_count} rows, {table.column_count} columns")
+
+ table_data = {
+ "row_count": table.row_count,
+ "column_count": table.column_count,
+ "cells": []
+ }
+
+ for cell in table.cells:
+ cell_data = {
+ "row_index": cell.row_index,
+ "column_index": cell.column_index,
+ "content": cell.content,
+ "row_span": cell.row_span,
+ "column_span": cell.column_span
+ }
+ table_data["cells"].append(cell_data)
+
+ page_data["tables"].append(table_data)
+
+ layout_data["pages"].append(page_data)
+
+ return layout_data
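+
+
+# Shape of the returned layout_data (sketch, values illustrative):
+# {
+#   "id": "<uuid>",
+#   "pages": [
+#     {
+#       "page_number": 1,
+#       "lines": ["..."],
+#       "tables": [{"row_count": 2, "column_count": 3, "cells": ["..."]}],
+#       "selection_marks": [{"state": "selected", "confidence": 0.98}]
+#     }
+#   ]
+# }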
diff --git a/src/modules/processors/llm_processing.py b/src/modules/processors/llm_processing.py
new file mode 100644
index 0000000..cffd055
--- /dev/null
+++ b/src/modules/processors/llm_processing.py
@@ -0,0 +1,96 @@
+"""
+LLM Analyzer Module
+Handles Azure OpenAI LLM processing
+"""
+
+import logging
+import json
+import os
+
+
+def analyze_content_with_llm(client, content_text, deployment_name=None, images=None, prompt=None):
+ """Process content using Azure OpenAI with or without images"""
+ if not client:
+ logging.warning("No Azure OpenAI client available, skipping LLM analysis")
+ return None
+
+ try:
+ if not prompt:
+ prompt = """You are an expert document analyzer. Analyze the provided content and extract key information.
+ Identify:
+ 1. Document type (invoice, form, report, etc.)
+ 2. Key entities (people, companies, places)
+ 3. Important dates and amounts
+ 4. Main purpose of the document
+ 5. Any notable observations
+
+ Format your response as a structured JSON with these sections.
+ """
+
+ # Use the provided deployment or fall back to environment variable
+ deployment_id = deployment_name or os.getenv("AZURE_OPENAI_GPT4_DEPLOYMENT", "gpt-4")
+ messages = [{"role": "system", "content": prompt}]
+
+ # Add text content
+ messages.append({"role": "user", "content": content_text[:8000]})
+
+ # Add image content if available
+ if images and len(images) > 0:
+ content_items = [{"type": "text", "text": "Analyze this document:"}]
+
+ for i, img_base64 in enumerate(images[:5]):
+ content_items.append({
+ "type": "image_url",
+ "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
+ })
+
+ messages.append({"role": "user", "content": content_items})
+
+ logging.info(f"Calling Azure OpenAI with deployment: {deployment_id}")
+ response = client.chat.completions.create(
+ model=deployment_id,
+ messages=messages,
+ max_tokens=1024,
+ temperature=0.0
+ )
+
+ result_text = response.choices[0].message.content
+
+ # Try to parse JSON response
+ try:
+ if "```json" in result_text and "```" in result_text.split("```json", 1)[1]:
+ json_str = result_text.split("```json", 1)[1].split("```", 1)[0]
+ result = json.loads(json_str)
+ else:
+ result = json.loads(result_text)
+ except json.JSONDecodeError:
+ result = {"analysis": result_text}
+
+ logging.info("Successfully received and processed LLM response")
+ return result
+
+ except Exception as e:
+ logging.error(f"Error in LLM processing: {e}")
+ return {"error": str(e)}
+
+
+def prepare_content_for_llm(layout_data, file_format):
+ """Prepare content text from layout data for LLM processing"""
+ content_text = ""
+
+ if file_format == 'pdf':
+ for page in layout_data['pages']:
+ content_text += f"\n--- PAGE {page['page_number']} ---\n"
+ content_text += "\n".join(page['lines'])
+
+ for i, table in enumerate(page['tables']):
+ content_text += f"\n--- TABLE {i+1} ---\n"
+ for cell in table['cells']:
+ content_text += f"[Row {cell['row_index']}, Col {cell['column_index']}]: {cell['content']}\n"
+ elif file_format == 'image':
+ content_text = f"Image caption: {layout_data['vision_analysis']['caption']}\n"
+ content_text += "Extracted text:\n"
+ for line in layout_data['pages'][0]['lines']:
+ content_text += f"{line}\n"
+
+ return content_text
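+
+
+# Usage sketch (assumes an AzureOpenAI client from initialize_openai_client and
+# layout_data produced by extract_layout_data or process_image_file; the
+# deployment name falls back to the AZURE_OPENAI_GPT4_DEPLOYMENT env var):
+#
+#   text = prepare_content_for_llm(layout_data, "pdf")
+#   result = analyze_content_with_llm(client, text)
+#   print(result.get("analysis", result))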
diff --git a/src/modules/processors/vision_processing.py b/src/modules/processors/vision_processing.py
new file mode 100644
index 0000000..b859251
--- /dev/null
+++ b/src/modules/processors/vision_processing.py
@@ -0,0 +1,99 @@
+"""
+Vision Analyzer Module
+Handles Azure AI Vision API processing
+"""
+
+import logging
+import uuid
+import time
+import requests
+
+
+def analyze_image_with_vision(image_bytes, vision_config, request_id=None):
+ """Analyze an image using Azure AI Vision API"""
+ if not vision_config.get("endpoint") or not vision_config.get("key"):
+ logging.warning("Vision API configuration is missing, skipping vision analysis")
+ return None
+
+ req_id = request_id or str(uuid.uuid4())[:8]
+ logging.info(f"[Vision-{req_id}] Starting image analysis with Azure AI Vision")
+
+ vision_endpoint = vision_config.get("endpoint")
+ vision_key = vision_config.get("key")
+ current_version = vision_config.get("version", "2024-04-01")
+
+ try:
+ # Build the Image Analysis REST URL for the configured API version
+ analyze_url = f"{vision_endpoint}/computervision/imageanalysis:analyze?api-version={current_version}&features=caption,read"
+
+ headers = {
+ 'Content-Type': 'application/octet-stream',
+ 'Ocp-Apim-Subscription-Key': vision_key,
+ 'x-ms-client-request-id': req_id
+ }
+
+ logging.info(f"[Vision-{req_id}] Making request to: {analyze_url}")
+
+ start_time = time.time()
+ response = requests.post(analyze_url, headers=headers, data=image_bytes, timeout=30)
+ api_latency = time.time() - start_time
+
+ logging.info(f"[Vision-{req_id}] Response received in {api_latency:.2f}s with status {response.status_code}")
+
+ response.raise_for_status()
+ result = response.json()
+
+ # Add tracking information
+ result['request_id'] = req_id
+ result['api_version_used'] = current_version
+
+ logging.info(f"[Vision-{req_id}] Successfully processed with API version {current_version}")
+ return result
+
+ except Exception as e:
+ logging.error(f"[Vision-{req_id}] Vision API error: {str(e)}")
+ return {
+ "error": "Vision API failed",
+ "details": str(e),
+ "api_version": current_version
+ }
+
+
+def process_image_file(image_bytes, vision_config, invocation_id):
+ """Process image files using Vision API"""
+ vision_result = analyze_image_with_vision(image_bytes, vision_config, request_id=invocation_id)
+
+ if vision_result and 'error' not in vision_result:
+ # Extract text lines from Vision API response
+ text_lines = []
+ if 'read' in vision_result and 'blocks' in vision_result['read']:
+ for block in vision_result['read']['blocks']:
+ if 'lines' in block:
+ for line in block['lines']:
+ if 'text' in line:
+ text_lines.append(line['text'])
+
+ layout_data = {
+ "id": str(uuid.uuid4()),
+ "file_type": "image",
+ "pages": [{
+ "page_number": 1,
+ "lines": text_lines,
+ "tables": [],
+ "selection_marks": []
+ }],
+ "vision_analysis": {
+ "caption": vision_result.get("caption", {}).get("text", ""),
+ "confidence": vision_result.get("caption", {}).get("confidence", 0),
+ "api_version": vision_config.get("version", "unknown")
+ }
+ }
+
+ return layout_data, vision_result
+ else:
+ logging.error(f"[Job-{invocation_id}] Vision API processing failed for image")
+ return None, None
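+
+
+if __name__ == "__main__":
+    # Standalone sketch: analyze a local image, assuming VISION_API_KEY and
+    # VISION_API_ENDPOINT are set and "sample.png" (a hypothetical file) exists.
+    import os
+    config = {
+        "key": os.getenv("VISION_API_KEY"),
+        "endpoint": os.getenv("VISION_API_ENDPOINT"),
+        "version": os.getenv("VISION_API_VERSION", "2024-04-01"),
+    }
+    with open("sample.png", "rb") as f:
+        print(analyze_image_with_vision(f.read(), config))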
diff --git a/src/modules/storage/cosmos_manager.py b/src/modules/storage/cosmos_manager.py
new file mode 100644
index 0000000..8ff042a
--- /dev/null
+++ b/src/modules/storage/cosmos_manager.py
@@ -0,0 +1,130 @@
+"""
+Storage Manager Module
+Handles data persistence operations with Azure Cosmos DB
+"""
+
+import logging
+import json
+from datetime import datetime
+from azure.cosmos import CosmosClient, PartitionKey
+import azure.cosmos.exceptions as exceptions
+
+
+def initialize_cosmos_client(endpoint, key):
+ """Initialize and return a Cosmos DB client"""
+ return CosmosClient(endpoint, credential=key)
+
+
+def create_database_if_not_exists(client, database_name):
+ """Create database if it doesn't exist"""
+ try:
+ database = client.create_database_if_not_exists(id=database_name)
+ logging.info(f"Database '{database_name}' ready")
+ return database
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to create/access database: {e}")
+ raise
+
+
+def create_container_if_not_exists(database, container_name, partition_key_path="/id"):
+ """Create container if it doesn't exist"""
+ try:
+ container = database.create_container_if_not_exists(
+ id=container_name,
+ partition_key={"paths": [partition_key_path], "kind": "Hash"},
+ offer_throughput=400
+ )
+ logging.info(f"Container '{container_name}' ready")
+ return container
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to create/access container: {e}")
+ raise
+
+
+def prepare_document_for_storage(layout_data, original_filename=None):
+ """Prepare the layout data for storage with metadata"""
+ document = {
+ "id": layout_data.get("id", f"doc_{int(datetime.now().timestamp())}"),
+ "timestamp": datetime.now().isoformat(),
+ "original_filename": original_filename or layout_data.get("original_filename", "unknown"),
+ "file_type": layout_data.get("file_type", "pdf"),
+ "processing_status": "completed",
+ "content": layout_data
+ }
+
+ # Ensure all nested data is JSON serializable
+ try:
+ json.dumps(document)
+ except (TypeError, ValueError) as e:
+ logging.warning(f"Document contains non-serializable data: {e}")
+ document["content"] = str(layout_data)
+ document["serialization_issue"] = str(e)
+
+ return document
+
+
+def store_document(container, document):
+ """Store document in Cosmos DB container"""
+ try:
+ stored_item = container.create_item(body=document)
+ logging.info(f"Document stored successfully with ID: {stored_item['id']}")
+ return stored_item
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to store document: {e}")
+ raise
+
+
+def retrieve_document(container, document_id, partition_key=None):
+ """Retrieve document from Cosmos DB container"""
+ try:
+ if partition_key is None:
+ partition_key = document_id
+
+ item = container.read_item(item=document_id, partition_key=partition_key)
+ logging.info(f"Document retrieved successfully: {document_id}")
+ return item
+ except exceptions.CosmosResourceNotFoundError:
+ logging.warning(f"Document not found: {document_id}")
+ return None
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to retrieve document: {e}")
+ raise
+
+
+def query_documents(container, query, parameters=None):
+ """Query documents from Cosmos DB container"""
+ try:
+ items = list(container.query_items(
+ query=query,
+ parameters=parameters or [],
+ enable_cross_partition_query=True
+ ))
+ logging.info(f"Query returned {len(items)} documents")
+ return items
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to query documents: {e}")
+ raise
+
+
+def update_document(container, document_id, updates, partition_key=None):
+ """Update an existing document in Cosmos DB"""
+ try:
+ if partition_key is None:
+ partition_key = document_id
+
+ # First retrieve the existing document
+ existing_doc = retrieve_document(container, document_id, partition_key)
+ if not existing_doc:
+ raise ValueError(f"Document {document_id} not found for update")
+
+ # Apply updates
+ existing_doc.update(updates)
+ existing_doc["last_updated"] = datetime.now().isoformat()
+
+ # Replace the document
+ updated_item = container.replace_item(item=document_id, body=existing_doc)
+ logging.info(f"Document updated successfully: {document_id}")
+ return updated_item
+ except exceptions.CosmosHttpResponseError as e:
+ logging.error(f"Failed to update document: {e}")
+ raise
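+
+
+# Usage sketch (assumes COSMOS_DB_ENDPOINT / COSMOS_DB_KEY are set and
+# layout_data comes from the document processors):
+#
+#   client = initialize_cosmos_client(endpoint, key)
+#   database = create_database_if_not_exists(client, "DocumentAnalysisDB")
+#   container = create_container_if_not_exists(database, "ProcessedDocuments")
+#   doc = prepare_document_for_storage(layout_data, "invoice.pdf")
+#   store_document(container, doc)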
diff --git a/src/modules/utils/data_helpers.py b/src/modules/utils/data_helpers.py
new file mode 100644
index 0000000..544c3e6
--- /dev/null
+++ b/src/modules/utils/data_helpers.py
@@ -0,0 +1,24 @@
+"""
+Data Helper Functions
+Utilities for data manipulation and processing
+"""
+
+import logging
+
+
+def safe_get_nested_value(dictionary, keys, default=None):
+ """Safely get nested dictionary value using dot notation keys"""
+ try:
+ value = dictionary
+ for key in keys:
+ value = value[key]
+ return value
+ except (KeyError, TypeError):
+ return default
+
+
+def truncate_text(text, max_length=100, suffix="..."):
+ """Truncate text to specified length with suffix"""
+ if len(text) <= max_length:
+ return text
+ return text[:max_length - len(suffix)] + suffix
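+
+
+# Examples:
+#   safe_get_nested_value({"a": {"b": 1}}, ["a", "b"])      -> 1
+#   safe_get_nested_value({"a": {}}, ["a", "missing"], 0)   -> 0
+#   truncate_text("hello world", max_length=8)              -> "hello..."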
diff --git a/src/modules/utils/file_helpers.py b/src/modules/utils/file_helpers.py
new file mode 100644
index 0000000..c3b70ca
--- /dev/null
+++ b/src/modules/utils/file_helpers.py
@@ -0,0 +1,94 @@
+"""
+File Helper Functions
+Utilities for file operations and encoding
+"""
+
+import logging
+import base64
+import uuid
+import os
+import mimetypes
+from datetime import datetime
+
+
+def generate_document_id():
+ """Generate a unique document ID"""
+ return str(uuid.uuid4())
+
+
+def encode_file_to_base64(file_path):
+ """Encode a file to base64 string"""
+ try:
+ with open(file_path, "rb") as file:
+ encoded_string = base64.b64encode(file.read()).decode('utf-8')
+ return encoded_string
+ except Exception as e:
+ logging.error(f"Failed to encode file to base64: {e}")
+ raise
+
+
+def decode_base64_to_bytes(base64_string):
+ """Decode base64 string to bytes"""
+ try:
+ return base64.b64decode(base64_string)
+ except Exception as e:
+ logging.error(f"Failed to decode base64 string: {e}")
+ raise
+
+
+def get_file_info(file_path):
+ """Get file information including size, type, and timestamps"""
+ try:
+ stat_info = os.stat(file_path)
+ mime_type, _ = mimetypes.guess_type(file_path)
+
+ return {
+ "filename": os.path.basename(file_path),
+ "size_bytes": stat_info.st_size,
+ "mime_type": mime_type,
+ "created": datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
+ "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
+ "accessed": datetime.fromtimestamp(stat_info.st_atime).isoformat()
+ }
+ except Exception as e:
+ logging.error(f"Failed to get file info: {e}")
+ raise
+
+
+def validate_file_type(file_path, allowed_extensions=None):
+ """Validate file type based on extension"""
+ if allowed_extensions is None:
+ allowed_extensions = ['.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff']
+
+ file_extension = os.path.splitext(file_path)[1].lower()
+ is_valid = file_extension in allowed_extensions
+
+ if not is_valid:
+ logging.warning(f"File type {file_extension} not in allowed types: {allowed_extensions}")
+
+ return is_valid
+
+
+def sanitize_filename(filename):
+ """Sanitize filename by removing invalid characters"""
+ invalid_chars = '<>:"/\\|?*'
+ sanitized = filename
+ for char in invalid_chars:
+ sanitized = sanitized.replace(char, '_')
+ return sanitized
+
+
+def cleanup_temp_files(file_paths):
+ """Clean up temporary files"""
+ cleaned_count = 0
+ for file_path in file_paths:
+ try:
+ if os.path.exists(file_path):
+ os.remove(file_path)
+ cleaned_count += 1
+ logging.info(f"Cleaned up temp file: {file_path}")
+ except Exception as e:
+ logging.warning(f"Failed to clean up temp file {file_path}: {e}")
+
+ logging.info(f"Cleaned up {cleaned_count} temporary files")
+ return cleaned_count
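+
+
+# Examples:
+#   validate_file_type("scan.pdf")               -> True
+#   sanitize_filename('report:v1/final?.pdf')    -> 'report_v1_final_.pdf'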
diff --git a/src/modules/utils/logging_helpers.py b/src/modules/utils/logging_helpers.py
new file mode 100644
index 0000000..7fcf257
--- /dev/null
+++ b/src/modules/utils/logging_helpers.py
@@ -0,0 +1,25 @@
+"""
+Logging Helper Functions
+Utilities for consistent logging and formatting
+"""
+
+import logging
+from datetime import datetime
+
+
+def log_processing_step(step_name, details=None):
+ """Log a processing step with consistent formatting"""
+ separator = "-" * 50
+ logging.info(separator)
+ logging.info(f"PROCESSING STEP: {step_name}")
+ if details:
+ logging.info(f"Details: {details}")
+ logging.info(f"Timestamp: {format_timestamp()}")
+ logging.info(separator)
+
+
+def format_timestamp(timestamp=None):
+ """Format timestamp for logging and display"""
+ if timestamp is None:
+ timestamp = datetime.now()
+ return timestamp.strftime("%Y-%m-%d %H:%M:%S")
diff --git a/src/modules/utils/time_helpers.py b/src/modules/utils/time_helpers.py
new file mode 100644
index 0000000..a64004d
--- /dev/null
+++ b/src/modules/utils/time_helpers.py
@@ -0,0 +1,20 @@
+"""
+Time Helper Functions
+Utilities for time calculations and processing
+"""
+
+from datetime import datetime
+
+
+def calculate_processing_time(start_time, end_time=None):
+ """Calculate processing time duration"""
+ if end_time is None:
+ end_time = datetime.now()
+
+ duration = end_time - start_time
+ return {
+ "duration_seconds": duration.total_seconds(),
+ "duration_formatted": str(duration),
+ "start_time": start_time.isoformat(),
+ "end_time": end_time.isoformat()
+ }
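+
+
+# Example:
+#   start = datetime(2025, 7, 29, 12, 0, 0)
+#   end = datetime(2025, 7, 29, 12, 0, 42)
+#   calculate_processing_time(start, end)["duration_seconds"]  -> 42.0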
diff --git a/src/modules/utils/validation.py b/src/modules/utils/validation.py
new file mode 100644
index 0000000..4b44039
--- /dev/null
+++ b/src/modules/utils/validation.py
@@ -0,0 +1,23 @@
+"""
+Validation Helper Functions
+Environment validation and configuration checking
+"""
+
+import os
+import logging
+
+
+def validate_required_env_vars(required_vars):
+ """Validate that required environment variables are set"""
+ missing_vars = []
+ for var in required_vars:
+ if not os.getenv(var):
+ missing_vars.append(var)
+
+ if missing_vars:
+ error_msg = f"Missing required environment variables: {missing_vars}"
+ logging.error(error_msg)
+ raise ValueError(error_msg)
+
+ logging.info("All required environment variables are set")
+ return True
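+
+
+# Example:
+#   validate_required_env_vars(["FORM_RECOGNIZER_ENDPOINT", "FORM_RECOGNIZER_KEY"])
+#   # -> True, or raises ValueError listing the missing variables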
diff --git a/src/requirements.txt b/src/requirements.txt
index 0c757b4..2933fe7 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -2,11 +2,23 @@
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
-azure-functions
-azure-ai-formrecognizer
-azure-core
-azure-cosmos==4.3.0
-azure-identity==1.7.0
-Pillow==10.0.1
-pdf2image==1.16.3
-requests==2.31.0
+# Core Azure Functions dependencies
+azure-functions>=1.18.0,<2.0.0
+
+# Azure AI and Document Processing - Essential
+azure-ai-formrecognizer>=3.3.0,<4.0.0
+azure-core>=1.29.0,<2.0.0
+azure-cosmos>=4.3.0,<5.0.0
+azure-identity>=1.15.0,<2.0.0
+
+# HTTP requests - Essential
+requests>=2.31.0,<3.0.0
+
+# Image Processing
+Pillow>=10.0.1,<11.0.0
+
+# Azure OpenAI for LLM processing
+openai>=1.3.0,<2.0.0
+
+# Essential utilities
+python-dateutil>=2.8.0,<3.0.0
diff --git a/terraform-infrastructure/README.md b/terraform-infrastructure/README.md
index 754b370..e1235fb 100644
--- a/terraform-infrastructure/README.md
+++ b/terraform-infrastructure/README.md
@@ -109,7 +109,7 @@ graph TD;
-
-Refresh Date: 2025-07-25
+
+Refresh Date: 2025-07-29