
Preserve File Partitioning #19090

@gene-bordegaray

Description


Is your feature request related to a problem or challenge?

When data is pre-partitioned by the user (for example, hive-style), DataFusion should be able to preserve that partitioning and avoid unnecessary repartitions. For example, consider data that is hive-style partitioned by f_dkey and ordered by (f_dkey, timestamp):

f_dkey=A/data.parquet
f_dkey=B/data.parquet
f_dkey=C/data.parquet
...

Each file is partitioned by f_dkey and internally sorted by timestamp; for example, the rows for f_dkey=A:
| f_dkey | timestamp              | value  |
|--------|------------------------|--------|
| A      | 2023-01-01T09:00:00    | 95.5   |
| A      | 2023-01-01T09:00:10    | 102.3  |
| A      | 2023-01-01T09:00:20    | 98.7   |
| A      | 2023-01-01T09:12:20    | 105.1  |
| A      | 2023-01-01T09:12:30    | 100.0  |
| A      | 2023-01-01T09:12:40    | 150.0  |
| A      | 2023-01-01T09:12:50    | 120.8  |
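In a hive-style layout like this, the partition value is encoded in the directory name, so the scan knows it without reading any data. A minimal sketch of recovering it from a path, assuming simple `/`-separated paths with no URL encoding; `hive_partition_values` is a hypothetical helper, not DataFusion's own path parser.

```rust
/// Hypothetical helper (not a DataFusion API): extract `column=value` pairs
/// from the directory segments of a hive-style path.
fn hive_partition_values(path: &str) -> Vec<(String, String)> {
    // `split` always yields at least one segment, so `len() - 1` cannot underflow.
    let segments: Vec<&str> = path.split('/').collect();
    // Every segment except the last one (the file name) is a directory.
    let dirs = &segments[..segments.len() - 1];
    dirs.iter()
        .filter_map(|segment| segment.split_once('='))
        .map(|(column, value)| (column.to_string(), value.to_string()))
        .collect()
}

fn main() {
    let files = [
        "fact/f_dkey=A/data.parquet",
        "fact/f_dkey=B/data.parquet",
        "fact/f_dkey=C/data.parquet",
    ];
    for file in files {
        println!("{file} -> {:?}", hive_partition_values(file));
    }
}
```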

Running the query:

EXPLAIN SELECT f_dkey, count(*), avg(value) 
FROM fact_table_ordered 
GROUP BY f_dkey 
ORDER BY f_dkey;

would produce the plan:

01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet

Because the data is ordered on (f_dkey, timestamp), hash repartitioning by f_dkey loses the sort ordering, which forces a SortExec to be inserted after the repartition. You could set datafusion.optimizer.prefer_existing_sort = true to preserve the ordering through the repartition, but at the cost of a more expensive shuffle.
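The tradeoff between losing the ordering and paying for a more expensive shuffle can be sketched with a toy model, assuming each input partition is a sorted `Vec<u32>` of sort-key values and that one output partition receives rows from all inputs. Emitting rows in arrival order (roughly what an order-unaware repartition does) generally breaks the ordering, while a k-way merge over the inputs (roughly the extra work an order-preserving repartition has to do) keeps it. The function names and the `arrival` schedule are hypothetical; this is not DataFusion's RepartitionExec.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Order-unaware toy repartition: `arrival` lists which input produced the
/// next row, and rows are emitted in that order.
fn interleave_by_arrival(inputs: &[Vec<u32>], arrival: &[usize]) -> Vec<u32> {
    let mut cursors = vec![0usize; inputs.len()];
    let mut out = Vec::new();
    for &i in arrival {
        if let Some(&v) = inputs[i].get(cursors[i]) {
            out.push(v);
            cursors[i] += 1;
        }
    }
    out
}

/// Order-preserving toy repartition: k-way merge of the sorted inputs.
fn sort_preserving_merge(inputs: &[Vec<u32>]) -> Vec<u32> {
    // BinaryHeap is a max-heap; Reverse makes it a min-heap on (value, input, position).
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn is_sorted(xs: &[u32]) -> bool {
    xs.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    let inputs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let interleaved = interleave_by_arrival(&inputs, &[0, 0, 0, 1, 1, 1, 2, 2, 2]);
    let merged = sort_preserving_merge(&inputs);
    println!("arrival order: {interleaved:?} sorted={}", is_sorted(&interleaved));
    println!("merged:        {merged:?} sorted={}", is_sorted(&merged));
}
```

The merge must keep every input's head row in the heap and compare before emitting anything, which is the extra cost that a plain hash shuffle avoids.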

Since the data is already partitioned by f_dkey at file scan time, we can eliminate the hash repartition, and the SortExec along with it; the Partial and FinalPartitioned aggregations also collapse into a single SinglePartitioned aggregation. This would result in a plan that looks like:

01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
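Why a single SinglePartitioned aggregation is enough here can be shown with a toy model, assuming each partition is a `Vec` of `(f_dkey, value)` rows: when no key appears in more than one partition, aggregating each partition on its own and taking the union of the results matches a global aggregation. The row values are made up and the function names are hypothetical, not DataFusion APIs.

```rust
use std::collections::BTreeMap;

/// One row: (f_dkey, value).
type Row = (&'static str, f64);

/// count(*) and avg(value) grouped by f_dkey, returned as key -> (count, avg).
fn aggregate<'a>(rows: impl IntoIterator<Item = &'a Row>) -> BTreeMap<&'static str, (u64, f64)> {
    let mut acc: BTreeMap<&'static str, (u64, f64)> = BTreeMap::new();
    for &(key, value) in rows {
        let entry = acc.entry(key).or_insert((0, 0.0));
        entry.0 += 1;
        entry.1 += value;
    }
    // Convert the running (count, sum) into (count, avg).
    acc.into_iter()
        .map(|(key, (count, sum))| (key, (count, sum / count as f64)))
        .collect()
}

/// Aggregate each partition independently and union the results, with no
/// repartition and no final merge step. Only correct when no key appears in
/// more than one partition; otherwise later partitions overwrite earlier ones.
fn single_partitioned(partitions: &[Vec<Row>]) -> BTreeMap<&'static str, (u64, f64)> {
    let mut out: BTreeMap<&'static str, (u64, f64)> = BTreeMap::new();
    for partition in partitions {
        out.extend(aggregate(partition));
    }
    out
}

fn main() {
    // Made-up rows; each partition holds exactly one f_dkey, like the file groups above.
    let partitions = vec![
        vec![("A", 1.0), ("A", 3.0)],
        vec![("B", 10.0), ("B", 30.0)],
        vec![("C", 7.0)],
    ];
    let global = aggregate(partitions.iter().flatten());
    println!("global: {global:?}");
    println!("per-partition matches global: {}", single_partitioned(&partitions) == global);
}
```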

Describe the solution you'd like

When a user has partitioned data, introduce the ability to take advantage of it through a new option, preserve_file_partitioning, which can be set to true (it is false by default). When it is set, DataFusion will preserve partitioning at the file scan level: it will create file groups that maintain the existing file partitioning and represent this by returning Hash as the scan's output_partitioning.
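A minimal sketch of building such file groups, assuming every file has already been tagged with its f_dkey value (for example, parsed from its path). The only requirement for hash semantics is that each distinct value lands in exactly one group; when there are more distinct values than target groups, several values share a group, which still satisfies it. `group_files_by_key`, the round-robin assignment of keys to groups, and the printed label are hypothetical illustrations, not DataFusion's implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical: group files so that every distinct partition value ends up
/// in exactly one file group.
fn group_files_by_key(files: &[(&str, &str)], target_groups: usize) -> Vec<Vec<String>> {
    // BTreeMap keeps keys sorted, so each group lists its files in key order.
    let mut by_key: BTreeMap<&str, Vec<String>> = BTreeMap::new();
    for &(path, key) in files {
        by_key.entry(key).or_default().push(path.to_string());
    }
    // Never create more groups than distinct keys (and always at least one).
    let n = target_groups.max(1).min(by_key.len().max(1));
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); n];
    // Assign whole keys round-robin; a key's files are never split across groups.
    for (i, (_key, paths)) in by_key.into_iter().enumerate() {
        groups[i % n].extend(paths);
    }
    groups
}

fn main() {
    // (path, f_dkey) pairs.
    let files = [
        ("fact/f_dkey=A/data.parquet", "A"),
        ("fact/f_dkey=B/data.parquet", "B"),
        ("fact/f_dkey=C/data.parquet", "C"),
    ];
    let groups = group_files_by_key(&files, 3);
    println!("file_groups={groups:?}");
    // Hypothetical label for the partitioning the scan would report.
    println!("output_partitioning=Hash([f_dkey], {})", groups.len());
}
```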

In turn, downstream operators will no longer need to insert hash repartitions on the partition key.

Describe alternatives you've considered

I explored adding a new partitioning type, KeyPartitioned, to be used when data is explicitly partitioned by certain columns at the data source level. I explored this idea because of concerns about distinguishing the existing hash partitioning from this new partitioning type.

After speaking with @gabotechs, @fmonjalet, and @NGA-TRAN, we decided that the existing hash semantics already satisfy what file partitioning represents: both guarantee that all rows with a given value of the partitioning expression are contained within the same partition.
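The shared guarantee can be written as a simple check: every key value appears in at most one partition. A toy sketch (function names are hypothetical, and `DefaultHasher` stands in for whatever hash function DataFusion actually uses) showing that hash routing and a one-directory-per-partition layout both pass the check, while round-robin routing does not.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// The property both schemes promise: each key appears in at most one partition.
fn each_key_in_one_partition(partitions: &[Vec<&str>]) -> bool {
    let mut owner: HashMap<&str, usize> = HashMap::new();
    for (i, partition) in partitions.iter().enumerate() {
        for &key in partition {
            if *owner.entry(key).or_insert(i) != i {
                return false;
            }
        }
    }
    true
}

/// Hash partitioning: route each row by hash(key) % n (n must be > 0).
fn hash_partition<'a>(keys: &[&'a str], n: usize) -> Vec<Vec<&'a str>> {
    let mut parts: Vec<Vec<&'a str>> = vec![Vec::new(); n];
    for &key in keys {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        parts[(hasher.finish() % n as u64) as usize].push(key);
    }
    parts
}

/// Round-robin partitioning, for contrast: it can spread one key over several partitions.
fn round_robin_partition<'a>(keys: &[&'a str], n: usize) -> Vec<Vec<&'a str>> {
    let mut parts: Vec<Vec<&'a str>> = vec![Vec::new(); n];
    for (i, &key) in keys.iter().enumerate() {
        parts[i % n].push(key);
    }
    parts
}

fn main() {
    let keys = ["A", "A", "B", "C", "C", "A"];
    // File partitioning: one f_dkey=... directory per partition.
    let file_partitioned = vec![vec!["A", "A", "A"], vec!["B"], vec!["C", "C"]];
    println!("hash:        {}", each_key_in_one_partition(&hash_partition(&keys, 3)));
    println!("file-based:  {}", each_key_in_one_partition(&file_partitioned));
    println!("round-robin: {}", each_key_in_one_partition(&round_robin_partition(&keys, 3)));
}
```

The hash case passes for any hash function, because a deterministic function always sends equal keys to the same partition; that is exactly the property the file layout already provides.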

Furthermore, introducing a new partitioning type would require many more rippling changes: it would have to be propagated properly, optimizer rules would have to be applied to it, and every operator would have to handle it. Extending hash partitioning to the file scan level takes care of almost all of this work.

Additional context

Metadata


Labels: enhancement (New feature or request)
