33Custom skill for Azure AI Search that extracts and processes document content.
44"""
55
6- import base64
76import io
87import json
98import logging
109import os
1110from dataclasses import dataclass
1211from typing import Any
12+ from urllib .parse import unquote , urlparse
1313
1414import azure .functions as func
1515from azure .core .exceptions import HttpResponseError
1616from azure .identity .aio import ManagedIdentityCredential
1717
18+ from prepdocslib .blobmanager import BlobManager
1819from prepdocslib .fileprocessor import FileProcessor
1920from prepdocslib .page import Page
2021from prepdocslib .servicesetup import (
2122 build_file_processors ,
2223 select_processor_for_filename ,
24+ setup_blob_manager ,
2325)
2426
2527app = func .FunctionApp (http_auth_level = func .AuthLevel .ANONYMOUS )
3133class GlobalSettings :
3234 file_processors : dict [str , FileProcessor ]
3335 azure_credential : ManagedIdentityCredential
36+ blob_manager : BlobManager
3437
3538
3639settings : GlobalSettings | None = None
@@ -63,9 +66,18 @@ def configure_global_settings():
6366 process_figures = use_multimodal ,
6467 )
6568
69+ blob_manager = setup_blob_manager (
70+ azure_credential = azure_credential ,
71+ storage_account = os .environ ["AZURE_STORAGE_ACCOUNT" ],
72+ storage_container = os .environ ["AZURE_STORAGE_CONTAINER" ],
73+ storage_resource_group = os .environ ["AZURE_STORAGE_RESOURCE_GROUP" ],
74+ subscription_id = os .environ ["AZURE_SUBSCRIPTION_ID" ],
75+ )
76+
6677 settings = GlobalSettings (
6778 file_processors = file_processors ,
6879 azure_credential = azure_credential ,
80+ blob_manager = blob_manager ,
6981 )
7082
7183
@@ -75,20 +87,15 @@ async def extract_document(req: func.HttpRequest) -> func.HttpResponse:
7587 """
7688 Azure Search Custom Skill: Extract document content
7789
78- Input format (single record; file data only):
79- # https://learn.microsoft.com/azure/search/cognitive-search-skill-document-intelligence-layout#skill-inputs
90+ Input format (single record):
8091 {
8192 "values": [
8293 {
8394 "recordId": "1",
8495 "data": {
85- // Base64 encoded file (skillset must enable file data)
86- "file_data": {
87- "$type": "file",
88- "data": "base64..."
89- },
90- // Optional
91- "file_name": "doc.pdf"
96+ "metadata_storage_path": "https://<account>.blob.core.windows.net/<container>/<blob_path>",
97+ "metadata_storage_name": "document.pdf",
98+ "metadata_storage_content_type": "application/pdf"
9299 }
93100 }
94101 ]
@@ -176,45 +183,52 @@ async def process_document(data: dict[str, Any]) -> dict[str, Any]:
176183 Process a single document: download, parse, extract figures, upload images
177184
178185 Args:
179- data: Input data with blobUrl, fileName, contentType
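186+ data: Skill input record; must contain "metadata_storage_path", the full blob URL (https://<account>.blob.core.windows.net/<container>/<blob_path>)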
180187
181188 Returns:
182189 Dictionary with 'text' (markdown) and 'images' (list of {url, description})
183190 """
184- document_stream , file_name , content_type = get_document_stream_filedata (data )
185- logger .info ("Processing document: %s" , file_name )
191+ if settings is None :
192+ raise RuntimeError ("Global settings not initialized" )
193+
194+ # Get blob path from metadata_storage_path URL
195+ # URL format: https://<account>.blob.core.windows.net/<container>/<blob_path>
196+ storage_path = data ["metadata_storage_path" ]
197+ parsed_url = urlparse (storage_path )
198+ # URL path is /<container>/<blob_path>: percent-decode it, split once on "/", and keep everything after the container name
199+ path_parts = unquote (parsed_url .path ).lstrip ("/" ).split ("/" , 1 )
200+ if len (path_parts ) < 2 :
201+ raise ValueError (f"Invalid storage path format: { storage_path } " )
202+ blob_path_within_container = path_parts [1 ] # Everything after the container name
203+
204+ logger .info ("Downloading blob: %s" , blob_path_within_container )
205+ result = await settings .blob_manager .download_blob (blob_path_within_container )
206+ if result is None :
207+ raise ValueError (f"Blob not found: { blob_path_within_container } " )
208+
209+ document_bytes , properties = result
210+ document_stream = io .BytesIO (document_bytes )
211+ document_stream .name = blob_path_within_container
212+
213+ logger .info ("Processing document: %s" , blob_path_within_container )
186214
187215 # Get parser from file_processors dict based on file extension
188- file_processor = select_processor_for_filename (file_name , settings .file_processors )
216+ file_processor = select_processor_for_filename (blob_path_within_container , settings .file_processors )
189217 parser = file_processor .parser
190218
191219 pages : list [Page ] = []
192220 try :
193221 document_stream .seek (0 )
194222 pages = [page async for page in parser .parse (content = document_stream )]
195223 except HttpResponseError as exc :
196- raise ValueError (f"Parser failed for { file_name } : { exc .message } " ) from exc
224+ raise ValueError (f"Parser failed for { blob_path_within_container } : { exc .message } " ) from exc
197225 finally :
198226 document_stream .close ()
199227
200- components = build_document_components (file_name , pages )
228+ components = build_document_components (blob_path_within_container , pages )
201229 return components
202230
203231
204- def get_document_stream_filedata (data : dict [str , Any ]) -> tuple [io .BytesIO , str , str ]:
205- """Return a BytesIO stream for file_data input only (skillset must send file bytes)."""
206- file_payload = data .get ("file_data" , {})
207- encoded = file_payload .get ("data" )
208- if not encoded :
209- raise ValueError ("file_data payload missing base64 data" )
210- document_bytes = base64 .b64decode (encoded )
211- file_name = data .get ("file_name" ) or data .get ("fileName" ) or file_payload .get ("name" ) or "document"
212- content_type = data .get ("contentType" ) or file_payload .get ("contentType" ) or "application/octet-stream"
213- stream = io .BytesIO (document_bytes )
214- stream .name = file_name
215- return stream , file_name , content_type
216-
217-
218232def build_document_components (file_name : str , pages : list [Page ]) -> dict [str , Any ]:
219233 page_entries : list [dict [str , Any ]] = []
220234 figure_entries : list [dict [str , Any ]] = []
0 commit comments