Skip to content

Commit 0b6d025

Browse files
authored
Merge pull request #77 from sourcefuse/generic-consumer
fix(core): add subscription of generic topics
2 parents 6b1e76a + 5076645 commit 0b6d025

17 files changed

+182
-22
lines changed

README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
</a>
2424
</p>
2525

26-
2726
## Overview
2827

2928
A Kafka Client for Loopback4 built on top of [KafkaJS](https://kafka.js.org/).
@@ -97,7 +96,13 @@ export class TestStream implements IStreamDefinition {
9796

9897
### Consumer
9998

100-
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `@consumer()` decorator. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
99+
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement one of the `IConsumer`, `ISharedConsumer` or `IGenericConsumer` interfaces and should be using the `asConsumer` binding template. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
100+
101+
- `IConsumer` - simple consumer for 1 event in a stream
102+
- `ISharedConsumer` - consumer that consumes data for multiple events in a stream (defined with an array of events)
103+
- `IGenericConsumer` - consumer that consumes data for all events in a stream/topic (defined without any event). By default it is not triggered for an event if a more specific consumer is bound for that event. This behaviour can be changed using the `alwaysRunGenericConsumer` option in consumer configuration.
104+
105+
You can bind any consumer related configuration using the `KafkaClientBindings.ConsumerConfiguration` key. It accepts all the options of KafkaJS, along with an additional option - `alwaysRunGenericConsumer` - this option runs any generic consumer if available always, even if more specific consumers are bound by the client(only the specific consumer would run if this option is false or not provided).
101106

102107
##### Example
103108

@@ -112,13 +117,14 @@ this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
112117

113118
```ts
114119
// start.consumer.ts
120+
// use @genericConsumer for a generic consumer
115121
@consumer<TestStream, Events.start>()
116122
export class StartConsumer implements IConsumer<TestStream, Events.start> {
117123
constructor(
118124
@inject('test.handler.start')
119125
public handler: StreamHandler<TestStream, Events.start>,
120126
) {}
121-
topic: Topics.First = Topics.First;
127+
topic = Topics.First;
122128
event: Events.start = Events.start;
123129
// you can write the handler as a method
124130
handler(payload: StartEvent) {
@@ -149,7 +155,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {
149155
@eventHandler<TestStream>(Events.Start)
150156
public handler: StreamHandler<TestStream, Events.start>,
151157
) {}
152-
topic: Topics.First = Topics.First;
158+
topic = Topics.First;
153159
event: Events.start = Events.start;
154160
}
155161
```
@@ -158,6 +164,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {
158164

159165
A Producer is a loopback service for producing message for a particular topic, you can inject a producer using the `@producer(TOPIC_NAME)` decorator.
160166
Note: The topic name passed to decorator must be first configured in the Component configuration's topic property -
167+
If you want to produce a raw message without any event type, you can use the `@genericProducer(TOPIC_NAME)` decorator, note that in this case, the topic name must be passed in the genericTopics property of the component configuration.
161168

162169
#### Example
163170

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/__tests__/acceptance/application.test.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
setupConsumerApplication,
1414
setupProducerApplication,
1515
} from './test-helper';
16+
import {GenericProducerService} from './fixtures/producer/generic-producer.service';
1617

1718
describe('end-to-end', () => {
1819
let consumerApp: Application;
@@ -93,18 +94,31 @@ describe('end-to-end', () => {
9394
);
9495
});
9596

97+
it('should produce from a generic producer without events for a single topic', async () => {
98+
const producerService = producerApp.getSync<GenericProducerService>(
99+
`services.GenericProducerService`,
100+
);
101+
const message = 'message';
102+
await producerService.produceMessage(message);
103+
sinon.assert.called(genericHandler);
104+
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
105+
data: message,
106+
});
107+
});
108+
96109
it('should consume from a generic consumer without events for a single topic', async () => {
97110
const producerInstance = producerApp.getSync<Producer<TestStream>>(
98-
producerKey(Topics.First),
111+
producerKey(Topics.Generic),
99112
);
100113
const close = {
101114
closeTime: new Date(),
102115
};
103116
await producerInstance.send(Events.close, [close]);
104117
sinon.assert.called(genericHandler);
105-
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual(
106-
JSON.parse(JSON.stringify(close)),
107-
);
118+
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
119+
data: JSON.parse(JSON.stringify(close)),
120+
event: Events.close,
121+
});
108122
});
109123

