Skip to content

Commit e9e51dd

Browse files
Hisoka-X, chaorongzhi
authored and committed
[Fix][Zeta] Fix release slot resource twice (apache#7236)
1 parent 4bc630a commit e9e51dd

File tree

3 files changed

+31
-6
lines changed

3 files changed

+31
-6
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,15 @@ public void testJobRetryTimes() throws IOException, InterruptedException {
6363
"Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf"));
6464
}
6565

66+
@Test
67+
public void testNoDuplicatedReleaseSlot() throws IOException, InterruptedException {
68+
Container.ExecResult execResult =
69+
executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
70+
Assertions.assertEquals(0, execResult.getExitCode());
71+
Assertions.assertFalse(
72+
server.getLogs().contains("wrong target release operation with job"));
73+
}
74+
6675
@Test
6776
public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException {
6877
Container.ExecResult execResult =

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
7171
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
7272

73-
import com.google.common.collect.Lists;
7473
import com.hazelcast.cluster.Address;
7574
import com.hazelcast.core.HazelcastInstanceNotActiveException;
7675
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -92,6 +91,8 @@
9291
import java.util.Map;
9392
import java.util.Optional;
9493
import java.util.concurrent.CompletableFuture;
94+
import java.util.concurrent.ConcurrentHashMap;
95+
import java.util.concurrent.CopyOnWriteArrayList;
9596
import java.util.concurrent.ExecutorService;
9697
import java.util.stream.Collectors;
9798

@@ -146,6 +147,8 @@ public class JobMaster {
146147

147148
private Map<Integer, CheckpointPlan> checkpointPlanMap;
148149

150+
private final Map<Integer, List<SlotProfile>> releasedSlotWhenTaskGroupFinished;
151+
149152
private final IMap<Long, JobInfo> runningJobInfoIMap;
150153

151154
private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
@@ -190,6 +193,7 @@ public JobMaster(
190193
this.engineConfig = engineConfig;
191194
this.metricsImap = metricsImap;
192195
this.seaTunnelServer = seaTunnelServer;
196+
this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
193197
}
194198

195199
public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
@@ -464,13 +468,17 @@ public void releaseTaskGroupResource(
464468
jobImmutableInformation.getJobId(),
465469
Collections.singletonList(taskGroupSlotProfile))
466470
.join();
467-
471+
releasedSlotWhenTaskGroupFinished
472+
.computeIfAbsent(
473+
pipelineLocation.getPipelineId(),
474+
k -> new CopyOnWriteArrayList<>())
475+
.add(taskGroupSlotProfile);
468476
return null;
469477
},
470478
new RetryUtils.RetryMaterial(
471479
Constant.OPERATION_RETRY_TIME,
472480
true,
473-
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
481+
ExceptionUtil::isOperationNeedRetryException,
474482
Constant.OPERATION_RETRY_SLEEP));
475483
} catch (Exception e) {
476484
LOGGER.warning(
@@ -487,6 +495,11 @@ public void releasePipelineResource(SubPlan subPlan) {
487495
if (taskGroupLocationSlotProfileMap == null) {
488496
return;
489497
}
498+
List<SlotProfile> alreadyReleased = new ArrayList<>();
499+
if (releasedSlotWhenTaskGroupFinished.containsKey(subPlan.getPipelineId())) {
500+
alreadyReleased.addAll(
501+
releasedSlotWhenTaskGroupFinished.get(subPlan.getPipelineId()));
502+
}
490503

491504
RetryUtils.retryWithException(
492505
() -> {
@@ -497,10 +510,12 @@ public void releasePipelineResource(SubPlan subPlan) {
497510
resourceManager
498511
.releaseResources(
499512
jobImmutableInformation.getJobId(),
500-
Lists.newArrayList(
501-
taskGroupLocationSlotProfileMap.values()))
513+
taskGroupLocationSlotProfileMap.values().stream()
514+
.filter(p -> !alreadyReleased.contains(p))
515+
.collect(Collectors.toList()))
502516
.join();
503517
ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation());
518+
releasedSlotWhenTaskGroupFinished.remove(subPlan.getPipelineId());
504519
return null;
505520
},
506521
new RetryUtils.RetryMaterial(

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public void run() {
4646
SeaTunnelServer server = getService();
4747
try {
4848
response =
49-
server.getTaskExecutionService().getExecutionContext(taskGroupLocation) != null;
49+
server.getTaskExecutionService().getActiveExecutionContext(taskGroupLocation)
50+
!= null;
5051
} catch (TaskGroupContextNotFoundException e) {
5152
response = false;
5253
}

0 commit comments

Comments
 (0)