Skip to content

Commit 33400cd

Browse files
committed
Merge branch 'trunk' into KAFKA-19940
2 parents 17528ad + 71299c2 commit 33400cd

File tree

100 files changed

+4161
-3962
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+4161
-3962
lines changed

build.gradle

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,29 @@ subprojects {
875875
removeUnusedImports()
876876
}
877877
}
878+
879+
tasks.register("checkJsonLicenseHeader") {
880+
group = "Verification"
881+
description = "Verify ASF License header in JSON files"
882+
883+
doLast {
884+
def asfHeader = file("${rootProject.projectDir}/gradle/LICENSE.json").readLines()*.trim()
885+
886+
def incorrectFiles = fileTree(projectDir) {
887+
include 'src/main/resources/**/*.json'
888+
include 'src/test/resources/**/*.json'
889+
}.findAll { f ->
890+
f.readLines().take(asfHeader.size())*.trim() != asfHeader
891+
}
892+
893+
if (incorrectFiles) {
894+
println "\n${project.name}: Incorrect ASF license header in JSON files:"
895+
incorrectFiles.each { println " - $it" }
896+
throw new GradleException("${incorrectFiles.size()} JSON files with incorrect ASF license header")
897+
}
898+
}
899+
}
900+
check.dependsOn("checkJsonLicenseHeader")
878901
}
879902

880903
gradle.taskGraph.whenReady { taskGraph ->

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@
250250
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
251251

252252
<suppress checks="NPathComplexity"
253-
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
253+
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
254254

255255
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
256256
files="Murmur3Test.java"/>
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.test.ClusterInstance;
20+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
21+
import org.apache.kafka.common.test.api.ClusterTest;
22+
import org.apache.kafka.common.test.api.Type;
23+
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
24+
import org.apache.kafka.server.common.Feature;
25+
import org.apache.kafka.server.common.GroupVersion;
26+
import org.apache.kafka.server.common.KRaftVersion;
27+
import org.apache.kafka.server.common.MetadataVersion;
28+
import org.apache.kafka.server.common.ShareVersion;
29+
import org.apache.kafka.server.common.StreamsVersion;
30+
import org.apache.kafka.server.common.TransactionVersion;
31+
32+
import java.util.Map;
33+
import java.util.concurrent.ExecutionException;
34+
35+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
import static org.junit.jupiter.api.Assertions.assertThrows;
38+
39+
public class DescribeFeaturesTest {
40+
41+
@ClusterTest(
42+
types = {Type.KRAFT},
43+
metadataVersion = MetadataVersion.IBP_3_7_IV0,
44+
controllers = 2,
45+
brokers = 2,
46+
serverProperties = {
47+
@ClusterConfigProperty(
48+
id = 3000,
49+
key = "unstable.api.versions.enable",
50+
value = "true"
51+
),
52+
@ClusterConfigProperty(
53+
id = 3001,
54+
key = "unstable.api.versions.enable",
55+
value = "false"
56+
),
57+
@ClusterConfigProperty(
58+
id = 0,
59+
key = "unstable.feature.versions.enable",
60+
value = "true"
61+
),
62+
@ClusterConfigProperty(
63+
id = 1,
64+
key = "unstable.feature.versions.enable",
65+
value = "false"
66+
)
67+
}
68+
)
69+
public void testUnstableApiVersions(ClusterInstance clusterInstance) {
70+
// Test unstable.api.versions.enable on controller nodes
71+
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
72+
// unstable.api.versions.enable is true on node 3000
73+
assertFeatures(admin, 3000, true, clusterInstance.config().metadataVersion());
74+
75+
// unstable.api.versions.enable is false on node 3001
76+
assertFeatures(admin, 3001, false, clusterInstance.config().metadataVersion());
77+
}
78+
79+
// Test unstable.feature.versions.enable on broker nodes
80+
try (Admin admin = clusterInstance.admin()) {
81+
// unstable.feature.versions.enable is true on node 0
82+
assertFeatures(admin, 0, true, clusterInstance.config().metadataVersion());
83+
84+
// unstable.feature.versions.enable is false on node 1
85+
assertFeatures(admin, 1, false, clusterInstance.config().metadataVersion());
86+
}
87+
}
88+
89+
@ClusterTest(types = {Type.KRAFT})
90+
public void testSendRequestToWrongNodeType(ClusterInstance clusterInstance) {
91+
try (Admin admin = clusterInstance.admin()) {
92+
// use bootstrap-servers to send request to controller
93+
assertThrows(
94+
ExecutionException.class,
95+
() -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(3000).timeoutMs(1000)).featureMetadata().get());
96+
}
97+
98+
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
99+
// use bootstrap-controllers to send request to broker
100+
assertThrows(
101+
ExecutionException.class,
102+
() -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(0).timeoutMs(1000)).featureMetadata().get());
103+
}
104+
}
105+
106+
private void assertFeatures(Admin admin, int nodeId, boolean unstable, MetadataVersion metadataVersion) {
107+
FeatureMetadata featureMetadata = assertDoesNotThrow(
108+
() -> admin.describeFeatures(new DescribeFeaturesOptions().nodeId(nodeId)).featureMetadata().get());
109+
110+
assertEquals(Map.of(
111+
MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel())
112+
), featureMetadata.finalizedFeatures());
113+
114+
if (unstable) {
115+
assertEquals(Map.of(
116+
GroupVersion.FEATURE_NAME, new SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(), Feature.GROUP_VERSION.latestTesting()),
117+
KRaftVersion.FEATURE_NAME, new SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(), Feature.KRAFT_VERSION.latestTesting()),
118+
MetadataVersion.FEATURE_NAME, new SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel()),
119+
ShareVersion.FEATURE_NAME, new SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(), Feature.SHARE_VERSION.latestTesting()),
120+
StreamsVersion.FEATURE_NAME, new SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(), Feature.STREAMS_VERSION.latestTesting()),
121+
TransactionVersion.FEATURE_NAME, new SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(), Feature.TRANSACTION_VERSION.latestTesting()),
122+
EligibleLeaderReplicasVersion.FEATURE_NAME, new SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(), Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting())
123+
), featureMetadata.supportedFeatures());
124+
} else {
125+
assertEquals(Map.of(
126+
GroupVersion.FEATURE_NAME, new SupportedVersionRange(Feature.GROUP_VERSION.minimumProduction(), Feature.GROUP_VERSION.latestProduction()),
127+
KRaftVersion.FEATURE_NAME, new SupportedVersionRange(Feature.KRAFT_VERSION.minimumProduction(), Feature.KRAFT_VERSION.latestProduction()),
128+
MetadataVersion.FEATURE_NAME, new SupportedVersionRange(MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestProduction().featureLevel()),
129+
ShareVersion.FEATURE_NAME, new SupportedVersionRange(Feature.SHARE_VERSION.minimumProduction(), Feature.SHARE_VERSION.latestProduction()),
130+
StreamsVersion.FEATURE_NAME, new SupportedVersionRange(Feature.STREAMS_VERSION.minimumProduction(), Feature.STREAMS_VERSION.latestProduction()),
131+
TransactionVersion.FEATURE_NAME, new SupportedVersionRange(Feature.TRANSACTION_VERSION.minimumProduction(), Feature.TRANSACTION_VERSION.latestProduction()),
132+
EligibleLeaderReplicasVersion.FEATURE_NAME, new SupportedVersionRange(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.minimumProduction(), Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.latestProduction())
133+
), featureMetadata.supportedFeatures());
134+
}
135+
}
136+
}

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2218,29 +2218,6 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
22182218
verifyShareGroupStateTopicRecordsProduced();
22192219
}
22202220

