Skip to content

Commit 82241e1

Browse files
authored
KAFKA-19891: Bump group epoch when member regex subscription transitions from non-empty to empty (#21013)
This PR fixes an issue in GroupMetadataManager#maybeUpdateRegularExpressions where a member’s regex subscription transition from non-empty → empty did not trigger a group epoch bump. The method previously returned REGEX_UPDATED, which does not cause consumerGroupHeartbeat to increment the group epoch. Fix The patch updates the logic to return: REGEX_UPDATED_AND_RESOLVED when: the updated regex subscription text is empty, and the previous subscription was non-empty. This ensures that consumerGroupHeartbeat correctly bumps the group epoch, keeping the group metadata consistent. Tests Several tests were updated to align with the corrected behavior. Tests that previously expected no epoch bump were failing, and have now been adjusted to expect the new, correct logic. JIRA https://issues.apache.org/jira/browse/KAFKA-19891 Impact Fixes coordinator state correctness for regex-subscribing consumer groups Ensures group epoch bumps happen for all relevant subscription transitions Backward compatible No public API changes Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 10ebba5 commit 82241e1

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3313,6 +3313,8 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
33133313
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
33143314
}
33153315
}
3316+
} else if (isNotEmpty(oldSubscribedTopicRegex)) {
3317+
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
33163318
}
33173319
}
33183320

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21542,6 +21542,87 @@ foooTopicName, computeTopicHash(foooTopicName, new KRaftCoordinatorMetadataImage
2154221542
);
2154321543
}
2154421544

21545+
@Test
21546+
public void testConsumerGroupMemberClearsRegex() {
21547+
String groupId = "fooup";
21548+
String memberId1 = Uuid.randomUuid().toString();
21549+
21550+
Uuid fooTopicId = Uuid.randomUuid();
21551+
String fooTopicName = "foo";
21552+
21553+
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
21554+
.addTopic(fooTopicId, fooTopicName, 6)
21555+
.buildCoordinatorMetadataImage(12345L);
21556+
21557+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
21558+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
21559+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
21560+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
21561+
.withMetadataImage(metadataImage)
21562+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
21563+
.withMember(new ConsumerGroupMember.Builder(memberId1)
21564+
.setState(MemberState.STABLE)
21565+
.setMemberEpoch(10)
21566+
.setPreviousMemberEpoch(10)
21567+
.setClientId(DEFAULT_CLIENT_ID)
21568+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21569+
.setRebalanceTimeoutMs(5000)
21570+
.setSubscribedTopicRegex("foo*")
21571+
.setServerAssignorName("range")
21572+
.setAssignedPartitions(mkAssignment(
21573+
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
21574+
.build())
21575+
.withAssignment(memberId1, mkAssignment(
21576+
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
21577+
.withAssignmentEpoch(10))
21578+
.build();
21579+
21580+
// Member 1 updates its new regular expression.
21581+
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
21582+
new ConsumerGroupHeartbeatRequestData()
21583+
.setGroupId(groupId)
21584+
.setMemberId(memberId1)
21585+
.setMemberEpoch(10)
21586+
.setRebalanceTimeoutMs(5000)
21587+
.setSubscribedTopicRegex("")
21588+
.setServerAssignor("range")
21589+
.setTopicPartitions(List.of()));
21590+
21591+
assertResponseEquals(
21592+
new ConsumerGroupHeartbeatResponseData()
21593+
.setMemberId(memberId1)
21594+
.setMemberEpoch(11)
21595+
.setHeartbeatIntervalMs(5000)
21596+
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
21597+
.setTopicPartitions(List.of())
21598+
),
21599+
result.response()
21600+
);
21601+
21602+
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
21603+
.setState(MemberState.STABLE)
21604+
.setMemberEpoch(11)
21605+
.setPreviousMemberEpoch(10)
21606+
.setClientId(DEFAULT_CLIENT_ID)
21607+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21608+
.setRebalanceTimeoutMs(5000)
21609+
.setSubscribedTopicRegex("")
21610+
.setServerAssignorName("range")
21611+
.build();
21612+
21613+
List<CoordinatorRecord> expectedRecords = List.of(
21614+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
21615+
// previous expression is deleted
21616+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
21617+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
21618+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
21619+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
21620+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
21621+
);
21622+
21623+
assertRecordsEquals(expectedRecords, result.records());
21624+
}
21625+
2154521626
@Test
2154621627
public void testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
2154721628
String groupId = "fooup";

0 commit comments

Comments
 (0)