Skip to content

Commit d74b042

Browse files
committed
[Bugifx][kafka] Fix kafka enumerator assign split NPE
1 parent e6413c3 commit d74b042

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class KafkaSourceSplitEnumerator
6868
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
6969
private ScheduledExecutorService executor;
7070
private ScheduledFuture<?> scheduledFuture;
71+
private volatile boolean initialized;
7172

7273
private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
7374

@@ -128,7 +129,9 @@ public void open() {
128129
executor.scheduleWithFixedDelay(
129130
() -> {
130131
try {
131-
discoverySplits();
132+
if (initialized) {
133+
discoverySplits();
134+
}
132135
} catch (Exception e) {
133136
log.error("Dynamic discovery failure:", e);
134137
}
@@ -144,6 +147,9 @@ public void run() throws ExecutionException, InterruptedException {
144147
fetchPendingPartitionSplit();
145148
setPartitionStartOffset();
146149
assignSplit();
150+
if (!initialized) {
151+
initialized = true;
152+
}
147153
}
148154

149155
private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
@@ -254,7 +260,7 @@ public void handleSplitRequest(int subtaskId) {
254260

255261
@Override
256262
public void registerReader(int subtaskId) {
257-
if (!pendingSplit.isEmpty()) {
263+
if (!pendingSplit.isEmpty() && initialized) {
258264
assignSplit();
259265
}
260266
}

0 commit comments

Comments
 (0)