Skip to content

Commit e6e8621

Browse files
authored
Merge pull request #33 from sourcefuse/GH-30
fix(core): add support for generic and common consumer
2 parents 0fb5175 + 0221fce commit e6e8621

17 files changed

+248
-55
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/__tests__/acceptance/application.test.ts

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ describe('end-to-end', () => {
1919
let producerApp: Application;
2020
let startHandler: sinon.SinonStub;
2121
let stopHandler: sinon.SinonStub;
22+
let commonHandler: sinon.SinonStub;
23+
let genericHandler: sinon.SinonStub;
2224
let logger: ILogger;
2325
const kafkaClient = new KafkaClientStub();
2426
before(setupApplications);
@@ -29,6 +31,8 @@ describe('end-to-end', () => {
2931
afterEach(() => {
3032
startHandler.reset();
3133
stopHandler.reset();
34+
commonHandler.reset();
35+
genericHandler.reset();
3236
});
3337

3438
describe('Acceptance: event stream with single topic', () => {
@@ -68,7 +72,42 @@ describe('end-to-end', () => {
6872
);
6973
});
7074

71-
it('should not handle a unspecified events', async () => {
75+
it('should consume from a common consumer with multiple events for a single topic', async () => {
76+
const producerInstance = producerApp.getSync<Producer<TestStream>>(
77+
producerKey(Topics.First),
78+
);
79+
const reset = {
80+
resetTime: new Date(),
81+
};
82+
const pause = {
83+
pauseTime: new Date(),
84+
};
85+
await producerInstance.send(Events.reset, [reset]);
86+
await producerInstance.send(Events.pause, [pause]);
87+
sinon.assert.called(commonHandler);
88+
expect(commonHandler.getCalls()[0].args[0]).to.be.deepEqual(
89+
JSON.parse(JSON.stringify(reset)),
90+
);
91+
expect(commonHandler.getCalls()[1].args[0]).to.be.deepEqual(
92+
JSON.parse(JSON.stringify(pause)),
93+
);
94+
});
95+
96+
it('should consume from a generic consumer without events for a single topic', async () => {
97+
const producerInstance = producerApp.getSync<Producer<TestStream>>(
98+
producerKey(Topics.First),
99+
);
100+
const close = {
101+
closeTime: new Date(),
102+
};
103+
await producerInstance.send(Events.close, [close]);
104+
sinon.assert.called(genericHandler);
105+
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual(
106+
JSON.parse(JSON.stringify(close)),
107+
);
108+
});
109+
110+
it('should not handle an unspecified events', async () => {
72111
const warnStub = sinon.stub(logger, 'warn');
73112
const producerInstance = kafkaClient.producer();
74113
const payload = {
@@ -82,21 +121,21 @@ describe('end-to-end', () => {
82121
topic: Topics.First,
83122
messages: [payload],
84123
});
124+
85125
sinon.assert.calledOnce(warnStub);
86126
expect(warnStub.lastCall.args[0]).to.equal(
87-
`${KafkaErrorKeys.UnhandledEvent}: ${JSON.stringify({
88-
topic: Topics.First,
89-
message: payload,
90-
})}`,
127+
`${KafkaErrorKeys.HandleByGenericConsumer}:${undefined}`,
91128
);
92129
});
93130
});
94131

95132
async function setupApplications() {
96133
startHandler = sinon.stub().callsFake(() => {});
97134
stopHandler = sinon.stub().callsFake(() => {});
135+
commonHandler = sinon.stub().callsFake(() => {});
136+
genericHandler = sinon.stub().callsFake(() => {});
98137
logger = {
99-
warn: e => console.warn(e),
138+
warn: e => console.log(e),
100139
log: e => console.log(e),
101140
error: e => console.error(e),
102141
info: e => console.info(e),
@@ -106,6 +145,8 @@ describe('end-to-end', () => {
106145
kafkaClient,
107146
startHandler,
108147
stopHandler,
148+
commonHandler,
149+
genericHandler,
109150
logger,
110151
);
111152
producerApp = await setupProducerApplication(kafkaClient);

src/__tests__/acceptance/fixtures/consumer/consumer-app.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import {KafkaClientBindings} from '../../../../keys';
88
import {KafkaClientStub} from '../../../stubs';
99
import {StartConsumer} from './start-consumer.extension';
1010
import {StopConsumer} from './stop-consumer.extension';
11+
import {CommonConsumer} from './shared-consumer.extension';
12+
import {GenericConsumer} from './generic-consumer.extension';
1113

1214
export class ConsumerApp extends BootMixin(
1315
ServiceMixin(RepositoryMixin(RestApplication)),
@@ -26,6 +28,8 @@ export class ConsumerApp extends BootMixin(
2628
this.component(KafkaClientComponent);
2729
this.service(StartConsumer);
2830
this.service(StopConsumer);
31+
this.service(CommonConsumer);
32+
this.service(GenericConsumer);
2933

3034
this.projectRoot = __dirname;
3135
// Customize @loopback/boot Booter Conventions here
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import {inject, injectable} from '@loopback/core';
2+
import {asConsumer} from '../../../../keys';
3+
import {TestStream} from '../stream';
4+
import {IGenericConsumer, StreamHandler} from '../../../../types';
5+
import {Topics} from '../topics.enum';
6+
import {Events} from '../events.enum';
7+
8+
@injectable(asConsumer)
9+
export class GenericConsumer implements IGenericConsumer<TestStream> {
10+
constructor(
11+
@inject('eventHandler.generic')
12+
public handler: StreamHandler<TestStream, Events>,
13+
) {}
14+
topic: Topics.First = Topics.First;
15+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import {inject, injectable} from '@loopback/core';
2+
import {asConsumer} from '../../../../keys';
3+
import {TestStream} from '../stream';
4+
import {ISharedConsumer, StreamHandler} from '../../../../types';
5+
import {Topics} from '../topics.enum';
6+
import {Events} from '../events.enum';
7+
8+
@injectable(asConsumer)
9+
export class CommonConsumer implements ISharedConsumer<TestStream> {
10+
constructor(
11+
@inject('eventHandler.common')
12+
public handler: StreamHandler<TestStream, Events>,
13+
) {}
14+
topic: Topics.First = Topics.First;
15+
events = [Events.pause, Events.reset];
16+
}

src/__tests__/acceptance/fixtures/consumer/start-consumer.extension.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import {consumer} from '../../../../decorators';
1+
import {injectable} from '@loopback/core';
22
import {eventHandler} from '../../../../decorators/handler.decorator';
3-
import {StreamHandler} from '../../../../types';
3+
import {asConsumer} from '../../../../keys';
4+
import {IConsumer, StreamHandler} from '../../../../types';
45
import {Events} from '../events.enum';
56
import {TestStream} from '../stream';
67
import {Topics} from '../topics.enum';
78

8-
@consumer<TestStream, Events.start>()
9-
export class StartConsumer {
9+
@injectable(asConsumer)
10+
export class StartConsumer implements IConsumer<TestStream, Events.start> {
1011
constructor(
1112
@eventHandler<TestStream>(Events.start)
1213
public handler: StreamHandler<TestStream, Events.start>,

src/__tests__/acceptance/fixtures/consumer/stop-consumer.extension.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import {consumer} from '../../../../decorators';
1+
import {injectable} from '@loopback/core';
22
import {eventHandler} from '../../../../decorators/handler.decorator';
3+
import {asConsumer} from '../../../../keys';
34
import {IConsumer, StreamHandler} from '../../../../types';
45
import {Events} from '../events.enum';
56
import {TestStream} from '../stream';
67
import {Topics} from '../topics.enum';
78

8-
@consumer<TestStream, Events.stop>()
9+
@injectable(asConsumer)
910
export class StopConsumer implements IConsumer<TestStream, Events.stop> {
1011
constructor(
1112
@eventHandler<TestStream>(Events.stop)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
export enum Events {
22
start = 'start',
33
stop = 'stop',
4+
reset = 'reset',
5+
pause = 'pause',
6+
close = 'close',
47
}

src/__tests__/acceptance/fixtures/stream.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,8 @@ export interface TestStream extends IStreamDefinition {
88
messages: {
99
[Events.start]: StartEvent;
1010
[Events.stop]: StopEvent;
11+
[Events.reset]: {};
12+
[Events.pause]: {};
13+
[Events.close]: {};
1114
};
1215
}

src/__tests__/acceptance/test-helper.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export async function setupConsumerApplication(
1313
kafkaStub: KafkaClientStub,
1414
startHandler: StreamHandler<TestStream, Events.start>,
1515
stopHandler: StreamHandler<TestStream, Events.stop>,
16+
commonHandler: StreamHandler<TestStream, Events>,
17+
genericHandler: StreamHandler<TestStream, Events>,
1618
logger: ILogger,
1719
): Promise<Application> {
1820
const restConfig = givenHttpServerConfig({});
@@ -29,6 +31,8 @@ export async function setupConsumerApplication(
2931
app
3032
.bind(eventHandlerKey<TestStream, Events.stop>(Events.stop))
3133
.to(stopHandler);
34+
app.bind('eventHandler.common').to(commonHandler);
35+
app.bind('eventHandler.generic').to(genericHandler);
3236
app.bind(LOGGER.LOGGER_INJECT).to(logger);
3337
await app.boot();
3438
await app.start();

0 commit comments

Comments
 (0)