-
Notifications
You must be signed in to change notification settings - Fork 0
Creating Your Own Simple Data Pipe Connector (using the high level API)
The Simple Data Pipe example application includes connectors for multiple data sources as well as the ability to implement your own connector. For creating your own connector, the data pipe provides two approaches:
- The advanced approach (extending connector.js) requires you to handle the authentication and to configure all the steps of the pipe run (i.e., fetch data from source, copy data into Cloudant, etc).
- The simplified approach (extending connectorExt.js) handles the latter half of the pipe run for you and only requires you to handle the authentication and fetching of records from the data source.
In addition, the Simple Data Pipe includes two sample connectors (SampleConnector and SampleConnector2), which you can use as templates for building your own.
This tutorial will step through how to create a connector for your data source using our simplified approach. It uses SugarCRM 6.5 as the data source; however, the same general principle applies to other data sources which support OAuth and provide a REST API for access.
REST API access to the data source providing the content is needed. For these instructions, we installed and ran SugarCRM 6.5.22 Community Edition locally.
Configure the OAuth credentials for SugarCRM 6.x as follows:
-
Log into SugarCRM
-
Go into the OAuth page ( > Admin > OAuth Keys )
-
Click on Create
-
Enter a Consumer Key Name, Consumer Key, and Consumer Secret
![SugarCRM OAuth Keys page] (https://i0.wp.com/developer.ibm.com/clouddataservices/wp-content/uploads/sites/47/2015/11/sugarcrmoauthkeys.png)
-
Click Save
SugarCRM should now be ready for authentication via OAuth.
The instructions for setting up the Simple Data Pipe development environment can be found here: Instructions for setting up a local dev environment for Simple Data Pipe.
The first step is to add a new connector JavaScript file which uses the pipes-sdk and extends the connectorExt.js.
- Create a new directory (e.g., pipes.connector.sugarcrm) for your connector
- Create a lib directory inside the connector directory created in step 1
- Create an
index.jsfile inside the lib directory created in step 2 - To separate any boilerplate code from
index.js, you can create a second JavaScript file (e.g., connectorUtil.js) for helper functions - Edit the new
index.js:
- Extend connectorExt.js and call it, passing a unique ID and a label for your connector
- Edit
index.jsand add the functions to override. Your connector will need to override a few basic functions:
-
getTablePrefix- returns your connector's prefix to be prepended to table names created when copying into Cloudant -
connectDataSource- initiates the OAuth protocol (getting a Request token and authorizing the token) -
authCallback- handles the callback after authorization (get the Access token and obtain the list of tables) -
doConnectStep- connect to data source and verify Access token is still valid (handle token refresh if necessary) -
fetchRecords- get the records which will be copied over into Cloudant
Creating an npm package for the connector allows for easier installing and sharing of the connector.
From a command line prompt:
-
Change to the connector directory:
cd pipes.connector.sugarcrm -
Create a default
package.json:npm init --yes -
Edit the new
package.json(Detail about package.json can be found in the npm documentation):- Edit the description if desired
- Set main to ./lib
- Add pipes-connector-name and set to a unique name for the connector package
- Remove scripts
- Insert keywords if desired
- Add dependencies required by the connector code (be sure to include pipes-sdk)
- Update the author
- Change the license if necessary
{
"name": "pipes.connector.sugarcrm", "version": "0.1.0", "description": "SugarCRM connector for Simple Data Pipe tool", "main": "./lib", "pipes-connector-name": "sugarcrm", "keywords": [ "pipes", "dataworks", "dashDb", "Cloudant", "Data Movement", "SugarCRM" ], "dependencies": { "async": "^1.5.0", "bluemix-helper-config": "^0.1.9", "lodash": "~3.9.3", "node-schedule": "~0.2.9", "oauth": "0.9.10", "pipes-sdk": "^0.1.0", "request": "^2.67.0", "when": "~3.7.3" }, "author": "va@us.ibm.com", "license": "Apache-2.0" } ```
To see and make the connector available in Simple Data Pipe it will need to be installed using npm.
From a command line prompt:
-
Change to the Simple Data Pipe directory:
cd pipes -
Link to the connector:
npm link [connectorPath]/pipes.connector.sugarcrm[connectorPath] is the path to the connector
Using
npm linkadds a symlink in the pipes/nodes_modules directory. This allows for updates to the connector to be immediately available in Simple Data Pipe. Alternatively,npm installcan be used to install a copy of the connector in Simple Data Pipe but, when changing the connectornpm installwill need to be re-run so the update can be picked up.
Simple Data Pipe is now able to find and display your connector!
var pipesSDK = require.main.require('pipes-sdk');
var connectorExt = pipesSDK.connectorExt;
var connUtil = require('./connectorUtil.js').ConnectorUtil;
var pipeDb = pipesSDK.pipesDb;
var _ = require('lodash');
function sugarConnector() {
connectorExt.call(this, connUtil.metadata.id, connUtil.metadata.label);
this.getTablePrefix = function() {
//prepended to the name of each table created
return connUtil.metadata.prefix;
};
this.connectDataSource = function(req, res, pipeId, url, callback) {
//TODO: get request token and approve request token
callback("Not yet implemented");
};
this.authCallback = function(oAuthCode, pipeId, callback, state) {
//TODO: get access token and use access token to retrieve tables
callback("Not yet implemented");
};
this.doConnectStep = function(done, pipeRunStep, pipeRunStats, logger, pipe, pipeRunner) {
//TODO: reconnect to confirm token still valid
done();
};
this.fetchRecords = function(table, pushRecordFn, done, pipeRunStep, pipeRunStats, logger, pipe, pipeRunner) {
//TODO: fetch records for requested tables
done();
}
};
require('util').inherits(sugarConnector, connectorExt);
module.exports = new sugarConnector();
var ConnectorUtil = {
metadata: {
id: "sugarcrm",
label: "SugarCRM",
prefix: "sugar"
}
};
module.exports.ConnectorUtil = ConnectorUtil;
Using cloudant service "Cloudant Url"
Pipes Tool started on port 8082 : Thu Dec 03 2015 11:06:06 GMT-0500 (EST)
Loaded connector SalesForce
Loaded connector stripe
Cloudant database pipe_db successfully initialized
Loaded npm dependency connector sugarcrm
Cloudant database pipe_db successfully initialized
![Create A New Pipe dialog] (https://i0.wp.com/developer.ibm.com/clouddataservices/wp-content/uploads/sites/47/2015/11/pipescreatenewpipe.png)
The OAuth protocol for SugarCRM 6.x is a multi-step flow. With the simplified approch for creating connectors the first steps (Request token and User authorization) should be handled within the connectDataSource function while the final step (Access token) should be handled within the authCallback function.
- Implement the
connectDataSourcefunction - Get the pipe object - which should contain the pipe.clientId (Consumer Key) and pipe.clientSecret (Consumer Secret)
- Using the Consumer Key and Consumer Secret obtain a Request token. For SugarCRM 6.x make the REST call passing
method=oauth_request_token - Store the token and secret received from the request, along with pipeId and url. If your data source's OAuth implementation supports state parameters you may use it or alternatively, you may use the req session object to store the state
- Redirect user to approve token. For SugarCRM 6.x make the REST call passing
module=OAuthTokens,action=authorize,token=<request_token> - Implement the
authCallbackfunction - Get the pipe object
- Using the OAuth code/verifier obtain an Access token. For SugarCRM 6.x make the REST call passing
method=oauth_access_token,oauth_verifier=<verifiier> - Update the pipe object with the access token and secret received from the request
- Obtain the list of tables that will be copied over to Cloudant. For SugarCRM 6.x make a REST call passing
method=get_available_modules - Update the pipe object with the tables from the previous request. The pipe.tables should be an array of objects containing
nameandlabelPlural - Call the callback function passing in the updated pipe object. The first parameter of
callback()is an error message or null if no errors
Simple Data Pipe is now able to connect to the data source!
this.connectDataSource = function(req, res, pipeId, url, callback) {
//get the pipe object
pipeDb.getPipe(pipeId, function(err, pipe) {
if (err) {
console.error("connectDataSource > getPipe - error: ", err);
return callback(err);
}
var oauth = connUtil.oauthClient(pipe);
//obtain a request token
oauth.getOAuthRequestToken(
{ "method": "oauth_request_token" },
function(err, oauth_token, oauth_token_secret, results) {
if (err) {
console.error("connectDataSource > getOAuthRequestToken - error: ", err);
}
else {
//store pipe, url, token, secret in session to retrieve/use after token has been manually authorized
req.session.state = JSON.stringify({pipe: pipeId, url: url, oauth_token: oauth_token, oauth_token_secret: oauth_token_secret });
//redirect the user to authorize the token
res.redirect(connUtil.oauthEndPoint.approveRequestToken + oauth_token);
}
}
);
});
};
this.authCallback = function(oAuthCode, pipeId, callback, state) {
//get the pipe object
pipeDb.getPipe(pipeId, function(err, pipe) {
if (err) {
console.error("authCallback > getPipe - error: ", err);
return callback(err);
}
var oauth = connUtil.oauthClient(pipe, oAuthCode);
//obtain an access token
oauth.getOAuthAccessToken(
state.oauth_token,
state.oauth_token_secret,
oAuthCode,
function(err, oauth_access_token, oauth_access_token_secret, results) {
if (err) {
console.error("authCallback > getOAuthAccessToken - error: ", err);
return callback(err);
}
else {
//update the pipe with the oauth token and token secret
pipe.oauth = {
oauth_access_token: oauth_access_token,
oauth_access_token_secret: oauth_access_token_secret
};
//obtain the list of tables to be copied over into Cloudant
oauth.get(
connUtil.restApi.getAvailableModules(pipe),
oauth_access_token,
oauth_access_token_secret,
function(err, data, res) {
if (err) {
console.error("authCallback > get_available_modules - error: ", err);
return callback(err);
}
else {
var modules = JSON.parse(data).modules;
//update the pipe with the tables (i.e., modules)
pipe.tables = _.map(modules, function(module) {
return { name: module.module_key, labelPlural: (module.module_label && module.module_label.length > 0) ? module.module_label : module.module_key };
});
//pass the updated pipe to the callback
callback(null, pipe);
}
}
);
}
}
);
});
};
var global = require('bluemix-helper-config').global;
var OAuth = require('oauth').OAuth;
var qs = require('querystring');
var siteUrl = "http://127.0.0.1:80/sugarcrm"; //should match your SugarCRM's site_url (in config.php)
var ConnectorUtil = {
...
//endpoints to be used when negotiating OAuth flow with SugarCRM
oauthEndPoint: {
redirect: global.getHostUrl() + "/authCallback",
rest: siteUrl + "/service/v4/rest.php",
approveRequestToken: siteUrl + "/index.php?module=OAuthTokens&action=authorize&token=",
requestAccessToken: siteUrl + "/service/v4/rest.php?method=oauth_access_token&oauth_verifier="
},
//client to be used for making OAuth calls and API requests
oauthClient: function(pipe, verifier) {
return new OAuth(
this.oauthEndPoint.rest,
this.oauthEndPoint.requestAccessToken + verifier,
pipe.clientId,
pipe.clientSecret,
'1.0A',
this.oauthEndPoint.redirect,
'HMAC-SHA1'
);
},
//various APIs for making SugarCRM REST calls
restApi: {
getAvailableModules: function(pipe) { return _getAvailMods(pipe); }
}
};
/**
* Return the REST API url in the format of
* http://{site_url}/service/v4/rest.php?method={method_name}&oauth_token={access_token}&input_type=JSON&response_type=JSON&rest_data={rest_data_json}
*/
var _getMethod = function(method_name, pipe, rest_data_json) {
return ConnectorUtil.oauthEndPoint.rest + '?' + qs.stringify({
method: method_name,
oauth_token: pipe.oauth.oauth_access_token,
input_type: "JSON",
response_type: "JSON",
rest_data: JSON.stringify(rest_data_json || {})
});
};
var _getAvailMods = function(pipe) {
return _getMethod(
"get_available_modules",
pipe,
{ session:"", filter:"default" /* visible modules only */ }
);
};
...
For the Simple Data Pipe — for connectors using the simplified approach — when a run is initiated the connection to the data source should be handled in the doConnectStep function, which should confirm the access token is still valid and refresh the token if necessary. Retrieving the records to copy into Cloudant should be handled by the fetchRecords function.
- Implement the
doConnectStepfunction - Using the access token (stored in the pipe earlier), connect to the data source to confirm token is still valid. For SugarCRM 6.x make the REST call passing
method=oauth_access - Refresh the token if access token is no longer valid
- Call
done()when completed - Implement the
fetchRecordsfunction - Obtain the desired records for the given table. For SugarCRM 6.x make the REST call passing
method=get_entry_list,module_name=<table/module> - Update the expected number of records in
pipeRunStats.expectedTotalRecords - Call the
pushRecordFnfunction passing in the array of records - Call
done()
Simple Data Pipe is now able to retrieve records from the data source!
this.doConnectStep = function(done, pipeRunStep, pipeRunStats, logger, pipe, pipeRunner) {
console.log("Calling doConnectStep() for", pipe.name);
var oauth = connUtil.oauthClient(pipe);
try {
//connect to the data source, check access token is still valid
oauth.get(
connUtil.restApi.oauthAccess(pipe),
pipe.oauth.oauth_access_token,
pipe.oauth.oauth_access_token_secret,
function(err, data, res) {
if (err) {
logger.error("doConnectStep > oauth_access - error: ", err);
done(err);
}
else {
done();
}
}
);
}
catch(e) {
logger.error("doConnectStep exception:", e);
done(e);
}
};
this.fetchRecords = function(table, pushRecordFn, done, pipeRunStep, pipeRunStats, logger, pipe, pipeRunner) {
console.log("Calling fetchRecords() for", table.name);
var oauth = connUtil.oauthClient(pipe);
try {
//fetch list of entries for the given table
oauth.get(
connUtil.restApi.getEntryList(pipe, table),
pipe.oauth.oauth_access_token,
pipe.oauth.oauth_access_token_secret,
function(err, data, res) {
if (err) {
logger.error("fetchRecords > get_entry_list - error:", err);
return done(err);
}
else {
var dataJson = {};
try {
dataJson = JSON.parse(data);
if (typeof dataJson.total_count == "undefined") {
logger.info("Skipping table", table.name, ":", data);
}
else {
//set expected number of records so percentage complete can be calculated
if (pipeRunStats.expectedTotalRecords) {
pipeRunStats.expectedTotalRecords += dataJson.entry_list.length;
}
else {
pipeRunStats.expectedTotalRecords = dataJson.entry_list.length;
}
//send the list of entries to retrieve
pushRecordFn(dataJson.entry_list);
}
}
catch(e) {
logger.error("Error with table", table.name, ":", data);
}
done();
}
}
);
}
catch(e) {
logger.error("fetchRecords exception: ", e);
done(e);
}
}
...
var ConnectorUtil = {
...
//various APIs for making SugarCRM REST calls
restApi: {
...
oauthAccess: function(pipe) { return _getOAuthAccess(pipe); },
getEntryList: function(pipe, table) { return _getEntryList(pipe, table);
}
}
};
...
var _getOAuthAccess = function(pipe) {
return _getMethod(
"oauth_access",
pipe,
{ session: "" }
);
};
var _getEntryList = function(pipe, table) {
return _getMethod(
"get_entry_list",
pipe,
{
session: "",
module_name: table.name,
query: "",
order_by: "",
offset: 0,
select_fields: [],
link_name_to_fields_array: [],
max_results: 1000, //if > 0 replaces $sugar_config['list_max_entries_per_page']
deleted: false,
favorites: false
}
);
};
...
Console logging messages that appear in the development environment can be added using console.log()
In addition using the logger, which is passed into doConnectStep and fetchRecords, logging messages can be added to the pipe run logs (e.g., logger.info(), logger.error())
- Check browser for JavaScript errors
- Check development environment console for errors when starting Simple Data Pipe
- Confirm connector JavaScript file name is
index.js - Confirm connector code is present under /pipes/node_modules
- Check for errors in development environment console
- Confirm OAuth is configured/enabled in the data source environment
- Confirm data source OAuth flow requirements
- Check for errors in development environment console
- Confirm the correct data source API is being called with appropriate parameters
- Hard code the tables list (see the SampleConnector2 example) to confirm rest of the connector code is working properly
Depending on the number of tables and records being copied the pipe run may take some time.
- Review the pipe run log for status and possible errors
- Monitor dataWorks activities
The resulting connector and complete files from this tutorial can be found in the connector-pipes-sdk branch of the SugarCRM connector GitHub repo.
See the Simple Data Pipe GitHub repo for additional information and resources, as well as, the Simple Data Pipe landing page.