Skip to content

Commit c4338d6

Browse files
Carl-Zhou-CN authored and fcb-xiaobo committed
[Fix][Kafka] Fix in kafka streaming mode can not read incremental data (apache#7871)
1 parent 867f261 commit c4338d6

File tree

5 files changed

+194
-82
lines changed

5 files changed

+194
-82
lines changed

docs/en/connector-v2/source/kafka.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
5959
### Simple
6060

6161
> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
62+
> In batch mode, during the enumerator sharding process, it will fetch the latest offset for each partition and use it as the stopping point.
6263
6364
```hocon
6465
# Defining the runtime environment

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,22 @@ public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
104104
@Override
105105
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
106106
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
107-
return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
107+
return new KafkaSourceSplitEnumerator(
108+
kafkaSourceConfig,
109+
enumeratorContext,
110+
null,
111+
getBoundedness() == Boundedness.UNBOUNDED);
108112
}
109113

110114
@Override
111115
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
112116
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
113117
KafkaSourceState checkpointState) {
114118
return new KafkaSourceSplitEnumerator(
115-
kafkaSourceConfig, enumeratorContext, checkpointState);
119+
kafkaSourceConfig,
120+
enumeratorContext,
121+
checkpointState,
122+
getBoundedness() == Boundedness.UNBOUNDED);
116123
}
117124

118125
@Override

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,25 @@ public class KafkaSourceSplitEnumerator
7070

7171
private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
7272

73+
private boolean isStreamingMode;
74+
7375
KafkaSourceSplitEnumerator(
7476
KafkaSourceConfig kafkaSourceConfig,
7577
Context<KafkaSourceSplit> context,
76-
KafkaSourceState sourceState) {
78+
KafkaSourceState sourceState,
79+
boolean isStreamingMode) {
7780
this.kafkaSourceConfig = kafkaSourceConfig;
7881
this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
7982
this.context = context;
8083
this.assignedSplit = new HashMap<>();
8184
this.pendingSplit = new HashMap<>();
8285
this.adminClient = initAdminClient(this.kafkaSourceConfig.getProperties());
8386
this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
87+
this.isStreamingMode = isStreamingMode;
8488
}
8589

8690
@VisibleForTesting
87-
protected KafkaSourceSplitEnumerator(
91+
public KafkaSourceSplitEnumerator(
8892
AdminClient adminClient,
8993
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
9094
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -97,6 +101,16 @@ protected KafkaSourceSplitEnumerator(
97101
this.assignedSplit = assignedSplit;
98102
}
99103

104+
@VisibleForTesting
105+
public KafkaSourceSplitEnumerator(
106+
AdminClient adminClient,
107+
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
108+
Map<TopicPartition, KafkaSourceSplit> assignedSplit,
109+
boolean isStreamingMode) {
110+
this(adminClient, pendingSplit, assignedSplit);
111+
this.isStreamingMode = isStreamingMode;
112+
}
113+
100114
@Override
101115
public void open() {
102116
if (discoveryIntervalMillis > 0) {
@@ -204,7 +218,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
204218
private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(
205219
List<KafkaSourceSplit> splits) {
206220
try {
207-
Map<TopicPartition, Long> listOffsets =
221+
Map<TopicPartition, Long> latestOffsets =
208222
listOffsets(
209223
splits.stream()
210224
.map(KafkaSourceSplit::getTopicPartition)
@@ -214,7 +228,10 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
214228
splits.forEach(
215229
split -> {
216230
split.setStartOffset(split.getEndOffset() + 1);
217-
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
231+
split.setEndOffset(
232+
isStreamingMode
233+
? Long.MAX_VALUE
234+
: latestOffsets.get(split.getTopicPartition()));
218235
});
219236
return splits.stream()
220237
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
@@ -305,7 +322,10 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, Interrup
305322
// Obtain the corresponding topic TablePath from kafka topic
306323
TablePath tablePath = topicMappingTablePathMap.get(partition.topic());
307324
KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition);
308-
split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
325+
split.setEndOffset(
326+
isStreamingMode
327+
? Long.MAX_VALUE
328+
: latestOffsets.get(partition));
309329
return split;
310330
})
311331
.collect(Collectors.toSet());
@@ -344,6 +364,7 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
344364
private Map<TopicPartition, Long> listOffsets(
345365
Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
346366
throws ExecutionException, InterruptedException {
367+
347368
Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
348369
partitions.stream()
349370
.collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
@@ -391,7 +412,8 @@ private void discoverySplits() throws ExecutionException, InterruptedException {
391412
assignSplit();
392413
}
393414

394-
private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
415+
@VisibleForTesting
416+
public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
395417
getTopicInfo()
396418
.forEach(
397419
split -> {
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
21+
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
22+
23+
import org.apache.kafka.common.KafkaFuture;
24+
import org.apache.kafka.common.TopicPartition;
25+
import org.apache.kafka.common.TopicPartitionInfo;
26+
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
import org.mockito.Mockito;
31+
32+
import java.util.Arrays;
33+
import java.util.Collections;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.concurrent.ExecutionException;
39+
40+
class KafkaSourceSplitEnumeratorTest {
41+
42+
AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
43+
// prepare
44+
TopicPartition partition = new TopicPartition("test", 0);
45+
46+
@BeforeEach
47+
void init() {
48+
49+
Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
50+
.thenReturn(
51+
new ListOffsetsResult(
52+
new HashMap<
53+
TopicPartition,
54+
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
55+
{
56+
put(
57+
partition,
58+
KafkaFuture.completedFuture(
59+
new ListOffsetsResult.ListOffsetsResultInfo(
60+
0, 0, Optional.of(0))));
61+
}
62+
}));
63+
Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
64+
.thenReturn(
65+
DescribeTopicsResult.ofTopicNames(
66+
new HashMap<String, KafkaFuture<TopicDescription>>() {
67+
{
68+
put(
69+
partition.topic(),
70+
KafkaFuture.completedFuture(
71+
new TopicDescription(
72+
partition.topic(),
73+
false,
74+
Collections.singletonList(
75+
new TopicPartitionInfo(
76+
0,
77+
null,
78+
Collections
79+
.emptyList(),
80+
Collections
81+
.emptyList())))));
82+
}
83+
}));
84+
}
85+
86+
@Test
87+
void addSplitsBack() {
88+
// test
89+
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
90+
new HashMap<TopicPartition, KafkaSourceSplit>() {
91+
{
92+
put(partition, new KafkaSourceSplit(null, partition));
93+
}
94+
};
95+
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
96+
List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
97+
KafkaSourceSplitEnumerator enumerator =
98+
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
99+
enumerator.addSplitsBack(splits, 1);
100+
Assertions.assertTrue(pendingSplit.size() == splits.size());
101+
Assertions.assertNull(assignedSplit.get(partition));
102+
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
103+
}
104+
105+
@Test
106+
void addStreamingSplitsBack() {
107+
// test
108+
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
109+
new HashMap<TopicPartition, KafkaSourceSplit>() {
110+
{
111+
put(partition, new KafkaSourceSplit(null, partition));
112+
}
113+
};
114+
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
115+
List<KafkaSourceSplit> splits =
116+
Collections.singletonList(new KafkaSourceSplit(null, partition));
117+
KafkaSourceSplitEnumerator enumerator =
118+
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
119+
enumerator.addSplitsBack(splits, 1);
120+
Assertions.assertEquals(pendingSplit.size(), splits.size());
121+
Assertions.assertNull(assignedSplit.get(partition));
122+
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
123+
}
124+
125+
@Test
126+
void addStreamingSplits() throws ExecutionException, InterruptedException {
127+
// test
128+
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
129+
new HashMap<TopicPartition, KafkaSourceSplit>();
130+
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
131+
List<KafkaSourceSplit> splits =
132+
Collections.singletonList(new KafkaSourceSplit(null, partition));
133+
KafkaSourceSplitEnumerator enumerator =
134+
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
135+
enumerator.fetchPendingPartitionSplit();
136+
Assertions.assertEquals(pendingSplit.size(), splits.size());
137+
Assertions.assertNotNull(pendingSplit.get(partition));
138+
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == Long.MAX_VALUE);
139+
}
140+
141+
@Test
142+
void addplits() throws ExecutionException, InterruptedException {
143+
// test
144+
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
145+
new HashMap<TopicPartition, KafkaSourceSplit>();
146+
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
147+
List<KafkaSourceSplit> splits =
148+
Collections.singletonList(new KafkaSourceSplit(null, partition));
149+
KafkaSourceSplitEnumerator enumerator =
150+
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
151+
enumerator.fetchPendingPartitionSplit();
152+
Assertions.assertEquals(pendingSplit.size(), splits.size());
153+
Assertions.assertNotNull(pendingSplit.get(partition));
154+
Assertions.assertTrue(pendingSplit.get(partition).getEndOffset() == 0);
155+
}
156+
}

seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumeratorTest.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

0 commit comments

Comments
 (0)