# Apache Iceberg

> Apache Iceberg sink connector
## Supported Iceberg Version

- 1.4.2
## Supported Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Description

Sink connector for Apache Iceberg. It supports CDC mode, automatic table creation, and table schema evolution.
## Supported DataSource Info

| Datasource | Dependent | Maven                                                                      |
|------------|-----------|----------------------------------------------------------------------------|
| Iceberg    | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)   |
| Iceberg    | libfb303  | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303)  |
## Database Dependency

> To stay compatible with different versions of Hadoop and Hive, the hive-exec dependency is declared with `provided` scope in the project pom file. If you use the Flink engine, you may first need to add the following JAR packages to the `<FLINK_HOME>/lib` directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.

```
hive-exec-xxx.jar
libfb303-xxx.jar
```

> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that JAR manually.
## Data Type Mapping

| SeaTunnel Data Type | Iceberg Data Type |
|---------------------|-------------------|
| BOOLEAN             | BOOLEAN           |
| INT                 | INTEGER           |
| BIGINT              | LONG              |
| FLOAT               | FLOAT             |
| DOUBLE              | DOUBLE            |
| DATE                | DATE              |
| TIME                | TIME              |
| TIMESTAMP           | TIMESTAMP         |
| STRING              | STRING            |
| BYTES               | FIXED<br/>BINARY  |
| DECIMAL             | DECIMAL           |
| ROW                 | STRUCT            |
| ARRAY               | LIST              |
| MAP                 | MAP               |
## Sink Options

| Name                                   | Type    | Required | Default                      | Description |
|----------------------------------------|---------|----------|------------------------------|-------------|
| catalog_name                           | string  | yes      | default                      | User-specified catalog name. The default is `default`. |
| namespace                              | string  | yes      | default                      | The Iceberg database name in the backend catalog. The default is `default`. |
| table                                  | string  | yes      | -                            | The Iceberg table name in the backend catalog. |
| iceberg.catalog.config                 | map     | yes      | -                            | Properties for initializing the Iceberg catalog; the available properties are listed in [CatalogProperties.java](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java). |
| hadoop.config                          | map     | no       | -                            | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path               | string  | no       | -                            | The path from which to load the 'core-site.xml', 'hdfs-site.xml', and 'hive-site.xml' files. |
| case_sensitive                         | boolean | no       | false                        | If data columns were selected via schema [config], controls whether the match against the schema is case-sensitive. |
| iceberg.table.write-props              | map     | no       | -                            | Properties passed through to Iceberg writer initialization; these take precedence. Examples include 'write.format.default' and 'write.target-file-size-bytes'; the full list is in [TableProperties.java](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java). |
| iceberg.table.auto-create-props        | map     | no       | -                            | Properties applied by Iceberg during automatic table creation. |
| iceberg.table.schema-evolution-enabled | boolean | no       | false                        | Setting this to true allows Iceberg tables to evolve their schema during synchronization. |
| iceberg.table.primary-keys             | string  | no       | -                            | Default comma-separated list of columns that identify a row in tables (primary key). |
| iceberg.table.partition-keys           | string  | no       | -                            | Default comma-separated list of partition fields to use when creating tables. |
| iceberg.table.upsert-mode-enabled      | boolean | no       | false                        | Set to `true` to enable upsert mode; the default is `false`. |
| schema_save_mode                       | Enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode; please refer to `schema_save_mode` below. |
| data_save_mode                         | Enum    | no       | APPEND_DATA                  | The data save mode; please refer to `data_save_mode` below. |
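The Hadoop-related and save-mode options above do not appear in the task examples below, so here is a minimal sink-block sketch showing how they can be combined. The `/etc/hadoop/conf` path and the `fs.defaultFS` value are hypothetical placeholders, not values shipped with the connector:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    # Hypothetical directory holding core-site.xml / hdfs-site.xml / hive-site.xml
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    # Extra properties passed through to the Hadoop configuration (placeholder value)
    hadoop.config = {
      "fs.defaultFS" = "hdfs://your_cluster"
    }
    # Create the table when it is missing and append data on each run (the defaults)
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```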
## Task Example

### Simple:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```
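In this streaming example, the MySQL-CDC source emits row-level change events. Because `iceberg.table.upsert-mode-enabled` is `true` and `iceberg.table.primary-keys` is set to `id`, updates and deletes from the CDC stream should be applied by key rather than appended as new rows, and with `iceberg.table.schema-evolution-enabled` set to `true`, column changes on the source table can propagate to the Iceberg table.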
### Hive Catalog:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```
### Hadoop Catalog:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```
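The entries in `iceberg.catalog.config` are handed to Iceberg's catalog initialization (see the CatalogProperties.java link above). Assuming they are forwarded unchanged to Iceberg's standard catalog loading, other catalog types would follow the same pattern; the sketch below shows a hypothetical REST catalog, where the `uri` endpoint and `warehouse` location are placeholders:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "rest"
      # Hypothetical REST catalog endpoint; not a value shipped with the connector
      uri = "http://localhost:8181"
      warehouse = "s3://your-bucket/warehouse/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
  }
}
```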
## Changelog

### 2.3.4-SNAPSHOT 2024-01-18

- Add Iceberg Sink Connector

### next version