Skip to content

Commit 59f60b9

Browse files
authored
[Hotfix][Zeta] Fix job can not restore when last checkpoint failed (#6193)
1 parent fa5b7d3 commit 59f60b9

File tree

7 files changed

+164
-17
lines changed

7 files changed

+164
-17
lines changed

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ private void initResourceManager() {
7373
@Override
7474
public List<MultiTableAggregatedCommitInfo> commit(
7575
List<MultiTableAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
76+
List<MultiTableAggregatedCommitInfo> errorList = new ArrayList<>();
7677
for (String sinkIdentifier : aggCommitters.keySet()) {
7778
SinkAggregatedCommitter<?, ?> sinkCommitter = aggCommitters.get(sinkIdentifier);
7879
if (sinkCommitter != null) {
@@ -85,10 +86,20 @@ public List<MultiTableAggregatedCommitInfo> commit(
8586
.get(sinkIdentifier))
8687
.filter(Objects::nonNull)
8788
.collect(Collectors.toList());
88-
sinkCommitter.commit(commitInfo);
89+
List errCommitList = sinkCommitter.commit(commitInfo);
90+
if (errCommitList.size() == 0) {
91+
continue;
92+
}
93+
94+
for (int i = 0; i < errCommitList.size(); i++) {
95+
if (errorList.size() < i + 1) {
96+
errorList.add(i, new MultiTableAggregatedCommitInfo(new HashMap<>()));
97+
}
98+
errorList.get(i).getCommitInfo().put(sinkIdentifier, errCommitList.get(i));
99+
}
89100
}
90101
}
91-
return new ArrayList<>();
102+
return errorList;
92103
}
93104

94105
@Override

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,31 @@ void afterClass() {
201201
hazelcastInstance.shutdown();
202202
}
203203
}
204+
205+
/**
 * Submits the batch job described by {@code batch_last_checkpoint_error.conf} in CLIENT deploy
 * mode and asserts that it ends with {@link JobStatus#FAILED}.
 *
 * <p>The job completion is awaited on a separate thread via {@code waitForJobComplete}; the
 * test polls that future for at most 600000 ms.
 *
 * @throws Exception if the job cannot be configured or submitted
 */
@Test
206+
public void testLastCheckpointErrorJob() throws Exception {
207+
Common.setDeployMode(DeployMode.CLIENT);
208+
String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf");
209+
JobConfig jobConfig = new JobConfig();
210+
jobConfig.setName("batch_last_checkpoint_error");
211+
212+
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
213+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
214+
// NOTE(review): this client is never closed inside this method — confirm whether cleanup
// happens elsewhere, otherwise close it once the assertion has finished.
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
215+
ClientJobExecutionEnvironment jobExecutionEnv =
216+
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
217+
218+
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
219+
220+
// Wait for the job result asynchronously so the polling below can enforce the timeout.
CompletableFuture<JobStatus> objectCompletableFuture =
221+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
222+
223+
// get() is only evaluated once isDone() is true (short-circuit &&), so it does not block.
await().atMost(600000, TimeUnit.MILLISECONDS)
224+
.untilAsserted(
225+
() ->
226+
Assertions.assertTrue(
227+
objectCompletableFuture.isDone()
228+
&& JobStatus.FAILED.equals(
229+
objectCompletableFuture.get())));
230+
}
204231
}

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class RestApiIT {
6262

6363
private static HazelcastInstanceImpl node2;
6464

65+
private static SeaTunnelClient engineClient;
66+
6567
private static final String jobName = "test测试";
6668
private static final String paramJobName = "param_test测试";
6769

@@ -80,7 +82,7 @@ void beforeClass() throws Exception {
8082

8183
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
8284
clientConfig.setClusterName(testClusterName);
83-
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
85+
engineClient = new SeaTunnelClient(clientConfig);
8486
ClientJobExecutionEnvironment jobExecutionEnv =
8587
engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
8688

@@ -456,6 +458,10 @@ public void testEncryptConfig() {
456458

457459
@AfterEach
458460
void afterClass() {
461+
if (engineClient != null) {
462+
engineClient.close();
463+
}
464+
459465
if (node1 != null) {
460466
node1.shutdown();
461467
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of batch processing in seatunnel config
19+
######
20+
21+
env {
22+
job.mode = "BATCH"
23+
}
24+
25+
source {
26+
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
27+
FakeSource {
28+
row.num = 10
29+
map.size = 10
30+
array.size = 10
31+
bytes.length = 10
32+
string.length = 10
33+
parallelism = 1
34+
result_table_name = "fake"
35+
schema = {
36+
fields {
37+
c_map = "map<string, array<int>>"
38+
c_array = "array<int>"
39+
c_string = string
40+
c_boolean = boolean
41+
c_tinyint = tinyint
42+
c_smallint = smallint
43+
c_int = int
44+
c_bigint = bigint
45+
c_float = float
46+
c_double = double
47+
c_decimal = "decimal(30, 8)"
48+
c_null = "null"
49+
c_bytes = bytes
50+
c_date = date
51+
c_timestamp = timestamp
52+
c_row = {
53+
c_map = "map<string, map<string, string>>"
54+
c_array = "array<int>"
55+
c_string = string
56+
c_boolean = boolean
57+
c_tinyint = tinyint
58+
c_smallint = smallint
59+
c_int = int
60+
c_bigint = bigint
61+
c_float = float
62+
c_double = double
63+
c_decimal = "decimal(30, 8)"
64+
c_null = "null"
65+
c_bytes = bytes
66+
c_date = date
67+
c_timestamp = timestamp
68+
}
69+
}
70+
}
71+
}
72+
}
73+
74+
transform {
75+
}
76+
77+
sink {
78+
LocalFile {
79+
path = "/hive/warehouse/test1"
80+
field_delimiter = "\t"
81+
row_delimiter = "\n"
82+
partition_by = ["c_string"]
83+
partition_dir_expression = "${k0}=${v0}"
84+
is_partition_field_write_in_file = true
85+
file_name_expression = "${transactionId}_${now}"
86+
file_format_type = "text"
87+
filename_time_format = "yyyy.MM.dd"
88+
is_enable_transaction = true
89+
save_mode = "error"
90+
}
91+
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -470,18 +470,24 @@ public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> pipelineSubt
470470
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));
471471
}
472472

473+
@SneakyThrows
473474
public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
474475
LOG.info(String.format("Start save point for Job (%s)", jobId));
475476
if (!isAllTaskReady) {
476-
CompletableFuture savepointFuture = new CompletableFuture();
477+
CompletableFuture<CompletedCheckpoint> savepointFuture = new CompletableFuture<>();
477478
savepointFuture.completeExceptionally(
478479
new CheckpointException(
479480
CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
480481
return new PassiveCompletableFuture<>(savepointFuture);
481482
}
482-
CompletableFuture<PendingCheckpoint> savepoint =
483-
createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
484-
startTriggerPendingCheckpoint(savepoint);
483+
CompletableFuture<PendingCheckpoint> savepoint;
484+
synchronized (lock) {
485+
while (pendingCounter.get() > 0) {
486+
Thread.sleep(500);
487+
}
488+
savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE);
489+
startTriggerPendingCheckpoint(savepoint);
490+
}
485491
PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
486492
LOG.info(
487493
String.format(
@@ -827,6 +833,18 @@ public boolean isCompleted() {
827833
&& !latestCompletedCheckpoint.isRestored();
828834
}
829835

836+
/**
 * Reports whether the latest completed checkpoint is a final checkpoint that finished without
 * error.
 *
 * <p>Returns {@code false} when there is no completed checkpoint yet. Otherwise returns {@code
 * true} only if all of the following hold: the latest completed checkpoint's type is a final
 * checkpoint, the coordinator status stored under {@code checkpointStateImapKey} is {@code
 * FINISHED} or {@code SUSPEND}, and the latest completed checkpoint was not restored.
 *
 * @return {@code true} if the conditions above are all met, {@code false} otherwise
 */
public boolean isNoErrorCompleted() {
837+
if (latestCompletedCheckpoint == null) {
838+
return false;
839+
}
840+
// NOTE(review): status is dereferenced below without a null check — if the map has no
// entry for checkpointStateImapKey this presumably throws NullPointerException; confirm
// the entry is always present before this method can be called.
CheckpointCoordinatorStatus status =
841+
(CheckpointCoordinatorStatus) runningJobStateIMap.get(checkpointStateImapKey);
842+
return latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint()
843+
&& (status.equals(CheckpointCoordinatorStatus.FINISHED)
844+
|| status.equals(CheckpointCoordinatorStatus.SUSPEND))
845+
&& !latestCompletedCheckpoint.isRestored();
846+
}
847+
830848
public boolean isEndOfSavePoint() {
831849
if (latestCompletedCheckpoint == null) {
832850
return false;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,6 @@ public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavePoints() {
164164
.toArray(PassiveCompletableFuture[]::new);
165165
}
166166

167-
/**
168-
* Called by the JobMaster, actually triggered by the user. <br>
169-
* After the savepoint is triggered, it will cause the pipeline to stop automatically.
170-
*/
171-
public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int pipelineId) {
172-
return getCheckpointCoordinator(pipelineId).startSavepoint();
173-
}
174-
175167
public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
176168
log.info(
177169
"reported pipeline running stack: "
@@ -253,7 +245,7 @@ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
253245
* the pipeline has been completed;
254246
*/
255247
public boolean isCompletedPipeline(int pipelineId) {
256-
return getCheckpointCoordinator(pipelineId).isCompleted();
248+
return getCheckpointCoordinator(pipelineId).isNoErrorCompleted();
257249
}
258250

259251
/**

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,9 @@ public void handleCheckpointError() {
560560
log.warn(
561561
String.format(
562562
"%s checkpoint have error, cancel the pipeline", getPipelineFullName()));
563-
this.cancelPipeline();
563+
if (!getPipelineState().isEndState()) {
564+
updatePipelineState(PipelineStatus.CANCELING);
565+
}
564566
}
565567

566568
public void startSubPlanStateProcess() {

0 commit comments

Comments
 (0)