Skip to content

Commit 83f1097

Browse files
Hisoka-XThomas-HuWei
authored andcommitted
[Improve][Zeta] Fix JobMaster reset app classloader twice (apache#7063)
1 parent bd908d1 commit 83f1097

File tree

1 file changed

+31
-32
lines changed
  • seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master

1 file changed

+31
-32
lines changed

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -222,45 +222,44 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr
222222
nodeEngine.getSerializationService(),
223223
classLoader,
224224
jobImmutableInformation.getLogicalDag());
225-
if (!restart
226-
&& !logicalDag.isStartWithSavePoint()
227-
&& ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
228-
.get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
229-
.equals(SaveModeExecuteLocation.CLUSTER)) {
230-
try {
231-
Thread.currentThread().setContextClassLoader(classLoader);
225+
try {
226+
Thread.currentThread().setContextClassLoader(classLoader);
227+
if (!restart
228+
&& !logicalDag.isStartWithSavePoint()
229+
&& ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
230+
.get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
231+
.equals(SaveModeExecuteLocation.CLUSTER)) {
232232
logicalDag.getLogicalVertexMap().values().stream()
233233
.map(LogicalVertex::getAction)
234234
.filter(action -> action instanceof SinkAction)
235235
.map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
236236
.forEach(JobMaster::handleSaveMode);
237-
} finally {
238-
Thread.currentThread().setContextClassLoader(appClassLoader);
239237
}
240-
}
241238

242-
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
243-
PlanUtils.fromLogicalDAG(
244-
logicalDag,
245-
nodeEngine,
246-
jobImmutableInformation,
247-
initializationTimestamp,
248-
executorService,
249-
flakeIdGenerator,
250-
runningJobStateIMap,
251-
runningJobStateTimestampsIMap,
252-
engineConfig.getQueueType(),
253-
engineConfig);
254-
seaTunnelServer
255-
.getClassLoaderService()
256-
.releaseClassLoader(
257-
jobImmutableInformation.getJobId(),
258-
jobImmutableInformation.getPluginJarsUrls());
259-
// revert to app class loader, it may be changed by PlanUtils.fromLogicalDAG
260-
Thread.currentThread().setContextClassLoader(appClassLoader);
261-
this.physicalPlan = planTuple.f0();
262-
this.physicalPlan.setJobMaster(this);
263-
this.checkpointPlanMap = planTuple.f1();
239+
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
240+
PlanUtils.fromLogicalDAG(
241+
logicalDag,
242+
nodeEngine,
243+
jobImmutableInformation,
244+
initializationTimestamp,
245+
executorService,
246+
flakeIdGenerator,
247+
runningJobStateIMap,
248+
runningJobStateTimestampsIMap,
249+
engineConfig.getQueueType(),
250+
engineConfig);
251+
this.physicalPlan = planTuple.f0();
252+
this.physicalPlan.setJobMaster(this);
253+
this.checkpointPlanMap = planTuple.f1();
254+
} finally {
255+
// revert to app class loader, it may be changed by PlanUtils.fromLogicalDAG
256+
Thread.currentThread().setContextClassLoader(appClassLoader);
257+
seaTunnelServer
258+
.getClassLoaderService()
259+
.releaseClassLoader(
260+
jobImmutableInformation.getJobId(),
261+
jobImmutableInformation.getPluginJarsUrls());
262+
}
264263
Exception initException = null;
265264
try {
266265
this.initCheckPointManager(restart);

0 commit comments

Comments
 (0)