Skip to content

Commit 84be0f9

Browse files
Carl-Zhou-CNzhouyao
andauthored
[improve][SelectDB] Add a jobId to the selectDB label to distinguish between tasks (#4864)
Co-authored-by: zhouyao <[email protected]>
1 parent abf8fcd commit 84be0f9

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

22+
import org.apache.seatunnel.api.common.JobContext;
2223
import org.apache.seatunnel.api.common.PrepareFailException;
2324
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2425
import org.apache.seatunnel.api.serialization.Serializer;
@@ -59,6 +60,7 @@ public class SelectDBSink
5960
SeaTunnelRow, SelectDBSinkState, SelectDBCommitInfo, SelectDBCommitInfo> {
6061
private Config pluginConfig;
6162
private SeaTunnelRowType seaTunnelRowType;
63+
private String jobId;
6264

6365
@Override
6466
public String getPluginName() {
@@ -85,6 +87,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
8587
}
8688
}
8789

90+
@Override
91+
public void setJobContext(JobContext jobContext) {
92+
this.jobId = jobContext.getJobId();
93+
}
94+
8895
@Override
8996
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
9097
this.seaTunnelRowType = seaTunnelRowType;
@@ -98,20 +105,20 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
98105
@Override
99106
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> createWriter(
100107
SinkWriter.Context context) throws IOException {
101-
SelectDBSinkWriter dorisWriter =
108+
SelectDBSinkWriter selectDBSinkWriter =
102109
new SelectDBSinkWriter(
103-
context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
104-
dorisWriter.initializeLoad(Collections.emptyList());
105-
return dorisWriter;
110+
context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId);
111+
selectDBSinkWriter.initializeLoad(Collections.emptyList());
112+
return selectDBSinkWriter;
106113
}
107114

108115
@Override
109116
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> restoreWriter(
110117
SinkWriter.Context context, List<SelectDBSinkState> states) throws IOException {
111-
SelectDBSinkWriter dorisWriter =
112-
new SelectDBSinkWriter(context, states, seaTunnelRowType, pluginConfig);
113-
dorisWriter.initializeLoad(states);
114-
return dorisWriter;
118+
SelectDBSinkWriter selectDBSinkWriter =
119+
new SelectDBSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId);
120+
selectDBSinkWriter.initializeLoad(states);
121+
return selectDBSinkWriter;
115122
}
116123

117124
@Override

seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkStateSerializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@
2929
public class SelectDBSinkStateSerializer implements Serializer<SelectDBSinkState> {
3030

3131
@Override
32-
public byte[] serialize(SelectDBSinkState obj) throws IOException {
32+
public byte[] serialize(SelectDBSinkState selectDBSinkState) throws IOException {
3333
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
3434
final DataOutputStream out = new DataOutputStream(baos)) {
35-
out.writeUTF(obj.getLabelPrefix());
35+
out.writeUTF(selectDBSinkState.getLabelPrefix());
36+
out.writeLong(selectDBSinkState.getCheckpointId());
3637
out.flush();
3738
return baos.toByteArray();
3839
}

seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,17 @@ public SelectDBSinkWriter(
5454
SinkWriter.Context context,
5555
List<SelectDBSinkState> state,
5656
SeaTunnelRowType seaTunnelRowType,
57-
Config pluginConfig) {
57+
Config pluginConfig,
58+
String jobId) {
5859
this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
5960
this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
6061
log.info("restore checkpointId {}", lastCheckpointId);
6162
// filename prefix is uuid
6263
log.info("labelPrefix " + selectdbConfig.getLabelPrefix());
6364
this.selectdbSinkState =
6465
new SelectDBSinkState(selectdbConfig.getLabelPrefix(), lastCheckpointId);
65-
this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
66+
this.labelPrefix =
67+
selectdbConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
6668
this.lineDelimiter =
6769
selectdbConfig
6870
.getStageLoadProps()

0 commit comments

Comments
 (0)