diff --git a/.gitignore b/.gitignore index d5f19d8..8c93adf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ node_modules package-lock.json +.idea diff --git a/README.md b/README.md index df48317..fcea9f2 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,4 @@ -## MQTT DB Nodes +## Google IoT core nodes -The MQTT DB Nodes are a clone of the built in MQTT nodes modified to optionally persist -messages in a lightweight local database in case of a lost connection, crash or Node-RED restart. - -### How it works - -The mqtt client used by the node is configured to store incoming and outgoing messages using the NeDB database. Messages are stored in the `mqttdb` directory in the Node-RED directory. In there, for each -broker configuration node, here is a directory containing incoming and outgoing message collections. - -To enable persistent storage for input and output messages, click on the appropriate check boxes. You can change the *compaction interval time* to compact the internal database and remove deleted messages. - -Database files for a connection are stored in the Node-RED directory under the *mqttdb* directory. These can be safely removed after stopping Node-RED to clear unsent messages. The database -used is [NeDB](https://github.com/louischatriot/nedb). - -### Known Issues - -1. When a broker connection node is configured to persist incoming messages, the messages can be corrupted. We recommend only configuring outgoing persistence for now. - -2. The NeDB database is similar to REDIS in that it maintains data in RAM and writes to a file in case it goes down. Since it is RAM based, I expect it can only run so long storing messages before it runs into trouble. +The Google IoT core nodes are a clone of the [MQTTDB](https://github.com/SenseTecnic/node-red-contrib-mqttdb) nodes modified for +easier connecting Node-RED to the Google IoT core MQTT broker. diff --git a/mqttdb.html b/google-iot-core.html similarity index 60% rename from mqttdb.html rename to google-iot-core.html index 07db632..d2b67e2 100644 --- a/mqttdb.html +++ b/google-iot-core.html @@ -13,17 +13,13 @@ Modifications Copyright 2017 Sense Tecnic Systems, Inc. --> - - - - - - diff --git a/mqttdb.js b/google-iot-core.js similarity index 68% rename from mqttdb.js rename to google-iot-core.js index 2f10948..1101ca3 100644 --- a/mqttdb.js +++ b/google-iot-core.js @@ -21,12 +21,12 @@ module.exports = function (RED) { var mqtt = require("mqtt"); var util = require("util"); var isUtf8 = require('is-utf8'); - + var jwt = require('jsonwebtoken'); var path = require("path"); var fs = require("fs-extra"); var MQTTStore = require('mqtt-nedb-store'); - var mqttDir = path.join(RED.settings.userDir, 'mqttdb'); + var mqttDir = path.join(RED.settings.userDir, 'google-iot-core'); // create a directory if needed for the data fs.ensureDirSync(mqttDir); @@ -39,6 +39,20 @@ module.exports = function (RED) { return re.test(t); } + // Create a Cloud IoT Core JWT for the given project id, signed with the given + // private key. + function createJwt (projectId, privateKey) { + // Create a JWT to authenticate this device. The device will be disconnected + // after the token expires, and will have to reconnect with a new token. The + // audience field should always be set to the GCP project id. + const token = { + 'iat': Math.round(Date.now() / 1000), + 'exp': Math.round(Date.now() / 1000) + 60 * 60, // 60 minutes + 'aud': projectId + }; + return jwt.sign(token, privateKey, { algorithm: 'RS256' }); + } + function MQTTBrokerNode(n) { RED.nodes.createNode(this, n); @@ -48,9 +62,9 @@ module.exports = function (RED) { this.broker = n.broker; this.port = n.port; this.clientid = n.clientid; - this.usetls = n.usetls; + this.projectid = n.projectid; + this.deviceid = n.deviceid; this.verifyservercert = n.verifyservercert; - this.compatmode = n.compatmode; this.keepalive = n.keepalive; this.cleansession = n.cleansession; this.compactinterval = n.compactinterval; @@ -75,19 +89,6 @@ module.exports = function (RED) { }; } - if (this.credentials) { - this.username = this.credentials.user; - this.password = this.credentials.password; - } - - // If the config node is missing certain options (it was probably deployed prior to an update to the node code), - // select/generate sensible options for the new fields - if (typeof this.usetls === 'undefined') { - this.usetls = false; - } - if (typeof this.compatmode === 'undefined') { - this.compatmode = true; - } if (typeof this.verifyservercert === 'undefined') { this.verifyservercert = false; } @@ -102,11 +103,7 @@ module.exports = function (RED) { // Create the URL to pass in to the MQTT.js library if (this.brokerurl === "") { - if (this.usetls) { - this.brokerurl = "mqtts://"; - } else { - this.brokerurl = "mqtt://"; - } + this.brokerurl = "mqtts://"; if (this.broker !== "") { this.brokerurl = this.brokerurl + this.broker + ":" + this.port; } else { @@ -114,30 +111,25 @@ module.exports = function (RED) { } } - if (!this.cleansession && !this.clientid) { - this.cleansession = true; - this.warn(RED._("mqttdb.errors.nonclean-missingclientid")); + if (!this.clientid) { + this.warn(RED._("google-iot-core.errors.missingclientid")); } // Build options for passing to the MQTT.js API - this.options.clientId = this.clientid || 'mqtt_' + (1 + Math.random() * 4294967295).toString(16); - this.options.username = this.username; - this.options.password = this.password; + this.options.clientId = this.clientid; + this.options.username = 'unused'; this.options.keepalive = this.keepalive; this.options.clean = this.cleansession; this.options.reconnectPeriod = RED.settings.mqttReconnectTime || 5000; this.options.connectTimeout = 30000; - if (this.compatmode == "true" || this.compatmode === true) { - this.options.protocolId = 'MQIsdp'; - this.options.protocolVersion = 3; - } - if (this.usetls && n.tls) { + if (n.tls) { var tlsNode = RED.nodes.getNode(n.tls); if (tlsNode) { tlsNode.addTLSOptions(this.options); } } + this.options.password = createJwt(this.projectid, this.options.key); // configure db storage if (this.persistin) { @@ -195,72 +187,72 @@ module.exports = function (RED) { this.connect = function () { if (!node.connected && !node.connecting) { node.connecting = true; - node.client = mqtt.connect(node.brokerurl, node.options); - node.client.setMaxListeners(0); - // Register successful connect or reconnect handler - node.client.on('connect', function () { - node.connecting = false; - node.connected = true; - node.log(RED._("mqttdb.state.connected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); - for (var id in node.users) { - if (node.users.hasOwnProperty(id)) { - node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" }); + try { + node.client = mqtt.connect(node.brokerurl, node.options); + node.client.setMaxListeners(0); + // Register successful connect or reconnect handler + node.client.on('connect', function () { + node.connecting = false; + node.connected = true; + node.log(RED._("google-iot-core.state.connected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); + for (var id in node.users) { + if (node.users.hasOwnProperty(id)) { + node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" }); + } } - } - // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection - node.client.removeAllListeners('message'); - - // Re-subscribe to stored topics - for (var s in node.subscriptions) { - if (node.subscriptions.hasOwnProperty(s)) { - var topic = s; - var qos = 0; - for (var r in node.subscriptions[s]) { - if (node.subscriptions[s].hasOwnProperty(r)) { - qos = Math.max(qos, node.subscriptions[s][r].qos); - node.client.on('message', node.subscriptions[s][r].handler); + // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection + node.client.removeAllListeners('message'); + + // Re-subscribe to stored topics + for (var s in node.subscriptions) { + if (node.subscriptions.hasOwnProperty(s)) { + var topic = s; + var qos = 0; + for (var r in node.subscriptions[s]) { + if (node.subscriptions[s].hasOwnProperty(r)) { + qos = Math.max(qos, node.subscriptions[s][r].qos); + node.client.on('message', node.subscriptions[s][r].handler); + } } + var options = { qos: qos }; + node.client.subscribe(topic, options); } - var options = { qos: qos }; - node.client.subscribe(topic, options); } - } - // Send any birth message - if (node.birthMessage) { - node.publish(node.birthMessage); - } - }); - node.client.on("reconnect", function () { - for (var id in node.users) { - if (node.users.hasOwnProperty(id)) { - node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" }); + // Send any birth message + if (node.birthMessage) { + node.publish(node.birthMessage); } - } - }) - // Register disconnect handlers - node.client.on('close', function () { - if (node.connected) { - node.connected = false; - node.log(RED._("mqttdb.state.disconnected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); + }); + node.client.on("reconnect", function () { for (var id in node.users) { if (node.users.hasOwnProperty(id)) { - node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" }); + node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" }); } } - } else if (node.connecting) { - node.log(RED._("mqttdb.state.connect-failed", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); - } - }); + }) + // Register disconnect handlers + node.client.on('close', function () { + // refresh JWT token + node.client.options.password = createJwt(node.projectid, node.options.key); + if (node.connected) { + node.connected = false; + node.log(RED._("google-iot-core.state.disconnected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); + for (var id in node.users) { + if (node.users.hasOwnProperty(id)) { + node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" }); + } + } + } else if (node.connecting) { + node.log(RED._("google-iot-core.state.connect-failed", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl })); + } + }); - // Register connect error handler - node.client.on('error', function (error) { - if (node.connecting) { - node.client.end(); - node.connecting = false; - } - node.error(error); - }); + // The client's own reconnect logic will take care of errors + node.client.on('error', function (error) {}); + }catch(err) { + console.log(err); + } } }; @@ -355,24 +347,17 @@ module.exports = function (RED) { }); } - RED.nodes.registerType("mqttdb-broker", MQTTBrokerNode, { - credentials: { - user: { type: "text" }, - password: { type: "password" } - } - }); - function MQTTInNode(n) { RED.nodes.createNode(this, n); - this.topic = n.topic; this.qos = parseInt(n.qos); - if (isNaN(this.qos) || this.qos < 0 || this.qos > 2) { - this.qos = 2; + if (isNaN(this.qos) || this.qos < 0 || this.qos > 1) { + this.qos = 1; } this.broker = n.broker; this.brokerConn = RED.nodes.getNode(this.broker); + this.topic = '/devices/' + this.brokerConn.deviceid + '/config'; if (!/^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(this.topic)) { - return this.warn(RED._("mqttdb.errors.invalid-topic")); + return this.warn(RED._("google-iot-core.errors.invalid-topic")); } var node = this; if (this.brokerConn) { @@ -392,7 +377,7 @@ module.exports = function (RED) { } } else { - this.error(RED._("mqttdb.errors.not-defined")); + this.error(RED._("google-iot-core.errors.not-defined")); } this.on('close', function (done) { if (node.brokerConn) { @@ -401,18 +386,27 @@ module.exports = function (RED) { } }); } else { - this.error(RED._("mqttdb.errors.missing-config")); + this.error(RED._("google-iot-core.errors.missing-config")); } } - RED.nodes.registerType("mqttdb in", MQTTInNode); + RED.nodes.registerType("google-iot-core in", MQTTInNode); + + RED.nodes.registerType("google-iot-core-broker", MQTTBrokerNode, { + }); function MQTTOutNode(n) { RED.nodes.createNode(this, n); - this.topic = n.topic; this.qos = n.qos || null; this.retain = n.retain; this.broker = n.broker; + this.subfolder = n.subfolder; this.brokerConn = RED.nodes.getNode(this.broker); + + if (this.subfolder) { + this.topic = '/devices/' + this.brokerConn.deviceid + '/events/' + this.subfolder; + } else { + this.topic = '/devices/' + this.brokerConn.deviceid + '/events'; + } var node = this; if (this.brokerConn) { @@ -420,7 +414,7 @@ module.exports = function (RED) { this.on("input", function (msg) { if (msg.qos) { msg.qos = parseInt(msg.qos); - if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) { + if ((msg.qos !== 0) && (msg.qos !== 1)) { msg.qos = null; } } @@ -434,7 +428,7 @@ module.exports = function (RED) { if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist this.brokerConn.publish(msg); // send the message } - else { node.warn(RED._("mqttdb.errors.invalid-topic")); } + else { node.warn(RED._("google-iot-core.errors.invalid-topic")); } } }); if (this.brokerConn.connected) { @@ -445,8 +439,8 @@ module.exports = function (RED) { node.brokerConn.deregister(node, done); }); } else { - this.error(RED._("mqttdb.errors.missing-config")); + this.error(RED._("google-iot-core.errors.missing-config")); } } - RED.nodes.registerType("mqttdb out", MQTTOutNode); + RED.nodes.registerType("google-iot-core out", MQTTOutNode); }; diff --git a/locales/en-US/mqttdb.json b/locales/en-US/google-iot-core.json similarity index 77% rename from locales/en-US/mqttdb.json rename to locales/en-US/google-iot-core.json index 3a295a2..8fec4c3 100644 --- a/locales/en-US/mqttdb.json +++ b/locales/en-US/google-iot-core.json @@ -1,22 +1,22 @@ { - "mqttdb": { + "google-iot-core": { "label": { "broker": "Server", "example": "e.g. localhost", "qos": "QoS", "clientid": "Client ID", + "region": "Cloud region", + "projectid": "Project ID", + "deviceid": "Device ID", + "registryid": "Registry ID", "port": "Port", "keepalive": "Keep alive time (s)", - "cleansession": "Use clean session", - "use-tls": "Enable secure (SSL/TLS) connection", "tls-config":"TLS Configuration", "verify-server-cert":"Verify server certificate", - "compatmode": "Use legacy MQTT 3.1 support", "topic": "Topic", "name": "Name", - "username": "Username", - "password": "Password", - "payload": "Payload", + "subfolder": "Subfolder", + "payload": "Payload", "persistout": "Persist outgoing messages", "persistin": "Incoming messages", "compactinterval": "Compaction interval time (s)" @@ -29,9 +29,9 @@ }, "placeholder": { "clientid": "Leave blank for auto generated", - "clientid-nonclean":"Must be set for non-clean sessions", "will-topic": "Leave blank to disable will message", - "birth-topic": "Leave blank to disable birth message" + "birth-topic": "Leave blank to disable birth message", + "region": "e.g. europe-west1" }, "state": { "connected": "Connected to broker: __broker__", @@ -46,7 +46,7 @@ "not-defined": "topic not defined", "missing-config": "missing broker configuration", "invalid-topic": "Invalid topic specified", - "nonclean-missingclientid": "No client ID set, using clean session" + "missingclientid": "No client ID set" } } } \ No newline at end of file diff --git a/package.json b/package.json index 1aa71dd..44f4d41 100644 --- a/package.json +++ b/package.json @@ -1,22 +1,26 @@ { - "name": "node-red-contrib-mqttdb", - "version": "0.1.1", - "description": "MQTT Node with persistent message storage", + "name": "node-red-contrib-google-iot-core", + "version": "0.0.5", + "description": "MQTT Node with persistent message storage for Google IoT core", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, - "repository" : "https://github.com/SenseTecnic/node-red-contrib-mqttdb.git", - "author": "Mike Blackstock", + "repository": "https://github.com/cyberflohr/node-red-contrib-google-iot-core", + "author": "Wolfgang Flohr-Hochbichler", "license": "Apache-2.0", - "keywords": [ "node-red", "mqtt" ], + "keywords": [ + "node-red", + "mqtt" + ], "node-red": { "nodes": { - "mqttdb": "mqttdb.js" + "google-iot-core": "google-iot-core.js" } }, "dependencies": { "fs-extra": "^4.0.2", "is-utf8": "^0.2.1", + "jsonwebtoken": "^8.1.0", "mqtt": "^2.14.0", "mqtt-nedb-store": "^0.1.0", "nedb": "^1.8.0"