diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index e81166ffd9e..8fac3a6e5a3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -319,13 +320,48 @@ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tab tableOfPath.getTableId().getDatabaseName(), tableOfPath.getTableId().getSchemaName(), tableOfPath.getTableId().getTableName()); + List columnsWithComment = + tableSchemaOfQuery.getColumns().stream() + .map( + column -> { + return columnsOfPath.containsKey(column.getName()) + && columnsOfPath + .get(column.getName()) + .getDataType() + .getSqlType() + .equals( + columnsOfQuery + .get(column.getName()) + .getDataType() + .getSqlType()) + ? PhysicalColumn.of( + column.getName(), + column.getDataType(), + column.getColumnLength() == null + ? null + : Math.toIntExact( + column.getColumnLength()), + column.isNullable(), + column.getDefaultValue(), + columnsOfPath + .get(column.getName()) + .getComment(), + column.getSourceType(), + column.isUnsigned(), + column.isZeroFill(), + column.getBitLen(), + column.getOptions(), + column.getLongColumnLength()) + : column; + }) + .collect(Collectors.toList()); CatalogTable mergedCatalogTable = CatalogTable.of( tableIdentifier, TableSchema.builder() .primaryKey(primaryKeyOfMerge) .constraintKey(constraintKeysOfMerge) - .columns(tableSchemaOfQuery.getColumns()) + .columns(columnsWithComment) .build(), tableOfPath.getOptions(), partitionKeysOfMerge, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java index 223a5b345d1..3067e3c2ce5 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java @@ -67,7 +67,7 @@ public class JdbcCatalogUtilsTest { null, false, null, - null, + "f1 comment", "int unsigned", false, false, @@ -81,7 +81,7 @@ public class JdbcCatalogUtilsTest { 10, false, null, - null, + "f2 comment", "varchar(10)", false, false, @@ -95,7 +95,7 @@ public class JdbcCatalogUtilsTest { 20, false, null, - null, + "f3 comment", "varchar(20)", false, false, @@ -261,7 +261,7 @@ public void testColumnNotIncludeMerge() { null, true, null, - null, + "f1 comment", null, false, false, @@ -275,7 +275,7 @@ public void testColumnNotIncludeMerge() { 10, true, null, - null, + "f2 comment", null, false, false, @@ -289,7 +289,7 @@ public void testColumnNotIncludeMerge() { 20, false, null, - null, + "f3 comment", null, false, false,