
Commit 108ad6e

KAFKA-19767 - Send Share-Fetch one-node at a time for record_limit mode (#20855)
*What*

- After KIP-1206 introduced `record_limit` mode, we ideally want no more records in flight than the `maxRecords` field in `ShareFetchRequest` asks for.
- Currently, the client broadcasts share fetch requests to all nodes that host the leaders of the partitions it is subscribed to.
- The application thread is woken up as soon as the first response arrives, but the responses from the other nodes can each bring in up to that many more records, which then sit in the buffer. That wastes the acquisition locks held for those waiting records.
- Instead, we only want to send the next request when we poll again.
- This PR sends the fetch request to only one node at a time in `record_limit` mode.
- Partitions are rotated on each poll so that no partition is starved.

There were NCSS checkstyle errors in `ShareConsumeRequestManagerTest`, so a few refactors were added there to reduce method length.

Performance

- When there are more consumers than partitions (i.e. when data in a partition is genuinely shared), performance is almost the same as the current approach.
- When there are fewer consumers than partitions, there is a performance regression because the client waits for a node to return a response before it can send the next request.
- For that reason this behaviour is introduced only for `record_limit` mode for now; future work will improve this area.

Reviewers: Andrew Schofield <[email protected]>
1 parent 0491e11 commit 108ad6e
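As a rough illustration of the approach described above (not code from this commit; `RecordLimitFetchPlanner`, `NodeFetchPlan` and `planFetch` are hypothetical names), the per-poll decision in record_limit mode boils down to asking a single node for records and sending `maxRecords = 0` to everyone else so acknowledgements and share-session updates can still piggy-back:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Kafka code): in record_limit mode, only one node per poll
// gets a ShareFetch that actually asks for records; the others are sent maxRecords = 0.
public class RecordLimitFetchPlanner {

    record NodeFetchPlan(int nodeId, int maxRecords) { }

    static List<NodeFetchPlan> planFetch(List<Integer> nodeIds, int maxRecords) {
        List<NodeFetchPlan> plans = new ArrayList<>();
        int chosenNode = -1;                 // -1 means "no node chosen yet"
        for (int nodeId : nodeIds) {
            if (chosenNode == -1) {
                chosenNode = nodeId;         // first node seen gets the real fetch
                plans.add(new NodeFetchPlan(nodeId, maxRecords));
            } else {
                plans.add(new NodeFetchPlan(nodeId, 0));
            }
        }
        return plans;
    }

    public static void main(String[] args) {
        // Nodes 1, 2 and 3 host leaders of subscribed partitions; only node 1 fetches records.
        System.out.println(planFetch(List.of(1, 2, 3), 500));
        // [NodeFetchPlan[nodeId=1, maxRecords=500], NodeFetchPlan[nodeId=2, maxRecords=0],
        //  NodeFetchPlan[nodeId=3, maxRecords=0]]
    }
}
```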

3 files changed (+185 / -131 lines)


clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 44 additions & 1 deletion
@@ -18,6 +18,7 @@
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ShareAcquireMode;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
@@ -89,6 +90,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
     private final IdempotentCloser idempotentCloser = new IdempotentCloser();
     private Uuid memberId;
     private boolean fetchMoreRecords = false;
+    private final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);
     private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsToSend;
     private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsInFlight;
     private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
@@ -196,6 +198,13 @@ public PollResult poll(long currentTimeMs) {
                 }
                 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
 
+                // If we have not chosen a node for fetching records yet,
+                // choose now, and rotate the assigned partitions so the next poll starts on a different partition.
+                // This is only applicable for record_limit mode.
+                if (isShareAcquireModeRecordLimit() && fetchRecordsNodeId.compareAndSet(-1, node.id())) {
+                    subscriptions.movePartitionToEnd(partition);
+                }
+
                 log.debug("Added fetch request for partition {} to node {}", tip, node.id());
             }
         }
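The `compareAndSet(-1, node.id())` guard above is what guarantees that only one node per fetch cycle is chosen, even though the loop visits every assigned partition. A standalone sketch of that behaviour using plain `java.util.concurrent` (the node ids are made up):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates why compareAndSet(-1, nodeId) selects exactly one node:
// only the first call that still sees -1 wins; later calls see the chosen id and fail.
public class ChooseOneNode {
    public static void main(String[] args) {
        AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);
        for (int nodeId : List.of(3, 1, 2)) {
            boolean chosen = fetchRecordsNodeId.compareAndSet(-1, nodeId);
            System.out.println("node " + nodeId + " chosen=" + chosen);
        }
        // node 3 chosen=true, node 1 chosen=false, node 2 chosen=false

        // When the response from the chosen node arrives, the slot is released with
        // compareAndSet(chosenId, -1), so a stale node id never clears someone else's claim.
        fetchRecordsNodeId.compareAndSet(3, -1);
        System.out.println("released, value=" + fetchRecordsNodeId.get()); // -1
    }
}
```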
@@ -245,6 +254,21 @@ public PollResult poll(long currentTimeMs) {
             log.trace("Building ShareFetch request to send to node {}", target.id());
             ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig);
 
+            // For record_limit mode, we only send a full ShareFetch to a single node at a time.
+            // We prepare to build ShareFetch requests for all nodes with session handlers to permit
+            // piggy-backing of acknowledgements, and also to adjust the topic-partitions
+            // in the share session.
+            if (isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get()) {
+                ShareFetchRequestData data = requestBuilder.data();
+                // If there's nothing to send, just skip building the request.
+                if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) {
+                    return null;
+                } else {
+                    // There is something to send, but we don't want to fetch any records.
+                    requestBuilder.data().setMaxRecords(0);
+                }
+            }
+
             nodesWithPendingRequests.add(target.id());
 
             BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
@@ -255,11 +279,15 @@ public PollResult poll(long currentTimeMs) {
                 }
             };
             return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler);
-        }).collect(Collectors.toList());
+        }).filter(Objects::nonNull).collect(Collectors.toList());
 
         return new PollResult(requests);
     }
 
+    private boolean isShareAcquireModeRecordLimit() {
+        return shareFetchConfig.shareAcquireMode == ShareAcquireMode.RECORD_LIMIT;
+    }
+
     /**
      * Add acknowledgements for a topic-partition to the node's in-flight acknowledgements.
      *
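Since the request-building lambda can now return `null` for nodes that have nothing at all to send, the stream gains `filter(Objects::nonNull)` so skipped requests never reach the `PollResult`. The same map-then-filter shape in isolation (node ids and strings are illustrative only):

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Map each node id to a request description, returning null for nodes with nothing to
// send, then drop the nulls before collecting - the same shape as the stream above.
public class SkipEmptyRequests {
    public static void main(String[] args) {
        List<String> requests = List.of(1, 2, 3).stream()
                .map(nodeId -> nodeId == 2 ? null : "ShareFetch->node" + nodeId)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        System.out.println(requests); // [ShareFetch->node1, ShareFetch->node3]
    }
}
```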
@@ -738,6 +766,15 @@ private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition topicIdP
         return false;
     }
 
+    @Override
+    public long maximumTimeToWait(long currentTimeMs) {
+        // When fetching records and there is no chosen node for fetching, we do not want to wait for the next poll in record_limit mode.
+        if (isShareAcquireModeRecordLimit() && fetchMoreRecords && subscriptions.numAssignedPartitions() > 0 && fetchRecordsNodeId.get() == -1) {
+            return 0L;
+        }
+        return Long.MAX_VALUE;
+    }
+
     private void handleShareFetchSuccess(Node fetchTarget,
                                          ShareFetchRequestData requestData,
                                          ClientResponse resp) {
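Returning 0 from `maximumTimeToWait` signals that the consumer's background thread should not block when records are wanted but no fetch node has been chosen yet, while `Long.MAX_VALUE` means this manager imposes no deadline of its own. A simplified sketch of how a caller might combine such bounds, assuming the smallest value wins (an assumption for illustration, not the actual network-thread code):

```java
import java.util.List;

// Sketch of combining per-manager wait bounds: any manager that returns 0 forces an
// immediate re-poll, otherwise the smallest bound wins. The lambdas stand in for
// request managers; this is not the Kafka internal API.
public class CombineWaitTimes {
    interface WaitBound {
        long maximumTimeToWait(long currentTimeMs);
    }

    public static void main(String[] args) {
        WaitBound shareFetchManager = now -> 0L;      // record_limit mode, no node chosen yet
        WaitBound heartbeatManager = now -> 3_000L;   // e.g. next heartbeat due in 3s

        long pollTimeout = List.of(shareFetchManager, heartbeatManager).stream()
                .mapToLong(m -> m.maximumTimeToWait(System.currentTimeMillis()))
                .min()
                .orElse(Long.MAX_VALUE);
        System.out.println("poll timeout = " + pollTimeout + " ms"); // 0 -> poll again immediately
    }
}
```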
@@ -858,6 +895,9 @@ private void handleShareFetchSuccess(Node fetchTarget,
             metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
         } finally {
             log.debug("Removing pending request for node {} - success", fetchTarget.id());
+            if (isShareAcquireModeRecordLimit()) {
+                fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
+            }
             nodesWithPendingRequests.remove(fetchTarget.id());
         }
     }
@@ -896,6 +936,9 @@ private void handleShareFetchFailure(Node fetchTarget,
             }));
         } finally {
             log.debug("Removing pending request for node {} - failed", fetchTarget.id());
+            if (isShareAcquireModeRecordLimit()) {
+                fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
+            }
             nodesWithPendingRequests.remove(fetchTarget.id());
         }
     }

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

Lines changed: 2 additions & 1 deletion
@@ -39,6 +39,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -315,7 +316,7 @@ public synchronized void assignFromSubscribed(Collection<TopicPartition> assignm
         if (!this.hasAutoAssignedPartitions())
             throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
 
-        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
+        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new LinkedHashMap<>(assignments.size());
         for (TopicPartition tp : assignments) {
             TopicPartitionState state = this.assignment.stateValue(tp);
             if (state == null)
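The `HashMap` to `LinkedHashMap` switch matters because `subscriptions.movePartitionToEnd(...)` relies on the assignment map's iteration order to rotate partitions between polls; `LinkedHashMap` preserves insertion order, while `HashMap` guarantees no particular order. A quick standalone comparison (partition names are illustrative):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Shows that LinkedHashMap keeps the order in which partitions were inserted,
// while HashMap makes no ordering guarantee - which is why the assignment map
// needs to be a LinkedHashMap for per-poll partition rotation to survive reassignment.
public class AssignmentOrdering {
    public static void main(String[] args) {
        List<String> assignment = List.of("topicA-2", "topicA-0", "topicA-1");

        Map<String, String> hashed = new HashMap<>();
        Map<String, String> linked = new LinkedHashMap<>();
        for (String tp : assignment) {
            hashed.put(tp, "state");
            linked.put(tp, "state");
        }

        System.out.println("HashMap order:       " + hashed.keySet()); // no guaranteed order
        System.out.println("LinkedHashMap order: " + linked.keySet()); // [topicA-2, topicA-0, topicA-1]
    }
}
```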
