Skip to content

Commit d111b55

Browse files
swathipilyunhaolingazure-sdkrakshith91kashifkhan
authored
[EventHubs] merge pyamqp to main (Azure#27763)
* [EventHub] basic receive event scenario with pyamqp (Azure#19748) * initial changes for receiving * undo __init__ aio * vendor * remove c/pyx files * adams comments * [EventHubs & AMQP Python] Send Port (Azure#19745) * draft send port * copy and paste code changes in amqp * simple stress test scripts for sending and receiving * review feedbacks * [EventHubs&AMQP Python] Port amqp send large message (Azure#19937) * port amqp send large message * send perf test in parallel * use context manager for executor * add throughput * improve test code * update test matrix * update test * handle server busy * fix timeout * precision to 2 decimal points * update pyamqp changes and update tests * [EventHubs] Pure Python AMQP Sycn Implementation Integration (Azure#22397) * copy amqp changes * eh python amqp integration * fix time unit * rename module pyamqp to _pyamqp * more pyamqp to _pyamqp * simplify todo * [EH Pyproto] Release preparation (Azure#22433) * cherry pick changes * update docs * cherry pick fixed retry PR * minor fix * fix mypy, pylint, brokenlink * update doc * opt out mypy/pylint/api stub * try opt out checkpointstore in ci and test * fix * more fixes * furuther opt out tests * update tests * bump version * fix __str__ * add test play holder * ignore azure checkpoinstore aio in ci Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * update readme to drop uamqp * revert async tests * revert aio module in eventhub * [EH Pyproto] Async support (Azure#22957) * async port * add scripts for tests * update async perf test scripts * fix test scripts * amqp implementation update + eh update + sync perf test scripts * update pyamqp and eh async impl and test scripts * update pyamqp async impl * fix bug * fix pyamqp transport ssl setting and asyncio exception module import * use ensure future for 3.6 * update token generation to return bytes to avoid breaking changes * update docs * Increment version for eventhub releases (Azure#22994) Increment package version after release of azure-eventhub * [EH Pyproto] Async recv perf improvement (Azure#23122) * stop spawning too much coroutines * improve send * async recv perf improvement * async perf improve * update version * align with sync imple * update method name * remove redundant except catch * [EH Pyproto] Release updates (Azure#23349) * update docs * add todo * Increment version for eventhub releases (Azure#23420) Increment package version after release of azure-eventhub * AMQP websocket implementation (Azure#23722) * Initial implementation * http proxy support * change impl * more changes * working sol * async impl * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * more changes * sasl mixin * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py * refactor * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * oops * comments * comment * Apply suggestions from code review Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * comments * changes * async test * rasie * lint * changelog * version * comments * move path to EH Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * Revert "AMQP websocket implementation (Azure#23722)" (Azure#24344) This reverts commit 0123f4d. * AMQP websocket implementation (Azure#24345) * Initial implementation * http proxy support * change impl * more changes * working sol * async impl * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * more changes * sasl mixin * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py * refactor * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * oops * comments * comment * Apply suggestions from code review Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * comments * changes * async test * rasie * lint * changelog * version * comments * move path to EH * Fix typo Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * [EventHub] basic receive event scenario with pyamqp (Azure#19748) * initial changes for receiving * undo __init__ aio * vendor * remove c/pyx files * adams comments * [EventHubs & AMQP Python] Send Port (Azure#19745) * draft send port * copy and paste code changes in amqp * simple stress test scripts for sending and receiving * review feedbacks * [EventHubs&AMQP Python] Port amqp send large message (Azure#19937) * port amqp send large message * send perf test in parallel * use context manager for executor * add throughput * improve test code * update test matrix * update test * handle server busy * fix timeout * precision to 2 decimal points * update pyamqp changes and update tests * [EventHubs] Pure Python AMQP Sycn Implementation Integration (Azure#22397) * copy amqp changes * eh python amqp integration * fix time unit * rename module pyamqp to _pyamqp * more pyamqp to _pyamqp * simplify todo * [EH Pyproto] Release preparation (Azure#22433) * cherry pick changes * update docs * cherry pick fixed retry PR * minor fix * fix mypy, pylint, brokenlink * update doc * opt out mypy/pylint/api stub * try opt out checkpointstore in ci and test * fix * more fixes * furuther opt out tests * update tests * bump version * fix __str__ * add test play holder * ignore azure checkpoinstore aio in ci Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * update readme to drop uamqp * revert async tests * revert aio module in eventhub * [EH Pyproto] Async support (Azure#22957) * async port * add scripts for tests * update async perf test scripts * fix test scripts * amqp implementation update + eh update + sync perf test scripts * update pyamqp and eh async impl and test scripts * update pyamqp async impl * fix bug * fix pyamqp transport ssl setting and asyncio exception module import * use ensure future for 3.6 * update token generation to return bytes to avoid breaking changes * update docs * Increment version for eventhub releases (Azure#22994) Increment package version after release of azure-eventhub * [EH Pyproto] Async recv perf improvement (Azure#23122) * stop spawning too much coroutines * improve send * async recv perf improvement * async perf improve * update version * align with sync imple * update method name * remove redundant except catch * [EH Pyproto] Release updates (Azure#23349) * update docs * add todo * Increment version for eventhub releases (Azure#23420) Increment package version after release of azure-eventhub * AMQP websocket implementation (Azure#23722) * Initial implementation * http proxy support * change impl * more changes * working sol * async impl * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * more changes * sasl mixin * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py * refactor * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * oops * comments * comment * Apply suggestions from code review Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * comments * changes * async test * rasie * lint * changelog * version * comments * move path to EH Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * Revert "AMQP websocket implementation (Azure#23722)" (Azure#24344) This reverts commit 0123f4d. * AMQP websocket implementation (Azure#24345) * Initial implementation * http proxy support * change impl * more changes * working sol * async impl * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * more changes * sasl mixin * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/message.py * refactor * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py * oops * comments * comment * Apply suggestions from code review Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * comments * changes * async test * rasie * lint * changelog * version * comments * move path to EH * Fix typo Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * remove extra SR related code * update docs + type hints * fixing failing tests * [EventHubs] merge working websocket changes to feature branch (Azure#24444) * adam's working changes * Adding back Rakshith's websocket changes (Azure#24410) * Adding back Rakshith's sync websocket changes * fix async send and receive * fix transport bugs * add websocket to dev reqs + async fix hostname * thank you kashif * fix tests + turn on websocket tests * update consumer test timing * fix merge bugs + remove pyamqp specific tests * update sleep time in test * enable live test for ws receive * fix to create Batch properly * [eventhub] Websocket timeout error exception thrown (Azure#24504) * sync websocket timeout to operationtimeout, not changing other transport types just yet * async websocket to operation timeout * upstream * default timeout 1.0 * default timeout 1 * throwing a socket timeout, operationtimeout was throwing out a real error * replacing socket.timeout with TimeoutError, added into except statments as well * timeout is inherited from oserror, dont need both * test timeoutexception throwing errors * deafult timeout to 1 * [eventhub] websocket default timeout fix (Azure#24565) * websocket timeout fix * timeout interval for both ssl and webscoket * [eventhub] Custom Endpoint (Azure#24505) * sync ce * async ce * add string ending * only pass to transport * running into same recieve issue with sync * fixing async - needs to pass to sasl * remove logger * stopping here * adding prefix to fix sample * add in prefetch * fixing transport remove print * host being overriden * removing trace * fix to use url async * aligning sync/async pattern * removing uneeded hostname switch * string formatting * changelog * adding docstrings for supported events * pr comments refactoring sync * mirroring on async * pr comment docstring * removing import * missing _ * missing ssl * if no port given, we use default set in config * async of same ^ * add default port in connection stage if port is none * adding in docstring to cliet/connection string constructor * custom_endpoint_address in client base async to match sync * fix import on websocket test * fix import 2 * skipping tests * removing import * pytest.mark * [EventHubs] pyproto - update release date + docs (Azure#24723) * add async doc rst file * Increment version for eventhub releases (Azure#24753) Increment package version after release of azure-eventhub * changes to update status (Azure#25024) * updating codeowners file in pyproto feature * [Eventhub] pyamqp prefetch fix (Azure#24890) * prefetch fix * adding async - sorry! * async * Use --no-cone in pipeline sparse checkout script (Azure#25165) (Azure#25208) Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com> Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com> * [AMQP Python] Eventhub Pyamqp tests (Azure#24895) * starting tests * updates to websocket sync * moving around format - unittest and live test * live test + unittests starting * websocket async passing * eol * assert not return * assert not return * fixed assert * auth tests * auth unittest pyamqp * replicating uamqp tests * keep_alive_thread * skip for now - no keep alive * pickle/deepcopy, might not want to keep all * stopping here for now - need tls on rabbitmq * cleaning up tests - pickle * removing and editing uneeded tests * removing unused test * added receive amqp tests * exceptions with pytest.raises, not live * moving around tests * testing mgmt calls like _start_producer * Use --no-cone in pipeline sparse checkout script (Azure#25165) Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com> * unused imports Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com> * [Pyamqp] Fix network logging trace in client_base (Azure#25218) * pass in right kwarg for network tracing * remove client changes. Another PR * reverting link credit for now (Azure#25310) * [Pyamqp] Pyampq debug build Linkedin (Azure#25296) * enhanced logging for linkedin * stuff * fixes * minor sample fix * Changelog * remove unused imports * fix formatting changes * change debug level * update version info * update changelog * fix sample * fix logging message for empty access token * Increment version for eventhub releases (Azure#25320) Increment package version after release of azure-eventhub * removing duplicate (Azure#25321) * add async unit tests (Azure#25396) * reset logging level (Azure#25588) * [Pyamqp] Remember Proxy Params (Azure#25564) * fix to keep proxy params * async changes for proxy * unit tests * changes * more changes Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * [Pyamqp] Intial TODOS Clean Up (Azure#25630) * set default SSL version * dont need it for EH & SB * wont impact us, check mtg notes * decode error wont affect us * address in SB PR * change language on logger * it does close socket * no other closes needed * keep for better tracking * close w/ error when max frame size is invalid * detach links on session end * clean up links on session outgoing_end * reject link by detaching * uncomment logging for later review * reject the link that was set in the try * [Pyamqp] test fixes for pipeline (Azure#25749) * test fixes for pipeline * connect to EHerror * [PyAMQP] Stress testing reform (Azure#25770) * reform stress test * reform stress test * updating stress test format * can specify azure_identity on producer * epoch level sync * removing commands * aligning sync/async * pyamqp logging * uamqp to pyamqp * import remove * fixing deploy commands * values file * checking eng/ file resources * remove version * moving location of this folder * imagepullpolicy, azure_identity for consumer * [PyAMQP] Updating pyamqp with SB changes (Azure#25804) * pyamqp diff from anna's branch * mgmt_request returns code, response, desc * mgmt_request returns code, response, desc async * handle error being thrown * handle error being thrown 2 * handle error being thrown 3 * Fixed error path * fix error path Co-authored-by: Anna Tisch <antisch@microsoft.com> Co-authored-by: Kashif Khan <361477+kashifkhan@users.noreply.github.com> * removing buff producer to add back in later * update cspell * fix uppercase link in doc/dev/issues/resolve_issues_effectively * [PyAMQP] Connections TODO (Azure#26018) * protocol errors * fix formatting * [PyAMQP] Kashif Client refactoring changes (Azure#25451) Linked to link Azure#22051 for a TODO about typing in docstrings * need to use pyamqp (Azure#25895) * [EventHubs] add amqp switch support (Azure#25965) This PR is for adding switch support to the `feature/eventhub/pyproto` changes including the uamqp switch from current `main`. fixes Azure#21246 Addressing Anna's comments from uamqp switch PR (Azure#25193) + main changes: - [x] Moving pyamqp logic out to the PyamqpTransport - [x] **Confirmed: The size of encoded pyamqp.Message is larger than uamqp.Message.** I thought otherwise b/c I was adding the header/property objects by default when building the outgoing uamqp message, even if all values inside those are None. I've fixed this. - [ ] Make BatchMessage transport agnostic: Azure#25494 (comment) - Instead, updated EventDataBatch so that it takes an amqp_transport. If EventDataBatch is manually created and uses PyamqpTransport, inside `send_batch()`, if the producer client transport uses UamqpTransport, the BatchMessage corresponding to the client's amqp_transport will be built and sent. - [X] Add `message` property to `EventData`/`EventDataBatch`, which return `LegacyMessage`/`LegacyMessageBatch` from `_pyamqp` for backcompat. - [x] add `connection=None` parameter to `pyamqp.AMQPClient.open()` as per [this discussion](Azure#25494 (comment)) - [x] Add an async SharedConnectionManager in pyamqp: Azure#25494 (comment) Issue created [[here](Azure#25875)] to address the below TODOs in a separate PR: - [x] add TODO in pyamqp that SenderClient should take msg_timeout: Azure#25494 (comment) - [x] add TODO in pyamqp that ReceiveClient should take timeout: Azure#25494 (comment) - [x] add both MAX_MESSAGE_LENGTH_BYTES and MAX_FRAME_SIZE_BYTES to pyamqp: Azure#25494 (comment) TODO: - [ ] fix mypy/pylint issues - made partial progress. fix rest in separate PR for issue: Azure#25936 - [x] investigate how to remove `message` property from public API. - mark as deprecated and log a deprecation warning if accessed * prep release * [Pyamqp] Exception Todos (Azure#25893) * throw proper exception and error condition on CBS * error condition for ready * close when open is on non-zero channel * error condition is proper for timeout * unattached handle * max handles error condition * fix strings * fix * end the session on unattached handle * remove TODO * conditions are proper * raise link error on close or detach * revert change * remove TODO * change to client error * change to AmqpError * change to AMQPError * detach called * detach the link, dont close the connection * comments clean up + change condition * handle invalid channel on end * descriptive error message * fix formatting * detach err message on max handle * [Pyamqp] Async WS implementation using a pure async library (Azure#26234) * init changes for aiohttp ws * fixes + formatting * fixes for context manager * changes * ssl options * clean up * move build opts in to mixin * pass in proxy information * attach port to proxy if given * address comments * remove self from proxy host * [SB PyAMQP] Servicebus PyAMQP Working off of Anna's Branch (Azure#24975) * Added pyamqp * Added message compatibility tests * Start rewiring messages for pyamqp * Added message backcompat layer * Successful message send * Started receiver * Successful message receive * Message settlement * Fix other settlement outcomes * Make tests live * message partition_key if it can't be decoded - output value * removing references to __future___ annotations for now - not supported in 3.6 * comparing name of transport - not the object * passing in a dummy frame for new formatting of SBMessageReceived * adding in fake frame for message in queue tests * uamqp_mesage -> uamqp_message * state should be auth_state * switching this back - _message is Message * Improved typing * Revert "Improved typing" This reverts commit aeffcb2. * Fix TransportType enum * Fix import statement * Fix application property encoding * Skip queue iterator tests * Fix mgmt op timeout * Fixes to mgmt link * Fix frame decode tests * More mgmt fixes * Some message fixes * Fix session filters * Message tests * Skip more iterator tests * Update to retry policy * adding in support for websockets is CE supported? * fixing up pylint-still some issues * some more pylint/TODOs * pylint changes * fixing pylint * more pylint connection * More test fixes * Fix scheduling * Fix retry test * Fix error handling * Sender refactor for timeout * Fix link detach * Fixed receiver control flow * Update pyamqp async code * Updated sb async * Typing fix * Some async fixes * Skip async iter tests * Workaround socket timeout * Literal import * More async test fixes * Added keepalive * Pylint cleanup * fix mypy errors in _pyamqp * fix mypy sb layer * fix bug * unused import * lint * fix failing tests * ignore sb iterator receive samples Co-authored-by: antisch <antisch@microsoft.com> Co-authored-by: swathipil <swathip@microsoft.com> * [Pyamqp] Pyamqp fix conn (Azure#26568) * remove unnecessary pops * fix var name + remove unnecessary pop * fix * Bring in changes to fix bandit from failing * [EventHubs&ServiceBus] merge sb and eh pyamqp (Azure#26548) * merge sb and eh pyamqp * reenable pylint for EH * turn on mypy for EH * fix mypy errors eh layer * fix EH mypy/pylint * fix SB failing tests * fix more tests/mypy * import literal from typing extensions * remove whitespace * fix typing cast bug in EH * lint * fix port url in async transport * kashifs comments * mypy/lint/kashifs comments * [Pyamqp] Fix Async Invalid Host Error Test (Azure#26595) * resert transport to original state * add in missing continue in except * Update sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * fix merge conflict stress * update cspell, ignore tables spelling * skip connection verify tests sb for now * fix logging formatting (Azure#26682) * copy over kashifs change to sb * [ServiceBus&EventHubs] pyamqp - update EH/SB docs for release (Azure#26741) updating docs to prepare for release * [ServiceBus&EventHubs] fix mypy/pylint (Azure#26744) * fix mypy/pylint * bump SB version * prep release alpha (Azure#26755) * Increment version for servicebus releases (Azure#26762) Increment package version after release of azure-servicebus * Increment version for eventhub releases (Azure#26766) Increment package version after release of azure-eventhub * [PyAMQP] Fix logging (Azure#26785) * make logging network trace a debug level log * async changes * await async sleep (Azure#26853) * fix for async socket (Azure#26852) * [Pyamqp] Possible solutions for network disruption using async websocket (Azure#26856) * heartbeat * constant value for heartbeat * address comments * [Pyamqp] Fix to Improve Websocket Sync and Async Network Disruption Handling (Azure#27006) * changes * fix lint * remove prints * remove unused import * remove heartbeat from this PR * [pyAMQP] Stress fixes for aiohttp and valueError (Azure#27034) * changes * fix lint * remove prints * remove unused import * remove heartbeat from this PR * OS Error to catch client closed session error * removing value error raise, change to logging -- causes error on network disconnect * when looping on open clinet * change to printf style Co-authored-by: Kashif Khan <kashifkhan@microsoft.com> * [Pyamqp] Changes for blocking exceptions (Azure#27260) * changes for blocking exceptions * fix hang on unit test * lint fixes * fix bug for closes * fixes * lint issues * Stress testing updates (Azure#27456) * can uncomment line 5 to run against git version of pyamqp * can move line 6 into scripts/dev_requirement file * test against newest version of pyamqp * increase test time, get rid of unused tests * removing test names * change naming * change naming * raise logging level to catch only error level * return logging to info * adding resource requests * message retention needs to last as long as the test * websocket async test * changing life of messages * 32 partitions * add uamqp flag - remove logging * adding before trying matrix * websocket dep * helm ignore * updating * updating tests * update consumer files * remove log lines * remove log lines * remove commented * update * [EventHubs] kwargs/error testing (Azure#27065) * adding tests * add auth/connection tests + fixes * fix connection verify error handling * revert consumer retry change * call ws close in sync transport * typo * fix ws exc import * fix async transport * fix link detach vendor error exception parity * add operationtimeouterror * add more negative tests * annas comments + lint * lint + tests * add ids for uamqp vs pyamqp tests * update tests * skip macos tests * [EventHubs] check for any non-None values in amqp header/properties (Azure#27444) * add any method to amqp header/properties * use count(None) to check non-None header/props vals * [EventHubs] update sync receive client ready flow (Azure#27411) * update sync receive client ready flow * lint + mypy * fix reconnect test * [pyamqp] os error add (Azure#27351) * os error add * update except statement * test mock of receive_bytes on pipeline * client os error * mock try 2 * pylint * fix patch line * fixed mock * newline * fixing spacing * unused import * fix error mssgs * changes from perf run (Azure#27703) * Matrix Gen Stress Tests (Azure#27754) * stress matrix gen * removing unused dockerfiles for now * revert sb to main * revert non-eh files * restore samples/readmes to main * restore ci/tests/shared reqs to main * update to stable * lint + fix tests for no uamqp import * update test timeout * re-organize changelog Co-authored-by: Adam Ling (MSFT) <adam_ling@outlook.com> Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Co-authored-by: Rakshith Bhyravabhotla <rakshith.bhyravabhotla@gmail.com> Co-authored-by: Kashif Khan <kashifkhan@microsoft.com> Co-authored-by: Libba Lawrence <llawrence@microsoft.com> Co-authored-by: Kashif Khan <361477+kashifkhan@users.noreply.github.com> Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com> Co-authored-by: Anna Tisch <antisch@microsoft.com>
1 parent a8041f5 commit d111b55

File tree

132 files changed

+17954
-1057
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

132 files changed

+17954
-1057
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 5.10.2 (Unreleased)
3+
## 5.11.0 (Unreleased)
44

55
### Features Added
66

@@ -88,9 +88,66 @@ This version and all future versions will require Python 3.7+, Python 3.6 is no
8888

8989
## 5.9.0b1 (2022-02-09)
9090

91+
- The following features have been temporarily pulled out of async `EventHubProducerClient` and `EventHubConsumerClient` which will be added back in future previews as we work towards a stable release:
92+
- Passing the following keyword arguments to the constructors and `from_connection_string` methods of the `EventHubProducerClient` and `EventHubConsumerClient` is not supported: `transport_type`, `http_proxy`, `custom_endpoint_address`, and `connection_verify`.
93+
94+
## 5.8.0b2 (2022-10-11)
95+
96+
### Features Added
97+
98+
- Updated the optional dependency for async transport using AMQP over WebSocket from `websocket-client` to `aiohttp` (Issue #24315, thanks @hansmbakker for the suggestion).
99+
100+
## 5.8.0b1 (2022-09-22)
101+
102+
This version and all future versions will require Python 3.7+. Python 3.6 is no longer supported.
103+
104+
### Other Changes
105+
106+
- Added the `uamqp_transport` optional parameter to the clients, to allow switching to the `uamqp` library as the transport.
107+
108+
## 5.8.0a5 (2022-07-19)
109+
110+
### Bugs Fixed
111+
112+
- Fixed bug that prevented token refresh at regular intervals.
113+
- Fixed bug that was improperly passing the debug keyword argument, so that network trace debug logs are output when requested.
114+
115+
### Other Changes
116+
117+
- Added logging added in to track proper token refreshes & fetches, output exception reason for producer init failure.
118+
119+
## 5.8.0a4 (2022-06-07)
120+
121+
### Features Added
122+
123+
- Added support for connection using websocket and http proxy.
124+
- Added support for custom endpoint connection over websocket.
125+
126+
## 5.8.0a3 (2022-03-08)
127+
128+
### Other Changes
129+
130+
- Improved the performance of async sending and receiving.
131+
132+
## 5.8.0a2 (2022-02-09)
133+
91134
### Features Added
92135

93-
- The classmethod `from_message_data` has been added to `EventData` for interoperability with the Schema Registry Avro Encoder library, and takes `data` and `content_type` as positional parameters.
136+
- Added support for async `EventHubProducerClient` and `EventHubConsumerClient`.
137+
138+
## 5.8.0a1 (2022-01-13)
139+
140+
Version 5.8.0a1 is our first efforts to build an Azure Event Hubs client library based on pure python implemented AMQP stack.
141+
142+
### Breaking changes
143+
144+
- The following features have been temporarily pulled out which will be added back in future previews as we work towards a stable release:
145+
- Async is not supported.
146+
- Passing the following keyword arguments to the constructors and `from_connection_string` methods of the `EventHubProducerClient` and `EventHubConsumerClient` is not supported: `transport_type`, `http_proxy`, `custom_endpoint_address`, and `connection_verify`.
147+
148+
### Other Changes
149+
150+
- uAMQP dependency is removed.
94151

95152
## 5.7.0 (2022-01-12)
96153

@@ -598,4 +655,6 @@ Version 5.0.0b1 is a preview of our efforts to create a client library that is u
598655
- Further testing and minor bug fixes.
599656

600657

601-
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhub/HISTORY.png)
658+
## 0.2.0a2 (2018-04-02)
659+
660+
- Updated uAQMP dependency.

sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
max_message_size_on_link: int,
3333
executor: ThreadPoolExecutor,
3434
*,
35+
amqp_transport: AmqpTransport,
3536
max_buffer_length: int,
3637
max_wait_time: float = 1
3738
):
@@ -50,10 +51,11 @@ def __init__(
5051
self._max_message_size_on_link = max_message_size_on_link
5152
self._check_max_wait_time_future = None
5253
self.partition_id = partition_id
54+
self._amqp_transport = amqp_transport
5355

5456
def start(self):
5557
with self._lock:
56-
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
58+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
5759
self._running = True
5860
if self._max_wait_time:
5961
self._last_send_time = time.time()
@@ -113,12 +115,12 @@ def put_events(self, events, timeout_time=None):
113115
self._buffered_queue.put(self._cur_batch)
114116
self._buffered_queue.put(events)
115117
# create a new batch for incoming events
116-
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
118+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
117119
except ValueError:
118120
# add single event exceeds the cur batch size, create new batch
119121
with self._lock:
120122
self._buffered_queue.put(self._cur_batch)
121-
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
123+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
122124
self._cur_batch.add(events)
123125
with self._lock:
124126
self._cur_buffered_len += new_events_len
@@ -197,7 +199,7 @@ def flush(self, timeout_time=None, raise_error=True):
197199
self._last_send_time = time.time()
198200
#reset buffered count
199201
self._cur_buffered_len = 0
200-
self._cur_batch = EventDataBatch(self._max_message_size_on_link)
202+
self._cur_batch = EventDataBatch(self._max_message_size_on_link, amqp_transport=self._amqp_transport)
201203
_LOGGER.info("Partition %r finished flushing.", self.partition_id)
202204

203205
def check_max_wait_time_worker(self):

sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer_dispatcher.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
from ..exceptions import EventDataSendError, ConnectError, EventHubError
1515

1616
if TYPE_CHECKING:
17-
from .._producer_client import SendEventTypes
1817
from .._transport._base import AmqpTransport
18+
from .._producer_client import SendEventTypes
1919

2020
_LOGGER = logging.getLogger(__name__)
2121

@@ -31,6 +31,7 @@ def __init__(
3131
eventhub_name: str,
3232
max_message_size_on_link: int,
3333
*,
34+
amqp_transport: AmqpTransport,
3435
max_buffer_length: int = 1500,
3536
max_wait_time: float = 1,
3637
executor: Optional[Union[ThreadPoolExecutor, int]] = None
@@ -47,6 +48,7 @@ def __init__(
4748
self._max_wait_time = max_wait_time
4849
self._max_buffer_length = max_buffer_length
4950
self._existing_executor = False
51+
self._amqp_transport = amqp_transport
5052

5153
if not executor:
5254
self._executor = ThreadPoolExecutor()
@@ -88,6 +90,7 @@ def enqueue_events(
8890
executor=self._executor,
8991
max_wait_time=self._max_wait_time,
9092
max_buffer_length=self._max_buffer_length,
93+
amqp_transport = self._amqp_transport,
9194
)
9295
buffered_producer.start()
9396
self._buffered_producers[pid] = buffered_producer

sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import collections
1212
from typing import Any, Dict, Tuple, List, Optional, TYPE_CHECKING, cast, Union
1313
try:
14-
from typing import TypeAlias
14+
from typing import TypeAlias # type: ignore
1515
except ImportError:
1616
from typing_extensions import TypeAlias
1717
from datetime import timedelta
@@ -25,11 +25,15 @@
2525
from azure.core.utils import parse_connection_string as core_parse_connection_string
2626
from azure.core.pipeline.policies import RetryMode
2727

28-
29-
from ._transport._uamqp_transport import UamqpTransport
28+
try:
29+
from ._transport._uamqp_transport import UamqpTransport
30+
except ImportError:
31+
UamqpTransport = None # type: ignore
32+
from ._transport._pyamqp_transport import PyamqpTransport
3033
from .exceptions import ClientClosedError
3134
from ._configuration import Configuration
32-
from ._utils import utc_from_timestamp, parse_sas_credential, generate_sas_token
35+
from ._utils import utc_from_timestamp, parse_sas_credential
36+
from ._pyamqp.utils import generate_sas_token
3337
from ._connection_manager import get_connection_manager
3438
from ._constants import (
3539
CONTAINER_PREFIX,
@@ -43,8 +47,14 @@
4347

4448
if TYPE_CHECKING:
4549
from azure.core.credentials import TokenCredential
46-
from uamqp import Message as uamqp_Message
47-
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
50+
try:
51+
from uamqp import Message as uamqp_Message
52+
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
53+
except ImportError:
54+
uamqp_Message = None
55+
uamqp_JWTTokenAuth = None
56+
from ._pyamqp.message import Message
57+
from ._pyamqp.authentication import JWTTokenAuth
4858

4959
_LOGGER = logging.getLogger(__name__)
5060
_Address = collections.namedtuple("_Address", "hostname path")
@@ -165,7 +175,7 @@ def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
165175
if retry_mode == RetryMode.Fixed:
166176
backoff_value = backoff_factor
167177
else:
168-
backoff_value = backoff_factor * (2**retried_times)
178+
backoff_value = backoff_factor * (2 ** retried_times)
169179
return min(backoff_max, backoff_value)
170180

171181

@@ -262,6 +272,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
262272
return AccessToken(signature, expiry)
263273

264274

275+
# separate TYPE_CHECKING block here for EventHubSharedKeyCredential, o/w mypy raised error even with forward referencing
265276
if TYPE_CHECKING:
266277
from azure.core.credentials import TokenCredential
267278

@@ -281,8 +292,10 @@ def __init__(
281292
credential: CredentialTypes,
282293
**kwargs: Any,
283294
) -> None:
284-
uamqp_transport = kwargs.pop("uamqp_transport", True)
285-
self._amqp_transport = kwargs.pop("amqp_transport", UamqpTransport)
295+
uamqp_transport = kwargs.pop("uamqp_transport", False)
296+
if uamqp_transport and not UamqpTransport:
297+
raise ValueError("To use the uAMQP transport, please install `uamqp>=1.6.0,<2.0.0`.")
298+
self._amqp_transport = kwargs.pop("amqp_transport", UamqpTransport if uamqp_transport else PyamqpTransport)
286299

287300
self.eventhub_name = eventhub_name
288301
if not eventhub_name:
@@ -305,7 +318,10 @@ def __init__(
305318
**kwargs,
306319
)
307320
self._debug = self._config.network_tracing
308-
self._conn_manager = get_connection_manager(**kwargs)
321+
kwargs["custom_endpoint_address"] = self._config.custom_endpoint_address
322+
self._conn_manager = get_connection_manager(
323+
amqp_transport=self._amqp_transport,
324+
**kwargs)
309325
self._idle_timeout = kwargs.get("idle_timeout", None)
310326

311327
@staticmethod
@@ -322,7 +338,7 @@ def _from_connection_string(conn_str, **kwargs):
322338
kwargs["credential"] = EventHubSharedKeyCredential(policy, key)
323339
return kwargs
324340

325-
def _create_auth(self) -> uamqp_JWTTokenAuth:
341+
def _create_auth(self) -> Union[uamqp_JWTTokenAuth, JWTTokenAuth]:
326342
"""
327343
Create an ~uamqp.authentication.SASTokenAuth instance
328344
to authenticate the session.
@@ -381,7 +397,7 @@ def _backoff(
381397
raise last_exception
382398

383399
def _management_request(
384-
self, mgmt_msg: uamqp_Message, op_type: bytes
400+
self, mgmt_msg: Union[uamqp_Message, Message], op_type: bytes
385401
) -> Any:
386402
# pylint:disable=assignment-from-none
387403
retried_times = 0
@@ -401,26 +417,29 @@ def _management_request(
401417
mgmt_msg.application_properties[
402418
"security_token"
403419
] = self._amqp_transport.get_updated_token(mgmt_auth)
404-
response = self._amqp_transport.mgmt_client_request(
420+
status_code, description, response = self._amqp_transport.mgmt_client_request(
405421
mgmt_client,
406422
mgmt_msg,
407423
operation=READ_OPERATION,
408424
operation_type=op_type,
409425
status_code_field=MGMT_STATUS_CODE,
410426
description_fields=MGMT_STATUS_DESC,
411427
)
412-
status_code = int(response.application_properties[MGMT_STATUS_CODE])
413-
description = response.application_properties.get(
414-
MGMT_STATUS_DESC
415-
) # type: Optional[Union[str, bytes]]
428+
status_code = int(status_code)
416429
if description and isinstance(description, bytes):
417430
description = description.decode("utf-8")
418431
if status_code < 400:
419432
return response
420433
raise self._amqp_transport.get_error(status_code, description)
421434
except Exception as exception: # pylint: disable=broad-except
435+
# is_consumer=True passed in here, ALTHOUGH this method is shared by the producer and consumer.
436+
# is_consumer will only be checked if FileNotFoundError is raised by self.mgmt_client.open() due to
437+
# invalid/non-existent connection_verify filepath. The producer will encounter the FileNotFoundError
438+
# when opening the SendClient, so is_consumer=True will not be passed to amqp_transport.handle_exception
439+
# there. This is for uamqp exception parity, which raises FileNotFoundError in the consumer and
440+
# EventHubError in the producer. TODO: Remove `is_consumer` kwarg when resolving issue #27128.
422441
last_exception = self._amqp_transport._handle_exception( # pylint: disable=protected-access
423-
exception, self
442+
exception, self, is_consumer=True
424443
)
425444
self._backoff(
426445
retried_times=retried_times, last_exception=last_exception
@@ -540,10 +559,10 @@ def _close_connection(self):
540559
self._close_handler()
541560
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access
542561

543-
def _handle_exception(self, exception):
562+
def _handle_exception(self, exception, *, is_consumer=False):
544563
exception = self._amqp_transport.check_timeout_exception(self, exception)
545564
return self._amqp_transport._handle_exception( # pylint: disable=protected-access
546-
exception, self
565+
exception, self, is_consumer=is_consumer
547566
)
548567

549568
def _do_retryable_operation(self, operation, timeout=None, **kwargs):

0 commit comments

Comments
 (0)