10 changes: 9 additions & 1 deletion src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,15 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, None)?;
+        // When unloading data as Parquet, enable dictionary encoding unconditionally; this usually yields smaller files.
+        let _ = blocks_to_parquet(
+            &self.schema,
+            blocks,
+            &mut buf,
+            TableCompression::Zstd,
+            true,
+            None,
+        )?;
         Ok(buf)
     }
 }
@@ -32,6 +32,7 @@ use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_
 use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_AUTO_VACUUM;
+use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
 use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -89,6 +90,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
     r.insert(OPT_KEY_TEMP_PREFIX);
     r.insert(OPT_KEY_SEGMENT_FORMAT);
     r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
+    r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY);
     r
 });
 
@@ -259,3 +261,19 @@ where
     }
     Ok(())
 }
+
+pub fn is_valid_fuse_parquet_dictionary_opt(
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, options)
+}
+
+fn is_valid_bool_opt(
+    key: &str,
+    options: &BTreeMap<String, String>,
+) -> databend_common_exception::Result<()> {
+    if let Some(value) = options.get(key) {
+        value.parse::<bool>()?;
+    }
+    Ok(())
+}
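Note: `value.parse::<bool>()` uses Rust's strict `bool::FromStr`, so only the exact strings "true" and "false" validate; values like "1" or "TRUE" are rejected. A minimal standalone sketch of these semantics (the `check` helper and its std error type are illustrative stand-ins, not Databend's actual API):

use std::collections::BTreeMap;

// Mirrors is_valid_bool_opt: a present value must parse as a bool;
// an absent key is fine, since the option simply keeps its default.
fn check(options: &BTreeMap<String, String>, key: &str) -> Result<(), std::str::ParseBoolError> {
    if let Some(value) = options.get(key) {
        value.parse::<bool>()?; // "true"/"false" pass; "1", "TRUE", "on" fail
    }
    Ok(())
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("enable_parquet_dictionary".to_string(), "true".to_string());
    assert!(check(&opts, "enable_parquet_dictionary").is_ok());

    opts.insert("enable_parquet_dictionary".to_string(), "1".to_string());
    assert!(check(&opts, "enable_parquet_dictionary").is_err());
}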
@@ -71,6 +71,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
 use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_random_seed;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
@@ -455,6 +456,8 @@ impl CreateTableInterpreter {
         is_valid_random_seed(&table_meta.options)?;
         // check table level data_retention_period_in_hours
         is_valid_data_retention_period(&table_meta.options)?;
+        // check enable_parquet_dictionary
+        is_valid_fuse_parquet_dictionary_opt(&table_meta.options)?;
 
         // Same as the FUSE_OPT_KEY_ENABLE_AUTO_VACUUM setting, except the value type is an unsigned integer
         is_valid_option_of_type::<u32>(&table_meta.options, FUSE_OPT_KEY_ENABLE_AUTO_VACUUM)?;
@@ -56,6 +56,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
 use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
 use crate::interpreters::common::table_option_validation::is_valid_create_opt;
 use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
+use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_dictionary_opt;
 use crate::interpreters::common::table_option_validation::is_valid_option_of_type;
 use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
 use crate::interpreters::Interpreter;
@@ -95,6 +96,8 @@ impl Interpreter for SetOptionsInterpreter {
         is_valid_row_per_block(&self.plan.set_options)?;
         // check data_retention_period
         is_valid_data_retention_period(&self.plan.set_options)?;
+        // check enable_parquet_dictionary
+        is_valid_fuse_parquet_dictionary_opt(&self.plan.set_options)?;
 
         // check storage_format
         let error_str = "invalid opt for fuse table in alter table statement";
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/block_writer.rs
@@ -157,6 +157,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
             None,
         )?;
         let size = data.len() as u64;
1 change: 1 addition & 0 deletions src/query/storages/basic/src/result_cache/write/writer.rs
@@ -77,6 +77,7 @@ impl ResultCacheWriter {
             self.blocks.clone(),
             &mut buf,
             TableCompression::None,
+            false,
             None,
         )?;
 
39 changes: 29 additions & 10 deletions src/query/storages/common/blocks/src/parquet_rs.rs
@@ -23,6 +23,8 @@ use parquet::basic::Encoding;
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
+use parquet::file::properties::WriterPropertiesBuilder;
+use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
 
 /// Serialize data blocks to parquet format.
@@ -31,19 +31,13 @@ pub fn blocks_to_parquet(
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
+    enable_dictionary: bool,
     metadata: Option<Vec<KeyValue>>,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let props = WriterProperties::builder()
-        .set_compression(compression.into())
-        // use `usize::MAX` to effectively limit the number of row groups to 1
-        .set_max_row_group_size(usize::MAX)
-        .set_encoding(Encoding::PLAIN)
-        .set_dictionary_enabled(false)
-        .set_statistics_enabled(EnabledStatistics::None)
-        .set_bloom_filter_enabled(false)
-        .set_key_value_metadata(metadata)
-        .build();
+    let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);
+
+    let props = builder.build();
     let batches = blocks
         .into_iter()
         .map(|block| block.to_record_batch(table_schema))
@@ -56,3 +52,26 @@ pub fn blocks_to_parquet(
     let file_meta = writer.close()?;
     Ok(file_meta)
 }
+
+pub fn parquet_writer_properties_builder(
+    compression: TableCompression,
+    enable_dictionary: bool,
+    metadata: Option<Vec<KeyValue>>,
+) -> WriterPropertiesBuilder {
+    let builder = WriterProperties::builder()
+        .set_compression(compression.into())
+        // use `usize::MAX` to effectively limit the number of row groups to 1
+        .set_max_row_group_size(usize::MAX)
+        .set_encoding(Encoding::PLAIN)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .set_bloom_filter_enabled(false)
+        .set_key_value_metadata(metadata);
+
+    if enable_dictionary {
+        builder
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            .set_dictionary_enabled(true)
+    } else {
+        builder.set_dictionary_enabled(false)
+    }
+}
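For context, a small sketch of what the two branches of `parquet_writer_properties_builder` produce, written directly against the `parquet` crate's `WriterProperties` API (the column name "c" is an arbitrary placeholder):

use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::schema::types::ColumnPath;

fn main() {
    // Dictionary disabled: matches the previously hard-coded behavior.
    let plain = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .build();
    assert!(!plain.dictionary_enabled(&ColumnPath::from("c")));

    // Dictionary enabled: the helper also bumps the writer version to
    // PARQUET_2_0, so pages are written in the v2 format.
    let dict = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(true)
        .build();
    assert!(dict.dictionary_enabled(&ColumnPath::from("c")));
    assert_eq!(dict.writer_version(), WriterVersion::PARQUET_2_0);
}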
2 changes: 2 additions & 0 deletions src/query/storages/fuse/benches/bench.rs
@@ -141,11 +141,13 @@ mod dummy {
         let max_page_size = 8192;
         let block_per_seg = 1000;
 
+        let enable_parquet_dictionary = false;
         let write_settings = WriteSettings {
             storage_format,
             table_compression: compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_dictionary,
         };
         let schema = Arc::new(schema);
         let mut buffer = Vec::new();
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/constants.rs
@@ -18,13 +18,13 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
 pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
 pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
 pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
-
 pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
 pub const FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP: &str =
     "data_retention_num_snapshots_to_keep";
 pub const FUSE_OPT_KEY_ENABLE_AUTO_VACUUM: &str = "enable_auto_vacuum";
 pub const FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE: &str = "enable_auto_analyze";
 pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
+pub const FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY: &str = "enable_parquet_dictionary";
 
 pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
 pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
5 changes: 5 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -133,6 +133,7 @@ use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_NUM_SNAPSHOTS_TO_KEEP;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
+use crate::FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY;
 use crate::FUSE_OPT_KEY_FILE_SIZE;
 use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
 use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -321,11 +322,15 @@ impl FuseTable {
         let block_per_seg =
             self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
 
+        let enable_parquet_dictionary_encoding =
+            self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, false);
+
         WriteSettings {
             storage_format: self.storage_format,
             table_compression: self.table_compression,
             max_page_size,
             block_per_seg,
+            enable_parquet_dictionary: enable_parquet_dictionary_encoding,
         }
     }
 
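Here `get_option` falls back to the supplied default when the key is absent, so existing tables keep dictionary encoding off. A sketch of that lookup semantics (`get_bool_option` is an illustrative stand-in, not the actual `FuseTable::get_option`):

use std::collections::BTreeMap;

// Illustrative stand-in for FuseTable::get_option(key, default):
// a missing (or unparsable) value falls back to the default.
fn get_bool_option(options: &BTreeMap<String, String>, key: &str, default: bool) -> bool {
    options
        .get(key)
        .and_then(|v| v.parse::<bool>().ok())
        .unwrap_or(default)
}

fn main() {
    let mut options = BTreeMap::new();
    assert!(!get_bool_option(&options, "enable_parquet_dictionary", false));

    options.insert("enable_parquet_dictionary".into(), "true".into());
    assert!(get_bool_option(&options, "enable_parquet_dictionary", false));
}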
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -85,6 +85,7 @@ pub fn serialize_block(
         vec![block],
         buf,
         write_settings.table_compression,
+        write_settings.enable_parquet_dictionary,
         None,
     )?;
     let meta = column_parquet_metas(&result, &schema)?;
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/bloom_index_writer.rs
@@ -77,6 +77,7 @@ impl BloomIndexState {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            false,
             None,
         )?;
         let data_size = data.len() as u64;
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/write/inverted_index_writer.rs
@@ -336,6 +336,8 @@ impl InvertedIndexWriter {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for inverted index
+            false,
             None,
         )?;
 
21 changes: 9 additions & 12 deletions src/query/storages/fuse/src/io/write/stream/block_builder.rs
@@ -38,6 +38,7 @@ use databend_common_meta_app::schema::TableIndex;
 use databend_common_native::write::NativeWriter;
 use databend_common_native::write::WriteOptions;
 use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_storages_common_blocks::parquet_writer_properties_builder;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_index::BloomIndexBuilder;
 use databend_storages_common_index::Index;
@@ -49,9 +50,6 @@ use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
-use parquet::basic::Encoding;
-use parquet::file::properties::EnabledStatistics;
-use parquet::file::properties::WriterProperties;
 
 use crate::io::create_inverted_index_builders;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
@@ -172,15 +170,14 @@ impl StreamBlockBuilder {
         let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         let block_writer = match properties.write_settings.storage_format {
             FuseStorageFormat::Parquet => {
-                let props = WriterProperties::builder()
-                    .set_compression(properties.write_settings.table_compression.into())
-                    // use `usize::MAX` to effectively limit the number of row groups to 1
-                    .set_max_row_group_size(usize::MAX)
-                    .set_encoding(Encoding::PLAIN)
-                    .set_dictionary_enabled(false)
-                    .set_statistics_enabled(EnabledStatistics::None)
-                    .set_bloom_filter_enabled(false)
-                    .build();
+                let write_settings = &properties.write_settings;
+                let props = parquet_writer_properties_builder(
+                    write_settings.table_compression,
+                    write_settings.enable_parquet_dictionary,
+                    None,
+                )
+                .build();
+
                 let arrow_schema = Arc::new(properties.source_schema.as_ref().into());
                 let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?;
                 BlockWriterImpl::Arrow(writer)
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/io/write/vector_index_writer.rs
@@ -214,6 +214,8 @@ impl VectorIndexBuilder {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for vector index
+            false,
             Some(metadata),
         )?;
 
@@ -318,6 +320,8 @@ impl VectorIndexBuilder {
             &mut data,
             // Zstd has the best compression ratio
             TableCompression::Zstd,
+            // No dictionary page for vector index
+            false,
             Some(metadata),
         )?;
 
@@ -511,6 +511,7 @@ impl VirtualColumnBuilder {
             vec![virtual_block],
             &mut data,
             write_settings.table_compression,
+            write_settings.enable_parquet_dictionary,
             None,
         )?;
 
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/write/write_settings.rs
@@ -28,6 +28,7 @@ pub struct WriteSettings {
     pub max_page_size: usize,
 
     pub block_per_seg: usize,
+    pub enable_parquet_dictionary: bool,
 }
 
 impl Default for WriteSettings {
@@ -37,6 +38,7 @@ impl Default for WriteSettings {
             table_compression: TableCompression::default(),
             max_page_size: DEFAULT_ROW_PER_PAGE,
             block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
+            enable_parquet_dictionary: false,
         }
     }
 }