Skip to content

Commit 86cba87

Browse files
authored
[Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682)
1 parent a8f6b18 commit 86cba87

File tree

12 files changed

+50
-195
lines changed
  • seatunnel-api/src/main/java/org/apache/seatunnel/api
  • seatunnel-connectors-v2
    • connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink
    • connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink
    • connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink
    • connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink
    • connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink
    • connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc
  • seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server

12 files changed

+50
-195
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
5757
* @param seaTunnelRowType The row type info of sink.
5858
*/
5959
@Deprecated
60-
void setTypeInfo(SeaTunnelRowType seaTunnelRowType);
60+
default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
61+
throw new UnsupportedOperationException("setTypeInfo method is not supported");
62+
}
6163

6264
/**
6365
* Get the data type of the records consumed by this sink.

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT
3434
* We will never use this method now. So gave a default implement and return null.
3535
*
3636
* @param context TableFactoryContext
37-
* @return
37+
* @return return the sink created by this factory
3838
*/
3939
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(
4040
TableSinkFactoryContext context) {

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
2323

2424
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
25-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2625
import org.apache.seatunnel.api.sink.SinkWriter;
2726
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
2827
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
@@ -39,7 +38,6 @@
3938

4039
import org.apache.commons.collections4.CollectionUtils;
4140

42-
import com.google.auto.service.AutoService;
4341
import com.google.common.base.Throwables;
4442

4543
import java.util.ArrayList;
@@ -50,21 +48,15 @@
5048
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
5149
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
5250

53-
@AutoService(SeaTunnelSink.class)
5451
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
5552
implements SupportMultiTableSink {
5653
private SeaTunnelRowType seaTunnelRowType;
57-
private CatalogTable catalogTable;
5854
private List<AssertFieldRule> assertFieldRules;
5955
private List<AssertFieldRule.AssertRule> assertRowRules;
60-
private AssertTableRule assertTableRule;
61-
56+
private final AssertTableRule assertTableRule;
6257
private AssertCatalogTableRule assertCatalogTableRule;
6358

64-
public AssertSink() {}
65-
6659
public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
67-
this.catalogTable = catalogTable;
6860
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
6961
if (!pluginConfig.getOptional(RULES).isPresent()) {
7062
Throwables.propagateIfPossible(new ConfigException.Missing(RULES.key()));
@@ -105,11 +97,6 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
10597
}
10698
}
10799

108-
@Override
109-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
110-
this.seaTunnelRowType = seaTunnelRowType;
111-
}
112-
113100
@Override
114101
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
115102
return seaTunnelRowType;
@@ -121,40 +108,6 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
121108
seaTunnelRowType, assertFieldRules, assertRowRules, assertTableRule);
122109
}
123110

