-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][http-Sink] Implementing http batch writes #9292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
d1a933e
54ccf2e
02d0fbd
5e74c48
25579f8
c3758dc
f21c178
94c85b0
90162c5
1edda2c
ee0bddd
a3ca1c9
2fde7fd
14e7446
f61cd41
59a7d66
5829b5e
dd180ca
a15aecc
68abc4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,4 +17,33 @@ | |
|
|
||
| package org.apache.seatunnel.connectors.seatunnel.http.config; | ||
|
|
||
| public class HttpSinkOptions extends HttpCommonOptions {} | ||
| import org.apache.seatunnel.api.configuration.Option; | ||
| import org.apache.seatunnel.api.configuration.Options; | ||
|
|
||
| public class HttpSinkOptions extends HttpCommonOptions { | ||
| public static final Option<Boolean> ARRAY_MODE = | ||
| Options.key("array_mode") | ||
| .booleanType() | ||
| .defaultValue(false) | ||
| .withDescription( | ||
| "Send data as a JSON array when true, or as a single JSON object when false (default)"); | ||
|
|
||
| public static final Option<Integer> BATCH_SIZE = | ||
| Options.key("batch_size") | ||
| .intType() | ||
| .defaultValue(1) | ||
| .withDescription( | ||
| "The batch size of records to send in one HTTP request. Only works when array_mode is true"); | ||
|
|
||
| public static final Option<Integer> REQUEST_INTERVAL_MS = | ||
| Options.key("request_interval_ms") | ||
| .intType() | ||
| .defaultValue(0) | ||
| .withDescription("The interval milliseconds between two HTTP requests"); | ||
|
|
||
| public static final Option<String> FORMAT = | ||
|
||
| Options.key("format") | ||
| .stringType() | ||
| .defaultValue("json") | ||
| .withDescription("The format of the batch data, only support json now"); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -58,11 +58,29 @@ public String getPluginName() { | |||
|
|
||||
| @Override | ||||
| public HttpSinkWriter createWriter(SinkWriter.Context context) throws IOException { | ||||
| return new HttpSinkWriter(seaTunnelRowType, httpParameter); | ||||
| boolean arrayMode = pluginConfig.get(HttpSinkOptions.ARRAY_MODE); | ||||
| int batchSize = pluginConfig.get(HttpSinkOptions.BATCH_SIZE); | ||||
| int requestIntervalMs = pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS); | ||||
|
||||
| public class HttpParameter implements Serializable { |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,9 @@ | |||||||
|
|
||||||||
| package org.apache.seatunnel.connectors.seatunnel.http.sink; | ||||||||
|
|
||||||||
| import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; | ||||||||
| import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode; | ||||||||
|
|
||||||||
| import org.apache.seatunnel.api.serialization.SerializationSchema; | ||||||||
| import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; | ||||||||
| import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||||||||
|
|
@@ -30,6 +33,8 @@ | |||||||
| import lombok.extern.slf4j.Slf4j; | ||||||||
|
|
||||||||
| import java.io.IOException; | ||||||||
| import java.util.ArrayList; | ||||||||
| import java.util.List; | ||||||||
| import java.util.Objects; | ||||||||
|
|
||||||||
| @Slf4j | ||||||||
|
|
@@ -40,6 +45,14 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> | |||||||
| protected final HttpParameter httpParameter; | ||||||||
| protected final SerializationSchema serializationSchema; | ||||||||
|
|
||||||||
| // Batch related fields | ||||||||
| private final boolean arrayMode; | ||||||||
| private final int batchSize; | ||||||||
| private final int requestIntervalMs; | ||||||||
| private final String format; | ||||||||
| private final List<SeaTunnelRow> batchBuffer; | ||||||||
| private long lastRequestTime; | ||||||||
|
|
||||||||
| public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) { | ||||||||
| this(seaTunnelRowType, httpParameter, new JsonSerializationSchema(seaTunnelRowType)); | ||||||||
| } | ||||||||
|
|
@@ -48,18 +61,107 @@ public HttpSinkWriter( | |||||||
| SeaTunnelRowType seaTunnelRowType, | ||||||||
| HttpParameter httpParameter, | ||||||||
| SerializationSchema serializationSchema) { | ||||||||
| this(seaTunnelRowType, httpParameter, serializationSchema, false, 1, 0, "json"); | ||||||||
| } | ||||||||
|
|
||||||||
| public HttpSinkWriter( | ||||||||
| SeaTunnelRowType seaTunnelRowType, | ||||||||
| HttpParameter httpParameter, | ||||||||
| boolean arrayMode, | ||||||||
| int batchSize, | ||||||||
| int requestIntervalMs, | ||||||||
| String format) { | ||||||||
| this( | ||||||||
| seaTunnelRowType, | ||||||||
| httpParameter, | ||||||||
| new JsonSerializationSchema(seaTunnelRowType), | ||||||||
| arrayMode, | ||||||||
| batchSize, | ||||||||
| requestIntervalMs, | ||||||||
| format); | ||||||||
| } | ||||||||
|
|
||||||||
| public HttpSinkWriter( | ||||||||
| SeaTunnelRowType seaTunnelRowType, | ||||||||
| HttpParameter httpParameter, | ||||||||
| SerializationSchema serializationSchema, | ||||||||
| boolean arrayMode, | ||||||||
| int batchSize, | ||||||||
| int requestIntervalMs, | ||||||||
| String format) { | ||||||||
| this.seaTunnelRowType = seaTunnelRowType; | ||||||||
| this.httpParameter = httpParameter; | ||||||||
| this.httpClient = new HttpClientProvider(httpParameter); | ||||||||
| this.httpClient = createHttpClient(httpParameter); | ||||||||
| this.serializationSchema = serializationSchema; | ||||||||
| this.arrayMode = arrayMode; | ||||||||
| this.batchSize = batchSize; | ||||||||
| this.requestIntervalMs = requestIntervalMs; | ||||||||
| this.format = format; | ||||||||
| this.batchBuffer = new ArrayList<>(batchSize); | ||||||||
| this.lastRequestTime = System.currentTimeMillis(); | ||||||||
| } | ||||||||
|
|
||||||||
| @Override | ||||||||
| public void write(SeaTunnelRow element) throws IOException { | ||||||||
| if (!arrayMode) { | ||||||||
| // Object mode: send each record individually, ignore batch_size setting | ||||||||
|
||||||||
| // Object mode: send each record individually, ignore batch_size setting |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Array mode: batch processing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Check request interval |
Please do not add useless comment.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should throw exception directly. Othewise the writer woule never be closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Send HTTP request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should invoke flush method when invoke prepareCommit method too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| protected HttpClientProvider createHttpClient(HttpParameter httpParameter) { | |
| @VisibleForTesting | |
| protected HttpClientProvider createHttpClient(HttpParameter httpParameter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what another format we can support in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently think of just json, later expansion, the code can be modified to enumerate the judgment can be
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove useless config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 removed useless config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs should be updated too.