Skip to content

Commit 5fb8b01

Browse files
committed
Support config tableIdentifier for schema
1 parent a15b79f commit 5fb8b01

File tree

11 files changed

+143
-20
lines changed

11 files changed

+143
-20
lines changed

docs/en/concept/schema-feature.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ We can use SchemaOptions to define schema, the SchemaOptions contains some confi
1111

1212
```
1313
schema = {
14+
table = "database.schema.table"
15+
comment = "comment"
1416
columns = [
1517
...
1618
]
@@ -24,6 +26,14 @@ schema = {
2426
}
2527
```
2628

29+
### table
30+
31+
The table full name of the table identifier which the schema belongs to, it contains database, schema, table name. e.g. `database.schema.table`, `database.table`.
32+
33+
### comment
34+
35+
The comment of the CatalogTable which the schema belongs to.
36+
2737
### Columns
2838

2939
Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.
@@ -131,6 +141,7 @@ source {
131141
result_table_name = "fake"
132142
row.num = 16
133143
schema {
144+
table = "FakeDatabase.FakeTable"
134145
columns = [
135146
{
136147
name = id

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public static CatalogTable of(
4848
Map<String, String> options,
4949
List<String> partitionKeys,
5050
String comment) {
51-
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
51+
return new CatalogTable(
52+
tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
5253
}
5354

5455
public static CatalogTable of(
@@ -67,7 +68,7 @@ private CatalogTable(
6768
Map<String, String> options,
6869
List<String> partitionKeys,
6970
String comment) {
70-
this(tableId, tableSchema, options, partitionKeys, comment, "");
71+
this(tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
7172
}
7273

7374
private CatalogTable(
@@ -127,6 +128,9 @@ public String toString() {
127128
+ ", comment='"
128129
+ comment
129130
+ '\''
131+
+ ", catalogName='"
132+
+ catalogName
133+
+ '\''
130134
+ '}';
131135
}
132136
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3232
import org.apache.seatunnel.common.utils.SeaTunnelException;
3333

34+
import org.apache.commons.lang3.StringUtils;
35+
3436
import lombok.extern.slf4j.Slf4j;
3537

3638
import java.io.Serializable;
@@ -135,7 +137,7 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
135137
if (schemaMap.isEmpty()) {
136138
throw new SeaTunnelException("Schema config can not be empty");
137139
}
138-
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
140+
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
139141
return Collections.singletonList(catalogTable);
140142
}
141143

@@ -190,6 +192,9 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
190192
}
191193
}
192194

195+
// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
196+
// Since this method will not inject the correct catalogName into CatalogTable
197+
@Deprecated
193198
public static List<CatalogTable> convertDataTypeToCatalogTables(
194199
SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
195200
List<CatalogTable> catalogTables;
@@ -210,18 +215,39 @@ public static List<CatalogTable> convertDataTypeToCatalogTables(
210215
}
211216

212217
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
218+
return buildWithConfig("", readonlyConfig);
219+
}
220+
221+
public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig) {
213222
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
214223
throw new RuntimeException(
215224
"Schema config need option [schema], please correct your config first");
216225
}
217226
TableSchema tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
227+
228+
ReadonlyConfig schemaConfig =
229+
readonlyConfig
230+
.getOptional(TableSchemaOptions.SCHEMA)
231+
.map(ReadonlyConfig::fromMap)
232+
.orElseThrow(
233+
() -> new IllegalArgumentException("Schema config can't be null"));
234+
235+
TablePath tablePath;
236+
if (StringUtils.isNotEmpty(
237+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE))) {
238+
tablePath =
239+
TablePath.of(schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE));
240+
} else {
241+
tablePath = TablePath.EMPTY;
242+
}
243+
218244
return CatalogTable.of(
219-
// TODO: other table info
220-
TableIdentifier.of("", "", ""),
245+
TableIdentifier.of(catalogName, tablePath),
221246
tableSchema,
222247
new HashMap<>(),
248+
// todo: add partitionKeys?
223249
new ArrayList<>(),
224-
"");
250+
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
225251
}
226252

227253
public static SeaTunnelRowType buildSimpleTextSchema() {

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public final class TablePath implements Serializable {
3434
private final String schemaName;
3535
private final String tableName;
3636

37+
public static final TablePath EMPTY = TablePath.of(null, null, null);
38+
3739
public static TablePath of(String fullName) {
3840
String[] paths = fullName.split("\\.");
3941

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@
2828

2929
public class TableSchemaOptions {
3030

31+
public static class TableIdentifierOptions {
32+
33+
public static final Option<String> TABLE =
34+
Options.key("table")
35+
.stringType()
36+
.noDefaultValue()
37+
.withDescription("SeaTunnel Schema Full Table Name");
38+
39+
public static final Option<String> COMMENT =
40+
Options.key("comment")
41+
.stringType()
42+
.noDefaultValue()
43+
.withDescription("SeaTunnel Schema Table Comment");
44+
}
45+
3146
public static final Option<Map<String, Object>> SCHEMA =
3247
Options.key("schema")
3348
.type(new TypeReference<Map<String, Object>>() {})

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.catalog.Column;
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2526
import org.apache.seatunnel.api.table.catalog.TableSchema;
2627
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
2728

@@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
4849
@OptionMark(description = "column rule")
4950
private AssertColumnRule columnRule;
5051

52+
@OptionMark(description = "tableIdentifier rule")
53+
private AssertTableIdentifierRule tableIdentifierRule;
54+
5155
public void checkRule(CatalogTable catalogTable) {
5256
TableSchema tableSchema = catalogTable.getTableSchema();
5357
if (tableSchema == null) {
@@ -62,6 +66,9 @@ public void checkRule(CatalogTable catalogTable) {
6266
if (columnRule != null) {
6367
columnRule.checkRule(tableSchema.getColumns());
6468
}
69+
if (tableIdentifierRule != null) {
70+
tableIdentifierRule.checkRule(catalogTable.getTableId());
71+
}
6572
}
6673

6774
@Data
@@ -138,4 +145,24 @@ public void checkRule(List<Column> check) {
138145
}
139146
}
140147
}
148+
149+
@Data
150+
@AllArgsConstructor
151+
public static class AssertTableIdentifierRule implements Serializable {
152+
153+
private TableIdentifier tableIdentifier;
154+
155+
public void checkRule(TableIdentifier actiualTableIdentifier) {
156+
if (actiualTableIdentifier == null) {
157+
throw new AssertConnectorException(CATALOG_TABLE_FAILED, "tableIdentifier is null");
158+
}
159+
if (!actiualTableIdentifier.equals(tableIdentifier)) {
160+
throw new AssertConnectorException(
161+
CATALOG_TABLE_FAILED,
162+
String.format(
163+
"tableIdentifier: %s is not equal to %s",
164+
actiualTableIdentifier, tableIdentifier));
165+
}
166+
}
167+
}
141168
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2525
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
26+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
27+
import org.apache.seatunnel.api.table.catalog.TablePath;
2628
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2729
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
2830

@@ -46,6 +48,9 @@
4648
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
4749
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
4850
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
51+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
52+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
53+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;
4954

5055
public class AssertCatalogTableRuleParser {
5156

@@ -55,6 +60,7 @@ public AssertCatalogTableRule parseCatalogTableRule(Config catalogTableRule) {
5560
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
5661
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
5762
parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
63+
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
5864
return tableRule;
5965
}
6066

@@ -156,4 +162,17 @@ private Optional<AssertCatalogTableRule.AssertConstraintKeyRule> parseConstraint
156162
.collect(Collectors.toList());
157163
return Optional.of(new AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
158164
}
165+
166+
private Optional<AssertCatalogTableRule.AssertTableIdentifierRule> parseTableIdentifierRule(
167+
Config catalogTableRule) {
168+
if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
169+
return Optional.empty();
170+
}
171+
Config tableIdentifierRule = catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
172+
TableIdentifier tableIdentifier =
173+
TableIdentifier.of(
174+
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
175+
TablePath.of(tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME)));
176+
return Optional.of(new AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
177+
}
159178
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ public class AssertConfig {
6464
public static final String COLUMN_DEFAULT_VALUE = "default_value";
6565
public static final String COLUMN_COMMENT = "comment";
6666

67+
public static class TableIdentifierRule {
68+
public static final String TABLE_IDENTIFIER_RULE = "table_identifier_rule";
69+
70+
public static final String TABLE_IDENTIFIER_CATALOG_NAME = "catalog_name";
71+
public static final String TABLE_IDENTIFIER_TABLE_NAME = "table";
72+
}
73+
6774
public static final Option<String> COMMENT =
6875
Options.key("comment")
6976
.stringType()

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ public static FakeConfig buildWithConfig(Config config) {
416416
List<String> tableNames = config.getStringList(CatalogOptions.TABLE_NAMES.key());
417417
List<TableIdentifier> tableIdentifiers = new ArrayList<>(tableNames.size());
418418
for (String tableName : tableNames) {
419-
tableIdentifiers.add(TableIdentifier.of("fake", TablePath.of(tableName)));
419+
tableIdentifiers.add(TableIdentifier.of("FakeSource", TablePath.of(tableName)));
420420
}
421421
builder.tableIdentifiers(tableIdentifiers);
422422
}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.api.source.SupportParallelism;
3131
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3232
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
33+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
3334
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
3435
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3536
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,6 +42,8 @@
4142
import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
4243
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
4344

45+
import org.apache.commons.collections4.CollectionUtils;
46+
4447
import com.google.auto.service.AutoService;
4548
import com.google.common.collect.Lists;
4649

@@ -61,7 +64,7 @@ public class FakeSource
6164
public FakeSource() {}
6265

6366
public FakeSource(ReadonlyConfig readonlyConfig) {
64-
this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
67+
this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig);
6568
this.fakeConfig = FakeConfig.buildWithConfig(readonlyConfig.toConfig());
6669
}
6770

@@ -74,20 +77,23 @@ public Boundedness getBoundedness() {
7477

7578
@Override
7679
public List<CatalogTable> getProducedCatalogTables() {
77-
if (fakeConfig.getTableIdentifiers().isEmpty()) {
80+
// If tableNames is empty, means this is only one catalogTable, return the original
81+
// catalogTable
82+
if (CollectionUtils.isEmpty(fakeConfig.getTableIdentifiers())) {
7883
return Lists.newArrayList(catalogTable);
79-
} else {
80-
return fakeConfig.getTableIdentifiers().stream()
81-
.map(
82-
tableIdentifier ->
83-
CatalogTable.of(
84-
tableIdentifier,
85-
catalogTable.getTableSchema(),
86-
catalogTable.getOptions(),
87-
catalogTable.getPartitionKeys(),
88-
catalogTable.getComment()))
89-
.collect(Collectors.toList());
9084
}
85+
// Otherwise, return the catalogTables with the tableNames
86+
return fakeConfig.getTableIdentifiers().stream()
87+
.map(
88+
tableIdentifier ->
89+
CatalogTable.of(
90+
TableIdentifier.of(
91+
getPluginName(), tableIdentifier.toTablePath()),
92+
catalogTable.getTableSchema(),
93+
catalogTable.getOptions(),
94+
catalogTable.getPartitionKeys(),
95+
catalogTable.getComment()))
96+
.collect(Collectors.toList());
9197
}
9298

9399
@Override

0 commit comments

Comments
 (0)