-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method
#9070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
SinkAggregatedCommitter's init method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR addresses an issue where the SinkAggregatedCommitter's init method was not being invoked by refactoring its implementation across various connectors. The key changes include refactoring the initialization of committer components by removing or adjusting final qualifiers and adding explicit init method calls in both API and connector implementations.
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| connector-jdbc/JdbcSinkAggregatedCommitter.java | Modified field qualifiers and moved jdbcSinkConfig assignment to the constructor to ensure proper init method invocation. |
| connector-iceberg/IcebergAggregatedCommitter.java | Adjusted field modifiers and shifted initialization logic into the init method. |
| connector-file/FileSinkAggregatedCommitter.java | Introduced hadoopConf as a final field and moved initialization into init. |
| connector-common/SinkFlowTestUtils.java | Updated aggregated committer variable naming and ensured proper init and resource manager usage. |
| connector-clickhouse/ClickhouseFileSinkAggCommitter.java | Made clickhouseTable mutable and moved its initialization to init. |
| api/multitablesink/MultiTableSinkAggregatedCommitter.java | Changed loop control from return to break to adjust initialization behavior for multi-table committers. |
| api/SinkAggregatedCommitter.java | Updated documentation to clarify that the init method may be called on each retry. |
Comments suppressed due to low confidence (4)
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java:42
- Removing the 'final' qualifier from xaFacade and xaGroupOps may introduce unintended mutability; confirm that this change is deliberate and does not impact thread-safety.
private XaFacade xaFacade;
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java:36
- The removal of 'final' for tableLoader and filesCommitter may introduce potential state issues; please confirm that making these fields mutable is intentional.
private IcebergTableLoader tableLoader;
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55
- Replacing 'return' with 'break' here changes the loop control flow; verify that skipping the remaining committers instead of exiting the method is the intended behavior.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java:44
- Removing the final qualifier from clickhouseTable changes its mutability; ensure that this modification aligns with the design and does not lead to unexpected state changes.
private ClickhouseTable clickhouseTable;
1a5e686 to
024ab50
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request fixes an issue where the SinkAggregatedCommitter’s init method was not being invoked in multiple connectors. The changes ensure that the init method is correctly called by moving related field assignments from init to the constructor and adjusting control flow in resource manager initialization.
- Updated Jdbc, Iceberg, File, and Clickhouse committers to properly initialize required fields.
- Revised the MultiTableSinkAggregatedCommitter control flow and its corresponding test to ensure init and close methods are invoked.
- Enhanced inline documentation for the init method in the SinkAggregatedCommitter interface.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java | Moved jdbcSinkConfig initialization to constructor and removed duplicate assignment in init. |
| seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java | Updated field modifiers and restructured init method for clarity. |
| seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java | Injected HadoopConf via constructor to support reinitialization in init. |
| seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java | Renamed variables and updated resource manager handling to match new initialization flow. |
| seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java | Adjusted field and initialization logic in init to match similar committers. |
| seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java | Added test verifying that init and close are properly invoked by the aggregated committer. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java | Changed control flow in initResourceManager method to support flexible initialization. |
| seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java | Updated the init method Javadoc to indicate that it may be called on each retry. |
Comments suppressed due to low confidence (1)
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55
- Changing from 'return' to 'break' in the initResourceManager method modifies the control flow by continuing the initialization for subsequent committers even if one does not support multi-table. Please verify that this change is intentional and does not lead to unexpected behavior when a non-supporting committer is encountered.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Comments suppressed due to low confidence (1)
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55
- Changing 'return' to 'break' here may result in incomplete initialization of multi-table resource management if not all committers support the interface. Please verify that this logic meets the intended behavior for committers that do not implement SupportMultiTableSinkAggregatedCommitter.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55
- Replacing the early return with a break means that only the first committer is checked for multi-table support. Verify that this logic is intended to handle multiple aggregators and that aggregators not implementing SupportMultiTableSinkAggregatedCommitter are acceptable.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }
|
|
||
| private final XaFacade xaFacade; | ||
| private final XaGroupOps xaGroupOps; | ||
| private XaFacade xaFacade; |
Copilot
AI
Apr 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing the 'final' modifier for xaFacade (and similarly for xaGroupOps) means the fields are mutable until init() is called. Document the requirement that init() must be invoked immediately after construction to prevent unintended usage.
|
|
||
| private final IcebergTableLoader tableLoader; | ||
| private final IcebergFilesCommitter filesCommitter; | ||
| private IcebergTableLoader tableLoader; |
Copilot
AI
Apr 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing tableLoader and filesCommitter from 'final' to non-final implies these members rely on an explicit init() call. Ensure that the documentation or comments clarify that init() must be called before any other methods are used.
Purpose of this pull request
Fixed not invoke the SinkAggregatedCommitter's init method
Does this PR introduce any user-facing change?
no
How was this patch tested?
add new test.
Check list
New License Guide
release-note.