
Expedia Group Technology — Data

Turbocharging Efficiency & Slashing Costs: Mastering Spark & Iceberg Joins with Storage-Partitioned Joins

9 min read · Dec 3, 2024


Photo by Sergey Pesterev on Unsplash

Introduction

Introduction to Spark

Introduction to Iceberg: Understanding why it’s a game-changer compared to Hive

Road to SPJ: theory

Joins in a distributed system: data shuffle is the wolf behind the scenes

Data shuffling across nodes

Deep dive into Spark execution plans

CREATE TABLE target_table (
    id INT,
    description STRING,
    some_column1 STRING,
    some_column2 INT
) USING ICEBERG
PARTITIONED BY (bucket(N, id));

INSERT OVERWRITE target_table
SELECT
    a.id,
    b.description,
    b.some_column1,
    b.some_column2
FROM table_a a
FULL JOIN table_b b
    ON a.id = b.id;
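Before running the statement, it is worth checking which join strategy Spark plans to use. A minimal way to do that from Scala, reusing the table names above:

// Print the physical plan without executing the query;
// look for SortMergeJoin, Exchange and Sort nodes in the output
spark.sql("""
  EXPLAIN FORMATTED
  SELECT a.id, b.description, b.some_column1, b.some_column2
  FROM table_a a
  FULL JOIN table_b b ON a.id = b.id
""").show(truncate = false)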

Sort-merge join: the standard method for handling non-broadcasted joins

Sort-merge join execution plan with two batches of data
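For large equi-joins that cannot be broadcast, Spark defaults to a sort-merge join: each side is shuffled (an Exchange) on the join key and sorted before the merge. A quick sketch to observe this, assuming two hypothetical DataFrames dfA and dfB read from the tables above:

// Disable broadcast joins so Spark cannot pick a broadcast strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val joined = dfA.join(dfB, Seq("id"), "full")
joined.explain()  // the plan shows SortMergeJoin fed by Exchange + Sort on each side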

Enable write fanout

The default value of write.spark.fanout.enabled triggers a sort before the write
ALTER TABLE target_table SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true');
Execution plan with write.spark.fanout.enabled
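If you would rather not change the table property permanently, Iceberg's Spark writer also accepts a per-write fanout option. A sketch assuming df holds the join result; the option name "fanout-enabled" comes from Iceberg's Spark write options and should be verified against your Iceberg version:

// Per-write alternative to the table property
// ("fanout-enabled" per Iceberg's Spark write options;
//  verify the name for your Iceberg version)
df.writeTo("target_table")
  .option("fanout-enabled", "true")
  .overwritePartitions()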

Shuffle-hash join: an improvement removing two expensive operations

Shuffle-hash join removes two expensive CPU-intensive steps
spark.conf.set("spark.sql.join.preferSortMergeJoin", value = false)
The shuffle-hash join execution plan is still dominated by the two exchanges
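The setting above flips the preference for the whole session; a shuffle-hash join can also be requested for a single join with a join hint (Spark 3.0+; full outer support for shuffled hash joins arrived in Spark 3.1). A sketch with the hypothetical dfA and dfB from before:

// Request a shuffle-hash join for this join only
val joined = dfA.hint("SHUFFLE_HASH").join(dfB, Seq("id"), "full")
joined.explain()  // ShuffledHashJoin replaces SortMergeJoin, but both Exchanges remain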

SPJ: the successor of bucketed join

Storage-partitioned join execution plan removes the two exchanges and the two sorts

SPJ requirements

spark.conf.set("spark.sql.sources.v2.bucketing.enabled", value = true)
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", value = false)
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", value = true)
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", value = true)
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", value = true)

Results: achieving 45% to 70% savings on queries involving joins

Configuration

spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1024m")  # Spark partition size after AQE
spark.conf.set("spark.sql.shuffle.partitions", 7680) # Multiple of number of cores and let AQE do its job

Scenarios

Performance comparison of join strategies

SPJ is a cost-breaker

Merge statement: the big winner
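A MERGE statement joins the source to the target on the merge key, so the same shuffle elimination applies when source and target share the same bucket transform on that key. A sketch of such an upsert, with updates as an illustrative source table bucketed on id like target_table:

// The join inside MERGE qualifies for SPJ when source and target
// share the same bucket transform on id
spark.sql("""
  MERGE INTO target_table t
  USING updates u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")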

When not to use SPJ

Conclusion

Learn more

Written by Samy Gharsouli

I write about Data Engineering. LinkedIn :