Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHas
use crate::pipelines::processors::HashJoinState;
use crate::sessions::QueryContext;

pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 1024;

/// Define some shared states for all hash join build threads.
pub struct HashJoinBuildState {
pub(crate) ctx: Arc<QueryContext>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,32 @@ use super::packet::JoinRuntimeFilterPacket;
use super::packet::RuntimeFilterPacket;
use super::packet::SerializableDomain;
use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
use crate::pipelines::processors::transforms::hash_join::hash_join_build_state::INLIST_RUNTIME_FILTER_THRESHOLD;
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method;

struct JoinRuntimeFilterPacketBuilder<'a> {
build_key_column: Column,
func_ctx: &'a FunctionContext,
inlist_threshold: usize,
bloom_threshold: usize,
min_max_threshold: usize,
}

impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
fn new(
data_blocks: &'a [DataBlock],
func_ctx: &'a FunctionContext,
build_key: &Expr,
inlist_threshold: usize,
bloom_threshold: usize,
min_max_threshold: usize,
) -> Result<Self> {
let build_key_column = Self::eval_build_key_column(data_blocks, func_ctx, build_key)?;
Ok(Self {
func_ctx,
build_key_column,
inlist_threshold,
bloom_threshold,
min_max_threshold,
})
}
fn build(&self, desc: &RuntimeFilterDesc) -> Result<RuntimeFilterPacket> {
Expand All @@ -73,16 +81,15 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
}

fn enable_min_max(&self, desc: &RuntimeFilterDesc) -> bool {
desc.enable_min_max_runtime_filter
desc.enable_min_max_runtime_filter && self.build_key_column.len() < self.min_max_threshold
}

fn enable_inlist(&self, desc: &RuntimeFilterDesc) -> bool {
desc.enable_inlist_runtime_filter
&& self.build_key_column.len() < INLIST_RUNTIME_FILTER_THRESHOLD
desc.enable_inlist_runtime_filter && self.build_key_column.len() < self.inlist_threshold
}

fn enable_bloom(&self, desc: &RuntimeFilterDesc) -> bool {
desc.enable_bloom_runtime_filter
desc.enable_bloom_runtime_filter && self.build_key_column.len() < self.bloom_threshold
}

