Skip to content

Commit dc3c239

Browse files
authored
[Improve][Jdbc] Skip all index when auto create table to improve performance of write (apache#7288)
1 parent 16950a6 commit dc3c239

File tree

46 files changed

+1089
-84
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1089
-84
lines changed

docs/en/connector-v2/sink/Jdbc.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
5858
| custom_sql | String | No | - |
5959
| enable_upsert | Boolean | No | true |
6060
| use_copy_statement | Boolean | No | false |
61+
| create_index | Boolean | No | true |
6162

6263
### driver [string]
6364

@@ -205,6 +206,12 @@ Use `COPY ${table} FROM STDIN` statement to import data. Only drivers with `getC
205206

206207
NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
207208

209+
### create_index [boolean]
210+
211+
Create the index(contains primary key and any other indexes) or not when auto-create table. You can use this option to improve the performance of jdbc writes when migrating large tables.
212+
213+
Notice: Note that this will sacrifice read performance, so you'll need to manually create indexes after the table migration to improve read performance
214+
208215
## tips
209216

210217
In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup :

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ protected void dropTable() {
151151
catalog.dropTable(tablePath, true);
152152
}
153153

154-
protected void createTable() {
154+
protected void createTablePreCheck() {
155155
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
156156
try {
157157
log.info(
@@ -175,6 +175,10 @@ protected void createTable() {
175175
} catch (UnsupportedOperationException ignore) {
176176
log.info("Creating table {}", tablePath);
177177
}
178+
}
179+
180+
protected void createTable() {
181+
createTablePreCheck();
178182
catalog.createTable(tablePath, catalogTable, true);
179183
}
180184

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,25 @@ default <T> void buildColumnsWithErrorCheck(
239239
void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
240240
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
241241

242+
/**
243+
* Create a new table in this catalog.
244+
*
245+
* @param tablePath Path of the table
246+
* @param table The table definition
247+
* @param ignoreIfExists Flag to specify behavior when a table with the given name already exist
248+
* @param createIndex If you want to create index or not
249+
* @throws TableAlreadyExistException thrown if the table already exists in the catalog and
250+
* ignoreIfExists is false
251+
* @throws DatabaseNotExistException thrown if the database in tablePath doesn't exist in the
252+
* catalog
253+
* @throws CatalogException in case of any runtime exception
254+
*/
255+
default void createTable(
256+
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
257+
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
258+
createTable(tablePath, table, ignoreIfExists);
259+
}
260+
242261
/**
243262
* Drop an existing table in this catalog.
244263
*

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,13 @@ && listTables(tablePath.getDatabaseName())
373373
@Override
374374
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
375375
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
376+
createTable(tablePath, table, ignoreIfExists, true);
377+
}
378+
379+
@Override
380+
public void createTable(
381+
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
382+
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
376383
checkNotNull(tablePath, "Table path cannot be null");
377384

378385
if (!databaseExists(tablePath.getDatabaseName())) {
@@ -393,22 +400,25 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
393400
throw new TableAlreadyExistException(catalogName, tablePath);
394401
}
395402

396-
createTableInternal(tablePath, table);
403+
createTableInternal(tablePath, table, createIndex);
397404
}
398405

399-
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
406+
protected String getCreateTableSql(
407+
TablePath tablePath, CatalogTable table, boolean createIndex) {
400408
throw new UnsupportedOperationException();
401409
}
402410

403-
protected List<String> getCreateTableSqls(TablePath tablePath, CatalogTable table) {
404-
return Collections.singletonList(getCreateTableSql(tablePath, table));
411+
protected List<String> getCreateTableSqls(
412+
TablePath tablePath, CatalogTable table, boolean createIndex) {
413+
return Collections.singletonList(getCreateTableSql(tablePath, table, createIndex));
405414
}
406415

407-
protected void createTableInternal(TablePath tablePath, CatalogTable table)
416+
protected void createTableInternal(TablePath tablePath, CatalogTable table, boolean createIndex)
408417
throws CatalogException {
409418
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
410419
try {
411-
final List<String> createTableSqlList = getCreateTableSqls(tablePath, table);
420+
final List<String> createTableSqlList =
421+
getCreateTableSqls(tablePath, table, createIndex);
412422
for (String sql : createTableSqlList) {
413423
executeInternal(dbUrl, sql);
414424
}
@@ -646,7 +656,7 @@ public PreviewResult previewAction(
646656
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
647657
if (actionType == ActionType.CREATE_TABLE) {
648658
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
649-
return new SQLPreviewResult(getCreateTableSql(tablePath, catalogTable.get()));
659+
return new SQLPreviewResult(getCreateTableSql(tablePath, catalogTable.get(), true));
650660
} else if (actionType == ActionType.DROP_TABLE) {
651661
return new SQLPreviewResult(getDropTableSql(tablePath));
652662
} else if (actionType == ActionType.TRUNCATE_TABLE) {

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,10 @@ public interface JdbcCatalogOptions {
7474
.noDefaultValue()
7575
.withDescription(
7676
"The table suffix name added when the table is automatically created");
77+
78+
Option<Boolean> CREATE_INDEX =
79+
Options.key("create_index")
80+
.booleanType()
81+
.defaultValue(true)
82+
.withDescription("Create index or not when auto create table");
7783
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ protected String getListDatabaseSql() {
8888
}
8989

9090
@Override
91-
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
91+
protected String getCreateTableSql(
92+
TablePath tablePath, CatalogTable table, boolean createIndex) {
9293
throw new UnsupportedOperationException();
9394
}
9495

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ public IrisCatalog(
6666
}
6767

6868
@Override
69-
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
70-
return new IrisCreateTableSqlBuilder(table).build(tablePath);
69+
protected String getCreateTableSql(
70+
TablePath tablePath, CatalogTable table, boolean createIndex) {
71+
return new IrisCreateTableSqlBuilder(table, createIndex).build(tablePath);
7172
}
7273

7374
@Override
@@ -224,7 +225,8 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
224225
}
225226

226227
@Override
227-
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
228+
public void createTable(
229+
TablePath tablePath, CatalogTable table, boolean ignoreIfExists, boolean createIndex)
228230
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
229231
checkNotNull(tablePath, "Table path cannot be null");
230232
if (defaultSchema.isPresent()) {
@@ -242,7 +244,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
242244
throw new TableAlreadyExistException(catalogName, tablePath);
243245
}
244246

245-
createTableInternal(tablePath, table);
247+
createTableInternal(tablePath, table, createIndex);
246248
}
247249

248250
@Override

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCreateTableSqlBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ public class IrisCreateTableSqlBuilder {
4040
private String fieldIde;
4141

4242
private String comment;
43+
private boolean createIndex;
4344

44-
public IrisCreateTableSqlBuilder(CatalogTable catalogTable) {
45+
public IrisCreateTableSqlBuilder(CatalogTable catalogTable, boolean createIndex) {
4546
this.columns = catalogTable.getTableSchema().getColumns();
4647
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
4748
this.constraintKeys = catalogTable.getTableSchema().getConstraintKeys();
4849
this.sourceCatalogName = catalogTable.getCatalogName();
4950
this.fieldIde = catalogTable.getOptions().get("fieldIde");
5051
this.comment = catalogTable.getComment();
52+
this.createIndex = createIndex;
5153
}
5254

5355
public String build(TablePath tablePath) {
@@ -64,12 +66,13 @@ public String build(TablePath tablePath) {
6466
.collect(Collectors.toList());
6567

6668
// Add primary key directly in the create table statement
67-
if (primaryKey != null
69+
if (createIndex
70+
&& primaryKey != null
6871
&& primaryKey.getColumnNames() != null
6972
&& primaryKey.getColumnNames().size() > 0) {
7073
columnSqls.add(buildPrimaryKeySql(primaryKey));
7174
}
72-
if (CollectionUtils.isNotEmpty(constraintKeys)) {
75+
if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
7376
for (ConstraintKey constraintKey : constraintKeys) {
7477
if (StringUtils.isBlank(constraintKey.getConstraintName())
7578
|| (primaryKey != null

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/savemode/IrisSaveModeHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,18 @@
3333

3434
@Slf4j
3535
public class IrisSaveModeHandler extends DefaultSaveModeHandler {
36+
public boolean createIndex;
37+
3638
public IrisSaveModeHandler(
3739
@Nonnull SchemaSaveMode schemaSaveMode,
3840
@Nonnull DataSaveMode dataSaveMode,
3941
@Nonnull Catalog catalog,
4042
@Nonnull TablePath tablePath,
4143
@Nullable CatalogTable catalogTable,
42-
@Nullable String customSql) {
44+
@Nullable String customSql,
45+
boolean createIndex) {
4346
super(schemaSaveMode, dataSaveMode, catalog, tablePath, catalogTable, customSql);
47+
this.createIndex = createIndex;
4448
}
4549

4650
@Override
@@ -53,7 +57,7 @@ protected void createTable() {
5357
Catalog.ActionType.CREATE_TABLE,
5458
tablePath,
5559
Optional.ofNullable(catalogTable)));
56-
catalog.createTable(tablePath, catalogTable, true);
60+
catalog.createTable(tablePath, catalogTable, true, createIndex);
5761
} catch (UnsupportedOperationException ignore) {
5862
log.info("Creating table {}", tablePath);
5963
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,9 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
181181
}
182182

183183
@Override
184-
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
185-
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
184+
protected String getCreateTableSql(
185+
TablePath tablePath, CatalogTable table, boolean createIndex) {
186+
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter, createIndex)
186187
.build(table.getCatalogName());
187188
}
188189

0 commit comments

Comments
 (0)