@@ -20,12 +20,12 @@
 use std::sync::Arc;

 use arrow::array::{record_batch, RecordBatch};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

 use datafusion::assert_batches_eq;
 use datafusion::common::not_impl_err;
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion::common::{Result, ScalarValue};
+use datafusion::common::Result;
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
 };
@@ -209,14 +209,4 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
         })
         .data()
     }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(Self {
-            inner: self.inner.with_partition_values(partition_values),
-            ..self.clone()
-        })
-    }
 }
@@ -22,7 +22,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use arrow::array::RecordBatch;
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;

 use datafusion::assert_batches_eq;
@@ -287,7 +287,6 @@ impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
             logical_file_schema,
             physical_file_schema,
             default_adapter,
-            partition_values: Vec::new(),
         })
     }
 }
@@ -299,7 +298,6 @@ struct DefaultValuePhysicalExprAdapter {
     logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
     default_adapter: Arc<dyn PhysicalExprAdapter>,
-    partition_values: Vec<(FieldRef, ScalarValue)>,
 }

 impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
@@ -316,27 +314,8 @@
             .data()?;

         // Then apply the default adapter as a fallback to handle standard schema differences
-        // like type casting, partition column handling, etc.
-        let default_adapter = if !self.partition_values.is_empty() {
-            self.default_adapter
-                .with_partition_values(self.partition_values.clone())
-        } else {
-            self.default_adapter.clone()
-        };
-
-        default_adapter.rewrite(rewritten)
-    }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(DefaultValuePhysicalExprAdapter {
-            logical_file_schema: self.logical_file_schema.clone(),
-            physical_file_schema: self.physical_file_schema.clone(),
-            default_adapter: self.default_adapter.clone(),
-            partition_values,
-        })
+        // like type casting, etc.
+        self.default_adapter.rewrite(rewritten)
     }
 }

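Note: after this change an adapter only implements `rewrite`; partition values are no longer threaded through the adapter via `with_partition_values`. A minimal sketch of an adapter under the new trait shape — assuming `rewrite` takes and returns an `Arc<dyn PhysicalExpr>`, as the implementations in this diff suggest, and assuming the trait's import path, which may differ by version:

    use std::sync::Arc;

    use arrow::datatypes::SchemaRef;
    use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
    use datafusion::common::Result;
    use datafusion::physical_expr::PhysicalExpr;
    // Assumed path; the trait may be exported from a different module.
    use datafusion::physical_expr::schema_rewriter::PhysicalExprAdapter;

    #[derive(Debug)]
    struct NoopPhysicalExprAdapter {
        // A real adapter would rewrite column references against this schema.
        physical_file_schema: SchemaRef,
    }

    impl PhysicalExprAdapter for NoopPhysicalExprAdapter {
        fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
            // Walk the expression tree, marking every node as unchanged;
            // `.data()` unwraps the `Transformed` result.
            expr.transform(|e| Ok(Transformed::no(e))).data()
        }
    }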
29 changes: 3 additions & 26 deletions datafusion-examples/examples/data_io/json_shredding.rs
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;

 use arrow::array::{RecordBatch, StringArray};
-use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

 use datafusion::assert_batches_eq;
 use datafusion::common::tree_node::{
@@ -281,10 +281,8 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
             .create(logical_file_schema.clone(), physical_file_schema.clone());

         Arc::new(ShreddedJsonRewriter {
-            logical_file_schema,
             physical_file_schema,
             default_adapter,
-            partition_values: Vec::new(),
         })
     }
 }
@@ -293,10 +291,8 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
 /// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
 #[derive(Debug)]
 struct ShreddedJsonRewriter {
-    logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
     default_adapter: Arc<dyn PhysicalExprAdapter>,
-    partition_values: Vec<(FieldRef, ScalarValue)>,
 }

 impl PhysicalExprAdapter for ShreddedJsonRewriter {
@@ -307,27 +303,8 @@
             .data()?;

         // Then apply the default adapter as a fallback to handle standard schema differences
-        // like type casting, missing columns, and partition column handling
-        let default_adapter = if !self.partition_values.is_empty() {
-            self.default_adapter
-                .with_partition_values(self.partition_values.clone())
-        } else {
-            self.default_adapter.clone()
-        };
-
-        default_adapter.rewrite(rewritten)
-    }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(ShreddedJsonRewriter {
-            logical_file_schema: self.logical_file_schema.clone(),
-            physical_file_schema: self.physical_file_schema.clone(),
-            default_adapter: self.default_adapter.clone(),
-            partition_values,
-        })
+        // like type casting and missing columns
+        self.default_adapter.rewrite(rewritten)
     }
 }

13 changes: 13 additions & 0 deletions datafusion/common/src/pruning.rs
@@ -135,6 +135,10 @@ pub trait PruningStatistics {
 /// This feeds into [`CompositePruningStatistics`] to allow pruning
 /// with filters that depend both on partition columns and data columns
 /// (e.g. `WHERE partition_col = data_col`).
+#[deprecated(
+    since = "52.0.0",
+    note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
+)]
 #[derive(Clone)]
 pub struct PartitionPruningStatistics {
     /// Values for each column for each container.
@@ -156,6 +160,7 @@ pub struct PartitionPruningStatistics {
     partition_schema: SchemaRef,
 }

+#[expect(deprecated)]
 impl PartitionPruningStatistics {
     /// Create a new instance of [`PartitionPruningStatistics`].
     ///
@@ -232,6 +237,7 @@ impl PartitionPruningStatistics {
     }
 }

+#[expect(deprecated)]
 impl PruningStatistics for PartitionPruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
@@ -439,10 +445,15 @@ impl PruningStatistics for PrunableStatistics {
 /// the first one is returned without any regard for completeness or accuracy.
 /// That is: if the first statistics has information for a column, even if it is incomplete,
 /// that is returned even if a later statistics has more complete information.
+#[deprecated(
+    since = "52.0.0",
+    note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
+)]
 pub struct CompositePruningStatistics {
     pub statistics: Vec<Box<dyn PruningStatistics>>,
 }

+#[expect(deprecated)]
 impl CompositePruningStatistics {
     /// Create a new instance of [`CompositePruningStatistics`] from
     /// a vector of [`PruningStatistics`].
@@ -457,6 +468,7 @@ impl CompositePruningStatistics {
     }
 }

+#[expect(deprecated)]
 impl PruningStatistics for CompositePruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         for stats in &self.statistics {
@@ -513,6 +525,7 @@ impl PruningStatistics for CompositePruningStatistics {
 }

 #[cfg(test)]
+#[expect(deprecated)]
 mod tests {
     use crate::{
         ColumnStatistics,
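Note: for callers that used `PartitionPruningStatistics` to fold partition values into pruning, the deprecation note above points at `replace_columns_with_literals`. Its exact signature is not shown in this diff, but the transform it performs is a plain tree rewrite: swap each `Column` naming a partition column for a `Literal` holding that file's value, then prune with the resulting expression. A hand-rolled sketch of that substitution (hypothetical helper name and signature, not the crate's API):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
    use datafusion::common::{Result, ScalarValue};
    use datafusion::physical_expr::expressions::{Column, Literal};
    use datafusion::physical_expr::PhysicalExpr;

    /// Hypothetical stand-in for `replace_columns_with_literals`.
    fn substitute_partition_values(
        expr: Arc<dyn PhysicalExpr>,
        partition_values: &HashMap<String, ScalarValue>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        expr.transform(|e| {
            if let Some(column) = e.as_any().downcast_ref::<Column>() {
                if let Some(value) = partition_values.get(column.name()) {
                    // Replace the partition column with this file's literal value.
                    return Ok(Transformed::yes(
                        Arc::new(Literal::new(value.clone())) as Arc<dyn PhysicalExpr>,
                    ));
                }
            }
            Ok(Transformed::no(e))
        })
        .data()
    }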
16 changes: 2 additions & 14 deletions datafusion/core/src/datasource/mod.rs
@@ -56,7 +56,7 @@ mod tests {
     use ::object_store::{path::Path, ObjectMeta};
     use arrow::{
         array::Int32Array,
-        datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
+        datatypes::{DataType, Field, Schema, SchemaRef},
         record_batch::RecordBatch,
     };
     use datafusion_common::{
@@ -209,19 +209,17 @@ mod tests {
     impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
         fn create(
             &self,
-            logical_file_schema: SchemaRef,
+            _logical_file_schema: SchemaRef,
             physical_file_schema: SchemaRef,
         ) -> Arc<dyn PhysicalExprAdapter> {
             Arc::new(TestPhysicalExprAdapter {
-                logical_file_schema,
                 physical_file_schema,
             })
         }
     }

     #[derive(Debug)]
     struct TestPhysicalExprAdapter {
-        logical_file_schema: SchemaRef,
         physical_file_schema: SchemaRef,
     }

@@ -243,15 +241,5 @@
             })
             .data()
         }
-
-        fn with_partition_values(
-            &self,
-            _partition_values: Vec<(FieldRef, ScalarValue)>,
-        ) -> Arc<dyn PhysicalExprAdapter> {
-            Arc::new(TestPhysicalExprAdapter {
-                logical_file_schema: self.logical_file_schema.clone(),
-                physical_file_schema: self.physical_file_schema.clone(),
-            })
-        }
     }
 }
13 changes: 1 addition & 12 deletions datafusion/core/tests/parquet/schema_adapter.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;

 use arrow::array::{record_batch, RecordBatch};
-use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use bytes::{BufMut, BytesMut};
 use datafusion::assert_batches_eq;
 use datafusion::common::Result;
@@ -123,17 +123,6 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
             .data()?;
         self.inner.rewrite(expr)
     }
-
-    fn with_partition_values(
-        &self,
-        partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        assert!(
-            partition_values.is_empty(),
-            "Partition values are not supported in this test"
-        );
-        Arc::new(self.clone())
-    }
 }

 #[tokio::test]
@@ -19,7 +19,7 @@ use std::sync::Arc;

 use arrow::array::RecordBatch;

-use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use bytes::{BufMut, BytesMut};
 use datafusion::common::Result;
 use datafusion::config::{ConfigOptions, TableParquetOptions};
@@ -35,7 +35,7 @@ use datafusion::prelude::SessionContext;
 use datafusion_common::config::CsvOptions;
 use datafusion_common::record_batch;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{ColumnStatistics, ScalarValue};
+use datafusion_common::ColumnStatistics;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::schema_adapter::{
     SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
@@ -193,19 +193,17 @@ struct UppercasePhysicalExprAdapterFactory;
 impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory {
     fn create(
         &self,
-        logical_file_schema: SchemaRef,
+        _logical_file_schema: SchemaRef,
         physical_file_schema: SchemaRef,
     ) -> Arc<dyn PhysicalExprAdapter> {
         Arc::new(UppercasePhysicalExprAdapter {
-            logical_file_schema,
             physical_file_schema,
         })
     }
 }

 #[derive(Debug)]
 struct UppercasePhysicalExprAdapter {
-    logical_file_schema: SchemaRef,
     physical_file_schema: SchemaRef,
 }

@@ -226,16 +224,6 @@ impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
         })
         .data()
     }
-
-    fn with_partition_values(
-        &self,
-        _partition_values: Vec<(FieldRef, ScalarValue)>,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(Self {
-            logical_file_schema: self.logical_file_schema.clone(),
-            physical_file_schema: self.physical_file_schema.clone(),
-        })
-    }
 }

 #[derive(Clone)]