Skip to content

Commit c66aae7

Browse files
committed
refactor: using Pipeline as an implementation of infer_schema for CSV and NDJSON
1 parent dd452b7 commit c66aae7

File tree

18 files changed

+540
-332
lines changed

18 files changed

+540
-332
lines changed

src/common/storage/src/stage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
9898
}
9999
/// select * from @s1/<path> (FILES => <files> PATTERN => <pattern>)
100100
/// copy from @s1/<path> FILES = <files> PATTERN => <pattern>
101-
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
101+
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug, Hash)]
102102
pub struct StageFilesInfo {
103103
pub path: String,
104104
pub files: Option<Vec<String>>,

src/meta/app/src/principal/file_format.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ const OPT_BINARY_FORMAT: &str = "binary_format";
5252
const OPT_USE_LOGIC_TYPE: &str = "use_logic_type";
5353

5454
/// File format parameters after checking and parsing.
55-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
5656
#[serde(tag = "type")]
5757
pub enum FileFormatParams {
5858
Csv(CsvFileFormatParams),
@@ -446,7 +446,7 @@ impl FileFormatOptionsReader {
446446
}
447447
}
448448

449-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
449+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
450450
pub struct CsvFileFormatParams {
451451
pub compression: StageFileCompression,
452452

@@ -498,7 +498,7 @@ impl CsvFileFormatParams {
498498
}
499499
}
500500

501-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
501+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
502502
pub struct TsvFileFormatParams {
503503
pub compression: StageFileCompression,
504504
pub headers: u64,
@@ -532,7 +532,7 @@ impl TsvFileFormatParams {
532532
}
533533
}
534534

535-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
535+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
536536
pub struct XmlFileFormatParams {
537537
pub compression: StageFileCompression,
538538
pub row_tag: String,
@@ -558,7 +558,7 @@ impl Default for XmlFileFormatParams {
558558

559559
/// used for both `missing_field_as` and `null_field_as`
560560
/// for extensibility, it is stored as PB string in meta
561-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
561+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
562562
pub enum NullAs {
563563
/// for `missing_field_as` only, and is default for it for safety,
564564
/// in case of wrong field names when creating table.
@@ -570,7 +570,7 @@ pub enum NullAs {
570570
FieldDefault,
571571
}
572572

573-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
573+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
574574
pub enum EmptyFieldAs {
575575
#[default]
576576
Null,
@@ -638,7 +638,7 @@ impl Display for NullAs {
638638
}
639639
}
640640

641-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
641+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
642642
pub enum BinaryFormat {
643643
#[default]
644644
Hex,
@@ -668,7 +668,7 @@ impl Display for BinaryFormat {
668668
}
669669
}
670670

671-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
671+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
672672
pub struct JsonFileFormatParams {
673673
pub compression: StageFileCompression,
674674
}
@@ -690,7 +690,7 @@ impl Default for JsonFileFormatParams {
690690
}
691691
}
692692

693-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
693+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
694694
pub struct NdJsonFileFormatParams {
695695
pub compression: StageFileCompression,
696696
pub missing_field_as: NullAs,
@@ -741,7 +741,7 @@ impl NdJsonFileFormatParams {
741741
}
742742
}
743743

744-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
744+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
745745
pub struct AvroFileFormatParams {
746746
pub compression: StageFileCompression,
747747
pub missing_field_as: NullAs,
@@ -791,7 +791,7 @@ impl AvroFileFormatParams {
791791
}
792792
}
793793

794-
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
794+
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
795795
pub struct ParquetFileFormatParams {
796796
// used only for unload
797797
pub compression: StageFileCompression,
@@ -828,7 +828,7 @@ impl ParquetFileFormatParams {
828828
}
829829
}
830830

831-
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
831+
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
832832
pub struct OrcFileFormatParams {
833833
pub missing_field_as: NullAs,
834834
}

src/meta/app/src/principal/user_stage.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub const COPY_MAX_FILES_PER_COMMIT: usize = 15000;
6060
/// Instruction for exceeding 'copy into table' file limit.
6161
pub const COPY_MAX_FILES_COMMIT_MSG: &str = "Commit limit reached: 15,000 files for 'copy into table'. To handle more files, adjust 'CopyOption' with 'max_files=<num>'(e.g., 'max_files=10000') and perform several operations until all files are processed.";
6262

63-
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
63+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
6464
pub enum StageType {
6565
/// LegacyInternal will be deprecated.
6666
///
@@ -96,7 +96,7 @@ impl Default for StageType {
9696
}
9797
}
9898

99-
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, Eq, PartialEq)]
99+
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
100100
pub enum StageFileCompression {
101101
Auto,
102102
Gzip,
@@ -396,13 +396,13 @@ impl Display for FileFormatOptions {
396396
}
397397
}
398398

399-
#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
399+
#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq, Hash)]
400400
#[serde(default)]
401401
pub struct StageParams {
402402
pub storage: StorageParams,
403403
}
404404

405-
#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)]
405+
#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
406406
#[serde(default)]
407407
pub struct CopyOptions {
408408
pub on_error: OnErrorMode,
@@ -419,7 +419,7 @@ pub struct CopyOptions {
419419
pub detailed_output: bool,
420420
}
421421

422-
#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
422+
#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq, Hash)]
423423
#[serde(default)]
424424
pub struct StageInfo {
425425
pub stage_name: String,

src/query/ast/src/ast/statements/copy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ impl Display for FileFormatValue {
648648
}
649649
}
650650

651-
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
651+
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq, Hash)]
652652
pub enum OnErrorMode {
653653
Continue,
654654
SkipFileNum(u64),

0 commit comments

Comments
(0)