Skip to content

Commit acf2264

Browse files
author
Rakshith Bhyravabhotla
authored
CloudEvents/EG Events must recognize magic events (Azure#19922)
1 parent ae6d87b commit acf2264

File tree

12 files changed

+656
-11
lines changed

12 files changed

+656
-11
lines changed

sdk/core/azure-core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Features Added
66

77
- Cut hard dependency on requests library
8+
- Added a `from_json` method which now accepts storage QueueMessage, eventhub's EventData or ServiceBusMessage or simply json bytes to return a `CloudEvent`
89

910
### Fixed
1011

sdk/core/azure-core/azure/core/messaging.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from base64 import b64decode
99
from datetime import datetime
1010
from .utils._utils import _convert_to_isoformat, TZ_UTC
11+
from .utils._messaging_shared import _get_json_content
1112
from .serialization import NULL
1213

1314
try:
@@ -181,3 +182,17 @@ def from_dict(cls, event):
181182
" The `source` and `type` params are required."
182183
)
183184
return event_obj
185+
186+
@classmethod
187+
def from_json(cls, event):
188+
# type: (Any) -> CloudEvent
189+
"""
190+
Returns the deserialized CloudEvent object when a json payload is provided.
191+
:param event: The json string that should be converted into a CloudEvent. This can also be
192+
a storage QueueMessage, eventhub's EventData or ServiceBusMessage
193+
:type event: object
194+
:rtype: CloudEvent
195+
:raises ValueError: If the provided JSON is invalid.
196+
"""
197+
dict_event = _get_json_content(event)
198+
return CloudEvent.from_dict(dict_event)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# coding=utf-8
2+
# --------------------------------------------------------------------------
3+
# Copyright (c) Microsoft Corporation. All rights reserved.
4+
# Licensed under the MIT License. See License.txt in the project root for
5+
# license information.
6+
# --------------------------------------------------------------------------
7+
8+
# ==========================================================================
9+
# This file contains duplicate code that is shared with azure-eventgrid.
10+
# Both the files should always be identical.
11+
# ==========================================================================
12+
13+
14+
15+
import json
16+
from azure.core.exceptions import raise_with_traceback
17+
18+
def _get_json_content(obj):
19+
"""Event mixin to have methods that are common to different Event types
20+
like CloudEvent, EventGridEvent etc.
21+
"""
22+
msg = "Failed to load JSON content from the object."
23+
try:
24+
# storage queue
25+
return json.loads(obj.content)
26+
except ValueError as err:
27+
raise_with_traceback(ValueError, msg, err)
28+
except AttributeError:
29+
# eventhubs
30+
try:
31+
return json.loads(next(obj.body))[0]
32+
except KeyError:
33+
# servicebus
34+
return json.loads(next(obj.body))
35+
except ValueError as err:
36+
raise_with_traceback(ValueError, msg, err)
37+
except: # pylint: disable=bare-except
38+
try:
39+
return json.loads(obj)
40+
except ValueError as err:
41+
raise_with_traceback(ValueError, msg, err)

sdk/core/azure-core/tests/test_messaging_cloud_event.py

Lines changed: 262 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,94 @@
33
# Licensed under the MIT License.
44
# ------------------------------------
55
import pytest
6-
import json
6+
import uuid
77
import datetime
88

99
from azure.core.messaging import CloudEvent
1010
from azure.core.utils._utils import _convert_to_isoformat
11+
from azure.core.utils._messaging_shared import _get_json_content
1112
from azure.core.serialization import NULL
1213

