Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/actions/benchmark_local/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ runs:
run: |
./benchmark_local.sh

- name: Upload failure
if: failure()
uses: ./.github/actions/artifact_failure
with:
name: benchmark_local

- name: Prepare Metadata
working-directory: benchmark/clickbench
shell: bash
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ serde_derive = "1"
serde_ignored = "0.1.10"
serde_json = { version = "1.0.85", default-features = false, features = ["preserve_order", "unbounded_depth"] }
serde_repr = "0.1.9"
serde_stacker = { version = "0.1" }
serde_stacker = "0.1"
serde_test = "1.0"
serde_urlencoded = "0.7.1"
serde_with = { version = "3.8.1" }
Expand All @@ -492,6 +492,7 @@ snap = "1"
socket2 = "0.5.3"
span-map = { version = "0.2.0" }
sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] }
stacker = "0.1"
state = "0.6.0"
state-machine-api = { version = "0.3.4" }
stream-more = "0.1.3"
Expand Down
75 changes: 18 additions & 57 deletions src/query/expression/src/aggregate/aggregate_function_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::alloc::Layout;
use std::ptr::NonNull;

use databend_common_base::hints::assume;
use databend_common_exception::Result;
use enum_as_inner::EnumAsInner;

Expand All @@ -24,50 +25,26 @@ use crate::types::DataType;
use crate::ColumnBuilder;

#[derive(Clone, Copy, Debug)]
pub struct StateAddr {
addr: usize,
}
pub struct StateAddr(*mut u8);

pub type StateAddrs = Vec<StateAddr>;

