From faf16e974d22f0c6f972ea7e38d23494d7236987 Mon Sep 17 00:00:00 2001 From: pooja1pathak Date: Tue, 20 Aug 2024 07:51:39 +0000 Subject: [PATCH 1/5] fix609 --- src/server/wsgi.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/server/wsgi.py b/src/server/wsgi.py index fb2a4f92..c433cf28 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -1,6 +1,8 @@ from connexion import FlaskApp import logging import server +from utils.cfgreader import EnvReader, BoolVar +from flask.logging import default_handler SPEC_DIR = '../../specification/' @@ -33,6 +35,52 @@ def new_wrapper() -> FlaskApp: """ application = quantumleap.app + +def use_mqtt() -> bool: + env_var = BoolVar('USE_MQTT', False) + print(EnvReader().safe_read(env_var)) + return EnvReader().safe_read(env_var) + +if use_mqtt(): + application.config['MQTT_BROKER_URL'] = 'localhost' + application.config['MQTT_BROKER_PORT'] = 1883 + application.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password + application.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password + application.config['MQTT_KEEPALIVE'] = 60 # Set KeepAlive time in seconds + application.config['MQTT_TLS_ENABLED'] = False # If your broker supports TLS, set it True + topic = '/ql/mqtt' + + mqtt_client = Mqtt(application) + + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + @mqtt_client.on_connect() + def handle_connect(client, userdata, flags, rc): + if rc == 0: + logger.info('MQTT Connected successfully') + mqtt_client.subscribe(topic) # subscribe topic + else: + logger.info('Bad connection. Code:', rc) + + @mqtt_client.on_message() + def handle_mqtt_message(client, userdata, message): + data = dict( + topic=message.topic, + payload=message.payload.decode() + ) + logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) + try: + payload = json.loads(message.payload) + except ValueError: + payload = None + + if payload: + url = "http://localhost:8668/v2/notify" + headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} + r = requests.post(url, data=json.dumps(payload), headers=headers) + + """ The WSGI callable to run QuantumLeap in a WSGI container of your choice, e.g. Gunicorn, uWSGI. From a3974a5ff15d7e91a9b799e3f83af563d2b6c530 Mon Sep 17 00:00:00 2001 From: Pooja Pathak <33315652+pooja1pathak@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:55:32 +0530 Subject: [PATCH 2/5] Updated using.md --- docs/manuals/user/using.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/manuals/user/using.md b/docs/manuals/user/using.md index 6b1456a9..59887171 100644 --- a/docs/manuals/user/using.md +++ b/docs/manuals/user/using.md @@ -474,3 +474,38 @@ with a specific prefix. This way, if you insert an entity of type table as `mtmagic.etroom`. This information is also useful for example if you are configuring the Grafana datasource, as explained in the [Grafana section](../admin/grafana.md) of the docs. + +## MQTT Notification +Apart from HTTP notifications, QunatumLeap is able to handle notification using MQTT. +In this case, a MQTT message is received in a given MQTT client specified at subscription +time each time a notification is triggered. + +From an operational point of view, MQTT subscriptions are like HTTP ones, as +described in [Orion Subscription](#orion-subscription) section of the documentation and in +the Orion API specification (e.g. the notification payload is the same, you can set an +expiration date, a filtering expression, etc.) but they use `mqtt` +instead of `http` in the `notification` object. + +``` +... +"notification": { + "mqtt": { + "url": "mqtt://quantumleap:1883", + "topic": "/ql/mqtt" + } +} +... +``` + +The following elements can be used within `mqtt`: + +* `url` to specify the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains + a path (i.e. it only includes host and port) +* `topic` to specify the MQTT topic to use. Here `/ql/mqtt` is configured. +* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription + (0, 1 or 2). This is an optional field, if omitted then QoS 0 is used. +* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription + (`true` or `false`). This is an optional field, if omitted then retain `false` is used. +* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based + authentication. If used, both fields have to be used together. Note that for security reasons, + the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). From a0c0be73ff4761b4e9ced36eb1682318e9cf61ed Mon Sep 17 00:00:00 2001 From: pooja1pathak Date: Tue, 20 Aug 2024 08:37:07 +0000 Subject: [PATCH 3/5] fix609 --- src/server/__init__.py | 7 +++++++ src/server/wsgi.py | 16 ++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/server/__init__.py b/src/server/__init__.py index 470fa983..bba2a1cc 100644 --- a/src/server/__init__.py +++ b/src/server/__init__.py @@ -1,3 +1,10 @@ DEFAULT_HOST = '0.0.0.0' # bind to all available network interfaces DEFAULT_PORT = 8668 +MQTT_HOST = 'localhost' +MQTT_PORT = 1883 +MQTT_USERNAME = '' +MQTT_PASSWORD = '' +MQTT_KEEPALIVE = 60 +MQTT_TLS_ENABLED = False +MQTT_TOPIC = '/ql/mqtt' diff --git a/src/server/wsgi.py b/src/server/wsgi.py index c433cf28..64af6d61 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -42,13 +42,13 @@ def use_mqtt() -> bool: return EnvReader().safe_read(env_var) if use_mqtt(): - application.config['MQTT_BROKER_URL'] = 'localhost' - application.config['MQTT_BROKER_PORT'] = 1883 - application.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password - application.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password - application.config['MQTT_KEEPALIVE'] = 60 # Set KeepAlive time in seconds - application.config['MQTT_TLS_ENABLED'] = False # If your broker supports TLS, set it True - topic = '/ql/mqtt' + application.config['MQTT_BROKER_URL'] = server.MQTT_HOST + application.config['MQTT_BROKER_PORT'] = server.MQTT_PORT + application.config['MQTT_USERNAME'] = server.MQTT_USERNAME + application.config['MQTT_PASSWORD'] = server.MQTT_PASSWORD + application.config['MQTT_KEEPALIVE'] = server.MQTT_KEEPALIVE + application.config['MQTT_TLS_ENABLED'] = server.MQTT_TLS_ENABLED + topic = server.MQTT_TOPIC mqtt_client = Mqtt(application) @@ -76,7 +76,7 @@ def handle_mqtt_message(client, userdata, message): payload = None if payload: - url = "http://localhost:8668/v2/notify" + url = f'http://{server.DEFAULT_HOST}:{server.DEFAULT_PORT}/v2/notify' headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} r = requests.post(url, data=json.dumps(payload), headers=headers) From 10d4dfb9762f74d0be8d5148df560911b877bd64 Mon Sep 17 00:00:00 2001 From: pooja1pathak Date: Tue, 20 Aug 2024 08:40:05 +0000 Subject: [PATCH 4/5] fix609 --- RELEASE_NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c0da0509..8c63022d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,6 +6,7 @@ - Added escaping in crate-exporter.py (#702) - Simplified docker compose files management (#598) +- Added support for MQTT notification mechanism (#609) ### Bug fixes From 715ad538b3d82947b8cc9454a3afcdfbe3e06c54 Mon Sep 17 00:00:00 2001 From: pooja1pathak Date: Fri, 30 Aug 2024 04:40:11 +0000 Subject: [PATCH 5/5] updated as per comment --- Pipfile | 6 +---- src/server/mqtt_client.py | 52 +++++++++++++++++++++++++++++++++++++++ src/server/wsgi.py | 47 +++-------------------------------- 3 files changed, 57 insertions(+), 48 deletions(-) create mode 100644 src/server/mqtt_client.py diff --git a/Pipfile b/Pipfile index 0f921336..3de0d303 100644 --- a/Pipfile +++ b/Pipfile @@ -6,10 +6,6 @@ name = "pypi" [packages] bitmath = "~=1.3" certifi = "==2023.7.22" -# The latest connexion is 2.14.2 which requires Flask < 2.3. -# So the latest Flask we can install is 2.2.5. (If you install 2.3.0 you'll -# get `AttributeError: module 'flask.json' has no attribute 'JSONEncoder'` -# b/c Flask 2.3.0 removed JSONEncoder. "connexion[swagger-ui]" = "~=2.14" click = "~=8.1" crate = "~=0.22" @@ -28,9 +24,9 @@ redis = "~=4.6" requests = "~=2.31" rq = "~=1.8" geopy = "~=2.2.0" +flask-mqtt = "*" [dev-packages] -# run `pipenv install --dev` to get the packages below in your env aiohttp = "~=3.8" backoff = "~=1.1" matplotlib = "~=3.3" diff --git a/src/server/mqtt_client.py b/src/server/mqtt_client.py new file mode 100644 index 00000000..f3998c33 --- /dev/null +++ b/src/server/mqtt_client.py @@ -0,0 +1,52 @@ +import logging +from utils.cfgreader import EnvReader, BoolVar +from flask_mqtt import Mqtt +import json +import requests + +class MqttConfig: + def __init__(self): + pass + + def if_mqtt_enabled(self) -> bool: + env_var = BoolVar('USE_MQTT', False) + return EnvReader().safe_read(env_var) + +def run_if_enabled(application, host, port, username, password, keepalive, tls, topic, ql_host, ql_port): + application.config['MQTT_BROKER_URL'] = host + application.config['MQTT_BROKER_PORT'] = port + application.config['MQTT_USERNAME'] = username + application.config['MQTT_PASSWORD'] = password + application.config['MQTT_KEEPALIVE'] = keepalive + application.config['MQTT_TLS_ENABLED'] = tls + topic = topic + + mqtt_client = Mqtt(application) + + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + @mqtt_client.on_connect() + def handle_connect(client, userdata, flags, rc): + if rc == 0: + logger.info('MQTT Connected successfully') + mqtt_client.subscribe(topic) # subscribe topic + else: + logger.info('Bad connection. Code:', rc) + + @mqtt_client.on_message() + def handle_mqtt_message(client, userdata, message): + data = dict( + topic=message.topic, + payload=message.payload.decode() + ) + logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) + try: + payload = json.loads(message.payload) + except ValueError: + payload = None + + if payload: + url = f'http://{ql_host}:{ql_port}/v2/notify' + headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} + r = requests.post(url, data=json.dumps(payload), headers=headers) diff --git a/src/server/wsgi.py b/src/server/wsgi.py index 64af6d61..bab29f08 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -1,6 +1,7 @@ from connexion import FlaskApp import logging import server +from server.mqtt_client import MqttConfig, run_if_enabled from utils.cfgreader import EnvReader, BoolVar from flask.logging import default_handler @@ -36,49 +37,9 @@ def new_wrapper() -> FlaskApp: application = quantumleap.app -def use_mqtt() -> bool: - env_var = BoolVar('USE_MQTT', False) - print(EnvReader().safe_read(env_var)) - return EnvReader().safe_read(env_var) - -if use_mqtt(): - application.config['MQTT_BROKER_URL'] = server.MQTT_HOST - application.config['MQTT_BROKER_PORT'] = server.MQTT_PORT - application.config['MQTT_USERNAME'] = server.MQTT_USERNAME - application.config['MQTT_PASSWORD'] = server.MQTT_PASSWORD - application.config['MQTT_KEEPALIVE'] = server.MQTT_KEEPALIVE - application.config['MQTT_TLS_ENABLED'] = server.MQTT_TLS_ENABLED - topic = server.MQTT_TOPIC - - mqtt_client = Mqtt(application) - - logger = logging.getLogger(__name__) - logger.setLevel(logging.INFO) - - @mqtt_client.on_connect() - def handle_connect(client, userdata, flags, rc): - if rc == 0: - logger.info('MQTT Connected successfully') - mqtt_client.subscribe(topic) # subscribe topic - else: - logger.info('Bad connection. Code:', rc) - - @mqtt_client.on_message() - def handle_mqtt_message(client, userdata, message): - data = dict( - topic=message.topic, - payload=message.payload.decode() - ) - logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) - try: - payload = json.loads(message.payload) - except ValueError: - payload = None - - if payload: - url = f'http://{server.DEFAULT_HOST}:{server.DEFAULT_PORT}/v2/notify' - headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} - r = requests.post(url, data=json.dumps(payload), headers=headers) +mqttConfig = MqttConfig() +if mqttConfig.if_mqtt_enabled(): + run_if_enabled(application, server.MQTT_HOST, server.MQTT_PORT, server.MQTT_USERNAME, server.MQTT_PASSWORD, server.MQTT_KEEPALIVE, server.MQTT_TLS_ENABLED, server.MQTT_TOPIC, server.DEFAULT_HOST, server.DEFAULT_PORT) """