2221-
@ClusterTest
2222-
public void testDeliveryCountNotIncreaseAfterSessionClose() {
2223-
alterShareAutoOffsetReset("group1", "earliest");
2224-
try (Producer<byte[], byte[]> producer = createProducer()) {
2225-
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2226-
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
2227-
for (int i = 0; i < 10; i++) {
2228-
assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
2229-
}
2230-
}
2231-
2232-
// Perform the fetch, close in a loop.
2233-
for (int count = 0; count < ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
2234-
consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
2235-
}
2236-
2237-
// If the delivery count is increased, consumer will get nothing.
2238-
int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, true);
2239-
// The records returned belong to offsets 0-9.
2240-
assertEquals(10, consumedMessageCount);
2241-
verifyShareGroupStateTopicRecordsProduced();
2242-
}
2243-
22442221
@ClusterTest
22452222
public void testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement() {
22462223
alterShareAutoOffsetReset("group1", "earliest");
@@ -2270,13 +2247,13 @@ public void testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAckn
22702247
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 2);
22712248
assertEquals(2, records.count());
22722249
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2273-
assertEquals((short) 1, records.records(tp).get(1).deliveryCount().get());
2250+
assertEquals((short) 2, records.records(tp).get(1).deliveryCount().get());
22742251
}
22752252
}
22762253

