Skip to content

Commit abc9a1c

Browse files
committed
[Improve][Zeta] Support close idle task for multiple sinks
1 parent 8f2049b commit abc9a1c

File tree

3 files changed

+6
-4
lines changed

3 files changed

+6
-4
lines changed

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ transform {
4343
}
4444

4545
sink {
46+
console {
47+
source_table_name = "customers_mysql_cdc"
48+
}
4649
jdbc {
4750
source_table_name = "customers_mysql_cdc"
4851
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ transform {
4343
}
4444

4545
sink {
46+
console {
47+
source_table_name = "customers_mysql_cdc"
48+
}
4649
jdbc {
4750
source_table_name = "customers_mysql_cdc"
4851
url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,10 +441,6 @@ protected void readyToCloseIdleTask(TaskLocation taskLocation) {
441441
subTask.getJobId());
442442
}
443443
}
444-
if (subTaskList.size() != 2) {
445-
throw new UnsupportedOperationException(
446-
"Unsupported close not reader/writer task group: " + subTaskList);
447-
}
448444
readyToCloseIdleTask.addAll(subTaskList);
449445
tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
450446
}

0 commit comments

Comments
 (0)