Skip to content

Conversation

@Hisoka-X
Copy link
Member

@Hisoka-X Hisoka-X commented Mar 27, 2025

Purpose of this pull request

Fixed not invoke the SinkAggregatedCommitter's init method

Does this PR introduce any user-facing change?

no

How was this patch tested?

add new test.

Check list

@Hisoka-X Hisoka-X changed the title [Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method [Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method Mar 27, 2025
@hailin0 hailin0 requested a review from Copilot March 31, 2025 03:57
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR addresses an issue where the SinkAggregatedCommitter's init method was not being invoked by refactoring its implementation across various connectors. The key changes include refactoring the initialization of committer components by removing or adjusting final qualifiers and adding explicit init method calls in both API and connector implementations.

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File Description
connector-jdbc/JdbcSinkAggregatedCommitter.java Modified field qualifiers and moved jdbcSinkConfig assignment to the constructor to ensure proper init method invocation.
connector-iceberg/IcebergAggregatedCommitter.java Adjusted field modifiers and shifted initialization logic into the init method.
connector-file/FileSinkAggregatedCommitter.java Introduced hadoopConf as a final field and moved initialization into init.
connector-common/SinkFlowTestUtils.java Updated aggregated committer variable naming and ensured proper init and resource manager usage.
connector-clickhouse/ClickhouseFileSinkAggCommitter.java Made clickhouseTable mutable and moved its initialization to init.
api/multitablesink/MultiTableSinkAggregatedCommitter.java Changed loop control from return to break to adjust initialization behavior for multi-table committers.
api/SinkAggregatedCommitter.java Updated documentation to clarify that the init method may be called on each retry.
Comments suppressed due to low confidence (4)

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java:42

  • Removing the 'final' qualifier from xaFacade and xaGroupOps may introduce unintended mutability; confirm that this change is deliberate and does not impact thread-safety.
private XaFacade xaFacade;

seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java:36

  • The removal of 'final' for tableLoader and filesCommitter may introduce potential state issues; please confirm that making these fields mutable is intentional.
private IcebergTableLoader tableLoader;

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55

  • Replacing 'return' with 'break' here changes the loop control flow; verify that skipping the remaining committers instead of exiting the method is the intended behavior.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java:44

  • Removing the final qualifier from clickhouseTable changes its mutability; ensure that this modification aligns with the design and does not lead to unexpected state changes.
private ClickhouseTable clickhouseTable;

@Hisoka-X Hisoka-X force-pushed the fix-init-behavior branch from 1a5e686 to 024ab50 Compare March 31, 2025 04:07
@Hisoka-X Hisoka-X marked this pull request as ready for review March 31, 2025 04:07
@hailin0 hailin0 requested a review from Copilot April 2, 2025 09:35
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request fixes an issue where the SinkAggregatedCommitter’s init method was not being invoked in multiple connectors. The changes ensure that the init method is correctly called by moving related field assignments from init to the constructor and adjusting control flow in resource manager initialization.

  • Updated Jdbc, Iceberg, File, and Clickhouse committers to properly initialize required fields.
  • Revised the MultiTableSinkAggregatedCommitter control flow and its corresponding test to ensure init and close methods are invoked.
  • Enhanced inline documentation for the init method in the SinkAggregatedCommitter interface.

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.

Show a summary per file
File Description
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java Moved jdbcSinkConfig initialization to constructor and removed duplicate assignment in init.
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java Updated field modifiers and restructured init method for clarity.
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java Injected HadoopConf via constructor to support reinitialization in init.
seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java Renamed variables and updated resource manager handling to match new initialization flow.
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java Adjusted field and initialization logic in init to match similar committers.
seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java Added test verifying that init and close are properly invoked by the aggregated committer.
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java Changed control flow in initResourceManager method to support flexible initialization.
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java Updated the init method Javadoc to indicate that it may be called on each retry.
Comments suppressed due to low confidence (1)

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55

  • Changing from 'return' to 'break' in the initResourceManager method modifies the control flow by continuing the initialization for subsequent committers even if one does not support multi-table. Please verify that this change is intentional and does not lead to unexpected behavior when a non-supporting committer is encountered.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (1)

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55

  • Changing 'return' to 'break' here may result in incomplete initialization of multi-table resource management if not all committers support the interface. Please verify that this logic meets the intended behavior for committers that do not implement SupportMultiTableSinkAggregatedCommitter.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }

@Hisoka-X Hisoka-X requested a review from Copilot April 15, 2025 08:48
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java:55

  • Replacing the early return with a break means that only the first committer is checked for multi-table support. Verify that this logic is intended to handle multiple aggregators and that aggregators not implementing SupportMultiTableSinkAggregatedCommitter are acceptable.
if (!(aggCommitter instanceof SupportMultiTableSinkAggregatedCommitter)) { break; }


private final XaFacade xaFacade;
private final XaGroupOps xaGroupOps;
private XaFacade xaFacade;
Copy link

Copilot AI Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the 'final' modifier for xaFacade (and similarly for xaGroupOps) means the fields are mutable until init() is called. Document the requirement that init() must be invoked immediately after construction to prevent unintended usage.

Copilot uses AI. Check for mistakes.

private final IcebergTableLoader tableLoader;
private final IcebergFilesCommitter filesCommitter;
private IcebergTableLoader tableLoader;
Copy link

Copilot AI Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing tableLoader and filesCommitter from 'final' to non-final implies these members rely on an explicit init() call. Ensure that the documentation or comments clarify that init() must be called before any other methods are used.

Copilot uses AI. Check for mistakes.
@corgy-w corgy-w merged commit df0d11d into apache:dev Apr 16, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants