diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md index 92222e221f6..f017fe7b5e2 100644 --- a/docs/en/connector-v2/source/CosFile.md +++ b/docs/en/connector-v2/source/CosFile.md @@ -72,6 +72,8 @@ To use this connector you need put hadoop-cos-{hadoop.version}-{version}.jar and | compress_codec | string | no | none | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | +| binary_chunk_size | int | no | 1024 | +| binary_complete_file_mode | boolean | no | false | | common-options | | no | - | ### path [string] @@ -365,6 +367,18 @@ Note: gz compressed excel file needs to compress the original file or specify th Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. @@ -420,6 +434,8 @@ source { region = "ap-chengdu" path = "/seatunnel/read/binary/" file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false } } sink { diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index 966c92c09b5..4219088e675 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -68,6 +68,8 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | | null_format | string | no | - | +| binary_chunk_size | int | no | 1024 | +| binary_complete_file_mode | boolean | no | false | | common-options | | no | - | ### host [string] @@ -380,6 +382,18 @@ null_format to define which strings can be represented as null. e.g: `\N` +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. @@ -482,6 +496,8 @@ source { password = tianchao path = "/seatunnel/read/binary/" file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false } } sink { diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 04a23c3d976..0d628eeb017 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -71,6 +71,8 @@ Read data from hdfs file system. | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | | | null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | +| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. | +| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | ### delimiter/field_delimiter [string] @@ -159,6 +161,18 @@ Note: gz compressed excel file needs to compress the original file or specify th Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### Tips > If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 408cfbfff35..804cd168c10 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -67,7 +67,9 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | compress_codec | string | no | none | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | -| null_format | string | no | - | +| null_format | string | no | - | +| binary_chunk_size | int | no | 1024 | +| binary_complete_file_mode | boolean | no | false | | common-options | | no | - | | tables_configs | list | no | used to define a multiple table task | @@ -363,6 +365,18 @@ null_format to define which strings can be represented as null. e.g: `\N` +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details @@ -477,6 +491,8 @@ source { LocalFile { path = "/seatunnel/read/binary/" file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false } } sink { diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md index bf19076c8cd..76c2436e027 100644 --- a/docs/en/connector-v2/source/OssFile.md +++ b/docs/en/connector-v2/source/OssFile.md @@ -203,6 +203,8 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | compress_codec | string | no | none | Which compress codec the files used. | | encoding | string | no | UTF-8 | | null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | +| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. | +| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. | | file_filter_pattern | string | no | | Filter pattern, which used for filtering files. | | common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | @@ -221,6 +223,18 @@ The compress codec of files and the details that supported as the following show Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### file_filter_pattern [string] Filter pattern, which used for filtering files. diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index 2369096a546..7f5a6d6d2b9 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -211,6 +211,8 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | archive_compress_codec | string | no | none | | | encoding | string | no | UTF-8 | | | null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | +| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. | +| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. | | file_filter_pattern | string | no | | Filter pattern, which used for filtering files. | | filename_extension | string | no | - | Filter filename extension, which used for filtering files with specific extension. Example: `csv` `.txt` `json` `.xml`. | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | @@ -301,6 +303,18 @@ Note: gz compressed excel file needs to compress the original file or specify th Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ## Example 1. In this example, We read data from s3 path `s3a://seatunnel-test/seatunnel/text` and the file type is orc in this path. diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index b6745cc681e..0a643af9a13 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -99,6 +99,8 @@ The File does not have a specific type list, and we can indicate which SeaTunnel | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | | null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | +| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. | +| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | ### file_filter_pattern [string] @@ -254,6 +256,18 @@ Note: gz compressed excel file needs to compress the original file or specify th Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### binary_chunk_size [int] + +Only used when file_format_type is binary. + +The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. + +### binary_complete_file_mode [boolean] + +Only used when file_format_type is binary. + +Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. + ### schema [config] #### fields [Config] diff --git a/docs/zh/connector-v2/source/CosFile.md b/docs/zh/connector-v2/source/CosFile.md index b160eafecd0..37069108af7 100644 --- a/docs/zh/connector-v2/source/CosFile.md +++ b/docs/zh/connector-v2/source/CosFile.md @@ -71,6 +71,8 @@ import ChangeLog from '../changelog/connector-file-cos.md'; | compress_codec | string | 否 | none | | archive_compress_codec | string | 否 | none | | encoding | string | 否 | UTF-8 | +| binary_chunk_size | int | 否 | 1024 | +| binary_complete_file_mode | boolean | 否 | false | | common-options | | 否 | - | ### path [string] @@ -359,6 +361,18 @@ default `HH:mm:ss` 仅当file_format_type为json、text、csv、xml时使用。 要读取的文件的编码。此参数将由`Charset.forName(encoding)`解析。 +### binary_chunk_size [int] + +仅在 file_format_type 为 binary 时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在 file_format_type 为 binary 时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 + ### common options 源插件常用参数,详见[源端通用选项](../Source-common-Options.md)。 @@ -414,6 +428,8 @@ source { region = "ap-chengdu" path = "/seatunnel/read/binary/" file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false } } sink { diff --git a/docs/zh/connector-v2/source/FtpFile.md b/docs/zh/connector-v2/source/FtpFile.md index 51b376cc07f..bb934a70c28 100644 --- a/docs/zh/connector-v2/source/FtpFile.md +++ b/docs/zh/connector-v2/source/FtpFile.md @@ -66,6 +66,8 @@ import ChangeLog from '../changelog/connector-file-ftp.md'; | archive_compress_codec | string | 否 | none | | encoding | string | 否 | UTF-8 | | null_format | string | 否 | - | +| binary_chunk_size | int | 否 | 1024 | +| binary_complete_file_mode | boolean | 否 | false | | common-options | | 否 | - | ### host [string] @@ -360,6 +362,18 @@ SeaTunnel 将从源文件中跳过前 2 行。 例如:`\N` +### binary_chunk_size [int] + +仅在 file_format_type 为 binary 时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在 file_format_type 为 binary 时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 + ### 通用选项 源插件的通用参数,详情请参考 [源通用选项](../source-common-options.md)。 @@ -462,6 +476,8 @@ source { password = tianchao path = "/seatunnel/read/binary/" file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false } } sink { diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md index cd06a3dbd9a..37eb6140e8e 100644 --- a/docs/zh/connector-v2/source/HdfsFile.md +++ b/docs/zh/connector-v2/source/HdfsFile.md @@ -1,8 +1,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md'; -# Hdfs文件 +# HdfsFile -> Hdfs文件 数据源连接器 +> Hdfs 文件数据源连接器 ## 支持的引擎 @@ -16,70 +16,77 @@ import ChangeLog from '../changelog/connector-file-hadoop.md'; - [ ] [流处理](../../concept/connector-v2-features.md) - [x] [精确一次](../../concept/connector-v2-features.md) -在一次 pollNext 调用中读取分片中的所有数据。将读取的分片保存在快照中。 +在 pollNext 调用中读取分片中的所有数据。读取的分片将保存在快照中。 - [x] [列投影](../../concept/connector-v2-features.md) - [x] [并行度](../../concept/connector-v2-features.md) -- [ ] [支持用户定义的分片](../../concept/connector-v2-features.md) -- [x] 文件格式 - - [x] 文本 - - [x] CSV - - [x] Parquet - - [x] ORC - - [x] JSON - - [x] Excel +- [ ] [支持用户定义分片](../../concept/connector-v2-features.md) +- [x] 文件格式类型 + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + - [x] excel + - [x] xml + - [x] binary ## 描述 -从Hdfs文件系统中读取数据。 +从 hdfs 文件系统读取数据。 ## 支持的数据源信息 -| 数据源 | 支持的版本 | +| 数据源 | 支持的版本 | |--------|------------------| -| Hdfs文件 | hadoop 2.x 和 3.x | - -## 源选项 - -| 名称 | 类型 | 是否必须 | 默认值 | 描述 | -|---------------------------|---------|------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| path | string | 是 | - | 源文件路径。 | -| file_format_type | string | 是 | - | 我们支持以下文件类型:`text` `json` `csv` `orc` `parquet` `excel`。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 `txt`。 | -| fs.defaultFS | string | 是 | - | 以 `hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster`。 | -| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它实现字段投影。支持的文件类型的列投影如下所示:[text,json,csv,orc,parquet,excel]。提示:如果用户在读取 `text` `json` `csv` 文件时想要使用此功能,必须配置 schema 选项。 | -| hdfs_site_path | string | 否 | - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置。 | -| delimiter/field_delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认 `\001`,与 Hive 的默认分隔符相同。 | -| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径中解析分区键和值。例如,如果您从路径 `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` 读取文件,则来自文件的每条记录数据将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。 | -| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持的格式如下:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`,默认 `yyyy-MM-dd`。日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持的格式如下:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`,默认 `yyyy-MM-dd HH:mm:ss`。 | -| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持的格式如下:`HH:mm:ss` `HH:mm:ss.SSS`,默认 `HH:mm:ss`。 | -| remote_user | string | 否 | - | 用于连接 Hadoop 的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。 | -| krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径。 | -| kerberos_principal | string | 否 | - | kerberos 的 principal。 | -| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径。 | -| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。 | -| file_filter_pattern | string | 否 | - | 过滤模式,用于过滤文件。 | -| filename_extension | string | 否 | - | 过滤文件扩展名, 用于过滤出指定扩展名的文件。 例如 `csv` `.txt` `json` `.xml`。 | -| null_format | string | 否 | - | 定义哪些字符串可以表示为 null,但仅适用于 txt 和 csv. 例如: `\N` | -| schema | config | 否 | - | 上游数据的模式字段。 | -| sheet_name | string | 否 | - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 | -| compress_codec | string | 否 | none | 文件的压缩编解码器。 | -| common-options | | 否 | - | 源插件通用参数,请参阅 [源通用选项](../../../en/connector-v2/source-common-options.md) 获取详细信息。 | -| csv_use_header_line | boolean | 否 | false | 是否使用标题行来解析文件,仅当 file_format 为 `csv` 且文件包含与 RFC 4180 匹配的标题行时使用 | - +| HdfsFile | hadoop 2.x 和 3.x | + +## 数据源选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|---------------------------|---------|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | 是 | - | 源文件路径。 | +| file_format_type | string | 是 | - | 我们支持以下文件类型:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 `txt`。 | +| fs.defaultFS | string | 是 | - | 以 `hdfs://` 开头的 hadoop 集群地址,例如:`hdfs://hadoopcluster` | +| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它来实现字段投影。支持列投影的文件类型如下所示:[text,json,csv,orc,parquet,excel,xml]。提示:如果用户想在读取 `text` `json` `csv` 文件时使用此功能,必须配置 schema 选项。 | +| hdfs_site_path | string | 否 | - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置 | +| delimiter/field_delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何分割字段。默认 `\001`,与 hive 的默认分隔符相同 | +| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径解析分区键和值。例如,如果您从路径 `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` 读取文件。文件中的每条记录数据都将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。 | +| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` 默认 `yyyy-MM-dd`。日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` 默认 `yyyy-MM-dd` | +| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`。默认 `yyyy-MM-dd HH:mm:ss` | +| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:`HH:mm:ss` `HH:mm:ss.SSS`。默认 `HH:mm:ss` | +| remote_user | string | 否 | - | 用于连接到 hadoop 登录名的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。 | +| krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径 | +| kerberos_principal | string | 否 | - | kerberos 的主体 | +| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径 | +| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件的前 2 行 | +| schema | config | 否 | - | 上游数据的 schema 字段 | +| sheet_name | string | 否 | - | 读取工作簿的工作表,仅在 file_format 为 excel 时使用。 | +| xml_row_tag | string | 否 | - | 指定 XML 文件中数据行的标签名称,仅在 file_format 为 xml 时使用。 | +| xml_use_attr_format | boolean | 否 | - | 指定是否使用标签属性格式处理数据,仅在 file_format 为 xml 时使用。 | +| csv_use_header_line | boolean | 否 | false | 是否使用标题行解析文件,仅在 file_format 为 `csv` 且文件包含符合 RFC 4180 的标题行时使用 | +| file_filter_pattern | string | 否 | | 过滤模式,用于过滤文件。 | +| filename_extension | string | 否 | - | 过滤文件扩展名,用于过滤具有特定扩展名的文件。示例:`csv` `.txt` `json` `.xml`。 | +| compress_codec | string | 否 | none | 文件的压缩编解码器 | +| archive_compress_codec | string | 否 | none | +| encoding | string | 否 | UTF-8 | | +| null_format | string | 否 | - | 仅在 file_format_type 为 text 时使用。null_format 定义哪些字符串可以表示为 null。例如:`\N` | +| binary_chunk_size | int | 否 | 1024 | 仅在 file_format_type 为 binary 时使用。读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 | +| binary_complete_file_mode | boolean | 否 | false | 仅在 file_format_type 为 binary 时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 | +| common-options | | 否 | - | 数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情。 | ### delimiter/field_delimiter [string] -**delimiter** 参数在版本 2.3.5 后将被弃用,请改用 **field_delimiter**。 +**delimiter** 参数将在 2.3.5 版本后弃用,请使用 **field_delimiter** 代替。 ### file_filter_pattern [string] 过滤模式,用于过滤文件。 -这个过滤规则遵循正则表达式. 关于详情,请参考 https://en.wikipedia.org/wiki/Regular_expression 学习 - -这里是一些例子. +该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 +以下是一些示例。 -文件清单: +文件结构示例: ``` /data/seatunnel/20241001/report.txt /data/seatunnel/20241007/abch202410.csv @@ -87,38 +94,38 @@ import ChangeLog from '../changelog/connector-file-hadoop.md'; /data/seatunnel/20241005/old_data.csv /data/seatunnel/20241012/logo.png ``` -匹配规则: +匹配规则示例: -**例子 1**: *匹配所有txt为后缀名的文件*,匹配正则为: +**示例 1**:*匹配所有 .txt 文件*,正则表达式: ``` /data/seatunnel/20241001/.*\.txt ``` -匹配的结果是: +此示例匹配的结果是: ``` /data/seatunnel/20241001/report.txt ``` -**例子 2**: *匹配所有文件名以abc开头的文件*,匹配正则为: +**示例 2**:*匹配所有以 abc 开头的文件*,正则表达式: ``` /data/seatunnel/20241002/abc.* ``` -匹配的结果是: +此示例匹配的结果是: ``` /data/seatunnel/20241007/abch202410.csv /data/seatunnel/20241002/abcg202410.csv ``` -**例子 3**: *匹配所有文件名以abc开头,并且文件第四个字母是 h 或者 g 的文件*, 匹配正则为: +**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式: ``` /data/seatunnel/20241007/abc[h,g].* ``` -匹配的结果是: +此示例匹配的结果是: ``` /data/seatunnel/20241007/abch202410.csv ``` -**例子 4**: *匹配所有文件夹第三级以 202410 开头并且文件后缀名是.csv的文件*, 匹配正则为: +**示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式: ``` /data/seatunnel/202410\d*/.*\.csv ``` -匹配的结果是: +此示例匹配的结果是: ``` /data/seatunnel/20241007/abch202410.csv /data/seatunnel/20241002/abcg202410.csv @@ -127,25 +134,54 @@ import ChangeLog from '../changelog/connector-file-hadoop.md'; ### compress_codec [string] -文件的压缩编解码器及支持的详细信息如下所示: +文件的压缩编解码器及其支持的详细信息如下所示: -- txt:`lzo` `none` -- json:`lzo` `none` -- csv:`lzo` `none` -- orc/parquet: +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: 自动识别压缩类型,无需额外设置。 -### 提示 +### archive_compress_codec [string] + +归档文件的压缩编解码器及其支持的详细信息如下所示: + +| archive_compress_codec | file_format | archive_compress_suffix | +|------------------------|-------------------|-------------------------| +| ZIP | txt,json,excel,xml | .zip | +| TAR | txt,json,excel,xml | .tar | +| TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,excel,xml | .gz | +| NONE | all | .* | + +注意:gz 压缩的 excel 文件需要压缩原始文件或指定文件后缀,例如 e2e.xls ->e2e_test.xls.gz + +### encoding [string] + +仅在 file_format_type 为 json,text,csv,xml 时使用。 +要读取的文件的编码。此参数将由 `Charset.forName(encoding)` 解析。 -> 如果您使用 spark/flink,为了 +### binary_chunk_size [int] + +仅在 file_format_type 为 binary 时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在 file_format_type 为 binary 时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 + +### 提示 -使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。 +> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。 ## 任务示例 ### 简单示例 -> 此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 中读取数据并将其发送到 Hdfs。 +> 此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 读取数据并将其发送到 Hdfs。 ``` # 定义运行时环境 @@ -163,15 +199,15 @@ source { } } path = "/apps/hive/demo/student" - type = "json" + file_format_type = "json" fs.defaultFS = "hdfs://namenode001" } - # 如果您想获取有关如何配置 seatunnel 和查看源插件完整列表的更多信息, + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的数据源插件列表, # 请访问 https://seatunnel.apache.org/docs/connector-v2/source } transform { - # 如果您想获取有关如何配置 seatunnel 和查看转换插件完整列表的更多信息, + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的转换插件列表, # 请访问 https://seatunnel.apache.org/docs/transform-v2 } @@ -179,14 +215,14 @@ sink { HdfsFile { fs.defaultFS = "hdfs://hadoopcluster" path = "/tmp/hive/warehouse/test2" - file_format = "orc" + file_format_type = "orc" } - # 如果您想获取有关如何配置 seatunnel 和查看接收器插件完整列表的更多信息, + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的接收器插件列表, # 请访问 https://seatunnel.apache.org/docs/connector-v2/sink } ``` -### Filter File +### 过滤文件 ```hocon env { @@ -199,6 +235,7 @@ source { path = "/apps/hive/demo/student" file_format_type = "json" fs.defaultFS = "hdfs://namenode001" + // 文件示例 abcD2024.csv file_filter_pattern = "abc[DX]*.*" } } diff --git a/docs/zh/connector-v2/source/LocalFile.md b/docs/zh/connector-v2/source/LocalFile.md new file mode 100644 index 00000000000..9b00f627c3e --- /dev/null +++ b/docs/zh/connector-v2/source/LocalFile.md @@ -0,0 +1,534 @@ +import ChangeLog from '../changelog/connector-file-local.md'; + +# LocalFile + +> 本地文件数据源连接器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [精确一次](../../concept/connector-v2-features.md) + +在 pollNext 调用中读取分片中的所有数据。读取的分片将保存在快照中。 + +- [ ] [列投影](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [ ] [支持用户定义分片](../../concept/connector-v2-features.md) +- [x] 文件格式类型 + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + - [x] excel + - [x] xml + - [x] binary + +## 描述 + +从本地文件系统读取数据。 + +:::tip + +如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。 + +如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。 + +::: + +## 选项 + +| 名称 | 类型 | 是否必须 | 默认值 | +|---------------------------|---------|----------|--------------------------------------| +| path | string | 是 | - | +| file_format_type | string | 是 | - | +| read_columns | list | 否 | - | +| delimiter/field_delimiter | string | 否 | \001 | +| parse_partition_from_path | boolean | 否 | true | +| date_format | string | 否 | yyyy-MM-dd | +| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss | +| time_format | string | 否 | HH:mm:ss | +| skip_header_row_number | long | 否 | 0 | +| schema | config | 否 | - | +| sheet_name | string | 否 | - | +| excel_engine | string | 否 | POI | +| xml_row_tag | string | 否 | - | +| xml_use_attr_format | boolean | 否 | - | +| csv_use_header_line | boolean | 否 | false | +| file_filter_pattern | string | 否 | - | +| filename_extension | string | 否 | - | +| compress_codec | string | 否 | none | +| archive_compress_codec | string | 否 | none | +| encoding | string | 否 | UTF-8 | +| null_format | string | 否 | - | +| binary_chunk_size | int | 否 | 1024 | +| binary_complete_file_mode | boolean | 否 | false | +| common-options | | 否 | - | +| tables_configs | list | 否 | 用于定义多表任务 | + +### path [string] + +源文件路径。 + +### file_format_type [string] + +文件类型,支持以下文件类型: + +`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` + +如果您将文件类型指定为 `json`,您还应该指定 schema 选项来告诉连接器如何将数据解析为您想要的行。 + +例如: + +上游数据如下: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +您也可以在一个文件中保存多条数据并用换行符分割: + +```json lines + +{"code": 200, "data": "get success", "success": true} +{"code": 300, "data": "get failed", "success": false} + +``` + +您应该按如下方式指定 schema: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +连接器将生成如下数据: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +如果您将文件类型指定为 `parquet` `orc`,则不需要 schema 选项,连接器可以自动找到上游数据的 schema。 + +如果您将文件类型指定为 `text` `csv`,您可以选择指定或不指定 schema 信息。 + +例如,上游数据如下: + +```text + +tyrantlucifer#26#male + +``` + +如果您不指定数据 schema,连接器将把上游数据视为如下: + +| content | +|-----------------------| +| tyrantlucifer#26#male | + +如果您指定数据 schema,除了 CSV 文件类型外,您还应该指定选项 `field_delimiter` + +您应该按如下方式指定 schema 和分隔符: + +```hocon + +field_delimiter = "#" +schema { + fields { + name = string + age = int + gender = string + } +} + +``` + +连接器将生成如下数据: + +| name | age | gender | +|---------------|-----|--------| +| tyrantlucifer | 26 | male | + +如果您将文件类型指定为 `binary`,SeaTunnel 可以同步任何格式的文件, +例如压缩包、图片等。简而言之,任何文件都可以同步到目标位置。 +在此要求下,您需要确保源和接收器同时使用 `binary` 格式进行文件同步。 +您可以在下面的示例中找到具体用法。 + +### read_columns [list] + +数据源的读取列列表,用户可以使用它来实现字段投影。 + +### delimiter/field_delimiter [string] + +**delimiter** 参数将在 2.3.5 版本后弃用,请使用 **field_delimiter** 代替。 + +仅在 file_format 为 text 时需要配置。 + +字段分隔符,用于告诉连接器如何分割字段。 + +默认 `\001`,与 hive 的默认分隔符相同 + +### parse_partition_from_path [boolean] + +控制是否从文件路径解析分区键和值 + +例如,如果您从路径 `file://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` 读取文件 + +文件中的每条记录数据都将添加这两个字段: + +| name | age | +|---------------|-----| +| tyrantlucifer | 26 | + +提示:**不要在 schema 选项中定义分区字段** + +### date_format [string] + +日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式: + +`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` + +默认 `yyyy-MM-dd` + +### datetime_format [string] + +日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式: + +`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` + +默认 `yyyy-MM-dd HH:mm:ss` + +### time_format [string] + +时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式: + +`HH:mm:ss` `HH:mm:ss.SSS` + +默认 `HH:mm:ss` + +### skip_header_row_number [long] + +跳过前几行,但仅适用于 txt 和 csv。 + +例如,设置如下: + +`skip_header_row_number = 2` + +然后 SeaTunnel 将跳过源文件的前 2 行 + +### schema [config] + +仅在 file_format_type 为 text、json、excel、xml 或 csv(或其他我们无法从元数据读取 schema 的格式)时需要配置。 + +#### fields [Config] + +上游数据的 schema 信息。 + +### sheet_name [string] + +仅在 file_format 为 excel 时需要配置。 + +读取工作簿的工作表。 + +### excel_engine [string] + +仅在 file_format 为 excel 时需要配置。 + +支持以下文件类型: +`POI` `EasyExcel` + +默认的 excel 读取引擎是 POI,但当读取超过 65,000 行的 Excel 时,POI 容易导致内存溢出,因此您可以切换到 EasyExcel 作为读取引擎。 + + +### xml_row_tag [string] + +仅在 file_format 为 xml 时需要配置。 + +指定 XML 文件中数据行的标签名称。 + +### xml_use_attr_format [boolean] + +仅在 file_format 为 xml 时需要配置。 + +指定是否使用标签属性格式处理数据。 + +### csv_use_header_line [boolean] + +是否使用标题行解析文件,仅在 file_format 为 `csv` 且文件包含符合 RFC 4180 的标题行时使用 + +### file_filter_pattern [string] + +过滤模式,用于过滤文件。 + +该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 +以下是一些示例。 + +文件结构示例: +``` +/data/seatunnel/20241001/report.txt +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +/data/seatunnel/20241012/logo.png +``` +匹配规则示例: + +**示例 1**:*匹配所有 .txt 文件*,正则表达式: +``` +/data/seatunnel/20241001/.*\.txt +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241001/report.txt +``` +**示例 2**:*匹配所有以 abc 开头的文件*,正则表达式: +``` +/data/seatunnel/20241002/abc.* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +``` +**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式: +``` +/data/seatunnel/20241007/abc[h,g].* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +``` +**示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式: +``` +/data/seatunnel/202410\d*/.*\.csv +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +``` + +### filename_extension [string] + +过滤文件扩展名,用于过滤具有特定扩展名的文件。示例:`csv` `.txt` `json` `.xml`。 + +### compress_codec [string] + +文件的压缩编解码器及其支持的详细信息如下所示: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + 自动识别压缩类型,无需额外设置。 + +### archive_compress_codec [string] + +归档文件的压缩编解码器及其支持的详细信息如下所示: + +| archive_compress_codec | file_format | archive_compress_suffix | +|------------------------|--------------------|-------------------------| +| ZIP | txt,json,excel,xml | .zip | +| TAR | txt,json,excel,xml | .tar | +| TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,excel,xml | .gz | +| NONE | all | .* | + +注意:gz 压缩的 excel 文件需要压缩原始文件或指定文件后缀,例如 e2e.xls ->e2e_test.xls.gz + +### encoding [string] + +仅在 file_format_type 为 json,text,csv,xml 时使用。 +要读取的文件的编码。此参数将由 `Charset.forName(encoding)` 解析。 + +### null_format [string] + +仅在 file_format_type 为 text 时使用。 +null_format 定义哪些字符串可以表示为 null。 + +例如:`\N` + +### binary_chunk_size [int] + +仅在 file_format_type 为 binary 时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在 file_format_type 为 binary 时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 + +### 通用选项 + +数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情 + +### tables_configs + +用于定义多表任务,当您有多个表要读取时,可以使用此选项定义多个表。 + +## 示例 + +### 单表 + +```hocon + +LocalFile { + path = "/apps/hive/demo/student" + file_format_type = "parquet" +} + +``` + +```hocon + +LocalFile { + schema { + fields { + name = string + age = int + } + } + path = "/apps/hive/demo/student" + file_format_type = "json" +} + +``` + +对于带有 `encoding` 的 json、text 或 csv 文件格式 + +```hocon + +LocalFile { + path = "/tmp/hive/warehouse/test2" + file_format_type = "text" + encoding = "gbk" +} + +``` + +### 多表 + +```hocon + +LocalFile { + tables_configs = [ + { + schema { + table = "student" + } + path = "/apps/hive/demo/student" + file_format_type = "parquet" + }, + { + schema { + table = "teacher" + } + path = "/apps/hive/demo/teacher" + file_format_type = "parquet" + } + ] +} + +``` + +```hocon + +LocalFile { + tables_configs = [ + { + schema { + fields { + name = string + age = int + } + } + path = "/apps/hive/demo/student" + file_format_type = "json" + }, + { + schema { + fields { + name = string + age = int + } + } + path = "/apps/hive/demo/teacher" + file_format_type = "json" + } +} + +``` + +### 传输二进制文件 + +```hocon + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/binary/" + file_format_type = "binary" + binary_chunk_size = 2048 + binary_complete_file_mode = false + } +} +sink { + // 您可以将本地文件传输到 s3/hdfs/oss 等。 + LocalFile { + path = "/seatunnel/read/binary2/" + file_format_type = "binary" + } +} + +``` + +### 过滤文件 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/data/seatunnel/" + file_format_type = "csv" + skip_header_row_number = 1 + // 文件示例 abcD2024.csv + file_filter_pattern = "abc[DX]*.*" + } +} + +sink { + Console { + } +} +``` + +## 变更日志 + + diff --git a/docs/zh/connector-v2/source/OssFile.md b/docs/zh/connector-v2/source/OssFile.md new file mode 100644 index 00000000000..27e55124d92 --- /dev/null +++ b/docs/zh/connector-v2/source/OssFile.md @@ -0,0 +1,560 @@ +import ChangeLog from '../changelog/connector-file-oss.md'; + +# OssFile + +> Oss文件数据源连接器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 使用依赖 + +### 对于Spark/Flink引擎 + +1. 您必须确保您的spark/flink集群已经集成了hadoop。测试过的hadoop版本是2.x。 +2. 您必须确保`hadoop-aliyun-xx.jar`、`aliyun-sdk-oss-xx.jar`和`jdom-xx.jar`在`${SEATUNNEL_HOME}/plugins/`目录中,并且`hadoop-aliyun` jar的版本需要与您在spark/flink中使用的hadoop版本相等,`aliyun-sdk-oss-xx.jar`和`jdom-xx.jar`版本需要是与`hadoop-aliyun`版本对应的版本。例如:`hadoop-aliyun-3.1.4.jar`依赖`aliyun-sdk-oss-3.4.1.jar`和`jdom-1.1.jar`。 + +### 对于SeaTunnel Zeta引擎 + +1. 您必须确保`seatunnel-hadoop3-3.1.4-uber.jar`、`aliyun-sdk-oss-3.4.1.jar`、`hadoop-aliyun-3.1.4.jar`和`jdom-1.1.jar`在`${SEATUNNEL_HOME}/lib/`目录中。 + +## 主要特性 + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [精确一次](../../concept/connector-v2-features.md) + +在一次pollNext调用中读取分片中的所有数据。将读取的分片保存在快照中。 + +- [x] [列投影](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [ ] [支持用户定义的分片](../../concept/connector-v2-features.md) +- [x] 文件格式类型 + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + - [x] excel + - [x] xml + - [x] binary + +## 数据类型映射 + +数据类型映射与正在读取的文件类型相关,我们支持以下文件类型: + +`text` `csv` `parquet` `orc` `json` `excel` `xml` + +### JSON文件类型 + +如果您将文件类型指定为`json`,您还应该指定schema选项来告诉连接器如何将数据解析为您想要的行。 + +例如: + +上游数据如下: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +您也可以在一个文件中保存多条数据,并用换行符分隔: + +```json lines + +{"code": 200, "data": "get success", "success": true} +{"code": 300, "data": "get failed", "success": false} + +``` + +您应该按如下方式指定schema: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +连接器将生成如下数据: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +### 文本或CSV文件类型 + +如果您将`file_format_type`设置为`text`、`excel`、`csv`、`xml`。那么需要设置`schema`字段来告诉连接器如何将数据解析为行。 + +如果您设置了`schema`字段,您还应该设置选项`field_delimiter`,除非`file_format_type`是`csv`、`xml`、`excel` + +您可以按如下方式设置schema和分隔符: + +```hocon + +field_delimiter = "#" +schema { + fields { + name = string + age = int + gender = string + } +} + +``` + +连接器将生成如下数据: + +| name | age | gender | +|---------------|-----|--------| +| tyrantlucifer | 26 | male | + +### Orc文件类型 + +如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。 + +| Orc数据类型 | SeaTunnel数据类型 | +|----------------------------------|----------------------------------------------------------------| +| BOOLEAN | BOOLEAN | +| INT | INT | +| BYTE | BYTE | +| SHORT | SHORT | +| LONG | LONG | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BINARY | BINARY | +| STRING
VARCHAR
CHAR
| STRING | +| DATE | LOCAL_DATE_TYPE | +| TIMESTAMP | LOCAL_DATE_TIME_TYPE | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType,K和V的类型将转换为SeaTunnel类型 | +| STRUCT | SeaTunnelRowType | + +### Parquet文件类型 + +如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。 + +| Parquet数据类型 | SeaTunnel数据类型 | +|----------------------|----------------------------------------------------------------| +| INT_8 | BYTE | +| INT_16 | SHORT | +| DATE | DATE | +| TIMESTAMP_MILLIS | TIMESTAMP | +| INT64 | LONG | +| INT96 | TIMESTAMP | +| BINARY | BYTES | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BOOLEAN | BOOLEAN | +| FIXED_LEN_BYTE_ARRAY | TIMESTAMP
DECIMAL | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType,K和V的类型将转换为SeaTunnel类型 | +| STRUCT | SeaTunnelRowType | + +## 选项 + +| 名称 | 类型 | 是否必需 | 默认值 | 描述 | +|---------------------------|---------|----------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | 是 | - | 需要读取的Oss路径,可以有子路径,但子路径需要满足一定的格式要求。具体要求可以参考"parse_partition_from_path"选项 | +| file_format_type | string | 是 | - | 文件类型,支持以下文件类型:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` | +| bucket | string | 是 | - | oss文件系统的bucket地址,例如:`oss://seatunnel-test`。 | +| endpoint | string | 是 | - | fs oss端点 | +| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它来实现字段投影。支持列投影的文件类型如下所示:`text` `csv` `parquet` `orc` `json` `excel` `xml`。如果用户想在读取`text` `json` `csv`文件时使用此功能,必须配置"schema"选项。 | +| access_key | string | 否 | - | | +| access_secret | string | 否 | - | | +| delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认`\001`,与hive的默认分隔符相同。 | +| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径解析分区键和值。例如,如果您从路径`oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`读取文件。文件中的每条记录数据都将添加这两个字段:name="tyrantlucifer",age=16 | +| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`。默认`yyyy-MM-dd` | +| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | +| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:`HH:mm:ss` `HH:mm:ss.SSS` | +| filename_extension | string | 否 | - | 过滤文件名扩展名,用于过滤具有特定扩展名的文件。例如:`csv` `.txt` `json` `.xml`。 | +| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于txt和csv。例如,设置如下:`skip_header_row_number = 2`。然后SeaTunnel将跳过源文件的前2行 | +| csv_use_header_line | boolean | 否 | false | 是否使用标题行来解析文件,仅在file_format为`csv`且文件包含符合RFC 4180的标题行时使用 | +| schema | config | 否 | - | 上游数据的schema。 | +| sheet_name | string | 否 | - | 读取工作簿的工作表,仅在file_format为excel时使用。 | +| xml_row_tag | string | 否 | - | 指定XML文件中数据行的标签名称,仅在file_format为xml时使用。 | +| xml_use_attr_format | boolean | 否 | - | 指定是否使用标签属性格式处理数据,仅在file_format为xml时使用。 | +| compress_codec | string | 否 | none | 文件使用的压缩编解码器。 | +| encoding | string | 否 | UTF-8 | +| null_format | string | 否 | - | 仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N` | +| binary_chunk_size | int | 否 | 1024 | 仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 | +| binary_complete_file_mode | boolean | 否 | false | 仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 | +| file_filter_pattern | string | 否 | | 过滤模式,用于过滤文件。 | +| common-options | config | 否 | - | 数据源插件通用参数,请参考[数据源通用选项](../source-common-options.md)了解详情。 | + +### compress_codec [string] + +文件的压缩编解码器,支持的详细信息如下所示: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + 自动识别压缩类型,无需额外设置。 + +### encoding [string] + +仅在file_format_type为json、text、csv、xml时使用。 +要读取的文件的编码。此参数将由`Charset.forName(encoding)`解析。 + +### binary_chunk_size [int] + +仅在file_format_type为binary时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在file_format_type为binary时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 + +### file_filter_pattern [string] + +过滤模式,用于过滤文件。 + +该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 +以下是一些示例。 + +文件结构示例: +``` +/data/seatunnel/20241001/report.txt +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +/data/seatunnel/20241012/logo.png +``` +匹配规则示例: + +**示例1**:*匹配所有.txt文件*,正则表达式: +``` +/data/seatunnel/20241001/.*\.txt +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241001/report.txt +``` +**示例2**:*匹配所有以abc开头的文件*,正则表达式: +``` +/data/seatunnel/20241002/abc.* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +``` +**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式: +``` +/data/seatunnel/20241007/abc[h,g].* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +``` +**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式: +``` +/data/seatunnel/202410\d*/.*\.csv +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +``` + +### schema [config] + +仅在file_format_type为text、json、excel、xml或csv时需要配置(或其他我们无法从元数据读取schema的格式)。 + +#### fields [Config] + +上游数据的schema。 + +## 如何创建Oss数据同步作业 + +以下示例演示如何创建从Oss读取数据并在本地客户端打印的数据同步作业: + +```bash +# 设置要执行的任务的基本配置 +env { + parallelism = 1 + job.mode = "BATCH" +} + +# 创建连接到Oss的数据源 +source { + OssFile { + path = "/seatunnel/orc" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + file_format_type = "orc" + } +} + +# 控制台打印读取的Oss数据 +sink { + Console { + } +} +``` + +```bash +# 设置要执行的任务的基本配置 +env { + parallelism = 1 + job.mode = "BATCH" +} + +# 创建连接到Oss的数据源 +source { + OssFile { + path = "/seatunnel/json" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + file_format_type = "json" + schema { + fields { + id = int + name = string + } + } + } +} + +# 控制台打印读取的Oss数据 +sink { + Console { + } +} +``` + +### 多表 + +无需配置schema文件类型,例如:`orc`。 + +``` +env { + parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + OssFile { + tables_configs = [ + { + schema = { + table = "fake01" + } + bucket = "oss://whale-ops" + access_key = "xxxxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxx" + endpoint = "https://oss-accelerate.aliyuncs.com" + path = "/test/seatunnel/read/orc" + file_format_type = "orc" + }, + { + schema = { + table = "fake02" + } + bucket = "oss://whale-ops" + access_key = "xxxxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxx" + endpoint = "https://oss-accelerate.aliyuncs.com" + path = "/test/seatunnel/read/orc" + file_format_type = "orc" + } + ] + plugin_output = "fake" + } +} + +sink { + Assert { + rules { + table-names = ["fake01", "fake02"] + } + } +} +``` + +需要配置schema文件类型,例如:`json` + +``` + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + OssFile { + tables_configs = [ + { + bucket = "oss://whale-ops" + access_key = "xxxxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxx" + endpoint = "https://oss-accelerate.aliyuncs.com" + path = "/test/seatunnel/read/json" + file_format_type = "json" + schema = { + table = "fake01" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + }, + { + bucket = "oss://whale-ops" + access_key = "xxxxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxx" + endpoint = "https://oss-accelerate.aliyuncs.com" + path = "/test/seatunnel/read/json" + file_format_type = "json" + schema = { + table = "fake02" + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + } + ] + plugin_output = "fake" + } +} + +sink { + Assert { + rules { + table-names = ["fake01", "fake02"] + } + } +} +``` + +### 过滤文件 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + OssFile { + path = "/seatunnel/orc" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + file_format_type = "orc" + // 文件示例 abcD2024.csv + file_filter_pattern = "abc[DX]*.*" + } +} + +sink { + Console { + } +} +``` + +## 变更日志 + + diff --git a/docs/zh/connector-v2/source/S3File.md b/docs/zh/connector-v2/source/S3File.md new file mode 100644 index 00000000000..4ea95ca4711 --- /dev/null +++ b/docs/zh/connector-v2/source/S3File.md @@ -0,0 +1,443 @@ +import ChangeLog from '../changelog/connector-file-s3.md'; + +# S3File + +> S3文件数据源连接器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [精确一次](../../concept/connector-v2-features.md) + +在一次pollNext调用中读取分片中的所有数据。将读取的分片保存在快照中。 + +- [x] [列投影](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [ ] [支持用户定义的分片](../../concept/connector-v2-features.md) +- [x] 文件格式类型 + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + - [x] excel + - [x] xml + - [x] binary + +## 描述 + +从aws s3文件系统读取数据。 + +## 支持的数据源信息 + +| 数据源 | 支持的版本 | +|------------|--------------------| +| S3 | current | + +## 依赖 + +> 如果您使用spark/flink,为了使用此连接器,您必须确保您的spark/flink集群已经集成了hadoop。测试过的hadoop版本是2.x。
+> +> 如果您使用SeaTunnel Zeta,它在您下载和安装SeaTunnel Zeta时会自动集成hadoop jar。您可以检查${SEATUNNEL_HOME}/lib下的jar包来确认这一点。
+> 要使用此连接器,您需要将hadoop-aws-3.1.4.jar和aws-java-sdk-bundle-1.12.692.jar放在${SEATUNNEL_HOME}/lib目录中。 + +## 数据类型映射 + +数据类型映射与正在读取的文件类型相关,我们支持以下文件类型: + +`text` `csv` `parquet` `orc` `json` `excel` `xml` + +### JSON文件类型 + +如果您将文件类型指定为`json`,您还应该指定schema选项来告诉连接器如何将数据解析为您想要的行。 + +例如: + +上游数据如下: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +您也可以在一个文件中保存多条数据,并用换行符分隔: + +```json lines + +{"code": 200, "data": "get success", "success": true} +{"code": 300, "data": "get failed", "success": false} + +``` + +您应该按如下方式指定schema: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +连接器将生成如下数据: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +### 文本或CSV文件类型 + +如果您将`file_format_type`设置为`text`、`excel`、`csv`、`xml`。那么需要设置`schema`字段来告诉连接器如何将数据解析为行。 + +如果您设置了`schema`字段,您还应该设置选项`field_delimiter`,除非`file_format_type`是`csv`、`xml`、`excel` + +您可以按如下方式设置schema和分隔符: + +```hocon + +field_delimiter = "#" +schema { + fields { + name = string + age = int + gender = string + } +} + +``` + +连接器将生成如下数据: + +| name | age | gender | +|---------------|-----|--------| +| tyrantlucifer | 26 | male | + +### Orc文件类型 + +如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。 + +| Orc数据类型 | SeaTunnel数据类型 | +|----------------------------------|----------------------------------------------------------------| +| BOOLEAN | BOOLEAN | +| INT | INT | +| BYTE | BYTE | +| SHORT | SHORT | +| LONG | LONG | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BINARY | BINARY | +| STRING
VARCHAR
CHAR
| STRING | +| DATE | LOCAL_DATE_TYPE | +| TIMESTAMP | LOCAL_DATE_TIME_TYPE | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType,K和V的类型将转换为SeaTunnel类型 | +| STRUCT | SeaTunnelRowType | + +### Parquet文件类型 + +如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。 + +| Parquet数据类型 | SeaTunnel数据类型 | +|----------------------|----------------------------------------------------------------| +| INT_8 | BYTE | +| INT_16 | SHORT | +| DATE | DATE | +| TIMESTAMP_MILLIS | TIMESTAMP | +| INT64 | LONG | +| INT96 | TIMESTAMP | +| BINARY | BYTES | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BOOLEAN | BOOLEAN | +| FIXED_LEN_BYTE_ARRAY | TIMESTAMP
DECIMAL | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType,K和V的类型将转换为SeaTunnel类型 | +| STRUCT | SeaTunnelRowType | + +## 选项 + +| 名称 | 类型 | 是否必需 | 默认值 | 描述 | +|---------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | 是 | - | 需要读取的s3路径,可以有子路径,但子路径需要满足一定的格式要求。具体要求可以参考"parse_partition_from_path"选项 | +| file_format_type | string | 是 | - | 文件类型,支持以下文件类型:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` | +| bucket | string | 是 | - | s3文件系统的bucket地址,例如:`s3n://seatunnel-test`,如果您使用`s3a`协议,此参数应为`s3a://seatunnel-test`。 | +| fs.s3a.endpoint | string | 是 | - | fs s3a端点 | +| fs.s3a.aws.credentials.provider | string | 是 | com.amazonaws.auth.InstanceProfileCredentialsProvider | s3a的认证方式。我们目前只支持`org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`和`com.amazonaws.auth.InstanceProfileCredentialsProvider`。有关凭据提供程序的更多信息,您可以查看[Hadoop AWS文档](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) | +| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它来实现字段投影。支持列投影的文件类型如下所示:`text` `csv` `parquet` `orc` `json` `excel` `xml`。如果用户想在读取`text` `json` `csv`文件时使用此功能,必须配置"schema"选项。 | +| access_key | string | 否 | - | 仅在`fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`时使用 | +| access_secret | string | 否 | - | 仅在`fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`时使用 | +| hadoop_s3_properties | map | 否 | - | 如果您需要添加其他选项,可以在此处添加并参考此[链接](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | +| delimiter/field_delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认`\001`,与hive的默认分隔符相同。 | +| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径解析分区键和值。例如,如果您从路径`s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`读取文件。文件中的每条记录数据都将添加这两个字段:name="tyrantlucifer",age=16 | +| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`。默认`yyyy-MM-dd` | +| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | +| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:`HH:mm:ss` `HH:mm:ss.SSS` | +| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于txt和csv。例如,设置如下:`skip_header_row_number = 2`。然后SeaTunnel将跳过源文件的前2行 | +| csv_use_header_line | boolean | 否 | false | 是否使用标题行来解析文件,仅在file_format为`csv`且文件包含符合RFC 4180的标题行时使用 | +| schema | config | 否 | - | 上游数据的schema。 | +| sheet_name | string | 否 | - | 读取工作簿的工作表,仅在file_format为excel时使用。 | +| xml_row_tag | string | 否 | - | 指定XML文件中数据行的标签名称,仅对XML文件有效。 | +| xml_use_attr_format | boolean | 否 | - | 指定是否使用标签属性格式处理数据,仅对XML文件有效。 | +| compress_codec | string | 否 | none | | +| archive_compress_codec | string | 否 | none | | +| encoding | string | 否 | UTF-8 | | +| null_format | string | 否 | - | 仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N` | +| binary_chunk_size | int | 否 | 1024 | 仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 | +| binary_complete_file_mode | boolean | 否 | false | 仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 | +| file_filter_pattern | string | 否 | | 过滤模式,用于过滤文件。 | +| filename_extension | string | 否 | - | 过滤文件名扩展名,用于过滤具有特定扩展名的文件。例如:`csv` `.txt` `json` `.xml`。 | +| common-options | | 否 | - | 数据源插件通用参数,请参考[数据源通用选项](../source-common-options.md)了解详情。 | + +### delimiter/field_delimiter [string] + +**delimiter**参数将在2.3.5版本后弃用,请使用**field_delimiter**代替。 + +### file_filter_pattern [string] + +过滤模式,用于过滤文件。 + +该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 +以下是一些示例。 + +文件结构示例: +``` +/data/seatunnel/20241001/report.txt +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +/data/seatunnel/20241012/logo.png +``` +匹配规则示例: + +**示例1**:*匹配所有.txt文件*,正则表达式: +``` +/data/seatunnel/20241001/.*\.txt +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241001/report.txt +``` +**示例2**:*匹配所有以abc开头的文件*,正则表达式: +``` +/data/seatunnel/20241002/abc.* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +``` +**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式: +``` +/data/seatunnel/20241007/abc[h,g].* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +``` +**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式: +``` +/data/seatunnel/202410\d*/.*\.csv +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +``` + +### compress_codec [string] + +文件的压缩编解码器,支持的详细信息如下所示: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + 自动识别压缩类型,无需额外设置。 + +### archive_compress_codec [string] + +归档文件的压缩编解码器,支持的详细信息如下所示: + +| archive_compress_codec | file_format | archive_compress_suffix | +|------------------------|------------|-------------------------| +| ZIP | txt,json,excel,xml | .zip | +| TAR | txt,json,excel,xml | .tar | +| TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,excel,xml | .gz | +| NONE | all | .* | + +注意:gz压缩的excel文件需要压缩原始文件或指定文件后缀,例如e2e.xls ->e2e_test.xls.gz + +### encoding [string] + +仅在file_format_type为json、text、csv、xml时使用。 +要读取的文件的编码。此参数将由`Charset.forName(encoding)`解析。 + +### binary_chunk_size [int] + +仅在file_format_type为binary时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在file_format_type为binary时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 + +## 示例 + +1. 在此示例中,我们从s3路径`s3a://seatunnel-test/seatunnel/text`读取数据,此路径中的文件类型是orc。 + 我们使用`org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`进行身份验证,因此需要`access_key`和`secret_key`。 + 文件中的所有列都将被读取并发送到接收器。 + +``` +# 定义运行时环境 +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + S3File { + path = "/seatunnel/text" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + access_key = "xxxxxxxxxxxxxxxxx" + secret_key = "xxxxxxxxxxxxxxxxx" + bucket = "s3a://seatunnel-test" + file_format_type = "orc" + } +} + +transform { + # 如果您想获取有关如何配置seatunnel和查看转换插件完整列表的更多信息, + # 请访问 https://seatunnel.apache.org/docs/transform-v2 +} + +sink { + Console {} +} +``` + +2. 使用`InstanceProfileCredentialsProvider`进行身份验证 + S3中的文件类型是json,因此需要配置schema选项。 + +```hocon + + S3File { + path = "/seatunnel/json" + bucket = "s3a://seatunnel-test" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "json" + schema { + fields { + id = int + name = string + } + } + } + +``` + +3. 使用`InstanceProfileCredentialsProvider`进行身份验证 + S3中的文件类型是json,有五个字段(`id`、`name`、`age`、`sex`、`type`),因此需要配置schema选项。 + 在此作业中,我们只需要将`id`和`name`列发送到mysql。 + +``` +# 定义运行时环境 +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + S3File { + path = "/seatunnel/json" + bucket = "s3a://seatunnel-test" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "json" + read_columns = ["id", "name"] + schema { + fields { + id = int + name = string + age = int + sex = int + type = string + } + } + } +} + +transform { + # 如果您想获取有关如何配置seatunnel和查看转换插件完整列表的更多信息, + # 请访问 https://seatunnel.apache.org/docs/transform-v2 +} + +sink { + Console {} +} +``` + +### 过滤文件 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + S3File { + path = "/seatunnel/json" + bucket = "s3a://seatunnel-test" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "json" + read_columns = ["id", "name"] + // 文件示例 abcD2024.csv + file_filter_pattern = "abc[DX]*.*" + } +} + +sink { + Console { + } +} +``` + +## 变更日志 + + diff --git a/docs/zh/connector-v2/source/SftpFile.md b/docs/zh/connector-v2/source/SftpFile.md new file mode 100644 index 00000000000..e52876d633c --- /dev/null +++ b/docs/zh/connector-v2/source/SftpFile.md @@ -0,0 +1,411 @@ +import ChangeLog from '../changelog/connector-file-sftp.md'; + +# SftpFile + +> Sftp文件数据源连接器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [ ] [精确一次](../../concept/connector-v2-features.md) +- [x] [列投影](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [ ] [支持用户定义的分片](../../concept/connector-v2-features.md) +- [x] 文件格式类型 + - [x] text + - [x] csv + - [x] json + - [x] excel + - [x] xml + - [x] binary + +## 描述 + +从sftp文件服务器读取数据。 + +## 支持的数据源信息 + +为了使用SftpFile连接器,需要以下依赖项。 +可以通过install-plugin.sh或从Maven中央仓库下载。 + +| 数据源 | 支持的版本 | 依赖 | +|------------|--------------------|-----------------------------------------------------------------------------------------| +| SftpFile | universal | [下载](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-file-sftp) | + +:::tip + +如果您使用spark/flink,为了使用此连接器,您必须确保您的spark/flink集群已经集成了hadoop。测试过的hadoop版本是2.x。 + +如果您使用SeaTunnel引擎,它在您下载和安装SeaTunnel引擎时会自动集成hadoop jar。您可以检查${SEATUNNEL_HOME}/lib下的jar包来确认这一点。 + +为了支持更多文件类型,我们做了一些权衡,因此我们使用HDFS协议进行内部访问Sftp,此连接器需要一些hadoop依赖项。 +它只支持hadoop版本**2.9.X+**。 + +::: + +## 数据类型映射 + +文件没有特定的类型列表,我们可以通过在配置中指定Schema来指示相应的数据需要转换为哪种SeaTunnel数据类型。 + +| SeaTunnel数据类型 | +|---------------------| +| STRING | +| SHORT | +| INT | +| BIGINT | +| BOOLEAN | +| DOUBLE | +| DECIMAL | +| FLOAT | +| DATE | +| TIME | +| TIMESTAMP | +| BYTES | +| ARRAY | +| MAP | + +## 数据源选项 + +| 名称 | 类型 | 是否必需 | 默认值 | 描述 | +|---------------------------|---------|----------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| host | String | 是 | - | 目标sftp主机是必需的 | +| port | Int | 是 | - | 目标sftp端口是必需的 | +| user | String | 是 | - | 目标sftp用户名是必需的 | +| password | String | 是 | - | 目标sftp密码是必需的 | +| path | String | 是 | - | 源文件路径。 | +| file_format_type | String | 是 | - | 请查看下面的#file_format_type | +| file_filter_pattern | String | 否 | - | 过滤模式,用于过滤文件。 | +| filename_extension | string | 否 | - | 过滤文件名扩展名,用于过滤具有特定扩展名的文件。例如:`csv` `.txt` `json` `.xml`。 | +| delimiter/field_delimiter | String | 否 | \001 | **delimiter**参数将在2.3.5版本后弃用,请使用**field_delimiter**代替。
字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。
默认`\001`,与hive的默认分隔符相同 | +| parse_partition_from_path | Boolean | 否 | true | 控制是否从文件路径解析分区键和值
例如,如果您从路径`oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`读取文件
文件中的每条记录数据都将添加这两个字段:
name age
tyrantlucifer 26
提示:**不要在schema选项中定义分区字段** | +| date_format | String | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:
`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
默认`yyyy-MM-dd` | +| datetime_format | String | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:
`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
默认`yyyy-MM-dd HH:mm:ss` | +| time_format | String | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:
`HH:mm:ss` `HH:mm:ss.SSS`
默认`HH:mm:ss` | +| skip_header_row_number | Long | 否 | 0 | 跳过前几行,但仅适用于txt和csv。
例如,设置如下:
`skip_header_row_number = 2`
然后SeaTunnel将跳过源文件的前2行 | +| read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它来实现字段投影。 | +| sheet_name | String | 否 | - | 读取工作簿的工作表,仅在file_format为excel时使用。 | +| xml_row_tag | string | 否 | - | 指定XML文件中数据行的标签名称,仅在file_format为xml时使用。 | +| xml_use_attr_format | boolean | 否 | - | 指定是否使用标签属性格式处理数据,仅在file_format为xml时使用。 | +| csv_use_header_line | boolean | 否 | false | 是否使用标题行来解析文件,仅在file_format为`csv`且文件包含符合RFC 4180的标题行时使用 | +| schema | Config | 否 | - | 请查看下面的#schema | +| compress_codec | String | 否 | None | 文件的压缩编解码器,支持的详细信息如下所示:
- txt: `lzo` `None`
- json: `lzo` `None`
- csv: `lzo` `None`
- orc: `lzo` `snappy` `lz4` `zlib` `None`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None`
提示:excel类型不支持任何压缩格式 | +| archive_compress_codec | string | 否 | none | +| encoding | string | 否 | UTF-8 | +| null_format | string | 否 | - | 仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N` | +| binary_chunk_size | int | 否 | 1024 | 仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 | +| binary_complete_file_mode | boolean | 否 | false | 仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 | +| common-options | | 否 | - | 数据源插件通用参数,请参考[数据源通用选项](../source-common-options.md)了解详情。 | + +### file_filter_pattern [string] + +过滤模式,用于过滤文件。 + +该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 +以下是一些示例。 + +文件结构示例: +``` +/data/seatunnel/20241001/report.txt +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +/data/seatunnel/20241012/logo.png +``` +匹配规则示例: + +**示例1**:*匹配所有.txt文件*,正则表达式: +``` +/data/seatunnel/20241001/.*\.txt +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241001/report.txt +``` +**示例2**:*匹配所有以abc开头的文件*,正则表达式: +``` +/data/seatunnel/20241002/abc.* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +``` +**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式: +``` +/data/seatunnel/20241007/abc[h,g].* +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +``` +**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式: +``` +/data/seatunnel/202410\d*/.*\.csv +``` +此示例匹配的结果是: +``` +/data/seatunnel/20241007/abch202410.csv +/data/seatunnel/20241002/abcg202410.csv +/data/seatunnel/20241005/old_data.csv +``` + +### file_format_type [string] + +文件类型,支持以下文件类型: +`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` +如果您将文件类型指定为`json`,您还应该指定schema选项来告诉连接器如何将数据解析为您想要的行。 +例如: +上游数据如下: + +```json +{"code": 200, "data": "get success", "success": true} +``` + +您也可以在一个文件中保存多条数据,并用换行符分隔: + +```json lines +{"code": 200, "data": "get success", "success": true} +{"code": 300, "data": "get failed", "success": false} +``` + +您应该按如下方式指定schema: + +```hocon +schema { + fields { + code = int + data = string + success = boolean + } +} +``` + +连接器将生成如下数据: +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | +如果您将文件类型指定为`parquet` `orc`,则不需要schema选项,连接器可以自动找到上游数据的schema。 +如果您将文件类型指定为`text` `csv`,您可以选择指定schema信息或不指定。 +例如,上游数据如下: + +```text +tyrantlucifer#26#male +``` + +如果您不指定数据schema,连接器将把上游数据视为如下: +| content | +|-----------------------| +| tyrantlucifer#26#male | +如果您指定数据schema,除了CSV文件类型外,您还应该指定选项`field_delimiter` +您应该按如下方式指定schema和分隔符: + +```hocon +field_delimiter = "#" +schema { + fields { + name = string + age = int + gender = string + } +} +``` + +连接器将生成如下数据: +| name | age | gender | +|---------------|-----|--------| +| tyrantlucifer | 26 | male | + +如果您将文件类型指定为`binary`,SeaTunnel可以同步任何格式的文件, +例如压缩包、图片等。简而言之,任何文件都可以同步到目标位置。 +在此要求下,您需要确保源和接收器同时使用`binary`格式进行文件同步。 + +### compress_codec [string] + +文件的压缩编解码器,支持的详细信息如下所示: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + 自动识别压缩类型,无需额外设置。 + +### archive_compress_codec [string] + +归档文件的压缩编解码器,支持的详细信息如下所示: + +| archive_compress_codec | file_format | archive_compress_suffix | +|--------------------|--------------------|---------------------| +| ZIP | txt,json,excel,xml | .zip | +| TAR | txt,json,excel,xml | .tar | +| TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,excel,xml | .gz | +| NONE | all | .* | + +注意:gz压缩的excel文件需要压缩原始文件或指定文件后缀,例如e2e.xls ->e2e_test.xls.gz + +### encoding [string] + +仅在file_format_type为json、text、csv、xml时使用。 +要读取的文件的编码。此参数将由`Charset.forName(encoding)`解析。 + +### binary_chunk_size [int] + +仅在file_format_type为binary时使用。 + +读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 + +### binary_complete_file_mode [boolean] + +仅在file_format_type为binary时使用。 + +是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 + +### schema [config] + +#### fields [Config] + +上游数据的schema。 + +## 如何创建Sftp数据同步作业 + +以下示例演示如何创建从sftp读取数据并在本地客户端打印的数据同步作业: + +```bash +# 设置要执行的任务的基本配置 +env { + parallelism = 1 + job.mode = "BATCH" +} + +# 创建连接到sftp的数据源 +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/json" + file_format_type = "json" + plugin_output = "sftp" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + } +} + +# 控制台打印读取的sftp数据 +sink { + Console { + parallelism = 1 + } +} +``` +### 多表 + +```hocon + +SftpFile { + tables_configs = [ + { + schema { + table = "student" + fields { + name = string + age = int + } + } + path = "/tmp/seatunnel/sink/text" + host = "192.168.31.48" + port = 21 + user = tyrantlucifer + password = tianchao + file_format_type = "parquet" + }, + { + schema { + table = "teacher" + fields { + name = string + age = int + } + } + path = "/tmp/seatunnel/sink/text" + host = "192.168.31.48" + port = 21 + user = tyrantlucifer + password = tianchao + file_format_type = "parquet" + } + ] +} + +``` + +### 过滤文件 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/json" + file_format_type = "json" + plugin_output = "sftp" + // 文件示例 abcD2024.csv + file_filter_pattern = "abc[DX]*.*" + } +} + +sink { + Console { + } +} +``` +## 变更日志 + + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java index 4602ee79ceb..d4adc8a2123 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java @@ -88,4 +88,20 @@ public class FileBaseSourceOptions extends FileBaseOptions { .noDefaultValue() .withDescription( "File pattern. The connector will filter some files base on the pattern."); + + public static final Option BINARY_CHUNK_SIZE = + Options.key("binary_chunk_size") + .intType() + .defaultValue(1024) + .withDescription( + "The chunk size (in bytes) for reading binary files. Default is 1024 bytes. " + + "Larger values may improve performance for large files but use more memory.Only valid when file_format_type is binary."); + + public static final Option BINARY_COMPLETE_FILE_MODE = + Options.key("binary_complete_file_mode") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to read the complete file as a single chunk instead of splitting into chunks. " + + "When enabled, the entire file content will be read into memory at once.Only valid when file_format_type is binary."); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java index 137a4322975..897c37a3447 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java @@ -27,6 +27,8 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.commons.io.IOUtils; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -43,11 +45,34 @@ public class BinaryReadStrategy extends AbstractReadStrategy { }); private File basePath; + private int binaryChunkSize = FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue(); + private boolean completeFileMode = + FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue(); @Override public void init(HadoopConf conf) { super.init(conf); basePath = new File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key())); + + // Load binary chunk size configuration + if (pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) { + binaryChunkSize = pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key()); + // Validate chunk size - should be positive and reasonable + if (binaryChunkSize <= 0) { + throw new IllegalArgumentException( + "Binary chunk size must be positive, got: " + binaryChunkSize); + } + if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit + throw new IllegalArgumentException( + "Binary chunk size too large (max 100MB), got: " + binaryChunkSize); + } + } + + // Load complete file mode configuration + if (pluginConfig.hasPath(FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.key())) { + completeFileMode = + pluginConfig.getBoolean(FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.key()); + } } @Override @@ -66,21 +91,49 @@ public void read(String path, String tableId, Collector output) relativePath = relativePath.substring(File.separator.length()); } } - // TODO config this size - int maxSize = 1024; - byte[] buffer = new byte[maxSize]; - long partIndex = 0; - int readSize; - while ((readSize = inputStream.read(buffer)) != -1) { - if (readSize != maxSize) { - buffer = Arrays.copyOf(buffer, readSize); - } - SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex}); - buffer = new byte[1024]; - row.setTableId(tableId); - output.collect(row); - partIndex++; + + if (completeFileMode) { + // Read entire file as a single chunk + readCompleteFile(inputStream, relativePath, tableId, output); + } else { + // Read file in configurable chunks + readFileInChunks(inputStream, relativePath, tableId, output); + } + } + } + + /** Read the entire file as a single chunk. */ + private void readCompleteFile( + InputStream inputStream, + String relativePath, + String tableId, + Collector output) + throws IOException { + byte[] fileContent = IOUtils.toByteArray(inputStream); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {fileContent, relativePath, 0L}); + row.setTableId(tableId); + output.collect(row); + } + + /** Read the file in configurable chunks. */ + private void readFileInChunks( + InputStream inputStream, + String relativePath, + String tableId, + Collector output) + throws IOException { + byte[] buffer = new byte[binaryChunkSize]; + long partIndex = 0; + int readSize; + while ((readSize = inputStream.read(buffer)) != -1) { + if (readSize != binaryChunkSize) { + buffer = Arrays.copyOf(buffer, readSize); } + SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex}); + buffer = new byte[binaryChunkSize]; + row.setTableId(tableId); + output.collect(row); + partIndex++; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java new file mode 100644 index 00000000000..90fc3d03bb6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.reader; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import lombok.Getter; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT; + +public class BinaryReadStrategyTest { + + @TempDir Path tempDir; + + private BinaryReadStrategy binaryReadStrategy; + private LocalConf localConf; + + @BeforeEach + public void setUp() { + binaryReadStrategy = new BinaryReadStrategy(); + localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT); + } + + @Test + public void testBinaryReadWithDefaultChunkSize() throws IOException { + // Create a test file with 2048 bytes (2 chunks of 1024 bytes each) + File testFile = createTestFile("test_binary_default.bin", 2048); + + Config config = createConfig(testFile.getParent(), null, null); + binaryReadStrategy.setPluginConfig(config); + binaryReadStrategy.init(localConf); + + TestCollector collector = new TestCollector(); + binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector); + + List rows = collector.getRows(); + Assertions.assertEquals( + 2, rows.size(), "Should have 2 chunks for 2048 bytes with default 1024 chunk size"); + + // Verify first chunk + SeaTunnelRow firstRow = rows.get(0); + Assertions.assertEquals(3, firstRow.getArity()); + byte[] firstChunkData = (byte[]) firstRow.getField(0); + Assertions.assertEquals(1024, firstChunkData.length); + Assertions.assertEquals("test_binary_default.bin", firstRow.getField(1)); + Assertions.assertEquals(0L, firstRow.getField(2)); + + // Verify second chunk + SeaTunnelRow secondRow = rows.get(1); + byte[] secondChunkData = (byte[]) secondRow.getField(0); + Assertions.assertEquals(1024, secondChunkData.length); + Assertions.assertEquals("test_binary_default.bin", secondRow.getField(1)); + Assertions.assertEquals(1L, secondRow.getField(2)); + } + + @Test + public void testBinaryReadWithCustomChunkSize() throws IOException { + // Create a test file with 1500 bytes + File testFile = createTestFile("test_binary_custom.bin", 1500); + + Config config = createConfig(testFile.getParent(), 512, null); + binaryReadStrategy.setPluginConfig(config); + binaryReadStrategy.init(localConf); + + TestCollector collector = new TestCollector(); + binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector); + + List rows = collector.getRows(); + Assertions.assertEquals( + 3, rows.size(), "Should have 3 chunks for 1500 bytes with 512 chunk size"); + + // Verify chunk sizes: 512, 512, 476 + Assertions.assertEquals(512, ((byte[]) rows.get(0).getField(0)).length); + Assertions.assertEquals(512, ((byte[]) rows.get(1).getField(0)).length); + Assertions.assertEquals(476, ((byte[]) rows.get(2).getField(0)).length); + + // Verify part indices + Assertions.assertEquals(0L, rows.get(0).getField(2)); + Assertions.assertEquals(1L, rows.get(1).getField(2)); + Assertions.assertEquals(2L, rows.get(2).getField(2)); + } + + @Test + public void testBinaryReadCompleteFileMode() throws IOException { + // Create a test file with 2048 bytes + File testFile = createTestFile("test_binary_complete.bin", 2048); + + Config config = createConfig(testFile.getParent(), null, true); + binaryReadStrategy.setPluginConfig(config); + binaryReadStrategy.init(localConf); + + TestCollector collector = new TestCollector(); + binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector); + + List rows = collector.getRows(); + Assertions.assertEquals(1, rows.size(), "Should have 1 row in complete file mode"); + + SeaTunnelRow row = rows.get(0); + byte[] fileData = (byte[]) row.getField(0); + Assertions.assertEquals(2048, fileData.length, "Should read entire file content"); + Assertions.assertEquals("test_binary_complete.bin", row.getField(1)); + Assertions.assertEquals(0L, row.getField(2)); + } + + private File createTestFile(String fileName, int sizeInBytes) throws IOException { + File testFile = tempDir.resolve(fileName).toFile(); + + if (sizeInBytes > 0) { + try (FileOutputStream fos = new FileOutputStream(testFile)) { + // Create test data with a pattern for verification + byte[] pattern = "SEATUNNEL_TEST_DATA_".getBytes(); + int written = 0; + while (written < sizeInBytes) { + int toWrite = Math.min(pattern.length, sizeInBytes - written); + fos.write(pattern, 0, toWrite); + written += toWrite; + } + } + } else { + // Create empty file + testFile.createNewFile(); + } + + return testFile; + } + + private Config createConfig(String filePath, Integer chunkSize, Boolean completeFileMode) { + Map configMap = new HashMap<>(); + configMap.put("path", filePath); // Fixed: use "path" instead of "file_path" + configMap.put("file_format_type", "binary"); + + if (chunkSize != null) { + configMap.put("binary_chunk_size", chunkSize); + } + if (completeFileMode != null) { + configMap.put("binary_complete_file_mode", completeFileMode); + } + + return ConfigFactory.parseMap(configMap); + } + + @Getter + public static class TestCollector implements Collector { + private final List rows = new ArrayList<>(); + + @Override + public void collect(SeaTunnelRow record) { + rows.add(record); + } + + @Override + public Object getCheckpointLock() { + return null; + } + } + + public static class LocalConf extends HadoopConf { + private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem"; + private static final String SCHEMA = "file"; + + public LocalConf(String hdfsNameKey) { + super(hdfsNameKey); + } + + @Override + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + @Override + public String getSchema() { + return SCHEMA; + } + } +}