Skip to content

Commit 05077d7

Browse files
committed
Revert "[Fix][Zeta] Avoid redundant checkpoint reads when disabled checkpoint (apache#9552)"
This reverts commit 2fceb6c.
1 parent 2fceb6c commit 05077d7

File tree

7 files changed

+49
-405
lines changed

7 files changed

+49
-405
lines changed

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java

Lines changed: 0 additions & 131 deletions
This file was deleted.

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
2121
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
22+
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
23+
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
2224
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
2325
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
26+
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
2427
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
2528
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
2629
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -76,26 +79,30 @@ public class CheckpointManager {
7679

7780
private final CheckpointStorage checkpointStorage;
7881

79-
private final CheckpointConfig checkpointConfig;
80-
8182
private final JobMaster jobMaster;
8283

84+
private final ExecutorService executorService;
85+
8386
public CheckpointManager(
8487
long jobId,
8588
boolean isStartWithSavePoint,
8689
NodeEngine nodeEngine,
8790
JobMaster jobMaster,
8891
Map<Integer, CheckpointPlan> checkpointPlanMap,
8992
CheckpointConfig checkpointConfig,
90-
CheckpointStorage checkpointStorage,
9193
ExecutorService executorService,
92-
IMap<Object, Object> runningJobStateIMap) {
94+
IMap<Object, Object> runningJobStateIMap)
95+
throws CheckpointStorageException {
96+
this.executorService = executorService;
9397
this.jobId = jobId;
9498
this.nodeEngine = nodeEngine;
9599
this.jobMaster = jobMaster;
96-
this.checkpointStorage = checkpointStorage;
97-
this.checkpointConfig = checkpointConfig;
98-
100+
this.checkpointStorage =
101+
FactoryUtil.discoverFactory(
102+
Thread.currentThread().getContextClassLoader(),
103+
CheckpointStorageFactory.class,
104+
checkpointConfig.getStorage().getStorage())
105+
.create(checkpointConfig.getStorage().getStoragePluginConfig());
99106
this.coordinatorMap =
100107
checkpointPlanMap
101108
.values()
@@ -108,8 +115,7 @@ public CheckpointManager(
108115
try {
109116
idCounter.start();
110117
PipelineState pipelineState = null;
111-
if (checkpointConfig.isCheckpointEnable()
112-
&& isStartWithSavePoint) {
118+
if (isStartWithSavePoint) {
113119
pipelineState =
114120
checkpointStorage
115121
.getLatestCheckpointByJobIdAndPipelineId(
@@ -236,8 +242,7 @@ public CompletableFuture<Void> listenPipeline(int pipelineId, PipelineStatus pip
236242
* Listen to the {@link JobStatus} of the {@link Job}.
237243
*/
238244
public void clearCheckpointIfNeed(JobStatus jobStatus) {
239-
if (checkpointConfig.isCheckpointEnable()
240-
&& (jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
245+
if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
241246
&& !isSavePointEnd()) {
242247
checkpointStorage.deleteCheckpoint(jobId + "");
243248
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.seatunnel.common.utils.ExceptionUtils;
3232
import org.apache.seatunnel.common.utils.RetryUtils;
3333
import org.apache.seatunnel.common.utils.SeaTunnelException;
34+
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
3435
import org.apache.seatunnel.engine.common.Constant;
3536
import org.apache.seatunnel.engine.common.config.EngineConfig;
3637
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -280,7 +281,7 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr
280281
}
281282
}
282283

283-
public void initCheckPointManager(boolean restart) {
284+
public void initCheckPointManager(boolean restart) throws CheckpointStorageException {
284285
this.checkpointManager =
285286
new CheckpointManager(
286287
jobImmutableInformation.getJobId(),
@@ -289,7 +290,6 @@ public void initCheckPointManager(boolean restart) {
289290
this,
290291
checkpointPlanMap,
291292
jobCheckpointConfig,
292-
seaTunnelServer.getCheckpointService().getCheckpointStorage(),
293293
executorService,
294294
runningJobStateIMap);
295295
}

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,20 @@
1717

1818
package org.apache.seatunnel.engine.server.checkpoint;
1919

20-
import org.apache.seatunnel.common.utils.ReflectionUtils;
2120
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
2221
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
2322
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
24-
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
2523
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
2624
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
2725
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
2826
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
2927
import org.apache.seatunnel.engine.server.execution.TaskLocation;
3028

31-
import org.junit.jupiter.api.Assertions;
3229
import org.junit.jupiter.api.Test;
33-
import org.mockito.MockedStatic;
34-
import org.mockito.Mockito;
3530

36-
import java.time.Instant;
3731
import java.util.ArrayList;
38-
import java.util.Collections;
3932
import java.util.HashMap;
4033
import java.util.Map;
41-
import java.util.concurrent.ExecutionException;
42-
import java.util.concurrent.ExecutorService;
43-
import java.util.concurrent.Executors;
44-
import java.util.concurrent.TimeUnit;
45-
import java.util.concurrent.TimeoutException;
46-
import java.util.concurrent.atomic.AtomicLong;
4734

4835
import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
4936

@@ -64,7 +51,6 @@ void testACKNotExistPendingCheckpoint() throws CheckpointStorageException {
6451
null,
6552
planMap,
6653
checkpointConfig,
67-
server.getCheckpointService().getCheckpointStorage(),
6854
instance.getExecutorService("test"),
6955
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
7056
checkpointManager.acknowledgeTask(
@@ -74,94 +60,4 @@ void testACKNotExistPendingCheckpoint() throws CheckpointStorageException {
7460
999, System.currentTimeMillis(), CheckpointType.CHECKPOINT_TYPE),
7561
new ArrayList<>()));
7662
}
77-
78-
@Test
79-
void testSchedulerThreadShouldNotBeInterruptedBeforeJobMasterCleaned()
80-
throws CheckpointStorageException, ExecutionException, InterruptedException,
81-
TimeoutException {
82-
CheckpointConfig checkpointConfig = new CheckpointConfig();
83-
// quickly fail the checkpoint
84-
checkpointConfig.setCheckpointTimeout(5000);
85-
checkpointConfig.setStorage(new CheckpointStorageConfig());
86-
Map<Integer, CheckpointPlan> planMap = new HashMap<>();
87-
planMap.put(
88-
1,
89-
CheckpointPlan.builder()
90-
.pipelineId(1)
91-
.pipelineSubtasks(Collections.singleton(new TaskLocation()))
92-
.build());
93-
CompletableFuture<Boolean> threadIsInterrupted = new CompletableFuture<>();
94-
ExecutorService executorService = Executors.newCachedThreadPool();
95-
try {
96-
CheckpointManager checkpointManager =
97-
new CheckpointManager(
98-
1L,
99-
false,
100-
nodeEngine,
101-
null,
102-
planMap,
103-
checkpointConfig,
104-
server.getCheckpointService().getCheckpointStorage(),
105-
executorService,
106-
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
107-
108-
@Override
109-
protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
110-
threadIsInterrupted.complete(Thread.interrupted());
111-
}
112-
};
113-
checkpointManager.reportedPipelineRunning(1, true);
114-
Assertions.assertFalse(threadIsInterrupted.get(1, TimeUnit.MINUTES));
115-
} finally {
116-
executorService.shutdownNow();
117-
}
118-
}
119-
120-
@Test
121-
void testCheckpointContinuesWorkAfterClockDrift()
122-
throws CheckpointStorageException, ExecutionException, InterruptedException,
123-
TimeoutException {
124-
CheckpointConfig checkpointConfig = new CheckpointConfig();
125-
checkpointConfig.setStorage(new CheckpointStorageConfig());
126-
checkpointConfig.setCheckpointTimeout(5000);
127-
checkpointConfig.setCheckpointInterval(5000);
128-
Map<Integer, CheckpointPlan> planMap = new HashMap<>();
129-
planMap.put(
130-
1,
131-
CheckpointPlan.builder()
132-
.pipelineId(1)
133-
.pipelineSubtasks(Collections.singleton(new TaskLocation()))
134-
.build());
135-
ExecutorService executorService = Executors.newCachedThreadPool();
136-
CompletableFuture<Boolean> invokedHandleCheckpointError = new CompletableFuture<>();
137-
Instant now = Instant.now();
138-
Instant startTime = now.minusSeconds(10);
139-
try (MockedStatic<Instant> mockedInstant = Mockito.mockStatic(Instant.class)) {
140-
mockedInstant.when(Instant::now).thenReturn(startTime);
141-
CheckpointManager checkpointManager =
142-
new CheckpointManager(
143-
1L,
144-
false,
145-
nodeEngine,
146-
null,
147-
planMap,
148-
checkpointConfig,
149-
server.getCheckpointService().getCheckpointStorage(),
150-
executorService,
151-
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
152-
@Override
153-
protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
154-
invokedHandleCheckpointError.complete(true);
155-
}
156-
};
157-
ReflectionUtils.setField(
158-
checkpointManager.getCheckpointCoordinator(1),
159-
"latestTriggerTimestamp",
160-
new AtomicLong(startTime.toEpochMilli()));
161-
checkpointManager.reportedPipelineRunning(1, true);
162-
Assertions.assertTrue(invokedHandleCheckpointError.get(1, TimeUnit.MINUTES));
163-
} finally {
164-
executorService.shutdownNow();
165-
}
166-
}
16763
}

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
8888
null,
8989
planMap,
9090
new CheckpointConfig(),
91-
server.getCheckpointService().getCheckpointStorage(),
9291
instance.getExecutorService("test"),
9392
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
9493
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));

0 commit comments

Comments
 (0)