Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
* @param seaTunnelRowType The row type info of sink.
*/
@Deprecated
void setTypeInfo(SeaTunnelRowType seaTunnelRowType);
default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}

/**
* Get the data type of the records consumed by this sink.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT
* We will never use this method now. So gave a default implement and return null.
*
* @param context TableFactoryContext
* @return
* @return return the sink created by this factory
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(
TableSinkFactoryContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
Expand All @@ -39,7 +38,6 @@

import org.apache.commons.collections4.CollectionUtils;

import com.google.auto.service.AutoService;
import com.google.common.base.Throwables;

import java.util.ArrayList;
Expand All @@ -50,21 +48,15 @@
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;

@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
private SeaTunnelRowType seaTunnelRowType;
private CatalogTable catalogTable;
private List<AssertFieldRule> assertFieldRules;
private List<AssertFieldRule.AssertRule> assertRowRules;
private AssertTableRule assertTableRule;

private final AssertTableRule assertTableRule;
private AssertCatalogTableRule assertCatalogTableRule;

public AssertSink() {}

public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
if (!pluginConfig.getOptional(RULES).isPresent()) {
Throwables.propagateIfPossible(new ConfigException.Missing(RULES.key()));
Expand Down Expand Up @@ -105,11 +97,6 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
}
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return seaTunnelRowType;
Expand All @@ -121,40 +108,6 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
seaTunnelRowType, assertFieldRules, assertRowRules, assertTableRule);
}

@Override
public void prepare(Config pluginConfig) {
if (!pluginConfig.hasPath(RULES.key())) {
Throwables.propagateIfPossible(new ConfigException.Missing(RULES.key()));
}
Config ruleConfig = pluginConfig.getConfig(RULES.key());
List<? extends Config> rowConfigList = null;
List<? extends Config> configList = null;
if (ruleConfig.hasPath(ROW_RULES)) {
rowConfigList = ruleConfig.getConfigList(ROW_RULES);
assertRowRules = new AssertRuleParser().parseRowRules(rowConfigList);
}
if (ruleConfig.hasPath(FIELD_RULES)) {
configList = ruleConfig.getConfigList(FIELD_RULES);
assertFieldRules = new AssertRuleParser().parseRules(configList);
}

if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
assertTableRule =
new AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
} else {
assertTableRule = new AssertTableRule(new ArrayList<>());
}

if (CollectionUtils.isEmpty(configList)
&& CollectionUtils.isEmpty(rowConfigList)
&& assertCatalogTableRule == null
&& assertTableRule.getTableNames().isEmpty()) {
Throwables.propagateIfPossible(
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please add rule config."));
}
}

@Override
public String getPluginName() {
return "Assert";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
Expand All @@ -33,10 +30,6 @@
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.util.HashMap;
Expand All @@ -48,8 +41,6 @@

import static org.apache.seatunnel.api.sink.DataSaveMode.KEEP_SCHEMA_AND_DATA;

@AutoService(SeaTunnelSink.class)
@NoArgsConstructor
public class MultiTableSink
implements SeaTunnelSink<
SeaTunnelRow,
Expand All @@ -58,8 +49,8 @@ public class MultiTableSink
MultiTableAggregatedCommitInfo>,
SupportDataSaveMode {

private Map<String, SeaTunnelSink> sinks;
private int replicaNum;
private final Map<String, SeaTunnelSink> sinks;
private final int replicaNum;

public MultiTableSink(MultiTableFactoryContext context) {
this.sinks = context.getSinks();
Expand All @@ -71,17 +62,6 @@ public String getPluginName() {
return "MultiTableSink";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
throw new UnsupportedOperationException(
"Please use MultiTableSinkFactory to create MultiTableSink");
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.console.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -29,31 +26,21 @@
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;

@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
private SeaTunnelRowType seaTunnelRowType;
private boolean isPrintData = true;
private int delayMs = 0;
private final SeaTunnelRowType seaTunnelRowType;
private final boolean isPrintData;
private final int delayMs;

public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) {
this.seaTunnelRowType = seaTunnelRowType;
this.isPrintData = options.get(LOG_PRINT_DATA);
this.delayMs = options.get(LOG_PRINT_DELAY);
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
Expand All @@ -68,10 +55,4 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
public String getPluginName() {
return "Console";
}

@Override
public void prepare(Config pluginConfig) {
this.isPrintData = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
this.delayMs = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
Expand All @@ -42,7 +39,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
Expand All @@ -51,34 +47,31 @@

import org.apache.commons.lang3.StringUtils;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@AutoService(SeaTunnelSink.class)
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
SupportDataSaveMode,
SupportMultiTableSink {

private SeaTunnelRowType seaTunnelRowType;
private final SeaTunnelRowType seaTunnelRowType;

private JobContext jobContext;

private JdbcSinkConfig jdbcSinkConfig;
private final JdbcSinkConfig jdbcSinkConfig;

private JdbcDialect dialect;
private final JdbcDialect dialect;

private ReadonlyConfig config;
private final ReadonlyConfig config;

private DataSaveMode dataSaveMode;
private final DataSaveMode dataSaveMode;

private CatalogTable catalogTable;
private final CatalogTable catalogTable;

public JdbcSink(
ReadonlyConfig config,
Expand All @@ -94,31 +87,11 @@ public JdbcSink(
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

public JdbcSink() {}

@Override
public String getPluginName() {
return "Jdbc";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.config = ReadonlyConfig.fromConfig(pluginConfig);
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
this.dialect =
JdbcDialectLoader.load(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
config.get(JdbcOptions.FIELD_IDE) == null
? null
: config.get(JdbcOptions.FIELD_IDE).getValue());
this.dialect.connectionUrlParse(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
jdbcSinkConfig.getJdbcConnectionConfig().getProperties(),
this.dialect.defaultParameter());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}

@Override
public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(
SinkWriter.Context context) {
Expand Down Expand Up @@ -167,11 +140,6 @@ public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(
return Optional.empty();
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
Expand All @@ -34,9 +30,6 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -45,32 +38,18 @@
* Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
* {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
*/
@AutoService(SeaTunnelSink.class)
@NoArgsConstructor
public class KafkaSink
implements SeaTunnelSink<
SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, KafkaAggregatedCommitInfo> {

private ReadonlyConfig pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private final ReadonlyConfig pluginConfig;
private final SeaTunnelRowType seaTunnelRowType;

public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = rowType;
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
.validate(new KafkaSinkFactory().optionRule());
this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
Expand Down
Loading