Skip to content

Commit fb183b1

Browse files
authored
Add EnsureDesiredPropertiesStage kwarg (#1013)
- Added ensure_desired_properties to abstract_clients and shared_client_tests - Added unit tests
1 parent 63a327c commit fb183b1

File tree

5 files changed

+353
-38
lines changed

5 files changed

+353
-38
lines changed

azure-iot-device/azure/iot/device/iothub/abstract_clients.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def _validate_kwargs(exclude=[], **kwargs):
3939
"auto_connect",
4040
"connection_retry",
4141
"connection_retry_interval",
42+
"ensure_desired_properties",
4243
]
4344

4445
for kwarg in kwargs:
@@ -59,6 +60,7 @@ def _get_config_kwargs(**kwargs):
5960
"auto_connect",
6061
"connection_retry",
6162
"connection_retry_interval",
63+
"ensure_desired_properties",
6264
]
6365

6466
config_kwargs = {}
@@ -264,6 +266,8 @@ def create_from_connection_string(cls, connection_string, **kwargs):
264266
:param bool connection_retry: Attempt to re-establish a dropped connection (Default: True)
265267
:param int connection_retry_interval: Interval, in seconds, between attempts to
266268
re-establish a dropped connection (Default: 10)
269+
:param bool ensure_desired_properties: Ensure the most recent desired properties patch has
270+
been received upon re-connections (Default:True)
267271
268272
:raises: ValueError if given an invalid connection_string.
269273
:raises: TypeError if given an unsupported parameter.
@@ -345,6 +349,8 @@ def create_from_sastoken(cls, sastoken, **kwargs):
345349
:param bool connection_retry: Attempt to re-establish a dropped connection (Default: True)
346350
:param int connection_retry_interval: Interval, in seconds, between attempts to
347351
re-establish a dropped connection (Default: 10)
352+
:param bool ensure_desired_properties: Ensure the most recent desired properties patch has
353+
been received upon re-connections (Default:True)
348354
349355
:raises: TypeError if given an unsupported parameter.
350356
:raises: ValueError if the sastoken parameter is invalid.
@@ -554,6 +560,8 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs):
554560
:param bool connection_retry: Attempt to re-establish a dropped connection (Default: True)
555561
:param int connection_retry_interval: Interval, in seconds, between attempts to
556562
re-establish a dropped connection (Default: 10)
563+
:param bool ensure_desired_properties: Ensure the most recent desired properties patch has
564+
been received upon re-connections (Default:True)
557565
558566
:raises: TypeError if given an unsupported parameter.
559567
@@ -569,6 +577,7 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs):
569577
device_id=device_id, hostname=hostname, x509=x509, **config_kwargs
570578
)
571579
pipeline_configuration.blob_upload = True # Blob Upload is a feature on Device Clients
580+
pipeline_configuration.ensure_desired_properties = True
572581

573582
# Pipeline setup
574583
http_pipeline = pipeline.HTTPPipeline(pipeline_configuration)
@@ -611,6 +620,8 @@ def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs)
611620
:param bool connection_retry: Attempt to re-establish a dropped connection (Default: True)
612621
:param int connection_retry_interval: Interval, in seconds, between attempts to
613622
re-establish a dropped connection (Default: 10)
623+
:param bool ensure_desired_properties: Ensure the most recent desired properties patch has
624+
been received upon re-connections (Default:True)
614625
615626
:raises: TypeError if given an unsupported parameter.
616627
:raises: ValueError if the provided parameters are invalid.
@@ -637,6 +648,7 @@ def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs)
637648
device_id=device_id, hostname=hostname, sastoken=sastoken, **config_kwargs
638649
)
639650
pipeline_configuration.blob_upload = True # Blob Upload is a feature on Device Clients
651+
pipeline_configuration.ensure_desired_properties = True
640652

641653
# Pipeline setup
642654
http_pipeline = pipeline.HTTPPipeline(pipeline_configuration)
@@ -799,6 +811,8 @@ def create_from_edge_environment(cls, **kwargs):
799811
server_verification_cert=server_verification_cert,
800812
**config_kwargs
801813
)
814+
pipeline_configuration.ensure_desired_properties = True
815+
802816
pipeline_configuration.method_invoke = (
803817
True # Method Invoke is allowed on modules created from edge environment
804818
)
@@ -847,6 +861,8 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kw
847861
:param bool connection_retry: Attempt to re-establish a dropped connection (Default: True)
848862
:param int connection_retry_interval: Interval, in seconds, between attempts to
849863
re-establish a dropped connection (Default: 10)
864+
:param bool ensure_desired_properties: Ensure the most recent desired properties patch has
865+
been received upon re-connections (Default:True)
850866
851867
:raises: TypeError if given an unsupported parameter.
852868
@@ -861,6 +877,7 @@ def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kw
861877
pipeline_configuration = pipeline.IoTHubPipelineConfig(
862878
device_id=device_id, module_id=module_id, hostname=hostname, x509=x509, **config_kwargs
863879
)
880+
pipeline_configuration.ensure_desired_properties = True
864881

865882
# Pipeline setup
866883
http_pipeline = pipeline.HTTPPipeline(pipeline_configuration)

