
Commit 35a7885

Change cloud ingestion to download blobs based off path versus file data (#2858)

* Move cloud functions to use blob download based off path
* Restore comments for PR
* Restore comments for PR
* Fix the tests
* Remove requirements.txt from git since they are manually copied
* Remove unused fields
* Fix SaS to SAS

1 parent e7fb877, commit 35a7885

File tree

8 files changed: +73 -1419 lines


.gitignore
Lines changed: 1 addition & 0 deletions

@@ -149,6 +149,7 @@ node_modules
 static/
 
 app/functions/*/prepdocslib/
+app/functions/*/requirements.txt
 
 data/**/*.md5
 
app/backend/prepdocslib/cloudingestionstrategy.py
Lines changed: 4 additions & 5 deletions

@@ -161,10 +161,9 @@ def _build_skillset(self) -> SearchIndexerSkillset:
                     resource_id=self.search_user_assigned_identity_resource_id
                 ),
                 inputs=[
-                    # Provide the binary payload expected by the document extractor custom skill.
-                    InputFieldMappingEntry(name="file_data", source="/document/file_data"),
-                    InputFieldMappingEntry(name="file_name", source="/document/metadata_storage_name"),
-                    InputFieldMappingEntry(name="content_type", source="/document/metadata_storage_content_type"),
+                    # Always provide the blob URL so the function can download large files (> 16MB)
+                    InputFieldMappingEntry(name="metadata_storage_path", source="/document/metadata_storage_path"),
+                    # We are not using the SAS token since the functions have RBAC access via managed identity
                 ],
                 outputs=[
                     OutputFieldMappingEntry(name="pages", target_name="pages"),

@@ -310,7 +309,7 @@ async def setup(self) -> None:
                 configuration=IndexingParametersConfiguration(
                     query_timeout=None,  # type: ignore
                     data_to_extract="storageMetadata",
-                    allow_skillset_to_read_file_data=True,
+                    allow_skillset_to_read_file_data=False,
                 )
             ),
         )
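With this change the indexer stops serializing file bytes into the enrichment pipeline and only passes the blob URL, so the skillset no longer needs allow_skillset_to_read_file_data. As a rough illustration (not part of this commit), the single-record payload the document extractor skill now receives can be reproduced for local testing with a sketch like the one below; the Functions route, storage account, container, and blob name are assumed placeholders:

# Hedged sketch: post one record to a locally running document_extractor function.
# The route and blob URL are placeholders, not values taken from this repo.
import json
import urllib.request

payload = {
    "values": [
        {
            "recordId": "1",
            "data": {
                # Blob URL only; no base64 file_data is sent anymore, since the
                # indexer now runs with allow_skillset_to_read_file_data=False.
                "metadata_storage_path": "https://myaccount.blob.core.windows.net/content/docs/sample.pdf"
            },
        }
    ]
}

request = urllib.request.Request(
    "http://localhost:7071/api/extract_document",  # assumed local Functions host URL
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(request) as response:
    print(json.loads(response.read()))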

app/functions/document_extractor/function_app.py
Lines changed: 42 additions & 30 deletions
@@ -3,23 +3,25 @@
 Custom skill for Azure AI Search that extracts and processes document content.
 """
 
-import base64
 import io
 import json
 import logging
 import os
 from dataclasses import dataclass
 from typing import Any
+from urllib.parse import unquote, urlparse
 
 import azure.functions as func
 from azure.core.exceptions import HttpResponseError
 from azure.identity.aio import ManagedIdentityCredential
 
+from prepdocslib.blobmanager import BlobManager
 from prepdocslib.fileprocessor import FileProcessor
 from prepdocslib.page import Page
 from prepdocslib.servicesetup import (
     build_file_processors,
     select_processor_for_filename,
+    setup_blob_manager,
 )
 
 app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@@ -31,6 +33,7 @@
 class GlobalSettings:
     file_processors: dict[str, FileProcessor]
     azure_credential: ManagedIdentityCredential
+    blob_manager: BlobManager
 
 
 settings: GlobalSettings | None = None
@@ -63,9 +66,18 @@ def configure_global_settings():
         process_figures=use_multimodal,
     )
 
+    blob_manager = setup_blob_manager(
+        azure_credential=azure_credential,
+        storage_account=os.environ["AZURE_STORAGE_ACCOUNT"],
+        storage_container=os.environ["AZURE_STORAGE_CONTAINER"],
+        storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP"],
+        subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
+    )
+
     settings = GlobalSettings(
         file_processors=file_processors,
         azure_credential=azure_credential,
+        blob_manager=blob_manager,
     )
 
 
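The setup_blob_manager call above binds the function's managed identity to the storage account named in the environment variables. For context only, a direct download through the async Azure Storage SDK under a managed identity looks roughly like the sketch below; the account URL, container, and blob path are assumed placeholders, and BlobManager.download_blob presumably wraps similar calls:

# Hedged sketch of a managed-identity blob download with the async Azure SDK.
# Account URL, container, and blob path are placeholders, not values from this repo.
import asyncio

from azure.identity.aio import ManagedIdentityCredential
from azure.storage.blob.aio import BlobServiceClient


async def download_blob_bytes(blob_path: str) -> bytes:
    credential = ManagedIdentityCredential()
    service = BlobServiceClient(
        account_url="https://myaccount.blob.core.windows.net",
        credential=credential,
    )
    # Close both the client and the credential when done.
    async with service, credential:
        blob_client = service.get_blob_client(container="content", blob=blob_path)
        downloader = await blob_client.download_blob()
        return await downloader.readall()


print(len(asyncio.run(download_blob_bytes("docs/sample.pdf"))))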
@@ -75,20 +87,13 @@ async def extract_document(req: func.HttpRequest) -> func.HttpResponse:
     """
     Azure Search Custom Skill: Extract document content
 
-    Input format (single record; file data only):
-    # https://learn.microsoft.com/azure/search/cognitive-search-skill-document-intelligence-layout#skill-inputs
+    Input format (single record):
     {
         "values": [
             {
                 "recordId": "1",
                 "data": {
-                    // Base64 encoded file (skillset must enable file data)
-                    "file_data": {
-                        "$type": "file",
-                        "data": "base64..."
-                    },
-                    // Optional
-                    "file_name": "doc.pdf"
+                    "metadata_storage_path": "https://<account>.blob.core.windows.net/<container>/<blob_path>"
                 }
             }
         ]
@@ -176,45 +181,52 @@ async def process_document(data: dict[str, Any]) -> dict[str, Any]:
     Process a single document: download, parse, extract figures, upload images
 
     Args:
-        data: Input data with blobUrl, fileName, contentType
+        data: Input data with metadata_storage_path
 
     Returns:
         Dictionary with 'text' (markdown) and 'images' (list of {url, description})
     """
-    document_stream, file_name, content_type = get_document_stream_filedata(data)
-    logger.info("Processing document: %s", file_name)
+    if settings is None:
+        raise RuntimeError("Global settings not initialized")
+
+    # Get blob path from metadata_storage_path URL
+    # URL format: https://<account>.blob.core.windows.net/<container>/<blob_path>
+    storage_path = data["metadata_storage_path"]
+    parsed_url = urlparse(storage_path)
+    # Path is /<container>/<blob_path>, so split and take everything after container
+    path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
+    if len(path_parts) < 2:
+        raise ValueError(f"Invalid storage path format: {storage_path}")
+    blob_path_within_container = path_parts[1]  # Everything after the container name
+
+    logger.info("Downloading blob: %s", blob_path_within_container)
+    result = await settings.blob_manager.download_blob(blob_path_within_container)
+    if result is None:
+        raise ValueError(f"Blob not found: {blob_path_within_container}")
+
+    document_bytes, properties = result
+    document_stream = io.BytesIO(document_bytes)
+    document_stream.name = blob_path_within_container
+
+    logger.info("Processing document: %s", blob_path_within_container)
 
     # Get parser from file_processors dict based on file extension
-    file_processor = select_processor_for_filename(file_name, settings.file_processors)
+    file_processor = select_processor_for_filename(blob_path_within_container, settings.file_processors)
     parser = file_processor.parser
 
     pages: list[Page] = []
     try:
         document_stream.seek(0)
         pages = [page async for page in parser.parse(content=document_stream)]
     except HttpResponseError as exc:
-        raise ValueError(f"Parser failed for {file_name}: {exc.message}") from exc
+        raise ValueError(f"Parser failed for {blob_path_within_container}: {exc.message}") from exc
     finally:
         document_stream.close()
 
-    components = build_document_components(file_name, pages)
+    components = build_document_components(blob_path_within_container, pages)
     return components
 
 
-def get_document_stream_filedata(data: dict[str, Any]) -> tuple[io.BytesIO, str, str]:
-    """Return a BytesIO stream for file_data input only (skillset must send file bytes)."""
-    file_payload = data.get("file_data", {})
-    encoded = file_payload.get("data")
-    if not encoded:
-        raise ValueError("file_data payload missing base64 data")
-    document_bytes = base64.b64decode(encoded)
-    file_name = data.get("file_name") or data.get("fileName") or file_payload.get("name") or "document"
-    content_type = data.get("contentType") or file_payload.get("contentType") or "application/octet-stream"
-    stream = io.BytesIO(document_bytes)
-    stream.name = file_name
-    return stream, file_name, content_type
-
-
 def build_document_components(file_name: str, pages: list[Page]) -> dict[str, Any]:
     page_entries: list[dict[str, Any]] = []
     figure_entries: list[dict[str, Any]] = []
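The path handling in process_document is the heart of the change: the skill now receives the blob's full URL and must recover the container-relative path before it can download. Pulled out as a standalone helper purely for illustration, mirroring the logic above (the helper name and the example account/container are assumptions):

# Hedged sketch mirroring the URL parsing in process_document above.
from urllib.parse import unquote, urlparse


def blob_path_from_storage_url(storage_path: str) -> str:
    """Return the container-relative blob path from a metadata_storage_path URL."""
    parsed_url = urlparse(storage_path)
    # Path is /<container>/<blob_path>; keep everything after the container name.
    path_parts = unquote(parsed_url.path).lstrip("/").split("/", 1)
    if len(path_parts) < 2:
        raise ValueError(f"Invalid storage path format: {storage_path}")
    return path_parts[1]


# Example: URL-encoded characters are decoded before the path is used for the download.
print(blob_path_from_storage_url(
    "https://myaccount.blob.core.windows.net/content/folder/My%20Doc.pdf"
))  # -> "folder/My Doc.pdf"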
