Skip to content

Commit 49165a9

Browse files
committed
[Improve][Paimon] Add check for the base type between source and sink
1 parent 23a744b commit 49165a9

File tree

5 files changed

+148
-4
lines changed

5 files changed

+148
-4
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
4646
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
4747
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
48+
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH;
49+
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA;
4850

4951
/**
5052
* The common error of SeaTunnel. This is an alternative to {@link CommonErrorCodeDeprecated} and is
@@ -245,4 +247,32 @@ public static SeaTunnelRuntimeException unsupportedRowKind(
245247
params.put("rowKind", rowKind);
246248
return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
247249
}
250+
251+
public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema(
252+
String connector,
253+
String sourceFiledName,
254+
String sourceFiledType,
255+
String exceptType,
256+
String sinkFiledName,
257+
String sinkFiledType) {
258+
Map<String, String> params = new HashMap<>();
259+
params.put("connector", connector);
260+
params.put("sourceFiledName", sourceFiledName);
261+
params.put("sourceFiledType", sourceFiledType);
262+
params.put("exceptType", exceptType);
263+
params.put("sinkFiledName", sinkFiledName);
264+
params.put("sinkFiledType", sinkFiledType);
265+
return new SeaTunnelRuntimeException(
266+
WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, params);
267+
}
268+
269+
public static SeaTunnelRuntimeException writeRowErrorWithFiledsNotMatch(
270+
String connector, int sourceFieldsNum, int sinkFieldsNum) {
271+
Map<String, String> params = new HashMap<>();
272+
params.put("connector", connector);
273+
params.put("sourceFiledName", String.valueOf(sourceFieldsNum));
274+
params.put("sourceFiledType", String.valueOf(sinkFieldsNum));
275+
return new SeaTunnelRuntimeException(
276+
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
277+
}
248278
}

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
6262
"COMMON-28",
6363
"'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"),
6464
UNSUPPORTED_ROW_KIND(
65-
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'");
65+
"COMMON-29", "'<identifier>' table '<tableId>' not support rowKind '<rowKind>'"),
66+
67+
WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA(
68+
"COMMON-30",
69+
"<connector> write SeaTunnelRow failed, The source filed with name '<sourceFiledName>' has the sql type '<sourceFiledType>', except datatype of sink is '<exceptType>'; but the filed with name '<sinkFiledName>' in sink table which actual type is '<sinkFiledType>'.Please check schema of sink table."),
70+
71+
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
72+
"COMMON-31",
73+
"<connector> write SeaTunnelRow failed. The source has '<sourceFieldsNum>' fields, but paimon table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table.");
6674

6775
private final String code;
6876
private final String description;

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,14 @@ public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType seaTunn
342342
*/
343343
public static InternalRow reconvert(
344344
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
345-
List<DataField> fields = tableSchema.fields();
346-
BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
345+
List<DataField> sinkTotalFields = tableSchema.fields();
346+
int sourceTotalFields = seaTunnelRowType.getTotalFields();
347+
if (sourceTotalFields != sinkTotalFields.size()) {
348+
throw new CommonError()
349+
.writeRowErrorWithFiledsNotMatch(
350+
"Paimon", sourceTotalFields, sinkTotalFields.size());
351+
}
352+
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
347353
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
348354
// Convert SeaTunnel RowKind to Paimon RowKind
349355
org.apache.paimon.types.RowKind rowKind =
@@ -362,6 +368,7 @@ public static InternalRow reconvert(
362368
binaryWriter.setNullAt(i);
363369
continue;
364370
}
371+
checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
365372
String fieldName = seaTunnelRowType.getFieldName(i);
366373
switch (fieldTypes[i].getSqlType()) {
367374
case TINYINT:
@@ -408,7 +415,7 @@ public static InternalRow reconvert(
408415
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
409416
break;
410417
case TIMESTAMP:
411-
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
418+
DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName);
412419
int precision = ((TimestampType) dataField.type()).getPrecision();
413420
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
414421
binaryWriter.writeTimestamp(
@@ -462,4 +469,24 @@ public static InternalRow reconvert(
462469
}
463470
return binaryRow;
464471
}
472+
473+
private static void checkCanWriteWithType(
474+
int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
475+
String sourceFieldName = seaTunnelRowType.getFieldName(i);
476+
SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
477+
DataField sinkDataField = fields.get(i);
478+
DataType exceptDataType =
479+
RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i));
480+
DataType sinkDataType = sinkDataField.type();
481+
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
482+
throw new CommonError()
483+
.writeRowErrorWithSchemaIncompatibleSchema(
484+
"Paimon",
485+
sourceFieldName,
486+
sourceFieldType.getSqlType().toString(),
487+
exceptDataType.getTypeRoot().toString(),
488+
sinkDataField.name(),
489+
sinkDataField.type().getTypeRoot().toString());
490+
}
491+
}
465492
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.junit.jupiter.api.AfterAll;
4848
import org.junit.jupiter.api.Assertions;
4949
import org.junit.jupiter.api.BeforeAll;
50+
import org.junit.jupiter.api.Order;
5051
import org.junit.jupiter.api.TestTemplate;
5152
import org.testcontainers.containers.Container;
5253

@@ -120,6 +121,7 @@ public void testSinkWithMultipleInBatchMode(TestContainer container) throws Exce
120121
});
121122
}
122123

124+
@Order(1)
123125
@TestTemplate
124126
public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
125127
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
@@ -148,6 +150,21 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
148150
});
149151
}
150152

153+
@Order(2)
154+
@TestTemplate
155+
public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception {
156+
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
157+
Assertions.assertEquals(0, execResult.getExitCode());
158+
Container.ExecResult errResult =
159+
container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf");
160+
Assertions.assertEquals(1, errResult.getExitCode());
161+
Assertions.assertTrue(
162+
errResult
163+
.getStderr()
164+
.contains(
165+
"ErrorCode:[COMMON-30], ErrorDescription:[Paimon write SeaTunnelRow failed, The source filed with name 'name' has the sql type 'INT', except datatype of sink is 'INT'; but the filed with name 'name' in sink table which actual type is 'STRING'.Please check schema of sink table."));
166+
}
167+
151168
@TestTemplate
152169
public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception {
153170
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of batch processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 1
23+
job.mode = "BATCH"
24+
}
25+
26+
source {
27+
FakeSource {
28+
schema = {
29+
fields {
30+
pk_id = bigint
31+
name = int
32+
score = string
33+
}
34+
primaryKey {
35+
name = "pk_id"
36+
columnNames = [pk_id]
37+
}
38+
}
39+
rows = [
40+
{
41+
kind = INSERT
42+
fields = [1, 100, "A"]
43+
},
44+
{
45+
kind = INSERT
46+
fields = [2, 100, "B"]
47+
},
48+
{
49+
kind = INSERT
50+
fields = [3, 100, "C"]
51+
}
52+
]
53+
}
54+
}
55+
56+
sink {
57+
Paimon {
58+
warehouse = "file:///tmp/paimon"
59+
database = "seatunnel_namespace1"
60+
table = "st_test"
61+
}
62+
}

0 commit comments

Comments
 (0)