@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
-use std::io::Cursor;
 use std::sync::Arc;
 
 use arrow_csv::reader::Format;
@@ -32,6 +31,7 @@ use databend_common_expression::FromData;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::principal::CsvFileFormatParams;
 use databend_common_meta_app::principal::FileFormatParams;
+use databend_common_meta_app::principal::StageFileCompression;
 use databend_common_meta_app::principal::StageType;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -145,6 +145,11 @@ impl AsyncSource for InferSchemaSource {
                 TableSchema::try_from(&arrow_schema)?
             }
             (Some(first_file), FileFormatParams::Csv(params)) => {
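+                // Schema inference sniffs the file's raw bytes, so reject
+                // compressed stage files up front with a clear error.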
+                if params.compression != StageFileCompression::None {
+                    return Err(ErrorCode::InvalidCompressionData(
+                        "Compressed CSV files are not supported",
+                    ));
+                }
                 let arrow_schema = read_csv_metadata_async(
                     &first_file.path,
                     &operator,
@@ -155,7 +160,12 @@ impl AsyncSource for InferSchemaSource {
                 .await?;
                 TableSchema::try_from(&arrow_schema)?
             }
-            (Some(first_file), FileFormatParams::NdJson(_)) => {
+            (Some(first_file), FileFormatParams::NdJson(params)) => {
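+                // Same guard as the CSV arm: the NDJSON inferrer also needs
+                // uncompressed bytes.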
+                if params.compression != StageFileCompression::None {
+                    return Err(ErrorCode::InvalidCompressionData(
+                        "Compressed NDJSON files are not supported",
+                    ));
+                }
                 let arrow_schema = read_json_metadata_async(
                     &first_file.path,
                     &operator,
@@ -167,7 +177,7 @@ impl AsyncSource for InferSchemaSource {
             }
             _ => {
                 return Err(ErrorCode::BadArguments(
-                    "infer_schema is currently limited to format Parquet",
+                    "infer_schema is currently limited to format Parquet, CSV and NDJSON",
                 ));
             }
         };
@@ -214,7 +224,7 @@ pub async fn read_csv_metadata_async(
     };
 
     // TODO: It would be better if it could be read in the form of Read trait
-    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
+    let buf = operator.read_with(path).range(..file_size).await?;
     let mut format = Format::default()
         .with_delimiter(params.field_delimiter.as_bytes()[0])
         .with_quote(params.quote.as_bytes()[0])
@@ -223,7 +233,7 @@ pub async fn read_csv_metadata_async(
     if let Some(escape) = escape {
         format = format.with_escape(escape);
     }
-    let (schema, _) = format.infer_schema(Cursor::new(&buf), max_records)?;
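+    // buf is an opendal Buffer that can be consumed as a reader directly,
+    // so the Cursor wrapper over a copied Vec is no longer needed.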
+    let (schema, _) = format.infer_schema(buf, max_records)?;
 
     Ok(schema)
 }
@@ -239,8 +249,8 @@ pub async fn read_json_metadata_async(
         Some(n) => n,
     };
     // TODO: It would be better if it could be read in the form of Read trait
-    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
-    let (schema, _) = infer_json_schema(Cursor::new(&buf), max_records)?;
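+    // As in the CSV path, the Buffer itself now serves as the reader.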
+    let buf = operator.read_with(path).range(..file_size).await?;
+    let (schema, _) = infer_json_schema(buf, max_records)?;
 
     Ok(schema)
 }