From d137748db6b50eb9cd303968a99a50e7fb2fdb9f Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 20 Nov 2025 23:30:21 +0530 Subject: [PATCH 1/5] Upgraded LeaveGroup API from version 1 to version 3 --- src/rdkafka_cgrp.c | 25 ++++++++++++++++++++++++- src/rdkafka_request.c | 21 +++++++++++++++++---- src/rdkafka_request.h | 12 +++++++++++- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 0d85cbde3..f5cef8202 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -968,6 +968,11 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = opaque; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; + int32_t member_cnt; + int i; + rd_kafkap_str_t MemberId; + rd_kafkap_str_t GroupInstanceId; + int16_t MemberErrorCode; if (err) { ErrorCode = err; @@ -979,6 +984,19 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (request->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_read_i32(rkbuf, &member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_read_str(rkbuf, &MemberId); + rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); + rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode); + + if (ErrorCode == 0 && MemberErrorCode != 0) { + ErrorCode = MemberErrorCode; + } + } + } + err: if (ErrorCode) rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", @@ -1124,6 +1142,7 @@ static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) { static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { char *member_id; + int member_cnt = 1; RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); @@ -1148,10 +1167,14 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { + rd_kafka_leave_member_t member = { + .member_id = rkcg->rkcg_member_id, + .group_instance_id = rkcg->rkcg_group_instance_id + }; rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", "Leaving group"); rd_kafka_LeaveGroupRequest( - rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id, + rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member, member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_LeaveGroup, rkcg); } else diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae..1b65e76c0 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2147,21 +2147,34 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, */ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, const char *group_id, - const char *member_id, + const rd_kafka_leave_member_t *members, + int member_cnt, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { rd_kafka_buf_t *rkbuf; int16_t ApiVersion = 0; int features; - + int i; + ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features); + rkb, RD_KAFKAP_LeaveGroup, 0, 3, &features); rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300); rd_kafka_buf_write_str(rkbuf, group_id, -1); - rd_kafka_buf_write_str(rkbuf, member_id, -1); + + if (ApiVersion >= 3) { + rd_kafka_buf_write_arraycnt(rkbuf, member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_write_kstr(rkbuf, members[i].member_id->str); + rd_kafka_buf_write_kstr(rkbuf, members[i].group_instance_id); + } + } else { + /* v0-2: Only supports single member */ + rd_assert(member_cnt == 1); + rd_kafka_buf_write_kstr(rkbuf, members[0].member_id); + } rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaa..43ef0fb1a 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -160,6 +160,15 @@ typedef struct rd_kafkap_Fetch_reply_tags_s { /**@}*/ +/** + * @brief LeaveGroup memebr identity (for LeaveGroup version 3) + */ +typedef struct rd_kafka_leave_member_s { + rd_kafkap_str_t *member_id; + rd_kafkap_str_t *group_instance_id; +} rd_kafka_leave_member_t; + + rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( rd_kafka_buf_t *rkbuf, rd_bool_t use_topic_id, @@ -326,7 +335,8 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, const char *group_id, - const char *member_id, + const rd_kafka_leave_member_t *members, + int member_cnt, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque); From a1cad9294d7908762218e0e61e65246fd2ec054d Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 20 Nov 2025 23:39:25 +0530 Subject: [PATCH 2/5] minor fix and formatting issues --- src/rdkafka_cgrp.c | 29 ++++++++++++++--------------- src/rdkafka_request.c | 7 ++++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index f5cef8202..4a4827772 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -985,18 +985,18 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (request->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_read_i32(rkbuf, &member_cnt); - for (i = 0; i < member_cnt; i++) { - rd_kafka_buf_read_str(rkbuf, &MemberId); - rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); - rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode); - - if (ErrorCode == 0 && MemberErrorCode != 0) { - ErrorCode = MemberErrorCode; + rd_kafka_buf_read_i32(rkbuf, &member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_read_str(rkbuf, &MemberId); + rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); + rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode); + + if (ErrorCode == 0 && MemberErrorCode != 0) { + ErrorCode = MemberErrorCode; + } } - } } - + err: if (ErrorCode) rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", @@ -1168,14 +1168,13 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { rd_kafka_leave_member_t member = { - .member_id = rkcg->rkcg_member_id, - .group_instance_id = rkcg->rkcg_group_instance_id - }; + .member_id = rkcg->rkcg_member_id, + .group_instance_id = rkcg->rkcg_group_instance_id}; rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", "Leaving group"); rd_kafka_LeaveGroupRequest( - rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member, member_cnt, - RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member, + member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_LeaveGroup, rkcg); } else rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord, diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 1b65e76c0..bf7627bea 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2156,7 +2156,7 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion = 0; int features; int i; - + ApiVersion = rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_LeaveGroup, 0, 3, &features); @@ -2167,8 +2167,9 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, if (ApiVersion >= 3) { rd_kafka_buf_write_arraycnt(rkbuf, member_cnt); for (i = 0; i < member_cnt; i++) { - rd_kafka_buf_write_kstr(rkbuf, members[i].member_id->str); - rd_kafka_buf_write_kstr(rkbuf, members[i].group_instance_id); + rd_kafka_buf_write_kstr(rkbuf, members[i].member_id); + rd_kafka_buf_write_kstr(rkbuf, + members[i].group_instance_id); } } else { /* v0-2: Only supports single member */ From 3a55cee608e151d4de6d3d890f4df754c621b91a Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Mon, 24 Nov 2025 13:38:58 +0530 Subject: [PATCH 3/5] Fixed the issue in leavegroup, and upgraded mock implementation also --- src/rdkafka_cgrp.c | 6 +++- src/rdkafka_mock_handlers.c | 72 ++++++++++++++++++++++++++++++------- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 4a4827772..c52840e37 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1167,8 +1167,11 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { + rd_kafkap_str_t *member_id_new; + member_id_new = rd_kafkap_str_new(member_id, -1); + rd_kafka_leave_member_t member = { - .member_id = rkcg->rkcg_member_id, + .member_id = member_id_new, .group_instance_id = rkcg->rkcg_group_instance_id}; rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", "Leaving group"); @@ -1176,6 +1179,7 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member, member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_LeaveGroup, rkcg); + rd_kafkap_str_destroy(member_id_new); } else rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord, RD_KAFKA_RESP_ERR__WAIT_COORD, diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecce..a011211f7 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1746,13 +1746,34 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_broker_t *mrkb; const rd_bool_t log_decode_errors = rd_true; rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf); - rd_kafkap_str_t GroupId, MemberId; + rd_kafkap_str_t GroupId; + int32_t member_cnt = 1; + rd_kafkap_str_t *members_id; + rd_kafkap_str_t *members_instance_id; + rd_kafka_resp_err_t *member_errors; rd_kafka_resp_err_t err; rd_kafka_mock_cgrp_classic_t *mcgrp; rd_kafka_mock_cgrp_classic_member_t *member = NULL; + int i; rd_kafka_buf_read_str(rkbuf, &GroupId); - rd_kafka_buf_read_str(rkbuf, &MemberId); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_read_i32(rkbuf, &member_cnt); + + members_id = rd_alloca(member_cnt * sizeof(*members_id)); + members_instance_id = + rd_alloca(member_cnt * sizeof(*members_instance_id)); + + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_read_str(rkbuf, &members_id[i]); + rd_kafka_buf_read_str(rkbuf, &members_instance_id[i]); + } + } else { + members_id = rd_alloca(sizeof(*members_id)); + members_instance_id = NULL; /* Not used for v0-2 */ + rd_kafka_buf_read_str(rkbuf, &members_id[0]); + } /* * Construct response @@ -1781,21 +1802,46 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND; } + member_errors = rd_alloca(member_cnt * sizeof(*member_errors)); + if (!err) { - member = - rd_kafka_mock_cgrp_classic_member_find(mcgrp, &MemberId); - if (!member) - err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; - } + for (i = 0; i < member_cnt; i++) { + member = rd_kafka_mock_cgrp_classic_member_find( + mcgrp, &members_id[i]); + if (!member) { + member_errors[i] = + RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + } else { + member_errors[i] = + rd_kafka_mock_cgrp_classic_check_state( + mcgrp, member, rkbuf, -1); + if (!member_errors[i]) + rd_kafka_mock_cgrp_classic_member_leave( + mcgrp, member); + } - if (!err) - err = rd_kafka_mock_cgrp_classic_check_state(mcgrp, member, - rkbuf, -1); + /* For v0-2, promote first member error to top-level */ + if (rkbuf->rkbuf_reqhdr.ApiVersion < 3 && + member_errors[i] && !err) + err = member_errors[i]; + } + } else { + for (i = 0; i < member_cnt; i++) { + member_errors[i] = err; + } + } - if (!err) - rd_kafka_mock_cgrp_classic_member_leave(mcgrp, member); + /* Write top-level error code */ + rd_kafka_buf_write_i16(resp, err); - rd_kafka_buf_write_i16(resp, err); /* ErrorCode */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_write_i32(resp, member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_write_kstr(resp, &members_id[i]); + rd_kafka_buf_write_kstr(resp, &members_instance_id[i]); + rd_kafka_buf_write_i16(resp, member_errors[i]); + } + } rd_kafka_mock_connection_send_response(mconn, resp); From ebdb5271871a89a00b7edc80314ccd983405490e Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Mon, 24 Nov 2025 20:36:25 +0530 Subject: [PATCH 4/5] Upgraded LeaveGroup from version 3 to version 4 --- src/rdkafka_cgrp.c | 10 ++++++- src/rdkafka_mock_handlers.c | 12 ++++++-- src/rdkafka_request.c | 60 ++++--------------------------------- 3 files changed, 24 insertions(+), 58 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index c52840e37..0bca38f1a 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -985,18 +985,26 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (request->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_read_i32(rkbuf, &member_cnt); + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, RD_KAFKAP_TOPICS_MAX); for (i = 0; i < member_cnt; i++) { rd_kafka_buf_read_str(rkbuf, &MemberId); rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode); + if (request->rkbuf_reqhdr.ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + if (ErrorCode == 0 && MemberErrorCode != 0) { ErrorCode = MemberErrorCode; } } } + if (request->rkbuf_reqhdr.ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + err: if (ErrorCode) rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index a011211f7..439ff8083 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1759,7 +1759,7 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_str(rkbuf, &GroupId); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_read_i32(rkbuf, &member_cnt); + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, RD_KAFKAP_TOPICS_MAX); members_id = rd_alloca(member_cnt * sizeof(*members_id)); members_instance_id = @@ -1768,10 +1768,13 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, for (i = 0; i < member_cnt; i++) { rd_kafka_buf_read_str(rkbuf, &members_id[i]); rd_kafka_buf_read_str(rkbuf, &members_instance_id[i]); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); } } else { members_id = rd_alloca(sizeof(*members_id)); - members_instance_id = NULL; /* Not used for v0-2 */ + members_instance_id = NULL; rd_kafka_buf_read_str(rkbuf, &members_id[0]); } @@ -1835,11 +1838,14 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i16(resp, err); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_write_i32(resp, member_cnt); + rd_kafka_buf_write_arraycnt(resp, member_cnt); for (i = 0; i < member_cnt; i++) { rd_kafka_buf_write_kstr(resp, &members_id[i]); rd_kafka_buf_write_kstr(resp, &members_instance_id[i]); rd_kafka_buf_write_i16(resp, member_errors[i]); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); } } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index bf7627bea..6854ec52d 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2158,9 +2158,10 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, int i; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_LeaveGroup, 0, 3, &features); + rkb, RD_KAFKAP_LeaveGroup, 0, 4, &features); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_LeaveGroup, 1, + 300, ApiVersion >= 4); rd_kafka_buf_write_str(rkbuf, group_id, -1); @@ -2170,6 +2171,9 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, members[i].member_id); rd_kafka_buf_write_kstr(rkbuf, members[i].group_instance_id); + if (ApiVersion >= 4) { + rd_kafka_buf_write_tags_empty(rkbuf); + } } } else { /* v0-2: Only supports single member */ @@ -2190,58 +2194,6 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, } -/** - * Handler for LeaveGroup responses - * opaque must be the cgrp handle. - */ -void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_cgrp_t *rkcg = opaque; - const int log_decode_errors = LOG_ERR; - int16_t ErrorCode = 0; - int actions; - - if (err) { - ErrorCode = err; - goto err; - } - - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - -err: - actions = rd_kafka_err_action(rkb, ErrorCode, request, - RD_KAFKA_ERR_ACTION_END); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, - RD_KAFKA_OP_COORD_QUERY, ErrorCode); - } - - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - if (rd_kafka_buf_retry(rkb, request)) - return; - /* FALLTHRU */ - } - - if (ErrorCode) - rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", - "LeaveGroup response: %s", - rd_kafka_err2str(ErrorCode)); - - return; - -err_parse: - ErrorCode = rkbuf->rkbuf_err; - goto err; -} - - - /** * Send HeartbeatRequest */ From c9320985df056efb73d11482d25b52ce3e24dda1 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Mon, 24 Nov 2025 21:57:54 +0530 Subject: [PATCH 5/5] style fixes --- src/rdkafka_cgrp.c | 3 ++- src/rdkafka_mock_handlers.c | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 0bca38f1a..f77c3b9ea 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -985,7 +985,8 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (request->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, RD_KAFKAP_TOPICS_MAX); + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, + RD_KAFKAP_TOPICS_MAX); for (i = 0; i < member_cnt; i++) { rd_kafka_buf_read_str(rkbuf, &MemberId); rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 439ff8083..a6ee42765 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1759,7 +1759,8 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_str(rkbuf, &GroupId); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { - rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, RD_KAFKAP_TOPICS_MAX); + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, + RD_KAFKAP_TOPICS_MAX); members_id = rd_alloca(member_cnt * sizeof(*members_id)); members_instance_id =