Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/OssFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` |
| filename_extension | string | no | - | Filter filename extension, which is used for filtering files with specific extension. Example: `csv` `.txt` `json` `.xml`. |
| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files |
| csv_use_header_line | boolean | no | false | Whether to use the header line to parse the file, only used when the file_format is `csv` and the file contains a header line that matches RFC 4180 |
| schema | config | no | - | The schema of upstream data. |
| sheet_name | string | no | - | Read the sheet of the workbook. Only used when file_format is excel. |
| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. |
Expand Down
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/S3File.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` |
| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` |
| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files |
| csv_use_header_line | boolean | no | false | Whether to use the header line to parse the file, only used when the file_format is `csv` and the file contains a header line that matches RFC 4180 |
| schema | config | no | - | The schema of upstream data. |
| sheet_name | string | no | - | Read the sheet of the workbook. Only used when file_format is excel. |
| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only valid for XML files. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ public class BaseSourceConfigOptions {
.defaultValue(0L)
.withDescription("The number of rows to skip");

/**
 * Whether the first record of a CSV file should be treated as the header line and used to map
 * file columns onto the user-defined schema. Only effective when the file format is {@code csv}
 * and the file contains a header line that matches RFC 4180. Defaults to {@code false}.
 */
public static final Option<Boolean> CSV_USE_HEADER_LINE =
        Options.key("csv_use_header_line")
                .booleanType()
                .defaultValue(Boolean.FALSE)
                // Keep this text consistent with docs/en/connector-v2/source/OssFile.md
                // and S3File.md, which document the RFC 4180 header-line condition.
                .withDescription(
                        "Whether to use the header line to parse the file, only used when the file_format is csv and the file contains a header line that matches RFC 4180");

public static final Option<List<String>> READ_PARTITIONS =
Options.key("read_partitions")
.listType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {
Expand All @@ -67,6 +69,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
private int[] indexes;
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
private CatalogTable inputCatalogTable;
private boolean firstLineAsHeader = BaseSourceConfigOptions.CSV_USE_HEADER_LINE.defaultValue();

@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
Expand Down Expand Up @@ -102,6 +105,9 @@ public void readProcess(
}

CSVFormat csvFormat = CSVFormat.DEFAULT;
if (firstLineAsHeader) {
csvFormat = csvFormat.withFirstRecordAsHeader();
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
Expand All @@ -114,10 +120,18 @@ public void readProcess(
}
}
// read lines
List<String> headers = getHeaders(csvParser);
for (CSVRecord csvRecord : csvParser) {
HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
for (int i = 0; i < inputCatalogTable.getTableSchema().getColumns().size(); i++) {
fieldIdValueMap.put(i, csvRecord.get(i));
for (int i = 0; i < headers.size(); i++) {
// the user input schema may not contain all the columns in the csv header
                    // and may contain columns in a different order than the csv header
int index =
inputCatalogTable.getSeaTunnelRowType().indexOf(headers.get(i), false);
if (index == -1) {
continue;
}
fieldIdValueMap.put(index, csvRecord.get(i).replace("\uFEFF", ""));
}
SeaTunnelRow seaTunnelRow = deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
if (!readColumns.isEmpty()) {
Expand Down Expand Up @@ -152,6 +166,22 @@ public void readProcess(
}
}

/**
 * Resolves the column names used to map CSV record fields onto the schema.
 *
 * @param csvParser the parser positioned on the current file
 * @return header names from the file's first record when {@code firstLineAsHeader} is set
 *     (with surrounding quotes, BOM characters and whitespace stripped); otherwise the column
 *     names declared in the user-provided schema, in declaration order
 */
private List<String> getHeaders(CSVParser csvParser) {
    if (firstLineAsHeader) {
        // Header comes from the file itself; sanitize each name so it can be
        // matched against schema column names.
        return csvParser.getHeaderNames().stream()
                .map(rawName -> rawName.replace("\"", "").replace("\uFEFF", "").trim())
                .collect(Collectors.toList());
    }
    // No header line in the file: fall back to the user-defined schema columns.
    return inputCatalogTable.getTableSchema().getColumns().stream()
            .map(col -> col.getName())
            .collect(Collectors.toList());
}

@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
Expand Down Expand Up @@ -205,6 +235,10 @@ public void setCatalogTable(CatalogTable catalogTable) {
readonlyConfig
.getOptional(BaseSourceConfigOptions.NULL_FORMAT)
.orElse(null));
if (pluginConfig.hasPath(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key())) {
firstLineAsHeader =
pluginConfig.getBoolean(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key());
}
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ public class LocalFileIT extends TestSuiteBase {
"/csv/break_line.csv",
"/seatunnel/read/csv/break_line/break_line.csv",
container);
ContainerUtil.copyFileIntoContainers(
"/csv/csv_with_header1.csv",
"/seatunnel/read/csv/header/csv_with_header1.csv",
container);
ContainerUtil.copyFileIntoContainers(
"/csv/csv_with_header2.csv",
"/seatunnel/read/csv/header/csv_with_header2.csv",
container);

ContainerUtil.copyFileIntoContainers(
"/text/e2e_null_format.txt",
Expand All @@ -305,6 +313,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
TestHelper helper = new TestHelper(container);
helper.execute("/csv/fake_to_local_csv.conf");
helper.execute("/csv/local_csv_to_assert.conf");
helper.execute("/csv/csv_with_header_to_assert.conf");
helper.execute("/csv/breakline_csv_to_assert.conf");
helper.execute("/excel/fake_to_local_excel.conf");
helper.execute("/excel/local_excel_to_assert.conf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name, id, is_female
tom,20,true
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name, is_female, id
tommy,false,30
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
LocalFile {
path = "/seatunnel/read/csv/header"
file_format_type = csv
csv_use_header_line = true
schema = {
fields {
id = int
name = string
is_female = boolean
}
}
}
}

sink {
Assert {
rules {
row_rules = [
{
rule_type = MAX_ROW
rule_value = 2
}
{
rule_type = MIN_ROW
rule_value = 2
}
]
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
}
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
{
field_name = is_female
field_type = boolean
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
Loading