Skip to content

Conversation

@ShivsundarR
Copy link
Contributor

@ShivsundarR ShivsundarR commented Jun 3, 2025

https://issues.apache.org/jira/browse/KAFKA-17853 -

  • There is an issue with the console share consumer where if the broker
    is unavailable, even after force terminating using ctrl-c, the consumer
    does not shut down immediately. It takes around ~30 seconds to close
    once the broker shuts down.

  • The console consumer on the other hand, was supposedly shutting down
    immediately once we press ctrl-c. On reproducing the issue with a local
    kafka server, I observed the issue was present in both the console
    consumer and the console share consumer.

Issue :

  • On seeing the client debug logs, this issue seemed related to network
    thread sending repeated FindCoordinator requests until the timer
    expired. This was happening in both the console-consumer and
    console-share-consumer.

  • Debug logs showed that when the broker is shut down, the heartbeat
    fails with a DisconnectException(which is retriable), this triggers a
    findCoordinator request on the network thread which retries until the
    default timeout expires.

  • This request is sent even before we trigger a close on the consumer,
    so once we press ctrl-c, although the ConsumerNetworkThread::close()
    is triggered, it waits for the default timeout until all the requests
    are sent out for a graceful shutdown.

PR aims to fix this issue by adding a check in NetworkClientDelegate
to remove any pending unsent requests(with empty node values) during
close. This would avoid unnecessary retries and the consumers would
shut down immediately upon termination.

Share consumers shutting down after the fix.

