Skip to content

Commit f900102

Browse files
committed
Don't retry by default when the stream is exhausted
When the stream is exhausted and the server closes the stream, we no longer automatically reconnect. But this can be changed by setting the `retry_on_exhausted_stream` parameter to `True`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 93cd8df commit f900102

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

src/frequenz/client/base/streaming.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@
2727
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
2828
"""Helper class to handle grpc streaming methods."""
2929

30-
def __init__(
30+
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
3131
self,
3232
stream_name: str,
3333
stream_method: Callable[[], AsyncIterable[InputT]],
3434
transform: Callable[[InputT], OutputT],
3535
retry_strategy: retry.Strategy | None = None,
36+
retry_on_exhausted_stream: bool = False,
3637
):
3738
"""Initialize the streaming helper.
3839
@@ -43,13 +44,16 @@ def __init__(
4344
transform: A function to transform the input type to the output type.
4445
retry_strategy: The retry strategy to use, when the connection is lost. Defaults
4546
to retries every 3 seconds, with a jitter of 1 second, indefinitely.
47+
retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e.
48+
when the server closes the stream. Defaults to False.
4649
"""
4750
self._stream_name = stream_name
4851
self._stream_method = stream_method
4952
self._transform = transform
5053
self._retry_strategy = (
5154
retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
5255
)
56+
self._retry_on_exhausted_stream = retry_on_exhausted_stream
5357

5458
self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
5559
name=f"GrpcStreamBroadcaster-{stream_name}"
@@ -91,6 +95,12 @@ async def _run(self) -> None:
9195
await sender.send(self._transform(msg))
9296
except grpc.aio.AioRpcError as err:
9397
error = err
98+
if error is None and not self._retry_on_exhausted_stream:
99+
_logger.info(
100+
"%s: connection closed, stream exhausted", self._stream_name
101+
)
102+
await self._channel.close()
103+
break
94104
error_str = f"Error: {error}" if error else "Stream exhausted"
95105
interval = self._retry_strategy.next_interval()
96106
if interval is None:

0 commit comments

Comments
 (0)