feat: add datafusion-physical-adapter, implement predicate adaptation missing fields of structs
#16589
Marking as draft until I have time to do a review myself. @kosiew feel free to take a look if you have time, but I expect this needs work before it's ready.
Cargo.toml
Outdated
datafusion-macros = { path = "datafusion/macros", version = "48.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "48.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "48.0.0", default-features = false }
datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "48.0.0", default-features = false }
I had to make a new package because physical-expr does not have access to functions-nested
alamb left a comment
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Build a struct expression by recursively extracting and rewriting fields from a source struct.
Can we add an example of what this is doing?
It seems like if the table schema is like {a: int, b:int} and the file only has {a: int} this function will build up an expression like
struct(a, source.a, b, null)
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..) manually
Is it the case that build_struct_expr would be unnecessary if cast had native support for casting structs to structs and filling in nulls? 🤔
How about this for an alternate idea:
- Add a cast wrapper in datafusion with the same signature as arrow-rs's cast
- If the arguments were both Struct types, apply the special DataFusion casting logic (fill in missing fields with nulls)
- Otherwise just call into the existing arrow cast kernel
That would
- Avoid the need to call the struct and get_field functions
- Allow other parts of the system, like coercion and SQL, to have the same semantics
We have talked about adding cast for evolving structs in arrow-rs for a while but it seems the consensus was to put it in DataFusion
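The wrapper idea above can be sketched with a toy model. This is only an illustration under stated assumptions: `Ty`, `Value`, `primitive_cast` and `cast_with_struct_fill` are hypothetical names, not arrow-rs or DataFusion APIs, and a real wrapper would operate on Arrow arrays and delegate the non-struct case to the arrow cast kernel.

```rust
// Toy model only: `Ty` and `Value` stand in for Arrow's DataType and arrays.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Int32,
    Int64,
    Struct(Vec<(String, Ty)>),
}

#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int32(i32),
    Int64(i64),
    Struct(Vec<(String, Value)>),
}

/// Stand-in for the existing cast kernel: handles only non-struct values.
fn primitive_cast(value: &Value, to: &Ty) -> Result<Value, String> {
    match (value, to) {
        (Value::Null, _) => Ok(Value::Null),
        (Value::Int32(v), Ty::Int32) => Ok(Value::Int32(*v)),
        (Value::Int32(v), Ty::Int64) => Ok(Value::Int64(i64::from(*v))),
        (Value::Int64(v), Ty::Int64) => Ok(Value::Int64(*v)),
        (v, t) => Err(format!("cannot cast {v:?} to {t:?}")),
    }
}

/// Hypothetical wrapper: struct-to-struct casts match fields by name and
/// fill fields missing from the source with nulls; source fields absent
/// from the target are dropped; everything else uses the plain cast.
fn cast_with_struct_fill(value: &Value, to: &Ty) -> Result<Value, String> {
    match (value, to) {
        (Value::Struct(source), Ty::Struct(target)) => {
            let mut out = Vec::with_capacity(target.len());
            for (name, ty) in target {
                let casted = match source.iter().find(|(n, _)| n == name) {
                    // Recurse so nested structs get the same treatment.
                    Some((_, v)) => cast_with_struct_fill(v, ty)?,
                    None => Value::Null,
                };
                out.push((name.clone(), casted));
            }
            Ok(Value::Struct(out))
        }
        _ => primitive_cast(value, to),
    }
}
```

In this model, casting a file value `{a: 1}` to a table type `{a: Int64, b: Int32}` yields `{a: 1, b: null}`, and because the struct logic lives inside the cast itself, any caller (adapter, coercion, SQL) would see the same behavior.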
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..) manually
Does it have to match? These seem like two similar but not exactly identical use cases to me.
How about this for an alternate idea:
That seems reasonable to me.
Avoid the need to call the struct and get_field functions
I am probably missing something but thinking about it to me it didn't seem like those calls would be much of a problem. Either way a new struct column is going to have to be built and the existing struct column is going to have to be read from disk in its entirety.
But the pushback made me think that maybe we should be doing something else here: inspect the expression more deeply and, in the case of col.field, if field is not present in the physical schema, replace the whole get_field(col, 'field') expression with null instead of get_col(struct('other', get_field(col, 'other'), 'field', null)), which is basically what is happening now. Then we avoid reading the column altogether. We can do something similar with casts. I think this is complementary to your proposal because the fallthrough case for select struct_col can call cast if needed like any other column, and our internal cast implementation can do the "smart things" with structs.
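A minimal sketch of that rewrite, assuming a toy expression tree rather than DataFusion's `PhysicalExpr`. `Expr`, `PhysicalSchema` and `rewrite_missing_fields` are hypothetical names introduced only for illustration.

```rust
use std::collections::HashMap;

// Toy model only: a real implementation would walk `PhysicalExpr` trees.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    Eq(Box<Expr>, Box<Expr>),
    Literal(i64),
    Null,
}

/// Physical (file) schema: struct column name -> field names present in the file.
type PhysicalSchema = HashMap<String, Vec<String>>;

/// Replace `get_field(col, 'field')` with `Null` when the file's struct
/// column has no such field, so the column never has to be read for it.
fn rewrite_missing_fields(expr: Expr, physical: &PhysicalSchema) -> Expr {
    match expr {
        Expr::GetField(base, field) => {
            if let Expr::Column(col) = base.as_ref() {
                if let Some(fields) = physical.get(col) {
                    if !fields.contains(&field) {
                        // The whole field access collapses to a null literal.
                        return Expr::Null;
                    }
                }
            }
            // Field exists (or the base is not a plain column): keep the
            // access and rewrite the base recursively.
            Expr::GetField(Box::new(rewrite_missing_fields(*base, physical)), field)
        }
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(rewrite_missing_fields(*l, physical)),
            Box::new(rewrite_missing_fields(*r, physical)),
        ),
        other => other,
    }
}
```

With a file schema where `s` only has the field `other`, a predicate like `s.field = 1` becomes `null = 1`, while `s.other` is left untouched.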
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..) manually
Does it have to match? These seem like two similar but not exactly identical use cases to me.
I worry that if they don't match we'll have potentially divergent behavior which could be confusing to users.
Also another use case is when doing stuff like coercing structs (so for example you could compare two struct columns for equality or UNION them together)
I am probably missing something but thinking about it to me it didn't seem like those calls would be much of a problem. Either way a new struct column is going to have to be built and the existing struct column is going to have to be read from disk in its entirety.
Yeah, I don't think it will be that much faster (maybe it would save some virtual function calls or something). What I am really concerned about is having consistent cast behavior
But the pushback made me think that maybe we should be doing something else here: inspect the expression more deeply and, in the case of col.field, if field is not present in the physical schema, replace the whole get_field(col, 'field') expression with null instead of get_col(struct('other', get_field(col, 'other'), 'field', null)), which is basically what is happening now. Then we avoid reading the column altogether. We can do something similar with casts. I think this is complementary to your proposal because the fallthrough case for select struct_col can call cast if needed like any other column, and our internal cast implementation can do the "smart things" with structs.
👍
@alamb I've reworked this as per discussion in #16589 (comment). This now leaves the actual casting work up to the cast functions, which means that we can do the work there and it will just trickle through to here. I do still recommend moving forward with this PR because if we ever wanted to reference
kosiew left a comment
Added some observations that stood out to me.
Continuing to review...
    Ok(Transformed::no(expr))
}

fn try_rewrite_struct_field_access(
The new try_rewrite_struct_field_access handles missing struct fields by returning Null. I tried adding a test case for nested structs (e.g., a.b.c) to ensure recursive behavior but it failed.
#[test]
fn test_rewrite_nested_struct_missing_field() {
    let physical_schema = Schema::new(vec![Field::new(
        "nested",
        DataType::Struct(
            vec![Field::new(
                "a",
                DataType::Struct(vec![Field::new("b", DataType::Utf8, true)].into()),
                true,
            )]
            .into(),
        ),
        true,
    )]);
    let logical_schema = Schema::new(vec![Field::new(
        "nested",
        DataType::Struct(
            vec![Field::new(
                "a",
                DataType::Struct(
                    vec![
                        Field::new("b", DataType::Utf8, true),
                        Field::new("c", DataType::Int32, true),
                    ]
                    .into(),
                ),
                true,
            )]
            .into(),
        ),
        true,
    )]);
    let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
    let column_expr = Arc::new(Column::new("nested", 0));
    let result = rewriter.rewrite(column_expr).unwrap();
    let expected = Arc::new(CastExpr::new(
        Arc::new(Column::new("nested", 0)),
        DataType::Struct(
            vec![Field::new(
                "a",
                DataType::Struct(
                    vec![
                        Field::new("b", DataType::Utf8, true),
                        Field::new("c", DataType::Int32, true),
                    ]
                    .into(),
                ),
                true,
            )]
            .into(),
        ),
        None,
    )) as Arc<dyn PhysicalExpr>;
    assert_eq!(result.to_string(), expected.to_string());
}

called `Result::unwrap()` on an `Err` value: Execution("Cannot cast column 'nested' from 'Struct(a Struct(b Utf8))' (physical data type) to 'Struct(a Struct(b Utf8, c Int32))' (logical data type)")
That's expected: the point is that we need to implement struct casting as part of the cast operator in general, not as part of this PR. That's the point @alamb made in #16589 (comment). Is there any reason why we haven't done that since you've basically implemented it for SchemaAdapter? I'd think it's the same code.
Basically: I think we need to incorporate your logic from #1637 into CastExpr. Somewhat related to apache/arrow-rs#6735
Aaa..... 🤔
The PR title, "implement predicate adaptation for nested structs", and the description,
"Functionality equivalent to https://github.com/apache/datafusion/pull/16371 but for https://github.com/apache/datafusion/pull/16461.",
gave me the expectation that this PR already implements nested struct adaptation.
Good point. It used to. I've updated the title and PR description
Maybe we can file a ticket to make sure this is properly tracked
}

#[test]
fn test_rewrite_mulit_column_expr_with_type_cast() {
Is there a reason for replacing this multi column test with the single column test_rewrite_column_with_type_cast?
}

#[test]
fn test_rewrite_no_change_needed() -> Result<()> {
Is there a reason for removing this test?
f214376 to 5d5daa9
Looks good actually, just import churn.
I will put this on my review queue for tomorrow
datafusion-physical-adapter, implement predicate adaptation missing fields of structs
alamb left a comment
Thanks @adriangb -- this PR makes sense to me
It would be great to unify this struct coercion/casting logic eventually
This handles cases such as `lit(ScalarValue::Int32(123)) = int64_column` by rewriting it to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`
(note: this does not attempt to then simplify such expressions, that is done by shared simplifiers).

## Overview
I recommend putting this information in the doc comments of PhysicalExprSchemaRewriter itself, as someone is more likely to find it there (either on docs.rs or in their IDE) than in this README.
The high-level overview is cool, but I recommend keeping the README relatively brief.
    None => return Ok(None),
};

if get_field_expr.name() != "get_field" {
Why do we need to check the function name? Isn't the check for downcast_ref().is_none sufficient?
Also, it seems to me that this rewrite is specific to GetFieldFunc so I think it might make sense, long term, to put the function alongside the GetFieldFunc somehow 🤔
Maybe we could add a trait to PhysicalExpr like rewrite_for_schema or something 🤔
I've seen / used this pattern a lot. I added 50b1e91 which should simplify it not just for this PR but any other use of matching a specific function from a PhysicalExpr as well.
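The "downcast, then check which function it is" pattern discussed here can be sketched as follows. This is a toy model under stated assumptions: the `PhysicalExpr` trait, `Column`, `ScalarFunctionExpr` and the helper `try_as_function` below are simplified, hypothetical stand-ins, not the types or the helper added in 50b1e91.

```rust
use std::any::Any;

// Toy stand-ins for the real expression types (hypothetical, simplified).
trait PhysicalExpr {
    fn as_any(&self) -> &dyn Any;
}

struct Column;

struct ScalarFunctionExpr {
    fn_name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl PhysicalExpr for ScalarFunctionExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical helper: returns the expression as a scalar function only
/// if it is one AND it has the requested name.
fn try_as_function<'a>(
    expr: &'a dyn PhysicalExpr,
    name: &str,
) -> Option<&'a ScalarFunctionExpr> {
    expr.as_any()
        // Succeeds for every scalar function, whatever it computes...
        .downcast_ref::<ScalarFunctionExpr>()
        // ...so a second check is needed to single out one function.
        .filter(|f| f.fn_name == name)
}
```

In this model the downcast alone cannot tell `get_field` apart from, say, `upper`, because both are wrapped in the same expression type; folding both checks into one helper keeps call sites to a single `match` or `let ... else`.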
    None => return Ok(None),
};

let field_name = match lit.value() {
try_as_str might be useful here: https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html#method.try_as_str
let result = adapter.rewrite(column_expr);
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert_contains!(error_msg, "Cannot cast column 'data'");
You can make this simpler by using unwrap_err I think
- let result = adapter.rewrite(column_expr);
- assert!(result.is_err());
- let error_msg = result.unwrap_err().to_string();
- assert_contains!(error_msg, "Cannot cast column 'data'");
+ let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
+ assert_contains!(error_msg, "Cannot cast column 'data'");
Similarly for other ones below
thanks! 16ed5a0
Add back three tests that were removed during the refactoring:
- test_rewrite_mulit_column_expr_with_type_cast: Tests complex multi-column expressions
- test_rewrite_no_change_needed: Tests that expressions are unchanged when no transformation needed
- test_adapt_batches: Example showing how to use the rewriter with RecordBatches
These tests provide important coverage and documentation of the API.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
This commit:
- Reverts the API change from trait-based to struct-based design
- Keeps the new datafusion-physical-expr-adapter crate to avoid circular dependencies
- Keeps the try_rewrite_struct_field_access functionality (the main feature)
- Restores PhysicalExprAdapter and PhysicalExprAdapterFactory traits
- Keeps all tests using the trait-based API
The only structural change that remains is moving the code to a new crate, which is necessary to avoid circular dependencies.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
@alamb I've also:
Once you confirm those are okay I think we can merge this. I still have concerns about the lack of full support for nested structs, but (1) I think partial support is better than no support and we can add the rest later, and (2) I would like to make the breaking change of moving the code ASAP while these public APIs are still brand new and have few users.
@kosiew is this PR ok with you?
Adds predicate adaptation to fill in missing struct fields with null, i.e. struct_col.field_missing_in_file -> null.