azure-iot-device/azure/iot/device/iothub/pipeline/config.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,15 @@
1313
class IoTHubPipelineConfig(BasePipelineConfig):
1414
"""A class for storing all configurations/options for IoTHub clients in the Azure IoT Python Device Client Library."""
1515

16-
def __init__(self, hostname, device_id, module_id=None, product_info="", **kwargs):
16+
def __init__(
17+
self,
18+
hostname,
19+
device_id,
20+
module_id=None,
21+
product_info="",
22+
ensure_desired_properties=True,
23+
**kwargs
24+
):
1725
"""Initializer for IoTHubPipelineConfig which passes all unrecognized keyword-args down to BasePipelineConfig
1826
to be evaluated. This stacked options setting is to allow for unique configuration options to exist between the
1927
multiple clients, while maintaining a base configuration class with shared config options.
@@ -22,6 +30,8 @@ def __init__(self, hostname, device_id, module_id=None, product_info="", **kwarg
2230
:param str device_id: The device identity being used with the IoTHub
2331
:param str module_id: The module identity being used with the IoTHub
2432
:param str product_info: A custom identification string for the type of device connecting to Azure IoT Hub.
33+
:param bool ensure_desired_properties: Indicates if twin_patches should ensure the most
34+
recent desired properties patch has been received upon re-connections
2535
"""
2636
super().__init__(hostname=hostname, **kwargs)
2737

@@ -32,6 +42,9 @@ def __init__(self, hostname, device_id, module_id=None, product_info="", **kwarg
3242
# Product Info
3343
self.product_info = product_info
3444

45+
# Stage Behavior
46+
self.ensure_desired_properties = ensure_desired_properties
47+
3548
# Now, the parameters below are not exposed to the user via kwargs. They need to be set by manipulating the IoTHubPipelineConfig object.
3649
# They are not in the BasePipelineConfig because these do not apply to the provisioning client.
3750
self.blob_upload = False

azure-iot-device/azure/iot/device/iothub/pipeline/pipeline_stages_iothub.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,19 @@ def __init__(self):
3434

3535
@pipeline_thread.runs_on_pipeline_thread
3636
def _run_op(self, op):
37-
if isinstance(op, pipeline_ops_base.EnableFeatureOperation):
38-
# If we're enabling twin patches, we set last_version_seen to -1
39-
# as a way of enabling this functionality. If the ConnectedEvent handler
40-
# sees this -1, it will send a GetTwinOperation to refresh desired properties.
41-
42-
if op.feature_name == constant.TWIN_PATCHES:
43-
logger.debug(
44-
"{}: enabling twin patches. setting last_version_seen".format(self.name)
45-
)
46-
self.last_version_seen = -1
37+
if self.nucleus.pipeline_configuration.ensure_desired_properties:
38+
if isinstance(op, pipeline_ops_base.EnableFeatureOperation):
39+
# Ensure_desired_properties enables twin patches, when true, by setting last version
40+
# to -1. The ConnectedEvent handler sees this and sends a GetTwinOperation to refresh
41+
# desired properties. Setting ensure_desired_properties to false causes the GetTwinOp
42+
# to not be sent. The rest of the functions in this stage stem from the GetTwinOperation,
43+
# so disabling ensure_desired_properties effectively disables this stage.
44+
45+
if op.feature_name == constant.TWIN_PATCHES:
46+
logger.debug(
47+
"{}: enabling twin patches. setting last_version_seen".format(self.name)
48+
)
49+
self.last_version_seen = -1
4750
self.send_op_down(op)
4851

4952
@pipeline_thread.runs_on_pipeline_thread
@@ -102,20 +105,21 @@ def _on_get_twin_complete(self, op, error):
102105

103106
@pipeline_thread.runs_on_pipeline_thread
104107
def _handle_pipeline_event(self, event):
105-
if isinstance(event, pipeline_events_iothub.TwinDesiredPropertiesPatchEvent):
106-
# remember the $version when we get a patch.
107-
version = event.patch["$version"]
108-
logger.debug(
109-
"{}: Desired patch received. Saving $version={}".format(self.name, version)
110-
)
111-
self.last_version_seen = version
112-
elif isinstance(event, pipeline_events_base.ConnectedEvent):
113-
# If last_version_seen is truthy, that means we've seen desired property patches
114-
# before (or we've enabled them at least). If this is the case, get the twin to
115-
# see if the desired props have been updated.
116-
if self.last_version_seen:
117-
logger.info("{}: Reconnected. Getting twin".format(self.name))
118-
self._ensure_get_op()
108+
if self.nucleus.pipeline_configuration.ensure_desired_properties:
109+
if isinstance(event, pipeline_events_iothub.TwinDesiredPropertiesPatchEvent):
110+
# remember the $version when we get a patch.
111+
version = event.patch["$version"]
112+
logger.debug(
113+
"{}: Desired patch received. Saving $version={}".format(self.name, version)
114+
)
115+
self.last_version_seen = version
116+
elif isinstance(event, pipeline_events_base.ConnectedEvent):
117+
# If last_version_seen is truthy, that means we've seen desired property patches
118+
# before (or we've enabled them at least). If this is the case, get the twin to
119+
# see if the desired props have been updated.
120+
if self.last_version_seen:
121+
logger.info("{}: Reconnected. Getting twin".format(self.name))
122+
self._ensure_get_op()
119123
self.send_event_up(event)
120124

121125

0 commit comments

Comments
 (0)