Skip to content

Commit 8cb6b38

Browse files
JJHWANswathipil
andauthored
refactor:eventhub/_eventprocessor:type hint added (Azure#26208)
* refactor:eventhub/_eventprocessor:type hint added * review applied Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>
1 parent 7410fe6 commit 8cb6b38

File tree

6 files changed

+95
-82
lines changed

6 files changed

+95
-82
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/_eventprocessor_mixin.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5-
65
from __future__ import annotations
76
from datetime import datetime
87
from contextlib import contextmanager
@@ -49,8 +48,11 @@ class EventProcessorMixin(object):
4948
{}
5049
) # type: Union[int, str, datetime, Dict[str, Union[int, str, datetime]]]
5150

52-
def get_init_event_position(self, partition_id, checkpoint):
53-
# type: (str, Optional[Dict[str, Any]]) -> Tuple[Union[str, int, datetime], bool]
51+
def get_init_event_position(
52+
self,
53+
partition_id: str,
54+
checkpoint: Optional[Dict[str, Any]]
55+
) -> Tuple[Union[str, int, datetime], bool]:
5456
checkpoint_offset = checkpoint.get("offset") if checkpoint else None
5557

5658
event_position_inclusive = False
@@ -75,13 +77,12 @@ def get_init_event_position(self, partition_id, checkpoint):
7577

7678
def create_consumer(
7779
self,
78-
partition_id, # type: str
79-
initial_event_position, # type: Union[str, int, datetime]
80-
initial_event_position_inclusive, # type: bool
81-
on_event_received, # type: Callable[[Union[Optional[EventData], List[EventData]]], None]
82-
**kwargs, # type: Any
83-
):
84-
# type: (...) -> Union[EventHubConsumer, EventHubConsumerAsync]
80+
partition_id: str,
81+
initial_event_position: Union[str, int, datetime],
82+
initial_event_position_inclusive: bool,
83+
on_event_received: Callable[[Union[Optional[EventData], List[EventData]]], None],
84+
**kwargs: Any
85+
) -> Union[EventHubConsumer, EventHubConsumerAsync]:
8586
consumer = self._eventhub_client._create_consumer( # type: ignore # pylint: disable=protected-access
8687
self._consumer_group,
8788
partition_id,
@@ -96,8 +97,7 @@ def create_consumer(
9697
return consumer
9798

9899
@contextmanager
99-
def _context(self, links=None):
100-
# type: (List[Link]) -> Iterator[None]
100+
def _context(self, links: List[Link]=None) -> Iterator[None]:
101101
"""Tracing"""
102102
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
103103
if span_impl_type is None:

sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/checkpoint_store.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ class CheckpointStore(object):
1515

1616
@abstractmethod
1717
def list_ownership(
18-
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
19-
):
20-
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
18+
self,
19+
fully_qualified_namespace: str,
20+
eventhub_name: str,
21+
consumer_group: str,
22+
**kwargs: Any
23+
) -> Iterable[Dict[str, Any]]:
2124
"""Retrieves a complete ownership list from the chosen storage service.
2225
2326
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
@@ -40,8 +43,11 @@ def list_ownership(
4043
"""
4144

4245
@abstractmethod
43-
def claim_ownership(self, ownership_list, **kwargs):
44-
# type: (Iterable[Dict[str, Any]], Any) -> Iterable[Dict[str, Any]]
46+
def claim_ownership(
47+
self,
48+
ownership_list: Iterable[Dict[str, Any]],
49+
**kwargs: Any
50+
) -> Iterable[Dict[str, Any]]:
4551
"""Tries to claim ownership for a list of specified partitions.
4652
4753
:param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownerships to claim.
@@ -60,8 +66,7 @@ def claim_ownership(self, ownership_list, **kwargs):
6066
"""
6167

6268
@abstractmethod
63-
def update_checkpoint(self, checkpoint, **kwargs):
64-
# type: (Dict[str, Optional[Union[str, int]]], Any) -> None
69+
def update_checkpoint(self, checkpoint: Dict[str, Optional[Union[str, int]]], **kwargs: Any) -> None:
6570
"""Updates the checkpoint using the given information for the offset, associated partition and
6671
consumer group in the chosen storage service.
6772
@@ -86,9 +91,12 @@ def update_checkpoint(self, checkpoint, **kwargs):
8691

8792
@abstractmethod
8893
def list_checkpoints(
89-
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
90-
):
91-
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
94+
self,
95+
fully_qualified_namespace: str,
96+
eventhub_name: str,
97+
consumer_group: str,
98+
**kwargs: Any
99+
) -> Iterable[Dict[str, Any]]:
92100
"""List the updated checkpoints from the chosen storage service.
93101
94102
:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.

sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/event_processor.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5+
from __future__ import annotations
56
import random
67
import uuid
78
import logging
@@ -49,12 +50,11 @@ class EventProcessor(
4950

5051
def __init__(
5152
self,
52-
eventhub_client, # type: EventHubConsumerClient
53-
consumer_group, # type: str
54-
on_event, # type: Callable[[PartitionContext, Union[Optional[EventData], List[EventData]]], None]
55-
**kwargs # type: Any
56-
):
57-
# type: (...) -> None
53+
eventhub_client: EventHubConsumerClient,
54+
consumer_group: str,
55+
on_event: Callable[[PartitionContext, Union[Optional[EventData], List[EventData]]], None],
56+
**kwargs: Any
57+
) -> None:
5858
# pylint: disable=line-too-long
5959
self._consumer_group = consumer_group
6060
self._eventhub_client = eventhub_client
@@ -122,8 +122,7 @@ def __init__(
122122
self._partition_id,
123123
)
124124

125-
def __repr__(self):
126-
# type: () -> str
125+
def __repr__(self) -> str:
127126
return "EventProcessor: id {}".format(self._id)
128127

129128
def _process_error(self, partition_context, err):
@@ -141,8 +140,7 @@ def _process_error(self, partition_context, err):
141140
err_again,
142141
)
143142

