The Three Joins AQE Can't Save You From
When and how to override Spark's automatic join strategy — a technical deep-dive for senior engineers.
If you ask ten data engineers when to override Spark's join strategy, eight will recite some version of "broadcast the smaller table." That answer was correct in 2018. It is now usually wrong, occasionally dangerous, and almost always a sign that the engineer hasn't worked seriously with Adaptive Query Execution in production.
Since Spark 3.0 (and enabled by default since 3.2), AQE silently fixes most of the join planning mistakes that used to require manual hints. The runtime optimizer measures real partition sizes after shuffles, switches join algorithms mid-flight, and splits skewed partitions on the fly. For most workloads, the senior move is to trust the optimizer and stop sprinkling broadcast() calls across your codebase like seasoning.
But "most" isn't "all." There are exactly three production scenarios where AQE structurally cannot help you, and a senior engineer recognizes each one. This article walks through them, the override mechanics for each, and the trade-offs you accept by intervening.
A quick refresher on what AQE actually does
Adaptive Query Execution is a feedback loop bolted onto Spark's query planner. Catalyst still produces an initial physical plan from catalog statistics, but AQE inspects the runtime output of each shuffle stage and rewrites the remaining plan based on what it actually sees on disk and across the wire.
Three behaviors matter for joins:
- Dynamic join algorithm switching. If a shuffle exchange produces a side smaller than spark.sql.adaptive.autoBroadcastJoinThreshold (10 MB by default), AQE rewrites the downstream sort-merge join into a broadcast hash join.
- Dynamic partition coalescing. Tiny post-shuffle partitions get merged into reasonably sized chunks, so you stop paying scheduling overhead for thousands of 4 KB tasks.
- Dynamic skew handling. Partitions that are both larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB) and bigger than five times the median are split into sub-partitions and joined in parallel.
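The two-part skew test in the last bullet is easy to misread as an either/or, so here it is as a small predicate. This is a plain-Python sketch of the rule as described above, not Spark's implementation; the function name and the sample partition sizes are made up for illustration, and the defaults simply mirror the 256 MB threshold and the factor of five.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, threshold_bytes=256 * MB, factor=5):
    """Return the indexes of partitions that pass BOTH skew tests.

    Illustrative only: this mirrors the rule stated in the text,
    it is not taken from Spark's source.
    """
    mid = median(sizes_bytes)
    return [
        i
        for i, size in enumerate(sizes_bytes)
        if size > threshold_bytes and size > factor * mid
    ]


# Hypothetical post-shuffle sizes: mostly ~100 MB, one 2 GB outlier, and one
# 300 MB partition that clears the threshold but is not 5x the median.
sizes = [100 * MB, 110 * MB, 90 * MB, 2048 * MB, 300 * MB]
flagged = skewed_partitions(sizes)
print(flagged)
```

Only the 2 GB partition is flagged. The 300 MB one is above the byte threshold but well under five times the median, so it is left alone.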
The first time most engineers see AQE in action, they delete a pile of broadcast() hints and watch their pipelines run faster. That's the right instinct.
The catch — and it's the only catch that matters here — is that AQE has nothing to measure until a shuffle completes. The very first stage of a query runs on whatever plan Catalyst chose at compile time, using whatever statistics the catalog handed it. If those inputs are wrong, the first stage is wrong, and AQE arrives too late to undo the damage.
That single property generates all three of the scenarios below.
Scenario 1: Catalog statistics are missing or stale
This is the most common reason AQE fails silently. Catalyst sees a table whose sizeInBytes is either absent or wildly inaccurate, picks sort-merge as the conservative default, and dispatches a 200-partition shuffle for what is actually a 4 MB lookup table.
When AQE finally measures the real size after the shuffle, it can still convert the join to a broadcast hash join, but you've already paid to shuffle both sides on the way in. You get the right join algorithm eventually, with most of the wall-clock penalty of the wrong one.
You see this most often in:
- Delta or Iceberg tables that have been heavily compacted but never re-analyzed. The on-disk footprint shrank by 95%; the catalog still thinks the table weighs gigabytes.
- Cross-engine joins where one side comes from a JDBC source, an external Hive table, or a federated query. The catalog typically has no statistics at all for these, and Catalyst falls back to absurdly conservative defaults.
- Newly written tables in pipelines that don't run ANALYZE TABLE as part of the publish step.
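Before reaching for a hint, it helps to check what the catalog actually believes. A minimal sketch, assuming the Statistics row of DESCRIBE TABLE EXTENDED is formatted like "4194304 bytes, 1000 rows" (the layout may differ by version or catalog); parse_statistics and table_statistics are hypothetical helper names, and spark is an existing SparkSession.

```python
import re


def parse_statistics(describe_rows):
    """Extract (size_in_bytes, row_count) from DESCRIBE TABLE EXTENDED rows.

    describe_rows: iterable of (col_name, data_type, comment) tuples.
    Returns None when there is no Statistics row at all, which is the
    "catalog has nothing" case from the list above.
    """
    for col_name, data_type, _comment in describe_rows:
        if col_name == "Statistics":
            text = data_type or ""
            size = re.search(r"(\d+)\s+bytes", text)
            rows = re.search(r"(\d+)\s+rows", text)
            return (
                int(size.group(1)) if size else None,
                int(rows.group(1)) if rows else None,
            )
    return None


def table_statistics(spark, table_name):
    """Run DESCRIBE TABLE EXTENDED through an existing SparkSession."""
    rows = spark.sql(f"DESCRIBE TABLE EXTENDED {table_name}").collect()
    return parse_statistics((r[0], r[1], r[2]) for r in rows)
```

A None result, or a size that is nowhere near what you see on disk, is the signal that the first stage will be planned on bad inputs.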
The override is an explicit broadcast hint. The hint is applied during planning, before execution begins, so it bypasses the bad initial plan entirely:
from pyspark.sql.functions import broadcast
orders = spark.table("warehouse.orders") # 8 TB fact
customer = spark.table("warehouse.customer_lookup") # 4 MB, no stats
result = orders.join(broadcast(customer), "customer_id")
The structural fix is to populate stats once and let AQE handle it from then on:
ANALYZE TABLE warehouse.customer_lookup COMPUTE STATISTICS NOSCAN;
Use the hint when you need correctness now; use ANALYZE when you want correctness forever.
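One way to make the ANALYZE habit stick is to fold it into the publish step itself. A rough sketch, assuming a table that is written with saveAsTable and a catalog and table format that accept ANALYZE TABLE (not every format does); publish_table is a hypothetical helper name, not a Spark API.

```python
def publish_table(spark, df, table_name):
    """Write df as a table, then refresh its size statistics.

    spark is an existing SparkSession and df a DataFrame.
    """
    df.write.mode("overwrite").saveAsTable(table_name)
    # NOSCAN records the on-disk size without reading the rows, which is
    # the number the broadcast decision depends on.
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS NOSCAN")
```

Calling publish_table(spark, customer_df, "warehouse.customer_lookup") at the end of the producing job means the next consumer plans against a fresh size instead of a guess.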
A subtle point worth knowing: broadcast() is a directive, not a request. If the table is actually 4 GB at runtime, Spark will still try to broadcast it, your driver will burn through memory pulling it in, and you'll get a BroadcastTimeout or, worse, an OOM that takes down the application. The hint trusts you. Make sure you're trustworthy.
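If the hint trusts you, one way to earn that trust is to gate it on a size you have measured yourself. This is a sketch under assumptions: join_with_guarded_broadcast is a hypothetical helper, small_size_bytes has to come from somewhere you believe (a file listing, for example), and the 512 MB budget is an arbitrary illustration, not a Spark default.

```python
MB = 1024 * 1024


def join_with_guarded_broadcast(big_df, small_df, key, small_size_bytes,
                                limit_bytes=512 * MB):
    """Apply the broadcast hint only when a measured size is under budget."""
    if small_size_bytes <= limit_bytes:
        # Imported lazily so the guard can be exercised without Spark installed.
        from pyspark.sql.functions import broadcast
        small_df = broadcast(small_df)
    # Above the budget, fall through to a plain join and let Spark plan it.
    return big_df.join(small_df, key)
```

The point of the guard is that a lookup table that quietly grows past the budget degrades to Spark's own plan instead of taking the driver down.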
Scenario 2: Severe key skew that AQE's split can't tame
AQE's skew handler is good — but it has a ceiling. It splits oversized partitions into smaller ones, which works when one or two keys are moderately hot. It does not work when a single key carries 100× the rows of the median key, because splitting just pins the same total volume of data to N tasks instead of one. The wall clock improves, but it's still dominated by that key.
The canonical example is a user_id-keyed join where one user is a bot, an internal service account, or a celebrity whose row count dwarfs the long tail. AQE's split takes you from one 45-minute straggler to four 12-minute stragglers. Better, but the job is still wall-clock-bound on that key.
The structural fix is salting: rewrite the join key on the large side with a random suffix, replicate the small side N times across the same suffix space, and join on the salted composite key. The hot key is now spread across N partitions evenly.
from pyspark.sql.functions import array, explode, lit, rand
N = 64  # salt cardinality: tune to the actual skew, not a round number
# Large (skewed) side: tag every row with a random salt in [0, N).
events_salted = (
    events
    .withColumn("salt", (rand() * N).cast("int"))
)
# Small side: emit one copy of every row per salt value.
dim_users_replicated = (
    dim_users
    .withColumn("salt", explode(array([lit(i) for i in range(N)])))
)
# Join on the composite key so each large-side row meets exactly one replica.
joined = events_salted.join(
    dim_users_replicated,
    on=["user_id", "salt"],
    how="inner"
)
The trade-off is explicit: you replicate the dimension side N times, which multiplies its shuffle volume by N, and you carry an extra salt column on every fact-side row. In return, every executor processes a comparable slice of work and the straggler disappears.
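It is worth convincing yourself that salting changes the data layout and not the join result. The toy below redoes the same composite-key join in plain Python on made-up rows; plain_join and salted_join are illustrative names, and none of this is Spark code.

```python
import random


def plain_join(facts, dims):
    """Inner join on user_id; returns sorted (user_id, event, name) tuples."""
    by_user = {d["user_id"]: d["name"] for d in dims}
    return sorted(
        (f["user_id"], f["event"], by_user[f["user_id"]])
        for f in facts
        if f["user_id"] in by_user
    )


def salted_join(facts, dims, n, seed=0):
    """Same join, keyed on (user_id, salt) like the PySpark version."""
    rng = random.Random(seed)
    # Small side: one copy of every row per salt value.
    replicated = {(d["user_id"], s): d["name"] for d in dims for s in range(n)}
    out = []
    for f in facts:
        key = (f["user_id"], rng.randrange(n))  # large side: one random salt
        if key in replicated:
            out.append((f["user_id"], f["event"], replicated[key]))
    return sorted(out)


# Hypothetical data: user 1 is the hot key, user 3 has no dimension row.
facts = [{"user_id": 1, "event": i} for i in range(1000)]
facts += [{"user_id": 2, "event": 0}, {"user_id": 3, "event": 0}]
dims = [{"user_id": 1, "name": "bot"}, {"user_id": 2, "name": "alice"}]

same = plain_join(facts, dims) == salted_join(facts, dims, n=8)
print(same)
```

Because every salt value exists on the replicated side, each fact row finds exactly one partner no matter which salt it draws, so the two results are identical.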
A few field notes on salting:
- Tune N to the actual skew, not to a round number. N too small leaves residual skew; N too large wastes shuffle on the cold path. Look at the histogram of rows-per-key on the hot side and pick N so the hottest key, after salting, is roughly the size of your median partition.
- Don't salt unless you've confirmed the skew. Open the Spark UI's stage tab — if one task's input bytes is wildly larger than the median, you have skew. If they're all comparable, you don't, and salting will only slow you down.
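- Salt the join key, not the data. The random salt goes on the skewed side, and the other side is replicated across every salt value. Putting a random salt on both sides silently drops matches, and it is the easy thing to get backwards if you're working from memory.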
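The sizing rule in the first note reduces to one division. A minimal sketch, assuming you already have the row count of the hottest key and the row count of a median partition from your own histogram; choose_salt_count is a hypothetical helper name.

```python
import math


def choose_salt_count(hottest_key_rows, median_partition_rows):
    """Smallest N that brings the hottest key's per-salt share down to
    roughly one median partition."""
    if median_partition_rows <= 0:
        raise ValueError("median_partition_rows must be positive")
    return max(1, math.ceil(hottest_key_rows / median_partition_rows))
```

With a hot key of 64 million rows and a median partition of 1 million rows this gives N = 64; if the hottest key is already smaller than a median partition it gives 1, which is the arithmetic's way of saying you do not need to salt.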
The framing that helps me remember the difference: AQE's split is reactive — it observes skew and reduces the blast radius. Salting is proactive — it prevents the skewed partition from forming in the first place. For severe skew, only the proactive fix moves the needle.
Scenario 3: Both sides are large, unsorted, and SMJ is overkill
Sort-merge join is the safe default for large-large joins because it can spill to disk under memory pressure and never allocates an unbounded hash table. But it's expensive: both sides must be sorted by the join key after shuffling. If your data arrives unsorted from upstream (and it almost always does, unless you're working with a bucketed table), you're paying for two full sorts every run.
When the smaller side is "large" by broadcast standards (say, 800 MB) but still fits comfortably in an executor's heap, shuffle hash join is often faster. SHJ shuffles both sides on the join key, builds an in-memory hash table on the smaller partition of each task, and probes with the larger partition. No global sort. The cost is memory for the hash table; the win is skipping the sort phase entirely.
Spark prefers SMJ by default because hash joins are riskier: if the build side overflows the heap, you OOM the executor with no graceful spill. Flipping the preference is a configuration change:
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
I prefer the per-query hint for surgical fixes, because a global config flip changes behavior for queries you aren't thinking about right now:
result = orders.hint("shuffle_hash").join(line_items, "order_id")
Use shuffle hash when:
- Both sides are too large to broadcast (above 10 MB / your tuned threshold).
- The smaller side, divided by your shuffle partition count, fits comfortably in spark.executor.memory * spark.memory.fraction.
- The data arrives unsorted (i.e., the upstream is not a bucketed table sorted on the join key).
Skip it when the smaller side is genuinely huge or when executor memory is already tight. SHJ does not gracefully degrade — it crashes.
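The memory check in the list above is worth doing as arithmetic rather than by feel. A back-of-the-envelope sketch: shuffle_hash_fits is a hypothetical helper, 0.6 is the default for spark.memory.fraction, and concurrent_tasks and headroom are my own additions to the rule in the text, there because tasks on one executor share the heap and a hash table takes more room than the serialized rows it was built from.

```python
MB = 1024 ** 2
GB = 1024 ** 3


def shuffle_hash_fits(small_side_bytes, shuffle_partitions,
                      executor_memory_bytes, memory_fraction=0.6,
                      concurrent_tasks=1, headroom=0.5):
    """Rough check that one task's build side fits in its share of the heap.

    Assumes keys are spread evenly across partitions; with skew, the largest
    partition is the one that matters, not the average.
    """
    per_task_build = small_side_bytes / shuffle_partitions
    per_task_budget = executor_memory_bytes * memory_fraction / concurrent_tasks
    return per_task_build <= headroom * per_task_budget


# The 800 MB example from the text, with 200 shuffle partitions and a
# hypothetical 8 GB executor running 4 tasks at once.
fits = shuffle_hash_fits(800 * MB, 200, 8 * GB, concurrent_tasks=4)
print(fits)
```

Here each task builds on about 4 MB against a budget in the hundreds of megabytes, so the hint is low-risk. Rerun the numbers whenever the smaller side grows or the partition count drops.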
The decision framework, in one paragraph
Trust AQE by default. Override only when you have a specific reason: broadcast hint when catalog stats are missing or stale and you know the true size; salting when key skew is severe enough that AQE's partition split can't equalize task durations; shuffle hash when both sides are too large to broadcast but the smaller side fits in heap and you want to skip the sort. Everything else is AQE's problem, not yours, and reaching for a hint anyway is how you ship pipelines that ignore three years of runtime improvements.
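The same paragraph can be written as a function, which is useful mainly because it forces you to name the evidence for each override. This is a sketch, not a planner: choose_override and its four boolean inputs are hypothetical, and the order in which the checks run is my assumption, since the text does not rank the scenarios.

```python
def choose_override(stats_unreliable, small_side_broadcastable,
                    severe_skew, build_side_fits_in_heap):
    """Map the three scenarios to an action; everything else is AQE's job."""
    if severe_skew:
        return "salt the join key"
    if stats_unreliable and small_side_broadcastable:
        return "broadcast hint"
    if not small_side_broadcastable and build_side_fits_in_heap:
        return "shuffle_hash hint"
    return "trust AQE"
```

Note the first default case: a small table with healthy statistics returns "trust AQE", because that is exactly the situation the optimizer already handles.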
How to actually verify what Spark is doing
Two debugging moves catch most join pathology before it hits production.
First, inspect the physical plan with cost-formatted explain:
df.explain("cost")
You're looking for BroadcastHashJoin, SortMergeJoin, or ShuffledHashJoin in the plan, plus the row-count and size estimates each operator works with. If the estimates are obviously wrong (a 4 MB table reported as 4 GB), your stats need a refresh, and that's a Scenario 1 problem.
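If you want this check in a test suite rather than in your eyeballs, you can capture the plan text and assert on the join operator. A minimal sketch, assuming DataFrame.explain prints through Python's stdout; plan_text and join_operators_in are hypothetical helpers, and sample_plan is a hand-written, heavily abbreviated stand-in, not real Spark output.

```python
import contextlib
import io
import re

# Spark spells the physical shuffle hash operator "ShuffledHashJoin".
JOIN_OPERATORS = ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin")


def plan_text(df, mode="formatted"):
    """Capture what df.explain(mode=...) prints and return it as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()


def join_operators_in(text):
    """Return the join operator names found in a plan string, in order."""
    pattern = r"\b(" + "|".join(JOIN_OPERATORS) + r")\b"
    return re.findall(pattern, text)


# Hand-written illustration of the shape of a plan, not captured output.
sample_plan = """
== Physical Plan ==
AdaptiveSparkPlan
+- SortMergeJoin [customer_id], [customer_id], Inner
"""
found = join_operators_in(sample_plan)
print(found)
```

An assertion like join_operators_in(plan_text(result)) == ["BroadcastHashJoin"] then fails loudly the day a stats change flips the strategy. Keep in mind that with AQE on, the plan shown before execution can still change at runtime.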
Second, when a job is slow, open the SQL tab in the Spark UI, find the stage that's taking the wall clock, and follow it through to its stage detail page. The task summary table there shows min / median / max input bytes and durations per task. If max is more than ~5× the median, you have skew (Scenario 2). If the durations are uniformly long but every task spends most of its time in sort, you have an algorithm problem (Scenario 3). The fix flows from which one you see.
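The max-versus-median test is simple enough to script once you have per-task input sizes copied out of the UI or pulled from your metrics system. A small sketch: skew_ratio and looks_skewed are hypothetical helpers, the byte counts are made up, and the threshold of 5 is the rule of thumb from the paragraph above rather than a hard limit.

```python
from statistics import median


def skew_ratio(task_input_bytes):
    """Ratio of the largest task's input to the median task's input."""
    mid = median(task_input_bytes)
    largest = max(task_input_bytes)
    if mid == 0:
        return float("inf") if largest > 0 else 1.0
    return largest / mid


def looks_skewed(task_input_bytes, threshold=5.0):
    """Apply the ~5x rule of thumb to a stage's per-task input sizes."""
    return skew_ratio(task_input_bytes) > threshold


# Hypothetical per-task input sizes for one stage, in bytes.
stage_tasks = [100, 100, 100, 100, 900]
ratio = skew_ratio(stage_tasks)
print(ratio, looks_skewed(stage_tasks))
```

A ratio near 1 with uniformly slow tasks points away from skew and toward the join algorithm itself.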
These two habits — explain("cost") before you ship, and the SQL tab when something feels slow — separate engineers who guess at performance from engineers who reason about it.
The senior signal
Memorizing join types is a junior move. Memorizing override hints is a mid-level move. The senior signal is knowing where the optimizer's authority ends and yours begins — and being able to articulate, in three sentences, exactly what AQE can't see and why your manual override addresses that specific blind spot.
Trust the optimizer. Override deliberately. And when you do override, be ready to explain what AQE could not have known.