[Feature][Connectors-v2] Support auto-increment id for FakeSource #9505
corgy-w
Good contribution
...main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java
...ake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java
…e/seatunnel/connectors/seatunnel/fake/utils/AutoIncrementIdGenerator.java Co-authored-by: Jia Fan <[email protected]>
…e/seatunnel/connectors/seatunnel/fake/utils/IdGeneratorUtils.java Co-authored-by: Jia Fan <[email protected]>
Hisoka-X
What if the source runs on multiple workers? Should the IDs be allocated in advance in the enumerator, or should the source be limited to a parallelism of 1?
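One way to picture the "allocate in advance" option is a strided scheme: each parallel subtask starts at `start + subtaskIndex` and steps by the parallelism, so no two subtasks can emit the same ID. This is only a minimal sketch, not the generator from this PR; the class name `StridedIdGenerator` and its constructor parameters are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: one instance per parallel subtask, not the PR's AutoIncrementIdGenerator. */
final class StridedIdGenerator {
    private final AtomicLong next;
    private final long step;

    StridedIdGenerator(long start, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        // Subtask i owns start + i, start + i + parallelism, start + i + 2 * parallelism, ...
        this.next = new AtomicLong(start + subtaskIndex);
        this.step = parallelism;
    }

    /** Returns the next ID owned by this subtask. */
    long nextId() {
        return next.getAndAdd(step);
    }
}
```

With a start value of 1 and a parallelism of 2, subtask 0 would produce 1, 3, 5 and subtask 1 would produce 2, 4, 6. The IDs are unique across subtasks, but the combined sequence only stays gap-free if every subtask emits the same number of rows.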
public class IdGeneratorUtils {

    private static final Map<String, AutoIncrementIdGenerator> idGenerators =
Static variables will cause problems when the same job is run multiple times, because their state outlives a single run.
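The concern can be reproduced in a few lines. The sketch below assumes that a later run reuses the same key inside the same JVM, which may or may not match how job IDs are assigned in practice; `StaticRegistryDemo`, `nextId`, and `release` are hypothetical names, and `release` stands in for whatever cleanup hook the reader could call on close.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of JVM-wide counter state, not the PR's IdGeneratorUtils. */
final class StaticRegistryDemo {
    // Static state: it is not reset when a job finishes.
    private static final Map<String, AtomicLong> COUNTERS = new ConcurrentHashMap<>();

    /** Returns the next ID for the key, creating a counter at `start` on first use. */
    static long nextId(String jobKey, long start) {
        return COUNTERS.computeIfAbsent(jobKey, k -> new AtomicLong(start)).getAndIncrement();
    }

    /** Hypothetical cleanup hook: drops the counter so the next run starts fresh. */
    static void release(String jobKey) {
        COUNTERS.remove(jobKey);
    }
}
```

Without the `release` call, a second run under the same key continues from where the first run stopped instead of restarting at the configured start value. Keeping the generator as an instance field of the reader would likely avoid the problem without any explicit cleanup.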
Pull Request Overview
This PR adds support for auto-incrementing primary key IDs in the FakeSource connector and updates related tests and documentation.
- Introduces new `auto.increment.enabled` and `auto.increment.start` options in `FakeSourceOptions` and `FakeConfig`
- Implements ID generators (`AutoIncrementIdGenerator`, `IdGeneratorUtils`) and integrates them into `FakeDataRandomUtils`, `FakeDataGenerator`, and `FakeSourceReader`
- Adds an end-to-end (E2E) test (`testAutoIncrementId`) and updates documentation with examples
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| connector-fake/src/main/java/.../FakeSourceOptions.java | Defined new connector options for auto-increment |
| connector-fake/src/main/java/.../FakeConfig.java | Mapped new options into FakeConfig |
| connector-fake/src/main/java/.../AutoIncrementIdGenerator.java | Implemented the ID generator |
| connector-fake/src/main/java/.../IdGeneratorUtils.java | Cached and retrieved per-job ID generators |
| connector-fake/src/main/java/.../FakeDataRandomUtils.java | Applied auto-increment logic for ints and bigints |
| connector-fake/src/main/java/.../FakeDataGenerator.java | Passed jobId to random utils |
| connector-fake/src/main/java/.../FakeSourceReader.java | Thread-safe generator lookup via jobId |
| connector-fake/src/main/java/.../FakeSource.java | Propagated jobId into reader creation |
| connector-fake/src/test/java/.../FakeDataGeneratorTest.java | Updated test constructors and added auto-increment test |
| connector-pulsar-e2e/.../PulsarBatchIT.java | Adjusted FakeDataGenerator constructor calls |
| connector-fake/src/test/resources/fake-auto-increment-id.conf | Added a config file for auto-increment testing |
| docs/zh/connector-v2/source/FakeSource.md | Documented new options and example |
| docs/en/connector-v2/source/FakeSource.md | Documented new options and example |
Comments suppressed due to low confidence (5)
docs/zh/connector-v2/source/FakeSource.md:33
- The default value for `auto.increment.start` is missing in the table; it should reflect the code default (1).
| auto.increment.start | int | 否 | | 自动递增ID的起始值 |
docs/en/connector-v2/source/FakeSource.md:34
- The default value for `auto.increment.start` is not shown; update the table to indicate the default of `1`.
| auto.increment.start | int | no | | Starting value for auto increment ID |
seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java:238
- The same config file is provided twice in the `@ValueSource`; remove the duplicate to avoid redundant test runs.
@ValueSource(strings = {"fake-auto-increment-id.conf", "fake-auto-increment-id.conf"})
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java:154
- [nitpick] Passing `null` for `jobId` may mask issues; consider adding an overload that defaults `jobId` internally or retrieving a real job ID from the test context.
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig, null);
seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java:246
- [nitpick] The variable name `uCompletableFuture` is unclear; consider renaming it to simply `future` or `rowFuture` for readability.
CompletableFuture<List<SeaTunnelRow>> uCompletableFuture =
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
Copilot
AI
Jul 2, 2025
[nitpick] The @Slf4j annotation is added but no logging calls are used in this class; consider removing it or adding relevant log statements.
public Long randomBigint(Column column) {
    if (fakeConfig.getAutoIncrementEnabled()
            && IdGeneratorUtils.isPrimaryColumn(fakeConfig, column.getName())) {
Copilot
AI
Jul 2, 2025
There's no overflow check for auto.increment.start + parallelism * rowNum when generating bigints; this could exceed Long.MAX_VALUE.
- && IdGeneratorUtils.isPrimaryColumn(fakeConfig, column.getName())) {
+ && IdGeneratorUtils.isPrimaryColumn(fakeConfig, column.getName())) {
+     long autoIncrementStart = fakeConfig.getAutoIncrementStart();
+     long parallelism = fakeConfig.getParallelism();
+     long rowNum = fakeConfig.getRowNum();
+     if (autoIncrementStart > 0 && parallelism > 0 && rowNum > 0) {
+         if (autoIncrementStart > Long.MAX_VALUE - (parallelism * rowNum)) {
+             throw new IllegalArgumentException(
+                     "The auto increment start value is too large, or the combination of parallelism and rowNum causes an overflow. Please check your configuration.");
+         }
+     }
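Note that the product `parallelism * rowNum` in the suggested check can itself wrap around before the comparison runs. A variant that avoids this could use `Math.multiplyExact` and `Math.addExact`, which throw `ArithmeticException` on overflow. The sketch below is an illustration only: the class `AutoIncrementBounds` and the method `maxId` are hypothetical, and it assumes each of the `parallelism` subtasks emits `rowNum` rows.

```java
/** Hypothetical helper, not part of the PR: validates that the ID range fits in a long. */
final class AutoIncrementBounds {

    /**
     * Returns the largest ID a run would produce, assuming parallelism * rowNum rows in total.
     * Throws IllegalArgumentException if the range does not fit in a long.
     */
    static long maxId(long start, long parallelism, long rowNum) {
        if (parallelism <= 0 || rowNum <= 0) {
            throw new IllegalArgumentException("parallelism and rowNum must be positive");
        }
        try {
            // multiplyExact/addExact throw instead of silently wrapping around.
            long totalRows = Math.multiplyExact(parallelism, rowNum);
            return Math.addExact(start, totalRows - 1);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                    "auto.increment.start plus the total row count exceeds Long.MAX_VALUE", e);
        }
    }
}
```

Running such a check once at configuration time, rather than inside `randomBigint` for every row, would also keep it off the per-row path.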
        .expireAfterWrite(30, TimeUnit.MINUTES)
        .build();

public static synchronized Optional<AutoIncrementIdGenerator> getIdGenerator(
Copilot
AI
Jul 2, 2025
The method is declared synchronized, which may become a bottleneck under high concurrency; consider relying on Guava's thread-safe cache instead of external synchronization.
- public static synchronized Optional<AutoIncrementIdGenerator> getIdGenerator(
+ public static Optional<AutoIncrementIdGenerator> getIdGenerator(
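Dropping `synchronized` is only safe if the lookup-or-create step is atomic on its own. The sketch below shows the idea with the standard library's `ConcurrentHashMap.computeIfAbsent` as a stand-in for the Guava cache used in the PR, so it has no time-based expiry; `GeneratorRegistry` is a hypothetical name and an `AtomicLong` stands in for `AutoIncrementIdGenerator`.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: lock-free lookup without a synchronized method. */
final class GeneratorRegistry {
    private final ConcurrentMap<String, AtomicLong> generators = new ConcurrentHashMap<>();

    /** Returns the generator for the job, creating it at most once per jobId. */
    Optional<AtomicLong> getIdGenerator(String jobId, long start) {
        if (jobId == null) {
            return Optional.empty();
        }
        // computeIfAbsent is atomic per key, so concurrent callers share one generator.
        return Optional.of(generators.computeIfAbsent(jobId, k -> new AtomicLong(start)));
    }
}
```

With a Guava `Cache`, the analogous atomic call would be `cache.get(key, loader)`; a separate `getIfPresent` followed by `put` would reintroduce the race that `synchronized` was guarding against.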