Skip to content

Commit 9fb4fcb

Browse files
Author: chenhongyu05 (committed)
[Feature][Jdbc] Add string-type column split support using a charset-based splitting algorithm
1 parent 6da5d33 commit 9fb4fcb

File tree

6 files changed: 20 additions (+20) and 9 deletions (-9)

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,8 @@ public interface JdbcOptions {
233233
.withDescription("string_split_mode");
234234

235235
Option<String> STRING_SPLIT_MODE_COLLATE =
236-
Options.key("string_split_mode_collate").stringType().noDefaultValue().withDescription("string_split_mode_collate");
236+
Options.key("string_split_mode_collate")
237+
.stringType()
238+
.noDefaultValue()
239+
.withDescription("string_split_mode_collate");
237240
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
2122

2223
import lombok.Builder;
2324
import lombok.Data;
24-
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
2525

2626
import java.io.Serializable;
2727
import java.util.List;

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ private List<ChunkRange> splitTableIntoChunks(
131131
return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
132132
case STRING:
133133
if (useCharsetBasedStringSplitter) {
134-
return charsetBasedColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
134+
return charsetBasedColumnSplitChunks(
135+
table, splitColumnName, min, max, chunkSize);
135136
} else {
136137
return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
137138
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ private Collection<JdbcSourceSplit> getJdbcSourceSplits(
192192
@Override
193193
protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema)
194194
throws SQLException {
195-
if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType()) && !useCharsetBasedStringSplitter) {
195+
if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())
196+
&& !useCharsetBasedStringSplitter) {
196197
return createStringColumnSplitStatement(split);
197198
}
198199
if (split.getSplitStart() == null && split.getSplitEnd() == null) {

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
1919

20-
import lombok.extern.slf4j.Slf4j;
2120
import org.junit.jupiter.api.DisplayName;
2221
import org.junit.jupiter.api.Test;
2322

23+
import lombok.extern.slf4j.Slf4j;
24+
2425
import java.math.BigInteger;
2526

2627
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -146,7 +147,12 @@ public void testPerformance() {
146147
long endTime = System.currentTimeMillis();
147148
long duration = endTime - startTime;
148149

149-
log.info("Executing " + iterations + " encoding/decoding operations took: " + duration + " milliseconds");
150+
log.info(
151+
"Executing "
152+
+ iterations
153+
+ " encoding/decoding operations took: "
154+
+ duration
155+
+ " milliseconds");
150156
log.info("Average time per operation: " + (double) duration / iterations + " milliseconds");
151157
}
152158

@@ -180,4 +186,4 @@ public void testRandomStrings() {
180186
assertEquals(randomString, decoded.trim());
181187
}
182188
}
183-
}
189+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ public void testDynamicCharSplit() {
563563
configMap.put("password", MYSQL_PASSWORD);
564564
configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
565565
configMap.put("split.size", "10");
566-
configMap.put("string_split_mode", "charsetBased");
566+
configMap.put("string_split_mode", "charset_based");
567567

568568
TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
569569
MySqlCatalog mySqlCatalog =
@@ -627,7 +627,7 @@ public void testFixedCharSplit() {
627627
configMap.put("user", MYSQL_USERNAME);
628628
configMap.put("password", MYSQL_PASSWORD);
629629
configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
630-
configMap.put("string_split_mode", "charsetBased");
630+
configMap.put("string_split_mode", "charset_based");
631631

632632
TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
633633
MySqlCatalog mySqlCatalog =

0 commit comments

Comments
 (0)