Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions src/common/column/src/types/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ impl Neg for months_days_micros {
}
}

/// The in-memory representation of the MonthDayNano variant of the "Interval" logical type.
#[derive(
Debug,
Copy,
Expand All @@ -460,12 +459,12 @@ pub struct timestamp_tz(pub i128);

impl Hash for timestamp_tz {
fn hash<H: Hasher>(&self, state: &mut H) {
self.total_micros().hash(state)
self.utc_timestamp().hash(state)
}
}
impl PartialEq for timestamp_tz {
fn eq(&self, other: &Self) -> bool {
self.total_micros() == other.total_micros()
self.utc_timestamp() == other.utc_timestamp()
}
}
impl PartialOrd for timestamp_tz {
Expand All @@ -476,23 +475,30 @@ impl PartialOrd for timestamp_tz {

impl Ord for timestamp_tz {
fn cmp(&self, other: &Self) -> Ordering {
let total_micros = self.total_micros();
let other_micros = other.total_micros();
total_micros.cmp(&other_micros)
let timestamp = self.utc_timestamp();
let other_micros = other.utc_timestamp();
timestamp.cmp(&other_micros)
}
}

impl timestamp_tz {
pub const MICROS_PER_SECOND: i64 = 1_000_000;

/// Packs `timestamp` and `offset` into the inner `i128`: `offset` is shifted
/// left by 64 bits and OR-ed with `timestamp`, which occupies the low 64 bits.
#[inline]
pub fn new(timestamp: i64, offset: i32) -> Self {
let ts = timestamp as u64 as i128; // <- the intermediate u64 cast masks the sign bit
// Casting through u64 zero-extends, so a negative `timestamp` does not set
// the bits above bit 63 that hold the offset.
let ts = timestamp as u64 as i128;
let off = (offset as i128) << 64;
Self(off | ts)
}

#[inline]
pub fn timestamp(&self) -> i64 {
pub fn new_local(timestamp: i64, offset: i32) -> Self {
let ts = Self::encode_utc(timestamp, offset);
Self::new(ts, offset)
}

#[inline]
pub fn utc_timestamp(&self) -> i64 {
self.0 as u64 as i64
}

Expand All @@ -506,33 +512,56 @@ impl timestamp_tz {
(self.seconds_offset() as i64).checked_mul(Self::MICROS_PER_SECOND)
}

/// Scales `seconds` by `MICROS_PER_SECOND` using `checked_mul`.
///
/// Returns `None` if the multiplication overflows `i64`.
#[inline]
pub fn micros_offset_inner(seconds: i64) -> Option<i64> {
seconds.checked_mul(Self::MICROS_PER_SECOND)
}

/// Returns `seconds_offset()` divided by 3600, cast to `i8`.
///
/// NOTE(review): the integer division drops any sub-hour part of the offset
/// (for example a +05:30 offset would yield 5) — confirm callers accept this.
#[inline]
pub fn hours_offset(&self) -> i8 {
(self.seconds_offset() / 3600) as i8
}

#[inline]
pub fn total_micros(&self) -> i64 {
self.try_total_micros().unwrap_or_else(|| {
pub fn local_timestamp(&self) -> i64 {
self.try_local_timestamp().unwrap_or_else(|| {
error!(
"interval is out of range: timestamp={}, offset={}",
self.timestamp(),
"timestamp_with_offset is out of range: timestamp={}, offset={}",
self.utc_timestamp(),
self.seconds_offset()
);
0
})
}

#[inline]
pub fn try_total_micros(&self) -> Option<i64> {
pub fn try_local_timestamp(&self) -> Option<i64> {
let offset_micros = self.micros_offset()?;
self.timestamp().checked_sub(offset_micros)
self.utc_timestamp().checked_add(offset_micros)
}

/// Subtracts `offset` (given in seconds, scaled by `MICROS_PER_SECOND`) from
/// `timestamp` and returns the difference.
///
/// Both the scaling and the subtraction are checked. If either overflows, an
/// error is logged and `0` is returned instead.
///
/// NOTE(review): the `0` fallback cannot be told apart from a genuine result of
/// zero by the caller — confirm this is acceptable.
#[inline]
fn encode_utc(timestamp: i64, offset: i32) -> i64 {
// Convert the offset to the same unit as `timestamp`; bail out on overflow.
let Some(offset_micros) = Self::micros_offset_inner(offset as i64) else {
error!(
"timestamp offset is out of range: timestamp={}, offset={}",
timestamp, offset
);
return 0;
};
timestamp.checked_sub(offset_micros).unwrap_or_else(|| {
error!(
"timestamp is out of range: timestamp={}, offset={}",
timestamp, offset
);
0
})
}
}

impl Display for timestamp_tz {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let timestamp = Timestamp::from_microsecond(self.total_micros()).unwrap();
let timestamp = Timestamp::from_microsecond(self.utc_timestamp()).unwrap();

let offset = tz::Offset::from_seconds(self.seconds_offset()).unwrap();
let string = strtime::format(
Expand Down
9 changes: 9 additions & 0 deletions src/query/expression/src/row_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use databend_common_column::types::months_days_micros;
use databend_common_column::types::timestamp_tz;

use crate::types::i256;
use crate::types::F32;
Expand Down Expand Up @@ -103,3 +104,11 @@ impl FixedLengthEncoding for months_days_micros {
self.total_micros().encode()
}
}

// Fixed-length row encoding for `timestamp_tz`: only the value returned by
// `utc_timestamp()` is encoded (8 bytes); the offset is not part of the output.
impl FixedLengthEncoding for timestamp_tz {
type Encoded = [u8; 8];

fn encode(self) -> [u8; 8] {
self.utc_timestamp().encode()
}
}
47 changes: 37 additions & 10 deletions src/query/expression/src/types/timestamp_tz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,41 @@ pub fn string_to_timestamp_tz<'a, F: FnOnce() -> &'a TimeZone>(
.or_else(|_| fmt::strtime::parse("%Y-%m-%d %H:%M:%S%.f %z", ts_str))
.or_else(|_| fmt::strtime::parse("%Y-%m-%d %H:%M:%S%.f %:z", ts_str))
.or_else(|_| fmt::strtime::parse("%Y-%m-%d %H:%M:%S%.f", ts_str))?;
let datetime = time.to_datetime()?;
let timestamp = tz::offset(0).to_timestamp(datetime)?;
let offset = time
.offset()
.unwrap_or_else(|| fn_tz().to_offset(timestamp));

Ok(timestamp_tz::new(
timestamp.as_microsecond(),
offset.seconds(),
))
match time.offset() {
None => {
let datetime = time.to_datetime()?;
let timestamp = tz::offset(0).to_timestamp(datetime)?;
let offset = fn_tz().to_offset(timestamp);

Ok(timestamp_tz::new_local(
timestamp.as_microsecond(),
offset.seconds(),
))
}
Some(offset) => {
let timestamp = time.to_timestamp()?;

Ok(timestamp_tz::new(
timestamp.as_microsecond(),
offset.seconds(),
))
}
}
}

#[cfg(test)]
mod tests {
use super::*;

// Parses a string carrying an explicit +0800 offset and checks the stored
// offset, the UTC value, the local value, and the formatted output.
#[test]
fn stores_utc_in_timestamp_field() {
let tz = TimeZone::get("Asia/Shanghai").unwrap();
let value = string_to_timestamp_tz(b"2021-12-20 17:01:01 +0800", || &tz).expect("parse tz");
assert_eq!(value.seconds_offset(), 28_800);
// utc_timestamp() keeps the UTC instant (09:01:01).
assert_eq!(value.utc_timestamp(), 1_639_990_861_000_000);
// local_timestamp() reconstructs the local wall clock (17:01:01).
assert_eq!(value.local_timestamp(), 1_640_019_661_000_000);
assert_eq!(value.to_string(), "2021-12-20 17:01:01.000000 +0800");
}
}
2 changes: 1 addition & 1 deletion src/query/expression/src/types/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ pub fn cast_scalar_to_variant(
ScalarRef::String(s) => jsonb::Value::String(s.into()),
ScalarRef::Timestamp(ts) => jsonb::Value::Timestamp(jsonb::Timestamp { value: ts }),
ScalarRef::TimestampTz(ts_tz) => jsonb::Value::TimestampTz(jsonb::TimestampTz {
value: ts_tz.timestamp(),
value: ts_tz.utc_timestamp(),
offset: ts_tz.hours_offset(),
}),
ScalarRef::Date(d) => jsonb::Value::Date(jsonb::Date { value: d }),
Expand Down
10 changes: 5 additions & 5 deletions src/query/functions/src/scalars/timestamp/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ fn timestamp_tz_domain_to_timestamp_domain(
domain: &SimpleDomain<timestamp_tz>,
) -> Option<SimpleDomain<i64>> {
Some(SimpleDomain {
min: domain.min.total_micros(),
max: domain.max.total_micros(),
min: domain.min.utc_timestamp(),
max: domain.max.utc_timestamp(),
})
}

Expand Down Expand Up @@ -689,7 +689,7 @@ fn register_timestamp_to_timestamp_tz(registry: &mut FunctionRegistry) {
}
};
let offset = ctx.func_ctx.tz.to_offset(ts);
let ts_tz = timestamp_tz::new(val, offset.seconds());
let ts_tz = timestamp_tz::new_local(val, offset.seconds());

output.push(ts_tz)
})(val, ctx)
Expand All @@ -712,7 +712,7 @@ fn register_timestamp_tz_to_timestamp(registry: &mut FunctionRegistry) {
ctx: &mut EvalContext,
) -> Value<TimestampType> {
vectorize_with_builder_1_arg::<TimestampTzType, TimestampType>(|val, output, _ctx| {
output.push(val.total_micros())
output.push(val.utc_timestamp())
})(val, ctx)
}
}
Expand Down Expand Up @@ -956,7 +956,7 @@ fn register_timestamp_tz_to_date(registry: &mut FunctionRegistry) {
let offset = Offset::from_seconds(val.seconds_offset()).map_err(|err| err.to_string())?;

Ok(val
.timestamp()
.utc_timestamp()
.to_timestamp(TimeZone::fixed(offset))
.date()
.since((Unit::Day, Date::new(1970, 1, 1).unwrap()))
Expand Down
11 changes: 6 additions & 5 deletions src/query/functions/src/scalars/timestamp/src/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ fn register_interval_add_sub_mul(registry: &mut FunctionRegistry) {
return;
}
};
eval_timestamp_plus(a, b, output, ctx, |input| input.timestamp(), |result| timestamp_tz::new(result, a.seconds_offset()), TimeZone::fixed(offset));
eval_timestamp_plus(a, b, output, ctx, |input| input.local_timestamp(), |result| timestamp_tz::new_local(result, a.seconds_offset()), TimeZone::fixed(offset));
},
),
);
Expand Down Expand Up @@ -183,7 +183,7 @@ fn register_interval_add_sub_mul(registry: &mut FunctionRegistry) {
return;
}
};
eval_timestamp_plus(a, b, output, ctx, |input| input.timestamp(), |result| timestamp_tz::new(result, a.seconds_offset()), TimeZone::fixed(offset));
eval_timestamp_plus(a, b, output, ctx, |input| input.local_timestamp(), |result| timestamp_tz::new_local(result, a.seconds_offset()), TimeZone::fixed(offset));
},
),
);
Expand Down Expand Up @@ -235,7 +235,7 @@ fn register_interval_add_sub_mul(registry: &mut FunctionRegistry) {
return;
}
};
eval_timestamp_minus(a, b, output, ctx, |input| input.timestamp(), |result| timestamp_tz::new(result, a.seconds_offset()), TimeZone::fixed(offset));
eval_timestamp_minus(a, b, output, ctx, |input| input.local_timestamp(), |result| timestamp_tz::new_local(result, a.seconds_offset()), TimeZone::fixed(offset));
},
),
);
Expand Down Expand Up @@ -268,7 +268,7 @@ fn register_interval_add_sub_mul(registry: &mut FunctionRegistry) {
vectorize_with_builder_2_arg::<TimestampTzType, TimestampTzType, IntervalType>(
|t1, t2, output, ctx| {
let fn_to_zoned = |ts_tz: timestamp_tz| {
let ts = Timestamp::from_microsecond(ts_tz.timestamp())?;
let ts = Timestamp::from_microsecond(ts_tz.utc_timestamp())?;
let zone = TimeZone::fixed(Offset::from_seconds(ts_tz.seconds_offset())?);

Result::Ok(ts.to_zoned(zone))
Expand Down Expand Up @@ -331,7 +331,8 @@ fn register_interval_add_sub_mul(registry: &mut FunctionRegistry) {

let zone = TimeZone::fixed(Offset::from_seconds(t2.seconds_offset())?);
let today_date = today_date(&ctx.func_ctx.now, &zone);
let mut t2 = Timestamp::from_microsecond(t2.timestamp())?.to_zoned(zone.clone());
let mut t2 =
Timestamp::from_microsecond(t2.utc_timestamp())?.to_zoned(zone.clone());
let mut t1 = calc_date_to_timestamp(today_date, zone.clone())?.to_timestamp(zone);

if t1 < t2 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use std::ops::Range;
use databend_common_column::bitmap::Bitmap;
use databend_common_column::buffer::Buffer;
use databend_common_column::types::months_days_micros;
use databend_common_column::types::timestamp_tz;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::binary::BinaryColumn;
use databend_common_expression::types::binary::BinaryColumnBuilder;
use databend_common_expression::types::i256;
use databend_common_expression::types::nullable::NullableColumn;
use databend_common_expression::types::timestamp_tz::TimestampTzType;
use databend_common_expression::types::AccessType;
use databend_common_expression::types::BinaryType;
use databend_common_expression::types::BooleanType;
Expand Down Expand Up @@ -128,6 +130,7 @@ impl RowConverter<VariableRows> for VariableRowConverter {
| DataType::Number(_)
| DataType::Decimal(_)
| DataType::Timestamp
| DataType::TimestampTz
| DataType::Interval
| DataType::Date
| DataType::Binary
Expand Down Expand Up @@ -337,6 +340,11 @@ impl LengthCalculatorVisitor<'_> {
*length += i64::ENCODED_LEN
}
}
DataType::TimestampTz => {
for length in self.lengths.iter_mut() {
*length += timestamp_tz::ENCODED_LEN
}
}
DataType::Date => {
for length in self.lengths.iter_mut() {
*length += i32::ENCODED_LEN
Expand Down Expand Up @@ -570,6 +578,21 @@ impl EncodeVisitor<'_> {
self.field.nulls_first,
);
}
DataType::TimestampTz => {
let scalar_value = if is_null {
timestamp_tz::default()
} else {
*scalar.as_timestamp_tz().unwrap()
};
fixed_encode_const::<TimestampTzType>(
&mut self.out.data,
&mut self.out.offsets,
is_null,
scalar_value,
self.field.asc,
self.field.nulls_first,
);
}
DataType::Date => {
let scalar_value = if is_null {
0i32
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,20 @@ insert into t values(1, '2022-02-03T03:02:00+0800')
statement ok
drop table t

query TT
WITH t AS (
SELECT to_timestamp_tz('2021-12-20 09:00:01 +00:00') AS v, 'A_09:00@UTC' AS tag
UNION ALL
SELECT to_timestamp_tz('2021-12-20 17:00:00 +08:00'), 'B_17:00@+8'
UNION ALL
SELECT to_timestamp_tz('2021-12-20 09:30:00 +00:00'), 'C_09:30@UTC'
)
SELECT tag, v FROM t ORDER BY v;
----
B_17:00@+8 2021-12-20 17:00:00.000000 +0800
A_09:00@UTC 2021-12-20 09:00:01.000000 +0000
C_09:30@UTC 2021-12-20 09:30:00.000000 +0000

statement ok
set timezone='UTC'

Expand Down
Loading