Skip to content

Commit 736a613

Browse files
committed
[Improve][MulitTableSink] Refactor multi-table configurations
1 parent ae81879 commit 736a613

File tree

52 files changed

+1213
-451
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1213
-451
lines changed

docs/en/concept/connector-v2-features.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,10 @@ For sink connector, the sink connector supports exactly-once if any piece of dat
6969
### cdc(change data capture)
7070

7171
If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture).
72+
73+
### support multiple table write
74+
75+
Supports write multiple tables in one SeaTunnel job, users can dynamically specify the table's identifier by [configuring placeholders](./sink-options-placeholders.md).
76+
77+
requires:
78+
1. Support apis: `SupportMultiTableSink``SupportMultiTableSinkWriter``SupportMultiTableSinkAggregatedCommitter`
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Sink Options Placeholders.md
2+
3+
## Introduction
4+
5+
The SeaTunnel provides a sink options placeholders feature that allows you to get upstream table metadata through placeholders.
6+
7+
This functionality is essential when you need to dynamically get upstream table metadata (such as multi-table writes).
8+
9+
This document will guide you through the usage of these placeholders and how to leverage them effectively.
10+
11+
## Support Those Engines
12+
13+
> SeaTunnel Zeta<br/>
14+
> Flink<br/>
15+
> Spark<br/>
16+
17+
## Placeholder
18+
19+
The placeholders are mainly controlled by the following expressions:
20+
21+
- `${database_name}`
22+
- Used to get the database in the upstream catalog table
23+
- Default values can also be specified via expressions:`${database_name:default_my_db}`
24+
- `${schema_name}`
25+
- Used to get the schema in the upstream catalog table
26+
- Default values can also be specified via expressions:`${schema_name:default_my_schema}`
27+
- `${table_name}`
28+
- Used to get the table in the upstream catalog table
29+
- Default values can also be specified via expressions:`${table_name:default_my_table}`
30+
- `${schema_full_name}`
31+
- Used to get the schema full path(database & schema) in the upstream catalog table
32+
- `${table_full_name}`
33+
- Used to get the table full path(database & schema & table) in the upstream catalog table
34+
- `${primary_key}`
35+
- Used to get the table primary-key fields in the upstream catalog table
36+
- `${unique_key}`
37+
- Used to get the table unique-key fields in the upstream catalog table
38+
- `${field_names}`
39+
- Used to get the table field keys in the upstream catalog table
40+
41+
## Configuration
42+
43+
Requires:
44+
- Make sure the sink connector you are using has implemented `TableSinkFactory` API
45+
- Make sure the sink connector you are using has implemented `TableSinkFactory` API
46+
47+
### Example 1
48+
49+
```hocon
50+
env {
51+
// ignore...
52+
}
53+
source {
54+
MySQL-CDC {
55+
// ignore...
56+
}
57+
}
58+
59+
transform {
60+
// ignore...
61+
}
62+
63+
sink {
64+
jdbc {
65+
url = "jdbc:mysql://localhost:3306"
66+
driver = "com.mysql.cj.jdbc.Driver"
67+
user = "root"
68+
password = "123456"
69+
70+
database = "${database_name}_test"
71+
table = "${table_name}_test"
72+
primary_keys = ["${primary_key}"]
73+
}
74+
}
75+
```
76+
77+
### Example 2
78+
79+
```hocon
80+
env {
81+
// ignore...
82+
}
83+
source {
84+
Oracle-CDC {
85+
// ignore...
86+
}
87+
}
88+
89+
transform {
90+
// ignore...
91+
}
92+
93+
sink {
94+
jdbc {
95+
url = "jdbc:mysql://localhost:3306"
96+
driver = "com.mysql.cj.jdbc.Driver"
97+
user = "root"
98+
password = "123456"
99+
100+
database = "${schema_name}_test"
101+
table = "${table_name}_test"
102+
primary_keys = ["${primary_key}"]
103+
}
104+
}
105+
```
106+
107+
We will complete the placeholder replacement before the connector is started, ensuring that the sink options is ready before use.
108+
If the variable is not replaced, it may be that the upstream table metadata is missing this option, for example:
109+
- `mysql` source not contain `${schema_name}`
110+
- `oracle` source not contain `${databse_name}`
111+
- ...

docs/en/connector-v2/sink/Doris.md

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
- [x] [exactly-once](../../concept/connector-v2-features.md)
2020
- [x] [cdc](../../concept/connector-v2-features.md)
21+
- [x] [support multiple table write](../../concept/connector-v2-features.md)
2122

