Skip to content
This repository was archived by the owner on Mar 8, 2020. It is now read-only.

Commit 670bb83

Browse files
authored
[Master] Add checks for eventhub failure (#4492)
Missing checks in startListening and waitForEvents to ensure we can still get a commit notification Signed-off-by: Dave Kelsey <d_kelsey@uk.ibm.com>
1 parent 985af23 commit 670bb83

File tree

3 files changed

+113
-28
lines changed

3 files changed

+113
-28
lines changed

packages/composer-connector-hlfv1/lib/hlfconnection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,7 @@ class HLFConnection extends Connection {
10321032
const proposal = results[1];
10331033
const header = results[2];
10341034

1035-
eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, txId.getTransactionID(), this.commitTimeout);
1035+
eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, txId.getTransactionID(), this.commitTimeout, this.requiredEventHubs);
10361036
eventHandler.startListening();
10371037
LOG.debug(method, 'TxEventHandler started listening, sending valid responses to the orderer');
10381038
LOG.perf(method, 'Total duration to prepare proposals for orderer: ', txId, t0);

packages/composer-connector-hlfv1/lib/hlftxeventhandler.js

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,20 @@ class HLFTxEventHandler {
3030
* @param {EventHub[]} eventHubs the event hubs to listen for tx events
3131
* @param {String} txId the txid that is driving the events to occur
3232
* @param {Integer} timeout how long (in seconds) to wait for events to occur.
33+
* @param {Integer} requiredEventHubs number of required event hubs, either 0 or 1, default to 0
34+
* for old composer be
3335
*/
34-
constructor(eventHubs, txId, timeout) {
36+
constructor(eventHubs, txId, timeout, requiredEventHubs = 1) {
3537
const method = 'constructor';
3638
// Don't log the eventHub objects they are too large
37-
LOG.entry(method, txId, timeout);
39+
LOG.entry(method, txId, timeout, requiredEventHubs);
3840
this.eventHubs = eventHubs || [];
3941
this.txId = txId || '';
4042
this.listenerPromises = [];
4143
this.timeoutHandles = [];
4244
this.timeout = timeout || 0;
45+
this.responseCount = 0;
46+
this.requiredEventHubs = requiredEventHubs;
4347
LOG.exit(method);
4448
}
4549

@@ -66,6 +70,7 @@ class HLFTxEventHandler {
6670

6771
eh.registerTxEvent(this.txId,
6872
(tx, code) => {
73+
this.responseCount++;
6974
clearTimeout(handle);
7075
eh.unregisterTxEvent(this.txId);
7176
if (code !== 'VALID') {
@@ -90,23 +95,35 @@ class HLFTxEventHandler {
9095
this.timeoutHandles.push(handle);
9196
}
9297
});
98+
if (this.listenerPromises.length < this.requiredEventHubs) {
99+
this.cancelListening();
100+
const msg = 'No connected event hubs. It is required that at least 1 event hub has been connected to receive the commit event';
101+
LOG.error(method, msg);
102+
throw Error(msg);
103+
}
93104
LOG.exit(method);
94105
}
95106

96107
/**
97108
* wait for all event hubs to send the tx event.
98-
* @returns {Promise} a promise which is resolved when all the events have been received, rejected if an error occurs.
99109
*/
100-
waitForEvents() {
110+
async waitForEvents() {
101111
const method = 'waitForEvents';
102112
LOG.entry(method);
113+
// don't need to check against requiredEventHubs as startListening has already checked
114+
// this listenerPromises.length. This ensures the same composer behaviour if requiredEventHubs
115+
// is set to 0.
103116
if (this.listenerPromises.length > 0) {
117+
await Promise.all(this.listenerPromises);
118+
if (this.responseCount < this.requiredEventHubs) {
119+
const msg = 'No event hubs responded. It is required that at least 1 event hub responds with a commit event';
120+
LOG.error(method, msg);
121+
throw Error(msg);
122+
}
104123
LOG.exit(method);
105-
return Promise.all(this.listenerPromises);
106124
}
107125
LOG.warn(method, `No event hubs available to listen on to wait for a commit for transaction '${this.txId}'`);
108126
LOG.exit(method);
109-
return Promise.resolve();
110127
}
111128

112129
/**

packages/composer-connector-hlfv1/test/hlftxeventhandler.js

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,28 @@ describe('HLFTxEventHandler', () => {
4242
});
4343

4444
describe('#startListening', () => {
45-
it('Should do nothing if no events hubs', () => {
45+
it('Should do nothing if no events hubs and none are required', () => {
4646
sandbox.stub(global, 'setTimeout');
47-
let evHandler = new HLFTxEventHandler(null, null, null);
47+
let evHandler = new HLFTxEventHandler(null, null, null, 0);
4848
evHandler.startListening();
4949
sinon.assert.notCalled(global.setTimeout);
50-
evHandler = new HLFTxEventHandler([], '1234', 100);
50+
evHandler = new HLFTxEventHandler([], '1234', 100, 0);
5151
evHandler.startListening();
5252
sinon.assert.notCalled(global.setTimeout);
53+
});
5354

55+
it('Should throw an error if no event hubs and an event hub is required', () => {
56+
sandbox.stub(global, 'setTimeout');
57+
let evHandler = new HLFTxEventHandler(null, null, null);
58+
(() => {
59+
evHandler.startListening();
60+
}).should.throw(/No connected event hubs/);
61+
sinon.assert.notCalled(global.setTimeout);
62+
evHandler = new HLFTxEventHandler([], '1234', 100);
63+
(() => {
64+
evHandler.startListening();
65+
}).should.throw(/No connected event hubs/);
66+
sinon.assert.notCalled(global.setTimeout);
5467
});
5568

5669
it('Should set up a timeout and register for a single event hub', () => {
@@ -110,8 +123,6 @@ describe('HLFTxEventHandler', () => {
110123
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
111124
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
112125
});
113-
114-
115126
});
116127

117128
it('Should handle a transaction event response which is valid', async () => {
@@ -129,7 +140,6 @@ describe('HLFTxEventHandler', () => {
129140
} catch(err) {
130141
should.fail(null,null,`${err} unexpected`);
131142
}
132-
133143
});
134144

135145
it('Should handle a transaction event response which is not valid', async () => {
@@ -150,7 +160,7 @@ describe('HLFTxEventHandler', () => {
150160
}
151161
});
152162

153-
it('Should handle a transaction error response', async () => {
163+
it('Should handle a transaction listener error response on the only single event hub', async () => {
154164
sandbox.stub(global, 'setTimeout');
155165
sandbox.stub(global, 'clearTimeout');
156166
sandbox.stub(HLFUtil, 'eventHubConnected').withArgs(eventhub1).returns(true);
@@ -160,16 +170,76 @@ describe('HLFTxEventHandler', () => {
160170
sinon.assert.calledOnce(global.clearTimeout);
161171
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
162172
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
173+
await evHandler.waitForEvents().should.be.rejectedWith(/No event hubs responded/);
163174
sinon.assert.calledOnce(logWarnSpy);
164-
try {
165-
await evHandler.waitForEvents();
166-
} catch(err) {
167-
should.fail(null,null,`${err} unexpected`);
168-
}
169-
170175
});
171176
});
172177

178+
it('Should handle a transaction listener error response all event hubs and event hubs required', async () => {
179+
sandbox.stub(global, 'setTimeout');
180+
sandbox.stub(global, 'clearTimeout');
181+
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
182+
ehc.withArgs(eventhub1).returns(true);
183+
ehc.withArgs(eventhub2).returns(true);
184+
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31);
185+
evHandler.startListening();
186+
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
187+
eventhub2.registerTxEvent.callArgWith(2, new Error('lost connection'));
188+
sinon.assert.calledTwice(global.clearTimeout);
189+
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
190+
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
191+
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
192+
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
193+
await evHandler.waitForEvents().should.be.rejectedWith(/No event hubs responded/);
194+
sinon.assert.calledTwice(logWarnSpy);
195+
});
196+
197+
it('Should handle a transaction listener error response all event hubs but no event hubs required', async () => {
198+
sandbox.stub(global, 'setTimeout');
199+
sandbox.stub(global, 'clearTimeout');
200+
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
201+
ehc.withArgs(eventhub1).returns(true);
202+
ehc.withArgs(eventhub2).returns(true);
203+
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31, 0);
204+
evHandler.startListening();
205+
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
206+
eventhub2.registerTxEvent.callArgWith(2, new Error('lost connection'));
207+
sinon.assert.calledTwice(global.clearTimeout);
208+
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
209+
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
210+
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
211+
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
212+
try {
213+
await evHandler.waitForEvents();
214+
} catch(err) {
215+
should.fail(null,null,`${err} unexpected`);
216+
}
217+
sinon.assert.calledThrice(logWarnSpy);
218+
});
219+
220+
it('Should handle a transaction listener error response on one but not all of the event hubs', async () => {
221+
sandbox.stub(global, 'setTimeout');
222+
sandbox.stub(global, 'clearTimeout');
223+
const ehc = sandbox.stub(HLFUtil, 'eventHubConnected');
224+
ehc.withArgs(eventhub1).returns(true);
225+
ehc.withArgs(eventhub2).returns(true);
226+
let evHandler = new HLFTxEventHandler([eventhub1, eventhub2], '1234', 31);
227+
evHandler.startListening();
228+
eventhub1.registerTxEvent.callArgWith(2, new Error('lost connection'));
229+
eventhub2.registerTxEvent.yield('1234', 'VALID');
230+
sinon.assert.calledTwice(global.clearTimeout);
231+
sinon.assert.calledOnce(eventhub1.unregisterTxEvent);
232+
sinon.assert.calledWith(eventhub1.unregisterTxEvent, '1234');
233+
sinon.assert.calledOnce(eventhub2.unregisterTxEvent);
234+
sinon.assert.calledWith(eventhub2.unregisterTxEvent, '1234');
235+
sinon.assert.calledOnce(logWarnSpy);
236+
try {
237+
await evHandler.waitForEvents();
238+
} catch(err) {
239+
should.fail(null, null, `${err} unexpected`);
240+
}
241+
});
242+
173243
describe('#waitForEvents', () => {
174244
it('Should do nothing if no events hubs', () => {
175245
let evHandler = new HLFTxEventHandler(null, null, null);
@@ -192,8 +262,8 @@ describe('HLFTxEventHandler', () => {
192262
evHandler = new HLFTxEventHandler([eventhub1], '1234', 31);
193263
evHandler.startListening();
194264
evHandler.listenerPromises[0].should.be.instanceOf(Promise);
195-
evHandler.listenerPromises[0] = Promise.reject();
196-
evHandler.waitForEvents().should.eventually.be.rejected;
265+
evHandler.listenerPromises[0] = Promise.reject(new Error('some error'));
266+
evHandler.waitForEvents().should.eventually.be.rejectedWith(/some error/);
197267
});
198268

199269
it('Should do wait more than 1 event', () => {
@@ -206,8 +276,8 @@ describe('HLFTxEventHandler', () => {
206276
evHandler.listenerPromises[0].should.be.instanceOf(Promise);
207277
evHandler.listenerPromises[0] = Promise.resolve();
208278
evHandler.listenerPromises[1].should.be.instanceOf(Promise);
209-
evHandler.listenerPromises[1] = Promise.reject();
210-
evHandler.waitForEvents().should.eventually.be.rejected;
279+
evHandler.listenerPromises[1] = Promise.reject(new Error('some error'));
280+
evHandler.waitForEvents().should.eventually.be.rejectedWith(/some error/);
211281
});
212282

213283
it('Should handle timeout for an event', () => {
@@ -248,7 +318,6 @@ describe('HLFTxEventHandler', () => {
248318
sinon.assert.calledWith(global.clearTimeout.firstCall, 'handle1');
249319
sinon.assert.calledWith(global.clearTimeout.secondCall, 'handle2');
250320
});
251-
252321
});
253322

254323
it('Should handle a transaction event response which isn\'t valid', () => {
@@ -273,7 +342,6 @@ describe('HLFTxEventHandler', () => {
273342
sinon.assert.calledWith(global.clearTimeout.firstCall, 'handle1');
274343
sinon.assert.calledWith(global.clearTimeout.secondCall, 'handle2');
275344
});
276-
277345
});
278346

279347
});
@@ -292,11 +360,11 @@ describe('HLFTxEventHandler', () => {
292360
it('Should do nothing if no events hubs', () => {
293361
sandbox.stub(global, 'setTimeout');
294362
sandbox.stub(global, 'clearTimeout');
295-
let evHandler = new HLFTxEventHandler(null, null, null);
363+
let evHandler = new HLFTxEventHandler(null, null, null, 0);
296364
evHandler.startListening();
297365
evHandler.cancelListening();
298366
sinon.assert.notCalled(global.clearTimeout);
299-
evHandler = new HLFTxEventHandler([], '1234', 100);
367+
evHandler = new HLFTxEventHandler([], '1234', 100, 0);
300368
evHandler.startListening();
301369
evHandler.cancelListening();
302370
sinon.assert.notCalled(global.clearTimeout);

0 commit comments

Comments
 (0)