Skip to content

Commit fa34ac9

Browse files
authored
[Improve][API] Check catalog table fields name legal before send to downstream (apache#7358)
* [Improve][API] Check catalog table fields name legal before send to downstream * update
1 parent 068c5e3 commit fa34ac9

File tree

5 files changed

+136
-1
lines changed

5 files changed

+136
-1
lines changed

.github/workflows/backend.yml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,8 @@ jobs:
553553
java-version: ${{ matrix.java }}
554554
distribution: 'temurin'
555555
cache: 'maven'
556+
- name: free disk space
557+
run: tools/github/free_disk_space.sh
556558
- name: run seatunnel zeta integration test
557559
if: needs.changes.outputs.api == 'true'
558560
run: |
@@ -609,6 +611,8 @@ jobs:
609611
java-version: ${{ matrix.java }}
610612
distribution: 'temurin'
611613
cache: 'maven'
614+
- name: free disk space
615+
run: tools/github/free_disk_space.sh
612616
- name: run transform-v2 integration test (part-1)
613617
if: needs.changes.outputs.api == 'true'
614618
run: |
@@ -633,6 +637,8 @@ jobs:
633637
java-version: ${{ matrix.java }}
634638
distribution: 'temurin'
635639
cache: 'maven'
640+
- name: free disk space
641+
run: tools/github/free_disk_space.sh
636642
- name: run transform-v2 integration test (part-2)
637643
if: needs.changes.outputs.api == 'true'
638644
run: |
@@ -657,6 +663,8 @@ jobs:
657663
java-version: ${{ matrix.java }}
658664
distribution: 'temurin'
659665
cache: 'maven'
666+
- name: free disk space
667+
run: tools/github/free_disk_space.sh
660668
- name: run connector-v2 integration test (part-1)
661669
if: needs.changes.outputs.api == 'true'
662670
run: |
@@ -684,6 +692,8 @@ jobs:
684692
java-version: ${{ matrix.java }}
685693
distribution: 'temurin'
686694
cache: 'maven'
695+
- name: free disk space
696+
run: tools/github/free_disk_space.sh
687697
- name: run connector-v2 integration test (part-2)
688698
if: needs.changes.outputs.api == 'true'
689699
run: |
@@ -711,6 +721,8 @@ jobs:
711721
java-version: ${{ matrix.java }}
712722
distribution: 'temurin'
713723
cache: 'maven'
724+
- name: free disk space
725+
run: tools/github/free_disk_space.sh
714726
- name: run connector-v2 integration test (part-3)
715727
if: needs.changes.outputs.api == 'true'
716728
run: |
@@ -738,6 +750,8 @@ jobs:
738750
java-version: ${{ matrix.java }}
739751
distribution: 'temurin'
740752
cache: 'maven'
753+
- name: free disk space
754+
run: tools/github/free_disk_space.sh
741755
- name: run connector-v2 integration test (part-4)
742756
if: needs.changes.outputs.api == 'true'
743757
run: |
@@ -765,6 +779,8 @@ jobs:
765779
java-version: ${{ matrix.java }}
766780
distribution: 'temurin'
767781
cache: 'maven'
782+
- name: free disk space
783+
run: tools/github/free_disk_space.sh
768784
- name: run connector-v2 integration test (part-5)
769785
if: needs.changes.outputs.api == 'true'
770786
run: |
@@ -792,6 +808,8 @@ jobs:
792808
java-version: ${{ matrix.java }}
793809
distribution: 'temurin'
794810
cache: 'maven'
811+
- name: free disk space
812+
run: tools/github/free_disk_space.sh
795813
- name: run connector-v2 integration test (part-6)
796814
if: needs.changes.outputs.api == 'true'
797815
run: |
@@ -819,6 +837,8 @@ jobs:
819837
java-version: ${{ matrix.java }}
820838
distribution: 'temurin'
821839
cache: 'maven'
840+
- name: free disk space
841+
run: tools/github/free_disk_space.sh
822842
- name: run connector-v2 integration test (part-7)
823843
if: needs.changes.outputs.api == 'true'
824844
run: |
@@ -898,6 +918,8 @@ jobs:
898918
java-version: ${{ matrix.java }}
899919
distribution: 'temurin'
900920
cache: 'maven'
921+
- name: free disk space
922+
run: tools/github/free_disk_space.sh
901923
- name: run jdbc connectors integration test (part-3)
902924
if: needs.changes.outputs.api == 'true'
903925
run: |
@@ -922,6 +944,8 @@ jobs:
922944
java-version: ${{ matrix.java }}
923945
distribution: 'temurin'
924946
cache: 'maven'
947+
- name: free disk space
948+
run: tools/github/free_disk_space.sh
925949
- name: run jdbc connectors integration test (part-4)
926950
if: needs.changes.outputs.api == 'true'
927951
run: |
@@ -946,6 +970,8 @@ jobs:
946970
java-version: ${{ matrix.java }}
947971
distribution: 'temurin'
948972
cache: 'maven'
973+
- name: free disk space
974+
run: tools/github/free_disk_space.sh
949975
- name: run jdbc connectors integration test (part-5)
950976
if: needs.changes.outputs.api == 'true'
951977
run: |
@@ -996,6 +1022,8 @@ jobs:
9961022
java-version: ${{ matrix.java }}
9971023
distribution: 'temurin'
9981024
cache: 'maven'
1025+
- name: free disk space
1026+
run: tools/github/free_disk_space.sh
9991027
- name: run jdbc connectors integration test (part-7)
10001028
if: needs.changes.outputs.api == 'true'
10011029
run: |
@@ -1020,6 +1048,8 @@ jobs:
10201048
java-version: ${{ matrix.java }}
10211049
distribution: 'temurin'
10221050
cache: 'maven'
1051+
- name: free disk space
1052+
run: tools/github/free_disk_space.sh
10231053
- name: run kudu connector integration test
10241054
run: |
10251055
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kudu-e2e -am -Pci
@@ -1043,6 +1073,8 @@ jobs:
10431073
java-version: ${{ matrix.java }}
10441074
distribution: 'temurin'
10451075
cache: 'maven'
1076+
- name: free disk space
1077+
run: tools/github/free_disk_space.sh
10461078
- name: run amazonsqs connector integration test
10471079
run: |
10481080
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-amazonsqs-e2e -am -Pci
@@ -1066,6 +1098,8 @@ jobs:
10661098
java-version: ${{ matrix.java }}
10671099
distribution: 'temurin'
10681100
cache: 'maven'
1101+
- name: free disk space
1102+
run: tools/github/free_disk_space.sh
10691103
- name: run kafka connector integration test
10701104
run: |
10711105
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
@@ -1089,6 +1123,8 @@ jobs:
10891123
java-version: ${{ matrix.java }}
10901124
distribution: 'temurin'
10911125
cache: 'maven'
1126+
- name: free disk space
1127+
run: tools/github/free_disk_space.sh
10921128
- name: run rocket connector integration test
10931129
run: |
10941130
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
@@ -1139,6 +1175,8 @@ jobs:
11391175
java-version: ${{ matrix.java }}
11401176
distribution: 'temurin'
11411177
cache: 'maven'
1178+
- name: free disk space
1179+
run: tools/github/free_disk_space.sh
11421180
- name: run oracle cdc connector integration test
11431181
run: |
11441182
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-cdc-oracle-e2e -am -Pci

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818
package org.apache.seatunnel.api.table.factory;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22+
import org.apache.seatunnel.common.utils.SeaTunnelException;
23+
24+
import org.apache.commons.lang3.StringUtils;
2125

2226
import lombok.Getter;
2327

28+
import java.util.ArrayList;
29+
import java.util.List;
30+
2431
@Getter
2532
public abstract class TableFactoryContext {
2633

@@ -31,4 +38,25 @@ public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) {
3138
this.options = options;
3239
this.classLoader = classLoader;
3340
}
41+
42+
protected static void checkCatalogTableIllegal(List<CatalogTable> catalogTables) {
43+
for (CatalogTable catalogTable : catalogTables) {
44+
List<String> alreadyChecked = new ArrayList<>();
45+
for (String fieldName : catalogTable.getTableSchema().getFieldNames()) {
46+
if (StringUtils.isBlank(fieldName)) {
47+
throw new SeaTunnelException(
48+
String.format(
49+
"Table %s field name cannot be empty",
50+
catalogTable.getTablePath().getFullName()));
51+
}
52+
if (alreadyChecked.contains(fieldName)) {
53+
throw new SeaTunnelException(
54+
String.format(
55+
"Table %s field %s duplicate",
56+
catalogTable.getTablePath().getFullName(), fieldName));
57+
}
58+
alreadyChecked.add(fieldName);
59+
}
60+
}
61+
}
3462
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,24 @@
2121
import org.apache.seatunnel.api.sink.TablePlaceholder;
2222
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2323

24+
import com.google.common.annotations.VisibleForTesting;
2425
import lombok.Getter;
2526

2627
import java.util.Collection;
28+
import java.util.Collections;
2729

2830
@Getter
2931
public class TableSinkFactoryContext extends TableFactoryContext {
3032

3133
private final CatalogTable catalogTable;
3234

33-
protected TableSinkFactoryContext(
35+
@VisibleForTesting
36+
public TableSinkFactoryContext(
3437
CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) {
3538
super(options, classLoader);
39+
if (catalogTable != null) {
40+
checkCatalogTableIllegal(Collections.singletonList(catalogTable));
41+
}
3642
this.catalogTable = catalogTable;
3743
}
3844

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class TableTransformFactoryContext extends TableFactoryContext {
3232
public TableTransformFactoryContext(
3333
List<CatalogTable> catalogTables, ReadonlyConfig options, ClassLoader classLoader) {
3434
super(options, classLoader);
35+
checkCatalogTableIllegal(catalogTables);
3536
this.catalogTables = catalogTables;
3637
}
3738
}

seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
package org.apache.seatunnel.api.table.catalog;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
22+
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
23+
import org.apache.seatunnel.api.table.type.BasicType;
2124
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
25+
import org.apache.seatunnel.common.utils.SeaTunnelException;
2226

2327
import org.junit.jupiter.api.Assertions;
2428
import org.junit.jupiter.api.Test;
@@ -89,4 +93,62 @@ public void testReadCatalogTableWithUnsupportedType() {
8993
});
9094
Assertions.assertEquals(result, exception.getParamsValueAs("tableUnsupportedTypes"));
9195
}
96+
97+
@Test
98+
public void testCatalogTableWithIllegalFieldNames() {
99+
CatalogTable catalogTable =
100+
CatalogTable.of(
101+
TableIdentifier.of("catalog", "database", "table"),
102+
TableSchema.builder()
103+
.column(
104+
PhysicalColumn.of(
105+
" ", BasicType.STRING_TYPE, 1L, true, null, ""))
106+
.build(),
107+
Collections.emptyMap(),
108+
Collections.emptyList(),
109+
"comment");
110+
SeaTunnelException exception =
111+
Assertions.assertThrows(
112+
SeaTunnelException.class,
113+
() ->
114+
new TableTransformFactoryContext(
115+
Collections.singletonList(catalogTable), null, null));
116+
SeaTunnelException exception2 =
117+
Assertions.assertThrows(
118+
SeaTunnelException.class,
119+
() -> new TableSinkFactoryContext(catalogTable, null, null));
120+
Assertions.assertEquals(
121+
"Table database.table field name cannot be empty", exception.getMessage());
122+
Assertions.assertEquals(
123+
"Table database.table field name cannot be empty", exception2.getMessage());
124+
125+
CatalogTable catalogTable2 =
126+
CatalogTable.of(
127+
TableIdentifier.of("catalog", "database", "table"),
128+
TableSchema.builder()
129+
.column(
130+
PhysicalColumn.of(
131+
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
132+
.column(
133+
PhysicalColumn.of(
134+
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
135+
.build(),
136+
Collections.emptyMap(),
137+
Collections.emptyList(),
138+
"comment");
139+
SeaTunnelException exception3 =
140+
Assertions.assertThrows(
141+
SeaTunnelException.class,
142+
() ->
143+
new TableTransformFactoryContext(
144+
Collections.singletonList(catalogTable2), null, null));
145+
SeaTunnelException exception4 =
146+
Assertions.assertThrows(
147+
SeaTunnelException.class,
148+
() -> new TableSinkFactoryContext(catalogTable2, null, null));
149+
Assertions.assertEquals(
150+
"Table database.table field name1 duplicate", exception3.getMessage());
151+
Assertions.assertEquals(
152+
"Table database.table field name1 duplicate", exception4.getMessage());
153+
}
92154
}

0 commit comments

Comments
 (0)