diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java index 897d18ac5f0..d1488d00db1 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java @@ -38,7 +38,20 @@ public interface SourceSplitEnumerator void open(); - /** The method is executed by the engine only once. */ + /** + * Runs the enumerator once the engine has finished its fixed, non-concurrent setup sequence. + * + *

<p>Before the first {@link #run()} invocation, methods are called in this order: + * + *

<ol> + *
  <li>{@link #open()} + *
  <li>{@link #addSplitsBack(List, int)} + *
  <li>{@link #registerReader(int)} + *
</ol> + * + *

@implNote The engine guarantees this invocation order and ensures there are no + * concurrency issues between these calls. + */ void run() throws Exception; /** @@ -63,7 +76,13 @@ public interface SourceSplitEnumerator void registerReader(int subtaskId); - /** If the source is bounded, checkpoint is not triggered. */ + /** + * Used to snapshot the state of the enumerator. + * + *

<p>Concurrency Consideration:<br>
+ * This method and {@link #run()} can be invoked concurrently by different threads. + * Systematically manage shared state access to prevent race conditions. + */ StateT snapshotState(long checkpointId) throws Exception; /** diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java index 466d056bc1a..fc4b7dd332f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java @@ -211,7 +211,9 @@ public void registerReader(int subtaskId) { */ @Override public TiDBSourceCheckpointState snapshotState(long checkpointId) throws Exception { - return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit); + synchronized (stateLock) { + return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit); + } } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java index 451d814e021..be90e842435 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java +++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java @@ -121,16 +121,14 @@ public void close() throws IOException { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplit(splits, subtaskId); - if (context.registeredReaders().contains(subtaskId)) { - assignSplit(Collections.singletonList(subtaskId)); - } else { - LOG.warn( - "Reader {} is not registered. Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplit(splits, subtaskId); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + LOG.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java index af18ac56295..c2f7cdf007b 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java @@ -107,16 +107,14 @@ public void run() { public void addSplitsBack(List splits, int subtaskId) { log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits); if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplit(splits); - if (context.registeredReaders().contains(subtaskId)) { - assignSplit(Collections.singletonList(subtaskId)); - } else { - log.warn( - "Reader {} is not registered. 
Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplit(splits); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } } @@ -137,9 +135,7 @@ public void handleSplitRequest(int subtaskId) { public void registerReader(int subtaskId) { log.debug("Register reader {} to DorisSourceSplitEnumerator.", subtaskId); if (!pendingSplit.isEmpty()) { - synchronized (stateLock) { - assignSplit(Collections.singletonList(subtaskId)); - } + assignSplit(Collections.singletonList(subtaskId)); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java index 544c759e212..dadb7066cf2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java @@ -43,6 +43,7 @@ public class FileSourceSplitEnumerator new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId)); private Set assignedSplit; private final List filePaths; + private final Object lock = new Object(); private final AtomicInteger assignCount = new AtomicInteger(0); public FileSourceSplitEnumerator( @@ -69,7 +70,9 @@ public void open() { public void run() { for (int i = 0; i < context.currentParallelism(); i++) { LOGGER.info("Assigned splits to reader [{}]", i); - assignSplit(i); + synchronized (lock) { + assignSplit(i); + } } } @@ -139,7 +142,9 @@ public void 
registerReader(int subtaskId) { @Override public FileSourceState snapshotState(long checkpointId) { - return new FileSourceState(assignedSplit); + synchronized (lock) { + return new FileSourceState(assignedSplit); + } } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java index 4f5938b85ae..bfa60244793 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java @@ -46,6 +46,7 @@ public class MultipleTableFileSourceSplitEnumerator private final Set assignedSplit; private final Map> filePathMap; private final AtomicInteger assignCount = new AtomicInteger(0); + private final Object lock = new Object(); public MultipleTableFileSourceSplitEnumerator( Context context, @@ -107,7 +108,9 @@ public void registerReader(int subtaskId) {} @Override public FileSourceState snapshotState(long checkpointId) { - return new FileSourceState(assignedSplit); + synchronized (lock) { + return new FileSourceState(assignedSplit); + } } @Override @@ -155,7 +158,9 @@ private static int getSplitOwner(int assignCount, int numReaders) { public void run() throws Exception { for (int i = 0; i < context.currentParallelism(); i++) { log.info("Assigned splits to reader [{}]", i); - assignSplit(i); + synchronized (lock) { + assignSplit(i); + } } } diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java index d927f834363..96a8779e26d 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java @@ -47,6 +47,7 @@ public class MultipleTableHiveSourceSplitEnumerator private final Set assignedSplit; private final Map> filePathMap; private final AtomicInteger assignCount = new AtomicInteger(0); + private final Object lock = new Object(); public MultipleTableHiveSourceSplitEnumerator( SourceSplitEnumerator.Context context, @@ -108,7 +109,9 @@ public void registerReader(int subtaskId) {} @Override public FileSourceState snapshotState(long checkpointId) { - return new FileSourceState(assignedSplit); + synchronized (lock) { + return new FileSourceState(assignedSplit); + } } @Override @@ -156,7 +159,9 @@ private static int getSplitOwner(int assignCount, int numReaders) { public void run() throws Exception { for (int i = 0; i < context.currentParallelism(); i++) { log.info("Assigned splits to reader [{}]", i); - assignSplit(i); + synchronized (lock) { + assignSplit(i); + } } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java index 4dff9e67c94..39c874a63be 100644 --- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java @@ -131,16 +131,14 @@ public void open() { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplits(splits); - if (context.registeredReaders().contains(subtaskId)) { - assignPendingSplits(Collections.singleton(subtaskId)); - } else { - log.warn( - "Reader {} is not registered. Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplits(splits); + if (context.registeredReaders().contains(subtaskId)) { + assignPendingSplits(Collections.singleton(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); @@ -163,9 +161,7 @@ public void handleSplitRequest(int subtaskId) {} @Override public void registerReader(int subtaskId) { log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId); - synchronized (stateLock) { - assignPendingSplits(Collections.singleton(subtaskId)); - } + assignPendingSplits(Collections.singleton(subtaskId)); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java index 303e84678d8..a0948dfe0dd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java +++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java @@ -103,16 +103,14 @@ public void close() throws IOException { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplit(splits, subtaskId); - if (context.registeredReaders().contains(subtaskId)) { - assignSplit(Collections.singletonList(subtaskId)); - } else { - LOG.warn( - "Reader {} is not registered. Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplit(splits, subtaskId); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + LOG.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); @@ -134,9 +132,7 @@ public void handleSplitRequest(int subtaskId) { public void registerReader(int subtaskId) { LOG.info("Register reader {} to JdbcSourceSplitEnumerator.", subtaskId); if (!pendingSplits.isEmpty()) { - synchronized (stateLock) { - assignSplit(Collections.singletonList(subtaskId)); - } + assignSplit(Collections.singletonList(subtaskId)); } } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java index 024f6416a88..26c61ddb2ec 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java @@ -69,7 +69,7 @@ public class KafkaSourceSplitEnumerator private 
ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; private volatile boolean initialized; - + private final Object lock = new Object(); private final Map topicMappingTablePathMap = new HashMap<>(); private boolean isStreamingMode; @@ -145,9 +145,13 @@ public void open() { @Override public void run() throws ExecutionException, InterruptedException { - fetchPendingPartitionSplit(); - setPartitionStartOffset(); - assignSplit(); + synchronized (lock) { + fetchPendingPartitionSplit(); + setPartitionStartOffset(); + } + synchronized (lock) { + assignSplit(); + } if (!initialized) { initialized = true; } @@ -288,7 +292,9 @@ public void registerReader(int subtaskId) { @Override public KafkaSourceState snapshotState(long checkpointId) throws Exception { - return new KafkaSourceState(new HashSet<>(assignedSplit.values())); + synchronized (lock) { + return new KafkaSourceState(new HashSet<>(assignedSplit.values())); + } } @Override diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java index ee6574c274d..469332da1cb 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java @@ -124,16 +124,14 @@ public void close() throws IOException { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplit(splits, subtaskId); - if (enumeratorContext.registeredReaders().contains(subtaskId)) { - assignSplit(Collections.singletonList(subtaskId)); - } else { - log.warn( - "Reader {} is not registered. 
Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplit(splits, subtaskId); + if (enumeratorContext.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); @@ -192,10 +190,8 @@ public void handleSplitRequest(int subtaskId) { @Override public void registerReader(int subtaskId) { log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId); - synchronized (stateLock) { - if (!pendingSplits.isEmpty()) { - assignSplit(Collections.singletonList(subtaskId)); - } + if (!pendingSplits.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java index f44f27cb0c3..e6685f0f850 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java @@ -44,6 +44,7 @@ public class MaxcomputeSourceSplitEnumerator private Set assignedSplits; private final ReadonlyConfig readonlyConfig; private final Map sourceTableInfos; + private final Object stateLock = new Object(); public MaxcomputeSourceSplitEnumerator( SourceSplitEnumerator.Context enumeratorContext, @@ -70,8 +71,12 @@ public void open() {} @Override public void run() throws Exception { - discoverySplits(); - assignPendingSplits(); + synchronized (stateLock) { + 
discoverySplits(); + } + synchronized (stateLock) { + assignPendingSplits(); + } } @Override @@ -92,7 +97,9 @@ public void registerReader(int subtaskId) {} @Override public MaxcomputeSourceState snapshotState(long checkpointId) { - return new MaxcomputeSourceState(assignedSplits); + synchronized (stateLock) { + return new MaxcomputeSourceState(assignedSplits); + } } @Override diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java index 1308a73bef9..b1c242d682a 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java @@ -200,16 +200,14 @@ public void close() throws IOException { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - synchronized (stateLock) { - addPendingSplit(splits, subtaskId); - if (context.registeredReaders().contains(subtaskId)) { - assignSplit(Collections.singletonList(subtaskId)); - } else { - log.warn( - "Reader {} is not registered. Pending splits {} are not assigned.", - subtaskId, - splits); - } + addPendingSplit(splits, subtaskId); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); + } else { + log.warn( + "Reader {} is not registered. 
Pending splits {} are not assigned.", + subtaskId, + splits); } } log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size()); @@ -235,9 +233,7 @@ public void handleSplitRequest(int subtaskId) { public void registerReader(int subtaskId) { log.info("Register reader {} to MilvusSourceSplitEnumerator.", subtaskId); if (!pendingSplits.isEmpty()) { - synchronized (stateLock) { - assignSplit(Collections.singletonList(subtaskId)); - } + assignSplit(Collections.singletonList(subtaskId)); } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java index 1aa30b2c169..423092d4668 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java @@ -47,7 +47,7 @@ public class MongodbSplitEnumerator private final Context context; private final MongodbClientProvider clientProvider; - + private final Object stateLock = new Object(); private final MongoSplitStrategy strategy; public MongodbSplitEnumerator( @@ -74,14 +74,18 @@ public void open() {} @Override public synchronized void run() { log.info("Starting MongoSplitEnumerator."); - Set readers = context.registeredReaders(); - pendingSplits.addAll(strategy.split()); - MongoNamespace namespace = clientProvider.getDefaultCollection().getNamespace(); - log.info( - "Added {} pending splits for namespace {}.", - pendingSplits.size(), - namespace.getFullName()); - assignSplits(readers); + synchronized (stateLock) { + pendingSplits.addAll(strategy.split()); + MongoNamespace namespace = 
clientProvider.getDefaultCollection().getNamespace(); + log.info( + "Added {} pending splits for namespace {}.", + pendingSplits.size(), + namespace.getFullName()); + } + synchronized (stateLock) { + Set readers = context.registeredReaders(); + assignSplits(readers); + } } @Override @@ -121,7 +125,9 @@ public void registerReader(int subtaskId) { @Override public ArrayList snapshotState(long checkpointId) { - return pendingSplits; + synchronized (stateLock) { + return pendingSplits; + } } @Override diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java index 887769488b6..2d748ad2578 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java @@ -63,6 +63,7 @@ public abstract class AbstractSplitEnumerator protected Deque pendingSplits; protected final TableScan tableScan; + protected final Object stateLock = new Object(); private final int splitMaxNum; @@ -98,7 +99,9 @@ public void open() {} @Override public void run() throws Exception { - loadNewSplits(); + synchronized (stateLock) { + loadNewSplits(); + } } @Override @@ -129,7 +132,9 @@ public void registerReader(int subtaskId) { @Override public PaimonSourceState snapshotState(long checkpointId) throws Exception { - return new PaimonSourceState(pendingSplits, nextSnapshotId); + synchronized (stateLock) { + return new PaimonSourceState(pendingSplits, nextSnapshotId); + } } @Override diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java index b00b38587a8..46ecf427569 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java @@ -44,7 +44,9 @@ public PaimonBatchSourceSplitEnumerator( @Override public void run() throws Exception { - this.processDiscoveredSplits(this.scanNextSnapshot(), null); + synchronized (stateLock) { + this.processDiscoveredSplits(this.scanNextSnapshot(), null); + } Set readers = context.registeredReaders(); log.debug( "No more splits to assign." 
+ " Sending NoMoreSplitsEvent to reader {}.", readers); @@ -53,7 +55,9 @@ public void run() throws Exception { @Override public PaimonSourceState snapshotState(long checkpointId) throws Exception { - return new PaimonSourceState(pendingSplits, null); + synchronized (stateLock) { + return new PaimonSourceState(pendingSplits, null); + } } @Override diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java index e5fcef3e7a9..9986ae4272c 100644 --- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java @@ -61,6 +61,7 @@ public class PulsarSplitEnumerator private final long partitionDiscoveryIntervalMs; private final StartCursor startCursor; private final StopCursor stopCursor; + private final Object stateLock = new Object(); /** The consumer group id used for this PulsarSource. 
*/ private final String subscriptionName; @@ -152,9 +153,11 @@ public void run() throws Exception { } private void discoverySplits() { - Set subscribedTopicPartitions = - partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin); - checkPartitionChanges(subscribedTopicPartitions); + synchronized (stateLock) { + Set subscribedTopicPartitions = + partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin); + checkPartitionChanges(subscribedTopicPartitions); + } } private void checkPartitionChanges(Set fetchedPartitions) { @@ -299,7 +302,9 @@ public void registerReader(int subtaskId) { @Override public PulsarSplitEnumeratorState snapshotState(long checkpointId) throws Exception { - return new PulsarSplitEnumeratorState(assignedPartitions); + synchronized (stateLock) { + return new PulsarSplitEnumeratorState(assignedPartitions); + } } @Override diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java index 45ded447f2f..18d60dbba94 100644 --- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java @@ -58,6 +58,7 @@ public class RocketMqSourceSplitEnumerator private final Map pendingSplit; private ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; + private final Object lock = new Object(); // ms private long discoveryIntervalMillis; @@ -118,9 +119,14 @@ public void open() { @Override public void run() throws Exception { - fetchPendingPartitionSplit(); - setPartitionStartOffset(); - assignSplit(); + synchronized (lock) { + 
fetchPendingPartitionSplit(); + setPartitionStartOffset(); + } + + synchronized (lock) { + assignSplit(); + } } @Override @@ -159,7 +165,8 @@ public void addSplitsBack(List splits, int subtaskId) { split.setEndOffset(listOffsets.get(split.getMessageQueue())); }); return splits.stream() - .collect(Collectors.toMap(split -> split.getMessageQueue(), split -> split)); + .collect( + Collectors.toMap(RocketMqSourceSplit::getMessageQueue, split -> split)); } catch (Exception e) { throw new RocketMqConnectorException( RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e); @@ -185,7 +192,9 @@ public void registerReader(int subtaskId) { @Override public RocketMqSourceState snapshotState(long checkpointId) throws Exception { - return new RocketMqSourceState(assignedSplit.values().stream().collect(Collectors.toSet())); + synchronized (lock) { + return new RocketMqSourceState(new HashSet<>(assignedSplit.values())); + } } @Override @@ -194,8 +203,12 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } private void discoverySplits() { - fetchPendingPartitionSplit(); - assignSplit(); + synchronized (lock) { + fetchPendingPartitionSplit(); + } + synchronized (lock) { + assignSplit(); + } } private void fetchPendingPartitionSplit() { diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java index f178d441a33..5a872eb7f33 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java @@ -56,6 +56,7 @@ public class SlsSourceSplitEnumerator private final Map pendingSplit; private final Map 
assignedSplit; + private final Object lock = new Object(); private SlsSourceState slsSourceState; private ScheduledExecutorService executor; @@ -124,8 +125,7 @@ public void open() { @Override public void run() throws Exception { - fetchPendingShardSplit(); - assignSplit(); + discoverySplits(); } @Override @@ -157,8 +157,12 @@ public void registerReader(int subtaskId) { public void notifyCheckpointComplete(long checkpointId) throws Exception {} private void discoverySplits() throws LogException { - fetchPendingShardSplit(); - assignSplit(); + synchronized (lock) { + fetchPendingShardSplit(); + } + synchronized (lock) { + assignSplit(); + } } private void fetchPendingShardSplit() throws LogException { @@ -296,7 +300,9 @@ private static int getSplitOwner(int shardId, int numReaders) { @Override public SlsSourceState snapshotState(long checkpointId) throws Exception { - return new SlsSourceState(new HashSet<>(assignedSplit.values())); + synchronized (lock) { + return new SlsSourceState(new HashSet<>(assignedSplit.values())); + } } public boolean checkConsumerGroupExists(String project, String logstore, String consumerGroup) diff --git a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java index 0088a32a2b1..29125ea380a 100644 --- a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.typesense.client.TypesenseClient; import org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseBaseOptions; import org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseSourceOptions; import org.apache.seatunnel.connectors.seatunnel.typesense.dto.SourceCollectionInfo; @@ -45,8 +44,6 @@ public class TypesenseSourceSplitEnumerator private final ReadonlyConfig config; - private TypesenseClient typesenseClient; - private final Object stateLock = new Object(); private Map> pendingSplit; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java index 1cd17878833..2fcf40d4604 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter; import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter; +import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Order; @@ -41,7 +42,7 @@ public class MultiTableSinkTest { @Test public void testMultiTableSink() throws FileNotFoundException, URISyntaxException, CommandException { - String configurePath = "/config/fake_to_inmemory_multi_table.conf"; + String configurePath = 
"/config/inmemory_to_inmemory_multi_table.conf"; String configFile = getTestConfigFile(configurePath); FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); flinkCommandArgs.setConfigFile(configFile); @@ -73,6 +74,10 @@ public void testMultiTableSink() // Assertions.assertIterableEquals( // Collections.singletonList("InMemoryMultiTableResourceManager::close"), // committerResourceManagersEvents); + + Assertions.assertIterableEquals( + Arrays.asList("registerReader_0", "run"), + InMemorySourceSplitEnumerator.getMethodInvoked()); } public static String getTestConfigFile(String configFile) diff --git a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf similarity index 81% rename from seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf index 846f00e7af9..4778f59be3c 100644 --- a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf @@ -24,17 +24,8 @@ env { } source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - FakeSource { + InMemorySource { plugin_output = "fake" - parallelism = 1 - schema = { - fields { - name = "string" - age = "int" - score = "double" - } - } } } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java index 
41b4285391a..8212b366127 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs; import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter; import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter; +import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Order; @@ -45,7 +46,7 @@ public class MultiTableSinkTest { @Test public void testMultiTableSink() throws FileNotFoundException, URISyntaxException, CommandException { - String configurePath = "/config/fake_to_inmemory_multi_table.conf"; + String configurePath = "/config/inmemory_to_inmemory_multi_table.conf"; String configFile = getTestConfigFile(configurePath); SparkCommandArgs sparkCommandArgs = new SparkCommandArgs(); sparkCommandArgs.setConfigFile(configFile); @@ -76,6 +77,10 @@ public void testMultiTableSink() // Assertions.assertIterableEquals( // Collections.singletonList("InMemoryMultiTableResourceManager::close"), // committerResourceManagersEvents); + + Assertions.assertIterableEquals( + Arrays.asList("registerReader_0", "run"), + InMemorySourceSplitEnumerator.getMethodInvoked()); } public static String getTestConfigFile(String configFile) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf similarity index 89% rename from 
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf index 8fe9317b4a3..88473e37795 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf @@ -29,16 +29,8 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** - FakeSource { + InMemorySource { plugin_output = "fake" - parallelism = 1 - schema = { - fields { - name = "string" - age = "int" - score = "double" - } - } } } diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java index a542964511e..cf39db9c89f 100644 --- a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java +++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs; import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter; import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter; +import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Order; @@ -45,7 +46,7 @@ public class MultiTableSinkTest { @DisabledOnOs(value = {OS.WINDOWS}) public void testMultiTableSink() throws FileNotFoundException, URISyntaxException, 
CommandException { - String configurePath = "/config/fake_to_inmemory_multi_table.conf"; + String configurePath = "/config/inmemory_to_inmemory_multi_table.conf"; String configFile = getTestConfigFile(configurePath); ClientCommandArgs clientCommandArgs = new ClientCommandArgs(); clientCommandArgs.setConfigFile(configFile); @@ -74,6 +75,10 @@ public void testMultiTableSink() Assertions.assertIterableEquals( Collections.singletonList("InMemoryMultiTableResourceManager::close"), committerResourceManagersEvents); + + Assertions.assertIterableEquals( + Arrays.asList("registerReader_0", "run"), + InMemorySourceSplitEnumerator.getMethodInvoked()); } public static String getTestConfigFile(String configFile) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf similarity index 88% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf rename to seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf index 846f00e7af9..f805d6508ec 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf +++ b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf @@ -25,16 +25,8 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** - FakeSource { + InMemorySource { plugin_output = "fake" - parallelism = 1 - schema = { - fields { - name = "string" - age = "int" - score = "double" - } - } } } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java new file mode 100644 index 00000000000..e27f1cffbcd --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.Collections; +import java.util.List; + +public class InMemorySource + implements SeaTunnelSource { + + private final ReadonlyConfig config; + + public InMemorySource(ReadonlyConfig config) { + this.config = config; + } + + @Override + public String getPluginName() { + return "InMemorySource"; + } + + @Override + public List getProducedCatalogTables() { + return Collections.singletonList( + CatalogTable.of( + TableIdentifier.of("e2e", TablePath.DEFAULT), + TableSchema.builder().build(), + Collections.emptyMap(), + Collections.emptyList(), + "InMemorySource")); + } + + @Override + public SourceReader createReader( + SourceReader.Context readerContext) { + return new InMemorySourceReader(Collections.emptyList(), readerContext); + } + + @Override + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) { + return new InMemorySourceSplitEnumerator(enumeratorContext); + } + + @Override + public SourceSplitEnumerator restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + InMemoryState checkpointState) { + return new InMemorySourceSplitEnumerator(enumeratorContext); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } +} diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java new file mode 100644 index 00000000000..4b6ae52f13b --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; + +import com.google.auto.service.AutoService; + +import java.io.Serializable; + +@AutoService(Factory.class) +public class InMemorySourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "InMemorySource"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } + + @Override + public + TableSource createSource(TableSourceFactoryContext context) { + return () -> (SeaTunnelSource) new InMemorySource(context.getOptions()); + } + + @Override + public Class getSourceClass() { + return InMemorySource.class; + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java new file mode 100644 index 00000000000..b8a366b2285 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; + +public class InMemorySourceReader implements SourceReader { + + private final Iterator iterator; + private final SourceReader.Context context; + private final Deque sourceSplits = new ConcurrentLinkedDeque<>(); + private volatile boolean noMoreSplit; + + public InMemorySourceReader(List rows, SourceReader.Context context) { + this.iterator = rows.iterator(); + this.context = context; + } + + @Override + public void open() throws Exception {} + + @Override + public void close() {} + + @Override + public void pollNext(Collector output) throws Exception { + synchronized (output.getCheckpointLock()) { + InMemorySourceSplit split = sourceSplits.poll(); + if (null != split) { + while (iterator.hasNext()) { + SeaTunnelRow row = iterator.next(); + output.collect(row); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + context.signalNoMoreElement(); + } else { + Thread.sleep(1000L); + } + } + } + + @Override + public List snapshotState(long checkpointId) throws Exception { + return Collections.emptyList(); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplit = true; + } 
+ + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java new file mode 100644 index 00000000000..208eea694e6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import org.apache.seatunnel.api.source.SourceSplit; + +public class InMemorySourceSplit implements SourceSplit { + + private final String splitId; + + public InMemorySourceSplit(String splitId) { + this.splitId = splitId; + } + + @Override + public String splitId() { + return splitId; + } + + @Override + public String toString() { + return "InMemorySourceSplit{" + "splitId='" + splitId + '\'' + '}'; + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java new file mode 100644 index 00000000000..e576249aff2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class InMemorySourceSplitEnumerator + implements SourceSplitEnumerator { + + private final Context context; + private final Object lock = new Object(); + + public static final List methodInvoked = new ArrayList<>(); + + public InMemorySourceSplitEnumerator(Context context) { + this.context = context; + } + + public static List getMethodInvoked() { + return methodInvoked; + } + + @Override + public void open() {} + + @Override + public void run() { + methodInvoked.add("run"); + for (int i = 0; i < context.currentParallelism(); i++) { + synchronized (lock) { + context.assignSplit(i, new InMemorySourceSplit("split-" + i)); + context.signalNoMoreSplits(i); + } + } + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + methodInvoked.add("addSplitsBack"); + } + + @Override + public int currentUnassignedSplitSize() { + return -1; + } + + @Override + public void registerReader(int subtaskId) { + methodInvoked.add("registerReader_" + subtaskId); + } + + @Override + public InMemoryState snapshotState(long checkpointId) { + synchronized (lock) { + return new InMemoryState(); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} + + @Override + public void handleSplitRequest(int subtaskId) {} +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java new file mode 100644 index 00000000000..7a8ff4b0bf8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java @@ -0,0 +1,22 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.source.inmemory; + +import java.io.Serializable; + +public class InMemoryState implements Serializable {} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 2fb46d5dd04..2cfd76c549b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -224,7 +224,9 @@ public void receivedReader(TaskLocation readerId, Address memberAddr) SourceSplitEnumerator enumerator = getEnumerator(); this.addTaskMemberMapping(readerId, memberAddr); - enumerator.registerReader(readerId.getTaskIndex()); + synchronized (this) { + enumerator.registerReader(readerId.getTaskIndex()); + } int taskSize = taskMemberMapping.size(); if (maxReaderSize == taskSize) { readerRegisterComplete = true; @@ -303,7 +305,7 @@ private void stateProcess() throws Exception 
{ reportTaskStatus(WAITING_RESTORE); break; case WAITING_RESTORE: - if (restoreComplete.isDone()) { + if (restoreComplete.isDone() && readerRegisterComplete) { currState = READY_START; reportTaskStatus(READY_START); } else { @@ -311,7 +313,7 @@ private void stateProcess() throws Exception { } break; case READY_START: - if (startCalled && readerRegisterComplete) { + if (startCalled) { currState = STARTING; } else { Thread.sleep(100); diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java index 4e5d864369f..842ee173dd6 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java @@ -145,22 +145,16 @@ public void open() throws Exception { (subtaskId, splits) -> { splitEnumerator.addSplitsBack(splits, subtaskId); }); - readerMap - .entrySet() - .parallelStream() - .forEach( - entry -> { - try { - entry.getValue().open(); - readerContextMap - .get(entry.getKey()) - .getEventListener() - .onEvent(new ReaderOpenEvent()); - splitEnumerator.registerReader(entry.getKey()); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + readerMap.forEach( + (key, value) -> { + try { + value.open(); + readerContextMap.get(key).getEventListener().onEvent(new ReaderOpenEvent()); + splitEnumerator.registerReader(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Override diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java index 7d8052bfd18..c73cd863da5 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -56,7 +57,7 @@ public class FlinkSourceEnumerator private final Object lock = new Object(); - private volatile boolean isRun = false; + private AtomicBoolean isRun = new AtomicBoolean(false); private volatile int currentRegisterReaders = 0; @@ -82,30 +83,33 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname @Override public void addSplitsBack(List> splits, int subtaskId) { - sourceSplitEnumerator.addSplitsBack( - splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()), - subtaskId); + synchronized (lock) { + sourceSplitEnumerator.addSplitsBack( + splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()), + subtaskId); + } } @Override public void addReader(int subtaskId) { - sourceSplitEnumerator.registerReader(subtaskId); synchronized (lock) { + sourceSplitEnumerator.registerReader(subtaskId); currentRegisterReaders++; - if (!isRun && currentRegisterReaders == parallelism) { - try { - sourceSplitEnumerator.run(); - } catch (Exception e) { - throw new RuntimeException(e); - } - isRun = true; + } + if (currentRegisterReaders == parallelism && !isRun.getAndSet(true)) { + try { + sourceSplitEnumerator.run(); + } catch (Exception e) { + throw new RuntimeException(e); } } } 
@Override public EnumStateT snapshotState(long checkpointId) throws Exception { - return sourceSplitEnumerator.snapshotState(checkpointId); + synchronized (lock) { + return sourceSplitEnumerator.snapshotState(checkpointId); + } } @Override