diff --git a/src/main/scala/spinoco/fs2/kafka/KafkaClient.scala b/src/main/scala/spinoco/fs2/kafka/KafkaClient.scala index a7e9ca0..d1ba3a1 100644 --- a/src/main/scala/spinoco/fs2/kafka/KafkaClient.scala +++ b/src/main/scala/spinoco/fs2/kafka/KafkaClient.scala @@ -396,17 +396,22 @@ object KafkaClient { * @tparam F * @return */ - def leaderFor[F[_] : Sync]( + def leaderFor[F[_]: Sync : Log]( requestMeta: (BrokerAddress, MetadataRequest) => F[MetadataResponse] , seed: Seq[BrokerAddress] - )(topicId: String @@ TopicName, partition: Int @@ PartitionId) :F[Option[BrokerAddress]] = { + )(topicId: String @@ TopicName, partition: Int @@ PartitionId): F[Option[BrokerAddress]] = { Stream.emits(seed) .evalMap { address => requestMeta(address, MetadataRequest(Vector(topicId))).attempt } - .collect { case Right(response) => response } - .map { resp => - resp.topics.find(_.name == topicId) flatMap { _.partitions.find( _.id == partition)} flatMap { + .flatMap { + case Left(err) => Stream.eval_(Log[F].error(s"Failed to read metadata", Detail(topicId) and partition, thrown = Some(err))) + case Right(response) => + Stream.eval(Log[F].info(s"Received metadata response for", Detail(topicId) and partition)) as response + }.flatMap { resp => + val broker = resp.topics.find(_.name == topicId) flatMap { _.partitions.find( _.id == partition)} flatMap { _.leader flatMap { leaderId => resp.brokers.find { _.nodeId == leaderId } map { b => BrokerAddress(b.host, b.port) } } } + + Stream.eval(Log[F].info(s"New broker for topic", Detail(topicId) and partition and broker)) as broker } .collectFirst { case Some(broker) => broker } .compile