22772254
@ClusterTest(
22782255
serverProperties = {
2279-
@ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "2"),
2256+
@ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "3"),
22802257
}
22812258
)
22822259
public void testBehaviorOnDeliveryCountBoundary() {
@@ -2304,15 +2281,14 @@ public void testBehaviorOnDeliveryCountBoundary() {
23042281
records = waitedPoll(shareConsumer, 2500L, 1);
23052282
assertEquals(1, records.count());
23062283
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2307-
23082284
}
23092285

23102286
// Start again and same record should be delivered
23112287
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of())) {
23122288
shareConsumer.subscribe(Set.of(tp.topic()));
23132289
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
23142290
assertEquals(1, records.count());
2315-
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2291+
assertEquals((short) 3, records.records(tp).get(0).deliveryCount().get());
23162292
}
23172293
}
23182294

@@ -2369,9 +2345,9 @@ public void testComplexShareConsumer() throws Exception {
23692345
// Let the complex consumer read the messages.
23702346
service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS);
23712347

2372-
// All messages which can be read are read, some would be redelivered (roughly 3 times the records produced).
2348+
// All messages which can be read are read, some would be redelivered (roughly 2 times the records produced).
23732349
TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
2374-
int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 3 * 0.95); // 3 times with margin of error (5%).
2350+
int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 2 * 0.95); // 2 times with margin of error (5%).
23752351

23762352
assertTrue(delta > 0,
23772353
String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));
@@ -3492,7 +3468,11 @@ public void testFetchWithThrottledDeliveryBatchesWithIncreasedDeliveryLimit() {
34923468
try (Producer<byte[], byte[]> producer = createProducer();
34933469
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
34943470
"group1",
3495-
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
3471+
Map.of(
3472+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
3473+
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 512
3474+
)
3475+
)
34963476
) {
34973477
// Produce records in complete power of 2 to fully test the throttling behavior.
34983478
int producedMessageCount = 512;

clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,27 @@
1616
*/
1717
package org.apache.kafka.clients.admin;
1818

19+
import java.util.OptionalInt;
20+
1921
/**
2022
* Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
2123
*/
2224
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
25+
private OptionalInt nodeId = OptionalInt.empty();
26+
27+
/**
28+
* Set the node id to which the request should be sent.
29+
*/
30+
public DescribeFeaturesOptions nodeId(int nodeId) {
31+
this.nodeId = OptionalInt.of(nodeId);
32+
return this;
33+
}
34+
35+
/**
36+
* The node id to which the request should be sent. If the node id is empty, the request will be sent to the
37+
* arbitrary controller/broker.
38+
*/
39+
public OptionalInt nodeId() {
40+
return nodeId;
41+
}
2342
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4519,8 +4519,10 @@ private static byte[] getSaltedPassword(ScramMechanism publicScramMechanism, byt
45194519
public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
45204520
final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
45214521
final long now = time.milliseconds();
4522+
final NodeProvider nodeProvider = options.nodeId().isPresent() ?
4523+
new ConstantNodeIdProvider(options.nodeId().getAsInt(), true) : new LeastLoadedBrokerOrActiveKController();
45224524
final Call call = new Call(
4523-
"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {
4525+
"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
45244526

45254527
private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
45264528
final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,20 @@ public PollResult poll(long currentTimeMs) {
253253
Node target = entry.getKey();
254254
ShareSessionHandler handler = entry.getValue();
255255

256-
log.trace("Building ShareFetch request to send to node {}", target.id());
257-
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig);
258-
259256
// For record_limit mode, we only send a full ShareFetch to a single node at a time.
260257
// We prepare to build ShareFetch requests for all nodes with session handlers to permit
261258
// piggy-backing of acknowledgements, and also to adjust the topic-partitions
262-
// in the share session.
263-
if (isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get()) {
264-
ShareFetchRequestData data = requestBuilder.data();
265-
// If there's nothing to send, just skip building the record.
266-
if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) {
267-
return null;
268-
} else {
269-
// There is something to send, but we don't want to fetch any records.
270-
requestBuilder.data().setMaxRecords(0);
271-
}
259+
// in the share session, but if the request would contain neither of those, it can be skipped.
260+
boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get();
261+
262+
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig, canSkipIfRequestEmpty);
263+
if (requestBuilder == null) {
264+
log.trace("Skipping ShareFetch request to send to node {}", target.id());
265+
return null;
272266
}
273267

268+
log.trace("Building ShareFetch request to send to node {}", target.id());
269+
274270
nodesWithPendingRequests.add(target.id());
275271

276272
BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {

0 commit comments

Comments
 (0)