Skip to content

Commit 64cb839

Browse files
authored
KAFKA-17853: Fix termination issue in ConsoleConsumer and ConsoleShareConsumer (#19886)
https://issues.apache.org/jira/browse/KAFKA-17853 - - There is an issue with the console share consumer where if the broker is unavailable, even after force terminating using ctrl-c, the consumer does not shut down immediately. It takes around ~30 seconds to close once the broker shuts down. - The console consumer on the other hand, was supposedly shutting down immediately once we press ctrl-c. On reproducing the issue with a local kafka server, I observed the issue was present in both the console consumer and the console share consumer. Issue : - On seeing the client debug logs, this issue seemed related to network thread sending repeated `FindCoordinator` requests until the timer expired. This was happening in both the console-consumer and console-share-consumer. - Debug logs showed that when the broker is shut down, the heartbeat fails with a `DisconnectException`(which is retriable), this triggers a `findCoordinator` request on the network thread which retries until the default timeout expires. - This request is sent even before we trigger a close on the consumer, so once we press ctrl-c, although the `ConsumerNetworkThread::close()` is triggered, it waits for the default timeout until all the requests are sent out for a graceful shutdown. PR aims to fix this issue by adding a check in `NetworkClientDelegate` to remove any pending unsent requests(with empty node values) during close. This would avoid unnecessary retries and the consumers would shut down immediately upon termination. Share consumers shutting down after the fix. ``` [2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-share-consumer', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@2b351de8, node=Optional.empty, remainingMs=28565} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) [2025-06-03 16:23:42,175] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager) org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. [2025-06-03 16:23:42,176] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers) [2025-06-03 16:23:42,177] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers) [2025-06-03 16:23:42,179] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread) [2025-06-03 16:23:42,181] DEBUG [ShareConsumer clientId=console-share-consumer, groupId=console-share-consumer] Kafka share consumer has been closed (org.apache.kafka.clients.consumer.internals.ShareConsumerImpl) Processed a total of 0 messages ``` Regular consumers shutting down after the fix. ``` [2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing unsent request UnsentRequest{requestBuilder=FindCoordinatorRequestData(key='console-consumer-5671', keyType=0, coordinatorKeys=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@3770591b, node=Optional.empty, remainingMs=29160} because the client is closing (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) [2025-06-03 16:24:27,196] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] FindCoordinator request failed due to retriable exception (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager) org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received. [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closing RequestManagers (org.apache.kafka.clients.consumer.internals.RequestManagers) [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Removing test-topic-23-0 from buffered fetch data as it is not in the set of partitions to retain ([]) (org.apache.kafka.clients.consumer.internals.FetchBuffer) [2025-06-03 16:24:27,197] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] RequestManagers has been closed (org.apache.kafka.clients.consumer.internals.RequestManagers) [2025-06-03 16:24:27,200] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Closed the consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread) [2025-06-03 16:24:27,202] DEBUG [Consumer clientId=console-consumer, groupId=console-consumer-5671] Kafka consumer has been closed (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer) Processed a total of 0 messages ``` Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Andrew Schofield <[email protected]>
1 parent b443130 commit 64cb839

File tree

9 files changed

+190
-15
lines changed

9 files changed

+190
-15
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
8888
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
8989
import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
90+
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
9091
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
9192
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
9293
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
@@ -109,6 +110,7 @@
109110
import static org.junit.jupiter.api.Assertions.assertNotNull;
110111
import static org.junit.jupiter.api.Assertions.assertNull;
111112
import static org.junit.jupiter.api.Assertions.assertThrows;
113+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
112114
import static org.junit.jupiter.api.Assertions.assertTrue;
113115
import static org.junit.jupiter.api.Assertions.fail;
114116

@@ -181,6 +183,63 @@ public void testAsyncConsumeCoordinatorFailover() throws InterruptedException {
181183
testCoordinatorFailover(cluster, config);
182184
}
183185

186+
@ClusterTest(
187+
brokers = 1,
188+
serverProperties = {
189+
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
190+
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
191+
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
192+
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
193+
}
194+
)
195+
public void testClassicConsumerCloseOnBrokerShutdown() {
196+
Map<String, Object> config = Map.of(
197+
GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
198+
);
199+
testConsumerCloseOnBrokerShutdown(config);
200+
}
201+
202+
@ClusterTest(
203+
brokers = 1,
204+
serverProperties = {
205+
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
206+
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
207+
@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
208+
@ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1")
209+
}
210+
)
211+
public void testAsyncConsumerCloseOnBrokerShutdown() {
212+
Map<String, Object> config = Map.of(
213+
GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
214+
ENABLE_AUTO_COMMIT_CONFIG, false
215+
);
216+
// Disabling auto commit so that commitSync() does not block the close timeout.
217+
testConsumerCloseOnBrokerShutdown(config);
218+
}
219+
220+
private void testConsumerCloseOnBrokerShutdown(Map<String, Object> consumerConfig) {
221+
try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
222+
consumer.subscribe(List.of(TOPIC));
223+
224+
// Force consumer to discover coordinator by doing a poll
225+
// This ensures coordinator is discovered before we shutdown the broker
226+
consumer.poll(Duration.ofMillis(100));
227+
228+
// Now shutdown broker.
229+
assertEquals(1, cluster.brokers().size());
230+
KafkaBroker broker = cluster.brokers().get(0);
231+
cluster.shutdownBroker(0);
232+
broker.awaitShutdown();
233+
234+
// Do another poll to force the consumer to retry finding the coordinator.
235+
consumer.poll(Duration.ofMillis(100));
236+
237+
// Close should not hang waiting for retries when broker is already down
238+
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> consumer.close(),
239+
"Consumer close should not wait for full timeout when broker is already shutdown");
240+
}
241+
}
242+
184243
@ClusterTest
185244
public void testClassicConsumerHeaders() throws Exception {
186245
testHeaders(Map.of(

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
114114
import static org.junit.jupiter.api.Assertions.assertNotEquals;
115115
import static org.junit.jupiter.api.Assertions.assertThrows;
116+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
116117
import static org.junit.jupiter.api.Assertions.assertTrue;
117118
import static org.junit.jupiter.api.Assertions.fail;
118119

@@ -1252,6 +1253,28 @@ public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedExcep
12521253
}
12531254
}
12541255

1256+
@ClusterTest
1257+
public void testConsumerCloseOnBrokerShutdown() {
1258+
alterShareAutoOffsetReset("group1", "earliest");
1259+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1");
1260+
shareConsumer.subscribe(Set.of(tp.topic()));
1261+
1262+
// To ensure coordinator discovery is complete before shutting down the broker
1263+
shareConsumer.poll(Duration.ofMillis(100));
1264+
1265+
// Shutdown the broker.
1266+
assertEquals(1, cluster.brokers().size());
1267+
KafkaBroker broker = cluster.brokers().get(0);
1268+
cluster.shutdownBroker(0);
1269+
1270+
broker.awaitShutdown();
1271+
1272+
// Assert that close completes in less than 5 seconds, not the full 30-second timeout.
1273+
assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
1274+
shareConsumer.close();
1275+
}, "Consumer close should not wait for full timeout when broker is already shutdown");
1276+
}
1277+
12551278
@ClusterTest
12561279
public void testMultipleConsumersInGroupSequentialConsumption() {
12571280
alterShareAutoOffsetReset("group1", "earliest");

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ private void sendUnsentRequests(final Timer timer) {
395395
return;
396396

397397
do {
398-
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
398+
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs(), true);
399399
timer.update();
400400
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());
401401

clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.common.errors.TimeoutException;
3535
import org.apache.kafka.common.metrics.Metrics;
3636
import org.apache.kafka.common.metrics.Sensor;
37+
import org.apache.kafka.common.protocol.Errors;
3738
import org.apache.kafka.common.requests.AbstractRequest;
3839
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
3940
import org.apache.kafka.common.utils.LogContext;
@@ -137,13 +138,25 @@ public void tryConnect(Node node) {
137138
}
138139

139140
/**
140-
* Returns the responses of the sent requests. This method will try to send the unsent requests, poll for responses,
141+
* This method will try to send the unsent requests, poll for responses,
141142
* and check the disconnected nodes.
142143
*
143144
* @param timeoutMs timeout time
144145
* @param currentTimeMs current time
145146
*/
146147
public void poll(final long timeoutMs, final long currentTimeMs) {
148+
poll(timeoutMs, currentTimeMs, false);
149+
}
150+
151+
/**
152+
* This method will try to send the unsent requests, poll for responses,
153+
* and check the disconnected nodes.
154+
*
155+
* @param timeoutMs timeout time
156+
* @param currentTimeMs current time
157+
* @param onClose True when the network thread is closing.
158+
*/
159+
public void poll(final long timeoutMs, final long currentTimeMs, boolean onClose) {
147160
trySend(currentTimeMs);
148161

149162
long pollTimeoutMs = timeoutMs;
@@ -152,7 +165,7 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
152165
}
153166
this.client.poll(pollTimeoutMs, currentTimeMs);
154167
maybePropagateMetadataError();
155-
checkDisconnects(currentTimeMs);
168+
checkDisconnects(currentTimeMs, onClose);
156169
asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);
157170
}
158171