144-
def _cancel_tasks_for_partitions(self, to_cancel_partitions):
145-
# type: (Iterable[str]) -> None
143+
def _cancel_tasks_for_partitions(self, to_cancel_partitions: Iterable[str]) -> None:
146144
with self._lock:
147145
_LOGGER.debug(
148146
"EventProcessor %r tries to cancel partitions %r",
@@ -179,8 +177,11 @@ def _initialize_partition_consumer(self, partition_id):
179177
"EventProcessor %r has claimed partition %r", self._id, partition_id
180178
)
181179

182-
def _create_tasks_for_claimed_ownership(self, claimed_partitions, checkpoints=None):
183-
# type: (Iterable[str], Optional[Dict[str, Dict[str, Any]]]) -> None
180+
def _create_tasks_for_claimed_ownership(
181+
self,
182+
claimed_partitions: Iterable[str],
183+
checkpoints: Optional[Dict[str, Dict[str, Any]]]=None
184+
) -> None:
184185
with self._lock:
185186
_LOGGER.debug(
186187
"EventProcessor %r tries to claim partition %r",
@@ -223,8 +224,11 @@ def _create_tasks_for_claimed_ownership(self, claimed_partitions, checkpoints=No
223224
)
224225
self._initialize_partition_consumer(partition_id)
225226

226-
def _on_event_received(self, partition_context, event):
227-
# type: (PartitionContext, Union[Optional[EventData], List[EventData]]) -> None
227+
def _on_event_received(
228+
self,
229+
partition_context: PartitionContext,
230+
event: Union[Optional[EventData], List[EventData]]
231+
) -> None:
228232
if event:
229233
try:
230234
partition_context._last_received_event = event[-1] # type: ignore #pylint:disable=protected-access
@@ -236,8 +240,7 @@ def _on_event_received(self, partition_context, event):
236240
else:
237241
self._event_handler(partition_context, event)
238242

239-
def _load_balancing(self):
240-
# type: () -> None
243+
def _load_balancing(self) -> None:
241244
"""Start the EventProcessor.
242245
243246
The EventProcessor will try to claim and balance partition ownership with other `EventProcessor`
@@ -297,8 +300,12 @@ def _load_balancing(self):
297300

298301
time.sleep(load_balancing_interval)
299302

300-
def _close_consumer(self, partition_id, consumer, reason):
301-
# type: (str, EventHubConsumer, CloseReason) -> None
303+
def _close_consumer(
304+
self,
305+
partition_id: str,
306+
consumer: EventHubConsumer,
307+
reason: CloseReason
308+
) -> None:
302309
consumer.close()
303310
with self._lock:
304311
del self._consumers[partition_id]
@@ -332,8 +339,7 @@ def _close_consumer(self, partition_id, consumer, reason):
332339

333340
self._ownership_manager.release_ownership(partition_id)
334341

335-
def _do_receive(self, partition_id, consumer):
336-
# type: (str, EventHubConsumer) -> None
342+
def _do_receive(self, partition_id: str, consumer: EventHubConsumer) -> None:
337343
"""Call the consumer.receive() and handle exceptions if any after it exhausts retries."""
338344
try:
339345
consumer.receive(self._batch, self._max_batch_size, self._max_wait_time)
@@ -352,8 +358,7 @@ def _do_receive(self, partition_id, consumer):
352358
# Does OWNERSHIP_LOST make sense for all errors?
353359
self._close_consumer(partition_id, consumer, CloseReason.OWNERSHIP_LOST)
354360

355-
def start(self):
356-
# type: () -> None
361+
def start(self) -> None:
357362
if self._running:
358363
_LOGGER.info("EventProcessor %r has already started.", self._id)
359364
return
@@ -378,8 +383,7 @@ def start(self):
378383
for partition_id, consumer in list(self._consumers.items()):
379384
self._close_consumer(partition_id, consumer, CloseReason.SHUTDOWN)
380385

381-
def stop(self):
382-
# type: () -> None
386+
def stop(self) -> None:
383387
"""Stop the EventProcessor.
384388
385389
The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions

sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/in_memory_checkpoint_store.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,22 @@ def __init__(self):
105105
)
106106

