Skip to content

Commit 9599143

Browse files
authored
KAFKA-19186; Mark OffsetCommit and OffsetFetch APIs as stable (#20923)
This patch marks the OffsetCommit and OffsetFetch APIs as stable. Reviewers: Lianet Magrans <[email protected]>
1 parent 19b655f commit 9599143

File tree

9 files changed

+26
-44
lines changed

9 files changed

+26
-44
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
749749
}
750750

751751
OffsetCommitRequest.Builder builder = canUseTopicIds
752-
? OffsetCommitRequest.Builder.forTopicIdsOrNames(data, true)
752+
? OffsetCommitRequest.Builder.forTopicIdsOrNames(data)
753753
: OffsetCommitRequest.Builder.forTopicNames(data);
754754

755755
return buildRequestWithResponseHandling(builder);
@@ -1033,7 +1033,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
10331033
.setRequireStable(true)
10341034
.setGroups(List.of(groupData));
10351035
OffsetFetchRequest.Builder builder = canUseTopicIds
1036-
? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, throwOnFetchStableOffsetUnsupported, true)
1036+
? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, throwOnFetchStableOffsetUnsupported)
10371037
: OffsetFetchRequest.Builder.forTopicNames(data, throwOnFetchStableOffsetUnsupported);
10381038
return buildRequestWithResponseHandling(builder);
10391039
}

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ private Builder(OffsetCommitRequestData data, short oldestAllowedVersion, short
5151
this.data = data;
5252
}
5353

54-
public static Builder forTopicIdsOrNames(OffsetCommitRequestData data, boolean enableUnstableLastVersion) {
55-
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion(enableUnstableLastVersion));
54+
public static Builder forTopicIdsOrNames(OffsetCommitRequestData data) {
55+
return new Builder(data, ApiKeys.OFFSET_COMMIT.oldestVersion(), ApiKeys.OFFSET_COMMIT.latestVersion());
5656
}
5757

5858
public static Builder forTopicNames(OffsetCommitRequestData data) {

clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,13 @@ public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest>
5555

5656
public static Builder forTopicIdsOrNames(
5757
OffsetFetchRequestData data,
58-
boolean throwOnFetchStableOffsetsUnsupported,
59-
boolean enableUnstableLastVersion
58+
boolean throwOnFetchStableOffsetsUnsupported
6059
) {
6160
return new Builder(
6261
data,
6362
throwOnFetchStableOffsetsUnsupported,
6463
ApiKeys.OFFSET_FETCH.oldestVersion(),
65-
ApiKeys.OFFSET_FETCH.latestVersion(enableUnstableLastVersion)
64+
ApiKeys.OFFSET_FETCH.latestVersion()
6665
);
6766
}
6867

clients/src/main/resources/common/message/OffsetCommitRequest.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
4141
"validVersions": "2-10",
4242
"flexibleVersions": "8+",
43-
"latestVersionUnstable": true,
4443
"fields": [
4544
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
4645
"about": "The unique group identifier." },

clients/src/main/resources/common/message/OffsetFetchRequest.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
// Version 10 adds support for topic ids and removes support for topic names (KIP-848).
4343
"validVersions": "1-10",
4444
"flexibleVersions": "6+",
45-
"latestVersionUnstable": true,
4645
"fields": [
4746
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
4847
"about": "The group to fetch offsets for." },

clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testWithMultipleGroups(short version) {
5555
.setPartitionIndexes(List.of(0, 1, 2))
5656
))
5757
));
58-
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);
58+
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false);
5959

6060
if (version < 8) {
6161
assertThrows(OffsetFetchRequest.NoBatchedOffsetFetchRequestException.class, () -> builder.build(version));
@@ -80,7 +80,6 @@ public void testThrowOnFetchStableOffsetsUnsupported(short version) {
8080
.setPartitionIndexes(List.of(0, 1, 2))
8181
))
8282
)),
83-
true,
8483
true
8584
);
8685

@@ -105,7 +104,7 @@ public void testSingleGroup(short version) {
105104
.setPartitionIndexes(List.of(0, 1, 2))
106105
))
107106
));
108-
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);
107+
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false);
109108

110109
if (version < 8) {
111110
var expectedRequest = new OffsetFetchRequestData()
@@ -130,7 +129,7 @@ public void testSingleGroupWithAllTopics(short version) {
130129
.setGroupId("grp1")
131130
.setTopics(null)
132131
));
133-
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false, true);
132+
var builder = OffsetFetchRequest.Builder.forTopicIdsOrNames(data, false);
134133

135134
if (version < 2) {
136135
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
@@ -159,8 +158,7 @@ public void testGetErrorResponse(short version) {
159158
.setPartitionIndexes(List.of(0, 1))
160159
))
161160
)),
162-
false,
163-
true
161+
false
164162
).build(version);
165163