2223
## Description
2324

@@ -323,6 +324,95 @@ sink {
323324
}
324325
```
325326

327+
### Multiple table
328+
329+
#### example1
330+
331+
```hocon
332+
env {
333+
parallelism = 1
334+
job.mode = "STREAMING"
335+
checkpoint.interval = 5000
336+
}
337+
338+
source {
339+
Mysql-CDC {
340+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
341+
username = "root"
342+
password = "******"
343+
344+
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
345+
}
346+
}
347+
348+
transform {
349+
}
350+
351+
sink {
352+
Doris {
353+
fenodes = "doris_cdc_e2e:8030"
354+
username = root
355+
password = ""
356+
database = "${database_name}_test"
357+
table = "${table_name}_test"
358+
sink.label-prefix = "test-cdc"
359+
sink.enable-2pc = "true"
360+
sink.enable-delete = "true"
361+
doris.config {
362+
format = "json"
363+
read_json_by_line = "true"
364+
}
365+
}
366+
}
367+
```
368+
369+
#### example2
370+
371+
```hocon
372+
env {
373+
parallelism = 1
374+
job.mode = "BATCH"
375+
}
376+
377+
source {
378+
Jdbc {
379+
driver = oracle.jdbc.driver.OracleDriver
380+
url = "jdbc:oracle:thin:@localhost:1521/XE"
381+
user = testUser
382+
password = testPassword
383+
384+
table_list = [
385+
{
386+
table_path = "TESTSCHEMA.TABLE_1"
387+
},
388+
{
389+
table_path = "TESTSCHEMA.TABLE_2"
390+
}
391+
]
392+
}
393+
}
394+
395+
transform {
396+
}
397+
398+
sink {
399+
Doris {
400+
fenodes = "doris_cdc_e2e:8030"
401+
username = root
402+
password = ""
403+
database = "${schema_name}_test"
404+
table = "${table_name}_test"
405+
sink.label-prefix = "test-cdc"
406+
sink.enable-2pc = "true"
407+
sink.enable-delete = "true"
408+
doris.config {
409+
format = "json"
410+
read_json_by_line = "true"
411+
}
412+
}
413+
}
414+
```
415+
326416
## Changelog
327417

328418
### 2.3.0-beta 2022-10-20

docs/en/connector-v2/sink/Hive.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and h
1515

1616
## Key features
1717

18+
- [x] [support multiple table write](../../concept/connector-v2-features.md)
1819
- [x] [exactly-once](../../concept/connector-v2-features.md)
1920

2021
By default, we use 2PC commit to ensure `exactly-once`

docs/en/connector-v2/sink/Http.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
- [ ] [exactly-once](../../concept/connector-v2-features.md)
1414
- [ ] [cdc](../../concept/connector-v2-features.md)
15+
- [x] [support multiple table write](../../concept/connector-v2-features.md)
1516

1617
## Description
1718

@@ -56,6 +57,75 @@ Http {
5657
}
5758
```
5859

60+
### Multiple table
61+
62+
#### example1
63+
64+
```hocon
65+
env {
66+
parallelism = 1
67+
job.mode = "STREAMING"
68+
checkpoint.interval = 5000
69+
}
70+
71+
source {
72+
Mysql-CDC {
73+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
74+
username = "root"
75+
password = "******"
76+
77+
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
78+
}
79+
}
80+
81+
transform {
82+
}
83+
84+
sink {
85+
Http {
86+
...
87+
url = "http://localhost/test/${database_name}_test/${table_name}_test"
88+
}
89+
}
90+
```
91+
92+
#### example2
93+
94+
```hocon
95+
env {
96+
parallelism = 1
97+
job.mode = "BATCH"
98+
}
99+
100+
source {
101+
Jdbc {
102+
driver = oracle.jdbc.driver.OracleDriver
103+
url = "jdbc:oracle:thin:@localhost:1521/XE"
104+
user = testUser
105+
password = testPassword
106+
107+
table_list = [
108+
{
109+
table_path = "TESTSCHEMA.TABLE_1"
110+
},
111+
{
112+
table_path = "TESTSCHEMA.TABLE_2"
113+
}
114+
]
115+
}
116+
}
117+
118+
transform {
119+
}
120+
121+
sink {
122+
Http {
123+
...
124+
url = "http://localhost/test/${schema_name}_test/${table_name}_test"
125+
}
126+
}
127+
```
128+
59129
## Changelog
60130

61131
### 2.2.0-beta 2022-09-26

0 commit comments

Comments
 (0)