Skip to content
This repository was archived by the owner on Jul 26, 2025. It is now read-only.

Commit e88bb8d

Browse files
author
Charith Ellawala
committed
Initial commit. Rule annotation works.
0 parents  commit e88bb8d

File tree

6 files changed

+452
-0
lines changed

6 files changed

+452
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea
2+
*.iml
3+
target

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Kafka JUnit Rule
2+
=================
3+
4+
A work-in-progress JUnit rule for starting and tearing down a Kafka broker during tests.
5+
6+
7+
Installation
8+
-------------
9+
10+
Use https://jitpack.io/ until I get around to doing a proper release to Maven Central.
11+
12+
13+
Usage
14+
------
15+
16+
Create an instance of the rule in your test class and annotate it with `@Rule`.
17+
18+
```
19+
@Rule
20+
public KafkaJunitRule kafkaRule = new KafkaJunitRule();
21+
```
22+
23+
`kafkaRule` can now be referenced from within your test methods.
24+
25+
```
26+
@Test
27+
public void testSomething(){
28+
// Use the built-in sync producer configuration
29+
ProducerConfig producerConfig = kafkaRule.producerConfig();
30+
31+
// Use the bult-in consumer configuration
32+
ConsumerConfig consumerConfig = kafkaRule.consumerConfig();
33+
34+
// Alternatively, the Zookeeper connection String and the broker port can be retrieved to generate your own config
35+
String zkConnStr = kafkaRule.zookeeperConnectionString();
36+
int brokerPort = kafkaRule.kafkaBrokerPort();
37+
38+
...
39+
}
40+
```