[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8,
node=Optional.empty, remainingMs=28565} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
FindCoordinator request failed due to retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closing
RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closed
the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Kafka
share consumer has been closed
(org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages

Regular consumers shutting down after the fix.

[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b,
node=Optional.empty, remainingMs=29160} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] FindCoordinator request failed due to
retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closing RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing test-topic-23-0 from buffered
fetch data as it is not in the set of partitions to retain ([])
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closed the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Kafka consumer has been closed
(org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages

Reviewers: Lianet Magrans [email protected], Kirk True
[email protected], Andrew Schofield [email protected]

@github-actions github-actions bot added the triage PRs from the community label Jun 3, 2025
@ShivsundarR ShivsundarR requested a review from lianetm June 3, 2025 10:55
@ShivsundarR ShivsundarR added ci-approved and removed triage PRs from the community labels Jun 3, 2025
@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Jun 3, 2025
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I like the approach but want to review more deeply. A few comments to address.

@github-actions github-actions bot removed the small Small PRs label Jun 3, 2025
@github-actions github-actions bot added the small Small PRs label Jun 3, 2025
@apoorvmittal10
Copy link
Contributor

Great to see the fix in the area, thanks for looking into this.

@github-actions github-actions bot removed the small Small PRs label Jun 11, 2025
@ShivsundarR
Copy link
Contributor Author

Another edge case here:
If the broker never started, and we attempt a ctrl-c to close the consumer, then still took 30 seconds for the ConsoleShareConsumer to close. The problem lied with the ShareConsumeRequestManager where we do not complete the closeFuture when the memberId is null.
Added a fix for the same and verified that both the ConsoleConsumer and ConsoleShareConsumer work consistently and close even if the broker never started.

@lianetm
Copy link
Member

lianetm commented Jun 11, 2025

Hey, thanks for looking into this! Sorry I haven't had the time to look in detail, but just couple of high level comments:

This means "close with default close timeout of 30s". So in a way, it's not unexpected that it waits right?. We should ensure that's what we want from the console consumers. Ex. calling close(ZERO) would be the way if we want to "close and attempt sending all requests once without retrying/waiting"

@ShivsundarR
Copy link
Contributor Author

Hi @lianetm, thanks for the review.

  1. So here the abort of the FindCoordinator will only happen when the network thread closes, i.e after all the pending async commits/acknowledgements have completed. So this should not affect the findCoordinator request issued for an async commit right?
    https://github.com/apache/kafka/blob/7c715c02c06f16475faff8aa72048cddb7382c8a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L887C9-L888C163
  2. I agree, we could go with close(Duration.ZERO) too to achieve immediate close on ctrl-c. We just need to decide if we want this functionality or not, seems like an immediate close on ctrl-C would be nice from a user experience perspective.
    I am just wondering if we make the ConsoleConsumer/ConsoleShareConsumer do a close(Duration.ZERO), then on certain cases where we actually need some time to send async commits/acknowledgements, we might force close the consumer sooner. This could be an issue right?
    The PR currently only modifies the code when the network thread is closed(i.e after all the commits/acks have been sent/handled), so this ideally should not affect the prior steps in closing.

@kirktrue
Copy link
Contributor

Thanks for the PR, @ShivsundarR.

Since this change affects all consumer use cases, we need to tread carefully 😄

The console consumer is closing the underlying consumer with the default close() API, which uses a timeout of 30 seconds. That means the consumer is within its right to use up all 30 seconds to clean up. The change as is looks a little too broad because it assumes that because a node isn't immediately available that it should abort the request. This doesn't allow for the case where a node later becomes available within the timeout. There is a dependency with, e.g. OFFSET_COMMIT and FIND_COORDINATOR, so if the user is trying to commit their offsets, IMO we should try exhaustively to find the coordinator.

That said, the close() process is rife with edge cases and I'm sure there are things to fix.

Is there an approach by which we can "prove" that continuing to make requests is pointless?

@ShivsundarR
Copy link
Contributor Author

Hi @kirktrue, thanks for the review.
Yes I agree, the consumer should be able to use the 30 seconds if needed. I think the abort here only happens when the request need not be sent anymore. The abort happens

  • only when onClose in the ConsumerNetworkThread is true, and

  • onClose will be true only when the ConsumerNetworkThread::cleanup is called

  • The cleanup of network thread happens right at the end of close() here after the completion of commitSync(), and updating callbacks.

  • And anyway we do intend to stop sending findCoordinator requests before the network thread closes here, so ideally this should not be a problem.

  • If there happened to be a findCoordinator request issued before the stopFindCoordinatorOnClose event was sent, that goes into a loop of retries when a node is unavailable.

  • For commitSync/acknowledgements(for ShareConsumers) which occur during close, we do NOT abort the findCoordinator even when the node is unavailable(as onClose in NetworkClientDelegate would be false), the respective request managers will handle the response when broker is unavailable and the process will complete as it was happening before this change.

  • Once these stages complete, we reach the end when the network thread itself needs to close with the remaining time on the closeTimeout.

  • Now, if there are any unsent requests with no node to connect to, we try to abort such requests as anyway we have to close the network thread and issuing a FindCoordinator(even if a node was available) is no longer useful beyond this point.

This was my line of thought, does this sound good?

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just one minor comment.

@AndrewJSchofield
Copy link
Member

@lianetm @kirktrue The direction this PR is heading in looks good to me. Please could you take a deeper look and see whether you agree.

Comment on lines +245 to +249
} else if (u.node.isEmpty() && onClose) {
log.debug("Removing unsent request {} because the client is closing", u);
iter.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Copy link
Member

@lianetm lianetm Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this we're discarding any request we may have on close , if the node is disconnected, right?

Seems sensible if the node is the Coordinator, no concerns there as you explained (similar to what the classic does

// If coordinator is not known, requests are aborted.
and the root case of the console issue from what you explain)

But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions? (if let's say, the leader is briefly disconnected and comes back). The ClassicConsumer (and Async before this PR) give those the full closeTimeout

// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
// all requests have received a response.
while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
client.poll(timer, null, true);
timer.update();
}

wdyt? I could be missing something on how the flow would actually go in that case.

If we think this could be an issue, I wonder if we could address this at the Coordinator level, to avoid any undesired side-effect on other requests. The CoordinatorReqMgr knows when it's closing. Could we consider handling a FindCoordinator failures differently if we know we're closing? here

return unsentRequest.whenComplete((clientResponse, throwable) -> {
(not retrying it anymore). Would that work? (and by "closing" I refer to the actual var inside the CoordReqMgr, that is only flipped after committing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm , thanks for the review.

with this we're discarding any request we may have on close , if the node is disconnected, right?

Not really, so we will still allow time for all pending requests (like commitSync or acknowledgements callback or findCoordinator) to complete, here onClose will be true only when we have completed waiting in those steps and reached at the step to close the network thread itself.

if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update();


But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions?

Again as mentioned above and in this comment - #19886 (comment), the boolean onClose would only be true when we have completed all the required steps(updating callbacks, closing out sessions, after sending stopFindCoordinatorOnCloseEvent, etc).
So at this stage, there would be no other requests that we would be waiting for(if there are any pending requests, they already had their timer expired or get a response as "broker disconnected", that's why this stage of closing network thread was reached).

So even if a broker came up again now and responded to a previous request (lets say commit response), we would not be updating callbacks(or any background events) anymore as in the application thread we have finished all processing and reached the stage of closing the application event handler itself.


The ClassicConsumer (and Async before this PR) give those the full closeTimeout

So the ClassicConsumer does give it full closeTimeout but only if the coordinator isn't null.

Node coordinator = checkAndGetCoordinator();
if (coordinator != null && !client.awaitPendingRequests(coordinator, timer))
log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
client.pendingRequestCount(coordinator));
}

I tried this out locally and and if we shutdown the broker first, the coordinator here is null, so it closes immediately.
I got this log also when consumer closed, so there is a check to check for disconnected nodes there too.

if (fetchTarget == null || isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
return;


Would that work? (and by "closing" I refer to the actual var inside the CoordReqMgr, that is only flipped after committing)

  • I don't think so, as this variable is set to true only on getting "StopFindCoordinatorOnCloseEvent".
  • But if a broker was shutdown before the client was shutdown, for both AsyncConsumer and ShareConsumer, this unsent request for findCoordinator(to a "null" node) still lingers around in NetworkClientDelegate and is retried until the timeout and the stopFindCoordinatorOnCloseEvent has no effect.

So the whenComplete will be reached only after the timeout of 30 seconds. So it would not help terminate the consumer as soon we hit ctrl-c.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern was regarding the leader to send the close fetch sessions to (unrelated to the coordinator), but the answer to my concern is indeed in the last bit of the comment above:

if (fetchTarget == null || isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);

(don't attempt any re-discovery of the leader to send the close session)

So all good, thanks for addressing it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node. I was curious if we could make the check a little more explicit, for example, in UnsentRequest:

public boolean isFindCoordinatorRequest() {
    return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
}

And then the code here becomes more clear:

            } else if (u.isFindCoordinatorRequest() && onClose) {
                log.debug("Removing unsent request {} because the client is closing", u);
                iter.remove();
                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
            }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error?

For example, does this result in potentially misleading logging? For example, in CoordinatorRequestManager.markCoordinatorUnknown(), there is some logging that states that Rediscovery will be attempted, which isn't really true.

Would it be better to pass a different exception type to onFailure() that we know the CoordinatorRequestManager will interpret correctly in this special case?

@ShivsundarR ShivsundarR requested a review from lianetm November 16, 2025 18:28
Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a minor comment on test coverage @ShivsundarR, thanks!

client.setUnreachable(node, REQUEST_TIMEOUT_MS);

// Poll with onClose=false
ncd.poll(0, time.milliseconds(), false);
Copy link
Member

@lianetm lianetm Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we remove the false param here, so that we use the main call to poll() (that is supposed to pass false internally)

It's an important coverage I don't see we have right? It would be a nasty regression if we make the main poll use onClose true by mistake (could start to silently drop requests on node failures I imagine)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense. I have updated the test now.

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @ShivsundarR!

Would it be possible to add a couple of integration tests to validate the desired behavior of the consumer not hanging unnecessarily?

Thanks!

Comment on lines +245 to +249
} else if (u.node.isEmpty() && onClose) {
log.debug("Removing unsent request {} because the client is closing", u);
iter.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node. I was curious if we could make the check a little more explicit, for example, in UnsentRequest:

public boolean isFindCoordinatorRequest() {
    return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
}

And then the code here becomes more clear:

            } else if (u.isFindCoordinatorRequest() && onClose) {
                log.debug("Removing unsent request {} because the client is closing", u);
                iter.remove();
                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
            }

Comment on lines +245 to +249
} else if (u.node.isEmpty() && onClose) {
log.debug("Removing unsent request {} because the client is closing", u);
iter.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error?

For example, does this result in potentially misleading logging? For example, in CoordinatorRequestManager.markCoordinatorUnknown(), there is some logging that states that Rediscovery will be attempted, which isn't really true.

Would it be better to pass a different exception type to onFailure() that we know the CoordinatorRequestManager will interpret correctly in this special case?

@ShivsundarR
Copy link
Contributor Author

Thanks @kirktrue for the review,
I have added a couple of integration tests to test both share-consumers and regular consumers.

the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node.

Yes that's true in a way but I was thinking the u.node.isEmpty() check represents the general idea of not issuing requests to an unknown node when we are closing the network thread, in future if we modify this to have other APIs, the idea will still work.
But yes as of now both ways work, I was thinking to retain the current code to make it behaviour specific, what do you think?

I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error?
For example, does this result in potentially misleading logging?

No, this does not lead to misleading logging, it does not enter the "Rediscovery" part in markCoordinatorUnknown as the coordinator is null.
Following are the debug logs I got when this the request is completed.

DEBUG [ShareConsumer clientId=consumer-group1-2, groupId=group1] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='group1', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@32e301da, node=Optional.empty, remainingMs=29251} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:246)

 DEBUG [ShareConsumer clientId=consumer-group1-2, groupId=group1] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:205)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

