Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added escaping in crate-exporter.py (#702)
- Simplified docker compose files management (#598)
- Added support for MQTT notification mechanism (#609)

### Bug fixes

Expand Down
35 changes: 35 additions & 0 deletions docs/manuals/user/using.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,38 @@ with a specific prefix. This way, if you insert an entity of type
table as `mtmagic.etroom`. This information is also useful for example if you
are configuring the Grafana datasource, as explained in the
[Grafana section](../admin/grafana.md) of the docs.

## MQTT Notification
Apart from HTTP notifications, QunatumLeap is able to handle notification using MQTT.
In this case, a MQTT message is received in a given MQTT client specified at subscription
time each time a notification is triggered.

From an operational point of view, MQTT subscriptions are like HTTP ones, as
described in [Orion Subscription](#orion-subscription) section of the documentation and in
the Orion API specification (e.g. the notification payload is the same, you can set an
expiration date, a filtering expression, etc.) but they use `mqtt`
instead of `http` in the `notification` object.

```
...
"notification": {
"mqtt": {
"url": "mqtt://quantumleap:1883",
"topic": "/ql/mqtt"
}
}
...
```

The following elements can be used within `mqtt`:

* `url` to specify the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains
a path (i.e. it only includes host and port)
* `topic` to specify the MQTT topic to use. Here `/ql/mqtt` is configured.
* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription
(0, 1 or 2). This is an optional field, if omitted then QoS 0 is used.
* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription
(`true` or `false`). This is an optional field, if omitted then retain `false` is used.
* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based
authentication. If used, both fields have to be used together. Note that for security reasons,
the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`).
7 changes: 7 additions & 0 deletions src/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@

DEFAULT_HOST = '0.0.0.0' # bind to all available network interfaces
DEFAULT_PORT = 8668
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_USERNAME = ''
MQTT_PASSWORD = ''
MQTT_KEEPALIVE = 60
MQTT_TLS_ENABLED = False
MQTT_TOPIC = '/ql/mqtt'
48 changes: 48 additions & 0 deletions src/server/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from connexion import FlaskApp
import logging
import server
from utils.cfgreader import EnvReader, BoolVar
from flask.logging import default_handler


SPEC_DIR = '../../specification/'
Expand Down Expand Up @@ -33,6 +35,52 @@ def new_wrapper() -> FlaskApp:
"""

application = quantumleap.app

def use_mqtt() -> bool:
env_var = BoolVar('USE_MQTT', False)
print(EnvReader().safe_read(env_var))
return EnvReader().safe_read(env_var)

if use_mqtt():
application.config['MQTT_BROKER_URL'] = server.MQTT_HOST
application.config['MQTT_BROKER_PORT'] = server.MQTT_PORT
application.config['MQTT_USERNAME'] = server.MQTT_USERNAME
application.config['MQTT_PASSWORD'] = server.MQTT_PASSWORD
application.config['MQTT_KEEPALIVE'] = server.MQTT_KEEPALIVE
application.config['MQTT_TLS_ENABLED'] = server.MQTT_TLS_ENABLED
topic = server.MQTT_TOPIC

mqtt_client = Mqtt(application)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
logger.info('MQTT Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
logger.info('Bad connection. Code:', rc)

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data))
try:
payload = json.loads(message.payload)
except ValueError:
payload = None

if payload:
url = f'http://{server.DEFAULT_HOST}:{server.DEFAULT_PORT}/v2/notify'
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post(url, data=json.dumps(payload), headers=headers)


"""
The WSGI callable to run QuantumLeap in a WSGI container of your choice,
e.g. Gunicorn, uWSGI.
Expand Down