Skip to content

Commit bdf4aab

Browse files
wildpea龙妮
authored andcommitted
[improve][Connector-jdbc] add comments when schema not include all columns (apache#9559)
Co-authored-by: 龙妮 <[email protected]>
1 parent a04d452 commit bdf4aab

File tree

2 files changed

+43
-7
lines changed

2 files changed

+43
-7
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2626
import org.apache.seatunnel.api.table.catalog.Column;
2727
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
28+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
2829
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
2930
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
3031
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -319,13 +320,48 @@ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tab
319320
tableOfPath.getTableId().getDatabaseName(),
320321
tableOfPath.getTableId().getSchemaName(),
321322
tableOfPath.getTableId().getTableName());
323+
List<Column> columnsWithComment =
324+
tableSchemaOfQuery.getColumns().stream()
325+
.map(
326+
column -> {
327+
return columnsOfPath.containsKey(column.getName())
328+
&& columnsOfPath
329+
.get(column.getName())
330+
.getDataType()
331+
.getSqlType()
332+
.equals(
333+
columnsOfQuery
334+
.get(column.getName())
335+
.getDataType()
336+
.getSqlType())
337+
? PhysicalColumn.of(
338+
column.getName(),
339+
column.getDataType(),
340+
column.getColumnLength() == null
341+
? null
342+
: Math.toIntExact(
343+
column.getColumnLength()),
344+
column.isNullable(),
345+
column.getDefaultValue(),
346+
columnsOfPath
347+
.get(column.getName())
348+
.getComment(),
349+
column.getSourceType(),
350+
column.isUnsigned(),
351+
column.isZeroFill(),
352+
column.getBitLen(),
353+
column.getOptions(),
354+
column.getLongColumnLength())
355+
: column;
356+
})
357+
.collect(Collectors.toList());
322358
CatalogTable mergedCatalogTable =
323359
CatalogTable.of(
324360
tableIdentifier,
325361
TableSchema.builder()
326362
.primaryKey(primaryKeyOfMerge)
327363
.constraintKey(constraintKeysOfMerge)
328-
.columns(tableSchemaOfQuery.getColumns())
364+
.columns(columnsWithComment)
329365
.build(),
330366
tableOfPath.getOptions(),
331367
partitionKeysOfMerge,

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class JdbcCatalogUtilsTest {
6767
null,
6868
false,
6969
null,
70-
null,
70+
"f1 comment",
7171
"int unsigned",
7272
false,
7373
false,
@@ -81,7 +81,7 @@ public class JdbcCatalogUtilsTest {
8181
10,
8282
false,
8383
null,
84-
null,
84+
"f2 comment",
8585
"varchar(10)",
8686
false,
8787
false,
@@ -95,7 +95,7 @@ public class JdbcCatalogUtilsTest {
9595
20,
9696
false,
9797
null,
98-
null,
98+
"f3 comment",
9999
"varchar(20)",
100100
false,
101101
false,
@@ -261,7 +261,7 @@ public void testColumnNotIncludeMerge() {
261261
null,
262262
true,
263263
null,
264-
null,
264+
"f1 comment",
265265
null,
266266
false,
267267
false,
@@ -275,7 +275,7 @@ public void testColumnNotIncludeMerge() {
275275
10,
276276
true,
277277
null,
278-
null,
278+
"f2 comment",
279279
null,
280280
false,
281281
false,
@@ -289,7 +289,7 @@ public void testColumnNotIncludeMerge() {
289289
20,
290290
false,
291291
null,
292-
null,
292+
"f3 comment",
293293
null,
294294
false,
295295
false,

0 commit comments

Comments
 (0)