Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;

import lombok.Getter;
import lombok.SneakyThrows;

import java.io.IOException;
Expand All @@ -45,7 +46,7 @@
* <p>The service provides the APIs to get the latest checkpoint data of a job.
*/
public class CheckpointService {
private CheckpointStorage checkpointStorage;
@Getter private CheckpointStorage checkpointStorage;
private Serializer serializer = new ProtoStuffSerializer();

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
Expand Down Expand Up @@ -81,9 +78,9 @@ public class CheckpointManager {

private final CheckpointStorage checkpointStorage;

private final JobMaster jobMaster;
private final CheckpointConfig checkpointConfig;

private final ExecutorService executorService;
private final JobMaster jobMaster;

public CheckpointManager(
long jobId,
Expand All @@ -92,19 +89,15 @@ public CheckpointManager(
JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
CheckpointConfig checkpointConfig,
CheckpointStorage checkpointStorage,
ExecutorService executorService,
IMap<Object, Object> runningJobStateIMap)
throws CheckpointStorageException {
this.executorService = executorService;
IMap<Object, Object> runningJobStateIMap) {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.jobMaster = jobMaster;
this.checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
this.checkpointStorage = checkpointStorage;
this.checkpointConfig = checkpointConfig;

this.coordinatorMap =
MDCTracer.tracing(checkpointPlanMap.values().parallelStream())
.map(
Expand All @@ -115,7 +108,8 @@ public CheckpointManager(
try {
idCounter.start();
PipelineState pipelineState = null;
if (isStartWithSavePoint) {
if (checkpointConfig.isCheckpointEnable()
&& isStartWithSavePoint) {
pipelineState =
checkpointStorage
.getLatestCheckpointByJobIdAndPipelineId(
Expand Down Expand Up @@ -240,7 +234,8 @@ public CompletableFuture<Void> listenPipeline(int pipelineId, PipelineStatus pip
* Listen to the {@link JobStatus} of the {@link Job}.
*/
public void clearCheckpointIfNeed(JobStatus jobStatus) {
if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
if (checkpointConfig.isCheckpointEnable()
&& (jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
&& !isSavePointEnd()) {
checkpointStorage.deleteCheckpoint(jobId + "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
Expand Down Expand Up @@ -317,7 +316,7 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr
}
}

public void initCheckPointManager(boolean restart) throws CheckpointStorageException {
public void initCheckPointManager(boolean restart) {
this.checkpointManager =
new CheckpointManager(
jobImmutableInformation.getJobId(),
Expand All @@ -326,6 +325,7 @@ public void initCheckPointManager(boolean restart) throws CheckpointStorageExcep
this,
checkpointPlanMap,
jobCheckpointConfig,
seaTunnelServer.getCheckpointService().getCheckpointStorage(),
executorService,
runningJobStateIMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void testACKNotExistPendingCheckpoint() throws CheckpointStorageException {
null,
planMap,
checkpointConfig,
server.getCheckpointService().getCheckpointStorage(),
instance.getExecutorService("test"),
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
checkpointManager.acknowledgeTask(
Expand Down Expand Up @@ -100,6 +101,7 @@ void testSchedulerThreadShouldNotBeInterruptedBeforeJobMasterCleaned()
null,
planMap,
checkpointConfig,
server.getCheckpointService().getCheckpointStorage(),
executorService,
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {

Expand Down Expand Up @@ -144,6 +146,7 @@ void testCheckpointContinuesWorkAfterClockDrift()
null,
planMap,
checkpointConfig,
server.getCheckpointService().getCheckpointStorage(),
executorService,
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
null,
planMap,
new CheckpointConfig(),
server.getCheckpointService().getCheckpointStorage(),
instance.getExecutorService("test"),
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@

package org.apache.seatunnel.engine.server.checkpoint;

import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.CheckpointService;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.await;

Expand All @@ -45,6 +47,8 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {
public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
"batch_fakesource_to_file_with_checkpoint.conf";
public static String BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH =
"batch_fake_to_console_without_checkpoint_interval.conf";

public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
"stream_fake_to_console_with_checkpoint.conf";
Expand All @@ -64,15 +68,8 @@ public SeaTunnelConfig loadSeaTunnelConfig() {
public void testGenerateFileWhenSavepoint()
throws CheckpointStorageException, InterruptedException {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
CheckpointStorage checkpointStorage = server.getCheckpointService().getCheckpointStorage();
startJob(jobId, STREAM_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
Expand All @@ -98,15 +95,8 @@ public void testGenerateFileWhenSavepoint()
@Test
public void testBatchJob() throws CheckpointStorageException {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
CheckpointStorage checkpointStorage = server.getCheckpointService().getCheckpointStorage();
startJob(jobId, BATCH_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
Expand All @@ -126,12 +116,7 @@ public void testBatchJobWithCheckpoint() throws CheckpointStorageException {
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
CheckpointStorage checkpointStorage = server.getCheckpointService().getCheckpointStorage();
startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
Expand All @@ -151,12 +136,7 @@ public void testStreamJobWithCancel() throws CheckpointStorageException, Interru
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
CheckpointStorage checkpointStorage = server.getCheckpointService().getCheckpointStorage();
startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
Expand All @@ -177,4 +157,102 @@ public void testStreamJobWithCancel() throws CheckpointStorageException, Interru
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}

@Test
public void testBatchJobResetCheckpointStorage() throws CheckpointStorageException {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
final CheckpointStorage originalCheckpointStorage =
server.getCheckpointService().getCheckpointStorage();

// access checkpoint storage counter
AtomicInteger accessCounter = new AtomicInteger(0);
CheckpointStorage checkpointStorage =
new CheckpointStorage() {
@Override
public String storeCheckPoint(PipelineState pipelineState)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
return "";
}

@Override
public void asyncStoreCheckPoint(PipelineState pipelineState)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
}

@Override
public List<PipelineState> getAllCheckpoints(String s)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
return Collections.emptyList();
}

@Override
public List<PipelineState> getLatestCheckpoint(String s)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
return Collections.emptyList();
}

@Override
public PipelineState getLatestCheckpointByJobIdAndPipelineId(
String s, String s1) throws CheckpointStorageException {
accessCounter.incrementAndGet();
return null;
}

@Override
public List<PipelineState> getCheckpointsByJobIdAndPipelineId(
String s, String s1) throws CheckpointStorageException {
accessCounter.incrementAndGet();
return Collections.emptyList();
}

@Override
public void deleteCheckpoint(String s) {
accessCounter.incrementAndGet();
}

@Override
public PipelineState getCheckpoint(String s, String s1, String s2)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
return null;
}

@Override
public void deleteCheckpoint(String s, String s1, String s2)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
}

@Override
public void deleteCheckpoint(String s, String s1, List<String> list)
throws CheckpointStorageException {
accessCounter.incrementAndGet();
}
};

// replace the checkpoint storage reused by the system
CheckpointService checkpointService = server.getCheckpointService();
ReflectionUtils.setField(checkpointService, "checkpointStorage", checkpointStorage);

startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId),
JobStatus.FINISHED));

checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(1, accessCounter.get());

// restore the server's checkpointStorage to avoid affecting other unit cases
ReflectionUtils.setField(checkpointService, "checkpointStorage", originalCheckpointStorage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######

env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "BATCH"
# remove `checkpoint.interval` config
# checkpoint.interval = 10000
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

sink {
Console {
}
}