diff --git a/Pipfile b/Pipfile index 0f921336..3de0d303 100644 --- a/Pipfile +++ b/Pipfile @@ -6,10 +6,6 @@ name = "pypi" [packages] bitmath = "~=1.3" certifi = "==2023.7.22" -# The latest connexion is 2.14.2 which requires Flask < 2.3. -# So the latest Flask we can install is 2.2.5. (If you install 2.3.0 you'll -# get `AttributeError: module 'flask.json' has no attribute 'JSONEncoder'` -# b/c Flask 2.3.0 removed JSONEncoder. "connexion[swagger-ui]" = "~=2.14" click = "~=8.1" crate = "~=0.22" @@ -28,9 +24,9 @@ redis = "~=4.6" requests = "~=2.31" rq = "~=1.8" geopy = "~=2.2.0" +flask-mqtt = "*" [dev-packages] -# run `pipenv install --dev` to get the packages below in your env aiohttp = "~=3.8" backoff = "~=1.1" matplotlib = "~=3.3" diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 12d1947a..026c4c39 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -7,6 +7,7 @@ - Added escaping in crate-exporter.py (#702) - Simplified docker compose files management (#598) - Fixed Read the Docs deprecation (#731) +- Added support for MQTT notification mechanism (#609) ### Bug fixes diff --git a/docs/manuals/user/using.md b/docs/manuals/user/using.md index 6b1456a9..59887171 100644 --- a/docs/manuals/user/using.md +++ b/docs/manuals/user/using.md @@ -474,3 +474,38 @@ with a specific prefix. This way, if you insert an entity of type table as `mtmagic.etroom`. This information is also useful for example if you are configuring the Grafana datasource, as explained in the [Grafana section](../admin/grafana.md) of the docs. + +## MQTT Notification +Apart from HTTP notifications, QunatumLeap is able to handle notification using MQTT. +In this case, a MQTT message is received in a given MQTT client specified at subscription +time each time a notification is triggered. + +From an operational point of view, MQTT subscriptions are like HTTP ones, as +described in [Orion Subscription](#orion-subscription) section of the documentation and in +the Orion API specification (e.g. the notification payload is the same, you can set an +expiration date, a filtering expression, etc.) but they use `mqtt` +instead of `http` in the `notification` object. + +``` +... +"notification": { + "mqtt": { + "url": "mqtt://quantumleap:1883", + "topic": "/ql/mqtt" + } +} +... +``` + +The following elements can be used within `mqtt`: + +* `url` to specify the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains + a path (i.e. it only includes host and port) +* `topic` to specify the MQTT topic to use. Here `/ql/mqtt` is configured. +* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription + (0, 1 or 2). This is an optional field, if omitted then QoS 0 is used. +* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription + (`true` or `false`). This is an optional field, if omitted then retain `false` is used. +* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based + authentication. If used, both fields have to be used together. Note that for security reasons, + the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). diff --git a/src/server/__init__.py b/src/server/__init__.py index 470fa983..bba2a1cc 100644 --- a/src/server/__init__.py +++ b/src/server/__init__.py @@ -1,3 +1,10 @@ DEFAULT_HOST = '0.0.0.0' # bind to all available network interfaces DEFAULT_PORT = 8668 +MQTT_HOST = 'localhost' +MQTT_PORT = 1883 +MQTT_USERNAME = '' +MQTT_PASSWORD = '' +MQTT_KEEPALIVE = 60 +MQTT_TLS_ENABLED = False +MQTT_TOPIC = '/ql/mqtt' diff --git a/src/server/mqtt_client.py b/src/server/mqtt_client.py new file mode 100644 index 00000000..f3998c33 --- /dev/null +++ b/src/server/mqtt_client.py @@ -0,0 +1,52 @@ +import logging +from utils.cfgreader import EnvReader, BoolVar +from flask_mqtt import Mqtt +import json +import requests + +class MqttConfig: + def __init__(self): + pass + + def if_mqtt_enabled(self) -> bool: + env_var = BoolVar('USE_MQTT', False) + return EnvReader().safe_read(env_var) + +def run_if_enabled(application, host, port, username, password, keepalive, tls, topic, ql_host, ql_port): + application.config['MQTT_BROKER_URL'] = host + application.config['MQTT_BROKER_PORT'] = port + application.config['MQTT_USERNAME'] = username + application.config['MQTT_PASSWORD'] = password + application.config['MQTT_KEEPALIVE'] = keepalive + application.config['MQTT_TLS_ENABLED'] = tls + topic = topic + + mqtt_client = Mqtt(application) + + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + @mqtt_client.on_connect() + def handle_connect(client, userdata, flags, rc): + if rc == 0: + logger.info('MQTT Connected successfully') + mqtt_client.subscribe(topic) # subscribe topic + else: + logger.info('Bad connection. Code:', rc) + + @mqtt_client.on_message() + def handle_mqtt_message(client, userdata, message): + data = dict( + topic=message.topic, + payload=message.payload.decode() + ) + logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data)) + try: + payload = json.loads(message.payload) + except ValueError: + payload = None + + if payload: + url = f'http://{ql_host}:{ql_port}/v2/notify' + headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} + r = requests.post(url, data=json.dumps(payload), headers=headers) diff --git a/src/server/wsgi.py b/src/server/wsgi.py index fb2a4f92..bab29f08 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -1,6 +1,9 @@ from connexion import FlaskApp import logging import server +from server.mqtt_client import MqttConfig, run_if_enabled +from utils.cfgreader import EnvReader, BoolVar +from flask.logging import default_handler SPEC_DIR = '../../specification/' @@ -33,6 +36,12 @@ def new_wrapper() -> FlaskApp: """ application = quantumleap.app + +mqttConfig = MqttConfig() +if mqttConfig.if_mqtt_enabled(): + run_if_enabled(application, server.MQTT_HOST, server.MQTT_PORT, server.MQTT_USERNAME, server.MQTT_PASSWORD, server.MQTT_KEEPALIVE, server.MQTT_TLS_ENABLED, server.MQTT_TOPIC, server.DEFAULT_HOST, server.DEFAULT_PORT) + + """ The WSGI callable to run QuantumLeap in a WSGI container of your choice, e.g. Gunicorn, uWSGI.