Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1092,4 +1092,26 @@ jobs:
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-doris-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
MAVEN_OPTS: -Xmx4096m


oracle-cdc-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-cdc-oracle-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run oracle cdc connector integration test
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-cdc-oracle-e2e -am -Pci
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<version>1.15.3</version>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
Expand Down Expand Up @@ -67,7 +66,7 @@
"Currently SPARK do not support cdc,Flink is prone to time out, temporarily disable")
public class OracleCDCIT extends TestSuiteBase implements TestResource {

private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1";
private static final String ORACLE_IMAGE = "goodboy008/oracle-19.3.0-ee:non-cdb";

private static final String HOST = "oracle-host";

Expand All @@ -77,15 +76,6 @@ public class OracleCDCIT extends TestSuiteBase implements TestResource {
System.setProperty("oracle.jdbc.timezoneAsRegion", "false");
}

public static final OracleContainer ORACLE_CONTAINER =
new OracleContainer(ORACLE_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(HOST)
.withExposedPorts(ORACLE_PORT)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger("oracle-docker-image")));

public static final String CONNECTOR_USER = "dbzuser";

public static final String CONNECTOR_PWD = "dbz";
Expand All @@ -106,6 +96,18 @@ public class OracleCDCIT extends TestSuiteBase implements TestResource {

private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s ORDER BY ID";

public static final OracleContainer ORACLE_CONTAINER =
new OracleContainer(ORACLE_IMAGE)
.withUsername(CONNECTOR_USER)
.withPassword(CONNECTOR_PWD)
.withDatabaseName("ORCLCDB")
.withNetwork(NETWORK)
.withNetworkAliases(HOST)
.withExposedPorts(ORACLE_PORT)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger("oracle-docker-image")));

private String driverUrl() {
return "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar";
}
Expand Down Expand Up @@ -548,8 +550,7 @@ private void executeSql(String sql) {

public static Connection getJdbcConnection(OracleContainer oracleContainer)
throws SQLException {
return DriverManager.getConnection(
oracleContainer.getJdbcUrl(), CONNECTOR_USER, CONNECTOR_PWD);
return DriverManager.getConnection(oracleContainer.getJdbcUrl(), SCHEMA_USER, SCHEMA_PWD);
}

public static Connection getJdbcConnection(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.oracle;

import org.apache.commons.lang3.StringUtils;

import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;

/** Copy from testcontainers. */
public class OracleContainer extends JdbcDatabaseContainer<OracleContainer> {

public static final String NAME = "oracle";

private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("goodboy008/oracle-19.3.0-ee");

static final String DEFAULT_TAG = "latest";

static final String IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart();

static final int ORACLE_PORT = 1521;

private static final int APEX_HTTP_PORT = 8080;

private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;

private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120;

// Container defaults
static final String DEFAULT_DATABASE_NAME = "xepdb1";

static final String DEFAULT_SID = "ORCLCDB";

static final String DEFAULT_SYSTEM_USER = "system";

static final String DEFAULT_SYS_USER = "sys";

// Test container defaults
static final String APP_USER = "test";

static final String APP_USER_PASSWORD = "system";

// Restricted user and database names
private static final List<String> ORACLE_SYSTEM_USERS =
Arrays.asList(DEFAULT_SYSTEM_USER, DEFAULT_SYS_USER);

private String databaseName = DEFAULT_DATABASE_NAME;

private String username = APP_USER;

private String password = APP_USER_PASSWORD;

private boolean usingSid = false;

/** @deprecated use @link OracleContainer(DockerImageName) instead */
@Deprecated
public OracleContainer() {
this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG));
}

public OracleContainer(String dockerImageName) {
this(DockerImageName.parse(dockerImageName));
}

public OracleContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
preconfigure();
}

public OracleContainer(Future<String> dockerImageName) {
super(dockerImageName);
preconfigure();
}

private void preconfigure() {
this.waitStrategy =
new LogMessageWaitStrategy()
.withRegEx(".*DATABASE IS READY TO USE!.*\\s")
.withTimes(1)
.withStartupTimeout(
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS));

withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS);
addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT);
}

@Override
protected void waitUntilContainerStarted() {
getWaitStrategy().waitUntilReady(this);
}

@NotNull @Override
public Set<Integer> getLivenessCheckPortNumbers() {
return Collections.singleton(getMappedPort(ORACLE_PORT));
}

@Override
public String getDriverClassName() {
return "oracle.jdbc.driver.OracleDriver";
}

@Override
public String getJdbcUrl() {
return isUsingSid()
? "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + ":" + getSid()
: "jdbc:oracle:thin:"
+ "@"
+ getHost()
+ ":"
+ getOraclePort()
+ "/"
+ getDatabaseName();
}

@Override
public String getUsername() {
// An application user is tied to the database, and therefore not authenticated to connect
// to SID.
return isUsingSid() ? DEFAULT_SYSTEM_USER : username;
}

@Override
public String getPassword() {
return password;
}

@Override
public String getDatabaseName() {
return databaseName;
}

protected boolean isUsingSid() {
return usingSid;
}

@Override
public OracleContainer withUsername(String username) {
if (StringUtils.isEmpty(username)) {
throw new IllegalArgumentException("Username cannot be null or empty");
}
if (ORACLE_SYSTEM_USERS.contains(username.toLowerCase())) {
throw new IllegalArgumentException("Username cannot be one of " + ORACLE_SYSTEM_USERS);
}
this.username = username;
return self();
}

@Override
public OracleContainer withPassword(String password) {
if (StringUtils.isEmpty(password)) {
throw new IllegalArgumentException("Password cannot be null or empty");
}
this.password = password;
return self();
}

@Override
public OracleContainer withDatabaseName(String databaseName) {
if (StringUtils.isEmpty(databaseName)) {
throw new IllegalArgumentException("Database name cannot be null or empty");
}

if (DEFAULT_DATABASE_NAME.equals(databaseName.toLowerCase())) {
throw new IllegalArgumentException(
"Database name cannot be set to " + DEFAULT_DATABASE_NAME);
}

this.databaseName = databaseName;
return self();
}

public OracleContainer usingSid() {
this.usingSid = true;
return self();
}

@Override
public OracleContainer withUrlParam(String paramName, String paramValue) {
throw new UnsupportedOperationException("The Oracle Database driver does not support this");
}

@SuppressWarnings("SameReturnValue")
public String getSid() {
return DEFAULT_SID;
}

public Integer getOraclePort() {
return getMappedPort(ORACLE_PORT);
}

@SuppressWarnings("unused")
public Integer getWebPort() {
return getMappedPort(APEX_HTTP_PORT);
}

@Override
public String getTestQueryString() {
return "SELECT 1 FROM DUAL";
}

@Override
protected void configure() {
withEnv("ORACLE_PASSWORD", password);

// Only set ORACLE_DATABASE if different than the default.
if (databaseName != DEFAULT_DATABASE_NAME) {
withEnv("ORACLE_DATABASE", databaseName);
}

withEnv("APP_USER", username);
withEnv("APP_USER_PASSWORD", password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ create table DEBEZIUM.FULL_TYPES (
primary key (ID)
);

ALTER TABLE DEBEZIUM.FULL_TYPES ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

create table DEBEZIUM.FULL_TYPES2 (
ID NUMBER(9) not null,
VAL_VARCHAR VARCHAR2(1000),
Expand Down Expand Up @@ -102,6 +104,8 @@ create table DEBEZIUM.FULL_TYPES2 (
primary key (ID)
);

ALTER TABLE DEBEZIUM.FULL_TYPES2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

create table DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY (
ID NUMBER(9) not null,
VAL_VARCHAR VARCHAR2(1000),
Expand Down Expand Up @@ -142,6 +146,8 @@ create table DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY (
VAL_TSLTZ TIMESTAMP(6) WITH LOCAL TIME ZONE
);

ALTER TABLE DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

INSERT INTO DEBEZIUM.FULL_TYPES VALUES (
1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',
1.1, 2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,
Expand Down
Loading