20 changes: 20 additions & 0 deletions docs/en/connector-v2/sink/Http.md
@@ -44,6 +44,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
| array_mode | Boolean| No | false | Send data as a JSON array when true, or as a single JSON object when false (default) |
| batch_size | Int | No | 1 | The batch size of records to send in one HTTP request. Only works when array_mode is true. |
| request_interval_ms | Int | No | 0 | The interval milliseconds between two HTTP requests, to avoid sending requests too frequently. |
| format | String | No | json | The format of batch data. Currently only "json" is supported, which will send data as JSON array. |
Member: What other formats could we support in the future?

Contributor Author: Currently only json is in mind; for later expansion, the code can be modified to check the format against an enum.
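
A minimal sketch of the enum-style check mentioned above; `FormatType` is a hypothetical illustration, not code from this PR, which keeps a plain string comparison:

```java
// Hypothetical sketch: this PR itself keeps a plain "json".equalsIgnoreCase(format) check.
public enum FormatType {
    JSON;

    public static FormatType fromName(String name) {
        for (FormatType type : values()) {
            if (type.name().equalsIgnoreCase(name)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unsupported format: " + name);
    }
}
```

Adding another format later would then only need a new enum constant plus a matching branch in the writer.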

Member: Please remove useless config.

Contributor Author: +1 removed useless config.

Member: The docs should be updated too.

| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |

## Example
@@ -59,6 +63,22 @@ Http {
}
```

### With Batch Processing

```hocon
Http {
url = "http://localhost/test/webhook"
headers {
token = "9e32e859ef044462a257e1fc76730066"
Content-Type = "application/json"
}
array_mode = true
batch_size = 50
request_interval_ms = 500
format = "json"
}
```
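
Note that `batch_size` and `request_interval_ms` together bound the throughput of each sink writer: with the settings above a request is sent at most every 500 ms, i.e. at most 1000 / 500 = 2 requests per second, and therefore at most 2 × 50 = 100 rows per second per writer (assuming nothing else throttles the sink).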

### Multiple table

#### example1
20 changes: 20 additions & 0 deletions docs/zh/connector-v2/sink/Http.md
@@ -42,6 +42,10 @@ import ChangeLog from '../changelog/connector-http.md';
| retry_backoff_max_ms | Int | 否 | 10000 | http请求失败,最大重试回退时间(毫秒) |
| connect_timeout_ms | Int | 否 | 12000 | 连接超时设置,默认12s |
| socket_timeout_ms | Int | 否 | 60000 | 套接字超时设置,默认为60s |
| array_mode | Boolean| 否 | false | 为true时将数据作为JSON数组发送,为false时作为单个JSON对象发送(默认) |
| batch_size | Int | 否 | 1 | 在一个HTTP请求中发送的记录批量大小。仅在array_mode为true时有效 |
| request_interval_ms | Int | 否 | 0 | 两次HTTP请求之间的间隔毫秒数,以避免请求过于频繁 |
| format | String | 否 | json | 批量数据的格式。目前只支持"json",将以JSON数组形式发送数据 |
| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项 ](../sink-common-options.md) 了解详情 |

## 示例
@@ -57,6 +61,22 @@ Http {
}
```

### 带批处理的示例

```hocon
Http {
url = "http://localhost/test/webhook"
headers {
token = "9e32e859ef044462a257e1fc76730066"
Content-Type = "application/json"
}
array_mode = true
batch_size = 50
request_interval_ms = 500
format = "json"
}
```

## 变更日志

<ChangeLog />
@@ -17,4 +17,33 @@

package org.apache.seatunnel.connectors.seatunnel.http.config;

public class HttpSinkOptions extends HttpCommonOptions {}
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class HttpSinkOptions extends HttpCommonOptions {
public static final Option<Boolean> ARRAY_MODE =
Options.key("array_mode")
.booleanType()
.defaultValue(false)
.withDescription(
"Send data as a JSON array when true, or as a single JSON object when false (default)");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(1)
.withDescription(
"The batch size of records to send in one HTTP request. Only works when array_mode is true");

public static final Option<Integer> REQUEST_INTERVAL_MS =
Options.key("request_interval_ms")
.intType()
.defaultValue(0)
.withDescription("The interval milliseconds between two HTTP requests");

public static final Option<String> FORMAT =
Copilot AI (May 9, 2025): [nitpick] The description for the FORMAT option indicates that only 'json' is supported; consider clarifying the documentation or renaming the option to avoid confusion regarding supported formats.
Contributor Author (@ocean-zhc, May 9, 2025): The Http.md documentation already states this.

Options.key("format")
.stringType()
.defaultValue("json")
.withDescription("The format of the batch data, only support json now");
}
@@ -58,11 +58,29 @@ public String getPluginName() {

@Override
public HttpSinkWriter createWriter(SinkWriter.Context context) throws IOException {
return new HttpSinkWriter(seaTunnelRowType, httpParameter);
boolean arrayMode = pluginConfig.get(HttpSinkOptions.ARRAY_MODE);
int batchSize = pluginConfig.get(HttpSinkOptions.BATCH_SIZE);
int requestIntervalMs = pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS);
String format = pluginConfig.get(HttpSinkOptions.FORMAT);
return new DefaultHttpSinkWriter(
seaTunnelRowType, httpParameter, arrayMode, batchSize, requestIntervalMs, format);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}

/** Default HttpSinkWriter Implementation */
private static class DefaultHttpSinkWriter extends HttpSinkWriter {
public DefaultHttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
boolean arrayMode,
int batchSize,
int requestIntervalMs,
String format) {
super(seaTunnelRowType, httpParameter, arrayMode, batchSize, requestIntervalMs, format);
}
}
}
@@ -48,6 +48,10 @@ public OptionRule optionRule() {
.optional(HttpSinkOptions.RETRY)
.optional(HttpSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpSinkOptions.RETRY_BACKOFF_MAX_MS)
.optional(HttpSinkOptions.ARRAY_MODE)
.optional(HttpSinkOptions.BATCH_SIZE)
.optional(HttpSinkOptions.REQUEST_INTERVAL_MS)
.optional(HttpSinkOptions.FORMAT)
.optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.http.sink;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,6 +33,8 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@Slf4j
@@ -40,6 +45,14 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
protected final HttpParameter httpParameter;
protected final SerializationSchema serializationSchema;

// Batch related fields
private final boolean arrayMode;
private final int batchSize;
private final int requestIntervalMs;
private final String format;
private final List<SeaTunnelRow> batchBuffer;
private long lastRequestTime;

public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
this(seaTunnelRowType, httpParameter, new JsonSerializationSchema(seaTunnelRowType));
}
@@ -48,18 +61,107 @@ public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema) {
this(seaTunnelRowType, httpParameter, serializationSchema, false, 1, 0, "json");
}

public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
boolean arrayMode,
int batchSize,
int requestIntervalMs,
String format) {
this(
seaTunnelRowType,
httpParameter,
new JsonSerializationSchema(seaTunnelRowType),
arrayMode,
batchSize,
requestIntervalMs,
format);
}

public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema,
boolean arrayMode,
int batchSize,
int requestIntervalMs,
String format) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
this.httpClient = new HttpClientProvider(httpParameter);
this.httpClient = createHttpClient(httpParameter);
this.serializationSchema = serializationSchema;
this.arrayMode = arrayMode;
this.batchSize = batchSize;
this.requestIntervalMs = requestIntervalMs;
this.format = format;
this.batchBuffer = new ArrayList<>(batchSize);
this.lastRequestTime = System.currentTimeMillis();
}

@Override
public void write(SeaTunnelRow element) throws IOException {
if (!arrayMode) {
// Object mode: send each record individually, ignore batch_size setting
Member (suggested change): remove the comment `// Object mode: send each record individually, ignore batch_size setting`

writeSingleRecord(element);
} else {
// Array mode: batch processing
Member (suggested change): remove the comment `// Array mode: batch processing`

Contributor Author: +1

batchBuffer.add(element);
if (batchBuffer.size() >= batchSize) {
flush();
}
}
}

private void writeSingleRecord(SeaTunnelRow element) throws IOException {
byte[] serialize = serializationSchema.serialize(element);
String body = new String(serialize);
doHttpRequest(body);
}

private void flush() throws IOException {
if (batchBuffer.isEmpty()) {
return;
}

// Check request interval
Member (suggested change): remove the comment `// Check request interval`. Please do not add useless comments.

long currentTime = System.currentTimeMillis();
long timeSinceLastRequest = currentTime - lastRequestTime;
if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) {
try {
Thread.sleep(requestIntervalMs - timeSinceLastRequest);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Sleep interrupted", e);
Member: We should throw the exception directly; otherwise the writer would never be closed.

Contributor Author: +1
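
A hedged sketch of the change the reviewer is asking for: restore the interrupt flag and propagate a failure instead of only logging it. Wrapping in `IOException` is an assumption that happens to match the `throws IOException` already declared on `flush()` in this diff; the final code may use a different exception type, and the helper name below is hypothetical.

```java
import java.io.IOException;

// Illustration only: RequestIntervalWait/waitForInterval are hypothetical names, not code in this PR.
final class RequestIntervalWait {
    static void waitForInterval(long remainingMs) throws IOException {
        try {
            Thread.sleep(remainingMs);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail fast so the writer can still be closed.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for request_interval_ms", e);
        }
    }
}
```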

}
}

// Array mode: serialize batch data
if ("json".equalsIgnoreCase(format)) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode arrayNode = mapper.createArrayNode();
for (SeaTunnelRow row : batchBuffer) {
byte[] serialize = serializationSchema.serialize(row);
arrayNode.add(new String(serialize));
}
String body = mapper.writeValueAsString(arrayNode);
doHttpRequest(body);
} else {
log.warn("Unsupported format: {}, fallback to sending records one by one", format);
for (SeaTunnelRow row : batchBuffer) {
writeSingleRecord(row);
}
}

batchBuffer.clear();
lastRequestTime = System.currentTimeMillis();
}

private void doHttpRequest(String body) {
try {
// only support post web hook
// Send HTTP request
Member (suggested change): remove the comment `// Send HTTP request`

Contributor Author: +1

HttpResponse response =
httpClient.doPost(httpParameter.getUrl(), httpParameter.getHeaders(), body);
if (HttpResponse.STATUS_OK == response.getCode()) {
Expand All @@ -76,8 +178,15 @@ public void write(SeaTunnelRow element) throws IOException {

@Override
public void close() throws IOException {
if (arrayMode) {
Member: We should invoke the flush method when the prepareCommit method is invoked too.

Contributor Author: +1
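
A rough sketch of the hook the reviewer is asking for, as it could look inside `HttpSinkWriter`; it needs `java.util.Optional`, and the exact `prepareCommit` signature depends on the SeaTunnel `SinkWriter` API version, so treat this as an illustration rather than the final code.

```java
// Illustration only; the prepareCommit signature here is an assumption.
@Override
public Optional<Void> prepareCommit() throws IOException {
    if (arrayMode) {
        // Drain the buffer so rows written before the checkpoint are actually sent.
        flush();
    }
    return Optional.empty();
}
```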

flush(); // Ensure that all data in the buffer is sent out before shutdown
}
if (Objects.nonNull(httpClient)) {
httpClient.close();
}
}

protected HttpClientProvider createHttpClient(HttpParameter httpParameter) {
Member (suggested change): add `@VisibleForTesting` above `protected HttpClientProvider createHttpClient(HttpParameter httpParameter)`

return new HttpClientProvider(httpParameter);
}
}