fn build_min_max(&self) -> Result<SerializableDomain> {
Expand Down Expand Up @@ -147,6 +154,9 @@ pub fn build_runtime_filter_packet(
build_num_rows: usize,
runtime_filter_desc: &[RuntimeFilterDesc],
func_ctx: &FunctionContext,
inlist_threshold: usize,
bloom_threshold: usize,
min_max_threshold: usize,
) -> Result<JoinRuntimeFilterPacket> {
if build_num_rows == 0 {
return Ok(JoinRuntimeFilterPacket::default());
Expand All @@ -155,8 +165,15 @@ pub fn build_runtime_filter_packet(
for rf in runtime_filter_desc {
runtime_filters.insert(
rf.id,
JoinRuntimeFilterPacketBuilder::new(build_chunks, func_ctx, &rf.build_key)?
.build(rf)?,
JoinRuntimeFilterPacketBuilder::new(
build_chunks,
func_ctx,
&rf.build_key,
inlist_threshold,
bloom_threshold,
min_max_threshold,
)?
.build(rf)?,
);
}
Ok(JoinRuntimeFilterPacket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,50 +89,33 @@ fn build_inlist_filter(inlist: Column, probe_key: &Expr<String>) -> Result<Expr<
display_name: probe_key.display_name.clone(),
};

let scalars: Vec<_> = inlist
let eq_exprs: Vec<RawExpr<String>> = inlist
.iter()
.map(|scalar_ref| scalar_ref.to_owned())
.collect();

let balanced_expr = build_balanced_or_tree(&raw_probe_key, &scalars);

let expr = type_check::check(&balanced_expr, &BUILTIN_FUNCTIONS)?;
Ok(expr)
}

fn build_balanced_or_tree(probe_key: &RawExpr<String>, scalars: &[Scalar]) -> RawExpr<String> {
match scalars.len() {
0 => RawExpr::Constant {
.map(|scalar_ref| RawExpr::FunctionCall {
span: None,
scalar: Scalar::Boolean(false),
data_type: None,
},
1 => {
let constant_expr = RawExpr::Constant {
name: "eq".to_string(),
params: vec![],
args: vec![raw_probe_key.clone(), RawExpr::Constant {
span: None,
scalar: scalars[0].clone(),
scalar: scalar_ref.to_owned(),
data_type: None,
};
RawExpr::FunctionCall {
span: None,
name: "eq".to_string(),
params: vec![],
args: vec![probe_key.clone(), constant_expr],
}
}
_ => {
let mid = scalars.len() / 2;
let left_subtree = build_balanced_or_tree(probe_key, &scalars[..mid]);
let right_subtree = build_balanced_or_tree(probe_key, &scalars[mid..]);
}],
})
.collect();

RawExpr::FunctionCall {
span: None,
name: "or".to_string(),
params: vec![],
args: vec![left_subtree, right_subtree],
}
let or_filters_expr = if eq_exprs.len() == 1 {
eq_exprs[0].clone()
} else {
RawExpr::FunctionCall {
span: None,
name: "or_filters".to_string(),
params: vec![],
args: eq_exprs,
}
}
};

let expr = type_check::check(&or_filters_expr, &BUILTIN_FUNCTIONS)?;
Ok(expr)
}

fn build_min_max_filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,26 @@ pub async fn build_and_push_down_runtime_filter(
build_num_rows: usize,
join: &HashJoinBuildState,
) -> Result<()> {
let inlist_threshold = join
.ctx
.get_settings()
.get_inlist_runtime_filter_threshold()? as usize;
let bloom_threshold = join
.ctx
.get_settings()
.get_bloom_runtime_filter_threshold()? as usize;
let min_max_threshold = join
.ctx
.get_settings()
.get_min_max_runtime_filter_threshold()? as usize;
let mut packet = build_runtime_filter_packet(
build_chunks,
build_num_rows,
join.runtime_filter_desc(),
&join.func_ctx,
inlist_threshold,
bloom_threshold,
min_max_threshold,
)?;
log::info!("[RUNTIME-FILTER] build runtime filter packet: {:?}, build_num_rows: {}, runtime_filter_desc: {:?}", packet, build_num_rows, join.runtime_filter_desc());
if let Some(broadcast_id) = join.broadcast_id {
Expand Down
21 changes: 21 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,27 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("inlist_runtime_filter_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(1024),
desc: "Sets the maximum number of values in an IN list for runtime filter generation.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("bloom_runtime_filter_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(u64::MAX),
desc: "Sets the maximum number of rows for bloom runtime filter generation.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("min_max_runtime_filter_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(u64::MAX),
desc: "Sets the maximum number of rows for min-max runtime filter generation.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("unquoted_ident_case_sensitive", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Set to 1 to make unquoted names (like table or column names) case-sensitive, or 0 for case-insensitive.",
Expand Down
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,15 @@ impl Settings {
pub fn get_max_inlist_to_or(&self) -> Result<u64> {
self.try_get_u64("max_inlist_to_or")
}
pub fn get_inlist_runtime_filter_threshold(&self) -> Result<u64> {
self.try_get_u64("inlist_runtime_filter_threshold")
}
pub fn get_bloom_runtime_filter_threshold(&self) -> Result<u64> {
self.try_get_u64("bloom_runtime_filter_threshold")
}
pub fn get_min_max_runtime_filter_threshold(&self) -> Result<u64> {
self.try_get_u64("min_max_runtime_filter_threshold")
}

pub fn get_unquoted_ident_case_sensitive(&self) -> Result<bool> {
Ok(self.try_get_u64("unquoted_ident_case_sensitive")? != 0)
Expand Down