1 change: 1 addition & 0 deletions .gitignore
@@ -149,6 +149,7 @@ node_modules
static/

app/functions/*/prepdocslib/
app/functions/*/requirements.txt

data/**/*.md5

9 changes: 4 additions & 5 deletions app/backend/prepdocslib/cloudingestionstrategy.py
@@ -161,10 +161,9 @@ def _build_skillset(self) -> SearchIndexerSkillset:
resource_id=self.search_user_assigned_identity_resource_id
),
inputs=[
# Provide the binary payload expected by the document extractor custom skill.
InputFieldMappingEntry(name="file_data", source="/document/file_data"),
InputFieldMappingEntry(name="file_name", source="/document/metadata_storage_name"),
InputFieldMappingEntry(name="content_type", source="/document/metadata_storage_content_type"),
# Always provide the blob URL so the function can download large files (> 16MB)
InputFieldMappingEntry(name="metadata_storage_path", source="/document/metadata_storage_path"),
# We are not using the SAS token since the functions have RBAC access via managed identity
],
outputs=[
OutputFieldMappingEntry(name="pages", target_name="pages"),
@@ -310,7 +309,7 @@ async def setup(self) -> None:
configuration=IndexingParametersConfiguration(
query_timeout=None, # type: ignore
data_to_extract="storageMetadata",
allow_skillset_to_read_file_data=True,
allow_skillset_to_read_file_data=False,
)
),
)
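For context on the skillset change above: instead of receiving an inline base64 file_data payload (which runs into trouble for files over 16 MB), the function now downloads the blob itself using its URL and RBAC via managed identity. A minimal sketch of that download path, assuming the async azure-identity and azure-storage-blob clients and a hypothetical blob URL; the repo's own BlobManager.download_blob (used in function_app.py below) wraps the equivalent logic:

from azure.identity.aio import ManagedIdentityCredential
from azure.storage.blob.aio import BlobClient


async def download_via_managed_identity(blob_url: str) -> bytes:
    # RBAC through the managed identity replaces a SAS token and avoids the inline-payload size limit.
    async with ManagedIdentityCredential() as credential:
        async with BlobClient.from_blob_url(blob_url, credential=credential) as blob_client:
            downloader = await blob_client.download_blob()
            return await downloader.readall()


# Hypothetical usage:
# data = asyncio.run(download_via_managed_identity(
#     "https://myaccount.blob.core.windows.net/content/report.pdf"))
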
72 changes: 42 additions & 30 deletions app/functions/document_extractor/function_app.py
@@ -3,23 +3,25 @@
Custom skill for Azure AI Search that extracts and processes document content.
"""

import base64
import io
import json
import logging
import os
from dataclasses import dataclass
from typing import Any
from urllib.parse import unquote, urlparse

import azure.functions as func
from azure.core.exceptions import HttpResponseError
from azure.identity.aio import ManagedIdentityCredential

from prepdocslib.blobmanager import BlobManager
from prepdocslib.fileprocessor import FileProcessor
from prepdocslib.page import Page
from prepdocslib.servicesetup import (
build_file_processors,
select_processor_for_filename,
setup_blob_manager,
)

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@@ -31,6 +33,7 @@
class GlobalSettings:
file_processors: dict[str, FileProcessor]
azure_credential: ManagedIdentityCredential
blob_manager: BlobManager
Comment on lines 31 to +36

Copilot AI Dec 5, 2025

The GlobalSettings dataclass has been updated to include a blob_manager field, but existing tests create mock settings without this field. This will cause TypeError exceptions in tests since blob_manager is a required field.

Tests like test_document_extractor_requires_single_record, test_document_extractor_handles_processing_exception, and test_document_extractor_process_document_http_error in tests/test_function_apps.py need to be updated to include blob_manager in their mock GlobalSettings objects.

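If that feedback is taken up, here is a minimal sketch of how the affected tests could build their mock settings; the function_app import path and the MagicMock stand-ins are assumptions, not the repo's actual fixtures:

from unittest.mock import MagicMock

from azure.identity.aio import ManagedIdentityCredential

import function_app  # hypothetical import path for the document_extractor module


def make_mock_settings() -> function_app.GlobalSettings:
    # blob_manager is now a required dataclass field, so every mocked GlobalSettings must supply one.
    return function_app.GlobalSettings(
        file_processors={},
        azure_credential=MagicMock(spec=ManagedIdentityCredential),
        blob_manager=MagicMock(),  # stands in for prepdocslib.blobmanager.BlobManager
    )
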


settings: GlobalSettings | None = None
@@ -63,9 +66,18 @@ def configure_global_settings():
process_figures=use_multimodal,
)

blob_manager = setup_blob_manager(
azure_credential=azure_credential,
storage_account=os.environ["AZURE_STORAGE_ACCOUNT"],
storage_container=os.environ["AZURE_STORAGE_CONTAINER"],
storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP"],
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
)

settings = GlobalSettings(
file_processors=file_processors,
azure_credential=azure_credential,
blob_manager=blob_manager,
)


@@ -75,20 +87,13 @@ async def extract_document(req: func.HttpRequest) -> func.HttpResponse:
"""
Azure Search Custom Skill: Extract document content

Input format (single record; file data only):
# https://learn.microsoft.com/azure/search/cognitive-search-skill-document-intelligence-layout#skill-inputs
Input format (single record):
{
"values": [
{
"recordId": "1",
"data": {
// Base64 encoded file (skillset must enable file data)
"file_data": {
"$type": "file",
"data": "base64..."
},
// Optional
"file_name": "doc.pdf"
"metadata_storage_path": "https://<account>.blob.core.windows.net/<container>/<blob_path>"
}
}
]
@@ -176,45 +181,52 @@ async def process_document(data: dict[str, Any]) -> dict[str, Any]:
Process a single document: download, parse, extract figures, upload images

Args:
data: Input data with blobUrl, fileName, contentType
data: Input data with metadata_storage_path

Returns:
Dictionary with 'text' (markdown) and 'images' (list of {url, description})
"""
document_stream, file_name, content_type = get_document_stream_filedata(data)
logger.info("Processing document: %s", file_name)
if settings is None:
raise RuntimeError("Global settings not initialized")

# Get blob path from metadata_storage_path URL
# URL format: https://<account>.blob.core.windows.net/<container>/<blob_path>
storage_path = data["metadata_storage_path"]
parsed_url = urlparse(storage_path)
# Path is /<container>/<blob_path>, so split and take everything after container
path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
if len(path_parts) < 2:
raise ValueError(f"Invalid storage path format: {storage_path}")
blob_path_within_container = path_parts[1] # Everything after the container name

logger.info("Downloading blob: %s", blob_path_within_container)
result = await settings.blob_manager.download_blob(blob_path_within_container)
if result is None:
raise ValueError(f"Blob not found: {blob_path_within_container}")

document_bytes, properties = result
document_stream = io.BytesIO(document_bytes)
document_stream.name = blob_path_within_container

logger.info("Processing document: %s", blob_path_within_container)

# Get parser from file_processors dict based on file extension
file_processor = select_processor_for_filename(file_name, settings.file_processors)
file_processor = select_processor_for_filename(blob_path_within_container, settings.file_processors)
parser = file_processor.parser

pages: list[Page] = []
try:
document_stream.seek(0)
pages = [page async for page in parser.parse(content=document_stream)]
except HttpResponseError as exc:
raise ValueError(f"Parser failed for {file_name}: {exc.message}") from exc
raise ValueError(f"Parser failed for {blob_path_within_container}: {exc.message}") from exc
finally:
document_stream.close()

components = build_document_components(file_name, pages)
components = build_document_components(blob_path_within_container, pages)
return components


def get_document_stream_filedata(data: dict[str, Any]) -> tuple[io.BytesIO, str, str]:
"""Return a BytesIO stream for file_data input only (skillset must send file bytes)."""
file_payload = data.get("file_data", {})
encoded = file_payload.get("data")
if not encoded:
raise ValueError("file_data payload missing base64 data")
document_bytes = base64.b64decode(encoded)
file_name = data.get("file_name") or data.get("fileName") or file_payload.get("name") or "document"
content_type = data.get("contentType") or file_payload.get("contentType") or "application/octet-stream"
stream = io.BytesIO(document_bytes)
stream.name = file_name
return stream, file_name, content_type


def build_document_components(file_name: str, pages: list[Page]) -> dict[str, Any]:
page_entries: list[dict[str, Any]] = []
figure_entries: list[dict[str, Any]] = []
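As a quick sanity check of the blob-path handling added to process_document above, a small standalone sketch with a hypothetical account and container name; note that unquote restores percent-encoded characters before the container segment is stripped:

from urllib.parse import unquote, urlparse

storage_path = "https://myaccount.blob.core.windows.net/content/folder%20a/report.pdf"
parsed_url = urlparse(storage_path)
# parsed_url.path is "/content/folder%20a/report.pdf"; decode it and drop the container name.
path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
assert path_parts == ["content", "folder a/report.pdf"]
blob_path_within_container = path_parts[1]  # what gets passed to BlobManager.download_blob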