diff --git a/.gitignore b/.gitignore
index d5f19d8..8c93adf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
node_modules
package-lock.json
+.idea
diff --git a/README.md b/README.md
index df48317..fcea9f2 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,4 @@
-## MQTT DB Nodes
+## Google IoT core nodes
-The MQTT DB Nodes are a clone of the built in MQTT nodes modified to optionally persist
-messages in a lightweight local database in case of a lost connection, crash or Node-RED restart.
-
-### How it works
-
-The mqtt client used by the node is configured to store incoming and outgoing messages using the NeDB database. Messages are stored in the `mqttdb` directory in the Node-RED directory. In there, for each
-broker configuration node, here is a directory containing incoming and outgoing message collections.
-
-To enable persistent storage for input and output messages, click on the appropriate check boxes. You can change the *compaction interval time* to compact the internal database and remove deleted messages.
-
-Database files for a connection are stored in the Node-RED directory under the *mqttdb* directory. These can be safely removed after stopping Node-RED to clear unsent messages. The database
-used is [NeDB](https://github.com/louischatriot/nedb).
-
-### Known Issues
-
-1. When a broker connection node is configured to persist incoming messages, the messages can be corrupted. We recommend only configuring outgoing persistence for now.
-
-2. The NeDB database is similar to REDIS in that it maintains data in RAM and writes to a file in case it goes down. Since it is RAM based, I expect it can only run so long storing messages before it runs into trouble.
+The Google IoT core nodes are a clone of the [MQTTDB](https://github.com/SenseTecnic/node-red-contrib-mqttdb) nodes modified for
+easier connecting Node-RED to the Google IoT core MQTT broker.
diff --git a/mqttdb.html b/google-iot-core.html
similarity index 60%
rename from mqttdb.html
rename to google-iot-core.html
index 07db632..d2b67e2 100644
--- a/mqttdb.html
+++ b/google-iot-core.html
@@ -13,17 +13,13 @@
Modifications Copyright 2017 Sense Tecnic Systems, Inc.
-->
-
-
-
-
-
-
diff --git a/mqttdb.js b/google-iot-core.js
similarity index 68%
rename from mqttdb.js
rename to google-iot-core.js
index 2f10948..1101ca3 100644
--- a/mqttdb.js
+++ b/google-iot-core.js
@@ -21,12 +21,12 @@ module.exports = function (RED) {
var mqtt = require("mqtt");
var util = require("util");
var isUtf8 = require('is-utf8');
-
+ var jwt = require('jsonwebtoken');
var path = require("path");
var fs = require("fs-extra");
var MQTTStore = require('mqtt-nedb-store');
- var mqttDir = path.join(RED.settings.userDir, 'mqttdb');
+ var mqttDir = path.join(RED.settings.userDir, 'google-iot-core');
// create a directory if needed for the data
fs.ensureDirSync(mqttDir);
@@ -39,6 +39,20 @@ module.exports = function (RED) {
return re.test(t);
}
+ // Create a Cloud IoT Core JWT for the given project id, signed with the given
+ // private key.
+ function createJwt (projectId, privateKey) {
+ // Create a JWT to authenticate this device. The device will be disconnected
+ // after the token expires, and will have to reconnect with a new token. The
+ // audience field should always be set to the GCP project id.
+ const token = {
+ 'iat': Math.round(Date.now() / 1000),
+ 'exp': Math.round(Date.now() / 1000) + 60 * 60, // 60 minutes
+ 'aud': projectId
+ };
+ return jwt.sign(token, privateKey, { algorithm: 'RS256' });
+ }
+
function MQTTBrokerNode(n) {
RED.nodes.createNode(this, n);
@@ -48,9 +62,9 @@ module.exports = function (RED) {
this.broker = n.broker;
this.port = n.port;
this.clientid = n.clientid;
- this.usetls = n.usetls;
+ this.projectid = n.projectid;
+ this.deviceid = n.deviceid;
this.verifyservercert = n.verifyservercert;
- this.compatmode = n.compatmode;
this.keepalive = n.keepalive;
this.cleansession = n.cleansession;
this.compactinterval = n.compactinterval;
@@ -75,19 +89,6 @@ module.exports = function (RED) {
};
}
- if (this.credentials) {
- this.username = this.credentials.user;
- this.password = this.credentials.password;
- }
-
- // If the config node is missing certain options (it was probably deployed prior to an update to the node code),
- // select/generate sensible options for the new fields
- if (typeof this.usetls === 'undefined') {
- this.usetls = false;
- }
- if (typeof this.compatmode === 'undefined') {
- this.compatmode = true;
- }
if (typeof this.verifyservercert === 'undefined') {
this.verifyservercert = false;
}
@@ -102,11 +103,7 @@ module.exports = function (RED) {
// Create the URL to pass in to the MQTT.js library
if (this.brokerurl === "") {
- if (this.usetls) {
- this.brokerurl = "mqtts://";
- } else {
- this.brokerurl = "mqtt://";
- }
+ this.brokerurl = "mqtts://";
if (this.broker !== "") {
this.brokerurl = this.brokerurl + this.broker + ":" + this.port;
} else {
@@ -114,30 +111,25 @@ module.exports = function (RED) {
}
}
- if (!this.cleansession && !this.clientid) {
- this.cleansession = true;
- this.warn(RED._("mqttdb.errors.nonclean-missingclientid"));
+ if (!this.clientid) {
+ this.warn(RED._("google-iot-core.errors.missingclientid"));
}
// Build options for passing to the MQTT.js API
- this.options.clientId = this.clientid || 'mqtt_' + (1 + Math.random() * 4294967295).toString(16);
- this.options.username = this.username;
- this.options.password = this.password;
+ this.options.clientId = this.clientid;
+ this.options.username = 'unused';
this.options.keepalive = this.keepalive;
this.options.clean = this.cleansession;
this.options.reconnectPeriod = RED.settings.mqttReconnectTime || 5000;
this.options.connectTimeout = 30000;
- if (this.compatmode == "true" || this.compatmode === true) {
- this.options.protocolId = 'MQIsdp';
- this.options.protocolVersion = 3;
- }
- if (this.usetls && n.tls) {
+ if (n.tls) {
var tlsNode = RED.nodes.getNode(n.tls);
if (tlsNode) {
tlsNode.addTLSOptions(this.options);
}
}
+ this.options.password = createJwt(this.projectid, this.options.key);
// configure db storage
if (this.persistin) {
@@ -195,72 +187,72 @@ module.exports = function (RED) {
this.connect = function () {
if (!node.connected && !node.connecting) {
node.connecting = true;
- node.client = mqtt.connect(node.brokerurl, node.options);
- node.client.setMaxListeners(0);
- // Register successful connect or reconnect handler
- node.client.on('connect', function () {
- node.connecting = false;
- node.connected = true;
- node.log(RED._("mqttdb.state.connected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
- for (var id in node.users) {
- if (node.users.hasOwnProperty(id)) {
- node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
+ try {
+ node.client = mqtt.connect(node.brokerurl, node.options);
+ node.client.setMaxListeners(0);
+ // Register successful connect or reconnect handler
+ node.client.on('connect', function () {
+ node.connecting = false;
+ node.connected = true;
+ node.log(RED._("google-iot-core.state.connected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
+ for (var id in node.users) {
+ if (node.users.hasOwnProperty(id)) {
+ node.users[id].status({ fill: "green", shape: "dot", text: "node-red:common.status.connected" });
+ }
}
- }
- // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
- node.client.removeAllListeners('message');
-
- // Re-subscribe to stored topics
- for (var s in node.subscriptions) {
- if (node.subscriptions.hasOwnProperty(s)) {
- var topic = s;
- var qos = 0;
- for (var r in node.subscriptions[s]) {
- if (node.subscriptions[s].hasOwnProperty(r)) {
- qos = Math.max(qos, node.subscriptions[s][r].qos);
- node.client.on('message', node.subscriptions[s][r].handler);
+ // Remove any existing listeners before resubscribing to avoid duplicates in the event of a re-connection
+ node.client.removeAllListeners('message');
+
+ // Re-subscribe to stored topics
+ for (var s in node.subscriptions) {
+ if (node.subscriptions.hasOwnProperty(s)) {
+ var topic = s;
+ var qos = 0;
+ for (var r in node.subscriptions[s]) {
+ if (node.subscriptions[s].hasOwnProperty(r)) {
+ qos = Math.max(qos, node.subscriptions[s][r].qos);
+ node.client.on('message', node.subscriptions[s][r].handler);
+ }
}
+ var options = { qos: qos };
+ node.client.subscribe(topic, options);
}
- var options = { qos: qos };
- node.client.subscribe(topic, options);
}
- }
- // Send any birth message
- if (node.birthMessage) {
- node.publish(node.birthMessage);
- }
- });
- node.client.on("reconnect", function () {
- for (var id in node.users) {
- if (node.users.hasOwnProperty(id)) {
- node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
+ // Send any birth message
+ if (node.birthMessage) {
+ node.publish(node.birthMessage);
}
- }
- })
- // Register disconnect handlers
- node.client.on('close', function () {
- if (node.connected) {
- node.connected = false;
- node.log(RED._("mqttdb.state.disconnected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
+ });
+ node.client.on("reconnect", function () {
for (var id in node.users) {
if (node.users.hasOwnProperty(id)) {
- node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" });
+ node.users[id].status({ fill: "yellow", shape: "ring", text: "node-red:common.status.connecting" });
}
}
- } else if (node.connecting) {
- node.log(RED._("mqttdb.state.connect-failed", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
- }
- });
+ })
+ // Register disconnect handlers
+ node.client.on('close', function () {
+ // refresh JWT token
+ node.client.options.password = createJwt(node.projectid, node.options.key);
+ if (node.connected) {
+ node.connected = false;
+ node.log(RED._("google-iot-core.state.disconnected", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
+ for (var id in node.users) {
+ if (node.users.hasOwnProperty(id)) {
+ node.users[id].status({ fill: "red", shape: "ring", text: "node-red:common.status.disconnected" });
+ }
+ }
+ } else if (node.connecting) {
+ node.log(RED._("google-iot-core.state.connect-failed", { broker: (node.clientid ? node.clientid + "@" : "") + node.brokerurl }));
+ }
+ });
- // Register connect error handler
- node.client.on('error', function (error) {
- if (node.connecting) {
- node.client.end();
- node.connecting = false;
- }
- node.error(error);
- });
+ // The client's own reconnect logic will take care of errors
+ node.client.on('error', function (error) {});
+ }catch(err) {
+ console.log(err);
+ }
}
};
@@ -355,24 +347,17 @@ module.exports = function (RED) {
});
}
- RED.nodes.registerType("mqttdb-broker", MQTTBrokerNode, {
- credentials: {
- user: { type: "text" },
- password: { type: "password" }
- }
- });
-
function MQTTInNode(n) {
RED.nodes.createNode(this, n);
- this.topic = n.topic;
this.qos = parseInt(n.qos);
- if (isNaN(this.qos) || this.qos < 0 || this.qos > 2) {
- this.qos = 2;
+ if (isNaN(this.qos) || this.qos < 0 || this.qos > 1) {
+ this.qos = 1;
}
this.broker = n.broker;
this.brokerConn = RED.nodes.getNode(this.broker);
+ this.topic = '/devices/' + this.brokerConn.deviceid + '/config';
if (!/^(#$|(\+|[^+#]*)(\/(\+|[^+#]*))*(\/(\+|#|[^+#]*))?$)/.test(this.topic)) {
- return this.warn(RED._("mqttdb.errors.invalid-topic"));
+ return this.warn(RED._("google-iot-core.errors.invalid-topic"));
}
var node = this;
if (this.brokerConn) {
@@ -392,7 +377,7 @@ module.exports = function (RED) {
}
}
else {
- this.error(RED._("mqttdb.errors.not-defined"));
+ this.error(RED._("google-iot-core.errors.not-defined"));
}
this.on('close', function (done) {
if (node.brokerConn) {
@@ -401,18 +386,27 @@ module.exports = function (RED) {
}
});
} else {
- this.error(RED._("mqttdb.errors.missing-config"));
+ this.error(RED._("google-iot-core.errors.missing-config"));
}
}
- RED.nodes.registerType("mqttdb in", MQTTInNode);
+ RED.nodes.registerType("google-iot-core in", MQTTInNode);
+
+ RED.nodes.registerType("google-iot-core-broker", MQTTBrokerNode, {
+ });
function MQTTOutNode(n) {
RED.nodes.createNode(this, n);
- this.topic = n.topic;
this.qos = n.qos || null;
this.retain = n.retain;
this.broker = n.broker;
+ this.subfolder = n.subfolder;
this.brokerConn = RED.nodes.getNode(this.broker);
+
+ if (this.subfolder) {
+ this.topic = '/devices/' + this.brokerConn.deviceid + '/events/' + this.subfolder;
+ } else {
+ this.topic = '/devices/' + this.brokerConn.deviceid + '/events';
+ }
var node = this;
if (this.brokerConn) {
@@ -420,7 +414,7 @@ module.exports = function (RED) {
this.on("input", function (msg) {
if (msg.qos) {
msg.qos = parseInt(msg.qos);
- if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
+ if ((msg.qos !== 0) && (msg.qos !== 1)) {
msg.qos = null;
}
}
@@ -434,7 +428,7 @@ module.exports = function (RED) {
if (msg.hasOwnProperty("topic") && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
this.brokerConn.publish(msg); // send the message
}
- else { node.warn(RED._("mqttdb.errors.invalid-topic")); }
+ else { node.warn(RED._("google-iot-core.errors.invalid-topic")); }
}
});
if (this.brokerConn.connected) {
@@ -445,8 +439,8 @@ module.exports = function (RED) {
node.brokerConn.deregister(node, done);
});
} else {
- this.error(RED._("mqttdb.errors.missing-config"));
+ this.error(RED._("google-iot-core.errors.missing-config"));
}
}
- RED.nodes.registerType("mqttdb out", MQTTOutNode);
+ RED.nodes.registerType("google-iot-core out", MQTTOutNode);
};
diff --git a/locales/en-US/mqttdb.json b/locales/en-US/google-iot-core.json
similarity index 77%
rename from locales/en-US/mqttdb.json
rename to locales/en-US/google-iot-core.json
index 3a295a2..8fec4c3 100644
--- a/locales/en-US/mqttdb.json
+++ b/locales/en-US/google-iot-core.json
@@ -1,22 +1,22 @@
{
- "mqttdb": {
+ "google-iot-core": {
"label": {
"broker": "Server",
"example": "e.g. localhost",
"qos": "QoS",
"clientid": "Client ID",
+ "region": "Cloud region",
+ "projectid": "Project ID",
+ "deviceid": "Device ID",
+ "registryid": "Registry ID",
"port": "Port",
"keepalive": "Keep alive time (s)",
- "cleansession": "Use clean session",
- "use-tls": "Enable secure (SSL/TLS) connection",
"tls-config":"TLS Configuration",
"verify-server-cert":"Verify server certificate",
- "compatmode": "Use legacy MQTT 3.1 support",
"topic": "Topic",
"name": "Name",
- "username": "Username",
- "password": "Password",
- "payload": "Payload",
+ "subfolder": "Subfolder",
+ "payload": "Payload",
"persistout": "Persist outgoing messages",
"persistin": "Incoming messages",
"compactinterval": "Compaction interval time (s)"
@@ -29,9 +29,9 @@
},
"placeholder": {
"clientid": "Leave blank for auto generated",
- "clientid-nonclean":"Must be set for non-clean sessions",
"will-topic": "Leave blank to disable will message",
- "birth-topic": "Leave blank to disable birth message"
+ "birth-topic": "Leave blank to disable birth message",
+ "region": "e.g. europe-west1"
},
"state": {
"connected": "Connected to broker: __broker__",
@@ -46,7 +46,7 @@
"not-defined": "topic not defined",
"missing-config": "missing broker configuration",
"invalid-topic": "Invalid topic specified",
- "nonclean-missingclientid": "No client ID set, using clean session"
+ "missingclientid": "No client ID set"
}
}
}
\ No newline at end of file
diff --git a/package.json b/package.json
index 1aa71dd..44f4d41 100644
--- a/package.json
+++ b/package.json
@@ -1,22 +1,26 @@
{
- "name": "node-red-contrib-mqttdb",
- "version": "0.1.1",
- "description": "MQTT Node with persistent message storage",
+ "name": "node-red-contrib-google-iot-core",
+ "version": "0.0.5",
+ "description": "MQTT Node with persistent message storage for Google IoT core",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
- "repository" : "https://github.com/SenseTecnic/node-red-contrib-mqttdb.git",
- "author": "Mike Blackstock",
+ "repository": "https://github.com/cyberflohr/node-red-contrib-google-iot-core",
+ "author": "Wolfgang Flohr-Hochbichler",
"license": "Apache-2.0",
- "keywords": [ "node-red", "mqtt" ],
+ "keywords": [
+ "node-red",
+ "mqtt"
+ ],
"node-red": {
"nodes": {
- "mqttdb": "mqttdb.js"
+ "google-iot-core": "google-iot-core.js"
}
},
"dependencies": {
"fs-extra": "^4.0.2",
"is-utf8": "^0.2.1",
+ "jsonwebtoken": "^8.1.0",
"mqtt": "^2.14.0",
"mqtt-nedb-store": "^0.1.0",
"nedb": "^1.8.0"