|
19 | 19 |
|
20 | 20 | import org.apache.seatunnel.common.config.Common; |
21 | 21 | import org.apache.seatunnel.common.config.DeployMode; |
| 22 | +import org.apache.seatunnel.common.utils.RetryUtils; |
22 | 23 | import org.apache.seatunnel.engine.client.SeaTunnelClient; |
23 | 24 | import org.apache.seatunnel.engine.client.job.ClientJobProxy; |
24 | 25 | import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; |
| 26 | +import org.apache.seatunnel.engine.common.Constant; |
25 | 27 | import org.apache.seatunnel.engine.common.config.ConfigProvider; |
26 | 28 | import org.apache.seatunnel.engine.common.config.JobConfig; |
27 | 29 | import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; |
@@ -143,24 +145,31 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { |
143 | 145 | JobExecutionEnvironment jobExecutionEnv = |
144 | 146 | engineClient.createExecutionContext(filePath, jobConfig); |
145 | 147 | final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); |
146 | | - JobStatus jobStatus = clientJobProxy.getJobStatus(); |
147 | | - while (jobStatus == JobStatus.RUNNING) { |
148 | | - Thread.sleep(1000L); |
149 | | - jobStatus = clientJobProxy.getJobStatus(); |
150 | | - } |
151 | 148 |
|
152 | 149 | CompletableFuture<JobResult> completableFuture = |
153 | 150 | CompletableFuture.supplyAsync( |
154 | 151 | () -> { |
155 | | - PassiveCompletableFuture<JobResult> jobFuture = |
156 | | - clientJobProxy.doWaitForJobComplete(); |
157 | | - return jobFuture.join(); |
| 152 | + try { |
| 153 | + return RetryUtils.retryWithException( |
| 154 | + () -> { |
| 155 | + PassiveCompletableFuture<JobResult> jobFuture = |
| 156 | + clientJobProxy.doWaitForJobComplete(); |
| 157 | + return jobFuture.get(); |
| 158 | + }, |
| 159 | + new RetryUtils.RetryMaterial( |
| 160 | + 100000, |
| 161 | + true, |
| 162 | + exception -> true, |
| 163 | + Constant.OPERATION_RETRY_SLEEP)); |
| 164 | + } catch (Exception e) { |
| 165 | + throw new RuntimeException(e); |
| 166 | + } |
158 | 167 | }); |
159 | 168 |
|
160 | | - await().atMost(600000, TimeUnit.MILLISECONDS) |
| 169 | + await().atMost(6000000, TimeUnit.MILLISECONDS) |
161 | 170 | .untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone())); |
162 | 171 |
|
163 | | - JobResult result = completableFuture.join(); |
| 172 | + JobResult result = completableFuture.get(); |
164 | 173 | Assertions.assertEquals(result.getStatus(), JobStatus.FAILED); |
165 | 174 | Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); |
166 | 175 | } |
|
0 commit comments