Skip to content

Commit f2de254

Browse files
authored
Prepares Jedis redis checkpoint store for release (Azure#32580)
* Add readme samples. * Clean up sample. * Clean-up second sample. * Add CHANGELOG entry. * Update jedis to the latest release. * Update test assets and rename sample. * Adding javadoc sample. * Removing unchecked throw. * Fix broken link. * Fix localisation.
1 parent 5519076 commit f2de254

File tree

12 files changed

+364
-171
lines changed

12 files changed

+364
-171
lines changed

eng/versioning/external_dependencies.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ org.postgresql:postgresql;42.3.8
112112
org.slf4j:slf4j-api;1.7.36
113113
org.slf4j:slf4j-nop;1.7.36
114114
org.slf4j:slf4j-simple;1.7.36
115-
redis.clients:jedis;4.2.3
115+
redis.clients:jedis;4.3.1
116116
io.lettuce:lettuce-core;6.2.0.RELEASE
117117
org.redisson:redisson;3.17.0
118118
net.bytebuddy:byte-buddy;1.12.20

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
### Features Added
66

7-
### Breaking Changes
7+
- Added implementation of `CheckpointStore` with `redis.clients.Jedis`
88

9-
### Bugs Fixed
9+
### Other Changes
1010

11+
#### Dependency Updates
1112

13+
- Add `azure-messaging-eventhubs` dependency to `5.16.0-beta.1`.
14+
- Add `jedis` dependency `4.3.1`.

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/README.md

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
44
This package makes use of Redis as a persistent store for maintaining checkpoints and partition ownership information.
5-
The `JedisRedisCheckpointStore` provided in this package can be plugged in to `EventProcessor`.
5+
The `JedisRedisCheckpointStore` provided in this package can be plugged in to `EventProcessorClient`.
66

77
[Source code][source_code]| [API reference documentation][api_documentation] | [Product
88
documentation][event_hubs_product_docs] | [Samples][sample_examples]
@@ -18,7 +18,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
1818
- Azure Event Hubs instance
1919
- Step-by-step guide for [creating an Event Hub using the Azure Portal][event_hubs_create]
2020
- Azure Redis Cache or a suitable alternative Redis server
21-
- Step-by-step guide for [creating a Redis Cache using the Azure Portal][redis_cache]
21+
- Step-by-step guide for [creating a Redis Cache using the Azure Portal][redis_quickstart]
2222

2323
### Include the package
2424
#### Include the BOM file
@@ -66,8 +66,9 @@ add the direct dependency to your project as follows.
6666

6767
### Authenticate the storage container client
6868

69-
In order to create an instance of `JedisCheckpointStore`, a `JedisPool` object must be created. To make this `JedisPool` object, a hostname String and a primary key String are required. These can be used as shown below to create a `JedisPool` object.
70-
69+
In order to create an instance of `JedisCheckpointStore`, a `JedisPool` object must be created. To make this `JedisPool`
70+
object, a hostname String and a primary key String are required. These can be used as shown below to create a
71+
`JedisPool` object.
7172

7273
## Key concepts
7374

@@ -77,16 +78,26 @@ Key concepts are explained in detail [here][key_concepts].
7778
- [Create and run an instance of JedisRedisCheckpointStore][sample_jedis_client]
7879
- [Consume events from all Event Hub partitions][sample_event_processor]
7980

80-
### Create an instance of JedisPool with Azure Redis Cache
81+
### Create an instance of JedisPool
82+
83+
To create an instance of JedisPool using Azure Redis Cache, follow the instructions in
84+
[Use Azure Cache for Redis in Java][redis_quickstart_java] to fetch the hostname and access key. Otherwise, use
85+
connection information from a running Redis instance.
8186

82-
```java
83-
String hostname = "yourHostName.redis.cache.windows.net";
87+
```java readme-sample-createJedis
88+
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
89+
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
90+
.ssl(true)
91+
.build();
8492

85-
String password = "<PRIMARY KEY FOR AZURE REDIS CACHE>";
93+
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
94+
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
95+
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
8696

87-
String name = "<NAME OF THE USER CLIENT>"; //this can also be a default value as the connection of Redis Cache is not dependent on this value
97+
// Do things with JedisPool.
8898

89-
JedisPool jedisPool = new JedisPool(poolConfig, hostname, port, 1000, 1000, password, Protocol.DEFAULT_DATABASE, name, true, null, null, null);
99+
// Finally, dispose of resource
100+
jedisPool.close();
90101
```
91102

92103
### Consume events using an Event Processor Client
@@ -95,38 +106,50 @@ To consume events for all partitions of an Event Hub, you'll create an
95106
[`EventProcessorClient`][source_eventprocessorclient] for a specific consumer group. When an Event Hub is created, it
96107
provides a default consumer group that can be used to get started.
97108

98-
The [`EventProcessorClient`][source_eventprocessorclient] will delegate processing of events to a callback function that you
99-
provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility for
100-
managing the underlying consumer operations.
109+
The [`EventProcessorClient`][source_eventprocessorclient] will delegate processing of events to a callback function
110+
that you provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility
111+
for managing the underlying consumer operations.
101112

102113
In our example, we will focus on building the [`EventProcessor`][source_eventprocessorclient], use the
103114
[`JedisRedisCheckpointStore`][source_jedisredischeckpointstore], and a simple callback function to process the events
104115
received from the Event Hubs, writes to console and updates the checkpoint in Blob storage after each event.
105116

106-
```java
107-
JedisPool jedisPool = new JedisPool(poolConfig, hostname, port, 1000, 1000, password, Protocol.DEFAULT_DATABASE, name, true, null, null, null);
117+
```java readme-sample-createCheckpointStore
118+
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
119+
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
120+
.ssl(true)
121+
.build();
122+
123+
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
124+
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
125+
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
108126

109127
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
110128
.consumerGroup("<< CONSUMER GROUP NAME >>")
111-
.connectionString("<< EVENT HUB CONNECTION STRING >>")
129+
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
130+
.eventHubName("<< EVENT HUB NAME >>")
112131
.checkpointStore(new JedisRedisCheckpointStore(jedisPool))
113132
.processEvent(eventContext -> {
114133
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
115134
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
116135
})
117-
.processError(errorContext -> {
118-
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
136+
.processError(context -> {
137+
System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
119138
})
120139
.buildEventProcessorClient();
121140

122141
// This will start the processor. It will start processing events from all partitions.
123142
eventProcessorClient.start();
124143

125144
// (for demo purposes only - adding sleep to wait for receiving events)
126-
TimeUnit.SECONDS.sleep(5);
145+
// Your application will probably keep the eventProcessorClient alive until the program ends.
146+
TimeUnit.SECONDS.sleep(2);
127147

128148
// When the user wishes to stop processing events, they can call `stop()`.
129149
eventProcessorClient.stop();
150+
151+
// Dispose of JedisPool resource.
152+
jedisPool.close();
130153
```
131154

132155
## Troubleshooting
@@ -137,13 +160,6 @@ Azure SDK for Java offers a consistent logging story to help aid in troubleshoot
137160
their resolution. The logs produced will capture the flow of an application before reaching the terminal state to help
138161
locate the root issue. View the [logging][logging] wiki for guidance about enabling logging.
139162

140-
### Default SSL library
141-
142-
All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL
143-
operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides
144-
better performance compared to the default SSL implementation within the JDK. For more information, including how to
145-
reduce the dependency size, refer to the [performance tuning][performance_tuning] section of the wiki.
146-
147163
## Next steps
148164

149165
Get started by exploring the samples [here][samples_readme].
@@ -157,18 +173,17 @@ Guidelines][guidelines] for more information.
157173
[api_documentation]: https://azure.github.io/azure-sdk-for-java
158174
[event_hubs_create]: https://docs.microsoft.com/azure/event-hubs/event-hubs-create
159175
[event_hubs_product_docs]: https://docs.microsoft.com/azure/event-hubs/
160-
[java_8_sdk_javadocs]: https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html
161176
[jdk_link]: https://docs.microsoft.com/java/azure/jdk/?view=azure-java-stable
162177
[key_concepts]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md#key-concepts
163178
[logging]: https://github.com/Azure/azure-sdk-for-java/wiki/Logging-with-Azure-SDK
164179
[maven]: https://maven.apache.org/
165-
[performance_tuning]: https://github.com/Azure/azure-sdk-for-java/wiki/Performance-Tuning
166-
[redis_cache]: https://docs.microsoft.com/azure/azure-cache-for-redis/cache-configure
180+
[redis_quickstart]: https://learn.microsoft.com/azure/azure-cache-for-redis/quickstart-create-redis
181+
[redis_quickstart_java]: https://learn.microsoft.com/azure/azure-cache-for-redis/cache-java-get-started
167182
[samples_readme]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
168183
[sample_jedis_client]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis/JedisRedisCheckpointStoreSample.java
169-
[sample_event_processor]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis/EventProcessorJedisRedisCheckpointStoreSample.java
170-
[sample_examples]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
184+
[sample_event_processor]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis/EventProcessorClientJedisSample.java
185+
[sample_examples]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples
171186
[source_code]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
172187
[source_eventprocessorclient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java
173-
[source_jedisredischeckpointstore]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
188+
[source_jedisredischeckpointstore]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/main/java/com/azure/messaging/eventhubs/checkpointstore/jedis/JedisRedisCheckpointStore.java
174189
[guidelines]: https://github.com/Azure/azure-sdk-for-java/blob/main/CONTRIBUTING.md

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/pom.xml

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
<dependency>
4747
<groupId>redis.clients</groupId>
4848
<artifactId>jedis</artifactId>
49-
<version>4.2.3</version> <!-- {x-version-update;redis.clients:jedis;external_dependency} -->
49+
<version>4.3.1</version> <!-- {x-version-update;redis.clients:jedis;external_dependency} -->
5050
</dependency>
5151

5252
<!-- Test dependencies -->
@@ -83,24 +83,22 @@
8383
</dependencies>
8484

8585
<build>
86-
<plugins>
87-
<plugin>
88-
<groupId>org.apache.maven.plugins</groupId>
89-
<artifactId>maven-enforcer-plugin</artifactId>
90-
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
91-
<configuration>
92-
<rules>
93-
<bannedDependencies>
94-
<includes>
95-
<include>redis.clients:jedis:[4.2.3]</include> <!-- {x-include-update;redis.clients:jedis;external_dependency} -->
96-
<include>org.mockito:mockito-inline:[4.5.1]</include> <!-- {x-include-update;org.mockito:mockito-inline;external_dependency} -->
97-
</includes>
98-
</bannedDependencies>
99-
</rules>
100-
</configuration>
101-
</plugin>
102-
</plugins>
103-
</build>
104-
105-
86+
<plugins>
87+
<plugin>
88+
<groupId>org.apache.maven.plugins</groupId>
89+
<artifactId>maven-enforcer-plugin</artifactId>
90+
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
91+
<configuration>
92+
<rules>
93+
<bannedDependencies>
94+
<includes>
95+
<include>redis.clients:jedis:[4.3.1]</include> <!-- {x-include-update;redis.clients:jedis;external_dependency} -->
96+
<include>org.mockito:mockito-inline:[4.5.1]</include> <!-- {x-include-update;org.mockito:mockito-inline;external_dependency} -->
97+
</includes>
98+
</bannedDependencies>
99+
</rules>
100+
</configuration>
101+
</plugin>
102+
</plugins>
103+
</build>
106104
</project>

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/main/java/com/azure/messaging/eventhubs/checkpointstore/jedis/JedisRedisCheckpointStore.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,45 @@
77
import com.azure.core.util.serializer.JsonSerializerProviders;
88
import com.azure.core.util.serializer.TypeReference;
99
import com.azure.messaging.eventhubs.CheckpointStore;
10+
import com.azure.messaging.eventhubs.EventProcessorClient;
11+
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
1012
import com.azure.messaging.eventhubs.models.Checkpoint;
1113
import com.azure.messaging.eventhubs.models.PartitionOwnership;
12-
1314
import reactor.core.Exceptions;
1415
import reactor.core.publisher.Flux;
1516
import reactor.core.publisher.Mono;
17+
import redis.clients.jedis.Jedis;
18+
import redis.clients.jedis.JedisPool;
19+
import redis.clients.jedis.Transaction;
1620

1721
import java.nio.charset.StandardCharsets;
1822
import java.util.ArrayList;
1923
import java.util.List;
2024
import java.util.Set;
2125

22-
import redis.clients.jedis.JedisPool;
23-
import redis.clients.jedis.Jedis;
24-
import redis.clients.jedis.Transaction;
25-
2626
/**
2727
* Implementation of {@link CheckpointStore} that uses Azure Redis Cache, specifically Jedis.
28+
*
29+
* <p><strong>Instantiate checkpoint store</strong></p>
30+
* Demonstrates one way to instantiate the checkpoint store. {@link JedisPool} has multiple ways to create an instance.
31+
*
32+
* <!-- src_embed com.azure.messaging.eventhubs.jedisredischeckpointstore.instantiation -->
33+
* <pre>
34+
* JedisClientConfig clientConfig = DefaultJedisClientConfig.builder&#40;&#41;
35+
* .password&#40;&quot;&lt;YOUR_REDIS_PRIMARY_ACCESS_KEY&gt;&quot;&#41;
36+
* .ssl&#40;true&#41;
37+
* .build&#40;&#41;;
38+
*
39+
* String redisHostName = &quot;&lt;YOUR_REDIS_HOST_NAME&gt;.redis.cache.windows.net&quot;;
40+
* HostAndPort hostAndPort = new HostAndPort&#40;redisHostName, 6380&#41;;
41+
* JedisPool jedisPool = new JedisPool&#40;hostAndPort, clientConfig&#41;;
42+
*
43+
* CheckpointStore checkpointStore = new JedisRedisCheckpointStore&#40;jedisPool&#41;;
44+
* </pre>
45+
* <!-- end com.azure.messaging.eventhubs.jedisredischeckpointstore.instantiation -->
46+
*
47+
* @see EventProcessorClient
48+
* @see EventProcessorClientBuilder
2849
*/
2950
public class JedisRedisCheckpointStore implements CheckpointStore {
3051

@@ -40,7 +61,7 @@ public class JedisRedisCheckpointStore implements CheckpointStore {
4061
* @param jedisPool a JedisPool object that creates a pool connected to the Azure Redis Cache
4162
* @throws IllegalArgumentException thrown when JedisPool object supplied is null
4263
*/
43-
public JedisRedisCheckpointStore(JedisPool jedisPool) throws IllegalArgumentException {
64+
public JedisRedisCheckpointStore(JedisPool jedisPool) {
4465
if (jedisPool == null) {
4566
throw LOGGER.logExceptionAsError(Exceptions
4667
.propagate(new IllegalArgumentException(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.eventhubs.checkpointstore.jedis;
5+
6+
import com.azure.messaging.eventhubs.EventProcessorClient;
7+
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
8+
import com.azure.messaging.eventhubs.models.ErrorContext;
9+
import com.azure.messaging.eventhubs.models.EventContext;
10+
import redis.clients.jedis.DefaultJedisClientConfig;
11+
import redis.clients.jedis.HostAndPort;
12+
import redis.clients.jedis.JedisClientConfig;
13+
import redis.clients.jedis.JedisPool;
14+
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* Sample for using {@link JedisRedisCheckpointStore} with {@link EventProcessorClient}.
19+
*/
20+
public class EventProcessorClientJedisSample {
21+
/**
22+
* The main method to run the sample.
23+
*
24+
* @param args Unused arguments given to the sample
25+
*
26+
* @throws Exception an Exception will be thrown in case of errors while running the sample
27+
*/
28+
public static void main(String[] args) throws Exception {
29+
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
30+
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
31+
.ssl(true)
32+
.build();
33+
34+
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
35+
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
36+
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
37+
38+
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
39+
.consumerGroup("<< CONSUMER GROUP NAME >>")
40+
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
41+
.eventHubName("<< EVENT HUB NAME >>")
42+
.processEvent(eventContext -> onEvent(eventContext))
43+
.processError(errorContext -> onError(errorContext))
44+
.checkpointStore(new JedisRedisCheckpointStore(jedisPool))
45+
.buildEventProcessorClient();
46+
47+
// Starts the event processor
48+
eventProcessorClient.start();
49+
50+
// Perform other tasks while the event processor is processing events in the background.
51+
TimeUnit.MINUTES.sleep(5);
52+
53+
// Stops the event processor
54+
eventProcessorClient.stop();
55+
}
56+
57+
private static void onEvent(EventContext eventContext) {
58+
System.out.printf("Processing event from partition %s with sequence number %d %n",
59+
eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber());
60+
if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
61+
eventContext.updateCheckpoint();
62+
}
63+
}
64+
65+
private static void onError(ErrorContext errorContext) {
66+
System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
67+
errorContext.getPartitionContext().getPartitionId(),
68+
errorContext.getThrowable());
69+
}
70+
}

0 commit comments

Comments
 (0)