
Commit 670bba0

[Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938)
1 parent 4aa5998 commit 670bba0

6 files changed: +181 -5 lines changed

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java

Lines changed: 1 addition & 2 deletions

@@ -241,12 +241,11 @@ public CompletableFuture<Void> listenPipeline(int pipelineId, PipelineStatus pip
      * Called by the JobMaster. <br>
      * Listen to the {@link JobStatus} of the {@link Job}.
      */
-    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+    public void clearCheckpointIfNeed(JobStatus jobStatus) {
         if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
                 && !isSavePointEnd()) {
             checkpointStorage.deleteCheckpoint(jobId + "");
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     /**
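
In isolation, the renamed method is just a status guard in front of checkpointStorage.deleteCheckpoint(jobId + ""). The following is a minimal, self-contained sketch of that guard; InMemoryCheckpointStore and the constructor fields are hypothetical stand-ins for the real CheckpointManager/CheckpointStorage wiring, which this hunk does not show.

// Minimal model of the cleanup guard above; all types here are illustrative stand-ins.
import java.util.HashMap;
import java.util.Map;

public class CheckpointCleanupSketch {

    enum JobStatus { RUNNING, FINISHED, CANCELED, FAILED }

    // Hypothetical stand-in for CheckpointStorage, keyed by job id as in deleteCheckpoint(jobId + "").
    static class InMemoryCheckpointStore {
        private final Map<String, String> checkpoints = new HashMap<>();

        void save(String jobId, String state) { checkpoints.put(jobId, state); }

        void deleteCheckpoint(String jobId) { checkpoints.remove(jobId); }

        boolean hasCheckpoint(String jobId) { return checkpoints.containsKey(jobId); }
    }

    private final InMemoryCheckpointStore checkpointStorage;
    private final long jobId;
    private final boolean savePointEnd; // stands in for isSavePointEnd()

    CheckpointCleanupSketch(InMemoryCheckpointStore storage, long jobId, boolean savePointEnd) {
        this.checkpointStorage = storage;
        this.jobId = jobId;
        this.savePointEnd = savePointEnd;
    }

    // Same shape as clearCheckpointIfNeed: checkpoints are removed only when the job
    // ended as FINISHED or CANCELED and the run was not stopped by a savepoint.
    void clearCheckpointIfNeed(JobStatus jobStatus) {
        if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED) && !savePointEnd) {
            checkpointStorage.deleteCheckpoint(jobId + "");
        }
    }

    public static void main(String[] args) {
        InMemoryCheckpointStore store = new InMemoryCheckpointStore();

        store.save("42", "latest-checkpoint-state");
        new CheckpointCleanupSketch(store, 42L, false).clearCheckpointIfNeed(JobStatus.FINISHED);
        System.out.println(store.hasCheckpoint("42")); // false: cleaned up on a plain FINISHED

        store.save("43", "latest-checkpoint-state");
        new CheckpointCleanupSketch(store, 43L, true).clearCheckpointIfNeed(JobStatus.CANCELED);
        System.out.println(store.hasCheckpoint("43")); // true: kept, because the run ended via savepoint
    }
}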

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

Lines changed: 1 addition & 0 deletions

@@ -518,6 +518,7 @@ public void releasePipelineResource(SubPlan subPlan) {
     }
 
     public void cleanJob() {
+        checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus());
         jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo());
         jobHistoryService.storeFinishedJobState(this);
         removeJobIMap();
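
cleanJob() is the call site that ties the cleanup to the job lifecycle: it runs once the job has reached a terminal status, and the checkpoint files are now cleared for that status before the job info and state are archived. The model below only illustrates that ordering; every name in it is hypothetical rather than a real SeaTunnel API.

// Illustrative-only model of the call order in cleanJob() after this change.
import java.util.ArrayList;
import java.util.List;

public class CleanJobOrderSketch {

    enum JobStatus { FINISHED, CANCELED, FAILED }

    private final List<String> calls = new ArrayList<>();

    private void clearCheckpointIfNeed(JobStatus status) {
        calls.add("clearCheckpointIfNeed(" + status + ")");
    }

    private void storeJobInfo() { calls.add("storeJobInfo"); }

    private void storeFinishedJobState() { calls.add("storeFinishedJobState"); }

    private void removeJobIMap() { calls.add("removeJobIMap"); }

    // Mirrors the ordering of JobMaster.cleanJob() after the change: checkpoints are
    // cleared for the terminal status first, then the job is archived and its IMap removed.
    void cleanJob(JobStatus terminalStatus) {
        clearCheckpointIfNeed(terminalStatus);
        storeJobInfo();
        storeFinishedJobState();
        removeJobIMap();
    }

    public static void main(String[] args) {
        CleanJobOrderSketch sketch = new CleanJobOrderSketch();
        sketch.cleanJob(JobStatus.CANCELED);
        // Prints: [clearCheckpointIfNeed(CANCELED), storeJobInfo, storeFinishedJobState, removeJobIMap]
        System.out.println(sketch.calls);
    }
}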

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java

Lines changed: 1 addition & 3 deletions

@@ -41,7 +41,6 @@
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
 import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
@@ -94,8 +93,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
         checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
         Assertions.assertNull(checkpointIdMap.get(1));
-        CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED);
-        future.join();
+        checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED);
         Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
     }
 }

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java

Lines changed: 64 additions & 0 deletions

@@ -43,6 +43,11 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {
 
     public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf";
     public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
+    public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
+            "batch_fakesource_to_file_with_checkpoint.conf";
+
+    public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
+            "stream_fake_to_console_with_checkpoint.conf";
 
     @Override
     public SeaTunnelConfig loadSeaTunnelConfig() {
@@ -113,4 +118,63 @@ public void testBatchJob() throws CheckpointStorageException {
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobWithCheckpoint() throws CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        .create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
+
+    @Test
+    public void testStreamJobWithCancel() throws CheckpointStorageException, InterruptedException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        .create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.RUNNING));
+        // wait for checkpoint
+        Thread.sleep(10 * 1000);
+        server.getCoordinatorService().getJobMaster(jobId).cancelJob();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.CANCELED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
 }
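
Both new tests assert an empty checkpoint listing as soon as the terminal status is observed. Should the cleanup ever lag the status transition, the same Awaitility pattern the tests already rely on could poll for the listing instead of reading it once; a hypothetical helper along those lines (not part of this commit) might look like this.

import static org.awaitility.Awaitility.await;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Assertions;

// Hypothetical test helper: retries the checkpoint listing until it is empty
// rather than asserting on the first read.
public class CheckpointAssertions {

    public static void assertEventuallyNoCheckpoints(Callable<Collection<?>> checkpoints) {
        await().atMost(60, TimeUnit.SECONDS)
                .untilAsserted(() -> Assertions.assertTrue(checkpoints.call().isEmpty()));
    }
}

Usage would be along the lines of assertEventuallyNoCheckpoints(() -> checkpointStorage.getAllCheckpoints(String.valueOf(jobId))).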

batch_fakesource_to_file_with_checkpoint.conf (new test resource, referenced above as BATCH_CONF_WITH_CHECKPOINT_PATH)

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 1000
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
+    result_table_name = "fake"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+    parallelism = 1
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format_type="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+
+  }
+}

stream_fake_to_console_with_checkpoint.conf (new test resource, referenced above as STREAM_CONF_WITH_CHECKPOINT_PATH)

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in SeaTunnel config
+######
+
+env {
+  # You can set SeaTunnel environment configuration here
+  parallelism = 2
+  job.mode = "STREAMING"
+  checkpoint.interval = 1000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    parallelism = 2
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+sink {
+  Console {
+  }
+
+  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