We just log that findCoordinator failed with a retriable exception and print the exception as well. So it should be fine I guess.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THanks @ShivsundarR , lgmt!
Builds are unstable with OOM #20917 (comment) , let's see here

@AndrewJSchofield AndrewJSchofield merged commit 64cb839 into apache:trunk Nov 20, 2025
20 checks passed
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Dec 3, 2025
…eConsumer (apache#19886)

https://issues.apache.org/jira/browse/KAFKA-17853  -

- There is an issue with the console share consumer where if the broker
is unavailable, even after force terminating using ctrl-c, the consumer
does not shut down immediately. It takes around ~30 seconds to close
once the broker shuts down.

- The console consumer on the other hand, was supposedly shutting down
immediately once we press ctrl-c. On reproducing the issue with a local
kafka server, I observed the issue was present in both the console
consumer and the console share consumer.

Issue :
- On seeing the client debug logs, this issue seemed related to network
thread sending repeated `FindCoordinator` requests until the timer
expired. This was happening in both the console-consumer and
console-share-consumer.

- Debug logs showed that when the broker is shut down, the heartbeat
fails with a `DisconnectException`(which is retriable), this triggers a
`findCoordinator` request on the network thread which retries until the
default timeout expires.

- This request is sent even before we trigger a close on the consumer,
so once we press ctrl-c, although the `ConsumerNetworkThread::close()`
is triggered, it waits for the default timeout until all the requests
are sent out for a graceful shutdown.

PR aims to fix this issue by adding a check in `NetworkClientDelegate`
to remove any pending unsent requests(with empty node values) during
close.   This would avoid unnecessary retries and the consumers would
shut down immediately upon termination.

Share consumers shutting down after the fix.
```
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8,
node=Optional.empty, remainingMs=28565} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
FindCoordinator request failed due to retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closing
RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer]
RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Closed
the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer
clientId=console-share-consumer, groupId=console-share-consumer] Kafka
share consumer has been closed
(org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages
```

Regular consumers shutting down after the fix.

```
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing unsent request
UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671',
keyType=0, coordinatorKeys=[]),
handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b,
node=Optional.empty, remainingMs=29160} because the client is closing
(org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] FindCoordinator request failed due to
retriable exception
(org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closing RequestManagers
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Removing test-topic-23-0 from buffered
fetch data as it is not in the set of partitions to retain ([])
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] RequestManagers has been closed
(org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Closed the consumer network thread
(org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer,
groupId=console-consumer-5671] Kafka consumer has been closed
(org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages
```

Reviewers: Lianet Magrans <[email protected]>, Kirk True
 <[email protected]>, Andrew Schofield <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants