Skip to content

Commit bbaf7bc

Browse files
committed
Support config tableIdentifier for schema
1 parent 59bf0a5 commit bbaf7bc

File tree

12 files changed

+187
-22
lines changed

12 files changed

+187
-22
lines changed

docs/en/concept/schema-feature.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ We can use SchemaOptions to define schema, the SchemaOptions contains some confi
1111

1212
```
1313
schema = {
14+
table = "database.schema.table"
15+
schemaFirst = false
16+
comment = "comment"
1417
columns = [
1518
...
1619
]
@@ -24,6 +27,20 @@ schema = {
2427
}
2528
```
2629

30+
### table
31+
32+
The table full name of the table identifier which the schema belongs to, it contains database, schema, table name. e.g. `database.schema.table`, `database.table`, `table`.
33+
34+
### schema_first
35+
36+
Default is false.
37+
38+
If the schema_first is true, the schema will be used first, this means if we set `table = "a.b"`, a will be parsed as schema rather than database, then we can support write `table = "schema.table"`.
39+
40+
### comment
41+
42+
The comment of the CatalogTable which the schema belongs to.
43+
2744
### Columns
2845

2946
Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.
@@ -131,6 +148,7 @@ source {
131148
result_table_name = "fake"
132149
row.num = 16
133150
schema {
151+
table = "FakeDatabase.FakeTable"
134152
columns = [
135153
{
136154
name = id

docs/en/connector-v2/source/FakeSource.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,21 @@ The template list of double type that connector generated, if user configured it
265265

266266
### table-names
267267

268-
The table list that connector generated, used to simulate multi-table scenarios
268+
The table list that connector generated, used to simulate multi-table scenarios.
269+
270+
This option will override the `table` option in the `schema` option.
271+
For example, if you configure the `table-names` option as follows, the connector will generate data for the `test.table1` and `test.table2` tables, the `database.schema.table` will be drop.
272+
273+
```agsl
274+
FakeSource {
275+
table-names = ["test.table1", "test.table2"]
276+
schema = {
277+
table = "database.schema.table"
278+
...
279+
}
280+
...
281+
}
282+
```
269283

270284
### common options
271285

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public static CatalogTable of(
4848
Map<String, String> options,
4949
List<String> partitionKeys,
5050
String comment) {
51-
return new CatalogTable(tableId, tableSchema, options, partitionKeys, comment);
51+
return new CatalogTable(
52+
tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
5253
}
5354

5455
public static CatalogTable of(
@@ -67,7 +68,7 @@ private CatalogTable(
6768
Map<String, String> options,
6869
List<String> partitionKeys,
6970
String comment) {
70-
this(tableId, tableSchema, options, partitionKeys, comment, "");
71+
this(tableId, tableSchema, options, partitionKeys, comment, tableId.getCatalogName());
7172
}
7273

7374
private CatalogTable(
@@ -127,6 +128,9 @@ public String toString() {
127128
+ ", comment='"
128129
+ comment
129130
+ '\''
131+
+ ", catalogName='"
132+
+ catalogName
133+
+ '\''
130134
+ '}';
131135
}
132136
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3232
import org.apache.seatunnel.common.utils.SeaTunnelException;
3333

34+
import org.apache.commons.lang3.StringUtils;
35+
3436
import lombok.extern.slf4j.Slf4j;
3537

3638
import java.io.Serializable;
@@ -135,7 +137,7 @@ public static List<CatalogTable> getCatalogTablesFromConfig(
135137
if (schemaMap.isEmpty()) {
136138
throw new SeaTunnelException("Schema config can not be empty");
137139
}
138-
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
140+
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, readonlyConfig);
139141
return Collections.singletonList(catalogTable);
140142
}
141143

@@ -190,6 +192,9 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
190192
}
191193
}
192194

195+
// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);
196+
// Since this method will not inject the correct catalogName into CatalogTable
197+
@Deprecated
193198
public static List<CatalogTable> convertDataTypeToCatalogTables(
194199
SeaTunnelDataType<?> seaTunnelDataType, String tableId) {
195200
List<CatalogTable> catalogTables;
@@ -210,18 +215,42 @@ public static List<CatalogTable> convertDataTypeToCatalogTables(
210215
}
211216

212217
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
218+
return buildWithConfig("", readonlyConfig);
219+
}
220+
221+
public static CatalogTable buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig) {
213222
if (readonlyConfig.get(TableSchemaOptions.SCHEMA) == null) {
214223
throw new RuntimeException(
215224
"Schema config need option [schema], please correct your config first");
216225
}
217226
TableSchema tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
227+
228+
ReadonlyConfig schemaConfig =
229+
readonlyConfig
230+
.getOptional(TableSchemaOptions.SCHEMA)
231+
.map(ReadonlyConfig::fromMap)
232+
.orElseThrow(
233+
() -> new IllegalArgumentException("Schema config can't be null"));
234+
235+
TablePath tablePath;
236+
if (StringUtils.isNotEmpty(
237+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE))) {
238+
tablePath =
239+
TablePath.of(
240+
schemaConfig.get(TableSchemaOptions.TableIdentifierOptions.TABLE),
241+
schemaConfig.get(
242+
TableSchemaOptions.TableIdentifierOptions.SCHEMA_FIRST));
243+
} else {
244+
tablePath = TablePath.EMPTY;
245+
}
246+
218247
return CatalogTable.of(
219-
// TODO: other table info
220-
TableIdentifier.of("", "", ""),
248+
TableIdentifier.of(catalogName, tablePath),
221249
tableSchema,
222250
new HashMap<>(),
251+
// todo: add partitionKeys?
223252
new ArrayList<>(),
224-
"");
253+
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
225254
}
226255

227256
public static SeaTunnelRowType buildSimpleTextSchema() {

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,26 @@ public final class TablePath implements Serializable {
3434
private final String schemaName;
3535
private final String tableName;
3636

37+
public static final TablePath EMPTY = TablePath.of(null, null, null);
38+
3739
public static TablePath of(String fullName) {
40+
return of(fullName, false);
41+
}
42+
43+
public static TablePath of(String fullName, boolean schemaFirst) {
3844
String[] paths = fullName.split("\\.");
3945

46+
if (paths.length == 1) {
47+
return of(null, paths[0]);
48+
}
49+
4050
if (paths.length == 2) {
41-
return of(paths[0], paths[1]);
51+
if (schemaFirst) {
52+
return of(null, paths[0], paths[1]);
53+
}
54+
return of(paths[0], null, paths[1]);
4255
}
56+
4357
if (paths.length == 3) {
4458
return of(paths[0], paths[1], paths[2]);
4559
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,27 @@
2828

2929
public class TableSchemaOptions {
3030

31+
public static class TableIdentifierOptions {
32+
33+
public static final Option<Boolean> SCHEMA_FIRST =
34+
Options.key("schema_first")
35+
.booleanType()
36+
.defaultValue(false)
37+
.withDescription("Parse Schema First from table");
38+
39+
public static final Option<String> TABLE =
40+
Options.key("table")
41+
.stringType()
42+
.noDefaultValue()
43+
.withDescription("SeaTunnel Schema Full Table Name");
44+
45+
public static final Option<String> COMMENT =
46+
Options.key("comment")
47+
.stringType()
48+
.noDefaultValue()
49+
.withDescription("SeaTunnel Schema Table Comment");
50+
}
51+
3152
public static final Option<Map<String, Object>> SCHEMA =
3253
Options.key("schema")
3354
.type(new TypeReference<Map<String, Object>>() {})

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRule.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.catalog.Column;
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2526
import org.apache.seatunnel.api.table.catalog.TableSchema;
2627
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
2728

@@ -48,6 +49,9 @@ public class AssertCatalogTableRule implements Serializable {
4849
@OptionMark(description = "column rule")
4950
private AssertColumnRule columnRule;
5051

52+
@OptionMark(description = "tableIdentifier rule")
53+
private AssertTableIdentifierRule tableIdentifierRule;
54+
5155
public void checkRule(CatalogTable catalogTable) {
5256
TableSchema tableSchema = catalogTable.getTableSchema();
5357
if (tableSchema == null) {
@@ -62,6 +66,9 @@ public void checkRule(CatalogTable catalogTable) {
6266
if (columnRule != null) {
6367
columnRule.checkRule(tableSchema.getColumns());
6468
}
69+
if (tableIdentifierRule != null) {
70+
tableIdentifierRule.checkRule(catalogTable.getTableId());
71+
}
6572
}
6673

6774
@Data
@@ -138,4 +145,24 @@ public void checkRule(List<Column> check) {
138145
}
139146
}
140147
}
148+
149+
@Data
150+
@AllArgsConstructor
151+
public static class AssertTableIdentifierRule implements Serializable {
152+
153+
private TableIdentifier tableIdentifier;
154+
155+
public void checkRule(TableIdentifier actiualTableIdentifier) {
156+
if (actiualTableIdentifier == null) {
157+
throw new AssertConnectorException(CATALOG_TABLE_FAILED, "tableIdentifier is null");
158+
}
159+
if (!actiualTableIdentifier.equals(tableIdentifier)) {
160+
throw new AssertConnectorException(
161+
CATALOG_TABLE_FAILED,
162+
String.format(
163+
"tableIdentifier: %s is not equal to %s",
164+
actiualTableIdentifier, tableIdentifier));
165+
}
166+
}
167+
}
141168
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertCatalogTableRuleParser.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2424
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2525
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
26+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
27+
import org.apache.seatunnel.api.table.catalog.TablePath;
2628
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2729
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
2830

@@ -46,6 +48,9 @@
4648
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_COLUMNS;
4749
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_NAME;
4850
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.PRIMARY_KEY_RULE;
51+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_CATALOG_NAME;
52+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_RULE;
53+
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TableIdentifierRule.TABLE_IDENTIFIER_TABLE_NAME;
4954

5055
public class AssertCatalogTableRuleParser {
5156

@@ -55,6 +60,7 @@ public AssertCatalogTableRule parseCatalogTableRule(Config catalogTableRule) {
5560
parsePrimaryKeyRule(catalogTableRule).ifPresent(tableRule::setPrimaryKeyRule);
5661
parseConstraintKeyRule(catalogTableRule).ifPresent(tableRule::setConstraintKeyRule);
5762
parseColumnRule(catalogTableRule).ifPresent(tableRule::setColumnRule);
63+
parseTableIdentifierRule(catalogTableRule).ifPresent(tableRule::setTableIdentifierRule);
5864
return tableRule;
5965
}
6066

@@ -156,4 +162,17 @@ private Optional<AssertCatalogTableRule.AssertConstraintKeyRule> parseConstraint
156162
.collect(Collectors.toList());
157163
return Optional.of(new AssertCatalogTableRule.AssertConstraintKeyRule(constraintKeys));
158164
}
165+
166+
private Optional<AssertCatalogTableRule.AssertTableIdentifierRule> parseTableIdentifierRule(
167+
Config catalogTableRule) {
168+
if (!catalogTableRule.hasPath(TABLE_IDENTIFIER_RULE)) {
169+
return Optional.empty();
170+
}
171+
Config tableIdentifierRule = catalogTableRule.getConfig(TABLE_IDENTIFIER_RULE);
172+
TableIdentifier tableIdentifier =
173+
TableIdentifier.of(
174+
tableIdentifierRule.getString(TABLE_IDENTIFIER_CATALOG_NAME),
175+
TablePath.of(tableIdentifierRule.getString(TABLE_IDENTIFIER_TABLE_NAME)));
176+
return Optional.of(new AssertCatalogTableRule.AssertTableIdentifierRule(tableIdentifier));
177+
}
159178
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ public class AssertConfig {
6464
public static final String COLUMN_DEFAULT_VALUE = "default_value";
6565
public static final String COLUMN_COMMENT = "comment";
6666

67+
public static class TableIdentifierRule {
68+
public static final String TABLE_IDENTIFIER_RULE = "table_identifier_rule";
69+
70+
public static final String TABLE_IDENTIFIER_CATALOG_NAME = "catalog_name";
71+
public static final String TABLE_IDENTIFIER_TABLE_NAME = "table";
72+
}
73+
6774
public static final Option<String> COMMENT =
6875
Options.key("comment")
6976
.stringType()

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ public static FakeConfig buildWithConfig(Config config) {
416416
List<String> tableNames = config.getStringList(CatalogOptions.TABLE_NAMES.key());
417417
List<TableIdentifier> tableIdentifiers = new ArrayList<>(tableNames.size());
418418
for (String tableName : tableNames) {
419-
tableIdentifiers.add(TableIdentifier.of("fake", TablePath.of(tableName)));
419+
tableIdentifiers.add(TableIdentifier.of("FakeSource", TablePath.of(tableName)));
420420
}
421421
builder.tableIdentifiers(tableIdentifiers);
422422
}

0 commit comments

Comments
 (0)