Conversation


@ShivsundarR ShivsundarR commented Nov 10, 2025

What

  • After KIP-1206 introduced record_limit mode, we ideally want to
    return no more records than the maxRecords field in the
    ShareFetchRequest asks for.

  • Currently, the client broadcasts share fetch requests to all nodes
    that host leaders of the partitions it is subscribed to.

  • The application thread is woken up as soon as the first response
    arrives, but in the meantime the responses from the other nodes can
    each bring in up to maxRecords more records, which then sit in the
    buffer. That means we are holding acquisition locks for records that
    are only waiting to be consumed.

  • Instead, we want to send the next request only when we poll again.

  • This PR sends the request to only one node at a time in record_limit
    mode.

  • We use partition rotation on each poll so that no partition is
    starved (see the sketch after this list).
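
A minimal sketch of this per-poll selection, assuming hypothetical names (RecordLimitFetchSketch, selectNodeForNextFetch, and leaderFor are illustrative, not the actual ShareConsumeRequestManager API): on each poll, pick the leader of the partition at the head of the rotation, send the single ShareFetchRequest there, and rotate the order for the next poll.

```java
// Hypothetical sketch only: illustrates one-node-at-a-time fetching with
// partition rotation; it is not the real Kafka client code.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class RecordLimitFetchSketch {
    private final Deque<String> partitions = new ArrayDeque<>();

    RecordLimitFetchSketch(List<String> assignedPartitions) {
        partitions.addAll(assignedPartitions);
    }

    // Called once per poll(): choose the node that leads the partition at the
    // head of the rotation, then rotate so the next poll starts elsewhere.
    String selectNodeForNextFetch() {
        String first = partitions.pollFirst();    // null if nothing is assigned
        if (first == null) {
            return null;
        }
        partitions.addLast(first);                // rotate so no partition is starved
        return leaderFor(first);                  // send the ShareFetchRequest to this node only
    }

    // Placeholder leader lookup; the real client resolves leaders from cluster metadata.
    private String leaderFor(String partition) {
        return "node-for-" + partition;
    }
}
```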

There were NCSS checkstyle errors in ShareConsumeRequestManagerTest,
so I added a few refactors there to reduce the method length.

Performance

  • When there are more consumers than partitions (i.e. when records in a
    partition are genuinely shared), the performance is almost the same
    as with the current approach. But when there are fewer consumers than
    partitions, we see a performance regression because the client waits
    for a node to return a response before it can send the next request.
  • Hence we have introduced this only for record_limit mode for now;
    future work will improve this area.

Reviewers: Andrew Schofield [email protected]

@github-actions github-actions bot added the triage (PRs from the community), consumer, and clients labels Nov 10, 2025
@AndrewJSchofield AndrewJSchofield added the KIP-932 (Queues for Kafka) and ci-approved labels and removed the triage (PRs from the community) label Nov 10, 2025
@AndrewJSchofield AndrewJSchofield self-requested a review November 10, 2025 16:04

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. Generally looks pretty good, but I have some formatting feedback for the test, and one comment about perhaps a test case which might sometimes fail unexpectedly.

Please could you merge latest changes from trunk into your branch. Thanks.

Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
subscriptions.assignFromSubscribed(partitions);

I don't think this order is necessarily deterministic. I worry that this might be a new flaky test.


@ShivsundarR ShivsundarR Nov 18, 2025


Right, I looked into this and I think there are two ways to solve it:

  1. Change the test to be order-agnostic; this makes the test a bit bigger, but we can make it work.
  2. Use a LinkedHashMap in SubscriptionState to maintain insertion order, so that this test, and any future tests in other classes, can be deterministic.

I have updated the PR with option 2, as this was just a temporary map used in assignFromSubscribed before updating the actual assignment. Does this work?
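
For context on why option 2 makes the ordering deterministic, here is a small, self-contained Java illustration (it is not the SubscriptionState code itself): a HashSet gives no iteration-order guarantee, while a LinkedHashSet, which is backed by a LinkedHashMap, iterates in insertion order.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class IterationOrderDemo {
    public static void main(String[] args) {
        // HashSet iteration order depends on hash codes, not insertion order,
        // so assertions that assume a particular order can flake.
        Set<String> hashed = new HashSet<>();
        hashed.add("topic-1");
        hashed.add("topic-0");
        System.out.println(hashed);      // order unspecified

        // LinkedHashSet (backed by a LinkedHashMap) iterates in insertion order,
        // which keeps order-sensitive tests deterministic.
        Set<String> linked = new LinkedHashSet<>();
        linked.add("topic-1");
        linked.add("topic-0");
        System.out.println(linked);      // always [topic-1, topic-0]
    }
}
```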

@ShivsundarR

Thanks @AndrewJSchofield for the review.
I had made the refactors mainly to get checkstyle to pass; it was reporting the NCSS code-length limit as exceeded, so I tried combining the asserts and added some helper functions.
I have since reverted the assert changes, and checkstyle still passes.

@AndrewJSchofield AndrewJSchofield merged commit 108ad6e into apache:trunk Nov 18, 2025
24 checks passed
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025