
Commit fc6ce4b

feat: impl max_file_count for infer_schema
1 parent b26101e commit fc6ce4b

File tree

16 files changed: +240 -170 lines changed

src/common/storage/src/stage.rs

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
 }
 /// select * from @s1/<path> (FILES => <files> PATTERN => <pattern>)
 /// copy from @s1/<path> FILES = <files> PATTERN => <pattern>
-#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
 pub struct StageFilesInfo {
     pub path: String,
     pub files: Option<Vec<String>>,
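Note: this commit drops Hash from the derives of StageFilesInfo and, in the files below, from the file-format and stage types it contains. The apparent reason (see src/query/storages/stage/src/infer_schema.rs further down) is that the only consumer was InferSchemaPartInfo::hash, which now returns a constant. A minimal standalone sketch of the coupling being removed; none of these names are the project's own:

use std::hash::{DefaultHasher, Hash, Hasher};

// Hashing a struct through a std hasher requires #[derive(Hash)] on the
// struct and a Hash impl on every field it contains.
#[derive(Hash)]
struct FilesInfo {
    path: String,
    files: Option<Vec<String>>,
}

fn fingerprint<T: Hash>(value: &T) -> u64 {
    let mut s = DefaultHasher::new();
    value.hash(&mut s);
    s.finish()
}

fn main() {
    let info = FilesInfo {
        path: "@s1/data/".to_string(),
        files: None,
    };
    // This only compiles while Hash is derived on FilesInfo.
    println!("{}", fingerprint(&info));
}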

src/meta/app/src/principal/file_format.rs

Lines changed: 12 additions & 12 deletions
@@ -52,7 +52,7 @@ const OPT_BINARY_FORMAT: &str = "binary_format";
 const OPT_USE_LOGIC_TYPE: &str = "use_logic_type";

 /// File format parameters after checking and parsing.
-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(tag = "type")]
 pub enum FileFormatParams {
     Csv(CsvFileFormatParams),

@@ -446,7 +446,7 @@ impl FileFormatOptionsReader {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct CsvFileFormatParams {
     pub compression: StageFileCompression,

@@ -498,7 +498,7 @@ impl CsvFileFormatParams {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct TsvFileFormatParams {
     pub compression: StageFileCompression,
     pub headers: u64,

@@ -532,7 +532,7 @@ impl TsvFileFormatParams {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct XmlFileFormatParams {
     pub compression: StageFileCompression,
     pub row_tag: String,

@@ -558,7 +558,7 @@ impl Default for XmlFileFormatParams {

 /// used for both `missing_field_as` and `null_field_as`
 /// for extensibility, it is stored as PB string in meta
-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub enum NullAs {
     /// for `missing_field_as` only, and is default for it for safety,
     /// in case of wrong field names when creating table.

@@ -570,7 +570,7 @@ pub enum NullAs {
     FieldDefault,
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub enum EmptyFieldAs {
     #[default]
     Null,

@@ -638,7 +638,7 @@ impl Display for NullAs {
     }
 }

-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub enum BinaryFormat {
     #[default]
     Hex,

@@ -668,7 +668,7 @@ impl Display for BinaryFormat {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct JsonFileFormatParams {
     pub compression: StageFileCompression,
 }

@@ -690,7 +690,7 @@ impl Default for JsonFileFormatParams {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct NdJsonFileFormatParams {
     pub compression: StageFileCompression,
     pub missing_field_as: NullAs,

@@ -741,7 +741,7 @@ impl NdJsonFileFormatParams {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct AvroFileFormatParams {
     pub compression: StageFileCompression,
     pub missing_field_as: NullAs,

@@ -791,7 +791,7 @@ impl AvroFileFormatParams {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ParquetFileFormatParams {
     // used only for unload
     pub compression: StageFileCompression,

@@ -828,7 +828,7 @@ impl ParquetFileFormatParams {
     }
 }

-#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
 pub struct OrcFileFormatParams {
     pub missing_field_as: NullAs,
 }

src/meta/app/src/principal/user_stage.rs

Lines changed: 5 additions & 5 deletions
@@ -60,7 +60,7 @@ pub const COPY_MAX_FILES_PER_COMMIT: usize = 15000;
 /// Instruction for exceeding 'copy into table' file limit.
 pub const COPY_MAX_FILES_COMMIT_MSG: &str = "Commit limit reached: 15,000 files for 'copy into table'. To handle more files, adjust 'CopyOption' with 'max_files=<num>'(e.g., 'max_files=10000') and perform several operations until all files are processed.";

-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
 pub enum StageType {
     /// LegacyInternal will be deprecated.
     ///

@@ -96,7 +96,7 @@ impl Default for StageType {
     }
 }

-#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, Eq, PartialEq)]
 pub enum StageFileCompression {
     Auto,
     Gzip,

@@ -396,13 +396,13 @@ impl Display for FileFormatOptions {
     }
 }

-#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
 #[serde(default)]
 pub struct StageParams {
     pub storage: StorageParams,
 }

-#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)]
 #[serde(default)]
 pub struct CopyOptions {
     pub on_error: OnErrorMode,

@@ -419,7 +419,7 @@ pub struct CopyOptions {
     pub detailed_output: bool,
 }

-#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq, Hash)]
+#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
 #[serde(default)]
 pub struct StageInfo {
     pub stage_name: String,

src/query/ast/src/ast/statements/copy.rs

Lines changed: 1 addition & 3 deletions
@@ -648,9 +648,7 @@ impl Display for FileFormatValue {
     }
 }

-#[derive(
-    serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq, Hash,
-)]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
 pub enum OnErrorMode {
     Continue,
     SkipFileNum(u64),

src/query/service/src/table_functions/infer_schema/infer_schema_table.rs

Lines changed: 1 addition & 0 deletions
@@ -102,6 +102,7 @@ impl InferSchemaTable {
             TableField::new("column_name", TableDataType::String),
             TableField::new("type", TableDataType::String),
             TableField::new("nullable", TableDataType::Boolean),
+            TableField::new("filenames", TableDataType::String),
             TableField::new("order_id", TableDataType::Number(NumberDataType::UInt64)),
         ])
     }
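With the new filenames column, every row of the infer_schema output carries the list of files that contributed to the merged schema, alongside the existing column_name, type, nullable, and order_id columns. A simplified, hypothetical sketch of the resulting row shape (the real code builds columnar DataBlocks, not row tuples, and these type names are illustrative):

struct InferredField {
    name: String,
    sql_type: String,
    nullable: bool,
}

// One output row per inferred field; the joined file list is repeated on
// every row, mirroring `filenames.push(filenames_str.clone())` in the diffs.
fn build_rows(
    fields: &[InferredField],
    filenames: &str,
) -> Vec<(String, String, bool, String, u64)> {
    fields
        .iter()
        .enumerate()
        .map(|(i, f)| {
            (
                f.name.clone(),
                f.sql_type.clone(),
                f.nullable,
                filenames.to_string(),
                i as u64, // order_id
            )
        })
        .collect()
}

fn main() {
    let fields = vec![InferredField {
        name: "id".to_string(),
        sql_type: "BIGINT".to_string(),
        nullable: false,
    }];
    let rows = build_rows(&fields, "a.parquet, b.parquet");
    assert_eq!(rows[0].3, "a.parquet, b.parquet");
}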

src/query/service/src/table_functions/infer_schema/parquet.rs

Lines changed: 9 additions & 0 deletions
@@ -32,6 +32,7 @@ use databend_common_storage::init_stage_operator;
 use databend_common_storage::read_parquet_schema_async_rs;
 use databend_common_storage::StageFileInfo;
 use futures_util::future::try_join_all;
+use itertools::Itertools;

 use crate::table_functions::infer_schema::infer_schema_table::INFER_SCHEMA;

@@ -79,13 +80,20 @@ impl AsyncSource for ParquetInferSchemaSource {
         let mut names: Vec<String> = vec![];
         let mut types: Vec<String> = vec![];
         let mut nulls: Vec<bool> = vec![];
+        let mut filenames: Vec<String> = vec![];
+        let filenames_str = self
+            .stage_file_infos
+            .iter()
+            .map(|info| &info.path)
+            .join(", ");

         for field in table_schema.fields().iter() {
             names.push(field.name().to_string());

             let non_null_type = field.data_type().remove_recursive_nullable();
             types.push(non_null_type.sql_name());
             nulls.push(field.is_nullable());
+            filenames.push(filenames_str.clone());
         }

         let order_ids = (0..table_schema.fields().len() as u64).collect::<Vec<_>>();
@@ -94,6 +102,7 @@ impl AsyncSource for ParquetInferSchemaSource {
             StringType::from_data(names),
             StringType::from_data(types),
             BooleanType::from_data(nulls),
+            StringType::from_data(filenames),
             UInt64Type::from_data(order_ids),
         ]);
         Ok(Some(block))
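The joined string is produced with Itertools::join over the staged file paths. A standalone sketch of just that step, assuming only that StageFileInfo exposes a path field as the diff shows (requires the itertools crate):

use itertools::Itertools;

struct StageFileInfo {
    path: String,
}

// Mirrors the added code: collect every staged file's path into one
// comma-separated string.
fn joined_paths(infos: &[StageFileInfo]) -> String {
    infos.iter().map(|info| &info.path).join(", ")
}

fn main() {
    let infos = vec![
        StageFileInfo { path: "a.parquet".to_string() },
        StageFileInfo { path: "b.parquet".to_string() },
    ];
    assert_eq!(joined_paths(&infos), "a.parquet, b.parquet");
}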

src/query/service/src/table_functions/infer_schema/separator.rs

Lines changed: 38 additions & 4 deletions
@@ -32,15 +32,19 @@ use databend_common_expression::TableSchema;
 use databend_common_meta_app::principal::FileFormatParams;
 use databend_common_pipeline_transforms::AccumulatingTransform;
 use databend_common_storages_stage::BytesBatch;
+use itertools::Itertools;

 use crate::table_functions::infer_schema::merge::merge_schema;

+const MAX_SINGLE_FILE_BYTES: usize = 100 * 1024 * 1024;
+
 pub struct InferSchemaSeparator {
     pub file_format_params: FileFormatParams,
     files: HashMap<String, Vec<u8>>,
     pub max_records: Option<usize>,
     schemas: Option<TableSchema>,
-    remaining_files_len: usize,
+    files_len: usize,
+    filenames: Vec<String>,
     is_finished: bool,
 }

@@ -55,7 +59,8 @@ impl InferSchemaSeparator {
             files: HashMap::new(),
             max_records,
             schemas: None,
-            remaining_files_len: files_len,
+            files_len,
+            filenames: Vec::with_capacity(files_len),
             is_finished: false,
         }
     }

@@ -76,6 +81,14 @@ impl AccumulatingTransform for InferSchemaSeparator {
         let bytes = self.files.entry(batch.path.clone()).or_default();
         bytes.extend(batch.data);

+        if bytes.len() > MAX_SINGLE_FILE_BYTES {
+            return Err(ErrorCode::InvalidArgument(format!(
+                "The file '{}' is too large(maximum allowed: {})",
+                batch.path,
+                human_readable_size(MAX_SINGLE_FILE_BYTES),
+            )));
+        }
+
         // When max_records exists, it will try to use the current bytes to read, otherwise it will buffer all bytes
         if self.max_records.is_none() && !batch.is_eof {
             return Ok(vec![DataBlock::empty()]);

@@ -138,15 +151,15 @@ impl AccumulatingTransform for InferSchemaSeparator {
             }
         };
         self.files.remove(&batch.path);
+        self.filenames.push(batch.path);

         let merge_schema = match self.schemas.take() {
             None => TableSchema::try_from(&arrow_schema)?,
             Some(schema) => merge_schema(schema, TableSchema::try_from(&arrow_schema)?),
         };
         self.schemas = Some(merge_schema);

-        self.remaining_files_len = self.remaining_files_len.saturating_sub(1);
-        if self.remaining_files_len > 0 {
+        if self.files_len > self.filenames.len() {
             return Ok(vec![DataBlock::empty()]);
         }
         self.is_finished = true;

@@ -157,13 +170,16 @@ impl AccumulatingTransform for InferSchemaSeparator {
         let mut names: Vec<String> = vec![];
         let mut types: Vec<String> = vec![];
         let mut nulls: Vec<bool> = vec![];
+        let mut filenames: Vec<String> = vec![];
+        let filenames_str = self.filenames.iter().join(", ");

         for field in table_schema.fields().iter() {
             names.push(field.name().to_string());

             let non_null_type = field.data_type().remove_recursive_nullable();
             types.push(non_null_type.sql_name());
             nulls.push(field.is_nullable());
+            filenames.push(filenames_str.clone());
         }

         let order_ids = (0..table_schema.fields().len() as u64).collect::<Vec<_>>();
@@ -172,8 +188,26 @@ impl AccumulatingTransform for InferSchemaSeparator {
             StringType::from_data(names),
             StringType::from_data(types),
             BooleanType::from_data(nulls),
+            StringType::from_data(filenames),
             UInt64Type::from_data(order_ids),
         ]);
         Ok(vec![block])
     }
 }
+
+fn human_readable_size(bytes: usize) -> String {
+    const KB: f64 = 1024.0;
+    const MB: f64 = KB * 1024.0;
+    const GB: f64 = MB * 1024.0;
+
+    let b = bytes as f64;
+    if b >= GB {
+        format!("{:.2} GB", b / GB)
+    } else if b >= MB {
+        format!("{:.2} MB", b / MB)
+    } else if b >= KB {
+        format!("{:.2} KB", b / KB)
+    } else {
+        format!("{} B", bytes)
+    }
+}
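Two behavioural changes sit in this file: buffered bytes per file are now capped at MAX_SINGLE_FILE_BYTES (100 MiB), and the completion check compares files_len against the collected filenames instead of decrementing a counter, which also records which files were seen. A standalone copy of the new human_readable_size helper, with a quick check of the message it produces at the cap:

// Standalone copy of the helper added above, for illustration only.
fn human_readable_size(bytes: usize) -> String {
    const KB: f64 = 1024.0;
    const MB: f64 = KB * 1024.0;
    const GB: f64 = MB * 1024.0;

    let b = bytes as f64;
    if b >= GB {
        format!("{:.2} GB", b / GB)
    } else if b >= MB {
        format!("{:.2} MB", b / MB)
    } else if b >= KB {
        format!("{:.2} KB", b / KB)
    } else {
        format!("{} B", bytes)
    }
}

fn main() {
    // The limit used in the new error message renders as "100.00 MB".
    assert_eq!(human_readable_size(100 * 1024 * 1024), "100.00 MB");
    assert_eq!(human_readable_size(512), "512 B");
    assert_eq!(human_readable_size(2 * 1024 * 1024 * 1024), "2.00 GB");
}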

src/query/storages/stage/src/infer_schema.rs

Lines changed: 1 addition & 8 deletions
@@ -13,9 +13,6 @@
 // limitations under the License.

 use std::any::Any;
-use std::hash::DefaultHasher;
-use std::hash::Hash;
-use std::hash::Hasher;
 use std::sync::Arc;

 use databend_common_catalog::plan::PartInfo;

@@ -48,11 +45,7 @@ impl PartInfo for InferSchemaPartInfo {
     }

     fn hash(&self) -> u64 {
-        let mut s = DefaultHasher::new();
-        self.files_info.hash(&mut s);
-        self.file_format_params.hash(&mut s);
-        self.stage_info.hash(&mut s);
-        s.finish()
+        0
     }

     fn part_type(&self) -> PartInfoType {
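With the Hash derives removed from StageFilesInfo, FileFormatParams, and StageInfo (see the earlier diffs), the fields can no longer be fed through DefaultHasher, so PartInfo::hash returns a constant instead, presumably harmless here since infer_schema yields a single part. For reference, the deleted code followed this generic fold pattern (standalone sketch, not the project's code):

use std::hash::{DefaultHasher, Hash, Hasher};

// Fold any number of Hash-implementing values into one u64, the way the
// deleted implementation combined files_info, file_format_params, and
// stage_info.
fn combined_hash<A: Hash, B: Hash>(a: &A, b: &B) -> u64 {
    let mut s = DefaultHasher::new();
    a.hash(&mut s);
    b.hash(&mut s);
    s.finish()
}

fn main() {
    println!("{:x}", combined_hash(&"files_info", &42u32));
}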
Lines changed: 1 addition & 2 deletions
@@ -1,5 +1,4 @@
 col1,col2,col3,col4,col5
 0,1,2,3,4
 5,6,7,8,9
-10,11,12,13,14
-a,b,c,d,e
+10,11,12,13,14
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+{"id": 1, "value": 100}
+{"id": 2, "value": 200}
+{"id": 3, "value": 300}
