Skip to content

Commit 5e3ffb0

Browse files
authored
Implement network statsbeat - success count (Azure#25752)
1 parent da5bd7f commit 5e3ffb0

File tree

12 files changed

+546
-9
lines changed

12 files changed

+546
-9
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
### Features Added
66

7+
- Implement success count network statsbeat
8+
([#25752](https://github.com/Azure/azure-sdk-for-python/pull/25752))
9+
710
### Breaking Changes
811

912
### Bugs Fixed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@
1414
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
1515
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
1616
from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage
17-
17+
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
18+
_REQ_SUCCESS_NAME,
19+
_REQUESTS_LOCK,
20+
_REQUESTS_MAP,
21+
get_statsbeat_shutdown,
22+
is_statsbeat_enabled,
23+
)
1824

1925
logger = logging.getLogger(__name__)
2026

@@ -44,6 +50,7 @@ def __init__(self, **kwargs: Any) -> None:
4450
self._api_version = kwargs.get('api_version') or _SERVICE_API_LATEST
4551
self._consecutive_redirects = 0 # To prevent circular redirects
4652
self._enable_local_storage = kwargs.get('enable_local_storage', True)
53+
self._endpoint = parsed_connection_string.endpoint
4754
self._instrumentation_key = parsed_connection_string.instrumentation_key
4855
self._storage_maintenance_period = kwargs.get('storage_maintenance_period', 60) # Maintenance interval in seconds.
4956
self._storage_max_size = kwargs.get('storage_max_size', 50 * 1024 * 1024) # Maximum size in bytes (default 50MiB)
@@ -56,8 +63,7 @@ def __init__(self, **kwargs: Any) -> None:
5663
self._storage_retention_period = kwargs.get('storage_retention_period', 7 * 24 * 60 * 60) # Retention period in seconds
5764
self._timeout = kwargs.get('timeout', 10.0) # networking timeout in seconds
5865

59-
config = AzureMonitorClientConfiguration(
60-
parsed_connection_string.endpoint, **kwargs)
66+
config = AzureMonitorClientConfiguration(self._endpoint, **kwargs)
6167
policies = [
6268
RequestIdPolicy(**kwargs),
6369
config.headers_policy,
@@ -86,6 +92,11 @@ def __init__(self, **kwargs: Any) -> None:
8692
name="{} Storage".format(self.__class__.__name__),
8793
lease_period=self._storage_min_retry_interval,
8894
)
95+
# statsbeat initialization
96+
if self._should_collect_stats():
97+
# Import here to avoid circular dependencies
98+
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
99+
collect_statsbeat_metrics(self)
89100

90101
def _transmit_from_storage(self) -> None:
91102
for blob in self.storage.gets():
@@ -121,13 +132,15 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
121132
if len(envelopes) > 0:
122133
try:
123134
track_response = self.client.track(envelopes)
124-
if not track_response.errors:
135+
if not track_response.errors: # 200
125136
self._consecutive_redirects = 0
137+
if self._should_collect_stats():
138+
_update_requests_map(_REQ_SUCCESS_NAME[1])
126139
logger.info("Transmission succeeded: Item received: %s. Items accepted: %s",
127140
track_response.items_received, track_response.items_accepted)
128141
return ExportResult.SUCCESS
129142
resend_envelopes = []
130-
for error in track_response.errors:
143+
for error in track_response.errors: # 206
131144
if _is_retryable_code(error.status_code):
132145
resend_envelopes.append(
133146
envelopes[error.index]
@@ -147,6 +160,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
147160
return ExportResult.FAILED_RETRYABLE
148161
return ExportResult.FAILED_NOT_RETRYABLE
149162
except HttpResponseError as response_error:
163+
# HttpResponseError is raised when a response is received
150164
if _is_retryable_code(response_error.status_code):
151165
return ExportResult.FAILED_RETRYABLE
152166
if _is_redirect_code(response_error.status_code):
@@ -174,6 +188,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
174188
except ServiceRequestError as request_error:
175189
# Errors when we're fairly sure that the server did not receive the
176190
# request, so it should be safe to retry.
191+
# ServiceRequestError is raised by azure.core for these cases
177192
logger.warning(
178193
"Retrying due to server request error: %s.", request_error
179194
)
@@ -187,6 +202,12 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
187202
self._consecutive_redirects = 0
188203
return ExportResult.SUCCESS
189204

205+
def _should_collect_stats(self):
    """Return whether this exporter should record statsbeat metrics.

    Collection applies only when statsbeat is enabled, has not been shut
    down, and this exporter's class is not named ``_StatsBeatExporter``
    (the statsbeat exporter must not collect stats about itself).
    """
    if not is_statsbeat_enabled():
        return False
    if get_statsbeat_shutdown():
        return False
    return self.__class__.__name__ != "_StatsBeatExporter"
210+
190211

191212
def _is_redirect_code(response_code: int) -> bool:
192213
"""
@@ -212,3 +233,8 @@ def _is_retryable_code(response_code: int) -> bool:
212233
503, # Service Unavailable
213234
504, # Gateway timeout
214235
))
236+
237+
238+
def _update_requests_map(type_name):
    """Increment the request counter stored under ``type_name``.

    The shared ``_REQUESTS_MAP`` is read and written while holding
    ``_REQUESTS_LOCK``; a key that is not present yet starts from zero.
    """
    with _REQUESTS_LOCK:
        current = _REQUESTS_MAP.get(type_name, 0)
        _REQUESTS_MAP[type_name] = current + 1

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/metrics/_exporter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ def export(
8181
def force_flush(self, timeout_millis: float = 10_000) -> bool:
    """Request that metrics already handed to the exporter be exported promptly.

    This implementation performs no work and unconditionally reports success.

    :param timeout_millis: Accepted for interface compatibility; not used here.
    :return: Always ``True``.
    """
    flushed = True
    return flushed
9090

9191
def shutdown(
9292
self,

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import logging
4+
5+
from typing import Optional
6+
from opentelemetry.sdk.metrics.export import DataPointT
7+
from opentelemetry.sdk.resources import Resource
8+
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
9+
10+
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
11+
from azure.monitor.opentelemetry.exporter import AzureMonitorMetricExporter
12+
from azure.monitor.opentelemetry.exporter.statsbeat._state import _STATSBEAT_METRIC_NAME_MAPPINGS
13+
14+
_logger = logging.getLogger(__name__)
15+
16+
17+
class _StatsBeatExporter(AzureMonitorMetricExporter):
    """Metric exporter that emits data points under their statsbeat names."""

    # pylint: disable=protected-access
    def _point_to_envelope(
        self,
        point: DataPointT,
        name: str,
        resource: Optional[Resource] = None,
        scope: Optional[InstrumentationScope] = None
    ) -> TelemetryItem:
        """Build the envelope for ``point`` using the statsbeat metric name.

        ``name`` is the OpenTelemetry metric name; it is translated through
        ``_STATSBEAT_METRIC_NAME_MAPPINGS`` and an unmapped name becomes the
        empty string. Everything else is delegated to the parent exporter.
        """
        statsbeat_name = _STATSBEAT_METRIC_NAME_MAPPINGS.get(name, "")
        envelope = super()._point_to_envelope(point, statsbeat_name, resource, scope)
        return envelope
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import os
4+
import threading
5+
6+
# Per-request-type counters; guarded by _REQUESTS_LOCK.
_REQUESTS_LOCK = threading.Lock()
_REQUESTS_MAP = {}

# Statsbeat lifecycle flags; writers hold _STATSBEAT_STATE_LOCK.
_STATSBEAT_STATE = {
    "INITIAL_FAILURE_COUNT": 0,
    "INITIAL_SUCCESS": False,
    "SHUTDOWN": False,
}
_STATSBEAT_STATE_LOCK = threading.Lock()

# Statsbeat metric names. Entries written as a tuple are
# (OpenTelemetry metric name, Statsbeat metric name); the plain strings
# carry only the Statsbeat metric name.
_ATTACH_METRIC_NAME = "Attach"
_FEATURE_METRIC_NAME = "Feature"
_REQ_SUCCESS_NAME = ("statsbeat_success_count", "Request Success Count")
_REQ_FAILURE_NAME = "Request Failure Count"
_REQ_DURATION_NAME = "Request Duration"
_REQ_RETRY_NAME = "Retry Count"
_REQ_THROTTLE_NAME = "Throttle Count"
_REQ_EXCEPTION_NAME = "Exception Count"

# OpenTelemetry metric name -> Statsbeat metric name.
_STATSBEAT_METRIC_NAME_MAPPINGS = {
    otel_name: statsbeat_name
    for otel_name, statsbeat_name in (
        _REQ_SUCCESS_NAME,
    )
}
31+
32+
33+
def is_statsbeat_enabled():
    """Return ``True`` unless statsbeat was disabled through the environment.

    Reads ``APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL``. An unset or empty
    variable leaves statsbeat enabled. Values that explicitly mean "not
    disabled" (``false``, ``0``, ``no``, ``off``; case-insensitive,
    surrounding whitespace ignored) also leave it enabled, so that setting
    the variable to ``false`` no longer turns statsbeat off. Any other
    non-empty value (``true``, ``1``, ...) disables it, as before.
    """
    raw = os.environ.get("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL")
    if raw is None:
        return True
    return raw.strip().lower() in ("", "0", "false", "no", "off")
35+
36+
37+
def increment_statsbeat_initial_failure_count():
    """Add one to the ``INITIAL_FAILURE_COUNT`` entry of the statsbeat state.

    The update is performed while holding ``_STATSBEAT_STATE_LOCK``.
    """
    with _STATSBEAT_STATE_LOCK:
        previous = _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"]
        _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = previous + 1
40+
41+
42+
def get_statsbeat_initial_failure_count():
    """Return the current ``INITIAL_FAILURE_COUNT`` value of the statsbeat state."""
    failure_count = _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"]
    return failure_count
44+
45+
46+
def set_statsbeat_initial_success(success):
    """Store ``success`` as the ``INITIAL_SUCCESS`` entry of the statsbeat state.

    The write is performed while holding ``_STATSBEAT_STATE_LOCK``.
    """
    with _STATSBEAT_STATE_LOCK:
        _STATSBEAT_STATE.update(INITIAL_SUCCESS=success)
49+
50+
51+
def get_statsbeat_initial_success():
    """Return the current ``INITIAL_SUCCESS`` value of the statsbeat state."""
    initial_success = _STATSBEAT_STATE["INITIAL_SUCCESS"]
    return initial_success
53+
54+
55+
def get_statsbeat_shutdown():
    """Return the current ``SHUTDOWN`` flag of the statsbeat state."""
    shutdown_flag = _STATSBEAT_STATE["SHUTDOWN"]
    return shutdown_flag
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import os
4+
import threading
5+
6+
from opentelemetry.sdk.metrics import MeterProvider
7+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
8+
9+
from azure.monitor.opentelemetry.exporter.statsbeat._exporter import _StatsBeatExporter
10+
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import _StatsbeatMetrics
11+
12+
# pylint: disable=line-too-long
# Connection strings statsbeat is sent to when no override is configured.
_DEFAULT_NON_EU_STATS_CONNECTION_STRING = "InstrumentationKey=c4a29126-a7cb-47e5-b348-11414998b11e;IngestionEndpoint=https://westus-0.in.applicationinsights.azure.com/"
_DEFAULT_EU_STATS_CONNECTION_STRING = "InstrumentationKey=7dc56bab-3c0c-4e9f-9ebb-d1acadee8d0f;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/"

# Default export intervals, in seconds.
_DEFAULT_STATS_SHORT_EXPORT_INTERVAL = 15 * 60  # 15 minutes
_DEFAULT_STATS_LONG_EXPORT_INTERVAL = 24 * 60 * 60  # 24 hours

# Region names that select the EU statsbeat connection string when one of
# them appears in the user's ingestion endpoint.
_EU_ENDPOINTS = [
    "westeurope",
    "northeurope",
    "francecentral",
    "francesouth",
    "germanywestcentral",
    "norwayeast",
    "norwaywest",
    "swedencentral",
    "switzerlandnorth",
    "switzerlandwest",
]

# Process-wide statsbeat meter provider; stays None until collection starts.
_STATSBEAT_METER_PROVIDER = None
_STATSBEAT_LOCK = threading.Lock()
32+
33+
# pylint: disable=global-statement
# pylint: disable=protected-access
def collect_statsbeat_metrics(exporter) -> None:
    """Start statsbeat collection for ``exporter`` unless it is already running.

    On the first call this creates a ``_StatsBeatExporter`` pointed at the
    statsbeat connection string chosen for ``exporter._endpoint``, wraps it in
    a ``PeriodicExportingMetricReader`` using the short export interval, and
    stores the resulting ``MeterProvider`` in ``_STATSBEAT_METER_PROVIDER``.
    Later calls return without doing anything.

    :param exporter: Exporter whose ``_endpoint`` and ``_instrumentation_key``
        attributes are passed on to the statsbeat metrics.
    """
    global _STATSBEAT_METER_PROVIDER
    # Unlocked fast path: once statsbeat is running, callers never block.
    if _STATSBEAT_METER_PROVIDER is not None:
        return
    with _STATSBEAT_LOCK:
        # Re-check under the lock. Without this, two threads that both saw
        # ``None`` above would each build their own provider and reader.
        if _STATSBEAT_METER_PROVIDER is not None:
            return
        statsbeat_exporter = _StatsBeatExporter(
            connection_string=_get_stats_connection_string(exporter._endpoint),
        )
        reader = PeriodicExportingMetricReader(
            statsbeat_exporter,
            export_interval_millis=_get_stats_short_export_interval() * 1000,  # 15m by default
        )
        _STATSBEAT_METER_PROVIDER = MeterProvider(metric_readers=[reader])
        _StatsbeatMetrics(
            _STATSBEAT_METER_PROVIDER,
            exporter._instrumentation_key,
            exporter._endpoint,
        )
        # Export some initial stats on program start
        # TODO: initial stats
        # TODO: set context
        # execution_context.set_is_exporter(True)
        # exporter.export_metrics(_STATSBEAT_METRICS.get_initial_metrics())
        # execution_context.set_is_exporter(False)
        # TODO: state
        # with _STATSBEAT_STATE_LOCK:
        #     _STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = 0
        #     _STATSBEAT_STATE["INITIAL_SUCCESS"] = 0
        #     _STATSBEAT_STATE["SHUTDOWN"] = False
64+
65+
# TODO
66+
# def shutdown_statsbeat_metrics() -> None:
67+
# global _STATSBEAT_METER_PROVIDER
68+
# shutdown_success = False
69+
# if _STATSBEAT_METER_PROVIDER is not None:
70+
# with _STATSBEAT_LOCK:
71+
# try:
72+
# _STATSBEAT_METER_PROVIDER.shutdown()
73+
# _STATSBEAT_METER_PROVIDER = None
74+
# shutdown_success = True
75+
# except:
76+
# pass
77+
# if shutdown_success:
78+
# # with _STATSBEAT_STATE_LOCK:
79+
# # _STATSBEAT_STATE["SHUTDOWN"] = True
80+
# pass
81+
82+
83+
def _get_stats_connection_string(endpoint: str) -> str:
    """Pick the connection string statsbeat telemetry is sent to.

    A non-empty ``APPLICATION_INSIGHTS_STATS_CONNECTION_STRING`` environment
    variable always wins. Otherwise the EU default is returned when any name
    in ``_EU_ENDPOINTS`` occurs in ``endpoint``, and the non-EU default in
    every other case.

    :param endpoint: Ingestion endpoint of the user's exporter. The region
        match ignores case because host names are case-insensitive; ``None``
        or an empty string selects the non-EU default.
    :return: The connection string to use for statsbeat.
    """
    override = os.environ.get("APPLICATION_INSIGHTS_STATS_CONNECTION_STRING")
    if override:
        return override
    normalized = (endpoint or "").lower()
    if any(region in normalized for region in _EU_ENDPOINTS):
        # Use statsbeat EU endpoint if user is in EU region
        return _DEFAULT_EU_STATS_CONNECTION_STRING
    return _DEFAULT_NON_EU_STATS_CONNECTION_STRING
92+
93+
94+
# seconds
def _get_stats_short_export_interval() -> float:
    """Return the short statsbeat export interval in seconds.

    ``APPLICATION_INSIGHTS_STATS_SHORT_EXPORT_INTERVAL`` overrides the default
    when it holds a positive, finite number (fractions are accepted). A value
    that is unset, empty, non-numeric, zero, negative, NaN or infinite falls
    back to ``_DEFAULT_STATS_SHORT_EXPORT_INTERVAL`` rather than raising, so a
    malformed environment cannot break exporter construction.
    """
    raw = os.environ.get("APPLICATION_INSIGHTS_STATS_SHORT_EXPORT_INTERVAL")
    if raw:
        try:
            seconds = float(raw)
        except ValueError:
            seconds = None
        # NaN fails both comparisons, so it is rejected together with
        # non-positive and infinite values.
        if seconds is not None and 0 < seconds < float("inf"):
            return seconds
    return _DEFAULT_STATS_SHORT_EXPORT_INTERVAL
100+
101+
102+
# seconds
def _get_stats_long_export_interval() -> float:
    """Return the long statsbeat export interval in seconds.

    ``APPLICATION_INSIGHTS_STATS_LONG_EXPORT_INTERVAL`` overrides the default
    when it holds a positive, finite number (fractions are accepted). A value
    that is unset, empty, non-numeric, zero, negative, NaN or infinite falls
    back to ``_DEFAULT_STATS_LONG_EXPORT_INTERVAL`` rather than raising.
    """
    raw = os.environ.get("APPLICATION_INSIGHTS_STATS_LONG_EXPORT_INTERVAL")
    if raw:
        try:
            seconds = float(raw)
        except ValueError:
            seconds = None
        # NaN fails both comparisons, so it is rejected together with
        # non-positive and infinite values.
        if seconds is not None and 0 < seconds < float("inf"):
            return seconds
    return _DEFAULT_STATS_LONG_EXPORT_INTERVAL

0 commit comments

Comments
 (0)