Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ private Set<String> buildWhiteList() {
whiteList.add("Neo4jSourceOptions");
whiteList.add("QdrantSourceOptions");
whiteList.add("SocketSourceOptions");
whiteList.add("OpenMldbSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class OpenMldbParameters implements Serializable {
private String zkPath;
private String host;
private int port;
private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue();
private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue();
private int sessionTimeout = OpenMldbSourceOptions.SESSION_TIMEOUT.defaultValue();
private int requestTimeout = OpenMldbSourceOptions.REQUEST_TIMEOUT.defaultValue();
private Boolean clusterMode;
private String database;
private String sql;
Expand All @@ -41,34 +41,35 @@ private OpenMldbParameters() {

public static OpenMldbParameters buildWithConfig(Config pluginConfig) {
OpenMldbParameters openMldbParameters = new OpenMldbParameters();
openMldbParameters.clusterMode = pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key());
openMldbParameters.database = pluginConfig.getString(OpenMldbConfig.DATABASE.key());
openMldbParameters.sql = pluginConfig.getString(OpenMldbConfig.SQL.key());
openMldbParameters.clusterMode =
pluginConfig.getBoolean(OpenMldbSourceOptions.CLUSTER_MODE.key());
openMldbParameters.database = pluginConfig.getString(OpenMldbSourceOptions.DATABASE.key());
openMldbParameters.sql = pluginConfig.getString(OpenMldbSourceOptions.SQL.key());
// set zkHost
if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) {
openMldbParameters.zkHost = pluginConfig.getString(OpenMldbConfig.ZK_HOST.key());
if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_HOST.key())) {
openMldbParameters.zkHost = pluginConfig.getString(OpenMldbSourceOptions.ZK_HOST.key());
}
// set zkPath
if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) {
openMldbParameters.zkPath = pluginConfig.getString(OpenMldbConfig.ZK_PATH.key());
if (pluginConfig.hasPath(OpenMldbSourceOptions.ZK_PATH.key())) {
openMldbParameters.zkPath = pluginConfig.getString(OpenMldbSourceOptions.ZK_PATH.key());
}
// set host
if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) {
openMldbParameters.host = pluginConfig.getString(OpenMldbConfig.HOST.key());
if (pluginConfig.hasPath(OpenMldbSourceOptions.HOST.key())) {
openMldbParameters.host = pluginConfig.getString(OpenMldbSourceOptions.HOST.key());
}
// set port
if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) {
openMldbParameters.port = pluginConfig.getInt(OpenMldbConfig.PORT.key());
if (pluginConfig.hasPath(OpenMldbSourceOptions.PORT.key())) {
openMldbParameters.port = pluginConfig.getInt(OpenMldbSourceOptions.PORT.key());
}
// set session timeout
if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) {
if (pluginConfig.hasPath(OpenMldbSourceOptions.SESSION_TIMEOUT.key())) {
openMldbParameters.sessionTimeout =
pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key());
pluginConfig.getInt(OpenMldbSourceOptions.SESSION_TIMEOUT.key());
}
// set request timeout
if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) {
if (pluginConfig.hasPath(OpenMldbSourceOptions.REQUEST_TIMEOUT.key())) {
openMldbParameters.requestTimeout =
pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key());
pluginConfig.getInt(OpenMldbSourceOptions.REQUEST_TIMEOUT.key());
}
return openMldbParameters;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class OpenMldbConfig {
public class OpenMldbSourceOptions {
private static final int DEFAULT_SESSION_TIMEOUT = 10000;
private static final int DEFAULT_REQUEST_TIMEOUT = 60000;
public static final Option<String> ZK_HOST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,22 @@

package org.apache.seatunnel.connectors.seatunnel.openmldb.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor;
import org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException;
Expand All @@ -47,75 +41,40 @@
import com._4paradigm.openmldb.sdk.Schema;
import com._4paradigm.openmldb.sdk.SqlException;
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
import com.google.auto.service.AutoService;

import java.sql.SQLException;
import java.sql.Types;
import java.util.Collections;
import java.util.List;

@AutoService(SeaTunnelSource.class)
public class OpenMldbSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
private OpenMldbParameters openMldbParameters;
private final OpenMldbParameters openMldbParameters;
private final CatalogTable catalogTable;
private JobContext jobContext;
private SeaTunnelRowType seaTunnelRowType;

@Override
public String getPluginName() {
return "OpenMldb";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
OpenMldbConfig.CLUSTER_MODE.key(),
OpenMldbConfig.SQL.key(),
OpenMldbConfig.DATABASE.key());
if (!result.isSuccess()) {
throw new OpenMldbConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
if (pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key())) {
// cluster mode
result =
CheckConfigUtil.checkAllExists(
pluginConfig,
OpenMldbConfig.ZK_HOST.key(),
OpenMldbConfig.ZK_PATH.key());
} else {
// single mode
result =
CheckConfigUtil.checkAllExists(
pluginConfig, OpenMldbConfig.HOST.key(), OpenMldbConfig.PORT.key());
}
if (!result.isSuccess()) {
throw new OpenMldbConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
this.openMldbParameters = OpenMldbParameters.buildWithConfig(pluginConfig);
public OpenMldbSource(OpenMldbParameters openMldbParameters) {
this.openMldbParameters = openMldbParameters;
OpenMldbSqlExecutor.initSdkOption(openMldbParameters);
try {
SqlClusterExecutor sqlExecutor = OpenMldbSqlExecutor.getSqlExecutor();
Schema inputSchema =
sqlExecutor.getInputSchema(
openMldbParameters.getDatabase(), openMldbParameters.getSql());
List<Column> columnList = inputSchema.getColumnList();
this.seaTunnelRowType = convert(columnList);
this.catalogTable = convert(columnList);
} catch (SQLException | SqlException e) {
throw new OpenMldbConnectorException(
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
"Failed to initialize data schema");
}
}

@Override
public String getPluginName() {
return "OpenMldb";
}

@Override
public Boundedness getBoundedness() {
return JobMode.BATCH.equals(jobContext.getJobMode())
Expand All @@ -124,14 +83,15 @@ public Boundedness getBoundedness() {
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(catalogTable);
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
SingleSplitReaderContext readerContext) throws Exception {
return new OpenMldbSourceReader(openMldbParameters, seaTunnelRowType, readerContext);
return new OpenMldbSourceReader(
openMldbParameters, catalogTable.getSeaTunnelRowType(), readerContext);
}

@Override
Expand Down Expand Up @@ -166,14 +126,24 @@ private SeaTunnelDataType<?> convertSeaTunnelDataType(int type) {
}
}

private SeaTunnelRowType convert(List<Column> columnList) {
String[] fieldsName = new String[columnList.size()];
SeaTunnelDataType<?>[] fieldsType = new SeaTunnelDataType<?>[columnList.size()];
private CatalogTable convert(List<Column> columnList) {
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < columnList.size(); i++) {
Column column = columnList.get(i);
fieldsName[i] = column.getColumnName();
fieldsType[i] = convertSeaTunnelDataType(column.getSqlType());
builder.column(
PhysicalColumn.of(
column.getColumnName(),
convertSeaTunnelDataType(column.getSqlType()),
(Long) null,
column.isNotNull(),
null,
null));
}
return new SeaTunnelRowType(fieldsName, fieldsType);
return CatalogTable.of(
TableIdentifier.of("OpenMldb", openMldbParameters.getDatabase(), "default"),
builder.build(),
null,
null,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters;
import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSourceOptions;

import com.google.auto.service.AutoService;

import java.io.Serializable;

@AutoService(Factory.class)
public class OpenMldbSourceFactory implements TableSourceFactory {
@Override
Expand All @@ -35,26 +41,34 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(OpenMldbConfig.CLUSTER_MODE)
.required(OpenMldbConfig.SQL)
.required(OpenMldbConfig.DATABASE)
.optional(OpenMldbConfig.SESSION_TIMEOUT)
.optional(OpenMldbConfig.REQUEST_TIMEOUT)
.required(OpenMldbSourceOptions.CLUSTER_MODE)
.required(OpenMldbSourceOptions.SQL)
.required(OpenMldbSourceOptions.DATABASE)
.optional(OpenMldbSourceOptions.SESSION_TIMEOUT)
.optional(OpenMldbSourceOptions.REQUEST_TIMEOUT)
.conditional(
OpenMldbConfig.CLUSTER_MODE,
OpenMldbSourceOptions.CLUSTER_MODE,
false,
OpenMldbConfig.HOST,
OpenMldbConfig.PORT)
OpenMldbSourceOptions.HOST,
OpenMldbSourceOptions.PORT)
.conditional(
OpenMldbConfig.CLUSTER_MODE,
OpenMldbSourceOptions.CLUSTER_MODE,
true,
OpenMldbConfig.ZK_HOST,
OpenMldbConfig.ZK_PATH)
OpenMldbSourceOptions.ZK_HOST,
OpenMldbSourceOptions.ZK_PATH)
.build();
}

@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return OpenMldbSource.class;
}

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
OpenMldbParameters openMldbParameters =
OpenMldbParameters.buildWithConfig(context.getOptions().toConfig());
return () -> (SeaTunnelSource<T, SplitT, StateT>) new OpenMldbSource(openMldbParameters);
}
}
Loading