Skip to content

Commit ba64f72

Browse files
committed
Support config tableIdentifier for schema
1 parent b1d66c5 commit ba64f72

File tree

9 files changed

+133
-7
lines changed

9 files changed

+133
-7
lines changed

docs/en/concept/schema-feature.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ We can use SchemaOptions to define schema, the SchemaOptions contains some confi
1111

1212
```
1313
schema = {
14+
database = "databaseName"
15+
table = "tableName"
16+
comment = "comment"
1417
columns = [
1518
...
1619
]
@@ -24,6 +27,18 @@ schema = {
2427
}
2528
```
2629

30+
### database
31+
32+
The database name of the table identifier which the schema belongs to.
33+
34+
### table
35+
36+
The table name of the table identifier which the schema belongs to.
37+
38+
### comment
39+
40+
The comment of the CatalogTable which the schema belongs to.
41+
2742
### Columns
2843

2944
Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.
@@ -131,6 +146,8 @@ source {
131146
result_table_name = "fake"
132147
row.num = 16
133148
schema {
149+
database = "FakeDatabase"
150+
table = "FakeTable"
134151
columns = [
135152
{
136153
name = id

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public static CatalogTable of(
4848
Map<String, String> options,
4949
List<String> partitionKeys,
5050
String comment) {
51-
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
51+
return new CatalogTable(
52+
tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
5253
}
5354

5455
public static CatalogTable of(
@@ -67,7 +68,7 @@ private CatalogTable(
6768
Map<String, String> options,
6869
List<String> partitionKeys,
6970
String comment) {
70-
this(tableId, tableSchema, options, partitionKeys, comment, "");
71+
this(tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
7172
}
7273

7374
private CatalogTable(
@@ -127,6 +128,9 @@ public String toString() {
127128
+ ", comment='"
128129
+ comment
129130
+ '\''
131+
+ ", catalogName='"
132+
+ catalogName
133+
+ '\''
130134
+ '}';
131135
}
132136
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
135135
if (schemaMap.isEmpty()) {
136136
throw new SeaTunnelException("Schema config can not be empty");
137137
}
138-
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
138+
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
139139
return Collections.singletonList(catalogTable);
140140
}
141141

@@ -190,19 +190,40 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
190190
}
191191
}
192192

193+
// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
194+
// Since this method will not inject the correct catalogName into CatalogTable
195+
@Deprecated
193196
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
197+
return buildWithConfig("", readonlyConfig);
198+
}
199+
200+
public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig) {
194201
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
195202
throw new RuntimeException(
196203
"Schema config need option [schema], please correct your config first");
197204
}
198205
TableSchema tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
206+
207+
ReadonlyConfig schemaConfig =
208+
readonlyConfig
209+
.getOptional(TableSchemaOptions.SCHEMA)
210+
.map(ReadonlyConfig::fromMap)
211+
.orElseThrow(
212+
() -> new IllegalArgumentException("Schema config can't be null"));
213+
214+
TableIdentifier tableIdentifier =
215+
TableIdentifier.of(
216+
catalogName,
217+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.DATABASE),
218+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE));
219+
199220
return CatalogTable.of(
200-
// TODO: other table info
201-
TableIdentifier.of("", "", ""),
221+
tableIdentifier,
202222
tableSchema,
203223
new HashMap<>(),
224+
// todo: add partitionKeys?
204225
new ArrayList<>(),
205-
"");
226+
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
206227
}
207228

