2323import org .apache .commons .lang3 .StringUtils ;
2424import org .apache .commons .lang3 .tuple .Pair ;
2525import org .apache .hadoop .conf .Configuration ;
26- import org .apache .hadoop .fs .CommonConfigurationKeys ;
2726import org .apache .hadoop .fs .FSDataInputStream ;
2827import org .apache .hadoop .fs .FSDataOutputStream ;
2928import org .apache .hadoop .fs .FileStatus ;
4342import java .util .ArrayList ;
4443import java .util .List ;
4544
46- import static org .apache .parquet .avro .AvroReadSupport .READ_INT96_AS_FIXED ;
47- import static org .apache .parquet .avro .AvroSchemaConverter .ADD_LIST_ELEMENT_RECORDS ;
48- import static org .apache .parquet .avro .AvroWriteSupport .WRITE_FIXED_AS_INT96 ;
49- import static org .apache .parquet .avro .AvroWriteSupport .WRITE_OLD_LIST_STRUCTURE ;
50-
5145@ Slf4j
5246public class HadoopFileSystemProxy implements Serializable , Closeable {
5347
@@ -64,30 +58,19 @@ public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) {
6458 }
6559
6660 public boolean fileExist (@ NonNull String filePath ) throws IOException {
67- if (fileSystem == null ) {
68- initialize ();
69- }
70- Path fileName = new Path (filePath );
71- return fileSystem .exists (fileName );
61+ return getFileSystem ().exists (new Path (filePath ));
7262 }
7363
7464 public void createFile (@ NonNull String filePath ) throws IOException {
75- if (fileSystem == null ) {
76- initialize ();
77- }
78- Path path = new Path (filePath );
79- if (!fileSystem .createNewFile (path )) {
65+ if (!getFileSystem ().createNewFile (new Path (filePath ))) {
8066 throw CommonError .fileOperationFailed ("SeaTunnel" , "create" , filePath );
8167 }
8268 }
8369
8470 public void deleteFile (@ NonNull String filePath ) throws IOException {
85- if (fileSystem == null ) {
86- initialize ();
87- }
8871 Path path = new Path (filePath );
89- if (fileSystem .exists (path )) {
90- if (!fileSystem .delete (path , true )) {
72+ if (getFileSystem () .exists (path )) {
73+ if (!getFileSystem () .delete (path , true )) {
9174 throw CommonError .fileOperationFailed ("SeaTunnel" , "delete" , filePath );
9275 }
9376 }
@@ -98,9 +81,6 @@ public void renameFile(
9881 @ NonNull String newFilePath ,
9982 boolean removeWhenNewFilePathExist )
10083 throws IOException {
101- if (fileSystem == null ) {
102- initialize ();
103- }
10484 Path oldPath = new Path (oldFilePath );
10585 Path newPath = new Path (newFilePath );
10686
@@ -116,15 +96,15 @@ public void renameFile(
11696
11797 if (removeWhenNewFilePathExist ) {
11898 if (fileExist (newFilePath )) {
119- fileSystem .delete (newPath , true );
99+ getFileSystem () .delete (newPath , true );
120100 log .info ("Delete already file: {}" , newPath );
121101 }
122102 }
123103 if (!fileExist (newPath .getParent ().toString ())) {
124104 createDir (newPath .getParent ().toString ());
125105 }
126106
127- if (fileSystem .rename (oldPath , newPath )) {
107+ if (getFileSystem () .rename (oldPath , newPath )) {
128108 log .info ("rename file :[" + oldPath + "] to [" + newPath + "] finish" );
129109 } else {
130110 throw CommonError .fileOperationFailed (
@@ -133,42 +113,33 @@ public void renameFile(
133113 }
134114
135115 public void createDir (@ NonNull String filePath ) throws IOException {
136- if (fileSystem == null ) {
137- initialize ();
138- }
139116 Path dfs = new Path (filePath );
140- if (!fileSystem .mkdirs (dfs )) {
117+ if (!getFileSystem () .mkdirs (dfs )) {
141118 throw CommonError .fileOperationFailed ("SeaTunnel" , "create" , filePath );
142119 }
143120 }
144121
145122 public List <LocatedFileStatus > listFile (String path ) throws IOException {
146- if (fileSystem == null ) {
147- initialize ();
148- }
149123 List <LocatedFileStatus > fileList = new ArrayList <>();
150124 if (!fileExist (path )) {
151125 return fileList ;
152126 }
153127 Path fileName = new Path (path );
154128 RemoteIterator <LocatedFileStatus > locatedFileStatusRemoteIterator =
155- fileSystem .listFiles (fileName , false );
129+ getFileSystem () .listFiles (fileName , false );
156130 while (locatedFileStatusRemoteIterator .hasNext ()) {
157131 fileList .add (locatedFileStatusRemoteIterator .next ());
158132 }
159133 return fileList ;
160134 }
161135
162136 public List <Path > getAllSubFiles (@ NonNull String filePath ) throws IOException {
163- if (fileSystem == null ) {
164- initialize ();
165- }
166137 List <Path > pathList = new ArrayList <>();
167138 if (!fileExist (filePath )) {
168139 return pathList ;
169140 }
170141 Path fileName = new Path (filePath );
171- FileStatus [] status = fileSystem .listStatus (fileName );
142+ FileStatus [] status = getFileSystem () .listStatus (fileName );
172143 if (status != null ) {
173144 for (FileStatus fileStatus : status ) {
174145 if (fileStatus .isDirectory ()) {
@@ -180,31 +151,26 @@ public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
180151 }
181152
182153 public FileStatus [] listStatus (String filePath ) throws IOException {
183- if (fileSystem == null ) {
184- initialize ();
185- }
186- return fileSystem .listStatus (new Path (filePath ));
154+ return getFileSystem ().listStatus (new Path (filePath ));
187155 }
188156
189157 public FileStatus getFileStatus (String filePath ) throws IOException {
190- if (fileSystem == null ) {
191- initialize ();
192- }
193- return fileSystem .getFileStatus (new Path (filePath ));
158+ return getFileSystem ().getFileStatus (new Path (filePath ));
194159 }
195160
196161 public FSDataOutputStream getOutputStream (String filePath ) throws IOException {
197- if (fileSystem == null ) {
198- initialize ();
199- }
200- return fileSystem .create (new Path (filePath ), true );
162+ return getFileSystem ().create (new Path (filePath ), true );
201163 }
202164
203165 public FSDataInputStream getInputStream (String filePath ) throws IOException {
166+ return getFileSystem ().open (new Path (filePath ));
167+ }
168+
169+ public FileSystem getFileSystem () {
204170 if (fileSystem == null ) {
205171 initialize ();
206172 }
207- return fileSystem . open ( new Path ( filePath )) ;
173+ return fileSystem ;
208174 }
209175
210176 @ SneakyThrows
@@ -258,16 +224,7 @@ private void initialize() {
258224 }
259225
260226 private Configuration createConfiguration () {
261- Configuration configuration = new Configuration ();
262- configuration .setBoolean (READ_INT96_AS_FIXED , true );
263- configuration .setBoolean (WRITE_FIXED_AS_INT96 , true );
264- configuration .setBoolean (ADD_LIST_ELEMENT_RECORDS , false );
265- configuration .setBoolean (WRITE_OLD_LIST_STRUCTURE , true );
266- configuration .set (CommonConfigurationKeys .FS_DEFAULT_NAME_KEY , hadoopConf .getHdfsNameKey ());
267- configuration .setBoolean (
268- String .format ("fs.%s.impl.disable.cache" , hadoopConf .getSchema ()), true );
269- configuration .set (
270- String .format ("fs.%s.impl" , hadoopConf .getSchema ()), hadoopConf .getFsHdfsImpl ());
227+ Configuration configuration = hadoopConf .toConfiguration ();
271228 hadoopConf .setExtraOptionsForConfiguration (configuration );
272229 return configuration ;
273230 }
0 commit comments