@@ -219,7 +232,7 @@ boolean doSend(final UnsentRequest r, final long currentTimeMs) {
219232
return true;
220233
}
221234

222-
protected void checkDisconnects(final long currentTimeMs) {
235+
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
223236
// Check the connection of the unsent request. Disconnect the disconnected node if it is unable to be connected.
224237
Iterator<UnsentRequest> iter = unsentRequests.iterator();
225238
while (iter.hasNext()) {
@@ -229,6 +242,11 @@ protected void checkDisconnects(final long currentTimeMs) {
229242
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
230243
AuthenticationException authenticationException = client.authenticationException(u.node.get());
231244
u.handler.onFailure(currentTimeMs, authenticationException);
245+
} else if (u.node.isEmpty() && onClose) {
246+
log.debug("Removing unsent request {} because the client is closing", u);
247+
iter.remove();
248+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - u.enqueueTimeMs());
249+
u.handler.onFailure(currentTimeMs, Errors.NETWORK_EXCEPTION.exception());
232250
}
233251
}
234252
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
136136
@Override
137137
public PollResult poll(long currentTimeMs) {
138138
if (memberId == null) {
139+
if (closing && !closeFuture.isDone()) {
140+
closeFuture.complete(null);
141+
}
139142
return PollResult.EMPTY;
140143
}
141144

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.junit.jupiter.params.ParameterizedTest;
3636
import org.junit.jupiter.params.provider.MethodSource;
3737
import org.junit.jupiter.params.provider.ValueSource;
38+
import org.mockito.ArgumentMatchers;
3839

3940
import java.time.Duration;
4041
import java.util.LinkedList;
@@ -203,7 +204,7 @@ public void testRunOnceInvokesReaper() {
203204
public void testSendUnsentRequests() {
204205
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
205206
consumerNetworkThread.cleanup();
206-
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
207+
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong(), ArgumentMatchers.booleanThat(onClose -> onClose));
207208
}
208209

209210
@ParameterizedTest

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4266,7 +4266,7 @@ private List<UnsentRequest> removeUnsentRequestByNode(Node node) {
42664266
}
42674267

42684268
@Override
4269-
protected void checkDisconnects(final long currentTimeMs) {
4269+
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
42704270
// any disconnects affecting requests that have already been transmitted will be handled
42714271
// by NetworkClient, so we just need to check whether connections for any of the unsent
42724272
// requests have been disconnected; if they have, then we complete the corresponding future

clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.common.Node;
2828
import org.apache.kafka.common.errors.AuthenticationException;
2929
import org.apache.kafka.common.errors.DisconnectException;
30+
import org.apache.kafka.common.errors.NetworkException;
3031
import org.apache.kafka.common.errors.TimeoutException;
3132
import org.apache.kafka.common.message.FindCoordinatorRequestData;
3233
import org.apache.kafka.common.metrics.Metrics;
@@ -282,6 +283,51 @@ public void testRecordUnsentRequestsQueueTime(String groupName) throws Exception
282283
}
283284
}
284285