impl StateAddr {
#[inline]
pub fn new(addr: usize) -> StateAddr {
Self { addr }
pub fn null() -> StateAddr {
Self(std::ptr::null_mut())
}

#[inline]
pub fn get<'a, T>(&self) -> &'a mut T
where T: Send + 'static {
unsafe { &mut *(self.addr as *mut T) }
}

#[inline]
pub fn addr(&self) -> usize {
self.addr
}

/// # Safety
/// ptr must ensure point to valid memory
#[inline]
pub unsafe fn from_ptr(ptr: *mut u8) -> Self {
Self { addr: ptr as usize }
unsafe { &mut *self.0.cast::<T>() }
}

#[inline]
#[must_use]
pub fn next(&self, offset: usize) -> Self {
Self {
addr: self.addr + offset,
}
}

#[inline]
#[must_use]
pub fn prev(&self, offset: usize) -> Self {
Self {
addr: self.addr.wrapping_sub(offset),
}
unsafe { Self(self.0.add(offset)) }
}

#[inline]
Expand All @@ -77,7 +54,7 @@ impl StateAddr {
T: Send + 'static,
{
unsafe {
let ptr = self.addr as *mut T;
let ptr = self.0.cast::<T>();
std::ptr::write(ptr, f());
}
}
Expand All @@ -86,35 +63,21 @@ impl StateAddr {
pub fn write_state<T>(&self, state: T)
where T: Send + 'static {
unsafe {
let ptr = self.addr as *mut T;
let ptr = self.0.cast::<T>();
std::ptr::write(ptr, state);
}
}
}

impl From<NonNull<u8>> for StateAddr {
fn from(s: NonNull<u8>) -> Self {
Self {
addr: s.as_ptr() as usize,
}
}
}

impl From<usize> for StateAddr {
fn from(addr: usize) -> Self {
Self { addr }
}
}

impl From<StateAddr> for NonNull<u8> {
fn from(s: StateAddr) -> Self {
unsafe { NonNull::new_unchecked(s.addr as *mut u8) }
Self(s.as_ptr())
}
}

impl From<StateAddr> for usize {
fn from(s: StateAddr) -> Self {
s.addr
impl From<*mut u8> for StateAddr {
fn from(s: *mut u8) -> Self {
Self(s)
}
}

Expand Down Expand Up @@ -281,21 +244,19 @@ impl<'a> AggrState<'a> {

pub fn get<'b, T>(&self) -> &'b mut T
where T: Send + 'static {
debug_assert_eq!(self.loc.len(), 1);
self.addr
.next(self.loc[0].into_custom().unwrap().1)
.get::<T>()
assume(self.loc.len() == 1);
debug_assert!(self.loc[0].is_custom());
self.addr.next(self.loc[0].offset()).get::<T>()
}

pub fn write<T, F>(&self, f: F)
where
F: FnOnce() -> T,
T: Send + 'static,
{
debug_assert_eq!(self.loc.len(), 1);
self.addr
.next(self.loc[0].into_custom().unwrap().1)
.write(f);
assume(self.loc.len() == 1);
debug_assert!(self.loc[0].is_custom());
self.addr.next(self.loc[0].offset()).write(f);
}

pub fn remove_last_loc(&self) -> Self {
Expand Down
18 changes: 7 additions & 11 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,20 @@ impl AggregateHashTable {
while payload.flush(flush_state) {
let row_count = flush_state.row_count;

let _ = self.probe_and_create(
&mut flush_state.probe_state,
(&flush_state.group_columns).into(),
row_count,
);
let state = &mut *flush_state.probe_state;
let _ = self.probe_and_create(state, (&flush_state.group_columns).into(), row_count);

let places = &mut state.state_places[..row_count];

// set state places
if !self.payload.aggrs.is_empty() {
for i in 0..row_count {
flush_state.probe_state.state_places[i] =
flush_state.probe_state.addresses[i].state_addr(&self.payload.row_layout);
for (place, ptr) in places.iter_mut().zip(&state.addresses[..row_count]) {
*place = ptr.state_addr(&self.payload.row_layout)
}
}

let state = &mut flush_state.probe_state;
let places = &state.state_places.as_slice()[0..row_count];
let rhses = &flush_state.state_places.as_slice()[0..row_count];
if let Some(layout) = self.payload.row_layout.states_layout.as_ref() {
let rhses = &flush_state.state_places[..row_count];
for (aggr, loc) in self.payload.aggrs.iter().zip(layout.states_loc.iter()) {
aggr.batch_merge_states(places, rhses, loc)?;
}
Expand Down
45 changes: 19 additions & 26 deletions src/query/expression/src/aggregate/hash_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use super::PartitionedPayload;
use super::ProbeState;
use super::RowPtr;
use super::BATCH_SIZE;
use super::LOAD_FACTOR;
use crate::ProjectedBlock;

Expand Down Expand Up @@ -120,7 +119,7 @@ impl Entry {
}

pub fn get_pointer(&self) -> RowPtr {
RowPtr::new((self.0 & POINTER_MASK) as *const u8)
RowPtr::new((self.0 & POINTER_MASK) as *mut u8)
}

pub fn set_pointer(&mut self, ptr: RowPtr) {
Expand Down Expand Up @@ -152,21 +151,16 @@ impl HashIndex {
row_count: usize,
mut adapter: impl TableAdapter,
) -> usize {
#[derive(Default, Clone, Copy, Debug)]
struct Item {
slot: usize,
hash: u64,
for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() {
*row = i;
}

let mut items = [Item::default(); BATCH_SIZE];

for row in 0..row_count {
items[row] = Item {
slot: self.init_slot(state.group_hashes[row]),
hash: state.group_hashes[row],
};
state.no_match_vector[row] = row;
}
let mut slots = state.get_temp();
slots.extend(
state.group_hashes[..row_count]
.iter()
.map(|hash| self.init_slot(*hash)),
);

let mut new_group_count = 0;
let mut remaining_entries = row_count;
Expand All @@ -178,11 +172,11 @@ impl HashIndex {

// 1. inject new_group_count, new_entry_count, need_compare_count, no_match_count
for row in state.no_match_vector[..remaining_entries].iter().copied() {
let item = &mut items[row];
let slot = &mut slots[row];
let hash = state.group_hashes[row];

let is_new;
(item.slot, is_new) =
self.find_or_insert(item.slot, Entry::hash_to_salt(item.hash));
(*slot, is_new) = self.find_or_insert(*slot, Entry::hash_to_salt(hash));

if is_new {
state.empty_vector[new_entry_count] = row;
Expand All @@ -200,7 +194,7 @@ impl HashIndex {
adapter.append_rows(state, new_entry_count);

for row in state.empty_vector[..new_entry_count].iter().copied() {
let entry = self.mut_entry(items[row].slot);
let entry = self.mut_entry(slots[row]);
entry.set_pointer(state.addresses[row]);
debug_assert_eq!(entry.get_pointer(), state.addresses[row]);
}
Expand All @@ -212,10 +206,10 @@ impl HashIndex {
.iter()
.copied()
{
let entry = self.mut_entry(items[row].slot);
let entry = self.mut_entry(slots[row]);

debug_assert!(entry.is_occupied());
debug_assert_eq!(entry.get_salt(), (items[row].hash >> 48) as u16);
debug_assert_eq!(entry.get_salt(), (state.group_hashes[row] >> 48) as u16);
state.addresses[row] = entry.get_pointer();
}

Expand All @@ -225,7 +219,7 @@ impl HashIndex {

// 5. Linear probing, just increase iter_times
for row in state.no_match_vector[..no_match_count].iter().copied() {
let slot = &mut items[row].slot;
let slot = &mut slots[row];
*slot += 1;
if *slot >= self.capacity {
*slot = 0;
Expand All @@ -234,6 +228,7 @@ impl HashIndex {
remaining_entries = no_match_count;
}

state.save_temp(slots);
self.count += new_group_count;

new_group_count
Expand Down Expand Up @@ -289,10 +284,8 @@ mod tests {
}

fn init_state(&self) -> ProbeState {
let mut state = ProbeState {
row_count: self.incoming.len(),
..Default::default()
};
let mut state = ProbeState::default();
state.row_count = self.incoming.len();

for (i, (_, hash)) in self.incoming.iter().enumerate() {
state.group_hashes[i] = *hash
Expand Down
6 changes: 0 additions & 6 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ pub use payload_flush::*;
pub use probe_state::*;
use row_ptr::RowPtr;

pub type SelectVector = [usize; BATCH_SIZE];

pub fn new_sel() -> SelectVector {
[0; BATCH_SIZE]
}

// A batch size to probe, flush, repartition, etc.
pub(crate) const BATCH_SIZE: usize = 2048;
const LOAD_FACTOR: f64 = 1.5;
Expand Down
Loading