Skip to content

Commit 4b3af9b

Browse files
liugddx, gdliu3, Hisoka-X
authored
[Improve][Doris Connector] Unified serialization method, use RowToJsonConverter and TextSerializationSchema (#7229)
* 1
* 1
* 1
* Update seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java

Co-authored-by: Jia Fan <[email protected]>

---------

Co-authored-by: gdliu3 <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
1 parent 4ec25f3 commit 4b3af9b

File tree

11 files changed

+322
-265
lines changed

11 files changed

+322
-265
lines changed

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2121
import org.apache.seatunnel.api.table.converter.TypeConverter;
22-
import org.apache.seatunnel.common.exception.CommonError;
23-
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
2422

2523
import lombok.NonNull;
2624
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +35,7 @@ public static TypeConverter<BasicTypeDefine> getTypeConverter(@NonNull String do
3735
|| dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) {
3836
return DorisTypeConverterV2.INSTANCE;
3937
} else {
40-
throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER, dorisVersion);
38+
return DorisTypeConverterV2.INSTANCE;
4139
}
4240
}
4341
}

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java

Lines changed: 0 additions & 107 deletions
This file was deleted.

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java

Lines changed: 53 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,28 @@
1717

1818
package org.apache.seatunnel.connectors.doris.serialize;
1919

20+
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
21+
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
22+
2023
import org.apache.seatunnel.api.table.type.RowKind;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2125
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2226
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2327
import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
24-
25-
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import org.apache.seatunnel.format.json.JsonSerializationSchema;
29+
import org.apache.seatunnel.format.text.TextSerializationSchema;
2630

2731
import java.io.IOException;
28-
import java.nio.charset.StandardCharsets;
29-
import java.util.HashMap;
30-
import java.util.Map;
31-
import java.util.StringJoiner;
32+
import java.util.Arrays;
33+
import java.util.List;
3234

33-
import static com.google.common.base.Preconditions.checkState;
35+
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
3436
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
3537
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
3638
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;
3739

38-
public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements DorisSerializer {
40+
public class SeaTunnelRowSerializer implements DorisSerializer {
3941
String type;
40-
private ObjectMapper objectMapper;
4142
private final SeaTunnelRowType seaTunnelRowType;
4243
private final String fieldDelimiter;
4344
private final boolean enableDelete;
@@ -51,48 +52,29 @@ public SeaTunnelRowSerializer(
5152
this.seaTunnelRowType = seaTunnelRowType;
5253
this.fieldDelimiter = fieldDelimiter;
5354
this.enableDelete = enableDelete;
54-
if (JSON.equals(type)) {
55-
objectMapper = new ObjectMapper();
56-
}
5755
}
5856

59-
@Override
60-
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
61-
String valString;
62-
if (JSON.equals(type)) {
63-
valString = buildJsonString(seaTunnelRow);
64-
} else if (CSV.equals(type)) {
65-
valString = buildCSVString(seaTunnelRow);
66-
} else {
67-
throw new IllegalArgumentException("The type " + type + " is not supported!");
68-
}
69-
return valString.getBytes(StandardCharsets.UTF_8);
57+
public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
58+
throws IOException {
59+
60+
JsonSerializationSchema jsonSerializationSchema =
61+
new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
62+
ObjectMapper mapper = jsonSerializationSchema.getMapper();
63+
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
64+
return jsonSerializationSchema.serialize(row);
7065
}
7166

72-
public String buildJsonString(SeaTunnelRow row) throws IOException {
73-
Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
67+
public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
68+
throws IOException {
7469

75-
for (int i = 0; i < row.getFields().length; i++) {
76-
Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
77-
rowMap.put(seaTunnelRowType.getFieldName(i), value);
78-
}
79-
if (enableDelete) {
80-
rowMap.put(LoadConstants.DORIS_DELETE_SIGN, parseDeleteSign(row.getRowKind()));
81-
}
82-
return objectMapper.writeValueAsString(rowMap);
83-
}
70+
TextSerializationSchema build =
71+
TextSerializationSchema.builder()
72+
.seaTunnelRowType(seaTunnelRowType)
73+
.delimiter(fieldDelimiter)
74+
.nullValue(NULL_VALUE)
75+
.build();
8476

85-
public String buildCSVString(SeaTunnelRow row) throws IOException {
86-
StringJoiner joiner = new StringJoiner(fieldDelimiter);
87-
for (int i = 0; i < row.getFields().length; i++) {
88-
Object field = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
89-
String value = field != null ? field.toString() : NULL_VALUE;
90-
joiner.add(value);
91-
}
92-
if (enableDelete) {
93-
joiner.add(parseDeleteSign(row.getRowKind()));
94-
}
95-
return joiner.toString();
77+
return build.serialize(row);
9678
}
9779

9880
public String parseDeleteSign(RowKind rowKind) {
@@ -105,46 +87,40 @@ public String parseDeleteSign(RowKind rowKind) {
10587
}
10688
}
10789

108-
public static Builder builder() {
109-
return new Builder();
110-
}
111-
112-
/** Builder for RowDataSerializer. */
113-
public static class Builder {
114-
private SeaTunnelRowType seaTunnelRowType;
115-
private String type;
116-
private String fieldDelimiter;
117-
private boolean deletable;
118-
119-
public Builder setType(String type) {
120-
this.type = type;
121-
return this;
122-
}
90+
@Override
91+
public void open() throws IOException {}
12392

124-
public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
125-
this.seaTunnelRowType = seaTunnelRowType;
126-
return this;
127-
}
93+
@Override
94+
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
12895

129-
public Builder setFieldDelimiter(String fieldDelimiter) {
130-
this.fieldDelimiter = fieldDelimiter;
131-
return this;
132-
}
96+
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
97+
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());
13398

134-
public Builder enableDelete(boolean deletable) {
135-
this.deletable = deletable;
136-
return this;
99+
if (enableDelete) {
100+
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
101+
seaTunnelRowEnableDelete.setField(
102+
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
103+
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
104+
fieldTypes.add(STRING_TYPE);
137105
}
138106

139-
public SeaTunnelRowSerializer build() {
140-
checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
141-
return new SeaTunnelRowSerializer(type, seaTunnelRowType, fieldDelimiter, deletable);
107+
if (JSON.equals(type)) {
108+
return buildJsonString(
109+
seaTunnelRow,
110+
new SeaTunnelRowType(
111+
fieldNames.toArray(new String[0]),
112+
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
113+
} else if (CSV.equals(type)) {
114+
return buildCSVString(
115+
seaTunnelRow,
116+
new SeaTunnelRowType(
117+
fieldNames.toArray(new String[0]),
118+
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
119+
} else {
120+
throw new IllegalArgumentException("The type " + type + " is not supported!");
142121
}
143122
}
144123

145-
@Override
146-
public void open() throws IOException {}
147-
148124
@Override
149125
public void close() throws IOException {}
150126
}

0 commit comments

Comments
 (0)