pom.xml

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2015 Charith Ellawala
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<groupId>com.github.charithe</groupId>
24+
<artifactId>kafka-junit</artifactId>
25+
<version>1.0-SNAPSHOT</version>
26+
27+
<properties>
28+
<java.version>1.8</java.version>
29+
</properties>
30+
31+
<dependencies>
32+
<dependency>
33+
<groupId>org.apache.curator</groupId>
34+
<artifactId>curator-test</artifactId>
35+
<version>2.7.1</version>
36+
<exclusions>
37+
<exclusion>
38+
<groupId>log4j</groupId>
39+
<artifactId>log4j</artifactId>
40+
</exclusion>
41+
</exclusions>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.kafka</groupId>
45+
<artifactId>kafka_2.10</artifactId>
46+
<version>0.8.2.0</version>
47+
<exclusions>
48+
<exclusion>
49+
<groupId>log4j</groupId>
50+
<artifactId>log4j</artifactId>
51+
</exclusion>
52+
</exclusions>
53+
</dependency>
54+
<dependency>
55+
<groupId>org.apache.kafka</groupId>
56+
<artifactId>kafka-clients</artifactId>
57+
<version>0.8.2.0</version>
58+
<exclusions>
59+
<exclusion>
60+
<groupId>log4j</groupId>
61+
<artifactId>log4j</artifactId>
62+
</exclusion>
63+
</exclusions>
64+
</dependency>
65+
<dependency>
66+
<groupId>junit</groupId>
67+
<artifactId>junit</artifactId>
68+
<version>4.12</version>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.slf4j</groupId>
72+
<artifactId>slf4j-api</artifactId>
73+
<version>1.7.10</version>
74+
</dependency>
75+
<dependency>
76+
<groupId>org.slf4j</groupId>
77+
<artifactId>log4j-over-slf4j</artifactId>
78+
<version>1.7.10</version>
79+
</dependency>
80+
<dependency>
81+
<groupId>ch.qos.logback</groupId>
82+
<artifactId>logback-classic</artifactId>
83+
<version>1.1.2</version>
84+
<scope>test</scope>
85+
</dependency>
86+
</dependencies>
87+
<build>
88+
<plugins>
89+
<plugin>
90+
<groupId>org.apache.maven.plugins</groupId>
91+
<artifactId>maven-compiler-plugin</artifactId>
92+
<version>3.1</version>
93+
<configuration>
94+
<source>${java.version}</source>
95+
<target>${java.version}</target>
96+
</configuration>
97+
</plugin>
98+
<plugin>
99+
<groupId>org.apache.maven.plugins</groupId>
100+
<artifactId>maven-javadoc-plugin</artifactId>
101+
<version>2.9.1</version>
102+
<executions>
103+
<execution>
104+
<phase>package</phase>
105+
<goals>
106+
<goal>javadoc</goal>
107+
</goals>
108+
</execution>
109+
</executions>
110+
</plugin>
111+
</plugins>
112+
</build>
113+
</project>
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright 2015 Charith Ellawala
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.github.charithe.kafka;
18+
19+
import kafka.consumer.ConsumerConfig;
20+
import kafka.producer.ProducerConfig;
21+
import kafka.server.KafkaConfig;
22+
import kafka.server.KafkaServerStartable;
23+
import org.apache.curator.test.InstanceSpec;
24+
import org.apache.curator.test.TestingServer;
25+
import org.junit.rules.TestRule;
26+
import org.junit.runner.Description;
27+
import org.junit.runners.model.Statement;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.io.IOException;
32+
import java.nio.file.FileVisitResult;
33+
import java.nio.file.Files;
34+
import java.nio.file.Path;
35+
import java.nio.file.SimpleFileVisitor;
36+
import java.nio.file.attribute.BasicFileAttributes;
37+
import java.util.Properties;
38+
39+
/**
40+
* Starts up a local Zookeeper and a Kafka broker
41+
*/
42+
public class KafkaJunitRule implements TestRule {
43+
44+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJunitRule.class);
45+
46+
private TestingServer zookeeper;
47+
private KafkaServerStartable kafkaServer;
48+
49+
private int kafkaPort = 9092;
50+
private Path kafkaLogDir;
51+
52+
@Override
53+
public Statement apply(final Statement statement, Description description) {
54+
return new Statement() {
55+
@Override
56+
public void evaluate() throws Throwable {
57+
try {
58+
startKafkaServer();
59+
statement.evaluate();
60+
} finally {
61+
stopKafkaServer();
62+
}
63+
}
64+
};
65+
}
66+
67+
private void startKafkaServer() throws Exception {
68+
zookeeper = new TestingServer(true);
69+
String zkQuorumStr = zookeeper.getConnectString();
70+
KafkaConfig kafkaConfig = buildKafkaConfig(zkQuorumStr);
71+
72+
LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props().props());
73+
kafkaServer = new KafkaServerStartable(kafkaConfig);
74+
kafkaServer.startup();
75+
}
76+
77+
private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
78+
kafkaLogDir = Files.createTempDirectory("kafka_junit");
79+
kafkaPort = InstanceSpec.getRandomPort();
80+
81+
Properties props = new Properties();
82+
props.put("port", kafkaPort + "");
83+
props.put("broker.id", "1");
84+
props.put("log.dirs", kafkaLogDir.toAbsolutePath().toString());
85+
props.put("zookeeper.connect", zookeeperQuorum);
86+
87+
88+
return new KafkaConfig(props);
89+
}
90+
91+
private void stopKafkaServer() throws IOException {
92+
if (kafkaServer != null) {
93+
LOGGER.info("Shutting down Kafka Server");
94+
kafkaServer.shutdown();
95+
}
96+
97+
if (zookeeper != null) {
98+
LOGGER.info("Shutting down Zookeeper");
99+
zookeeper.close();
100+
}
101+
102+
if (Files.exists(kafkaLogDir)) {
103+
LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
104+
Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
105+
@Override
106+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
107+
Files.deleteIfExists(file);
108+
return FileVisitResult.CONTINUE;
109+
}
110+
111+
@Override
112+
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
113+
Files.deleteIfExists(dir);
114+
return FileVisitResult.CONTINUE;
115+
}
116+
});
117+
}
118+
}
119+
120+
/**
121+
* Create a producer configuration.
122+
* Sets the serializer class to "StringEncoder" and producer type to "sync"
123+
*
124+
* @return {@link ProducerConfig}
125+
*/
126+
public ProducerConfig producerConfig() {
127+
Properties props = new Properties();
128+
props.put("metadata.broker.list", "localhost:" + kafkaPort);
129+
props.put("serializer.class", "kafka.serializer.StringEncoder");
130+
props.put("producer.type", "sync");
131+
props.put("request.required.acks", "1");
132+
133+
return new ProducerConfig(props);
134+
}
135+
136+
/**
137+
* Create a consumer configuration
138+
* Offset is set to "smallest"
139+
* @return {@link ConsumerConfig}
140+
*/
141+
public ConsumerConfig consumerConfig() {
142+
Properties props = new Properties();
143+
props.put("zookeeper.connect", zookeeper.getConnectString());
144+
props.put("group.id", "kafka-junit-consumer");
145+
props.put("zookeeper.session.timeout.ms", "400");
146+
props.put("zookeeper.sync.time.ms", "200");
147+
props.put("auto.commit.interval.ms", "1000");
148+
props.put("auto.offset.reset", "smallest");
149+
return new ConsumerConfig(props);
150+
}
151+
152+
/**
153+
* Get the Kafka log directory
154+
* @return kafka log directory path
155+
*/
156+
public Path kafkaLogDir(){
157+
return kafkaLogDir;
158+
}
159+
160+
/**
161+
* Get the kafka broker port
162+
* @return broker port
163+
*/
164+
public int kafkaBrokerPort(){
165+
return kafkaPort;
166+
}
167+
168+
/**
169+
* Get the zookeeper port
170+
* @return zookeeper port
171+
*/
172+
public int zookeeperPort(){
173+
return zookeeper.getPort();
174+
}
175+
176+
/**
177+
* Get the zookeeper connection string
178+
* @return zookeeper connection string
179+
*/
180+
public String zookeeperConnectionString(){
181+
return zookeeper.getConnectString();
182+
}
183+
}

0 commit comments

Comments
 (0)