-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer #19886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
23495a0
1fd86c2
01993c9
58b81d8
fe4820a
ea0aff1
f1d99ec
ac967c7
9f41674
7b6c848
74e8f26
50f4bf3
dde9365
1ee27ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.errors.TimeoutException; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.metrics.Metrics; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.metrics.Sensor; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.protocol.Errors; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.requests.AbstractRequest; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.kafka.common.utils.LogContext; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -137,13 +138,25 @@ public void tryConnect(Node node) { | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses, | ||||||||||||||||||||||||||||||||||||||||||||
| * This method will try to send the unsent requests, poll for responses, | ||||||||||||||||||||||||||||||||||||||||||||
| * and check the disconnected nodes. | ||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||
| * @param timeoutMs timeout time | ||||||||||||||||||||||||||||||||||||||||||||
| * @param currentTimeMs current time | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public void poll(final long timeoutMs, final long currentTimeMs) { | ||||||||||||||||||||||||||||||||||||||||||||
| poll(timeoutMs, currentTimeMs, false); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * This method will try to send the unsent requests, poll for responses, | ||||||||||||||||||||||||||||||||||||||||||||
| * and check the disconnected nodes. | ||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||
| * @param timeoutMs timeout time | ||||||||||||||||||||||||||||||||||||||||||||
| * @param currentTimeMs current time | ||||||||||||||||||||||||||||||||||||||||||||
| * @param onClose True when the network thread is closing. | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public void poll(final long timeoutMs, final long currentTimeMs, boolean onClose) { | ||||||||||||||||||||||||||||||||||||||||||||
| trySend(currentTimeMs); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| long pollTimeoutMs = timeoutMs; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -152,7 +165,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) { | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| this.client.poll(pollTimeoutMs, currentTimeMs); | ||||||||||||||||||||||||||||||||||||||||||||
| maybePropagateMetadataError(); | ||||||||||||||||||||||||||||||||||||||||||||
| checkDisconnects(currentTimeMs); | ||||||||||||||||||||||||||||||||||||||||||||
| checkDisconnects(currentTimeMs, onClose); | ||||||||||||||||||||||||||||||||||||||||||||
| asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -219,7 +232,7 @@ boolean doSend(final UnsentRequest r, final long currentTimeMs) { | |||||||||||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| protected void checkDisconnects(final long currentTimeMs) { | ||||||||||||||||||||||||||||||||||||||||||||
| protected void checkDisconnects(final long currentTimeMs, boolean onClose) { | ||||||||||||||||||||||||||||||||||||||||||||
| // Check the connection of the unsent request. Disconnect the disconnected node if it is unable to be connected. | ||||||||||||||||||||||||||||||||||||||||||||
| Iterator<UnsentRequest> iter = unsentRequests.iterator(); | ||||||||||||||||||||||||||||||||||||||||||||
| while (iter.hasNext()) { | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) { | |||||||||||||||||||||||||||||||||||||||||||
| asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs()); | ||||||||||||||||||||||||||||||||||||||||||||
| AuthenticationException authenticationException = client.authenticationException(u.node.get()); | ||||||||||||||||||||||||||||||||||||||||||||
| u.handler.onFailure(currentTimeMs, authenticationException); | ||||||||||||||||||||||||||||||||||||||||||||
| } else if (u.node.isEmpty() && onClose) { | ||||||||||||||||||||||||||||||||||||||||||||
| log.debug("Removing unsent request {} because the client is closing", u); | ||||||||||||||||||||||||||||||||||||||||||||
| iter.remove(); | ||||||||||||||||||||||||||||||||||||||||||||
| asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs()); | ||||||||||||||||||||||||||||||||||||||||||||
| u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception()); | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+245
to
+249
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this we're discarding any request we may have on close, if the node is disconnected, right? Seems sensible if the node is the Coordinator, no concerns there as you explained (similar to what the classic does kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java Line 1142 in b58aae2).
But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions? (if let's say, the leader is briefly disconnected and comes back). The ClassicConsumer (and Async before this PR) give those the full closeTimeout kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Lines 129 to 134 in b58aae2
wdyt? I could be missing something on how the flow would actually go in that case. If we think this could be an issue, I wonder if we could address this at the Coordinator level, to avoid any undesired side-effect on other requests. The CoordinatorReqMgr knows when it's Line 123 in b58aae2
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @lianetm , thanks for the review.
Not really, so we will still allow time for all pending requests (like kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java Lines 979 to 981 in 0f4dbf7
Again as mentioned above and in this comment - #19886 (comment), the boolean So even if a broker came up again now and responded to a previous request (let's say a commit response), we would not be updating callbacks (or any background events) anymore, as in the application thread we have finished all processing and reached the stage of closing the application event handler itself.
So the kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java Lines 1143 to 1147 in b58aae2
I tried this out locally and if we shut down the broker first, the coordinator here is null, so it closes immediately. kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java Lines 406 to 408 in 0f4dbf7
So the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My concern was regarding the leader to send the close fetch sessions to (unrelated to the coordinator), but the answer to my concern is indeed in the last bit of the comment above: kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java Lines 406 to 407 in 33c5e84
(don't attempt any re-discovery of the leader to send the close session) So all good, thanks for addressing it!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, the logic in this PR is assuming the
public boolean isFindCoordinatorRequest() {
    return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
}
And then the code here becomes more clear:
} else if (u.isFindCoordinatorRequest() && onClose) {
    log.debug("Removing unsent request {} because the client is closing", u);
    iter.remove();
    asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
    u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had another minor concern regarding the handling in For example, does this result in potentially misleading logging? For example, in Would it be better to pass a different exception type to |
||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.