166164
if (version < 2) {
@@ -217,8 +215,7 @@ public void testGroups(short version) {
217215
.setPartitionIndexes(List.of(0, 1, 2))
218216
))
219217
)),
220-
false,
221-
true
218+
false
222219
).build(version);
223220

224221
if (version < 8) {
@@ -247,8 +244,7 @@ public void testGroupsWithAllTopics(short version) {
247244
.setGroupId("grp1")
248245
.setTopics(null)
249246
)),
250-
false,
251-
true
247+
false
252248
).build(version);
253249

254250
if (version < 8) {

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2423,8 +2423,7 @@ private OffsetFetchRequest createOffsetFetchRequest(short version, boolean requi
24232423
.setPartitionIndexes(List.of(1))
24242424
))
24252425
)),
2426-
false,
2427-
true
2426+
false
24282427
).build(version);
24292428
}
24302429

@@ -2470,8 +2469,7 @@ private OffsetFetchRequest createOffsetFetchRequestWithMultipleGroups(short vers
24702469
.setGroupId("group5")
24712470
.setTopics(null)
24722471
)),
2473-
false,
2474-
true
2472+
false
24752473
).build(version);
24762474
}
24772475

@@ -2486,8 +2484,7 @@ private OffsetFetchRequest createOffsetFetchRequestForAllPartition(short version
24862484
.setMemberEpoch(version >= 9 ? 10 : -1)
24872485
.setTopics(null)
24882486
)),
2489-
false,
2490-
true
2487+
false
24912488
).build(version);
24922489
}
24932490

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
226226
.setPartitionIndex(partition)
227227
.setCommittedOffset(offset)
228228
).asJava)
229-
).asJava),
230-
isUnstableApiEnabled
229+
).asJava)
231230
).build(version)
232231

233232
val expectedResponse = new OffsetCommitResponseData()
@@ -396,8 +395,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
396395
new OffsetFetchRequestData()
397396
.setRequireStable(requireStable)
398397
.setGroups(groups.asJava),
399-
false,
400-
true
398+
false
401399
).build(version)
402400

403401
val response = connectAndReceive[OffsetFetchResponse](request)
@@ -417,8 +415,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
417415
new OffsetFetchRequestData()
418416
.setRequireStable(requireStable)
419417
.setGroups(List(group).asJava),
420-
false,
421-
true
418+
false
422419
).build(version)
423420

424421
val response = connectAndReceive[OffsetFetchResponse](request)

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ class KafkaApisTest extends Logging {
10531053
.setPartitionIndex(0)
10541054
.setCommittedOffset(10)))))
10551055

1056-
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
1056+
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version))
10571057

10581058
val future = new CompletableFuture[OffsetCommitResponseData]()
10591059
when(groupCoordinator.commitOffsets(
@@ -1114,7 +1114,7 @@ class KafkaApisTest extends Logging {
11141114
.setPartitionIndex(0)
11151115
.setCommittedOffset(10)))))
11161116

1117-
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build(version))
1117+
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version))
11181118

11191119
val future = new CompletableFuture[OffsetCommitResponseData]()
11201120
when(groupCoordinator.commitOffsets(
@@ -1192,7 +1192,7 @@ class KafkaApisTest extends Logging {
11921192
.setPartitionIndex(1)
11931193
.setCommittedOffset(70)))))
11941194

1195-
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest, true).build())
1195+
val requestChannelRequest = buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build())
11961196

11971197
// This is the request expected by the group coordinator.
11981198
val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -9007,8 +9007,7 @@ class KafkaApisTest extends Logging {
90079007
.setGroupId("group-4")
90089008
.setTopics(null),
90099009
)),
9010-
false,
9011-
true
9010+
false
90129011
).build(version)
90139012
)
90149013
}
@@ -9170,8 +9169,7 @@ class KafkaApisTest extends Logging {
91709169
.setGroupId("group-2")
91719170
.setTopics(null)
91729171
)),
9173-
false,
9174-
true
9172+
false
91759173
).build(version)
91769174
)
91779175
}
@@ -9379,8 +9377,7 @@ class KafkaApisTest extends Logging {
93799377
.setGroupId("group-1")
93809378
.setTopics(null) // all offsets.
93819379
)),
9382-
false,
9383-
true
9380+
false
93849381
).build(version))
93859382
}
93869383

@@ -9506,8 +9503,7 @@ class KafkaApisTest extends Logging {
95069503
.setGroupId("group-4")
95079504
.setTopics(null),
95089505
)),
9509-
false,
9510-
true
9506+
false
95119507
).build(version)
95129508
)
95139509
}
@@ -9695,8 +9691,7 @@ class KafkaApisTest extends Logging {
96959691
.setPartitionIndexes(util.List.of[Integer](0))
96969692
))
96979693
)),
9698-
false,
9699-
true
9694+
false
97009695
).build(version)
97019696
)
97029697
}

0 commit comments

Comments
 (0)