208229
public static SeaTunnelRowType buildSimpleTextSchema() {

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,27 @@
2828

2929
public class TableSchemaOptions {
3030

31+
public static class TableIdentifierOptions {
32+
33+
public static final Option<String> DATABASE =
34+
Options.key("database")
35+
.stringType()
36+
.noDefaultValue()
37+
.withDescription("SeaTunnel Schema DataBase Name");
38+
39+
public static final Option<String> TABLE =
40+
Options.key("table")
41+
.stringType()
42+
.noDefaultValue()
43+
.withDescription("SeaTunnel Schema Table Name");
44+
45+
public static final Option<String> COMMENT =
46+
Options.key("comment")
47+
.stringType()
48+
.noDefaultValue()
49+
.withDescription("SeaTunnel Schema Table Comment");
50+
}
51+
3152
public static final Option<Map<String, Object>> SCHEMA =
3253
Options.key("schema")
3354
.type(new TypeReference<Map<String, Object>>() {})

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.catalog.Column;
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2526
import org.apache.seatunnel.api.table.catalog.TableSchema;
2627
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
2728

@@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
4849
@OptionMark(description = "column rule")
4950
private AssertColumnRule columnRule;
5051

52+
@OptionMark(description = "tableIdentifier rule")
53+
private AssertTableIdentifierRule tableIdentifierRule;
54+
5155
public void checkRule(CatalogTable catalogTable) {
5256
TableSchema tableSchema = catalogTable.getTableSchema();
5357
if (tableSchema == null) {
@@ -62,6 +66,9 @@ public void checkRule(CatalogTable catalogTable) {
6266
if (columnRule != null) {
6367
columnRule.checkRule(tableSchema.getColumns());
6468
}
69+
if (tableIdentifierRule != null) {
70+
tableIdentifierRule.checkRule(catalogTable.getTableId());
71+
}
6572
}
6673

6774
@Data
@@ -138,4 +145,24 @@ public void checkRule(List<Column> check) {
138145
}
139146
}
140147
}
148+
149+
@Data
150+
@AllArgsConstructor
151+
public static class AssertTableIdentifierRule implements Serializable {
152+
153+
private TableIdentifier tableIdentifier;
154+
155+
public void checkRule(TableIdentifier actiualTableIdentifier) {
156+
if (actiualTableIdentifier == null) {
157+
throw new AssertConnectorException(CATALOG_TABLE_FAILED, "tableIdentifier is null");
158+
}
159+
if (!actiualTableIdentifier.equals(tableIdentifier)) {
160+
throw new AssertConnectorException(
161+
CATALOG_TABLE_FAILED,
162+
String.format(
163+
"tableIdentifier: %s is not equal to %s",
164+
actiualTableIdentifier, tableIdentifier));
165+
}
166+
}
167+
}
141168
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2525
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
26+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2627
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2728
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
2829

@@ -46,6 +47,10 @@
4647
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
4748
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
4849
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
50+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
51+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_DATABASE_NAME;
52+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
53+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;
4954

5055
public class AssertCatalogTableRuleParser {
5156

@@ -55,6 +60,7 @@ public AssertCatalogTableRule parseCatalogTableRule(Config catalogTableRule) {
5560
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
5661
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
5762
parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
63+
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
5864
return tableRule;
5965
}
6066

@@ -156,4 +162,18 @@ private Optional<AssertCatalogTableRule.AssertConstraintKeyRule> parseConstraint
156162
.collect(Collectors.toList());
157163
return Optional.of(new AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
158164
}
165+
166+
private Optional<AssertCatalogTableRule.AssertTableIdentifierRule> parseTableIdentifierRule(
167+
Config catalogTableRule) {
168+
if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
169+
return Optional.empty();
170+
}
171+
Config tableIdentifierRule = catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
172+
TableIdentifier tableIdentifier =
173+
TableIdentifier.of(
174+
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
175+
tableIdentifierRule.getString(TABLE_IDENTIFIER_DATABASE_NAME),
176+
tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME));
177+
return Optional.of(new AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
178+
}
159179
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public class AssertConfig {
6464
public static final String COLUMN_DEFAULT_VALUE = "default_value";
6565
public static final String COLUMN_COMMENT = "comment";
6666

67+
public static class TableIdentifierRule {
68+
public static final String TABLE_IDENTIFIER_RULE = "table_identifier_rule";
69+
70+
public static final String TABLE_IDENTIFIER_CATALOG_NAME = "catalog_name";
71+
public static final String TABLE_IDENTIFIER_DATABASE_NAME = "database_name";
72+
public static final String TABLE_IDENTIFIER_TABLE_NAME = "table_name";
73+
}
74+
6775
public static final Option<String> COMMENT =
6876
Options.key("comment")
6977
.stringType()

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class FakeSource
6161
public FakeSource() {}
6262

6363
public FakeSource(ReadonlyConfig readonlyConfig) {
64-
this.catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
64+
this.catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig);
6565
this.fakeConfig = FakeConfig.buildWithConfig(readonlyConfig.toConfig());
6666
}
6767

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_catalogtable.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ source {
2424
FakeSource {
2525
row.num = 100
2626
schema = {
27+
database = "fakeDatabase"
28+
table = "fakeTable"
2729
columns = [
2830
{
2931
name = id
@@ -63,6 +65,12 @@ sink{
6365
Assert {
6466
rules {
6567
catalog_table_rule {
68+
table_identifier_rule = {
69+
catalog_name = "FakeSource"
70+
database_name = "fakeDatabase"
71+
table_name = "fakeTable"
72+
}
73+
6674
primary_key_rule = {
6775
primary_key_name = "primary key"
6876
primary_key_columns = ["id"]

0 commit comments

Comments
 (0)