Skip to content

Commit aa95b08

Browse files
authored
[EventHubs] ensure ownership arg does not mutate (Azure#20008)
Azure#19963
1 parent 6bcbe7f commit aa95b08

File tree

9 files changed

+67
-53
lines changed

9 files changed

+67
-53
lines changed

sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Release History
22

3+
## 1.1.5 (Unreleased)
4+
5+
### Bugs Fixed
6+
7+
- Fixed a bug with `BlobCheckpointStore.claim_ownership` mutating the `ownership_list` argument to no longer mutate the argument.
8+
- Updated `azure-core` dependecy to 1.20.1 to fix `cchardet` ImportError.
9+
310
## 1.1.4 (2021-04-07)
411

512
This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.
@@ -15,7 +22,6 @@ This version will be the last version to officially support Python 3.5, future v
1522
- Updated vendor azure-storage-blob dependency to v12.7.1.
1623
- Fixed storage blob authentication failure due to request date header too old (#16192).
1724

18-
1925
## 1.1.2 (2021-01-11)
2026

2127
**Bug fixes**

sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# --------------------------------------------------------------------------------------------
55
from typing import Iterable, Dict, Any, Optional
66
import logging
7+
import copy
78
from collections import defaultdict
89
import asyncio
910
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
@@ -101,7 +102,7 @@ def _get_blob_client(self, blob_name: str) -> BlobClient:
101102
return result
102103

103104
async def _upload_ownership(
104-
self, ownership: Dict[str, Any], metadata: Dict[str, str], **kwargs: Any
105+
self, ownership: Dict[str, Any], **kwargs: Any
105106
) -> None:
106107
etag = ownership.get("etag")
107108
if etag:
@@ -116,6 +117,7 @@ async def _upload_ownership(
116117
)
117118
blob_name = blob_name.lower()
118119
blob_client = self._get_blob_client(blob_name)
120+
metadata = {'ownerid': ownership['owner_id']}
119121
try:
120122
uploaded_blob_properties = await blob_client.set_blob_metadata(metadata, **kwargs)
121123
except ResourceNotFoundError:
@@ -127,27 +129,21 @@ async def _upload_ownership(
127129
ownership["last_modified_time"] = uploaded_blob_properties[
128130
"last_modified"
129131
].timestamp()
130-
ownership.update(metadata)
131132

132133
async def _claim_one_partition(self, ownership: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
133-
partition_id = ownership["partition_id"]
134-
namespace = ownership["fully_qualified_namespace"]
135-
eventhub_name = ownership["eventhub_name"]
136-
consumer_group = ownership["consumer_group"]
137-
owner_id = ownership["owner_id"]
138-
metadata = {"ownerid": owner_id}
134+
updated_ownership = copy.deepcopy(ownership)
139135
try:
140-
await self._upload_ownership(ownership, metadata, **kwargs)
141-
return ownership
136+
await self._upload_ownership(updated_ownership, **kwargs)
137+
return updated_ownership
142138
except (ResourceModifiedError, ResourceExistsError):
143139
logger.info(
144140
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
145141
"lost ownership to partition %r",
146-
owner_id,
147-
namespace,
148-
eventhub_name,
149-
consumer_group,
150-
partition_id,
142+
updated_ownership["owner_id"],
143+
updated_ownership["fully_qualified_namespace"],
144+
updated_ownership["eventhub_name"],
145+
updated_ownership["consumer_group"],
146+
updated_ownership["partition_id"],
151147
)
152148
raise OwnershipLostError()
153149
except Exception as error: # pylint:disable=broad-except
@@ -156,14 +152,14 @@ async def _claim_one_partition(self, ownership: Dict[str, Any], **kwargs: Any) -
156152
"namespace %r eventhub %r consumer group %r partition %r. "
157153
"The ownership is now lost. Exception "
158154
"is %r",
159-
owner_id,
160-
namespace,
161-
eventhub_name,
162-
consumer_group,
163-
partition_id,
155+
updated_ownership["owner_id"],
156+
updated_ownership["fully_qualified_namespace"],
157+
updated_ownership["eventhub_name"],
158+
updated_ownership["consumer_group"],
159+
updated_ownership["partition_id"],
164160
error,
165161
)
166-
return ownership # Keep the ownership if an unexpected error happens
162+
return updated_ownership # Keep the ownership if an unexpected error happens
167163

168164
async def list_ownership(
169165
self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, **kwargs: Any
@@ -184,7 +180,7 @@ async def list_ownership(
184180
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
185181
- `partition_id` (str): The partition ID which the checkpoint is created for.
186182
- `owner_id` (str): A UUID representing the current owner of this partition.
187-
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
183+
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
188184
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
189185
on storage implementation.
190186
"""
@@ -237,7 +233,7 @@ async def claim_ownership(
237233
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
238234
- `partition_id` (str): The partition ID which the checkpoint is created for.
239235
- `owner_id` (str): A UUID representing the owner attempting to claim this partition.
240-
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
236+
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
241237
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
242238
on storage implementation.
243239
"""

sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
python_requires=">=3.6",
6969
install_requires=[
7070
# dependencies for the vendored storage blob
71-
"azure-core<2.0.0,>=1.10.0",
71+
"azure-core<2.0.0,>=1.20.1",
7272
"msrest>=0.6.18",
7373
"cryptography>=2.1.4",
7474
# end of dependencies for the vendored storage blob

sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager_aio.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ async def _claim_and_list_ownership(connection_str, container_name):
6565
ownership['last_modified_time'] = time.time()
6666
ownership_list.append(ownership)
6767

68-
await checkpoint_store.claim_ownership(ownership_list)
68+
claimed_ownership = await checkpoint_store.claim_ownership(ownership_list)
69+
for i in range(ownership_cnt):
70+
assert ownership_list[i]['partition_id'] == str(i)
71+
assert claimed_ownership[i]['partition_id'] == str(i)
72+
assert ownership_list[i] != claimed_ownership[i]
6973

7074
ownership_list = await checkpoint_store.list_ownership(
7175
fully_qualified_namespace=fully_qualified_namespace,

sdk/eventhub/azure-eventhub-checkpointstoreblob/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Release History
22

3+
## 1.1.5 (Unreleased)
4+
5+
### Bugs Fixed
6+
7+
- Fixed a bug with `BlobCheckpointStore.claim_ownership` mutating the `ownership_list` argument to no longer mutate the argument.
8+
- Updated `azure-core` dependecy to 1.20.1 to fix `cchardet` ImportError.
9+
310
## 1.1.4 (2021-04-07)
411

512
This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.

sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
import time
88
import calendar
9+
import copy
910
from datetime import datetime
1011
from collections import defaultdict
1112

@@ -123,7 +124,7 @@ def _get_blob_client(self, blob_name):
123124
self._cached_blob_clients[blob_name] = result
124125
return result
125126

126-
def _upload_ownership(self, ownership, metadata, **kwargs):
127+
def _upload_ownership(self, ownership, **kwargs):
127128
etag = ownership.get("etag")
128129
if etag:
129130
kwargs["if_match"] = etag
@@ -137,6 +138,7 @@ def _upload_ownership(self, ownership, metadata, **kwargs):
137138
)
138139
blob_name = blob_name.lower()
139140
blob_client = self._get_blob_client(blob_name)
141+
metadata = {'ownerid': ownership['owner_id']}
140142
try:
141143
uploaded_blob_properties = blob_client.set_blob_metadata(metadata, **kwargs)
142144
except ResourceNotFoundError:
@@ -148,27 +150,21 @@ def _upload_ownership(self, ownership, metadata, **kwargs):
148150
ownership["last_modified_time"] = _to_timestamp(
149151
uploaded_blob_properties["last_modified"]
150152
)
151-
ownership.update(metadata)
152153

153154
def _claim_one_partition(self, ownership, **kwargs):
154-
partition_id = ownership["partition_id"]
155-
fully_qualified_namespace = ownership["fully_qualified_namespace"]
156-
eventhub_name = ownership["eventhub_name"]
157-
consumer_group = ownership["consumer_group"]
158-
owner_id = ownership["owner_id"]
159-
metadata = {"ownerid": owner_id}
155+
updated_ownership = copy.deepcopy(ownership)
160156
try:
161-
self._upload_ownership(ownership, metadata, **kwargs)
162-
return ownership
157+
self._upload_ownership(updated_ownership, **kwargs)
158+
return updated_ownership
163159
except (ResourceModifiedError, ResourceExistsError):
164160
logger.info(
165161
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
166162
"lost ownership to partition %r",
167-
owner_id,
168-
fully_qualified_namespace,
169-
eventhub_name,
170-
consumer_group,
171-
partition_id,
163+
updated_ownership["owner_id"],
164+
updated_ownership["fully_qualified_namespace"],
165+
updated_ownership["eventhub_name"],
166+
updated_ownership["consumer_group"],
167+
updated_ownership["partition_id"],
172168
)
173169
raise OwnershipLostError()
174170
except Exception as error: # pylint:disable=broad-except
@@ -177,14 +173,14 @@ def _claim_one_partition(self, ownership, **kwargs):
177173
"namespace %r eventhub %r consumer group %r partition %r. "
178174
"The ownership is now lost. Exception "
179175
"is %r",
180-
owner_id,
181-
fully_qualified_namespace,
182-
eventhub_name,
183-
consumer_group,
184-
partition_id,
176+
updated_ownership["owner_id"],
177+
updated_ownership["fully_qualified_namespace"],
178+
updated_ownership["eventhub_name"],
179+
updated_ownership["consumer_group"],
180+
updated_ownership["partition_id"],
185181
error,
186182
)
187-
return ownership # Keep the ownership if an unexpected error happens
183+
return updated_ownership # Keep the ownership if an unexpected error happens
188184

189185
def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs):
190186
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
@@ -204,7 +200,7 @@ def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_grou
204200
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
205201
- `partition_id` (str): The partition ID which the checkpoint is created for.
206202
- `owner_id` (str): A UUID representing the current owner of this partition.
207-
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
203+
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
208204
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
209205
on storage implementation.
210206
"""
@@ -254,7 +250,7 @@ def claim_ownership(self, ownership_list, **kwargs):
254250
- `consumer_group` (str): The name of the consumer group the ownership are associated with.
255251
- `partition_id` (str): The partition ID which the checkpoint is created for.
256252
- `owner_id` (str): A UUID representing the owner attempting to claim this partition.
257-
- `last_modified_time` (UTC datetime.datetime): The last time this ownership was claimed.
253+
- `last_modified_time` (float): The last time this ownership was claimed as a timestamp.
258254
- `etag` (str): The Etag value for the last time this ownership was modified. Optional depending
259255
on storage implementation.
260256
"""

sdk/eventhub/azure-eventhub-checkpointstoreblob/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
packages=find_packages(exclude=exclude_packages),
7070
install_requires=[
7171
# dependencies for the vendored storage blob
72-
"azure-core<2.0.0,>=1.10.0",
72+
"azure-core<2.0.0,>=1.20.1",
7373
"msrest>=0.6.18",
7474
"cryptography>=2.1.4",
7575
# end of dependencies for the vendored storage blob

sdk/eventhub/azure-eventhub-checkpointstoreblob/tests/test_storage_blob_partition_manager.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ def _claim_and_list_ownership(storage_connection_str, container_name):
6868
ownership["sequence_number"] = "1"
6969
ownership_list.append(ownership)
7070

71-
checkpoint_store.claim_ownership(ownership_list)
71+
claimed_ownership = checkpoint_store.claim_ownership(ownership_list)
72+
73+
for i in range(ownership_cnt):
74+
assert ownership_list[i]['partition_id'] == str(i)
75+
assert claimed_ownership[i]['partition_id'] == str(i)
76+
assert ownership_list[i] != claimed_ownership[i]
7277

7378
ownership_list = checkpoint_store.list_ownership(
7479
fully_qualified_namespace=fully_qualified_namespace,

shared_requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ opentelemetry-api<2.0.0,>=1.5.0,!=1.10a0
183183
opentelemetry-sdk<2.0.0,>=1.5.0,!=1.10a0
184184
#override azure-eventhub-checkpointstoreblob msrest>=0.6.18
185185
#override azure-eventhub-checkpointstoreblob-aio msrest>=0.6.18
186-
#override azure-eventhub-checkpointstoreblob azure-core<2.0.0,>=1.10.0
187-
#override azure-eventhub-checkpointstoreblob-aio azure-core<2.0.0,>=1.10.0
186+
#override azure-eventhub-checkpointstoreblob azure-core<2.0.0,>=1.20.1
187+
#override azure-eventhub-checkpointstoreblob-aio azure-core<2.0.0,>=1.20.1
188188
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0
189189
#override azure-eventhub-checkpointstoretable azure-core<2.0.0,>=1.14.0
190190
#override azure-eventhub uamqp>=1.4.3,<2.0.0

0 commit comments

Comments
 (0)