Skip to content

Commit c9fa8c5

Browse files
KAFKA-19944: kafka-share-groups.sh describe offsets message improvement (#21024)
kafka-share-groups.sh --describe --offsets prints an empty table consisting only of headers when there is no offset information to display for a share group. This is most likely only when a topic which was formerly being consumed is deleted. This PR displays a message instead of an empty table, similar to how the tool works if the list of members is requested and there are no active members. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
1 parent 43062a0 commit c9fa8c5

File tree

2 files changed

+68
-23
lines changed

2 files changed

+68
-23
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -545,35 +545,39 @@ private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, Collectio
545545
.thenComparingInt(info -> info.partition))
546546
.toList();
547547

548-
String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
549-
550-
if (verbose) {
551-
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET", "LAG");
548+
if (offsetsInfo.isEmpty()) {
549+
System.out.println("\nShare group '" + groupId + "' has no offset information.");
552550
} else {
553-
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG");
554-
}
551+
String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
555552

556-
for (SharePartitionOffsetInformation info : offsetsInfo) {
557553
if (verbose) {
558-
System.out.printf(fmt,
559-
groupId,
560-
info.topic,
561-
info.partition,
562-
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
563-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
564-
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
565-
);
554+
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET", "LAG");
566555
} else {
567-
System.out.printf(fmt,
568-
groupId,
569-
info.topic,
570-
info.partition,
571-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
572-
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
573-
);
556+
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG");
574557
}
558+
559+
for (SharePartitionOffsetInformation info : offsetsInfo) {
560+
if (verbose) {
561+
System.out.printf(fmt,
562+
groupId,
563+
info.topic,
564+
info.partition,
565+
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
566+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
567+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
568+
);
569+
} else {
570+
System.out.printf(fmt,
571+
groupId,
572+
info.topic,
573+
info.partition,
574+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
575+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
576+
);
577+
}
578+
}
579+
System.out.println();
575580
}
576-
System.out.println();
577581
});
578582
}
579583

tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,47 @@ public void testDescribeOffsetsOfExistingGroupWithNulls() throws Exception {
286286
}
287287
}
288288

289+
@Test
290+
public void testDescribeOffsetsOfExistingGroupWithNoOffsetInfo() throws Exception {
291+
String firstGroup = "group1";
292+
String bootstrapServer = "localhost:9092";
293+
294+
for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
295+
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup));
296+
cgcArgs.addAll(describeType);
297+
Admin adminClient = mock(KafkaAdminClient.class);
298+
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
299+
ShareGroupDescription exp = new ShareGroupDescription(
300+
firstGroup,
301+
List.of(),
302+
GroupState.EMPTY,
303+
new Node(0, "host1", 9090), 0, 0);
304+
// When there is no offset information at all, an empty map will be returned
305+
ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult(
306+
Map.of(
307+
firstGroup,
308+
KafkaFuture.completedFuture(Map.of())
309+
)
310+
);
311+
312+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp)));
313+
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
314+
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
315+
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
316+
TestUtils.waitForCondition(() -> {
317+
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
318+
String[] lines = res.getKey().trim().split("\n");
319+
if (lines.length != 1 && !res.getValue().isEmpty()) {
320+
return false;
321+
}
322+
323+
String expectedValue = "Share group '" + firstGroup + "' has no offset information.";
324+
return expectedValue.equals(lines[0]);
325+
}, "Expected just an informational message with describe type " + String.join(" ", describeType) + ".");
326+
}
327+
}
328+
}
329+
289330
@Test
290331
public void testDescribeOffsetsOfAllExistingGroups() throws Exception {
291332
String firstGroup = "group1";

0 commit comments

Comments
 (0)