Skip to content

Commit 0491e11

Browse files
KAFKA-19892: Client use of ShareAcknowledge acquisition lock timeout (#20905)
Part of KIP-1222. Use the acquisitionLockDurationMs in the ShareAcknowledge response to update the value in the share consumer when renew acknowledgements are used. Reviewers: Apoorv Mittal <[email protected]>
1 parent 5cc4c87 commit 0491e11

File tree

9 files changed

+94
-47
lines changed

9 files changed

+94
-47
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2993,6 +2993,10 @@ public void testRenewAcknowledgementOnCommitSync() {
29932993
shareConsumer.subscribe(List.of(tp.topic()));
29942994
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
29952995
assertEquals(10, records.count());
2996+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
2997+
2998+
// The updated acquisition lock timeout is only applied when the next poll is called.
2999+
alterShareRecordLockDurationMs("group1", 25000);
29963000

29973001
int count = 0;
29983002
Map<TopicIdPartition, Optional<KafkaException>> result;
@@ -3004,13 +3008,15 @@ public void testRenewAcknowledgementOnCommitSync() {
30043008
}
30053009
result = shareConsumer.commitSync();
30063010
assertEquals(1, result.size());
3011+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
30073012
assertEquals(Optional.empty(), result.get(new TopicIdPartition(tpId, tp.partition(), tp.topic())));
30083013
count++;
30093014
}
30103015

30113016
// Get the rest of all 5 records.
30123017
records = waitedPoll(shareConsumer, 2500L, 5);
30133018
assertEquals(5, records.count());
3019+
assertEquals(Optional.of(25000), shareConsumer.acquisitionLockTimeoutMs());
30143020
for (ConsumerRecord<byte[], byte[]> record : records) {
30153021
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
30163022
}
@@ -3441,6 +3447,19 @@ private void alterShareIsolationLevel(String groupId, String newValue) {
34413447
}
34423448
}
34433449

3450+
private void alterShareRecordLockDurationMs(String groupId, int newValue) {
3451+
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
3452+
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
3453+
alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
3454+
GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, Integer.toString(newValue)), AlterConfigOp.OpType.SET)));
3455+
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
3456+
try (Admin adminClient = createAdminClient()) {
3457+
assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
3458+
.all()
3459+
.get(60, TimeUnit.SECONDS), "Failed to alter configs");
3460+
}
3461+
}
3462+
34443463
/**
34453464
* Test utility which encapsulates a {@link ShareConsumer} whose record processing
34463465
* behavior can be supplied as a function argument.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 41 additions & 33 deletions
Large diffs are not rendered by default.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void process(final ShareAcknowledgementEvent event) {
138138
completedAcknowledgements.add(event.acknowledgementsMap());
139139
}
140140
if (event.checkForRenewAcknowledgements()) {
141-
currentFetch.renew(event.acknowledgementsMap());
141+
currentFetch.renew(event.acknowledgementsMap(), event.acquisitionLockTimeoutMs());
142142
}
143143
}
144144
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
public class ShareFetch<K, V> {
4444
private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
4545
private Optional<Integer> acquisitionLockTimeoutMs;
46+
private Optional<Integer> acquisitionLockTimeoutMsRenewed;
4647

4748
public static <K, V> ShareFetch<K, V> empty() {
4849
return new ShareFetch<>(new HashMap<>(), Optional.empty());
@@ -51,6 +52,7 @@ public static <K, V> ShareFetch<K, V> empty() {
5152
private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches, Optional<Integer> acquisitionLockTimeoutMs) {
5253
this.batches = batches;
5354
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
55+
this.acquisitionLockTimeoutMsRenewed = Optional.empty();
5456
}
5557

5658
/**
@@ -142,6 +144,10 @@ public void takeRenewedRecords() {
142144
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> entry : batches.entrySet()) {
143145
entry.getValue().takeRenewals();
144146
}
147+
// Any acquisition lock timeout updated by renewal is applied as the renewed records are move back to in-flight
148+
if (acquisitionLockTimeoutMsRenewed.isPresent()) {
149+
acquisitionLockTimeoutMs = acquisitionLockTimeoutMsRenewed;
150+
}
145151
}
146152

147153
/**
@@ -235,16 +241,18 @@ public Map<TopicIdPartition, NodeAcknowledgements> takeAcknowledgedRecords() {
235241
* Handles completed renew acknowledgements by returning successfully renewed records
236242
* to the set of in-flight records.
237243
*
238-
* @param acknowledgementsMap Map from topic-partition to acknowledgements for
239-
* completed renew acknowledgements
244+
* @param acknowledgementsMap Map from topic-partition to acknowledgements for
245+
* completed renew acknowledgements
246+
* @param acquisitionLockTimeoutMs Optional updated acquisition lock timeout
240247
*
241248
* @return The number of records renewed
242249
*/
243-
public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
250+
public int renew(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) {
244251
int recordsRenewed = 0;
245252
for (Map.Entry<TopicIdPartition, Acknowledgements> entry : acknowledgementsMap.entrySet()) {
246253
recordsRenewed += batches.get(entry.getKey()).renew(entry.getValue());
247254
}
255+
acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs;
248256
return recordsRenewed;
249257
}
250258
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.TopicIdPartition;
2222

