Skip to content

Commit 0999515

Browse files
authored
[Bugfix][AmazonDynamoDB] Fix the problem that all table data cannot be obtained (#5146)
1 parent aae3e91 commit 0999515

File tree

1 file changed

+21
-12
lines changed
  • seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source

1 file changed

+21
-12
lines changed

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3232
import software.amazon.awssdk.regions.Region;
3333
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
34+
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
3435
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
3536
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
3637

3738
import java.io.IOException;
3839
import java.net.URI;
40+
import java.util.Map;
3941

4042
@Slf4j
4143
public class AmazonDynamoDBSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
@@ -78,18 +80,25 @@ public void close() throws IOException {
7880
@Override
7981
@SuppressWarnings("magicnumber")
8082
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
81-
ScanResponse scan =
82-
dynamoDbClient.scan(
83-
ScanRequest.builder()
84-
.tableName(amazondynamodbSourceOptions.getTable())
85-
.build());
86-
if (scan.hasItems()) {
87-
scan.items()
88-
.forEach(
89-
item -> {
90-
output.collect(seaTunnelRowDeserializer.deserialize(item));
91-
});
92-
}
83+
Map<String, AttributeValue> lastKeyEvaluated = null;
84+
85+
ScanResponse scan;
86+
do {
87+
scan =
88+
dynamoDbClient.scan(
89+
ScanRequest.builder()
90+
.tableName(amazondynamodbSourceOptions.getTable())
91+
.exclusiveStartKey(lastKeyEvaluated)
92+
.build());
93+
if (scan.hasItems()) {
94+
scan.items()
95+
.forEach(
96+
item -> {
97+
output.collect(seaTunnelRowDeserializer.deserialize(item));
98+
});
99+
}
100+
lastKeyEvaluated = scan.lastEvaluatedKey();
101+
} while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
93102
context.signalNoMoreElement();
94103
}
95104
}

0 commit comments

Comments
 (0)