Commit 043db64
committed
ConsumeService: fix client closing causing
## Problem
- calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid
- as docummented in _kafka consumer docs_[^1]
> The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application * making the call. It is the responsibility of the user to ensure that multi-threaded access * is properly synchronized. Un-synchronized access will result in ConcurrentModificationException`.
The exception thrown
```
2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service
at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]
at com.linkedin.kafka.clients.utils.CloseableLock.<init>(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]
at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?]
```
## Solution
The recommended solution[^1] is
- to use `consumer.wakeup();` method
- but the method is not yet adopted by the `KMBaseConsumer` interface
- so for now `_baseConsumer.close()` is moved into the thread
- calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits
[^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502)
## Testing Done
Increased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread.
`- ./gradlew test`ConcurrentModificationException
1 parent c4a9c71 commit 043db64
File tree
2 files changed
+14
-7
lines changed- src
- main/java/com/linkedin/xinfra/monitor/services
- test/java/com/linkedin/xinfra/monitor/services
2 files changed
+14
-7
lines changedLines changed: 13 additions & 6 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
111 | 111 | | |
112 | 112 | | |
113 | 113 | | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
114 | 117 | | |
115 | 118 | | |
116 | 119 | | |
| |||
211 | 214 | | |
212 | 215 | | |
213 | 216 | | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
214 | 220 | | |
215 | 221 | | |
216 | 222 | | |
| |||
242 | 248 | | |
243 | 249 | | |
244 | 250 | | |
245 | | - | |
246 | | - | |
247 | | - | |
248 | | - | |
249 | | - | |
250 | | - | |
| 251 | + | |
251 | 252 | | |
252 | 253 | | |
253 | 254 | | |
254 | 255 | | |
255 | 256 | | |
| 257 | + | |
| 258 | + | |
| 259 | + | |
| 260 | + | |
| 261 | + | |
| 262 | + | |
256 | 263 | | |
257 | 264 | | |
258 | 265 | | |
| |||
Lines changed: 1 addition & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
193 | 193 | | |
194 | 194 | | |
195 | 195 | | |
196 | | - | |
| 196 | + | |
197 | 197 | | |
198 | 198 | | |
199 | 199 | | |
| |||
0 commit comments