Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/en/concept/JobEnvConfig.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# JobEnvConfig
# Job Env Config

This document describes env configuration information, the common parameters can be used in all engines. In order to better distinguish between engine parameters, the additional parameters of other engine need to carry a prefix.
In flink engine, we use `flink.` as the prefix. In the spark engine, we do not use any prefixes to modify parameters, because the official spark parameters themselves start with `spark.`
Expand Down Expand Up @@ -29,6 +29,10 @@ In `STREAMING` mode, checkpoints is required, if you do not set it, it will be o

This parameter configures the parallelism of source and sink.

### job.retry.times

Used to control the default retry times when a job fails. The default value is 3, and it only works in the Zeta engine.

### shade.identifier

Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public interface EnvCommonOptions {
.defaultValue(JobMode.BATCH)
.withDescription("The job mode of this job, support Batch and Stream");

Option<Integer> JOB_RETRY_TIMES =
Options.key("job.retry.times")
.intType()
.defaultValue(3)
.withDescription("The retry times of this job");

Option<Long> CHECKPOINT_INTERVAL =
Options.key("checkpoint.interval")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.e2e.common.container.seatunnel;

import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
Expand Down Expand Up @@ -45,7 +46,9 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -75,7 +78,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {

@Override
public void startUp() throws Exception {
server =
server = createSeaTunnelServer();
}

private GenericContainer<?> createSeaTunnelServer() throws IOException, InterruptedException {
GenericContainer<?> server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withEnv("TZ", "UTC")
Expand Down Expand Up @@ -106,6 +113,75 @@ public void startUp() throws Exception {
executeExtraCommands(server);

server.start();
return server;
}

protected GenericContainer<?> createSeaTunnelContainerWithFakeSourceAndInMemorySink(
String configFilePath) throws IOException, InterruptedException {
GenericContainer<?> server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withEnv("TZ", "UTC")
.withCommand(
ContainerUtil.adaptPathForWin(
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
.withNetworkAliases("server")
.withExposedPorts()
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
server.setPortBindings(Collections.singletonList("5801:5801"));
server.setExposedPorts(Collections.singletonList(5801));

server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
Paths.get(SEATUNNEL_HOME, "config").toString());

server.withCopyFileToContainer(
MountableFile.forHostPath(configFilePath),
Paths.get(SEATUNNEL_HOME, "config", "seatunnel.yaml").toString());

server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());

server.start();
// execute extra commands
executeExtraCommands(server);

File module = new File(PROJECT_ROOT_PATH + File.separator + getConnectorModulePath());
List<File> connectorFiles =
ContainerUtil.getConnectorFiles(
module, Collections.singleton("connector-fake"), getConnectorNamePrefix());
URL url =
FileUtils.searchJarFiles(
Paths.get(
PROJECT_ROOT_PATH
+ File.separator
+ "seatunnel-e2e/seatunnel-e2e-common/target"))
.stream()
.filter(jar -> jar.toString().endsWith("-tests.jar"))
.findFirst()
.get();
connectorFiles.add(new File(url.getFile()));
connectorFiles.forEach(
jar ->
server.copyFileToContainer(
MountableFile.forHostPath(jar.getAbsolutePath()),
Paths.get(SEATUNNEL_HOME, "connectors", jar.getName()).toString()));
server.copyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties"),
Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString());
return server;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,87 +17,41 @@

package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.lang3.StringUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;

public class JobClientJobProxyIT extends SeaTunnelContainer {
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
private static final String SERVER_SHELL = "seatunnel-cluster.sh";

@Override
@BeforeAll
public void startUp() throws Exception {
// use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in container
this.server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withCommand(
ContainerUtil.adaptPathForWin(
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
.withNetworkAliases("server")
.withImagePullPolicy(PullPolicy.alwaysPull())
.withExposedPorts()
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
server.setExposedPorts(Arrays.asList(5801));
server.setPortBindings(Collections.singletonList("5801:5801"));
server.withCopyFileToContainer(
MountableFile.forHostPath(
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
Paths.get(SEATUNNEL_HOME, "config").toString());
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml");
}

// use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in container
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
@Test
public void testJobRetryTimes() throws IOException, InterruptedException {
Container.ExecResult execResult =
executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf");
Assertions.assertNotEquals(0, execResult.getExitCode());
Assertions.assertTrue(server.getLogs().contains("Restore time 1, pipeline"));
Assertions.assertFalse(server.getLogs().contains("Restore time 3, pipeline"));

server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
LOG.info(
"find images: "
+ DockerClientFactory.lazyClient().listImagesCmd().exec().stream()
.map(
image -> {
if (image.getRepoTags() != null) {
return image.getRepoTags()[0];
} else {
return image.getRepoDigests()[0];
}
})
.collect(Collectors.joining(",")));
server.start();
// execute extra commands
executeExtraCommands(server);
Container.ExecResult execResult2 =
executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error.conf");
Assertions.assertNotEquals(0, execResult2.getExitCode());
Assertions.assertTrue(server.getLogs().contains("Restore time 3, pipeline"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.seatunnel.engine.e2e.classloader;

import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
import org.apache.seatunnel.engine.server.rest.RestConstant;
Expand All @@ -28,20 +27,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import io.restassured.response.Response;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -189,71 +180,10 @@ private boolean containsDaemonThread() throws IOException, InterruptedException
@BeforeEach
public void startUp() throws Exception {
server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withEnv("TZ", "UTC")
.withCommand(
ContainerUtil.adaptPathForWin(
Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
.withNetworkAliases("server")
.withExposedPorts()
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(
"seatunnel-engine:" + JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
server.setExposedPorts(Collections.singletonList(5801));

server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
Paths.get(SEATUNNEL_HOME, "config").toString());

server.withCopyFileToContainer(
MountableFile.forHostPath(
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/"
+ seatunnelConfigFileName()),
Paths.get(SEATUNNEL_HOME, "config", "seatunnel.yaml").toString());

server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());

server.start();
// execute extra commands
executeExtraCommands(server);

File module = new File(PROJECT_ROOT_PATH + File.separator + getConnectorModulePath());
List<File> connectorFiles =
ContainerUtil.getConnectorFiles(
module, Collections.singleton("connector-fake"), getConnectorNamePrefix());
URL url =
FileUtils.searchJarFiles(
Paths.get(
PROJECT_ROOT_PATH
+ File.separator
+ "seatunnel-e2e/seatunnel-e2e-common/target"))
.stream()
.filter(jar -> jar.toString().endsWith("-tests.jar"))
.findFirst()
.get();
connectorFiles.add(new File(url.getFile()));
connectorFiles.forEach(
jar ->
server.copyFileToContainer(
MountableFile.forHostPath(jar.getAbsolutePath()),
Paths.get(SEATUNNEL_HOME, "connectors", jar.getName()).toString()));

server.copyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
+ "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"),
Paths.get(SEATUNNEL_HOME, "connectors", "plugin-mapping.properties").toString());
+ seatunnelConfigFileName());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@
# SeaTunnel Connector-V2

seatunnel.source.FakeSource = connector-fake
seatunnel.sink.Console = connector-console
seatunnel.sink.InMemory = seatunnel-e2e-common
Loading