286+
@Test
287+
public void testPollWithOnClose() throws Exception {
288+
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
289+
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
290+
ncd.add(unsentRequest);
291+
292+
// First poll without onClose
293+
ncd.poll(0, time.milliseconds());
294+
assertTrue(ncd.hasAnyPendingRequests());
295+
296+
// Poll with onClose=true
297+
ncd.poll(0, time.milliseconds(), true);
298+
assertTrue(ncd.hasAnyPendingRequests());
299+
300+
// Complete the request
301+
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
302+
ncd.poll(0, time.milliseconds(), true);
303+
assertFalse(ncd.hasAnyPendingRequests());
304+
}
305+
}
306+
307+
@Test
308+
public void testCheckDisconnectsWithOnClose() throws Exception {
309+
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
310+
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
311+
ncd.add(unsentRequest);
312+
313+
// Mark node as disconnected
314+
Node node = mockNode();
315+
client.setUnreachable(node, REQUEST_TIMEOUT_MS);
316+
317+
// Poll with onClose=false (default)
318+
ncd.poll(0, time.milliseconds());
319+
assertTrue(ncd.hasAnyPendingRequests());
320+
321+
// Poll with onClose=true
322+
ncd.poll(0, time.milliseconds(), true);
323+
324+
// Verify the request is absent since we're removing unsent requests on close.
325+
assertFalse(ncd.hasAnyPendingRequests());
326+
assertTrue(unsentRequest.future().isDone());
327+
TestUtils.assertFutureThrows(NetworkException.class, unsentRequest.future());
328+
}
329+
}
330+
285331
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
286332
return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
287333
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ public void testAcknowledgeOnClose() {
402402
// Remaining acknowledgements sent with close().
403403
Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
404404

405-
shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
405+
CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
406406
calculateDeadlineMs(time.timer(100)));
407407