110124
it('should not handle an unspecified events', async () => {
@@ -118,7 +132,7 @@ describe('end-to-end', () => {
118132
};
119133
await producerInstance.connect();
120134
producerInstance.send({
121-
topic: Topics.First,
135+
topic: Topics.Generic,
122136
messages: [payload],
123137
});
124138

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import {inject, injectable} from '@loopback/core';
22
import {asConsumer} from '../../../../keys';
3-
import {TestStream} from '../stream';
4-
import {IGenericConsumer, StreamHandler} from '../../../../types';
3+
import {GenericStream} from '../stream';
4+
import {GenericStreamHandler, IGenericConsumer} from '../../../../types';
55
import {Topics} from '../topics.enum';
6-
import {Events} from '../events.enum';
76

87
@injectable(asConsumer)
9-
export class GenericConsumer implements IGenericConsumer<TestStream> {
8+
export class GenericConsumer implements IGenericConsumer<GenericStream> {
109
constructor(
1110
@inject('eventHandler.generic')
12-
public handler: StreamHandler<TestStream, Events>,
11+
public handler: GenericStreamHandler<GenericStream>,
1312
) {}
14-
topic: Topics.First = Topics.First;
13+
topic: Topics.Generic = Topics.Generic;
1514
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import {genericProducer} from '../../../../decorators/generic-producer.decorator';
2+
import {GenericProducer} from '../../../../types';
3+
import {GenericStream} from '../stream';
4+
import {Topics} from '../topics.enum';
5+
6+
export class GenericProducerService {
7+
constructor(
8+
@genericProducer(Topics.Generic)
9+
private producer: GenericProducer<GenericStream>,
10+
) {}
11+
12+
async produceMessage(message: string): Promise<void> {
13+
await this.producer.send([
14+
{
15+
data: message,
16+
},
17+
]);
18+
}
19+
}

src/__tests__/acceptance/fixtures/producer/producer-app.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component';
77
import {KafkaClientBindings} from '../../../../keys';
88
import {KafkaClientStub} from '../../../stubs';
99
import {Topics} from '../topics.enum';
10+
import {GenericProducerService} from './generic-producer.service';
1011

1112
export class ProducerApp extends BootMixin(
1213
ServiceMixin(RepositoryMixin(RestApplication)),
@@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin(
1617

1718
this.configure(KafkaClientBindings.Component).to({
1819
topics: Object.values(Topics) as string[],
20+
genericTopics: [Topics.Generic],
1921
});
2022
this.bind<KafkaClientStub>(KafkaClientBindings.KafkaClient).to(
2123
options.client,
2224
);
2325
this.component(KafkaClientComponent);
26+
this.service(GenericProducerService);
2427

2528
this.projectRoot = __dirname;
2629
// Customize @loopback/boot Booter Conventions here

src/__tests__/acceptance/fixtures/stream.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ export interface TestStream extends IStreamDefinition {
1313
[Events.close]: {};
1414
};
1515
}
16+
17+
export interface GenericStream extends IStreamDefinition {
18+
topic: Topics.Generic;
19+
messages: {
20+
data: string;
21+
};
22+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export enum Topics {
22
First = 'first',
33
Second = 'second',
4+
Generic = 'generic',
45
}

src/component.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import {
1010
} from '@loopback/core';
1111
import {LoggerExtensionComponent} from '@sourceloop/core';
1212
import {Kafka} from 'kafkajs';
13-
import {KafkaClientBindings, producerKey} from './keys';
13+
import {genericProducerKey, KafkaClientBindings, producerKey} from './keys';
1414
import {KafkaObserver} from './observers';
15-
import {KafkaProducerFactoryProvider} from './providers';
15+
import {
16+
GenericKafkaProducerFactoryProvider,
17+
KafkaProducerFactoryProvider,
18+
} from './providers';
1619
import {KafkaConsumerService} from './services/kafka-consumer.service';
1720
import {KafkaClientOptions} from './types';
1821

@@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component {
3942
.toProvider(KafkaProducerFactoryProvider)
4043
.inScope(BindingScope.SINGLETON);
4144

45+
app
46+
.bind(KafkaClientBindings.GenericProducerFactor)
47+
.toProvider(GenericKafkaProducerFactoryProvider)
48+
.inScope(BindingScope.SINGLETON);
49+
4250
app.service(KafkaConsumerService);
4351

4452
if (configuration?.topics) {
@@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component {
5058
.inScope(BindingScope.SINGLETON);
5159
});
5260
}
61+
62+
if (configuration?.genericTopics) {
63+
const genericProducerFactory = app.getSync(
64+
KafkaClientBindings.GenericProducerFactor,
65+
);
66+
configuration.genericTopics.forEach(topic => {
67+
app
68+
.bind(genericProducerKey(topic))
69+
.to(genericProducerFactory(topic))
70+
.inScope(BindingScope.SINGLETON);
71+
});
72+
}
5373
if (configuration?.initObservers) {
5474
app.lifeCycleObserver(KafkaObserver);
5575
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import {Constructor, injectable} from '@loopback/core';
2+
import {asConsumer} from '../keys';
3+
import {IGenericConsumer, IStreamDefinition} from '../types';
4+
5+
export function genericConsumer<T extends IStreamDefinition>() {
6+
return injectable(asConsumer) as (
7+
target: Constructor<IGenericConsumer<T>>,
8+
) => void;
9+
}

0 commit comments

Comments
 (0)