@@ -87,6 +87,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
@@ -109,6 +110,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -181,6 +183,63 @@ public void testAsyncConsumeCoordinatorFailover() throws InterruptedException {
testCoordinatorFailover(cluster, config);
}

@ClusterTest(
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
}
)
public void testClassicConsumerCloseOnBrokerShutdown() {
Map<String, Object> config = Map.of(
GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
);
testConsumerCloseOnBrokerShutdown(config);
}

@ClusterTest(
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
}
)
public void testAsyncConsumerCloseOnBrokerShutdown() {
Map<String, Object> config = Map.of(
GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
ENABLE_AUTO_COMMIT_CONFIG, false
);
// Disabling auto commit so that commitSync() does not block the close timeout.
testConsumerCloseOnBrokerShutdown(config);
}

private void testConsumerCloseOnBrokerShutdown(Map<String, Object> consumerConfig) {
try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
consumer.subscribe(List.of(TOPIC));

// Force the consumer to discover the coordinator by doing a poll.
// This ensures the coordinator is discovered before we shut down the broker.
consumer.poll(Duration.ofMillis(100));

// Now shut down the broker.
assertEquals(1, cluster.brokers().size());
KafkaBroker broker = cluster.brokers().get(0);
cluster.shutdownBroker(0);
broker.awaitShutdown();

// Do another poll to force the consumer to retry finding the coordinator.
consumer.poll(Duration.ofMillis(100));

// Close should not hang waiting for retries when broker is already down
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> consumer.close(),
"Consumer close should not wait for full timeout when broker is already shutdown");
}
}

@ClusterTest
public void testClassicConsumerHeaders() throws Exception {
testHeaders(Map.of(
@@ -113,6 +113,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -1252,6 +1253,28 @@ public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedException {
}
}

@ClusterTest
public void testConsumerCloseOnBrokerShutdown() {
alterShareAutoOffsetReset("group1", "earliest");
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1");
shareConsumer.subscribe(Set.of(tp.topic()));

// To ensure coordinator discovery is complete before shutting down the broker
shareConsumer.poll(Duration.ofMillis(100));

// Shut down the broker.
assertEquals(1, cluster.brokers().size());
KafkaBroker broker = cluster.brokers().get(0);
cluster.shutdownBroker(0);

broker.awaitShutdown();

// Assert that close completes in less than 5 seconds, not the full 30-second timeout.
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
shareConsumer.close();
}, "Consumer close should not wait for full timeout when broker is already shutdown");
}

@ClusterTest
public void testMultipleConsumersInGroupSequentialConsumption() {
alterShareAutoOffsetReset("group1", "earliest");
@@ -395,7 +395,7 @@ private void sendUnsentRequests(final Timer timer) {
return;

do {
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs(), true);
timer.update();
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());

@@ -34,6 +34,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
@@ -137,13 +138,25 @@ public void tryConnect(Node node) {
}

/**
* Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
* This method will try to send the unsent requests, poll for responses,
* and check the disconnected nodes.
*
* @param timeoutMs timeout time
* @param currentTimeMs current time
*/
public void poll(final long timeoutMs, final long currentTimeMs) {
poll(timeoutMs, currentTimeMs, false);
}

/**
* This method will try to send the unsent requests, poll for responses,
* and check the disconnected nodes.
*
* @param timeoutMs timeout time
* @param currentTimeMs current time
* @param onClose True when the network thread is closing.
*/
public void poll(final long timeoutMs, final long currentTimeMs, boolean onClose) {
trySend(currentTimeMs);

long pollTimeoutMs = timeoutMs;
@@ -152,7 +165,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
}
this.client.poll(pollTimeoutMs, currentTimeMs);
maybePropagateMetadataError();
checkDisconnects(currentTimeMs);
checkDisconnects(currentTimeMs, onClose);
asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);
}

@@ -219,7 +232,7 @@ boolean doSend(final UnsentRequest r, final long currentTimeMs) {
return true;
}

protected void checkDisconnects(final long currentTimeMs) {
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
// Check the connection of the unsent request. Disconnect the disconnected node if it is unable to be connected.
Iterator<UnsentRequest> iter = unsentRequests.iterator();
while (iter.hasNext()) {
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
AuthenticationException authenticationException = client.authenticationException(u.node.get());
u.handler.onFailure(currentTimeMs, authenticationException);
} else if (u.node.isEmpty() && onClose) {
log.debug("Removing unsent request {} because the client is closing", u);
iter.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
Comment on lines +245 to +249

@lianetm (Member), Nov 14, 2025:
with this we're discarding any request we may have on close, if the node is disconnected, right?

Seems sensible if the node is the Coordinator, no concerns there as you explained (similar to what the classic consumer does:

// If coordinator is not known, requests are aborted.

and the root cause of the console issue from what you explain).

But still thinking: could we be wrongly dropping other relevant requests on close, like close fetch sessions (if, let's say, the leader is briefly disconnected and comes back)? The ClassicConsumer (and Async before this PR) give those the full closeTimeout:

// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
// all requests have received a response.
while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
client.poll(timer, null, true);
timer.update();
}

WDYT? I could be missing something on how the flow would actually go in that case.

If we think this could be an issue, I wonder if we could address it at the coordinator level, to avoid any undesired side effects on other requests. The CoordinatorRequestManager knows when it's closing. Could we consider handling FindCoordinator failures differently if we know we're closing? Here:

return unsentRequest.whenComplete((clientResponse, throwable) -> {

(not retrying it anymore). Would that work? (And by "closing" I refer to the actual variable inside the CoordinatorRequestManager, which is only flipped after committing.)
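For illustration, a rough sketch of that alternative (closing, handleLookupFailure, and handleLookupSuccess are hypothetical placeholders here, not the actual CoordinatorRequestManager members):

// Sketch: short-circuit FindCoordinator handling once the manager knows it is closing.
return unsentRequest.whenComplete((clientResponse, throwable) -> {
    if (closing) {
        // Shutting down: leave the coordinator unknown and do not schedule
        // another FindCoordinator attempt.
        log.debug("Ignoring FindCoordinator result because the consumer is closing");
        return;
    }
    if (throwable != null)
        handleLookupFailure(throwable);      // hypothetical existing failure path
    else
        handleLookupSuccess(clientResponse); // hypothetical existing success path
});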

PR author (Contributor):

Hi @lianetm, thanks for the review.

with this we're discarding any request we may have on close, if the node is disconnected, right?

Not really: we will still allow time for all pending requests (like commitSync, acknowledgement callbacks, or FindCoordinator) to complete. Here onClose will be true only when we have finished waiting on those steps and reached the step of closing the network thread itself:

if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update();


But still thinking, could we be wrongly dropping other relevant requests on close, like close fetch sessions?

Again, as mentioned above and in this comment - #19886 (comment), the boolean onClose would only be true when we have completed all the required steps (updating callbacks, closing out sessions, after sending stopFindCoordinatorOnCloseEvent, etc.).
So at this stage, there would be no other requests that we would be waiting for (if there were any pending requests, they already had their timer expire or got a "broker disconnected" response; that's why this stage of closing the network thread was reached).

So even if a broker came back up now and responded to a previous request (let's say a commit response), we would not be updating callbacks (or any background events) anymore, as the application thread has finished all processing and reached the stage of closing the application event handler itself.


The ClassicConsumer (and Async before this PR) give those the full closeTimeout

So the ClassicConsumer does give them the full closeTimeout, but only if the coordinator isn't null:

Node coordinator = checkAndGetCoordinator();
if (coordinator != null && !client.awaitPendingRequests(coordinator, timer))
log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
client.pendingRequestCount(coordinator));
}

I tried this out locally, and if we shut down the broker first, the coordinator here is null, so it closes immediately.
I also got this log when the consumer closed, so there is a check for disconnected nodes there too:

if (fetchTarget == null || isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
return;


Would that work? (and by "closing" I refer to the actual var inside the CoordReqMgr, that is only flipped after committing)

  • I don't think so, as this variable is set to true only on getting "StopFindCoordinatorOnCloseEvent".
  • But if a broker was shut down before the client was shut down, then for both AsyncConsumer and ShareConsumer this unsent FindCoordinator request (to a "null" node) still lingers around in NetworkClientDelegate and is retried until the timeout, so the stopFindCoordinatorOnCloseEvent has no effect.

So the whenComplete will be reached only after the 30-second timeout, so it would not help terminate the consumer as soon as we hit Ctrl-C.

@lianetm (Member):

My concern was regarding the leader we send the close fetch sessions to (unrelated to the coordinator), but the answer to my concern is indeed in the last bit of the comment above:

if (fetchTarget == null || isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);

(don't attempt any re-discovery of the leader to send the close session)

So all good, thanks for addressing it!

Contributor:

IIUC, the logic in this PR is assuming the UnsentRequest represents a FIND_COORDINATOR RPC because it has an empty Node. I was curious if we could make the check a little more explicit, for example, in UnsentRequest:

public boolean isFindCoordinatorRequest() {
    return requestBuilder.apiKey() == ApiKeys.FIND_COORDINATOR;
}

And then the code here becomes clearer:

            } else if (u.isFindCoordinatorRequest() && onClose) {
                log.debug("Removing unsent request {} because the client is closing", u);
                iter.remove();
                asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
                u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
            }

Contributor:

I had another minor concern regarding the handling in CoordinatorRequestManager of the NetworkException that's passed to onFailure(). How does the CoordinatorRequestManager logic handle that error?

For example, does this result in potentially misleading logging? In CoordinatorRequestManager.markCoordinatorUnknown(), there is some logging that states that rediscovery will be attempted, which isn't really true.

Would it be better to pass a different exception type to onFailure() that we know the CoordinatorRequestManager will interpret correctly in this special case?
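For instance, a minimal sketch of that idea (ClientShutdownException is a hypothetical name, not an existing Kafka class):

import org.apache.kafka.common.KafkaException;

// Hypothetical marker exception: signals that an unsent request was failed only
// because the client is shutting down, not because of a real network error.
// CoordinatorRequestManager could check for this type and skip the retry and
// "rediscovery will be attempted" logging path.
public class ClientShutdownException extends KafkaException {
    public ClientShutdownException(String message) {
        super(message);
    }
}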

}
}
}
@@ -136,6 +136,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener {
@Override
public PollResult poll(long currentTimeMs) {
if (memberId == null) {
if (closing && !closeFuture.isDone()) {
closeFuture.complete(null);
}
return PollResult.EMPTY;
}

@@ -35,6 +35,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;

import java.time.Duration;
import java.util.LinkedList;
@@ -203,7 +204,7 @@ public void testRunOnceInvokesReaper() {
public void testSendUnsentRequests() {
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
consumerNetworkThread.cleanup();
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong(), ArgumentMatchers.booleanThat(onClose -> onClose));
}

@ParameterizedTest
@@ -4266,7 +4266,7 @@ private List<UnsentRequest> removeUnsentRequestByNode(Node node) {
}

@Override
protected void checkDisconnects(final long currentTimeMs) {
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
@@ -27,6 +27,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.metrics.Metrics;
@@ -282,6 +283,51 @@ public void testRecordUnsentRequestsQueueTime(String groupName) throws Exception
}
}

@Test
public void testPollWithOnClose() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
ncd.add(unsentRequest);

// First poll without onClose
ncd.poll(0, time.milliseconds());
assertTrue(ncd.hasAnyPendingRequests());

// Poll with onClose=true
ncd.poll(0, time.milliseconds(), true);
assertTrue(ncd.hasAnyPendingRequests());

// Complete the request
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
ncd.poll(0, time.milliseconds(), true);
assertFalse(ncd.hasAnyPendingRequests());
}
}

@Test
public void testCheckDisconnectsWithOnClose() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
ncd.add(unsentRequest);

// Mark node as disconnected
Node node = mockNode();
client.setUnreachable(node, REQUEST_TIMEOUT_MS);

// Poll with onClose=false (default)
ncd.poll(0, time.milliseconds());
assertTrue(ncd.hasAnyPendingRequests());

// Poll with onClose=true
ncd.poll(0, time.milliseconds(), true);

// Verify the request is absent since we're removing unsent requests on close.
assertFalse(ncd.hasAnyPendingRequests());
assertTrue(unsentRequest.future().isDone());
TestUtils.assertFutureThrows(NetworkException.class, unsentRequest.future());
}
}

public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
}
@@ -402,7 +402,7 @@ public void testAcknowledgeOnClose() {
// Remaining acknowledgements sent with close().
Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);

shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(100)));

assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -416,6 +416,25 @@
// Verifying that all 3 offsets were acknowledged as part of the final ShareAcknowledge on close.
assertEquals(mergedAcks.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

// Polling once more to complete the closeFuture.
shareConsumeRequestManager.sendFetches();
assertTrue(closeFuture.isDone());
}

@Test
public void testCloseFutureCompletedWhenMemberIdIsNull() {
buildRequestManager(new MetricConfig(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), null, ShareAcquireMode.BATCH_OPTIMIZED);
assignFromSubscribed(Collections.singleton(tp0));

CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
calculateDeadlineMs(time.timer(100)));

assertFalse(closeFuture.isDone());

// The subsequent poll should complete the closeFuture as the memberId is null.
shareConsumeRequestManager.sendFetches();
assertTrue(closeFuture.isDone());
}

@Test
@@ -2466,7 +2485,7 @@ public void testFetchOneNodeAtATimeForRecordLimitMode() {
.setErrorCode(Errors.NONE.code()));
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1);
partitionData = buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);

client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2731,25 +2750,27 @@ private void buildRequestManager(ShareAcquireMode shareAcquireMode) {
private <K, V> void buildRequestManager(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
ShareAcquireMode shareAcquireMode) {
buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, shareAcquireMode);
buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, Uuid.randomUuid().toString(), shareAcquireMode);
}

private <K, V> void buildRequestManager(MetricConfig metricConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
String memberId,
ShareAcquireMode shareAcquireMode) {
LogContext logContext = new LogContext();
SubscriptionState subscriptionState = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST);
buildRequestManager(metricConfig, keyDeserializer, valueDeserializer,
subscriptionState, logContext, shareAcquireMode);
subscriptionState, logContext, memberId, shareAcquireMode);
}

private <K, V> void buildRequestManager(MetricConfig metricConfig,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
SubscriptionState subscriptionState,
LogContext logContext,
ShareAcquireMode shareAcquireMode) {
String memberId,
ShareAcquireMode shareAcquireMode) {
buildDependencies(metricConfig, subscriptionState, logContext);
Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
int maxWaitMs = 0;
@@ -2781,7 +2802,8 @@ private <K, V> void buildRequestManager(MetricConfig metricConfig,
new ShareFetchBuffer(logContext),
acknowledgementEventHandler,
metricsManager,
shareFetchCollector));
shareFetchCollector,
memberId));
}

private void buildDependencies(MetricConfig metricConfig,
@@ -2820,11 +2842,14 @@ public TestableShareConsumeRequestManager(LogContext logContext,
ShareFetchBuffer shareFetchBuffer,
ShareAcknowledgementEventHandler acknowledgementEventHandler,
ShareFetchMetricsManager metricsManager,
ShareFetchCollector<K, V> fetchCollector) {
ShareFetchCollector<K, V> fetchCollector,
String memberId) {
super(time, logContext, groupId, metadata, subscriptions, shareFetchConfig, shareFetchBuffer,
acknowledgementEventHandler, metricsManager, retryBackoffMs, 1000);
this.shareFetchCollector = fetchCollector;
onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
if (memberId != null) {
onMemberEpochUpdated(Optional.empty(), memberId);
}
}

private ShareFetch<K, V> collectFetch() {
@@ -2914,7 +2939,7 @@ private List<UnsentRequest> removeUnsentRequestByNode(Node node) {
}

@Override
protected void checkDisconnects(final long currentTimeMs) {
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future