KAFKA-19767 - Send Share-Fetch one-node at a time for record_limit mode #20855
Conversation
AndrewJSchofield left a comment:
Thanks for the PR. Generally looks pretty good, but I have some formatting feedback for the test, and one comment about a test case which might sometimes fail unexpectedly.
Please could you merge the latest changes from trunk into your branch. Thanks.
...rc/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
(several review comments on this file, since resolved)
```java
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(partitions);
```
I don't think this is necessarily deterministic order. I worry that this might be a new flaky test.
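One way to avoid this kind of order sensitivity is to assert on the contents of the result rather than its order. A minimal illustration (a hypothetical helper, not the actual test code from this PR):

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper for an order-agnostic assertion: compare the two
// collections as sets. The size check guards against one side containing
// an element the other lacks; it assumes no duplicates, which holds when
// the elements are topic-partitions.
public class OrderAgnosticCheck {
    public static boolean sameElements(List<String> actual, List<String> expected) {
        return actual.size() == expected.size()
                && Set.copyOf(actual).equals(Set.copyOf(expected));
    }
}
```

With a helper like this, the assertion passes regardless of which partition the underlying `HashMap` happens to iterate first.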
Right, I looked into this, and I thought of two ways to solve it:
- Change the test to be order-agnostic. This makes the test a bit bigger, but we can make it work.
- Use `LinkedHashMap` in `SubscriptionState` to maintain insertion order, so that we can be deterministic in this test and in any future tests that other classes may use.

I have updated the PR with option 2, as this was just a temporary map used in `assignFromSubscribed` before updating the actual assignment. Does this work?
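The difference between the two map types is easy to demonstrate in isolation (this is a standalone illustration, not Kafka code): `LinkedHashMap` iterates in insertion order, while `HashMap` iteration order depends on hashing and is not guaranteed.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration: LinkedHashMap preserves insertion order, so tests that
// assert on the order of entries become deterministic. HashMap makes no
// ordering guarantee at all.
public class InsertionOrderDemo {
    public static String keysInOrder(Map<String, Integer> map) {
        return String.join(",", map.keySet());
    }

    public static void main(String[] args) {
        Map<String, Integer> linked = new LinkedHashMap<>();
        linked.put("tp0", 0);
        linked.put("tp1", 1);
        linked.put("tp2", 2);
        System.out.println(keysInOrder(linked)); // prints tp0,tp1,tp2

        Map<String, Integer> hashed = new HashMap<>();
        hashed.put("tp0", 0);
        hashed.put("tp1", 1);
        hashed.put("tp2", 2);
        // Iteration order here is an implementation detail and may differ
        // from insertion order, which is what makes order-sensitive tests flaky.
        System.out.println(keysInOrder(hashed));
    }
}
```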
Thanks @AndrewJSchofield for the review.
KAFKA-19767: Send Share-Fetch one-node at a time for record_limit mode (apache#20855)

What

- After KIP-1206, when `record_limit` mode was introduced, we ideally want no more than the number of records given by the `maxRecords` field in `ShareFetchRequest`.
- Currently, the client broadcasts the share fetch requests to all nodes which host the leaders of the partitions that it is subscribed to.
- The application thread is woken up after the first response arrives, but meanwhile the responses from the other nodes could each bring in that many more records, which then wait in the buffer. That means we are wasting the acquisition locks on these waiting records.
- Instead, we only want to send the next request when we poll again.
- This PR sends the request to only one node at a time in `record_limit` mode.
- We use partition rotation on each poll so that no partition is starved.

There were NCSS checkstyle errors in `ShareConsumeRequestManagerTest`, so a few refactors were added there to reduce method length.

Performance

- When we have more consumers than partitions (i.e. when real sharing of data happens within a partition), performance is almost the same as with the current approach. But when we have fewer consumers than partitions, we see a performance regression, as the client waits for a node to return a response before it can send the next request.
- Hence we have introduced this only for `record_limit` mode for now; future work will be done to improve this area.

Reviewers: Andrew Schofield <[email protected]>
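The one-node-at-a-time scheme with partition rotation can be sketched as follows. This is a simplified illustration under assumed names (`rotateForPoll`, `targetNode`, string partition/node identifiers), not the actual `ShareConsumeRequestManager` implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch of record_limit fetch targeting: on each poll, rotate the
// assigned partitions by one position, then send a fetch only to the
// leader of the partition now at the head of the list. Rotation ensures
// every partition eventually reaches the head, so none is starved.
public class OneNodeAtATimeSketch {
    // Rotate the partition list left by pollCount positions.
    public static List<String> rotateForPoll(List<String> partitions, int pollCount) {
        List<String> order = new ArrayList<>(partitions);
        Collections.rotate(order, -(pollCount % Math.max(1, partitions.size())));
        return order;
    }

    // Only one node is targeted per poll: the leader of the first
    // partition in the rotated order. The other leaders are not fetched
    // from until a later poll.
    public static String targetNode(List<String> rotated, Map<String, String> leaderByPartition) {
        return leaderByPartition.get(rotated.get(0));
    }

    public static void main(String[] args) {
        List<String> assigned = List.of("tp0", "tp1", "tp2");
        Map<String, String> leaders = Map.of("tp0", "node1", "tp1", "node2", "tp2", "node1");
        for (int poll = 0; poll < 3; poll++) {
            List<String> order = rotateForPoll(assigned, poll);
            System.out.println("poll " + poll + " -> " + targetNode(order, leaders));
        }
    }
}
```

This also makes the performance trade-off described above visible: with a single in-flight target per poll, records from other leaders are no longer pre-fetched into the buffer, at the cost of round-trip latency when few consumers share many partitions.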