Commit ef8e64f

feat: infer_schema expands csv and ndjson support

1 parent a139283 commit ef8e64f

17 files changed: +221 -12 lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
(Generated file; diff not rendered by default.)

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -229,8 +229,10 @@ arrow = { version = "55" }
 arrow-array = { version = "55" }
 arrow-buffer = { version = "55" }
 arrow-cast = { version = "55", features = ["prettyprint"] }
+arrow-csv = { version = "55" }
 arrow-data = { version = "55" }
 arrow-flight = { version = "55", features = ["flight-sql-experimental", "tls"] }
+arrow-json = { version = "55" }
 arrow-ipc = { version = "55" }
 arrow-ord = { version = "55" }
 arrow-schema = { version = "55", features = ["serde"] }

src/query/catalog/src/table_args.rs

Lines changed: 6 additions & 0 deletions
@@ -119,6 +119,12 @@ pub fn bool_value(value: &Scalar) -> Result<bool> {
     }
 }
 
+pub fn i64_value(value: &Scalar) -> Result<i64> {
+    value.get_i64().ok_or_else(|| {
+        ErrorCode::BadArguments(format!("invalid value {value} expect to be i64 literal."))
+    })
+}
+
 pub fn string_literal(val: &str) -> Scalar {
     Scalar::String(val.to_string())
 }
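
The new helper mirrors the existing bool_value and string_value accessors: unwrap a typed literal or fail with BadArguments. A minimal self-contained sketch of the same pattern, using hypothetical stand-in types rather than Databend's real Scalar and ErrorCode:

// Stand-in types for illustration only; Databend's real Scalar and error type differ.
#[derive(Debug)]
enum Scalar {
    Int64(i64),
    String(String),
}

impl Scalar {
    // Mirrors Scalar::get_i64: Some for integer literals, None otherwise.
    fn get_i64(&self) -> Option<i64> {
        match self {
            Scalar::Int64(n) => Some(*n),
            _ => None,
        }
    }
}

fn i64_value(value: &Scalar) -> Result<i64, String> {
    value
        .get_i64()
        .ok_or_else(|| format!("invalid value {value:?} expect to be i64 literal."))
}

fn main() {
    assert_eq!(i64_value(&Scalar::Int64(100)), Ok(100));
    assert!(i64_value(&Scalar::String("100".into())).is_err());
}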

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -23,7 +23,9 @@ io-uring = [
 anyhow = { workspace = true }
 arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-csv = { workspace = true }
 arrow-flight = { workspace = true }
+arrow-json = { workspace = true }
 arrow-ipc = { workspace = true, features = ["lz4", "zstd"] }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }

src/query/service/src/table_functions/infer_schema/infer_schema_table.rs

Lines changed: 2 additions & 4 deletions
@@ -32,7 +32,7 @@ use databend_common_meta_app::schema::TableInfo;
 use databend_common_meta_app::schema::TableMeta;
 use databend_common_pipeline_core::Pipeline;
 
-use super::parquet::ParquetInferSchemaSource;
+use super::source::InferSchemaSource;
 use crate::sessions::TableContext;
 use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed;
 use crate::table_functions::TableFunction;
@@ -114,9 +114,7 @@ impl Table for InferSchemaTable {
         _put_cache: bool,
     ) -> Result<()> {
         pipeline.add_source(
-            |output| {
-                ParquetInferSchemaSource::create(ctx.clone(), output, self.args_parsed.clone())
-            },
+            |output| InferSchemaSource::create(ctx.clone(), output, self.args_parsed.clone()),
             1,
         )?;
         Ok(())

src/query/service/src/table_functions/infer_schema/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 mod infer_schema_table;
-mod parquet;
+mod source;
 mod table_args;
 
 pub use infer_schema_table::InferSchemaTable;

src/query/service/src/table_functions/infer_schema/parquet.rs renamed to src/query/service/src/table_functions/infer_schema/source.rs

Lines changed: 83 additions & 7 deletions
@@ -13,8 +13,12 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
+use std::io::Cursor;
 use std::sync::Arc;
 
+use arrow_csv::reader::Format;
+use arrow_json::reader::infer_json_schema;
+use arrow_schema::Schema as ArrowSchema;
 use databend_common_ast::ast::FileLocation;
 use databend_common_ast::ast::UriLocation;
 use databend_common_catalog::table_context::TableContext;
@@ -26,7 +30,8 @@ use databend_common_expression::types::UInt64Type;
 use databend_common_expression::DataBlock;
 use databend_common_expression::FromData;
 use databend_common_expression::TableSchema;
-use databend_common_meta_app::principal::StageFileFormatType;
+use databend_common_meta_app::principal::CsvFileFormatParams;
+use databend_common_meta_app::principal::FileFormatParams;
 use databend_common_meta_app::principal::StageType;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -37,24 +42,25 @@ use databend_common_storage::init_stage_operator;
 use databend_common_storage::read_parquet_schema_async_rs;
 use databend_common_storage::StageFilesInfo;
 use databend_common_users::Object;
+use opendal::Operator;
 use opendal::Scheme;
 
 use crate::table_functions::infer_schema::infer_schema_table::INFER_SCHEMA;
 use crate::table_functions::infer_schema::table_args::InferSchemaArgsParsed;
 
-pub(crate) struct ParquetInferSchemaSource {
+pub(crate) struct InferSchemaSource {
     is_finished: bool,
     ctx: Arc<dyn TableContext>,
     args_parsed: InferSchemaArgsParsed,
 }
 
-impl ParquetInferSchemaSource {
+impl InferSchemaSource {
     pub fn create(
         ctx: Arc<dyn TableContext>,
         output: Arc<OutputPort>,
         args_parsed: InferSchemaArgsParsed,
     ) -> Result<ProcessorPtr> {
-        AsyncSourcer::create(ctx.clone(), output, ParquetInferSchemaSource {
+        AsyncSourcer::create(ctx.clone(), output, InferSchemaSource {
             is_finished: false,
             ctx,
             args_parsed,
@@ -63,7 +69,7 @@
 }
 
 #[async_trait::async_trait]
-impl AsyncSource for ParquetInferSchemaSource {
+impl AsyncSource for InferSchemaSource {
     const NAME: &'static str = INFER_SCHEMA;
 
     #[async_backtrace::framed]
@@ -127,9 +133,9 @@
             Some(f) => self.ctx.get_file_format(f).await?,
             None => stage_info.file_format_params.clone(),
        };
-        let schema = match (first_file.as_ref(), file_format_params.get_type()) {
+        let schema = match (first_file.as_ref(), file_format_params) {
             (None, _) => return Ok(None),
-            (Some(first_file), StageFileFormatType::Parquet) => {
+            (Some(first_file), FileFormatParams::Parquet(_)) => {
                 let arrow_schema = read_parquet_schema_async_rs(
                     &operator,
                     &first_file.path,
@@ -138,6 +144,27 @@
                 .await?;
                 TableSchema::try_from(&arrow_schema)?
             }
+            (Some(first_file), FileFormatParams::Csv(params)) => {
+                let arrow_schema = read_csv_metadata_async(
+                    &first_file.path,
+                    &operator,
+                    Some(first_file.size),
+                    self.args_parsed.max_records,
+                    &params,
+                )
+                .await?;
+                TableSchema::try_from(&arrow_schema)?
+            }
+            (Some(first_file), FileFormatParams::NdJson(_)) => {
+                let arrow_schema = read_json_metadata_async(
+                    &first_file.path,
+                    &operator,
+                    Some(first_file.size),
+                    self.args_parsed.max_records,
+                )
+                .await?;
+                TableSchema::try_from(&arrow_schema)?
+            }
             _ => {
                 return Err(ErrorCode::BadArguments(
                     "infer_schema is currently limited to format Parquet",
@@ -168,3 +195,52 @@
         Ok(Some(block))
     }
 }
+
+pub async fn read_csv_metadata_async(
+    path: &str,
+    operator: &Operator,
+    file_size: Option<u64>,
+    max_records: Option<usize>,
+    params: &CsvFileFormatParams,
+) -> Result<ArrowSchema> {
+    let file_size = match file_size {
+        None => operator.stat(path).await?.content_length(),
+        Some(n) => n,
+    };
+    let escape = if params.escape.is_empty() {
+        None
+    } else {
+        Some(params.escape.as_bytes()[0])
+    };
+
+    // TODO: It would be better if it could be read in the form of Read trait
+    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
+    let mut format = Format::default()
+        .with_delimiter(params.field_delimiter.as_bytes()[0])
+        .with_quote(params.quote.as_bytes()[0])
+        .with_header(params.headers != 0);
+
+    if let Some(escape) = escape {
+        format = format.with_escape(escape);
+    }
+    let (schema, _) = format.infer_schema(Cursor::new(&buf), max_records)?;
+
+    Ok(schema)
+}
+
+pub async fn read_json_metadata_async(
+    path: &str,
+    operator: &Operator,
+    file_size: Option<u64>,
+    max_records: Option<usize>,
+) -> Result<ArrowSchema> {
+    let file_size = match file_size {
+        None => operator.stat(path).await?.content_length(),
+        Some(n) => n,
+    };
+    // TODO: It would be better if it could be read in the form of Read trait
+    let buf = operator.read_with(path).range(..file_size).await?.to_vec();
+    let (schema, _) = infer_json_schema(Cursor::new(&buf), max_records)?;
+
+    Ok(schema)
+}
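
Both helpers delegate the actual inference to Arrow. Below is a minimal standalone sketch of the same two calls against in-memory buffers, assuming only the arrow-csv, arrow-json, and arrow-schema crates pinned above (no Databend or OpenDAL types involved):

use std::io::Cursor;

use arrow_csv::reader::Format;
use arrow_json::reader::infer_json_schema;
use arrow_schema::ArrowError;

fn main() -> Result<(), ArrowError> {
    // CSV: configure the Format the way read_csv_metadata_async does,
    // then sample at most `max_records` rows after the header.
    let csv = b"id,value\n1,100\n2,200\n";
    let format = Format::default()
        .with_delimiter(b',')
        .with_quote(b'"')
        .with_header(true);
    let (csv_schema, sampled) = format.infer_schema(Cursor::new(csv.as_slice()), Some(100))?;
    println!("csv ({sampled} records sampled): {csv_schema:?}");

    // NDJSON: one JSON object per line; None samples every record.
    let ndjson = br#"{"id": 1, "name": "Alice"}
{"id": 2, "name": "Bob"}"#;
    let (json_schema, _) = infer_json_schema(Cursor::new(ndjson.as_slice()), None)?;
    println!("ndjson: {json_schema:?}");
    Ok(())
}

Note that both read paths buffer the whole object before inference (hence the TODO about a Read-based path), so max_records bounds the inference work but not the download.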

src/query/service/src/table_functions/infer_schema/table_args.rs

Lines changed: 7 additions & 0 deletions
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use databend_common_catalog::table_args::i64_value;
 use databend_common_catalog::table_args::TableArgs;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -24,6 +25,7 @@ pub(crate) struct InferSchemaArgsParsed {
     pub(crate) connection_name: Option<String>,
     pub(crate) file_format: Option<String>,
     pub(crate) files_info: StageFilesInfo,
+    pub(crate) max_records: Option<usize>,
 }
 
 impl InferSchemaArgsParsed {
@@ -38,6 +40,7 @@ impl InferSchemaArgsParsed {
             files: None,
             pattern: None,
         };
+        let mut max_records = None;
 
         for (k, v) in &args {
             match k.to_lowercase().as_str() {
@@ -53,6 +56,9 @@
                 "file_format" => {
                     file_format = Some(string_value(v)?);
                 }
+                "max_records_pre_file" => {
+                    max_records = Some(i64_value(v)? as usize);
+                }
                 _ => {
                     return Err(ErrorCode::BadArguments(format!(
                         "unknown param {} for infer_schema",
@@ -70,6 +76,7 @@
             connection_name,
             file_format,
             files_info,
+            max_records,
         })
     }
 }
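
With the argument wired through, a hypothetical invocation such as infer_schema(location => '@my_stage/', max_records_pre_file => 100) would cap CSV and NDJSON inference at the first 100 records per file (argument name shown as committed, including the "pre" spelling); omitting it leaves max_records as None, so Arrow samples every record in the file.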

tests/data/csv/max_records.csv

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+id,value
+1,100
+2,200
+3,300
+4,400
+5,500
+6,foo
+7,bar
+8,baz
+9,qux
+10,quux
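
This fixture is built so that a record cap changes the outcome: the first five values are integers and the rest are strings. A sketch of the expected inference under each setting, assuming arrow-csv's Format as used above:

use std::io::Cursor;

use arrow_csv::reader::Format;
use arrow_schema::{ArrowError, DataType};

fn main() -> Result<(), ArrowError> {
    // Same content as tests/data/csv/max_records.csv.
    let data = b"id,value\n1,100\n2,200\n3,300\n4,400\n5,500\n6,foo\n7,bar\n8,baz\n9,qux\n10,quux\n";
    let format = Format::default().with_header(true);

    // Sampling only the first 5 records sees integers in `value`.
    let (prefix, _) = format.infer_schema(Cursor::new(data.as_slice()), Some(5))?;
    assert_eq!(prefix.field_with_name("value")?.data_type(), &DataType::Int64);

    // Sampling the whole file hits "foo" and widens `value` to strings.
    let (full, _) = format.infer_schema(Cursor::new(data.as_slice()), None)?;
    assert_eq!(full.field_with_name("value")?.data_type(), &DataType::Utf8);
    Ok(())
}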

tests/data/csv/mixed.csv

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+id,name,score,active
+1,Alice,88.5,true
+2,Bob,92.0,false
+3,Charlie,,true
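
A hedged expectation for this fixture, based on arrow-csv's inference rules rather than on output shown in this commit: id should infer as Int64, name as Utf8, score as Float64 (the empty cell in the last row reads as null), and active as Boolean.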
