|
77 | 77 | import org.apache.kafka.common.utils.MockTime; |
78 | 78 | import org.apache.kafka.common.utils.Time; |
79 | 79 | import org.apache.kafka.common.utils.Timer; |
80 | | -import org.apache.kafka.test.TestUtils; |
81 | 80 |
|
82 | 81 | import org.junit.jupiter.api.AfterEach; |
83 | 82 | import org.junit.jupiter.api.BeforeEach; |
@@ -366,14 +365,15 @@ public void testServerDisconnectedOnShareAcknowledge() throws InterruptedExcepti |
366 | 365 |
|
367 | 366 | assertEquals(1, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); |
368 | 367 |
|
369 | | - TestUtils.retryOnExceptionWithTimeout(() -> { |
370 | | - assertEquals(0, shareConsumeRequestManager.sendAcknowledgements()); |
371 | | - // We expect the remaining acknowledgements to be cleared due to share session epoch being set to 0. |
372 | | - assertNull(shareConsumeRequestManager.requestStates(0)); |
373 | | - // The callback for these unsent acknowledgements will be invoked with an error code. |
374 | | - assertEquals(Map.of(tip0, acknowledgements2), completedAcknowledgements.get(0)); |
375 | | - assertInstanceOf(ShareSessionNotFoundException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); |
376 | | - }); |
| 368 | + // Wait for backoff time before sending the next request. |
| 369 | + time.sleep(retryBackoffMs); |
| 370 | + |
| 371 | + assertEquals(0, shareConsumeRequestManager.sendAcknowledgements()); |
| 372 | + // We expect the remaining acknowledgements to be cleared due to share session epoch being set to 0. |
| 373 | + assertNull(shareConsumeRequestManager.requestStates(0)); |
| 374 | + // The callback for these unsent acknowledgements will be invoked with an error code. |
| 375 | + assertEquals(Map.of(tip0, acknowledgements2), completedAcknowledgements.get(0)); |
| 376 | + assertInstanceOf(ShareSessionNotFoundException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); |
377 | 377 |
|
378 | 378 | // Attempt a normal fetch to check if nodesWithPendingRequests is empty. |
379 | 379 | assertEquals(1, sendFetches()); |
@@ -653,7 +653,10 @@ public void testRetryAcknowledgements() throws InterruptedException { |
653 | 653 | assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); |
654 | 654 | assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); |
655 | 655 |
|
656 | | - TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); |
| 656 | + // Wait for backoff time before sending the next request. |
| 657 | + // After the first attempt, it can maximum be 1.2x of the configured backoff when acknowledge fails. (jitter = 0.2) |
| 658 | + time.sleep((long) (1.5 * retryBackoffMs)); |
| 659 | + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); |
657 | 660 |
|
658 | 661 | assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); |
659 | 662 |
|
@@ -1232,7 +1235,9 @@ public void testCallbackHandlerConfig() throws InterruptedException { |
1232 | 1235 | shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), |
1233 | 1236 | calculateDeadlineMs(time.timer(defaultApiTimeoutMs))); |
1234 | 1237 |
|
1235 | | - TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); |
| 1238 | + // Wait for backoff time before sending the next request. |
| 1239 | + time.sleep(retryBackoffMs); |
| 1240 | + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); |
1236 | 1241 |
|
1237 | 1242 | client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); |
1238 | 1243 | networkClientDelegate.poll(time.timer(0)); |
@@ -1401,7 +1406,9 @@ public void testShareAcknowledgeInvalidResponse() throws InterruptedException { |
1401 | 1406 |
|
1402 | 1407 | assertEquals(1, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0)); |
1403 | 1408 |
|
1404 | | - TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); |
| 1409 | + // Wait for backoff time before sending the next request. (it can maximum be 1.2x of the configured backoff when acknowledge fails.) |
| 1410 | + time.sleep((long) (1.5 * retryBackoffMs)); |
| 1411 | + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); |
1405 | 1412 |
|
1406 | 1413 | client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE)); |
1407 | 1414 | networkClientDelegate.poll(time.timer(0)); |
|
0 commit comments