124-
@Override
125-
public void prepare(Config pluginConfig) {
126-
if (!pluginConfig.hasPath(RULES.key())) {
127-
Throwables.propagateIfPossible(new ConfigException.Missing(RULES.key()));
128-
}
129-
Config ruleConfig = pluginConfig.getConfig(RULES.key());
130-
List<? extends Config> rowConfigList = null;
131-
List<? extends Config> configList = null;
132-
if (ruleConfig.hasPath(ROW_RULES)) {
133-
rowConfigList = ruleConfig.getConfigList(ROW_RULES);
134-
assertRowRules = new AssertRuleParser().parseRowRules(rowConfigList);
135-
}
136-
if (ruleConfig.hasPath(FIELD_RULES)) {
137-
configList = ruleConfig.getConfigList(FIELD_RULES);
138-
assertFieldRules = new AssertRuleParser().parseRules(configList);
139-
}
140-
141-
if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
142-
assertTableRule =
143-
new AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
144-
} else {
145-
assertTableRule = new AssertTableRule(new ArrayList<>());
146-
}
147-
148-
if (CollectionUtils.isEmpty(configList)
149-
&& CollectionUtils.isEmpty(rowConfigList)
150-
&& assertCatalogTableRule == null
151-
&& assertTableRule.getTableNames().isEmpty()) {
152-
Throwables.propagateIfPossible(
153-
new ConfigException.BadValue(
154-
RULES.key(), "Assert rule config is empty, please add rule config."));
155-
}
156-
}
157-
158111
@Override
159112
public String getPluginName() {
160113
return "Assert";

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.common.PrepareFailException;
2421
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2522
import org.apache.seatunnel.api.serialization.Serializer;
2623
import org.apache.seatunnel.api.sink.DataSaveMode;
@@ -33,10 +30,6 @@
3330
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
3431
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3532
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
36-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
37-
38-
import com.google.auto.service.AutoService;
39-
import lombok.NoArgsConstructor;
4033

4134
import java.io.IOException;
4235
import java.util.HashMap;
@@ -48,8 +41,6 @@
4841

4942
import static org.apache.seatunnel.api.sink.DataSaveMode.KEEP_SCHEMA_AND_DATA;
5043

51-
@AutoService(SeaTunnelSink.class)
52-
@NoArgsConstructor
5344
public class MultiTableSink
5445
implements SeaTunnelSink<
5546
SeaTunnelRow,
@@ -58,8 +49,8 @@ public class MultiTableSink
5849
MultiTableAggregatedCommitInfo>,
5950
SupportDataSaveMode {
6051

61-
private Map<String, SeaTunnelSink> sinks;
62-
private int replicaNum;
52+
private final Map<String, SeaTunnelSink> sinks;
53+
private final int replicaNum;
6354

6455
public MultiTableSink(MultiTableFactoryContext context) {
6556
this.sinks = context.getSinks();
@@ -71,17 +62,6 @@ public String getPluginName() {
7162
return "MultiTableSink";
7263
}
7364

74-
@Override
75-
public void prepare(Config pluginConfig) throws PrepareFailException {
76-
throw new UnsupportedOperationException(
77-
"Please use MultiTableSinkFactory to create MultiTableSink");
78-
}
79-
80-
@Override
81-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
82-
throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");
83-
}
84-
8565
@Override
8666
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
8767
throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");

seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.console.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2421
import org.apache.seatunnel.api.sink.SinkWriter;
2522
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
2623
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,31 +26,21 @@
2926
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
3027
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
3128

32-
import com.google.auto.service.AutoService;
33-
import lombok.NoArgsConstructor;
34-
3529
import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
3630
import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
3731

38-
@NoArgsConstructor
39-
@AutoService(SeaTunnelSink.class)
4032
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void>
4133
implements SupportMultiTableSink {
42-
private SeaTunnelRowType seaTunnelRowType;
43-
private boolean isPrintData = true;
44-
private int delayMs = 0;
34+
private final SeaTunnelRowType seaTunnelRowType;
35+
private final boolean isPrintData;
36+
private final int delayMs;
4537

4638
public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) {
4739
this.seaTunnelRowType = seaTunnelRowType;
4840
this.isPrintData = options.get(LOG_PRINT_DATA);
4941
this.delayMs = options.get(LOG_PRINT_DELAY);
5042
}
5143

52-
@Override
53-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
54-
this.seaTunnelRowType = seaTunnelRowType;
55-
}
56-
5744
@Override
5845
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
5946
return this.seaTunnelRowType;
@@ -68,10 +55,4 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
6855
public String getPluginName() {
6956
return "Console";
7057
}
71-
72-
@Override
73-
public void prepare(Config pluginConfig) {
74-
this.isPrintData = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
75-
this.delayMs = ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
76-
}
7758
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.common.JobContext;
23-
import org.apache.seatunnel.api.common.PrepareFailException;
2421
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2522
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2623
import org.apache.seatunnel.api.serialization.Serializer;
@@ -42,7 +39,6 @@
4239
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
4340
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
4441
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
45-
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
4642
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
4743
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
4844
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
@@ -51,34 +47,31 @@
5147

5248
import org.apache.commons.lang3.StringUtils;
5349

54-
import com.google.auto.service.AutoService;
55-
5650
import java.io.IOException;
5751
import java.util.ArrayList;
5852
import java.util.List;
5953
import java.util.Optional;
6054

6155
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
6256

63-
@AutoService(SeaTunnelSink.class)
6457
public class JdbcSink
6558
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
6659
SupportDataSaveMode,
6760
SupportMultiTableSink {
6861

69-
private SeaTunnelRowType seaTunnelRowType;
62+
private final SeaTunnelRowType seaTunnelRowType;
7063

7164
private JobContext jobContext;
7265

73-
private JdbcSinkConfig jdbcSinkConfig;
66+
private final JdbcSinkConfig jdbcSinkConfig;
7467

75-
private JdbcDialect dialect;
68+
private final JdbcDialect dialect;
7669

77-
private ReadonlyConfig config;
70+
private final ReadonlyConfig config;
7871

79-
private DataSaveMode dataSaveMode;
72+
private final DataSaveMode dataSaveMode;
8073

81-
private CatalogTable catalogTable;
74+
private final CatalogTable catalogTable;
8275

8376
public JdbcSink(
8477
ReadonlyConfig config,
@@ -94,31 +87,11 @@ public JdbcSink(
9487
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
9588
}
9689

97-
public JdbcSink() {}
98-
9990
@Override
10091
public String getPluginName() {
10192
return "Jdbc";
10293
}
10394

104-
@Override
105-
public void prepare(Config pluginConfig) throws PrepareFailException {
106-
this.config = ReadonlyConfig.fromConfig(pluginConfig);
107-
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
108-
this.dialect =
109-
JdbcDialectLoader.load(
110-
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
111-
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
112-
config.get(JdbcOptions.FIELD_IDE) == null
113-
? null
114-
: config.get(JdbcOptions.FIELD_IDE).getValue());
115-
this.dialect.connectionUrlParse(
116-
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
117-
jdbcSinkConfig.getJdbcConnectionConfig().getProperties(),
118-
this.dialect.defaultParameter());
119-
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
120-
}
121-
12295
@Override
12396
public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(
12497
SinkWriter.Context context) {
@@ -167,11 +140,6 @@ public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(
167140
return Optional.empty();
168141
}
169142

170-
@Override
171-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
172-
this.seaTunnelRowType = seaTunnelRowType;
173-
}
174-
175143
@Override
176144
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
177145
return this.seaTunnelRowType;

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
2320
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
24-
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
2521
import org.apache.seatunnel.api.serialization.DefaultSerializer;
2622
import org.apache.seatunnel.api.serialization.Serializer;
2723
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,9 +30,6 @@
3430
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
3531
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
3632

37-
import com.google.auto.service.AutoService;
38-
import lombok.NoArgsConstructor;
39-
4033
import java.util.Collections;
4134
import java.util.List;
4235
import java.util.Optional;
@@ -45,32 +38,18 @@
4538
* Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
4639
* {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
4740
*/
48-
@AutoService(SeaTunnelSink.class)
49-
@NoArgsConstructor
5041
public class KafkaSink
5142
implements SeaTunnelSink<
5243
SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, KafkaAggregatedCommitInfo> {
5344

54-
private ReadonlyConfig pluginConfig;
55-
private SeaTunnelRowType seaTunnelRowType;
45+
private final ReadonlyConfig pluginConfig;
46+
private final SeaTunnelRowType seaTunnelRowType;
5647

5748
public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
5849
this.pluginConfig = pluginConfig;
5950
this.seaTunnelRowType = rowType;
6051
}
6152

62-
@Override
63-
public void prepare(Config pluginConfig) throws PrepareFailException {
64-
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
65-
.validate(new KafkaSinkFactory().optionRule());
66-
this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
67-
}
68-
69-
@Override
70-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
71-
this.seaTunnelRowType = seaTunnelRowType;
72-
}
73-
7453
@Override
7554
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
7655
return this.seaTunnelRowType;

0 commit comments

Comments
 (0)