14+
class MockQueueMessage(object):
15+
def __init__(self, content=None):
16+
self.id = uuid.uuid4()
17+
self.inserted_on = datetime.datetime.now()
18+
self.expires_on = datetime.datetime.now() + datetime.timedelta(days=100)
19+
self.dequeue_count = 1
20+
self.content = content
21+
self.pop_receipt = None
22+
self.next_visible_on = None
23+
24+
class MockServiceBusReceivedMessage(object):
25+
def __init__(self, body=None, **kwargs):
26+
self.body=body
27+
self.application_properties=None
28+
self.session_id=None
29+
self.message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce'
30+
self.content_type='application/cloudevents+json; charset=utf-8'
31+
self.correlation_id=None
32+
self.to=None
33+
self.reply_to=None
34+
self.reply_to_session_id=None
35+
self.subject=None
36+
self.time_to_live=datetime.timedelta(days=14)
37+
self.partition_key=None
38+
self.scheduled_enqueue_time_utc=None
39+
self.auto_renew_error=None,
40+
self.dead_letter_error_description=None
41+
self.dead_letter_reason=None
42+
self.dead_letter_source=None
43+
self.delivery_count=13
44+
self.enqueued_sequence_number=0
45+
self.enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000)
46+
self.expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000)
47+
self.sequence_number=11219
48+
self.lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13'
49+
50+
class MockEventhubData(object):
51+
def __init__(self, body=None):
52+
self._last_enqueued_event_properties = {}
53+
self._sys_properties = None
54+
if body is None:
55+
raise ValueError("EventData cannot be None.")
56+
57+
# Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData
58+
self.body=body
59+
self._raw_amqp_message = "some amqp data"
60+
self.message_id = None
61+
self.content_type = None
62+
self.correlation_id = None
63+
64+
65+
class MockBody(object):
66+
def __init__(self, data=None):
67+
self.data = data
68+
69+
def __iter__(self):
70+
return self
71+
72+
def __next__(self):
73+
if not self.data:
74+
return """{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","source":"https://egsample.dev/sampleevent","data":"ServiceBus","type":"Azure.Sdk.Sample","time":"2021-07-22T22:27:38.960209Z","specversion":"1.0"}"""
75+
return self.data
76+
77+
next = __next__
78+
79+
class MockEhBody(object):
80+
def __init__(self, data=None):
81+
self.data = data
82+
83+
def __iter__(self):
84+
return self
85+
86+
def __next__(self):
87+
if not self.data:
88+
return b'[{"id":"f208feff-099b-4bda-a341-4afd0fa02fef","source":"https://egsample.dev/sampleevent","data":"Eventhub","type":"Azure.Sdk.Sample","time":"2021-07-22T22:27:38.960209Z","specversion":"1.0"}]'
89+
return self.data
90+
91+
next = __next__
92+
93+
1394
# Cloud Event tests
1495
def test_cloud_event_constructor():
1596
event = CloudEvent(
@@ -469,4 +550,183 @@ def test_wrong_schema_raises_no_type():
469550
"specversion":"1.0",
470551
}
471552
with pytest.raises(ValueError, match="The event does not conform to the cloud event spec https://github.com/cloudevents/spec. The `source` and `type` params are required."):
472-
CloudEvent.from_dict(cloud_custom_dict)
553+
CloudEvent.from_dict(cloud_custom_dict)
554+
555+
def test_get_bytes_storage_queue():
556+
cloud_storage_dict = """{
557+
"id":"a0517898-9fa4-4e70-b4a3-afda1dd68672",
558+
"source":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}",
559+
"data":{
560+
"api":"PutBlockList",
561+
"client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
562+
"request_id":"831e1650-001e-001b-66ab-eeb76e000000",
563+
"e_tag":"0x8D4BCC2E4835CD0",
564+
"content_type":"application/octet-stream",
565+
"content_length":524288,
566+
"blob_type":"BlockBlob",
567+
"url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
568+
"sequencer":"00000000000004420000000000028963",
569+
"storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
570+
},
571+
"type":"Microsoft.Storage.BlobCreated",
572+
"time":"2021-02-18T20:18:10.581147898Z",
573+
"specversion":"1.0"
574+
}"""
575+
obj = MockQueueMessage(content=cloud_storage_dict)
576+
577+
dict = _get_json_content(obj)
578+
assert dict.get('data') == {
579+
"api":"PutBlockList",
580+
"client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
581+
"request_id":"831e1650-001e-001b-66ab-eeb76e000000",
582+
"e_tag":"0x8D4BCC2E4835CD0",
583+
"content_type":"application/octet-stream",
584+
"content_length":524288,
585+
"blob_type":"BlockBlob",
586+
"url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
587+
"sequencer":"00000000000004420000000000028963",
588+
"storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
589+
}
590+
assert dict.get('specversion') == "1.0"
591+
592+
def test_get_bytes_storage_queue_wrong_content():
593+
cloud_storage_string = u'This is a random string which must fail'
594+
obj = MockQueueMessage(content=cloud_storage_string)
595+
596+
with pytest.raises(ValueError, match="Failed to load JSON content from the object."):
597+
_get_json_content(obj)
598+
599+
def test_get_bytes_servicebus():
600+
obj = MockServiceBusReceivedMessage(
601+
body=MockBody(),
602+
message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce',
603+
content_type='application/cloudevents+json; charset=utf-8',
604+
time_to_live=datetime.timedelta(days=14),
605+
delivery_count=13,
606+
enqueued_sequence_number=0,
607+
enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000),
608+
expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000),
609+
sequence_number=11219,
610+
lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13'
611+
)
612+
dict = _get_json_content(obj)
613+
assert dict.get('data') == "ServiceBus"
614+
assert dict.get('specversion') == '1.0'
615+
616+
def test_get_bytes_servicebus_wrong_content():
617+
obj = MockServiceBusReceivedMessage(
618+
body=MockBody(data="random string"),
619+
message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce',
620+
content_type='application/json; charset=utf-8',
621+
time_to_live=datetime.timedelta(days=14),
622+
delivery_count=13,
623+
enqueued_sequence_number=0,
624+
enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000),
625+
expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000),
626+
sequence_number=11219,
627+
lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13'
628+
)
629+
630+
with pytest.raises(ValueError, match="Failed to load JSON content from the object."):
631+
_get_json_content(obj)
632+
633+
def test_get_bytes_eventhubs():
634+
obj = MockEventhubData(
635+
body=MockEhBody()
636+
)
637+
dict = _get_json_content(obj)
638+
assert dict.get('data') == 'Eventhub'
639+
assert dict.get('specversion') == '1.0'
640+
641+
def test_get_bytes_eventhubs_wrong_content():
642+
obj = MockEventhubData(
643+
body=MockEhBody(data='random string')
644+
)
645+
646+
with pytest.raises(ValueError, match="Failed to load JSON content from the object."):
647+
dict = _get_json_content(obj)
648+
649+
def test_get_bytes_random_obj():
650+
json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "source": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "type": "Azure.Sdk.Sample", "time": "2020-08-07T02:06:08.11969Z", "specversion": "1.0"}'
651+
random_obj = {
652+
"id":"de0fd76c-4ef4-4dfb-ab3a-8f24a307e033",
653+
"source":"https://egtest.dev/cloudcustomevent",
654+
"data":{"team": "event grid squad"},
655+
"type":"Azure.Sdk.Sample",
656+
"time":"2020-08-07T02:06:08.11969Z",
657+
"specversion":"1.0"
658+
}
659+
660+
assert _get_json_content(json_str) == random_obj
661+
662+
def test_from_json_sb():
663+
obj = MockServiceBusReceivedMessage(
664+
body=MockBody(),
665+
message_id='3f6c5441-5be5-4f33-80c3-3ffeb6a090ce',
666+
content_type='application/cloudevents+json; charset=utf-8',
667+
time_to_live=datetime.timedelta(days=14),
668+
delivery_count=13,
669+
enqueued_sequence_number=0,
670+
enqueued_time_utc=datetime.datetime(2021, 7, 22, 22, 27, 41, 236000),
671+
expires_at_utc=datetime.datetime(2021, 8, 5, 22, 27, 41, 236000),
672+
sequence_number=11219,
673+
lock_token='233146e3-d5a6-45eb-826f-691d82fb8b13'
674+
)
675+
event = CloudEvent.from_json(obj)
676+
677+
assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef"
678+
assert event.data == "ServiceBus"
679+
680+
def test_from_json_eh():
681+
obj = MockEventhubData(
682+
body=MockEhBody()
683+
)
684+
event = CloudEvent.from_json(obj)
685+
assert event.id == "f208feff-099b-4bda-a341-4afd0fa02fef"
686+
assert event.data == "Eventhub"
687+
688+
def test_from_json_storage():
689+
cloud_storage_dict = """{
690+
"id":"a0517898-9fa4-4e70-b4a3-afda1dd68672",
691+
"source":"/subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.Storage/storageAccounts/{storage-account}",
692+
"data":{
693+
"api":"PutBlockList",
694+
"client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
695+
"request_id":"831e1650-001e-001b-66ab-eeb76e000000",
696+
"e_tag":"0x8D4BCC2E4835CD0",
697+
"content_type":"application/octet-stream",
698+
"content_length":524288,
699+
"blob_type":"BlockBlob",
700+
"url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
701+
"sequencer":"00000000000004420000000000028963",
702+
"storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
703+
},
704+
"type":"Microsoft.Storage.BlobCreated",
705+
"time":"2021-02-18T20:18:10.581147898Z",
706+
"specversion":"1.0"
707+
}"""
708+
obj = MockQueueMessage(content=cloud_storage_dict)
709+
event = CloudEvent.from_json(obj)
710+
assert event.data == {
711+
"api":"PutBlockList",
712+
"client_request_id":"6d79dbfb-0e37-4fc4-981f-442c9ca65760",
713+
"request_id":"831e1650-001e-001b-66ab-eeb76e000000",
714+
"e_tag":"0x8D4BCC2E4835CD0",
715+
"content_type":"application/octet-stream",
716+
"content_length":524288,
717+
"blob_type":"BlockBlob",
718+
"url":"https://oc2d2817345i60006.blob.core.windows.net/oc2d2817345i200097container/oc2d2817345i20002296blob",
719+
"sequencer":"00000000000004420000000000028963",
720+
"storage_diagnostics":{"batchId":"b68529f3-68cd-4744-baa4-3c0498ec19f0"}
721+
}
722+
723+
724+
def test_from_json():
725+
json_str = '{"id": "de0fd76c-4ef4-4dfb-ab3a-8f24a307e033", "source": "https://egtest.dev/cloudcustomevent", "data": {"team": "event grid squad"}, "type": "Azure.Sdk.Sample", "time": "2020-08-07T02:06:08.11969Z", "specversion": "1.0"}'
726+
event = CloudEvent.from_json(json_str)
727+
728+
assert event.data == {"team": "event grid squad"}
729+
assert event.time.year == 2020
730+
assert event.time.month == 8
731+
assert event.time.day == 7
732+
assert event.time.hour == 2

sdk/eventgrid/azure-eventgrid/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Release History
22

3+
## 4.5.0 (Unreleased)
4+
5+
### Features Added
6+
7+
- `EventGridEvent`'s `from_dict` method now accepts objects from servicebus, eventhubs and storage directly.
8+
- Added a `from_json` method which now accepts storage QueueMessage, eventhub's EventData or ServiceBusMessage or simply json bytes to return an `EventGridEvent`
9+
10+
311
## 4.4.0 (2021-07-19)
412

513
- Bumped `msrest` dependency to `0.6.21` to align with mgmt package.

0 commit comments

Comments
 (0)