408408
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -416,6 +416,25 @@ public void testAcknowledgeOnClose() {
416416
// Verifying that all 3 offsets were acknowledged as part of the final ShareAcknowledge on close.
417417
assertEquals(mergedAcks.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
418418
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
419+
420+
// Polling once more to complete the closeFuture.
421+
shareConsumeRequestManager.sendFetches();
422+
assertTrue(closeFuture.isDone());
423+
}
424+
425+
@Test
426+
public void testCloseFutureCompletedWhenMemberIdIsNull() {
427+
buildRequestManager(new MetricConfig(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), null, ShareAcquireMode.BATCH_OPTIMIZED);
428+
assignFromSubscribed(Collections.singleton(tp0));
429+
430+
CompletableFuture<Void> closeFuture = shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
431+
calculateDeadlineMs(time.timer(100)));
432+
433+
assertFalse(closeFuture.isDone());
434+
435+
// The subsequent poll should complete the closeFuture as the memberId is null.
436+
shareConsumeRequestManager.sendFetches();
437+
assertTrue(closeFuture.isDone());
419438
}
420439

421440
@Test
@@ -2466,7 +2485,7 @@ public void testFetchOneNodeAtATimeForRecordLimitMode() {
24662485
.setErrorCode(Errors.NONE.code()));
24672486
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1);
24682487
partitionData = buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
2469-
2488+
24702489
client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
24712490
networkClientDelegate.poll(time.timer(0));
24722491
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2731,25 +2750,27 @@ private void buildRequestManager(ShareAcquireMode shareAcquireMode) {
27312750
private <K, V> void buildRequestManager(Deserializer<K> keyDeserializer,
27322751
Deserializer<V> valueDeserializer,
27332752
ShareAcquireMode shareAcquireMode) {
2734-
buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, shareAcquireMode);
2753+
buildRequestManager(new MetricConfig(), keyDeserializer, valueDeserializer, Uuid.randomUuid().toString(), shareAcquireMode);
27352754
}
27362755

27372756
private <K, V> void buildRequestManager(MetricConfig metricConfig,
27382757
Deserializer<K> keyDeserializer,
27392758
Deserializer<V> valueDeserializer,
2759+
String memberId,
27402760
ShareAcquireMode shareAcquireMode) {
27412761
LogContext logContext = new LogContext();
27422762
SubscriptionState subscriptionState = new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST);
27432763
buildRequestManager(metricConfig, keyDeserializer, valueDeserializer,
2744-
subscriptionState, logContext, shareAcquireMode);
2764+
subscriptionState, logContext, memberId, shareAcquireMode);
27452765
}
27462766

27472767
private <K, V> void buildRequestManager(MetricConfig metricConfig,
27482768
Deserializer<K> keyDeserializer,
27492769
Deserializer<V> valueDeserializer,
27502770
SubscriptionState subscriptionState,
27512771
LogContext logContext,
2752-
ShareAcquireMode shareAcquireMode) {
2772+
String memberId,
2773+
ShareAcquireMode shareAcquireMode) {
27532774
buildDependencies(metricConfig, subscriptionState, logContext);
27542775
Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
27552776
int maxWaitMs = 0;
@@ -2781,7 +2802,8 @@ private <K, V> void buildRequestManager(MetricConfig metricConfig,
27812802
new ShareFetchBuffer(logContext),
27822803
acknowledgementEventHandler,
27832804
metricsManager,
2784-
shareFetchCollector));
2805+
shareFetchCollector,
2806+
memberId));
27852807
}
27862808

27872809
private void buildDependencies(MetricConfig metricConfig,
@@ -2820,11 +2842,14 @@ public TestableShareConsumeRequestManager(LogContext logContext,
28202842
ShareFetchBuffer shareFetchBuffer,
28212843
ShareAcknowledgementEventHandler acknowledgementEventHandler,
28222844
ShareFetchMetricsManager metricsManager,
2823-
ShareFetchCollector<K, V> fetchCollector) {
2845+
ShareFetchCollector<K, V> fetchCollector,
2846+
String memberId) {
28242847
super(time, logContext, groupId, metadata, subscriptions, shareFetchConfig, shareFetchBuffer,
28252848
acknowledgementEventHandler, metricsManager, retryBackoffMs, 1000);
28262849
this.shareFetchCollector = fetchCollector;
2827-
onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
2850+
if (memberId != null) {
2851+
onMemberEpochUpdated(Optional.empty(), memberId);
2852+
}
28282853
}
28292854

28302855
private ShareFetch<K, V> collectFetch() {
@@ -2914,7 +2939,7 @@ private List<UnsentRequest> removeUnsentRequestByNode(Node node) {
29142939
}
29152940

29162941
@Override
2917-
protected void checkDisconnects(final long currentTimeMs) {
2942+
protected void checkDisconnects(final long currentTimeMs, boolean onClose) {
29182943
// any disconnects affecting requests that have already been transmitted will be handled
29192944
// by NetworkClient, so we just need to check whether connections for any of the unsent
29202945
// requests have been disconnected; if they have, then we complete the corresponding future

0 commit comments

Comments
 (0)