Firstly, thanks for developing such a nice tool very useful for Kafka development.
I'm using the withRunningKafka method to test publishing to an embedded Kafka instance using ReactiveKafka.
"KafkaEvnetSink" should {
"streams event to kafka" in withRunningKafka {
val kafkaConfig = EmbeddedKafkaConfig()
val brokerList = s"localhost:${kafkaConfig.kafkaPort}"
val topic = "abc"
val sink = Sink.fromSubscriber(new ReactiveKafka().publish(ProducerProperties(brokerList, topic, new StringEncoder())))
Source(1 to 10).map(_.toString).to(sink).run()
consumeFirstStringMessageFrom(topic) shouldBe "1"
}
The above test throws this error:
15:08:12.857 ERROR k.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: abc
It seems that this is an issue of port and host according this SO post.
Is this a bug or am I using it wrong?