107107
def list_ownership(
108-
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
109-
):
110-
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
108+
self,
109+
fully_qualified_namespace: str,
110+
eventhub_name: str,
111+
consumer_group: str,
112+
**kwargs: Any
113+
) -> Iterable[Dict[str, Any]]:
111114
consumer_group_node = self._ownerships_trie.lookup(
112115
(fully_qualified_namespace, eventhub_name, consumer_group)
113116
)
114117
return self._ownerships_trie.list_leaves(consumer_group_node)
115118

116-
def claim_ownership(self, ownership_list, **kwargs):
117-
# type: (Iterable[Dict[str, Any]], Any) -> Iterable[Dict[str, Any]]
119+
def claim_ownership(
120+
self,
121+
ownership_list: Iterable[Dict[str, Any]],
122+
**kwargs: Any
123+
) -> Iterable[Dict[str, Any]]:
118124
result = []
119125
for ownership in ownership_list:
120126
fully_qualified_namespace = ownership["fully_qualified_namespace"]
@@ -143,8 +149,7 @@ def claim_ownership(self, ownership_list, **kwargs):
143149
result.append(ownership)
144150
return result
145151

146-
def update_checkpoint(self, checkpoint, **kwargs):
147-
# type: (Dict[str, Optional[Union[str, int]]], Any) -> None
152+
def update_checkpoint(self, checkpoint: Dict[str, Optional[Union[str, int]]], **kwargs: Any) -> None:
148153
return self._checkpoints_trie.set_ele(checkpoint)
149154

150155
def list_checkpoints(

sdk/eventhub/azure-eventhub/azure/eventhub/_eventprocessor/ownership_manager.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
5+
from __future__ import annotations
56
import math
67
import time
78
import random
@@ -29,13 +30,13 @@ class OwnershipManager(object): # pylint:disable=too-many-instance-attributes
2930

3031
def __init__(
3132
self,
32-
eventhub_client, # type: Union[EventHubConsumerClient, EventHubProducerClient]
33-
consumer_group, # type: str
34-
owner_id, # type: str
35-
checkpoint_store, # type: Optional[CheckpointStore]
36-
ownership_timeout, # type: float
37-
load_balancing_strategy, # type: LoadBalancingStrategy
38-
partition_id, # type: Optional[str]
33+
eventhub_client: Union[EventHubConsumerClient, EventHubProducerClient],
34+
consumer_group: str,
35+
owner_id: str,
36+
checkpoint_store: Optional[CheckpointStore],
37+
ownership_timeout: float,
38+
load_balancing_strategy: LoadBalancingStrategy,
39+
partition_id: Optional[str]
3940
):
4041
self.cached_parition_ids = [] # type: List[str]
4142
self.owned_partitions = [] # type: Iterable[Dict[str, Any]]
@@ -51,8 +52,7 @@ def __init__(
5152
self.load_balancing_strategy = load_balancing_strategy
5253
self.partition_id = partition_id
5354

54-
def claim_ownership(self):
55-
# type: () -> List[str]
55+
def claim_ownership(self) -> List[str]:
5656
"""Claims ownership for this EventProcessor"""
5757
if not self.cached_parition_ids:
5858
self._retrieve_partition_ids()
@@ -78,8 +78,7 @@ def claim_ownership(self):
7878
)
7979
return [x["partition_id"] for x in self.owned_partitions]
8080

81-
def release_ownership(self, partition_id):
82-
# type: (str) -> None
81+
def release_ownership(self, partition_id: str) -> None:
8382
"""Explicitly release ownership of a partition if we still have it.
8483
8584
This is called when a consumer is shutdown, and is achieved by resetting the associated
@@ -100,15 +99,15 @@ def release_ownership(self, partition_id):
10099
partition_ownership[0]["owner_id"] = ""
101100
self.checkpoint_store.claim_ownership(partition_ownership)
102101

103-
def _retrieve_partition_ids(self):
104-
# type: () -> None
102+
def _retrieve_partition_ids(self) -> None:
105103
"""List all partition ids of the event hub that the EventProcessor is working on."""
106104
self.cached_parition_ids = self.eventhub_client.get_partition_ids()
107105

108106
def _balance_ownership( # pylint:disable=too-many-locals
109-
self, ownership_list, all_partition_ids
110-
):
111-
# type: (Iterable[Dict[str, Any]], List[str]) -> List[Dict[str, Any]]
107+
self,
108+
ownership_list: Iterable[Dict[str, Any]],
109+
all_partition_ids: List[str]
110+
) -> List[Dict[str, Any]]:
112111
"""Balances and claims ownership of partitions for this EventProcessor."""
113112
now = time.time()
114113
ownership_dict = {
@@ -206,8 +205,7 @@ def _balance_ownership( # pylint:disable=too-many-locals
206205
to_claim.append(to_steal_partition)
207206
return to_claim
208207

209-
def get_checkpoints(self):
210-
# type: () -> Dict[str, Dict[str, Any]]
208+
def get_checkpoints(self) -> Dict[str, Dict[str, Any]]:
211209
if self.checkpoint_store:
212210
checkpoints = self.checkpoint_store.list_checkpoints(
213211
self.fully_qualified_namespace, self.eventhub_name, self.consumer_group

0 commit comments

Comments
 (0)