diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 0d85cbde3..f77c3b9ea 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -968,6 +968,11 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_cgrp_t *rkcg = opaque; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; + int32_t member_cnt; + int i; + rd_kafkap_str_t MemberId; + rd_kafkap_str_t GroupInstanceId; + int16_t MemberErrorCode; if (err) { ErrorCode = err; @@ -979,6 +984,28 @@ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (request->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, + RD_KAFKAP_TOPICS_MAX); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_read_str(rkbuf, &MemberId); + rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); + rd_kafka_buf_read_i16(rkbuf, &MemberErrorCode); + + if (request->rkbuf_reqhdr.ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + + if (ErrorCode == 0 && MemberErrorCode != 0) { + ErrorCode = MemberErrorCode; + } + } + } + + if (request->rkbuf_reqhdr.ApiVersion >= 4) { + rd_kafka_buf_skip_tags(rkbuf); + } + err: if (ErrorCode) rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", @@ -1124,6 +1151,7 @@ static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) { static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { char *member_id; + int member_cnt = 1; RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); @@ -1148,12 +1176,19 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { + rd_kafkap_str_t *member_id_new; + member_id_new = rd_kafkap_str_new(member_id, -1); + + rd_kafka_leave_member_t member = { + .member_id = member_id_new, + .group_instance_id = rkcg->rkcg_group_instance_id}; rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", "Leaving group"); rd_kafka_LeaveGroupRequest( - rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id, - RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rkcg->rkcg_coord, rkcg->rkcg_group_id->str, &member, + member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_LeaveGroup, rkcg); + rd_kafkap_str_destroy(member_id_new); } else rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord, RD_KAFKA_RESP_ERR__WAIT_COORD, diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index ad509ecce..a6ee42765 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1746,13 +1746,38 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_broker_t *mrkb; const rd_bool_t log_decode_errors = rd_true; rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf); - rd_kafkap_str_t GroupId, MemberId; + rd_kafkap_str_t GroupId; + int32_t member_cnt = 1; + rd_kafkap_str_t *members_id; + rd_kafkap_str_t *members_instance_id; + rd_kafka_resp_err_t *member_errors; rd_kafka_resp_err_t err; rd_kafka_mock_cgrp_classic_t *mcgrp; rd_kafka_mock_cgrp_classic_member_t *member = NULL; + int i; rd_kafka_buf_read_str(rkbuf, &GroupId); - rd_kafka_buf_read_str(rkbuf, &MemberId); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, + RD_KAFKAP_TOPICS_MAX); + + members_id = rd_alloca(member_cnt * sizeof(*members_id)); + members_instance_id = + rd_alloca(member_cnt * sizeof(*members_instance_id)); + + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_read_str(rkbuf, &members_id[i]); + rd_kafka_buf_read_str(rkbuf, &members_instance_id[i]); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) + rd_kafka_buf_skip_tags(rkbuf); + } + } else { + members_id = rd_alloca(sizeof(*members_id)); + members_instance_id = NULL; + rd_kafka_buf_read_str(rkbuf, &members_id[0]); + } /* * Construct response @@ -1781,21 +1806,49 @@ static int rd_kafka_mock_handle_LeaveGroup(rd_kafka_mock_connection_t *mconn, err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND; } + member_errors = rd_alloca(member_cnt * sizeof(*member_errors)); + if (!err) { - member = - rd_kafka_mock_cgrp_classic_member_find(mcgrp, &MemberId); - if (!member) - err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + for (i = 0; i < member_cnt; i++) { + member = rd_kafka_mock_cgrp_classic_member_find( + mcgrp, &members_id[i]); + if (!member) { + member_errors[i] = + RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + } else { + member_errors[i] = + rd_kafka_mock_cgrp_classic_check_state( + mcgrp, member, rkbuf, -1); + if (!member_errors[i]) + rd_kafka_mock_cgrp_classic_member_leave( + mcgrp, member); + } + + /* For v0-2, promote first member error to top-level */ + if (rkbuf->rkbuf_reqhdr.ApiVersion < 3 && + member_errors[i] && !err) + err = member_errors[i]; + } + } else { + for (i = 0; i < member_cnt; i++) { + member_errors[i] = err; + } } - if (!err) - err = rd_kafka_mock_cgrp_classic_check_state(mcgrp, member, - rkbuf, -1); + /* Write top-level error code */ + rd_kafka_buf_write_i16(resp, err); - if (!err) - rd_kafka_mock_cgrp_classic_member_leave(mcgrp, member); + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) { + rd_kafka_buf_write_arraycnt(resp, member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_write_kstr(resp, &members_id[i]); + rd_kafka_buf_write_kstr(resp, &members_instance_id[i]); + rd_kafka_buf_write_i16(resp, member_errors[i]); - rd_kafka_buf_write_i16(resp, err); /* ErrorCode */ + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 4) + rd_kafka_buf_write_tags_empty(resp); + } + } rd_kafka_mock_connection_send_response(mconn, resp); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae..6854ec52d 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2147,21 +2147,39 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, */ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, const char *group_id, - const char *member_id, + const rd_kafka_leave_member_t *members, + int member_cnt, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque) { rd_kafka_buf_t *rkbuf; int16_t ApiVersion = 0; int features; + int i; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features); + rkb, RD_KAFKAP_LeaveGroup, 0, 4, &features); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_LeaveGroup, 1, + 300, ApiVersion >= 4); rd_kafka_buf_write_str(rkbuf, group_id, -1); - rd_kafka_buf_write_str(rkbuf, member_id, -1); + + if (ApiVersion >= 3) { + rd_kafka_buf_write_arraycnt(rkbuf, member_cnt); + for (i = 0; i < member_cnt; i++) { + rd_kafka_buf_write_kstr(rkbuf, members[i].member_id); + rd_kafka_buf_write_kstr(rkbuf, + members[i].group_instance_id); + if (ApiVersion >= 4) { + rd_kafka_buf_write_tags_empty(rkbuf); + } + } + } else { + /* v0-2: Only supports single member */ + rd_assert(member_cnt == 1); + rd_kafka_buf_write_kstr(rkbuf, members[0].member_id); + } rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -2176,58 +2194,6 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, } -/** - * Handler for LeaveGroup responses - * opaque must be the cgrp handle. - */ -void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_cgrp_t *rkcg = opaque; - const int log_decode_errors = LOG_ERR; - int16_t ErrorCode = 0; - int actions; - - if (err) { - ErrorCode = err; - goto err; - } - - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - -err: - actions = rd_kafka_err_action(rkb, ErrorCode, request, - RD_KAFKA_ERR_ACTION_END); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, - RD_KAFKA_OP_COORD_QUERY, ErrorCode); - } - - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - if (rd_kafka_buf_retry(rkb, request)) - return; - /* FALLTHRU */ - } - - if (ErrorCode) - rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", - "LeaveGroup response: %s", - rd_kafka_err2str(ErrorCode)); - - return; - -err_parse: - ErrorCode = rkbuf->rkbuf_err; - goto err; -} - - - /** * Send HeartbeatRequest */ diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaa..43ef0fb1a 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -160,6 +160,15 @@ typedef struct rd_kafkap_Fetch_reply_tags_s { /**@}*/ +/** + * @brief LeaveGroup memebr identity (for LeaveGroup version 3) + */ +typedef struct rd_kafka_leave_member_s { + rd_kafkap_str_t *member_id; + rd_kafkap_str_t *group_instance_id; +} rd_kafka_leave_member_t; + + rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( rd_kafka_buf_t *rkbuf, rd_bool_t use_topic_id, @@ -326,7 +335,8 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, const char *group_id, - const char *member_id, + const rd_kafka_leave_member_t *members, + int member_cnt, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque);