release-note.md (1 addition, 0 deletions)
@@ -52,6 +52,7 @@
 - [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
 - [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
 - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
+- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
 - [Connector-v2] [File] Support assign encoding for file source/sink (#5973)

 ### Zeta(ST-Engine)
HadoopConf.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.config;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;

 import lombok.Data;
@@ -26,6 +27,11 @@
 import java.util.HashMap;
 import java.util.Map;

+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+
 @Data
 public class HadoopConf implements Serializable {
     private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
             configuration.addResource(new Path(hdfsSitePath));
         }
     }
+
+    public Configuration toConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(READ_INT96_AS_FIXED, true);
+        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
+        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
+        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
+        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
+        return configuration;
+    }
 }
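The new toConfiguration() consolidates the Hadoop settings that were previously built inline in HadoopFileSystemProxy and AbstractWriteStrategy (see the diffs below). A minimal usage sketch, assuming a HadoopConf whose hdfsNameKey, schema, and fsHdfsImpl are already populated; the single-argument constructor used here is an assumption, since the connector normally builds HadoopConf from its source/sink options:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

public class HadoopConfUsageSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: HadoopConf can be created from the default-FS URI; in the
        // connector it is normally populated from the sink/source options instead.
        HadoopConf hadoopConf = new HadoopConf("hdfs://namenode:8020");

        // toConfiguration() now carries the Parquet int96/list flags, fs.defaultFS,
        // the fs.<schema>.impl binding, and the per-schema cache-disable switch.
        Configuration configuration = hadoopConf.toConfiguration();
        hadoopConf.setExtraOptionsForConfiguration(configuration);

        // Any Hadoop client code can consume the resulting Configuration.
        try (FileSystem fs = FileSystem.get(configuration)) {
            System.out.println(fs.getUri());
        }
    }
}
```

Because toConfiguration() sets fs.<schema>.impl.disable.cache to true, FileSystem.get with this Configuration returns a fresh instance for that scheme rather than the JVM-wide cached one.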
HadoopFileSystemProxy.java
@@ -23,7 +23,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
 import java.util.ArrayList;
 import java.util.List;

-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 @Slf4j
 public class HadoopFileSystemProxy implements Serializable, Closeable {

@@ -64,30 +58,19 @@ public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) {
     }

     public boolean fileExist(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path fileName = new Path(filePath);
-        return fileSystem.exists(fileName);
+        return getFileSystem().exists(new Path(filePath));
     }

     public void createFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path path = new Path(filePath);
-        if (!fileSystem.createNewFile(path)) {
+        if (!getFileSystem().createNewFile(new Path(filePath))) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }

     public void deleteFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path path = new Path(filePath);
-        if (fileSystem.exists(path)) {
-            if (!fileSystem.delete(path, true)) {
+        if (getFileSystem().exists(path)) {
+            if (!getFileSystem().delete(path, true)) {
                 throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
             }
         }
@@ -98,9 +81,6 @@ public void renameFile(
             @NonNull String newFilePath,
             boolean removeWhenNewFilePathExist)
             throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path oldPath = new Path(oldFilePath);
         Path newPath = new Path(newFilePath);

@@ -116,15 +96,15 @@

         if (removeWhenNewFilePathExist) {
             if (fileExist(newFilePath)) {
-                fileSystem.delete(newPath, true);
+                getFileSystem().delete(newPath, true);
                 log.info("Delete already file: {}", newPath);
             }
         }
         if (!fileExist(newPath.getParent().toString())) {
             createDir(newPath.getParent().toString());
         }

-        if (fileSystem.rename(oldPath, newPath)) {
+        if (getFileSystem().rename(oldPath, newPath)) {
             log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
         } else {
             throw CommonError.fileOperationFailed(
@@ -133,42 +113,33 @@
     }

     public void createDir(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path dfs = new Path(filePath);
-        if (!fileSystem.mkdirs(dfs)) {
+        if (!getFileSystem().mkdirs(dfs)) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }

     public List<LocatedFileStatus> listFile(String path) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<LocatedFileStatus> fileList = new ArrayList<>();
         if (!fileExist(path)) {
             return fileList;
         }
         Path fileName = new Path(path);
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
-                fileSystem.listFiles(fileName, false);
+                getFileSystem().listFiles(fileName, false);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             fileList.add(locatedFileStatusRemoteIterator.next());
         }
         return fileList;
     }

     public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
             return pathList;
         }
         Path fileName = new Path(filePath);
-        FileStatus[] status = fileSystem.listStatus(fileName);
+        FileStatus[] status = getFileSystem().listStatus(fileName);
         if (status != null) {
             for (FileStatus fileStatus : status) {
                 if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@ public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
     }

     public FileStatus[] listStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.listStatus(new Path(filePath));
+        return getFileSystem().listStatus(new Path(filePath));
     }

     public FileStatus getFileStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.getFileStatus(new Path(filePath));
+        return getFileSystem().getFileStatus(new Path(filePath));
     }

     public FSDataOutputStream getOutputStream(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.create(new Path(filePath), true);
+        return getFileSystem().create(new Path(filePath), true);
     }

     public FSDataInputStream getInputStream(String filePath) throws IOException {
+        return getFileSystem().open(new Path(filePath));
+    }
+
+    public FileSystem getFileSystem() {
         if (fileSystem == null) {
             initialize();
         }
-        return fileSystem.open(new Path(filePath));
+        return fileSystem;
     }

     @SneakyThrows
@@ -258,16 +224,7 @@ private void initialize() {
     }

     private Configuration createConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.setBoolean(
-                String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
        hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
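The refactor above replaces the per-method null checks with one lazy getFileSystem() accessor, which is also what OrcWriteStrategy consumes further down. A generic sketch of the pattern, with illustrative names rather than the connector's actual fields:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LazyFileSystemHolder {
    private final Configuration configuration;
    private FileSystem fileSystem; // created on first use

    public LazyFileSystemHolder(Configuration configuration) {
        this.configuration = configuration;
    }

    // Every public file operation goes through this accessor, so callers never
    // repeat the "if (fileSystem == null) initialize()" check shown in the old code.
    private FileSystem getFileSystem() throws IOException {
        if (fileSystem == null) {
            fileSystem = FileSystem.get(configuration);
        }
        return fileSystem;
    }

    public boolean fileExist(String filePath) throws IOException {
        return getFileSystem().exists(new Path(filePath));
    }
}
```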
AbstractWriteStrategy.java
@@ -36,7 +36,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +57,6 @@
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;

-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@ protected SeaTunnelRowType buildSchemaWithRowType(
      */
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
OrcWriteStrategy.java
@@ -122,6 +122,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
                                 .compress(compressFormat.getOrcCompression())
                                 // use orc version 0.12
                                 .version(OrcFile.Version.V_0_12)
+                                .fileSystem(hadoopFileSystemProxy.getFileSystem())
                                 .overwrite(true);
                Writer newWriter = OrcFile.createWriter(path, options);
                this.beingWrittenWriter.put(filePath, newWriter);
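Injecting the proxy's FileSystem into the ORC writer options is the core of this pull request: the writer reuses the handle HadoopFileSystemProxy has already initialized (including any Kerberos login) instead of resolving a new one from the Configuration. A self-contained sketch of the same option chain; the schema string and compression codec are placeholders, not the connector's values:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriterSketch {
    public static Writer openWriter(Configuration conf, FileSystem fs, String filePath)
            throws IOException {
        OrcFile.WriterOptions options =
                OrcFile.writerOptions(conf)
                        // placeholder schema; the connector derives it from the row type
                        .setSchema(TypeDescription.fromString("struct<id:bigint,name:string>"))
                        .compress(CompressionKind.ZLIB)
                        .version(OrcFile.Version.V_0_12)
                        // reuse the caller's FileSystem (e.g. one obtained after a
                        // Kerberos login) instead of letting ORC create a new one
                        .fileSystem(fs)
                        .overwrite(true);
        return OrcFile.createWriter(new Path(filePath), options);
    }
}
```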
ParquetWriteStrategy.java
@@ -139,25 +139,33 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)
         dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
         if (writer == null) {
             Path path = new Path(filePath);
-            try {
-                HadoopOutputFile outputFile =
-                        HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
-                ParquetWriter<GenericRecord> newWriter =
-                        AvroParquetWriter.<GenericRecord>builder(outputFile)
-                                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                                .withDataModel(dataModel)
-                                // use parquet v1 to improve compatibility
-                                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                                .withCompressionCodec(compressFormat.getParquetCompression())
-                                .withSchema(schema)
-                                .build();
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
-            }
+            // initialize the kerberos login
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try {
+                            HadoopOutputFile outputFile =
+                                    HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
+                            ParquetWriter<GenericRecord> newWriter =
+                                    AvroParquetWriter.<GenericRecord>builder(outputFile)
+                                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                                            .withDataModel(dataModel)
+                                            // use parquet v1 to improve compatibility
+                                            .withWriterVersion(
+                                                    ParquetProperties.WriterVersion.PARQUET_1_0)
+                                            .withCompressionCodec(
+                                                    compressFormat.getParquetCompression())
+                                            .withSchema(schema)
+                                            .build();
+                            this.beingWrittenWriter.put(filePath, newWriter);
+                            return newWriter;
+                        } catch (IOException e) {
+                            String errorMsg =
+                                    String.format(
+                                            "Get parquet writer for file [%s] error", filePath);
+                            throw new FileConnectorException(
+                                    CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
+                        }
+                    });
         }
         return writer;
     }
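The Parquet writer is now created inside hadoopFileSystemProxy.doWithHadoopAuth, so the output file is opened under the proxy's Hadoop login (hence the "initialize the kerberos login" comment). The callback's functional interface is not shown in this diff; the sketch below assumes a two-argument (Configuration, UserGroupInformation) action matching the lambda above, and shows the standard UserGroupInformation.doAs idiom such a helper would typically wrap:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopAuthSketch {

    // Assumed shape of the callback accepted by doWithHadoopAuth: it receives the
    // proxy's Configuration plus the logged-in UGI and returns an arbitrary value.
    @FunctionalInterface
    interface AuthedAction<T> {
        T run(Configuration configuration, UserGroupInformation ugi) throws Exception;
    }

    // Illustrative helper: run the action as the given UGI, which is how a
    // Kerberos-authenticated output file would normally be opened.
    static <T> T doWithHadoopAuth(
            Configuration configuration, UserGroupInformation ugi, AuthedAction<T> action)
            throws Exception {
        return ugi.doAs(
                (PrivilegedExceptionAction<T>) () -> action.run(configuration, ugi));
    }
}
```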