diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs
index 61d5e39470811..c95921cf5e5f4 100644
--- a/src/query/formats/src/output_format/parquet.rs
+++ b/src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,15 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, None)?;
+        // When unloading data as parquet, enable dictionary encoding unconditionally; this usually yields smaller files.
+        let _ = blocks_to_parquet(
+            &self.schema,
+            blocks,
+            &mut buf,
+            TableCompression::Zstd,
+            true,
+            None,
+        )?;
         Ok(buf)
     }
 }
diff --git a/src/query/service/src/interpreters/common/table_option_validation.rs b/src/query/service/src/interpreters/common/table_option_validation.rs
index 1b4e379b38a0e..85dd7124e3906 100644
--- a/src/query/service/src/interpreters/common/table_option_validation.rs
+++ b/src/query/service/src/interpreters/common/table_option_validation.rs
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_
 use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
+use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
 use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -89,6 +90,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
     r.insert(OPT_KEY_TEMP_PREFIX);
     r.insert(OPT_KEY_SEGMENT_FORMAT);
     r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
+    r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY);
     r
 });
 
@@ -259,3 +261,19 @@ where
     }
     Ok(())
 }
+
+pub fn is_valid_fuse_parquet_dictionary_opt(
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, options)
+}
+
+fn is_valid_bool_opt(
+    key: &str,
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    if let Some(value) = options.get(key) {
+        value.parse::<bool>()?;
+    }
+    Ok(())
+}
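Note: the validator above leans on Rust's `bool` parser, so only the exact strings `true` and `false` are accepted; anything else (`1`, `TRUE`, `invalid`, ...) is rejected. A minimal sketch of the expected behavior, assuming the two helpers above are in scope (the map setup is illustrative, not part of the patch):

```rust
use std::collections::BTreeMap;

// "true" parses as bool, so validation passes.
let mut opts: BTreeMap<String, String> = BTreeMap::new();
opts.insert("enable_parquet_dictionary".to_string(), "true".to_string());
assert!(is_valid_fuse_parquet_dictionary_opt(&opts).is_ok());

// "invalid" does not parse as bool, so validation fails
// (surfaced as error 1001 in the sqllogictest at the end of this patch).
opts.insert("enable_parquet_dictionary".to_string(), "invalid".to_string());
assert!(is_valid_fuse_parquet_dictionary_opt(&opts).is_err());
```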
diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs
index 104fe7d5f91da..dc518ea338eca 100644
--- a/src/query/service/src/interpreters/interpreter_table_create.rs
+++ b/src/query/service/src/interpreters/interpreter_table_create.rs
@@ -71,6 +71,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
 use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_random_seed;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -455,6 +456,8 @@ impl CreateTableInterpreter {
         is_valid_random_seed(&table_meta.options)?;
         // check table level data_retention_period_in_hours
         is_valid_data_retention_period(&table_meta.options)?;
+        // check enable_parquet_dictionary
+        is_valid_fuse_parquet_dictionary_opt(&table_meta.options)?;
 
         // Same as settings of FUSE_OPT_KEY_ENABLE_AUTO_VACUUM, except value type is unsigned integer
         is_valid_option_of_type::<u64>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;
diff --git a/src/query/service/src/interpreters/interpreter_table_set_options.rs b/src/query/service/src/interpreters/interpreter_table_set_options.rs
index 62388fa8f4651..fd7ca4cb78caf 100644
--- a/src/query/service/src/interpreters/interpreter_table_set_options.rs
+++ b/src/query/service/src/interpreters/interpreter_table_set_options.rs
@@ -56,6 +56,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
 use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
 use crate::interpreters::Interpreter;
@@ -95,6 +96,8 @@ impl Interpreter for SetOptionsInterpreter {
         is_valid_row_per_block(&self.plan.set_options)?;
         // check data_retention_period
         is_valid_data_retention_period(&self.plan.set_options)?;
+        // check enable_parquet_dictionary
+        is_valid_fuse_parquet_dictionary_opt(&self.plan.set_options)?;
 
         // check storage_format
         let error_str = "invalid opt for fuse table in alter table statement";
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index f1e88af79db8b..f9a7ee9f12918 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -157,6 +157,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
             None,
         )?;
         let size = data.len() as u64;
diff --git a/src/query/storages/basic/src/result_cache/write/writer.rs b/src/query/storages/basic/src/result_cache/write/writer.rs
index c281229b79ff6..9a654a0f2b7b3 100644
--- a/src/query/storages/basic/src/result_cache/write/writer.rs
+++ b/src/query/storages/basic/src/result_cache/write/writer.rs
@@ -77,6 +77,7 @@ impl ResultCacheWriter {
             self.blocks.clone(),
             &mut buf,
             TableCompression::None,
+            false,
             None,
         )?;
diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs
index 06108ed4f05d8..6da54c70d985d 100644
--- a/src/query/storages/common/blocks/src/parquet_rs.rs
+++ b/src/query/storages/common/blocks/src/parquet_rs.rs
@@ -23,6 +23,8 @@ use parquet::basic::Encoding;
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
+use parquet::file::properties::WriterPropertiesBuilder;
+use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
 
 /// Serialize data blocks to parquet format.
@@ -31,19 +33,13 @@ pub fn blocks_to_parquet(
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
+    enable_dictionary: bool,
     metadata: Option<Vec<KeyValue>>,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let props = WriterProperties::builder()
-        .set_compression(compression.into())
-        // use `usize::MAX` to effectively limit the number of row groups to 1
-        .set_max_row_group_size(usize::MAX)
-        .set_encoding(Encoding::PLAIN)
-        .set_dictionary_enabled(false)
-        .set_statistics_enabled(EnabledStatistics::None)
-        .set_bloom_filter_enabled(false)
-        .set_key_value_metadata(metadata)
-        .build();
+    let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);
+
+    let props = builder.build();
     let batches = blocks
         .into_iter()
         .map(|block| block.to_record_batch(table_schema))
@@ -56,3 +52,26 @@
     let file_meta = writer.close()?;
     Ok(file_meta)
 }
+
+pub fn parquet_writer_properties_builder(
+    compression: TableCompression,
+    enable_dictionary: bool,
+    metadata: Option<Vec<KeyValue>>,
+) -> WriterPropertiesBuilder {
+    let builder = WriterProperties::builder()
+        .set_compression(compression.into())
+        // use `usize::MAX` to effectively limit the number of row groups to 1
+        .set_max_row_group_size(usize::MAX)
+        .set_encoding(Encoding::PLAIN)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .set_bloom_filter_enabled(false)
+        .set_key_value_metadata(metadata);
+
+    if enable_dictionary {
+        builder
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            .set_dictionary_enabled(true)
+    } else {
+        builder.set_dictionary_enabled(false)
+    }
+}
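Note on the builder: in the parquet crate, enabling the dictionary together with `WriterVersion::PARQUET_2_0` makes dictionary-encoded columns use `RLE_DICTIONARY` data pages (the default v1 writer would emit `PLAIN_DICTIONARY`), and the writer automatically falls back to the configured column encoding when a column's dictionary grows past the dictionary-page size limit, so `set_encoding(Encoding::PLAIN)` still matters as that fallback. A minimal sketch of what the dictionary branch produces, with compression and metadata elided:

```rust
use parquet::file::properties::{WriterProperties, WriterVersion};

// Roughly the properties parquet_writer_properties_builder yields when
// enable_dictionary is true: a v2 writer with dictionary pages enabled.
let props = WriterProperties::builder()
    .set_writer_version(WriterVersion::PARQUET_2_0)
    .set_dictionary_enabled(true)
    .build();
```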
diff --git a/src/query/storages/fuse/benches/bench.rs b/src/query/storages/fuse/benches/bench.rs
index ae58ca32ec780..10fe5133c6075 100644
--- a/src/query/storages/fuse/benches/bench.rs
+++ b/src/query/storages/fuse/benches/bench.rs
@@ -141,11 +141,13 @@ mod dummy {
         let max_page_size = 8192;
         let block_per_seg = 1000;
+        let enable_parquet_dictionary = false;
 
         let write_settings = WriteSettings {
             storage_format,
             table_compression: compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_dictionary,
         };
         let schema = Arc::new(schema);
         let mut buffer = Vec::new();
diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs
index 3d7b61ed49cf5..1e70e029e380b 100644
--- a/src/query/storages/fuse/src/constants.rs
+++ b/src/query/storages/fuse/src/constants.rs
@@ -18,13 +18,13 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
 pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
 pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
 pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
-
 pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
 pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str =
     "data_retention_num_snapshots_to_keep";
 pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
 pub const FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE: &str = "enable_auto_analyze";
 pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
+pub const FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY: &str = "enable_parquet_dictionary";
 
 pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
 pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index c5ff7e414cee9..163f7e1ef2f12 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -133,6 +133,7 @@ use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
+use crate::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
 use crate::FUSE_OPT_KEY_FILE_SIZE;
 use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
 use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -321,11 +322,15 @@ impl FuseTable {
         let block_per_seg =
             self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
 
+        let enable_parquet_dictionary_encoding =
+            self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, false);
+
         WriteSettings {
             storage_format: self.storage_format,
             table_compression: self.table_compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_dictionary: enable_parquet_dictionary_encoding,
         }
     }
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index 4f3a04f742797..531f70a17c5b5 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -85,6 +85,7 @@ pub fn serialize_block(
         vec![block],
         buf,
         write_settings.table_compression,
+        write_settings.enable_parquet_dictionary,
         None,
     )?;
     let meta = column_parquet_metas(&result, &schema)?;
diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
index 501fe22598607..384ca815f2d51 100644
--- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
+++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs
@@ -77,6 +77,7 @@ impl BloomIndexState {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
             None,
         )?;
         let data_size = data.len() as u64;
diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs
index bffa59a4997a5..a596e38bd0bb1 100644
--- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs
+++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs
@@ -336,6 +336,8 @@ impl InvertedIndexWriter {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for inverted index
+            false,
             None,
         )?;
diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs
index b2934b2f0d8f3..4de89f7e0a6fb 100644
--- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs
+++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs
@@ -38,6 +38,7 @@ use databend_common_meta_app::schema::TableIndex;
 use databend_common_native::write::NativeWriter;
 use databend_common_native::write::WriteOptions;
 use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_storages_common_blocks::parquet_writer_properties_builder;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_index::BloomIndexBuilder;
 use databend_storages_common_index::Index;
@@ -49,9 +50,6 @@ use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
-use parquet::basic::Encoding;
-use parquet::file::properties::EnabledStatistics;
-use parquet::file::properties::WriterProperties;
 
 use crate::io::create_inverted_index_builders;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
@@ -172,15 +170,14 @@ impl StreamBlockBuilder {
         let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         let block_writer = match properties.write_settings.storage_format {
             FuseStorageFormat::Parquet => {
-                let props = WriterProperties::builder()
-                    .set_compression(properties.write_settings.table_compression.into())
-                    // use `usize::MAX` to effectively limit the number of row groups to 1
-                    .set_max_row_group_size(usize::MAX)
-                    .set_encoding(Encoding::PLAIN)
-                    .set_dictionary_enabled(false)
-                    .set_statistics_enabled(EnabledStatistics::None)
-                    .set_bloom_filter_enabled(false)
-                    .build();
+                let write_settings = &properties.write_settings;
+                let props = parquet_writer_properties_builder(
+                    write_settings.table_compression,
+                    write_settings.enable_parquet_dictionary,
+                    None,
+                )
+                .build();
+
                 let arrow_schema = Arc::new(properties.source_schema.as_ref().into());
                 let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?;
                 BlockWriterImpl::Arrow(writer)
diff --git a/src/query/storages/fuse/src/io/write/vector_index_writer.rs b/src/query/storages/fuse/src/io/write/vector_index_writer.rs
index f6d2d169d6204..ca372d66c2c2d 100644
--- a/src/query/storages/fuse/src/io/write/vector_index_writer.rs
+++ b/src/query/storages/fuse/src/io/write/vector_index_writer.rs
@@ -214,6 +214,8 @@ impl VectorIndexBuilder {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for vector index
+            false,
             Some(metadata),
         )?;
@@ -318,6 +320,8 @@ impl VectorIndexBuilder {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for vector index
+            false,
             Some(metadata),
         )?;
diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
index c1821e40efb24..a79b7a0b29b97 100644
--- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
+++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
@@ -511,6 +511,7 @@ impl VirtualColumnBuilder {
             vec![virtual_block],
             &mut data,
             write_settings.table_compression,
+            write_settings.enable_parquet_dictionary,
             None,
         )?;
diff --git a/src/query/storages/fuse/src/io/write/write_settings.rs b/src/query/storages/fuse/src/io/write/write_settings.rs
index 763942c018916..9408ee33c26c8 100644
--- a/src/query/storages/fuse/src/io/write/write_settings.rs
+++ b/src/query/storages/fuse/src/io/write/write_settings.rs
@@ -28,6 +28,7 @@ pub struct WriteSettings {
     pub max_page_size: usize,
 
     pub block_per_seg: usize,
+    pub enable_parquet_dictionary: bool,
 }
 
 impl Default for WriteSettings {
@@ -37,6 +38,7 @@ impl Default for WriteSettings {
             table_compression: TableCompression::default(),
             max_page_size: DEFAULT_ROW_PER_PAGE,
             block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
+            enable_parquet_dictionary: false,
         }
     }
 }
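For reference, the user-facing surface of the new option, in the same syntax the test below exercises:

```sql
-- enable dictionary encoding at table creation...
CREATE TABLE t (c INT, s STRING) ENABLE_PARQUET_DICTIONARY = 'true';
-- ...or toggle it on an existing table
ALTER TABLE t SET OPTIONS (enable_parquet_dictionary = 'false');
```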
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
new file mode 100644
index 0000000000000..92c5d76960ba7
--- /dev/null
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test
@@ -0,0 +1,124 @@
+statement ok
+create or replace database test_tbl_opt_parquet_encoding;
+
+statement ok
+use test_tbl_opt_parquet_encoding;
+
+#############################################
+# Create table with parquet encoding option #
+#############################################
+
+statement ok
+create or replace table t_encoded (c int, s string) enable_parquet_dictionary = 'true' storage_format = 'parquet';
+
+statement ok
+create or replace table t(c int, s string) storage_format = 'parquet';
+
+statement ok
+insert into t_encoded(c, s) select 1 as c, to_string(1) as s from numbers(1000000);
+
+statement ok
+optimize table t_encoded compact;
+
+statement ok
+insert into t(c, s) select 1 as c, to_string(1) as s from numbers(1000000);
+
+statement ok
+optimize table t compact;
+
+# In this case, lz4 compression with dictionary encoding produces smaller block files
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't_encoded') limit 1),
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 't') limit 1)
+  select e.c < p.c from e, p
+----
+1
+
+
+################################
+# Alter table parquet encoding #
+################################
+
+
+# 1. prepare plainly encoded data and record the file size
+statement ok
+create or replace table tbl (c int, s string) storage_format = 'parquet';
+
+statement ok
+insert into tbl(c, s) select 1 as c, to_string(1) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented
+statement ok
+optimize table tbl compact;
+
+statement ok
+create temp table tbl_size(s uint64);
+
+statement ok
+insert into tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+
+# 2. truncate table data and insert the same data with parquet dictionary encoding enabled
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_dictionary = 'true');
+
+statement ok
+insert into tbl(c, s) select 1 as c, to_string(1) as s from numbers(1000000);
+
+# insertion might be executed in a distributed manner; in that case, data blocks might be fragmented, so compact them
+statement ok
+optimize table tbl compact;
+
+
+# 3. check that the file size of newly created blocks with encoding is smaller
+
+query T
+with
+  e as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  p as (select s as c from tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+# record the size, it will be used later
+statement ok
+create temp table e_tbl_size(s uint64);
+
+statement ok
+insert into e_tbl_size select bytes_compressed from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1;
+
+# 4. check that table option `enable_parquet_dictionary` can be turned off
+
+statement ok
+truncate table tbl;
+
+statement ok
+ALTER TABLE tbl SET OPTIONS (enable_parquet_dictionary = 'false');
+
+statement ok
+insert into tbl(c, s) select 1 as c, to_string(1) as s from numbers(1000000);
+
+statement ok
+optimize table tbl compact;
+
+
+# 5. check that blocks newly written without dictionary encoding are larger than the encoded ones recorded above
+query T
+with
+  p as (select bytes_compressed c from fuse_snapshot('test_tbl_opt_parquet_encoding', 'tbl') limit 1),
+  e as (select s as c from e_tbl_size)
+  select e.c < p.c from e,p
+----
+1
+
+
+# Test invalid option value
+
+statement error 1001
+ALTER TABLE tbl SET OPTIONS (enable_parquet_dictionary = 'invalid');