Skip to content

Commit 25a7685

Browse files
authored
Adding the webhook doc status sample files.
Adding the webhook doc status sample files.
1 parent be6ac1e commit 25a7685

File tree

4 files changed

+250
-0
lines changed

4 files changed

+250
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
FROM --platform=linux/amd64 python:3-alpine
2+
3+
WORKDIR /app
4+
5+
COPY requirements.txt ./
6+
7+
RUN pip install --upgrade pip && \
8+
pip install -r requirements.txt && \
9+
rm requirements.txt
10+
11+
COPY main.py ./
12+
13+
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Document Processing application using Document Status webhook
2+
3+
Document Processing application that utilizes Watson Discovery collection and webhook support of Document Status API.
4+
This is just a sample application, not production code.
5+
6+
## Requirements
7+
- Instance of Watson Discovery Plus/Enterprise plan on IBM Cloud.
8+
9+
## Setup Instructions
10+
11+
### Deploy the document processing application to Code Engine
12+
In this tutorial, we will use [IBM Cloud Code Engine](https://www.ibm.com/cloud/code-engine) as the infrastructure for the application of document processing which receives the document processing status events. Of course, you can deploy the application in any environment you like.
13+
14+
1. [Create a project](https://cloud.ibm.com/docs/codeengine?topic=codeengine-manage-project#create-a-project) of Code Engine.
15+
2. [Deploy the application](https://cloud.ibm.com/docs/codeengine?topic=codeengine-app-source-code) from this repository source code.
16+
- In **Create application**, click **Specify build details** and enter the following:
17+
- Source
18+
- Code repo URL: `https://github.com/watson-developer-cloud/doc-tutorial-downloads`
19+
- Code repo access: `None`
20+
- Branch name: `master`
21+
- Context directory: `discovery-data/webhook-doc-status-sample`
22+
- Strategy
23+
- Strategy: `Dockerfile`
24+
- Output
25+
- Enter your container image registry information.
26+
- Set **Min number of instances** and **Max number of instances** to `1`.
27+
3. [Add service binding](https://cloud.ibm.com/docs/codeengine?topic=codeengine-bind-services) to the application.
28+
- In **IBM Cloud service instance**, specify the service instance of Watson Discovery Plus/Enterprise plan on IBM Cloud
29+
4. Confirm that the application status changes to **Ready**.
30+
31+
### Configure Discovery collection
32+
1. Create a project.
33+
2. Create a collection in the project and apply the document status webhook to the collection. `{webhook-doc-status-sample-url}` is URL to the deployed application.
34+
```sh
35+
curl -X POST {auth} \
36+
'{url}/v2/projects/{project_id}/collections?version=2023-03-31' \
37+
--header 'Content-Type: application/json' \
38+
--data-raw '{
39+
"name":"DocProc App",
40+
"webhooks": {
41+
"document_status": [
42+
{
43+
"url": "{webhook-doc-status-sample-url}/webhook"
44+
}
45+
]
46+
}
47+
}'
48+
```
49+
50+
### Process documents
51+
Process a document and return it for realtime use.
52+
The file is stored in the collection and is processed according to the collection's configuration settings. To remove the processed documents in the collection, you need to remove them manually via Tooling or API.
53+
54+
Example:
55+
56+
```sh
57+
curl -X POST \
58+
'{webhook-doc-status-sample-url}/projects/{project_id}/collections/{collection_id}/extract' \
59+
-H 'accept: application/json' \
60+
-H 'Content-Type: multipart/form-data' \
61+
-F 'file=@sample.pdf;type=application/pdf'
62+
```

webhook-doc-status-sample/main.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from dataclasses import dataclass
2+
import json
3+
from asyncio import Future
4+
import asyncio
5+
from typing import Any, BinaryIO, Dict
6+
import logging
7+
8+
from fastapi import FastAPI, Request, UploadFile
9+
from fastapi.responses import JSONResponse
10+
11+
from ibm_watson import DiscoveryV2
12+
from ibm_watson.discovery_v2 import QueryLargePassages
13+
14+
logger = logging.getLogger(__name__)
15+
logging.basicConfig(level=logging.INFO)
16+
17+
app = FastAPI()
18+
19+
# in-memory store for mapping (project_id, collection_id, document_id) to Future object
20+
docproc_requests: dict[(str, str, str), Future] = {}
21+
22+
discovery = DiscoveryV2(version='2023-03-31')
23+
24+
25+
@app.post("/webhook")
26+
async def webhook(
27+
request: Request,
28+
):
29+
status_code = 200
30+
try:
31+
body = await request.json()
32+
except json.decoder.JSONDecodeError:
33+
content = await request.body()
34+
body = f"Invalid JSON or no body. Body was: {str(content)}"
35+
status_code = 400
36+
if status_code == 200:
37+
event = body["event"]
38+
response_body: dict[str, Any] = {}
39+
if event == "ping":
40+
response_body["accepted"] = True
41+
elif event == "document.status":
42+
data = body["data"]
43+
project_id = data['project_id']
44+
collection_id = data['collection_id']
45+
status = data["status"]
46+
if status in set(["available", "failed"]):
47+
for document_id in data["document_ids"]:
48+
# resume the suspended document processing request
49+
notify_document_completion_status(project_id, collection_id, document_id, status)
50+
response_body["accepted"] = True
51+
else:
52+
status_code = 400
53+
return JSONResponse(content=response_body, status_code=status_code)
54+
55+
56+
@app.post("/projects/{project_id}/collections/{collection_id}/extract")
57+
async def post_and_extraction(
58+
project_id: str,
59+
collection_id: str,
60+
file: UploadFile
61+
):
62+
# Ingest the received document into the underlying Discovery project/collection
63+
logger.info(f'using project/collection {project_id}/{collection_id}')
64+
document_id = add_document(project_id, collection_id, file.file, file.filename)
65+
66+
# Wait until the ingested document become available
67+
logger.info(f'waiting for {document_id} become available')
68+
available = await wait_document_completion(project_id, collection_id, document_id)
69+
70+
# Retrieve the processed document
71+
logger.info(f'{document_id} is available:{available}')
72+
document = get_document(project_id, collection_id, document_id)
73+
return JSONResponse(content=document)
74+
75+
76+
def add_document(
77+
project_id: str,
78+
collection_id: str,
79+
file: BinaryIO,
80+
filename: Any
81+
):
82+
response = discovery.add_document(project_id, collection_id, file=file, filename=filename)
83+
document_id = response.get_result()['document_id']
84+
return document_id
85+
86+
87+
def get_document(
88+
project_id: str,
89+
collection_id: str,
90+
document_id: str,
91+
):
92+
response = discovery.query(project_id=project_id, collection_ids=[collection_id], filter=f'document_id::{document_id}', passages=QueryLargePassages(enabled=False))
93+
document = response.get_result()['results'][0]
94+
return document
95+
96+
97+
async def wait_document_completion(
98+
project_id: str,
99+
collection_id: str,
100+
document_id: str,
101+
):
102+
global docproc_requests
103+
docproc_request = Future()
104+
key = (project_id, collection_id, document_id)
105+
docproc_requests[key] = docproc_request
106+
107+
# Start a background task to pull the processing status periodically when the collection is not configured with document status webhook
108+
if not is_webhook_status_enabled(project_id, collection_id):
109+
asyncio.create_task(wait_document_available(project_id, collection_id, document_id))
110+
111+
# Wait until the document become available or failed
112+
status = await docproc_request
113+
return status == "available"
114+
115+
116+
def is_webhook_status_enabled(
117+
project_id: str,
118+
collection_id: str
119+
):
120+
webhook = discovery.get_collection(project_id, collection_id).get_result().get('webhooks')
121+
return (webhook is not None) and ('document_status' in webhook)
122+
123+
124+
async def wait_document_available(
125+
project_id: str,
126+
collection_id: str,
127+
document_id: str
128+
):
129+
# Pull the document processing status periodically (1 sec interval) and wait until the completion
130+
while(discovery.list_documents(
131+
project_id,
132+
collection_id,
133+
parent_document_id=document_id,
134+
count=0,
135+
status='pending,processing'
136+
).get_result()['matching_results'] != 0
137+
):
138+
await asyncio.sleep(1)
139+
140+
# Retrieve the document processing status
141+
status = discovery.get_document(
142+
project_id,
143+
collection_id,
144+
document_id
145+
).get_result()['status']
146+
147+
# Then, notify it
148+
notify_document_completion_status(
149+
project_id,
150+
collection_id,
151+
document_id,
152+
status
153+
)
154+
155+
156+
def notify_document_completion_status(
157+
project_id: str,
158+
collection_id: str,
159+
document_id: str,
160+
status: str
161+
):
162+
global docproc_requests
163+
key = (project_id, collection_id, document_id)
164+
docproc_request = docproc_requests.get(key)
165+
if docproc_request:
166+
docproc_request.set_result(status)
167+
docproc_requests.pop(key)
168+
169+
170+
171+
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fastapi>=0.110.0,<0.111.0
2+
uvicorn>=0.27.0,<0.28.0
3+
python-multipart>=0.0.9,<0.0.10
4+
ibm-watson>=8.0.0

0 commit comments

Comments
 (0)