2323
import java.util.Map;
24+
import java.util.Optional;
2425

2526
/**
2627
* This is the class of events created by the {@link ConsumerNetworkThread network thread} to indicate completion
@@ -30,11 +31,14 @@ public class ShareAcknowledgementEvent {
3031

3132
private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
3233
private final boolean checkForRenewAcknowledgements;
34+
private final Optional<Integer> acquisitionLockTimeoutMs;
3335

3436
public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap,
35-
boolean checkForRenewAcknowledgements) {
37+
boolean checkForRenewAcknowledgements,
38+
Optional<Integer> acquisitionLockTimeoutMs) {
3639
this.acknowledgementsMap = acknowledgementsMap;
3740
this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
41+
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
3842
}
3943

4044
public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
@@ -44,4 +48,8 @@ public Map<TopicIdPartition, Acknowledgements> acknowledgementsMap() {
4448
public boolean checkForRenewAcknowledgements() {
4549
return checkForRenewAcknowledgements;
4650
}
51+
52+
public Optional<Integer> acquisitionLockTimeoutMs() {
53+
return acquisitionLockTimeoutMs;
54+
}
4755
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -484,16 +484,16 @@ public void testResultHandlerOnCommitAsync() {
484484
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
485485

486486
// Passing null acknowledgements should mean we do not send the background event at all.
487-
resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
487+
resultHandler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, Optional.empty());
488488
assertEquals(0, completedAcknowledgements.size());
489489

490490
// Setting the request type to COMMIT_SYNC should still not send any background event
491491
// as we have initialized remainingResults to null.
492-
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
492+
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty());
493493
assertEquals(0, completedAcknowledgements.size());
494494

495495
// Sending non-null acknowledgements means we do send the background event
496-
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false);
496+
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC, false, Optional.empty());
497497
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
498498
}
499499

@@ -513,16 +513,16 @@ public void testResultHandlerOnCommitSync() {
513513
ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
514514

515515
// We only send the background event after all results have been completed.
516-
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
516+
resultHandler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty());
517517
assertEquals(0, completedAcknowledgements.size());
518518
assertFalse(future.isDone());
519519

520-
resultHandler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
520+
resultHandler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty());
521521
assertEquals(0, completedAcknowledgements.size());
522522
assertFalse(future.isDone());
523523

524524
// After third response is received, we send the background event.
525-
resultHandler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false);
525+
resultHandler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC, false, Optional.empty());
526526
assertEquals(1, completedAcknowledgements.size());
527527
assertEquals(2, completedAcknowledgements.get(0).size());
528528
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ public void testExplicitModeRenewAndAcknowledgeOnPoll() {
539539
Acknowledgements acks = Acknowledgements.empty();
540540
acks.add(0, AcknowledgeType.RENEW);
541541
acks.complete(null);
542-
ShareAcknowledgementEvent e = new ShareAcknowledgementEvent(Map.of(tip, acks), true);
542+
ShareAcknowledgementEvent e = new ShareAcknowledgementEvent(Map.of(tip, acks), true, Optional.empty());
543543
acknowledgementEventQueue.add(e);
544544

545545
records = consumer.poll(Duration.ofMillis(100));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,12 @@ public void testWithRenew() {
184184

185185
Acknowledgements acks = acknowledgementsMap.get(topicAPartition0).acknowledgements();
186186
acks.complete(null);
187-
fetch.renew(Map.of(topicAPartition0, acks));
187+
fetch.renew(Map.of(topicAPartition0, acks), Optional.of(20000));
188188
assertTrue(fetch.hasRenewals());
189189
fetch.takeRenewedRecords();
190190
assertFalse(fetch.hasRenewals());
191191
assertEquals(DEFAULT_MAX_POLL_RECORDS, fetch.numRecords());
192+
assertEquals(Optional.of(20000), fetch.acquisitionLockTimeoutMs());
192193

193194
// Now attempt to collect more records from the fetch buffer.
194195
fetch = fetchCollector.collect(fetchBuffer);

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4014,6 +4014,9 @@ class KafkaApis(val requestChannel: RequestChannel,
40144014
// the callback for processing a share acknowledge response, invoked before throttling
40154015
def processShareAcknowledgeResponse(responseAcknowledgeData: Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
40164016
request: RequestChannel.Request): ShareAcknowledgeResponse = {
4017+
val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
4018+
val groupId = shareAcknowledgeRequest.data.groupId
4019+
40174020
val partitions = new util.LinkedHashMap[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
40184021
val nodeEndpoints = new mutable.HashMap[Int, Node]
40194022
responseAcknowledgeData.foreach{ case(tp, partitionData) =>
@@ -4036,7 +4039,7 @@ class KafkaApis(val requestChannel: RequestChannel,
40364039
0,
40374040
partitions,
40384041
nodeEndpoints.values.toList.asJava,
4039-
config.shareGroupConfig.shareGroupRecordLockDurationMs
4042+
ShareFetchUtils.recordLockDurationMsOrDefault(groupConfigManager, groupId, config.shareGroupConfig.shareGroupRecordLockDurationMs)
40404043
)
40414044
}
40424045

0 commit comments

Comments
 (0)