Merged

23 commits
e28fe06
feat: `infer_schema` expands csv and ndjson support
KKould Aug 18, 2025
f39edd1
chore: codefmt
KKould Aug 18, 2025
cc4d62e
chore: add check on csv and ndjson compression
KKould Aug 19, 2025
1481aa6
chore: add `max_bytes`
KKould Aug 19, 2025
54ed208
feat: support compressed files for infer_schema csv ndjson
KKould Aug 20, 2025
7075934
chore: add xz on `infer_schema.test`
KKould Aug 20, 2025
7ef9f88
chore: codefmt
KKould Aug 20, 2025
69dbbd3
feat(infer_schema): remove max_bytes and automatically infer the leng…
KKould Aug 20, 2025
684918c
test: add more type test for infer_schema
KKould Aug 20, 2025
9be4b6f
test: add array & object type ndjson test for infer_schema
KKould Aug 21, 2025
b2a6327
chore: add file size check and throw more detailed errors for json
KKould Aug 21, 2025
41b221d
chore: codefmt
KKould Aug 21, 2025
dd452b7
feat: Support multiple file scanning for `infer_schema`
KKould Aug 26, 2025
c66aae7
refactor: using Pipeline as an implementation of `infer_schema` for C…
KKould Sep 3, 2025
178aacf
feat: InferSeparator multi-file processing and Schema promote merging…
KKould Sep 4, 2025
4bd26e5
chore: codefmt
KKould Sep 4, 2025
fb7fd0e
feat: impl `max_file_count` for `infer_schema`
KKould Sep 9, 2025
b26101e
chore: codefmt
KKould Sep 9, 2025
fc6ce4b
feat: impl `max_file_count` for `infer_schema`
KKould Sep 9, 2025
4b6ef6d
chore: codefmt
KKould Sep 9, 2025
a77260d
Merge branch 'main' into feat/infer_schema_for_csv_ndjson
KKould Sep 10, 2025
1eb9928
Merge branch 'main' into feat/infer_schema_for_csv_ndjson
KKould Sep 10, 2025
282e0db
Merge branch 'main' into feat/infer_schema_for_csv_ndjson
KKould Sep 11, 2025
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -229,9 +229,11 @@ arrow = { version = "55" }
arrow-array = { version = "55" }
arrow-buffer = { version = "55" }
arrow-cast = { version = "55", features = ["prettyprint"] }
arrow-csv = { version = "55" }
arrow-data = { version = "55" }
arrow-flight = { version = "55", features = ["flight-sql-experimental", "tls"] }
arrow-ipc = { version = "55", features = ["lz4", "zstd"] }
arrow-json = { version = "55" }
arrow-ord = { version = "55" }
arrow-schema = { version = "55", features = ["serde"] }
arrow-select = { version = "55" }
6 changes: 6 additions & 0 deletions src/query/catalog/src/table_args.rs
@@ -119,6 +119,12 @@ pub fn bool_value(value: &Scalar) -> Result<bool> {
}
}

pub fn i64_value(value: &Scalar) -> Result<i64> {
value.get_i64().ok_or_else(|| {
ErrorCode::BadArguments(format!("invalid value {value} expect to be i64 literal."))
})
}

pub fn string_literal(val: &str) -> Scalar {
Scalar::String(val.to_string())
}
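
The added `i64_value` helper follows the same shape as the neighboring accessors such as `bool_value`: unwrap the typed scalar or report `BadArguments`. Below is a minimal, self-contained sketch of that pattern, using a toy `Scalar` enum and a plain `String` error in place of the databend-common-expression and ErrorCode types; only the shape of the helper is taken from the diff.

#[derive(Debug)]
enum Scalar {
    Int64(i64),
    String(String),
}

impl Scalar {
    // Toy counterpart of `Scalar::get_i64`.
    fn get_i64(&self) -> Option<i64> {
        match self {
            Scalar::Int64(v) => Some(*v),
            _ => None,
        }
    }
}

// Same Option-to-Result shape as the helper added in this diff.
fn i64_value(value: &Scalar) -> Result<i64, String> {
    value
        .get_i64()
        .ok_or_else(|| format!("invalid value {value:?}, expected an i64 literal"))
}

fn main() {
    assert_eq!(i64_value(&Scalar::Int64(16)), Ok(16));
    assert!(i64_value(&Scalar::String("csv".into())).is_err());
}

The real helper differs only in returning `ErrorCode::BadArguments` and delegating to the crate's own `Scalar::get_i64`.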
3 changes: 3 additions & 0 deletions src/query/service/Cargo.toml
@@ -23,8 +23,10 @@ io-uring = [
anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-csv = { workspace = true }
arrow-flight = { workspace = true }
arrow-ipc = { workspace = true }
arrow-json = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-udf-runtime = { workspace = true }
@@ -54,6 +56,7 @@ databend-common-cache = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-cloud-control = { workspace = true }
databend-common-column = { workspace = true }
databend-common-compress = { workspace = true }
databend-common-config = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
@@ -13,27 +13,52 @@
// limitations under the License.

use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;

use databend_common_ast::ast::FileLocation;
use databend_common_ast::ast::UriLocation;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfo;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_args::TableArgs;
use databend_common_compress::CompressAlgorithm;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::BlockThresholds;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRefExt;
use databend_common_meta_app::principal::FileFormatParams;
use databend_common_meta_app::principal::StageInfo;
use databend_common_meta_app::principal::StageType;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_sources::PrefetchAsyncSourcer;
use databend_common_pipeline_transforms::TransformPipelineHelper;
use databend_common_sql::binder::resolve_file_location;
use databend_common_storage::init_stage_operator;
use databend_common_storage::StageFilesInfo;
use databend_common_storages_stage::BytesReader;
use databend_common_storages_stage::Decompressor;
use databend_common_storages_stage::InferSchemaPartInfo;
use databend_common_storages_stage::LoadContext;
use databend_common_users::Object;
use databend_storages_common_stage::SingleFilePartition;
use opendal::Scheme;

use super::parquet::ParquetInferSchemaSource;
use crate::sessions::TableContext;
use crate::table_functions::infer_schema::separator::InferSchemaSeparator;
use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed;
use crate::table_functions::TableFunction;

@@ -77,9 +102,27 @@ impl InferSchemaTable {
TableField::new("column_name", TableDataType::String),
TableField::new("type", TableDataType::String),
TableField::new("nullable", TableDataType::Boolean),
TableField::new("filenames", TableDataType::String),
TableField::new("order_id", TableDataType::Number(NumberDataType::UInt64)),
])
}

fn build_read_stage_source(
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
stage_info: &StageInfo,
) -> Result<()> {
let operator = init_stage_operator(stage_info)?;
let batch_size = ctx.get_settings().get_input_read_buffer_size()? as usize;
pipeline.add_source(
|output| {
let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size, 1)?;
PrefetchAsyncSourcer::create(ctx.clone(), output, reader)
},
1,
)?;
Ok(())
}
}

#[async_trait::async_trait]
@@ -95,11 +138,72 @@ impl Table for InferSchemaTable {
#[async_backtrace::framed]
async fn read_partitions(
&self,
_ctx: Arc<dyn TableContext>,
ctx: Arc<dyn TableContext>,
_push_downs: Option<PushDownInfo>,
_dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
Ok((PartStatistics::default(), Partitions::default()))
let file_location = if let Some(location) =
self.args_parsed.location.clone().strip_prefix('@')
{
FileLocation::Stage(location.to_string())
} else if let Some(connection_name) = &self.args_parsed.connection_name {
let conn = ctx.get_connection(connection_name).await?;
let uri =
UriLocation::from_uri(self.args_parsed.location.clone(), conn.storage_params)?;
let proto = conn.storage_type.parse::<Scheme>()?;
if proto != uri.protocol.parse::<Scheme>()? {
return Err(ErrorCode::BadArguments(format!(
"protocol from connection_name={connection_name} ({proto}) not match with uri protocol ({0}).",
uri.protocol
)));
}
FileLocation::Uri(uri)
} else {
let uri =
UriLocation::from_uri(self.args_parsed.location.clone(), BTreeMap::default())?;
FileLocation::Uri(uri)
};
let (stage_info, path) = resolve_file_location(ctx.as_ref(), &file_location).await?;
let enable_experimental_rbac_check =
ctx.get_settings().get_enable_experimental_rbac_check()?;
if enable_experimental_rbac_check {
let visibility_checker = ctx.get_visibility_checker(false, Object::Stage).await?;
if !(stage_info.is_temporary
|| visibility_checker.check_stage_read_visibility(&stage_info.stage_name)
|| stage_info.stage_type == StageType::User
&& stage_info.stage_name == ctx.get_current_user()?.name)
{
return Err(ErrorCode::PermissionDenied(format!(
"Permission denied: privilege READ is required on stage {} for user {}",
stage_info.stage_name.clone(),
&ctx.get_current_user()?.identity().display(),
)));
}
}
let files_info = StageFilesInfo {
path: path.clone(),
..self.args_parsed.files_info.clone()
};

let file_format_params = match &self.args_parsed.file_format {
Some(f) => ctx.get_file_format(f).await?,
None => stage_info.file_format_params.clone(),
};
let operator = init_stage_operator(&stage_info)?;
let stage_file_infos = files_info
.list(&operator, 1, self.args_parsed.max_file_count)
.await?;
Ok((
PartStatistics::default(),
Partitions::create(PartitionsShuffleKind::Seq, vec![
InferSchemaPartInfo::create(
files_info,
file_format_params,
stage_info,
stage_file_infos,
),
]),
))
}

fn table_args(&self) -> Option<TableArgs> {
@@ -113,12 +217,98 @@ impl Table for InferSchemaTable {
pipeline: &mut Pipeline,
_put_cache: bool,
) -> Result<()> {
pipeline.add_source(
|output| {
ParquetInferSchemaSource::create(ctx.clone(), output, self.args_parsed.clone())
},
1,
)?;
let Some(part) = ctx.get_partition() else {
return Ok(());
};
let info = InferSchemaPartInfo::from_part(&part)?;

match info.file_format_params {
FileFormatParams::Csv(_) | FileFormatParams::NdJson(_) => {
let partitions = info
.stage_file_infos
.iter()
.map(|v| {
let part = SingleFilePartition {
path: v.path.clone(),
size: v.size as usize,
};
let part_info: Box<dyn PartInfo> = Box::new(part);
Arc::new(part_info)
})
.collect::<Vec<_>>();
ctx.set_partitions(Partitions::create(PartitionsShuffleKind::Seq, partitions))?;
Self::build_read_stage_source(ctx.clone(), pipeline, &info.stage_info)?;

let stage_table_info = StageTableInfo {
stage_root: "".to_string(),
stage_info: info.stage_info.clone(),
schema: Arc::new(Default::default()),
default_exprs: None,
files_info: info.files_info.clone(),
files_to_copy: None,
duplicated_files_detected: vec![],
is_select: false,
copy_into_table_options: Default::default(),
is_variant: false,
};

let load_ctx = Arc::new(LoadContext::try_create_for_copy(
ctx.clone(),
&stage_table_info,
None,
BlockThresholds::default(),
vec![],
)?);

let mut algo = None;

for file_info in info.stage_file_infos.iter() {
let Some(new_algo) = CompressAlgorithm::from_path(&file_info.path) else {
continue;
};

if let Some(algo) = algo {
if algo != new_algo {
return Err(ErrorCode::UnknownCompressionType(
"`infer_schema` only supports single compression type",
));
}
}
algo = Some(new_algo);
}
if algo.is_some() {
pipeline.try_add_accumulating_transformer(|| {
Decompressor::try_create(load_ctx.clone(), algo)
})?;
}
pipeline.add_accumulating_transformer(|| {
InferSchemaSeparator::create(
info.file_format_params.clone(),
self.args_parsed.max_records,
info.stage_file_infos.len(),
)
});
}
FileFormatParams::Parquet(_) => {
pipeline.add_source(
|output| {
ParquetInferSchemaSource::create(
ctx.clone(),
output,
info.stage_info.clone(),
info.stage_file_infos.clone(),
)
},
1,
)?;
}
_ => {
return Err(ErrorCode::BadArguments(
"infer_schema is currently limited to format Parquet, CSV and NDJSON",
));
}
}

Ok(())
}
}
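
The compression handling in `read_data` above allows at most one compression algorithm across the matched files: the algorithm is inferred from each file's path, files with no recognized extension are skipped, and two different algorithms trigger `UnknownCompressionType`. The following is a self-contained sketch of that rule; the `Compression` enum and `from_path` function are simplified stand-ins for `databend_common_compress::CompressAlgorithm::from_path`, not the crate's API.

// Simplified stand-in for databend_common_compress::CompressAlgorithm;
// only a few extensions are modeled here for illustration.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Compression {
    Gzip,
    Zstd,
    Xz,
}

fn from_path(path: &str) -> Option<Compression> {
    match path.rsplit('.').next() {
        Some("gz") => Some(Compression::Gzip),
        Some("zst") => Some(Compression::Zstd),
        Some("xz") => Some(Compression::Xz),
        _ => None,
    }
}

// Mirrors the rule in `read_data`: files with no recognized compression are
// skipped, and two different algorithms across the file list is an error.
fn single_compression(paths: &[&str]) -> Result<Option<Compression>, String> {
    let mut algo = None;
    for path in paths {
        let Some(new_algo) = from_path(path) else {
            continue;
        };
        if let Some(existing) = algo {
            if existing != new_algo {
                return Err("`infer_schema` only supports single compression type".to_string());
            }
        }
        algo = Some(new_algo);
    }
    Ok(algo)
}

fn main() {
    assert_eq!(
        single_compression(&["a.csv.gz", "b.csv"]).unwrap(),
        Some(Compression::Gzip)
    );
    assert!(single_compression(&["a.csv.gz", "b.csv.zst"]).is_err());
}

In the diff itself, a `Some` result adds a `Decompressor` transform ahead of `InferSchemaSeparator`, while `None` feeds the raw bytes straight into schema separation.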