
Conversation

@gene-bordegaray (Contributor) commented Dec 5, 2025

Which issue does this PR close?

Rationale for this change

DataFusion does not have the option to preserve file partitioning from file scans; it always reports UnknownPartitioning.

Some queries and datasets would see great benefits from preserving their explicit partitioning to avoid shuffles. An example would be the following scenario:

Say we have data partitioned by f_dkey and ordered by (f_dkey, timestamp), laid out hive-style:

f_dkey=A/data.parquet
f_dkey=B/data.parquet
f_dkey=C/data.parquet
...

Each file (partitioned by f_dkey and internally sorted by timestamp) looks like:
| f_dkey | timestamp              | value  |
|--------|------------------------|--------|
| A      | 2023-01-01T09:00:00    | 95.5   |
| A      | 2023-01-01T09:00:10    | 102.3  |
| A      | 2023-01-01T09:00:20    | 98.7   |
| A      | 2023-01-01T09:12:20    | 105.1  |
| A      | 2023-01-01T09:12:30    | 100.0  |
| A      | 2023-01-01T09:12:40    | 150.0  |
| A      | 2023-01-01T09:12:50    | 120.8  |

Running the query:

EXPLAIN SELECT f_dkey, count(*), avg(value) 
FROM fact_table_ordered 
GROUP BY f_dkey 
ORDER BY f_dkey;

Prior to this PR, this would produce the plan:

01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet

This can be improved. Our data is ordered on (f_dkey, timestamp); when we hash repartition by f_dkey we lose that sort ordering, forcing a SortExec to be inserted after the repartition. You could set datafusion.optimizer.prefer_existing_sort = true to preserve the ordering through the repartition, but with the tradeoff of a more expensive shuffle.
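For reference, a minimal sketch of that existing alternative (same query as above):

```sql
-- Existing alternative: keep the ordering through the hash repartition,
-- at the cost of a more expensive, order-preserving shuffle.
SET datafusion.optimizer.prefer_existing_sort = true;

EXPLAIN SELECT f_dkey, count(*), avg(value)
FROM fact_table_ordered
GROUP BY f_dkey
ORDER BY f_dkey;
```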

Since our data is already partitioned by f_dkey at file scan time, we can eliminate the hash repartitioning and, in the process, the SortExec. This results in a plan that looks like:

01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet

What changes are included in this PR?

I have extended FileScanConfig's implementation of DataSource with the ability to preserve hive-style partitioning by declaring its output_partitioning as hash-partitioned on the hive columns.

When the user sets the option preserve_file_partitions > 0 (the default is 0, i.e. disabled), DataFusion will take advantage of partitioned files. Specifically, when preserve_file_partitions is enabled (see the usage sketch after this list):

  1. if preserve_file_partitions > the number of distinct partitions found, we fall back to partitioning by byte ranges
  2. if preserve_file_partitions <= the number of distinct partitions found, we keep the file partitioning
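A rough usage sketch under the example layout above (the datafusion.execution namespace for the config key is an assumption; check the PR's config docs for the exact name):

```sql
-- Hypothetical key name: enable the feature with a threshold of 3.
-- 3 <= the 3 distinct f_dkey partitions found (A, B, C), so the scan
-- keeps one file group per f_dkey value; a threshold of 10 (> 3)
-- would instead fall back to byte-range partitioning.
SET datafusion.execution.preserve_file_partitions = 3;
```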

Because we can have fewer file partition groups than target_partitions, forcing a partition group (with possibly large amounts of data) to be read in a single partition can increase file I/O per partition. This configuration choice was made to let users control how much I/O overhead they are willing to accept in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach for getting more granularity over this behavior than a boolean flag, thank you.)

Reusing the scan's existing hash partitioning has ripple effects throughout query plans, such as propagating through joins and windows while preserving order, which is great to see.

Are these changes tested?

  • Added unit tests for new functionality
  • Added sqllogictests to confirm end to end behavior
  • Added a new benchmark that compares queries with: no optimization, preserving order through repartitions (psr), and preserving partitioning from file scans (this PR). We see nice speedups, and the benefit grows with data size (tables with results below).

Small Dataset:

  • Normal config: 10 partitions × 1000000 rows = 10000000 total rows
  • High-cardinality config: 25 partitions × 500000 rows = 12500000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
| not optimized | 17.268 | - | - | 41.521 | - | - | 28.796 | - | - |
| preserve sort repartition (psr) | 15.908 | - | - | 40.580 | - | - | 17.669 | - | - |
| optimized (this PR) | 15.000 | 13.1% (1.15x) | 5.7% (1.06x) | 40.977 | 1.3% (1.01x) | -1.0% (0.99x) | 4.301 | 85.1% (6.70x) | 75.7% (4.11x) |

The optimized plan is roughly on par with the other plans for preserve_order and preserve_order_join, but it makes preserve_order_window 6.7× faster than not optimized and 4.1× faster than preserve-sort-repartition.


Medium Dataset:

  • Normal config: 30 partitions × 3000000 rows = 90000000 total rows
  • High-cardinality config: 75 partitions × 1500000 rows = 112500000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
| not optimized | 752.130 | - | - | 451.300 | - | - | 193.210 | - | - |
| preserve sort repartition (psr) | 392.050 | - | - | 477.400 | - | - | 115.320 | - | - |
| optimized (this PR) | 93.818 | 87.5% (8.02x) | 76.1% (4.18x) | 203.540 | 54.9% (2.22x) | 57.4% (2.35x) | 9.841 | 94.9% (19.63x) | 91.5% (11.72x) |

The optimized plan makes preserve_order about 8.0× faster than not optimized (4.2× vs PSR), preserve_order_join about 2.2× faster than not optimized (2.35× vs PSR), and preserve_order_window a huge 19.6× faster than not optimized (11.7× vs PSR).


Large Dataset:

  • Normal config: 50 partitions × 6000000 rows = 300000000 total rows
  • High-cardinality config: 125 partitions × 3000000 rows = 375000000 total rows

| Plan | preserve_order (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_join (ms) | Speedup vs not opt | Speedup vs psr | preserve_order_window (ms) | Speedup vs not opt | Speedup vs psr |
|------|---------------------|--------------------|----------------|--------------------------|--------------------|----------------|----------------------------|--------------------|----------------|
| not optimized | 2699.700 | - | - | 1563.800 | - | - | 614.440 | - | - |
| preserve sort repartition (psr) | 1244.200 | - | - | 1594.300 | - | - | 371.300 | - | - |
| optimized (this PR) | 290.740 | 89.2% (9.29x) | 76.6% (4.28x) | 645.180 | 58.7% (2.42x) | 59.5% (2.47x) | 11.538 | 98.1% (53.25x) | 96.9% (32.18x) |

The optimized plan makes preserve_order about 9.3× faster than not optimized (4.3× vs PSR), preserve_order_join about 2.4× faster than not optimized (2.5× vs PSR), and preserve_order_window an extreme 53× faster than not optimized (32× vs PSR).

Are there any user-facing changes?

Yes. Users can now use the preserve_file_partitions option to set the minimum number of distinct file partitions required to preserve file partitioning (0 disables it). When enabled and triggered, repartitions on the file partition key are eliminated where appropriate.

Follow-Up Work

  • Superset Partitioning: currently, Hash(a) doesn't satisfy Hash(a, b) although it should. Hash(a) guarantees that all rows with the same value of a land in a single partition, so all rows with the same (a, b) are colocated as well; therefore anything partitioned by Hash(a) also satisfies Hash(a, b).
  • Reduce File I/O with Preserve File Partitioning: in the current implementation, when a partition value has many files, all of that file I/O goes to one task. This is a tradeoff that increases per-task I/O overhead to eliminate shuffle and sort overhead. There could be ways to parallelize this I/O while still maintaining the partitioning.
  • Sort Satisfaction for Monotonic Functions: if data is sorted by timestamp and we then order by date_bin('1 hour', timestamp), DataFusion will not recognize that the ordering is implicitly satisfied. For monotonic functions (date_bin, CAST, FLOOR, etc.) we should propagate the ordering and eliminate the unnecessary sorts (see the sketch below).
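A small sketch of the last point, assuming a hypothetical table events with a timestamp column ts that the table is declared sorted on:

```sql
-- events is assumed to be declared as sorted by ts.
-- date_bin(INTERVAL '1 hour', ts) is monotonic in ts, so the data already
-- satisfies this ORDER BY; today a SortExec is still inserted, and the
-- follow-up would recognize the existing ordering and elide the sort.
SELECT date_bin(INTERVAL '1 hour', ts) AS hour, value
FROM events
ORDER BY hour;
```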

@asolimando (Member) commented:

> Because we can have fewer file partition groups than target_partitions, forcing a partition group (with possibly large amounts of data) to be read in a single partition can increase file I/O per partition. This configuration choice was made to let users control how much I/O overhead they are willing to accept in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach for getting more granularity over this behavior than a boolean flag, thank you.)

I read this knob as primarily controlling the tradeoff between scan parallelism vs I/O overhead and eliminating shuffles, which is very useful and more expressive than a boolean.

However, I think we should also consider the impact of data skew. For heavily skewed tables, preserving file partitioning can make the scan itself significantly unbalanced (one or a few partition groups doing most of the I/O), and in those cases you might actually prefer to pay the shuffle cost rather than constrain execution to the file partition layout.

That’s why I think it’s fine to keep a global configuration option, but ideally we would also support passing a different value per scan. In practice you may have:

  • a skewed table, where you don’t want to match in-memory partitioning to file partitions, and
  • other tables where you’re perfectly happy preserving file partitioning.

If adding per-scan configuration is too much for this PR, it’s probably enough to call it out explicitly as follow-up work, but I think this skew aspect is important to keep in mind for the feature’s overall design.

@gene-bordegaray (Contributor, Author) commented:

> If adding per-scan configuration is too much for this PR, it's probably enough to call it out explicitly as follow-up work, but I think this skew aspect is important to keep in mind for the feature's overall design.

This is a great callout, and I think we could see some improvements to file I/O while using this behavior. I did some poking and it looks like one approach may be to expose a new listing option, ListingOptions::with_preserve_file_partitions(threshold), that forces the grouping logic on that table alone. Thus, you could have two tables in the same session with the optimization on for one and off for the other.

I think this would be great follow-up work as I am trying to keep the scope of this PR tight with good benefits 😄 .

@NGA-TRAN (Contributor) commented Dec 8, 2025

@asolimando

> However, I think we should also consider the impact of data skew. For heavily skewed tables, preserving file partitioning can make the scan itself significantly unbalanced (one or a few partition groups doing most of the I/O), and in those cases you might actually prefer to pay the shuffle cost rather than constrain execution to the file partition layout.

That’s a very good point. I think we can frame this as two distinct scenarios:

  1. Standard input data — when the input consists of Parquet files (and similar formats) and we want DataFusion to handle skew, your suggestions fit perfectly.
  2. Customized input data — when the input is specialized and DataFusion lacks sufficient statistics to manage skew, or when the data has its own structure and skew‑handling logic. In these cases, users may want to preserve partitions as‑is for performance or correctness, and we should avoid intervening.

Overall, I believe we should support both options: let DataFusion operate as a library while giving users the flexibility to decide how they want to handle their data.



Development

Successfully merging this pull request may close these issues.

Preserve File Partitioning
