KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer #19886
Conversation
AndrewJSchofield
left a comment
Thanks for the PR. I like the approach but want to review more deeply. A few comments to address.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (resolved, outdated)
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (resolved)
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (resolved, outdated)
Great to see the fix in the area, thanks for looking into this.
Another edge case here:
Hey, thanks for looking into this! Sorry I haven't had the time to look in detail, but just a couple of high-level comments:
This means "close with default close timeout of 30s". So in a way, it's not unexpected that it waits, right? We should ensure that's what we want from the console consumers. Ex. calling
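For illustration, a minimal sketch of closing with an explicit timeout via the public KafkaConsumer.close(Duration) API; the value used is an arbitrary assumption, and the truncated "Ex. calling" above may have had a different example in mind:

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerShutdownSketch {
    // Illustrative only: bound how long close() may block instead of relying on
    // the no-arg close(), which waits up to the default 30 seconds for
    // outstanding requests.
    public static void shutdown(KafkaConsumer<byte[], byte[]> consumer) {
        consumer.close(Duration.ofSeconds(5)); // 5s is an arbitrary example value
    }
}
```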
Hi @lianetm, thanks for the review.
Thanks for the PR, @ShivsundarR. Since this change affects all consumer use cases, we need to tread carefully 😄
The console consumer is closing the underlying consumer with the default
That said, the
Is there an approach by which we can "prove" that continuing to make requests is pointless?
Hi @kirktrue, thanks for the review.
This was my line of thought, does this sound good?
AndrewJSchofield
left a comment
Thanks for the PR. Just one minor comment.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (resolved, outdated)
} else if (u.node.isEmpty() && onClose) {
    log.debug("Removing unsent request {} because the client is closing", u);
    iter.remove();
    asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
    u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
with this we're discarding any request we may have on close, if the node is disconnected, right?
Seems sensible if the node is the Coordinator, no concerns there as you explained (similar to what the classic consumer does):
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
Line 1142 in b58aae2
    // If coordinator is not known, requests are aborted.
But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions? (if let's say, the leader is briefly disconnected and comes back). The ClassicConsumer (and Async before this PR) give those the full closeTimeout:
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
Lines 129 to 134 in b58aae2
    // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
    // all requests have received a response.
    while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
        client.poll(timer, null, true);
        timer.update();
    }
wdyt? I could be missing something on how the flow would actually go in that case.
If we think this could be an issue, I wonder if we could address this at the Coordinator level, to avoid any undesired side-effect on other requests. The CoordinatorReqMgr knows when it's closing. Could we consider handling FindCoordinator failures differently if we know we're closing? Here:
Line 123 in b58aae2
    return unsentRequest.whenComplete((clientResponse, throwable) -> {
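A rough standalone sketch of that idea (a closing flag that short-circuits FindCoordinator retries), using hypothetical names; the real CoordinatorRequestManager wiring differs:

```java
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical sketch of handling FindCoordinator failures differently
// once we know the consumer is closing. Names (closing, signalClose, scheduleRetry)
// are illustrative and not the actual CoordinatorRequestManager API.
public class ClosingAwareCoordinatorSketch {
    private volatile boolean closing = false;

    // Flipped when the consumer starts closing (e.g. on a stop-find-coordinator signal).
    public void signalClose() {
        closing = true;
    }

    public CompletableFuture<Void> onFindCoordinatorResult(CompletableFuture<Void> responseFuture) {
        return responseFuture.whenComplete((response, throwable) -> {
            if (throwable != null) {
                if (closing) {
                    // On close, give up instead of retrying until the close timeout expires.
                    System.out.println("Consumer is closing; not retrying FindCoordinator");
                } else {
                    scheduleRetry();
                }
            }
        });
    }

    private void scheduleRetry() {
        System.out.println("Scheduling FindCoordinator retry after backoff");
    }
}
```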
Hi @lianetm, thanks for the review.
with this we're discarding any request we may have on close, if the node is disconnected, right?
Not really; we will still allow time for all pending requests (like commitSync, acknowledgement callbacks, or findCoordinator) to complete. Here, onClose will be true only when we have completed waiting in those steps and reached the step of closing the network thread itself.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
Lines 979 to 981 in 0f4dbf7
    if (applicationEventHandler != null)
        closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
    closeTimer.update();
But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions?
Again, as mentioned above and in this comment - #19886 (comment), the boolean onClose would only be true when we have completed all the required steps (updating callbacks, closing out sessions, after sending stopFindCoordinatorOnCloseEvent, etc).
So at this stage, there would be no other requests that we would be waiting for (if there are any pending requests, their timers have already expired or they got a "broker disconnected" response; that's why this stage of closing the network thread was reached).
So even if a broker came up again now and responded to a previous request (let's say a commit response), we would not be updating callbacks (or any background events) anymore, as the application thread has finished all processing and reached the stage of closing the application event handler itself.
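For intuition, a minimal standalone sketch of that budgeted close ordering; all names are hypothetical and the real AsyncKafkaConsumer/ShareConsumerImpl close path differs:

```java
import java.time.Duration;

// Toy sketch of the close ordering described above: spend the close budget on
// pending work first (commits, acknowledgements, coordinator interaction), then
// close the network/application-event layer with whatever time is left.
public class BudgetedCloseSketch {
    public static void close(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        waitForPendingWork(deadlineMs);
        long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
        // Only after the steps above does the network thread shut down (the onClose=true
        // path), so nothing should still be waiting on the requests dropped there.
        closeNetworkLayer(Duration.ofMillis(remainingMs));
    }

    private static void waitForPendingWork(long deadlineMs) {
        // Illustrative no-op: block until callbacks/commits finish or the deadline passes.
    }

    private static void closeNetworkLayer(Duration remaining) {
        // Illustrative no-op: stop the background thread within the remaining budget.
    }
}
```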
The ClassicConsumer (and Async before this PR) give those the full closeTimeout
So the ClassicConsumer does give it the full closeTimeout, but only if the coordinator isn't null.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
Lines 1143 to 1147 in b58aae2
    Node coordinator = checkAndGetCoordinator();
    if (coordinator != null && !client.awaitPendingRequests(coordinator, timer))
        log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
            client.pendingRequestCount(coordinator));
}
I tried this out locally, and if we shut down the broker first, the coordinator here is null, so it closes immediately.
I also got this log when the consumer closed, so there is a check for disconnected nodes there too.
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
Lines 406 to 408 in 0f4dbf7
    if (fetchTarget == null || isUnavailable(fetchTarget)) {
        log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
        return;
Would that work? (and by "closing" I refer to the actual var inside the CoordReqMgr, that is only flipped after committing)
- I don't think so, as this variable is set to true only on getting "StopFindCoordinatorOnCloseEvent".
- But if a broker was shut down before the client was shut down, for both AsyncConsumer and ShareConsumer, this unsent request for findCoordinator (to a "null" node) still lingers around in NetworkClientDelegate and is retried until the timeout, so the stopFindCoordinatorOnCloseEvent has no effect.
So the whenComplete will be reached only after the timeout of 30 seconds. So it would not help terminate the consumer as soon as we hit ctrl-c.
My concern was regarding the leader that the close fetch sessions request would be sent to (unrelated to the coordinator), but the answer to my concern is indeed in the last bit of the comment above:
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
Lines 406 to 407 in 33c5e84
    if (fetchTarget == null || isUnavailable(fetchTarget)) {
        log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
(don't attempt any re-discovery of the leader to send the close session)
So all good, thanks for addressing it!
lianetm
left a comment
Just a minor comment on test coverage @ShivsundarR, thanks!
client.setUnreachable(node, REQUEST_TIMEOUT_MS);

// Poll with onClose=false
ncd.poll(0, time.milliseconds(), false);
I would suggest we remove the false param here, so that we use the main call to poll() (which is supposed to pass false internally).
It's important coverage that I don't think we have, right? It would be a nasty regression if we made the main poll use onClose=true by mistake (it could start to silently drop requests on node failures, I imagine).
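A sketch of the suggested change, reusing the names from the test excerpt above; the poll() overload and the assertion accessor are assumptions for illustration, and the real NetworkClientDelegateTest may verify this differently:

```java
client.setUnreachable(node, REQUEST_TIMEOUT_MS);

// Use the main poll() overload, which is expected to pass onClose=false internally.
ncd.poll(0, time.milliseconds());

// Hypothetical assertion: the request must still be queued for retry, i.e. normal
// polling never silently drops requests just because a node is unreachable.
assertEquals(1, ncd.unsentRequests().size());
```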
Thanks, that makes sense. I have updated the test now.
kirktrue
left a comment
Thanks for the PR @ShivsundarR!
Would it be possible to add a couple of integration tests to validate the desired behavior of the consumer not hanging unnecessarily?
Thanks!
} else if (u.node.isEmpty() && onClose) {
    log.debug("Removing unsent request {} because the client is closing", u);
    iter.remove();
    asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
    u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
IIUC, the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node. I was curious if we could make the check a little more explicit, for example, in UnsentRequest:

    public boolean isFindCoordinatorRequest() {
        return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
    }

And then the code here becomes more clear:

    } else if (u.isFindCoordinatorRequest() && onClose) {
        log.debug("Removing unsent request {} because the client is closing", u);
        iter.remove();
        asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
        u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
    }
I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error?
Does this result in potentially misleading logging? For example, in CoordinatorRequestManager.markCoordinatorUnknown(), there is some logging that states that Rediscovery will be attempted, which isn't really true.
Would it be better to pass a different exception type to onFailure() that we know the CoordinatorRequestManager will interpret correctly in this special case?
Thanks @kirktrue for the review,
Yes that's true in a way but I was thinking the
No, this does not lead to misleading logging, it does not enter the "Rediscovery" part in We just log that
AndrewJSchofield
left a comment
Looks good to me.
lianetm
left a comment
Thanks @ShivsundarR, lgtm!
Builds are unstable with OOM #20917 (comment), let's see here
KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer (apache#19886)

https://issues.apache.org/jira/browse/KAFKA-17853

There is an issue with the console share consumer where, if the broker is unavailable, the consumer does not shut down immediately even after force terminating using ctrl-c. It takes around 30 seconds to close once the broker shuts down.

The console consumer, on the other hand, was supposedly shutting down immediately once we press ctrl-c. On reproducing the issue with a local Kafka server, I observed the issue was present in both the console consumer and the console share consumer.

Issue:
- Looking at the client debug logs, this issue seemed related to the network thread sending repeated FindCoordinator requests until the timer expired. This was happening in both the console-consumer and console-share-consumer.
- Debug logs showed that when the broker is shut down, the heartbeat fails with a DisconnectException (which is retriable); this triggers a findCoordinator request on the network thread which retries until the default timeout expires.
- This request is sent even before we trigger a close on the consumer, so once we press ctrl-c, although ConsumerNetworkThread::close() is triggered, it waits for the default timeout until all the requests are sent out for a graceful shutdown.

The PR fixes this issue by adding a check in NetworkClientDelegate to remove any pending unsent requests (with empty node values) during close. This avoids unnecessary retries, and the consumers shut down immediately upon termination.

Share consumers shutting down after the fix:
```
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8, node=Optional.empty, remainingMs=28565} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:23:42,176] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,177] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:23:42,179] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:23:42,181] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Kafka share consumer has been closed (org.apache.kafka.clients.consumer.internals.ShareConsumerImpl)
Processed a total of 0 messages
```

Regular consumers shutting down after the fix:
```
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b, node=Optional.empty, remainingMs=29160} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate)
[2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing test-topic-23-0 from buffered fetch data as it is not in the set of partitions to retain ([]) (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers)
[2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread)
[2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Kafka consumer has been closed (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
Processed a total of 0 messages
```

Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Andrew Schofield <[email protected]>