-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: Preserve File Partitioning From File Scans #19124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Preserve File Partitioning From File Scans #19124
Conversation
…er to set threshold on when to fall back to traitional split and updated comments to reflect this
e4a8d0c to
f05c6fa
Compare
I read this knob as primarily controlling the tradeoff between scan parallelism vs I/O overhead and eliminating shuffles, which is very useful and more expressive than a boolean. However, I think we should also consider the impact of data skew. For heavily skewed tables, preserving file partitioning can make the scan itself significantly unbalanced (one or a few partition groups doing most of the I/O), and in those cases you might actually prefer to pay the shuffle cost rather than constrain execution to the file partition layout. That’s why I think it’s fine to keep a global configuration option, but ideally we would also support passing a different value per scan. In practice you may have:
If adding per-scan configuration is too much for this PR, it’s probably enough to call it out explicitly as follow-up work, but I think this skew aspect is important to keep in mind for the feature’s overall design. |
This is a great call out and I think could see some improvements for file I/O while using this behavior. I did some poking and looks like one approach may be to expose a new listing option, I think this would be great follow-up work as I am trying to keep the scope of this PR tight with good benefits 😄 . |
That’s a very good point. I think we can frame this as two distinct scenarios:
Overall, I believe we should support both options: let DataFusion operate as a library while giving users the flexibility to decide how they want to handle their data. |
Which issue does this PR close?
Rationale for this change
Datafusion does not have the option to preserve file partitioning from file scans, rather it always returns
UknownPartitioning.Some queries and datasets would see great benefits by preserving their explicit partitioning to avoid shuffles. An example of this would be the following scenario:
Say have data partitioned by
f_dkeyand ordered by(f_dkey, timestamp), which is hive-style partitioned:Runnuing the query:
Prior to this PR would produce the plan:
This can be improved. Our data is ordered on
(f_dkey, timestamp), when we hash repartition byf_dkeywe lose our sort ordering thus forcing aSortExecto be inserted after the repartition. You could setdatafusion.optimizer.prefer_existing_sort = true;to preserve the ordering through the repartition and thus preserve the ordering, but with the tradeoff of a more expensive shuffle.Since our data is partitioned by
f_dkeyat file scan time we can eliminate both the hash repartitioning, the eliminating theSortExecin the process. This would result in a plan that looks like:What changes are included in this PR?
I have extended the
FileScanConfig's implementation ofDataSourceto have the ability to preserve hive-style partitioning by declaring it'soutput_partitioningas hash partitoned on the hive columns.When the user sets the option
preserve_file_partitions > 0(0 by default, which is disabled) Datafusion will take advantage of partitioned files. Specifically, whenpreserve_file_partitionsis enabled:preserve_file_partitions> the number distinct partitions found -> we fallback to partitioning by byte rangespreserve_file_partitions<= the number distinct partitions found -> we will keep the file partitioningBecause we can have fewer file partition groups than
target_partitions, forcing a partition group (with possibly large amounts of data) to be read in a single partition can increase file I/O. This configuration choice was made to be able to control the amount of I/O overhead a user is willing to have in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach to have more granularity over this behavior rather than a boolean flag, thank you)Reusing hash repartitioning has rippling effects throughout query plans, such as propagating through joins and windows as well as preserving order, which is great to see.
Are these changes tested?
Small Data:
The optimized plan is roughly on par with the other plans for preserve_order and preserve_order_join, but it makes preserve_order_window 6.7× faster than not optimized and 4.1× faster than preserve-sort-repartition.
Medium Dataset:
The optimized plan makes preserve_order about 8.0× faster than not optimized (4.2× vs PSR), preserve_order_join about 2.2× faster than not optimized (2.35× vs PSR), and preserve_order_window a huge 19.6× faster than not optimized (11.7× vs PSR).
Large Dataset:
The optimized plan makes preserve_order about 9.3× faster than not optimized (4.3× vs PSR), preserve_order_join about 2.4× faster than not optimized (2.5× vs PSR), and preserve_order_window an extreme 53× faster than not optimized (32× vs PSR).
Are there any user-facing changes?
Yes users now can use the
preserve_file_partitionsoption to define the amount of partitions they want to preserve file partitioning for (0 disabled). If enabled and triggered, users will see elimination of repartitions on their file partition key when appropriate.Follow-Up Work
Hash(a)doesn't satisfyHash(a, b)although it should. This is becauseHash(a)guarantees that all ofais contained in a single partition. Thus, sinceHash(a, b)is a subset ofHash(a), anything that isHash(a)is alsoHash(a, b).timestampand then try to order bydate_bin('1 hour', timestamp), Datafusion will not recognize that this is implicitly satisfied. Thus, for monotonic functions:date_bin,CAST,FLOOR, etc. we should maintain ordering, eliminating unnecessary sorts.