Change cloud ingestion to download blobs based off path versus file data #2858
First changed file (the document extractor custom skill for Azure AI Search):
@@ -3,23 +3,25 @@
 Custom skill for Azure AI Search that extracts and processes document content.
 """

-import base64
 import io
 import json
 import logging
 import os
 from dataclasses import dataclass
 from typing import Any
+from urllib.parse import unquote, urlparse

 import azure.functions as func
 from azure.core.exceptions import HttpResponseError
 from azure.identity.aio import ManagedIdentityCredential

+from prepdocslib.blobmanager import BlobManager
 from prepdocslib.fileprocessor import FileProcessor
 from prepdocslib.page import Page
 from prepdocslib.servicesetup import (
     build_file_processors,
     select_processor_for_filename,
+    setup_blob_manager,
 )

 app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@@ -31,6 +33,7 @@
 class GlobalSettings:
     file_processors: dict[str, FileProcessor]
     azure_credential: ManagedIdentityCredential
+    blob_manager: BlobManager


 settings: GlobalSettings | None = None
@@ -63,9 +66,18 @@ def configure_global_settings():
         process_figures=use_multimodal,
     )

+    blob_manager = setup_blob_manager(
+        azure_credential=azure_credential,
+        storage_account=os.environ["AZURE_STORAGE_ACCOUNT"],
+        storage_container=os.environ["AZURE_STORAGE_CONTAINER"],
+        storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP"],
+        subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
+    )
+
     settings = GlobalSettings(
         file_processors=file_processors,
         azure_credential=azure_credential,
        blob_manager=blob_manager,
     )
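configure_global_settings() now expects the four storage settings above at startup. As a rough sketch (not part of this PR), a local run or test could supply placeholder values before calling it; the variable names come from the diff, the values below are made up:

import os

# Placeholder values for local experimentation only; in the deployed function app
# these arrive as application settings.
os.environ.setdefault("AZURE_STORAGE_ACCOUNT", "mystorageaccount")
os.environ.setdefault("AZURE_STORAGE_CONTAINER", "content")
os.environ.setdefault("AZURE_STORAGE_RESOURCE_GROUP", "my-resource-group")
os.environ.setdefault("AZURE_SUBSCRIPTION_ID", "00000000-0000-0000-0000-000000000000")

# configure_global_settings() reads these and builds the BlobManager via setup_blob_manager().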
@@ -75,20 +87,15 @@ async def extract_document(req: func.HttpRequest) -> func.HttpResponse:
     """
     Azure Search Custom Skill: Extract document content

-    Input format (single record; file data only):
+    # https://learn.microsoft.com/azure/search/cognitive-search-skill-document-intelligence-layout#skill-inputs
+    Input format (single record):
     {
         "values": [
             {
                 "recordId": "1",
                 "data": {
-                    // Base64 encoded file (skillset must enable file data)
-                    "file_data": {
-                        "$type": "file",
-                        "data": "base64..."
-                    },
-                    // Optional
-                    "file_name": "doc.pdf"
+                    "metadata_storage_path": "https://<account>.blob.core.windows.net/<container>/<blob_path>",
+                    "metadata_storage_name": "document.pdf",
+                    "metadata_storage_content_type": "application/pdf"
                 }
             }
         ]
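With the new contract, a full request body for the skill might look like the sketch below. The account, container, and blob names are placeholders; in a skillset these fields are typically mapped from the blob indexer's metadata_storage_* values, not hard-coded:

import json

payload = {
    "values": [
        {
            "recordId": "1",
            "data": {
                "metadata_storage_path": "https://myaccount.blob.core.windows.net/content/reports/q1.pdf",
                "metadata_storage_name": "q1.pdf",
                "metadata_storage_content_type": "application/pdf",
            },
        }
    ]
}
print(json.dumps(payload, indent=2))  # body to POST to the skill endpoint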
@@ -176,45 +183,52 @@ async def process_document(data: dict[str, Any]) -> dict[str, Any]:
     Process a single document: download, parse, extract figures, upload images

     Args:
-        data: Input data with blobUrl, fileName, contentType
+        data: Input data with metadata_storage_path

     Returns:
         Dictionary with 'text' (markdown) and 'images' (list of {url, description})
     """
-    document_stream, file_name, content_type = get_document_stream_filedata(data)
-    logger.info("Processing document: %s", file_name)
     if settings is None:
         raise RuntimeError("Global settings not initialized")

+    # Get blob path from metadata_storage_path URL
+    # URL format: https://<account>.blob.core.windows.net/<container>/<blob_path>
+    storage_path = data["metadata_storage_path"]
+    parsed_url = urlparse(storage_path)
+    # Path is /<container>/<blob_path>, so split and take everything after container
+    path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
+    if len(path_parts) < 2:
+        raise ValueError(f"Invalid storage path format: {storage_path}")
+    blob_path_within_container = path_parts[1]  # Everything after the container name
+
+    logger.info("Downloading blob: %s", blob_path_within_container)
+    result = await settings.blob_manager.download_blob(blob_path_within_container)
+    if result is None:
+        raise ValueError(f"Blob not found: {blob_path_within_container}")
+
+    document_bytes, properties = result
+    document_stream = io.BytesIO(document_bytes)
+    document_stream.name = blob_path_within_container
+
+    logger.info("Processing document: %s", blob_path_within_container)
+
     # Get parser from file_processors dict based on file extension
-    file_processor = select_processor_for_filename(file_name, settings.file_processors)
+    file_processor = select_processor_for_filename(blob_path_within_container, settings.file_processors)
     parser = file_processor.parser

     pages: list[Page] = []
     try:
         document_stream.seek(0)
         pages = [page async for page in parser.parse(content=document_stream)]
     except HttpResponseError as exc:
-        raise ValueError(f"Parser failed for {file_name}: {exc.message}") from exc
+        raise ValueError(f"Parser failed for {blob_path_within_container}: {exc.message}") from exc
     finally:
         document_stream.close()

-    components = build_document_components(file_name, pages)
+    components = build_document_components(blob_path_within_container, pages)
     return components


-def get_document_stream_filedata(data: dict[str, Any]) -> tuple[io.BytesIO, str, str]:
-    """Return a BytesIO stream for file_data input only (skillset must send file bytes)."""
-    file_payload = data.get("file_data", {})
-    encoded = file_payload.get("data")
-    if not encoded:
-        raise ValueError("file_data payload missing base64 data")
-    document_bytes = base64.b64decode(encoded)
-    file_name = data.get("file_name") or data.get("fileName") or file_payload.get("name") or "document"
-    content_type = data.get("contentType") or file_payload.get("contentType") or "application/octet-stream"
-    stream = io.BytesIO(document_bytes)
-    stream.name = file_name
-    return stream, file_name, content_type
-
-
 def build_document_components(file_name: str, pages: list[Page]) -> dict[str, Any]:
     page_entries: list[dict[str, Any]] = []
     figure_entries: list[dict[str, Any]] = []
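This is the heart of the change: instead of decoding file_data bytes, process_document derives the blob path within the container from the metadata_storage_path URL and downloads the blob through the shared BlobManager. A standalone sketch of that parsing step (the helper name and the URL are made-up examples mirroring the code above), handy for sanity-checking nested and percent-encoded paths:

from urllib.parse import unquote, urlparse


def blob_path_from_storage_url(storage_path: str) -> str:
    """Hypothetical helper mirroring the parsing in process_document above."""
    parsed_url = urlparse(storage_path)
    # Path looks like /<container>/<blob_path>; drop the container segment
    path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
    if len(path_parts) < 2:
        raise ValueError(f"Invalid storage path format: {storage_path}")
    return path_parts[1]


# Nested folders and percent-encoded characters survive the round trip:
url = "https://myaccount.blob.core.windows.net/content/reports/Q1%20summary.pdf"
assert blob_path_from_storage_url(url) == "reports/Q1 summary.pdf"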
Second changed file (tests for the document extractor):
@@ -83,9 +83,14 @@ async def parse(self, content: Any):
         ".pdf": FileProcessor(StubParser([page]), None),
     }

+    class MockBlobManager:
+        async def download_blob(self, blob_path: str):
+            return (b"pdf-bytes", {})
+
     mock_settings = document_extractor.GlobalSettings(
         file_processors=mock_file_processors,
         azure_credential=object(),
+        blob_manager=MockBlobManager(),
     )
     monkeypatch.setattr(document_extractor, "settings", mock_settings)
@@ -94,9 +99,9 @@ async def parse(self, content: Any):
             {
                 "recordId": "record-1",
                 "data": {
-                    "file_data": {"$type": "file", "data": base64.b64encode(b"pdf-bytes").decode("utf-8")},
-                    "file_name": "sample.pdf",
-                    "contentType": "application/pdf",
+                    "metadata_storage_path": "https://account.blob.core.windows.net/container/sample.pdf",
+                    "metadata_storage_name": "sample.pdf",
+                    "metadata_storage_content_type": "application/pdf",
                 },
             }
         ]
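MockBlobManager mirrors the download_blob contract the skill relies on here: a (bytes, properties) tuple on success, or None for a missing blob. If a test also needs to assert which path was requested, a recording variant is a small, hypothetical extension of the same idea (illustrative, not part of this diff):

class RecordingBlobManager:
    """Test double that records requested blob paths (not in this PR)."""

    def __init__(self) -> None:
        self.requested_paths: list[str] = []

    async def download_blob(self, blob_path: str):
        self.requested_paths.append(blob_path)
        return (b"pdf-bytes", {})

# After the skill handles metadata_storage_path
# "https://account.blob.core.windows.net/container/sample.pdf",
# requested_paths should contain "sample.pdf" (the path within the container).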
@@ -128,6 +133,7 @@ async def test_document_extractor_requires_single_record(monkeypatch: pytest.Mon
     mock_settings = document_extractor.GlobalSettings(
         file_processors={".pdf": FileProcessor(None, None)},
         azure_credential=object(),
+        blob_manager=object(),
     )
     monkeypatch.setattr(document_extractor, "settings", mock_settings)
     response = await document_extractor.extract_document(build_request({"values": []}))
@@ -144,6 +150,7 @@ async def failing_process(data: dict[str, Any]) -> dict[str, Any]:
     mock_settings = document_extractor.GlobalSettings(
         file_processors={".pdf": FileProcessor(None, None)},
         azure_credential=object(),
+        blob_manager=object(),
     )
     monkeypatch.setattr(document_extractor, "settings", mock_settings)
     monkeypatch.setattr(document_extractor, "process_document", failing_process)
@@ -153,9 +160,9 @@ async def failing_process(data: dict[str, Any]) -> dict[str, Any]:
             {
                 "recordId": "rec-error",
                 "data": {
-                    "file_data": {"$type": "file", "data": base64.b64encode(b"pdf-bytes").decode("utf-8")},
-                    "file_name": "sample.pdf",
-                    "contentType": "application/pdf",
+                    "metadata_storage_path": "https://account.blob.core.windows.net/container/sample.pdf",
+                    "metadata_storage_name": "sample.pdf",
+                    "metadata_storage_content_type": "application/pdf",
                 },
             }
         ]
@@ -186,16 +193,19 @@ async def parse(self, content):
         ".pdf": FileProcessor(FailingParser(), None),
     }

+    class MockBlobManager:
+        async def download_blob(self, blob_path: str):
+            return (b"content", {})
+
     mock_settings = document_extractor.GlobalSettings(
         file_processors=mock_file_processors,
         azure_credential=object(),
+        blob_manager=MockBlobManager(),
     )
     monkeypatch.setattr(document_extractor, "settings", mock_settings)

     data = {
-        "file_data": {"data": base64.b64encode(b"content").decode("utf-8")},
-        "file_name": "doc.pdf",
-        "contentType": "application/pdf",
+        "metadata_storage_path": "https://account.blob.core.windows.net/container/doc.pdf",
     }

     with pytest.raises(ValueError) as exc_info:
@@ -204,12 +214,16 @@ async def parse(self, content):
     assert "Parser failed" in str(exc_info.value)


-def test_document_extractor_missing_file_data() -> None:
-    with pytest.raises(ValueError):
-        document_extractor.get_document_stream_filedata({"file_data": {}})
+def test_document_extractor_managed_identity_reload(monkeypatch: pytest.MonkeyPatch) -> None:
+    # Set required environment variables
+    monkeypatch.setenv("AZURE_STORAGE_ACCOUNT", "teststorage")
+    monkeypatch.setenv("AZURE_STORAGE_CONTAINER", "testcontainer")
+    monkeypatch.setenv("AZURE_STORAGE_RESOURCE_GROUP", "testrg")
+    monkeypatch.setenv("AZURE_SUBSCRIPTION_ID", "test-sub-id")
+
+    # Mock setup_blob_manager to avoid actual Azure calls
+    monkeypatch.setattr(document_extractor, "setup_blob_manager", lambda **kwargs: object())
+
-def test_document_extractor_managed_identity_reload(monkeypatch: pytest.MonkeyPatch) -> None:
     monkeypatch.setenv("AZURE_CLIENT_ID", "client-123")
     document_extractor.configure_global_settings()
     assert isinstance(document_extractor.settings.azure_credential, document_extractor.ManagedIdentityCredential)
@@ -471,9 +485,9 @@ async def test_document_extractor_without_settings(monkeypatch: pytest.MonkeyPat
             {
                 "recordId": "record-1",
                 "data": {
-                    "file_data": {"$type": "file", "data": base64.b64encode(b"pdf-bytes").decode("utf-8")},
-                    "file_name": "sample.pdf",
-                    "contentType": "application/pdf",
+                    "metadata_storage_path": "https://account.blob.core.windows.net/container/sample.pdf",
+                    "metadata_storage_name": "sample.pdf",
+                    "metadata_storage_content_type": "application/pdf",
                 },
             }
         ]