Commit f16aed5

hailin0 authored and chaorongzhi committed
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (apache#7131)
1 parent a80f509 commit f16aed5

79 files changed: +1680 −537 lines


docs/en/concept/connector-v2-features.md

Lines changed: 4 additions & 0 deletions

@@ -69,3 +69,7 @@ For sink connector, the sink connector supports exactly-once if any piece of dat

### cdc(change data capture)

If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture).

### support multiple table write

Supports writing multiple tables in one SeaTunnel job; users can dynamically specify the table's identifier by [configuring placeholders](./sink-options-placeholders.md).
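As an illustration of the feature described above, a single sink definition can fan out to per-table destinations via placeholders. This is a minimal sketch using the JDBC sink; the `_copy` suffix is an arbitrary example, not a default:

```hocon
sink {
  jdbc {
    // resolved once per upstream table, e.g. db1.users -> db1.users_copy
    database = "${database_name}"
    table = "${table_name}_copy"
  }
}
```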
docs/en/concept/sink-options-placeholders.md

Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@

# Sink Options Placeholders

## Introduction

SeaTunnel provides a sink options placeholder feature that allows you to get upstream table metadata through placeholders.

This functionality is essential when you need to dynamically get upstream table metadata (such as for multi-table writes).

This document will guide you through the usage of these placeholders and how to leverage them effectively.

## Support Those Engines

> SeaTunnel Zeta<br/>
> Flink<br/>
> Spark<br/>
## Placeholder

The placeholders are mainly controlled by the following expressions:

- `${database_name}`
  - Used to get the database in the upstream catalog table
  - Default values can also be specified via expressions: `${database_name:default_my_db}`
- `${schema_name}`
  - Used to get the schema in the upstream catalog table
  - Default values can also be specified via expressions: `${schema_name:default_my_schema}`
- `${table_name}`
  - Used to get the table in the upstream catalog table
  - Default values can also be specified via expressions: `${table_name:default_my_table}`
- `${schema_full_name}`
  - Used to get the schema full path (database & schema) in the upstream catalog table
- `${table_full_name}`
  - Used to get the table full path (database & schema & table) in the upstream catalog table
- `${primary_key}`
  - Used to get the table primary-key fields in the upstream catalog table
- `${unique_key}`
  - Used to get the table unique-key fields in the upstream catalog table
- `${field_names}`
  - Used to get the table field keys in the upstream catalog table
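The default-value syntax is useful when an upstream source lacks some of this metadata. A minimal sketch (the option names follow the JDBC sink; the fallback values `default_my_db` and `no_schema` are arbitrary examples):

```hocon
sink {
  jdbc {
    // a MySQL upstream has no schema, so "${schema_name}" alone would
    // stay unresolved; the fallback keeps the option usable
    database = "${database_name:default_my_db}"
    table = "${schema_name:no_schema}_${table_name}"
  }
}
```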
## Configuration

*Requires*:

- Make sure the sink connector you are using has implemented the `TableSinkFactory` API
### Example 1

```hocon
env {
  // ignore...
}
source {
  MySQL-CDC {
    // ignore...
  }
}

transform {
  // ignore...
}

sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"

    database = "${database_name}_test"
    table = "${table_name}_test"
    primary_keys = ["${primary_key}"]
  }
}
```
### Example 2

```hocon
env {
  // ignore...
}
source {
  Oracle-CDC {
    // ignore...
  }
}

transform {
  // ignore...
}

sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"

    database = "${schema_name}_test"
    table = "${table_name}_test"
    primary_keys = ["${primary_key}"]
  }
}
```
We will complete the placeholder replacement before the connector is started, ensuring that the sink options are ready before use.

If a variable is not replaced, the upstream table metadata may be missing that option, for example:

- a `mysql` source does not contain `${schema_name}`
- an `oracle` source does not contain `${database_name}`
- ...
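The replacement step described above can be sketched as follows. This is an illustrative re-implementation in Python, not SeaTunnel's actual code; the regex, function name, and metadata dict are assumptions that mirror the documented `${key}` / `${key:default}` syntax:

```python
import re

# Matches ${key} or ${key:default_value}, mirroring the documented syntax.
_PLACEHOLDER = re.compile(r"\$\{(\w+)(?::([^}]*))?\}")

def replace_placeholders(option: str, table_metadata: dict) -> str:
    """Replace upstream-table placeholders in a single sink option value.

    A placeholder with neither metadata nor a default is left untouched,
    matching the documented behavior for e.g. a mysql source and
    ${schema_name}.
    """
    def _sub(match: re.Match) -> str:
        key, default = match.group(1), match.group(2)
        value = table_metadata.get(key)
        if value is not None:
            return value
        if default is not None:
            return default
        return match.group(0)  # keep the unresolved placeholder as-is

    return _PLACEHOLDER.sub(_sub, option)

# Example: a MySQL upstream table has a database and table, but no schema.
meta = {"database_name": "seatunnel", "table_name": "role"}
print(replace_placeholders("${database_name}_test.${table_name}_test", meta))
# -> seatunnel_test.role_test
print(replace_placeholders("${schema_name:default_my_schema}_test", meta))
# -> default_my_schema_test
```

Resolving everything once, before the connector starts, is what allows the same sink block to be instantiated per upstream table in a multi-table job.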

docs/en/connector-v2/sink/Doris.md

Lines changed: 92 additions & 2 deletions

@@ -18,6 +18,7 @@

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description
@@ -76,7 +77,7 @@ and the default template can be modified according to the situation.

Default template:

```sql
-CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
```

@@ -93,7 +94,7 @@ DISTRIBUTED BY HASH (${rowtype_primary_key})

If a custom field is filled in the template, such as adding an `id` field

```sql
-CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(
    id,
    ${rowtype_fields}
```
@@ -323,6 +324,95 @@

### Multiple table

#### example1

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "${database_name}_test"
    table = "${table_name}_test"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```
#### example2

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "${schema_name}_test"
    table = "${table_name}_test"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```
## Changelog

### 2.3.0-beta 2022-10-20

docs/en/connector-v2/sink/Druid.md

Lines changed: 19 additions & 3 deletions

@@ -9,6 +9,7 @@ Write data to Druid

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Data Type Mapping

@@ -52,10 +53,25 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti

## Example

Simple example:

```hocon
sink {
  Druid {
    coordinatorUrl = "testHost:8888"
    datasource = "seatunnel"
  }
}
```

Example using placeholders to get upstream table metadata:

```hocon
sink {
  Druid {
    coordinatorUrl = "testHost:8888"
    datasource = "${table_name}_test"
  }
}
```

docs/en/connector-v2/sink/Hive.md

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and h

## Key features

+- [x] [support multiple table write](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we use 2PC commit to ensure `exactly-once`

docs/en/connector-v2/sink/Http.md

Lines changed: 70 additions & 0 deletions

@@ -12,6 +12,7 @@

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)

## Description

@@ -56,6 +57,75 @@

### Multiple table

#### example1

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Http {
    ...
    url = "http://localhost/test/${database_name}_test/${table_name}_test"
  }
}
```
#### example2

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Http {
    ...
    url = "http://localhost/test/${schema_name}_test/${table_name}_test"
  }
}
```
## Changelog

### 2.2.0-beta 2022-09-26
