Data Transformation, Cleansing & Quality

Advanced transformations — window functions, joins, aggregations

Write efficient Spark SQL and PySpark code to apply advanced transformations — window functions, joins, aggregations — on large datasets.

Bronze is raw; the real engineering happens when you transform it into silver and gold. This lesson is the advanced-transformation toolkit the exam expects you to write, not just recognise. One tool dominates — window functions — so we build there first, then joins and aggregations.


The spine

Beat 1 — the anchor: a window keeps every row; GROUP BY collapses

Here's the distinction that makes window functions click. Say you want to tag each employee with a salary tier within their department — but you must keep every employee row in the output.

Predict: can GROUP BY department do that?

No. GROUP BY collapses each department into one summary row — the individual employees vanish. You need something that computes a per-group value but keeps the rows. That's a window function:

Anchor. A window function computes a value for each row using a group of related rows, but keeps every row. GROUP BY collapses a group to one row; a window function adds a column to every row, computed over its group. That's the whole power: rank, running total, "previous row" — without losing the rows.
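To make the anchor concrete, here is a minimal plain-Python model of the two behaviours (not Spark code). The sample rows and the helper names `group_by_avg` and `window_avg` are hypothetical, chosen only for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (employee, department, salary).
rows = [
    ("ana", "eng", 120),
    ("bo", "eng", 100),
    ("cy", "ops", 90),
]

def group_by_avg(rows):
    """Model of GROUP BY department: one summary value per department."""
    buckets = defaultdict(list)
    for _, dept, salary in rows:
        buckets[dept].append(salary)
    return {dept: sum(vals) / len(vals) for dept, vals in buckets.items()}

def window_avg(rows):
    """Model of AVG(salary) OVER (PARTITION BY department): every row kept, one column added."""
    per_dept = group_by_avg(rows)
    return [(name, dept, salary, per_dept[dept]) for name, dept, salary in rows]

collapsed = group_by_avg(rows)  # two entries: the individual employees are gone
windowed = window_avg(rows)     # three rows: each employee plus the department average
```

The grouped result has one entry per department, while the windowed result has the same row count as the input with an extra column attached.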

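A window is three parts inside OVER (...): PARTITION BY (which rows form the group), ORDER BY (how rows are ordered within the group), and the frame (which rows around the current row the function sees).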

Beat 2 — ranking, and the tie behaviour (the most-tested detail)

Three ranking functions look identical and differ only on ties:

| Function | On a tie (equal ORDER BY value) | Sequence |
| --- | --- | --- |
| ROW_NUMBER() | each tied row gets a different number (arbitrary among ties) | 1, 2, 3, 4 |
| RANK() | tied rows get the same rank, then it skips | 1, 2, 2, 4 |
| DENSE_RANK() | tied rows get the same rank, no gap | 1, 2, 2, 3 |

Predict: "assign a salary tier per department, where employees with the same salary share the same tier, and tiers don't skip numbers" — which one?

DENSE_RANK() — same value → same tier (rules out ROW_NUMBER), no gaps (rules out RANK).

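from pyspark.sql.functions import col, dense_rank
from pyspark.sql.window import Window
# Order salaries within each department, highest first.
w = Window.partitionBy("department").orderBy(col("salary").desc())
# df is assumed to be an existing DataFrame with department and salary columns.
# Tied salaries share a tier, and tiers never skip a number.
tiered = df.withColumn("tier", dense_rank().over(w))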
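The tie behaviour is easier to remember once you have seen all three sequences come out of the same input. The sketch below is a plain-Python model of one partition ordered by salary descending, not Spark code; `rank_partition` and the salaries are hypothetical.

```python
def rank_partition(values):
    """Return (value, row_number, rank, dense_rank) for one partition,
    ordered descending like ORDER BY salary DESC."""
    ordered = sorted(values, reverse=True)
    out = []
    rank = dense = 0
    prev = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != prev:
            rank = position  # RANK jumps to the row position, leaving a gap after ties
            dense += 1       # DENSE_RANK advances by exactly one per distinct value
            prev = value
        # ROW_NUMBER is simply the position, so tied rows still differ
        out.append((value, position, rank, dense))
    return out

# Hypothetical salaries with one tie at 90.
result = rank_partition([100, 90, 90, 80])
row_numbers = [r[1] for r in result]
ranks = [r[2] for r in result]
dense_ranks = [r[3] for r in result]
```

Reading the three lists side by side reproduces the Sequence column: only the dense ranks give tied salaries the same tier without skipping a number.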

Beat 3 — running totals, and the frame gotcha

For a cumulative sum/average, the frame is what makes it "running":

AVG(score) OVER (PARTITION BY student ORDER BY exam_date
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)  -- cumulative average

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = "all rows from the start of the group up to this row" → a running/cumulative value (this exact frame is the cumulative-average answer).

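The gotcha to bank: state the frame explicitly. Leave it off with an ORDER BY present and Spark applies its default frame, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which also pulls in every row tied with the current one on the ORDER BY value. Tied rows then all show the same total, so your "running total" quietly stops stepping row by row.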
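A small plain-Python model shows where an explicit ROWS frame and an omitted frame diverge. It assumes Spark's documented default when ORDER BY is present and no frame is given (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); the helper names and sample rows are hypothetical.

```python
def running_sum_rows(pairs):
    """Model of SUM(v) OVER (ORDER BY k ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""
    total, out = 0, []
    for _, value in pairs:  # pairs are assumed to be already sorted by key
        total += value
        out.append(total)
    return out

def running_sum_range(pairs):
    """Model of SUM(v) OVER (ORDER BY k) with the default RANGE frame:
    every row whose key is <= the current key is included, peers too."""
    return [sum(v for k, v in pairs if k <= key) for key, _ in pairs]

# Hypothetical (order_key, amount) rows; the last two rows tie on the key.
pairs = [(1, 10), (2, 20), (2, 30)]
explicit_rows = running_sum_rows(pairs)   # steps row by row
default_range = running_sum_range(pairs)  # tied rows share one total
```

With unique ORDER BY values the two agree; they differ only when rows tie on the ordering column, which is why the bug is easy to miss in testing.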

LAG() / LEAD() reach the previous/next row (e.g. "days since a user's last event").
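As a sketch of the "days since a user's last event" pattern, here is a plain-Python model of LAG over one user's events ordered by date. The function name and the dates are hypothetical; in Spark the first row's LAG value would be NULL, modelled here as None.

```python
from datetime import date

def days_since_previous(event_dates):
    """Model of the gap between each event and LAG(event_date) for one user."""
    ordered = sorted(event_dates)
    out = []
    for i, current in enumerate(ordered):
        previous = ordered[i - 1] if i > 0 else None  # LAG has nothing to return on the first row
        gap = (current - previous).days if previous is not None else None
        out.append((current, gap))
    return out

# Hypothetical events for a single user, deliberately out of order.
events = [date(2024, 1, 10), date(2024, 1, 1), date(2024, 1, 4)]
gaps = [gap for _, gap in days_since_previous(events)]
```

LEAD is the mirror image: it reads the next row instead of the previous one, so the last row of each partition is the one left without a value.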

Lock it. Window = per-row value over a group, keeps every row. Ties → ROW_NUMBER (all different) / RANK (gaps) / DENSE_RANK (no gaps). Running total → explicit frame UNBOUNDED PRECEDING → CURRENT ROW.


The dials (skim now; return when a question needs one)

◆ Name the collision: analytic window vs time window

Two different things are both called "window," and Section-3 questions use both:

Tell: OVER (...) → analytic window; window(col, "…") inside a groupBy → time window. "Metrics per non-overlapping 10-minute interval per server" is a time window, not OVER. And note the full signature — window(timeCol, windowDuration, slideDuration, startTime): non-overlapping means slideDuration = windowDuration (tumbling), and "offset by 3 minutes" is the startTime argument (window(ts, "10 minutes", "10 minutes", "3 minutes")).
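To see what the startTime offset does to bucket boundaries, the following plain-Python sketch assigns a timestamp to its tumbling window. It assumes windows are aligned to the Unix epoch in UTC and shifted by the offset; `tumbling_window` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, duration, start_offset=timedelta(0)):
    """Return the [start, end) tumbling window that contains ts.

    Windows are aligned to the epoch shifted by start_offset, which is how
    this sketch models the startTime argument.
    """
    since_origin = ts - EPOCH - start_offset
    start = ts - (since_origin % duration)
    return start, start + duration

ts = datetime(2024, 1, 1, 12, 7, tzinfo=timezone.utc)
ten_min = timedelta(minutes=10)

plain = tumbling_window(ts, ten_min)                        # aligned to :00, :10, :20 ...
shifted = tumbling_window(ts, ten_min, timedelta(minutes=3))  # aligned to :03, :13, :23 ...
```

Each timestamp lands in exactly one bucket because the slide equals the window length; a shorter slide would put the same row in several overlapping windows.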

◆ Joins — pick by "which unmatched rows do I keep," plus two you'll forget

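Standard INNER / LEFT / RIGHT / FULL OUTER you know. The two people forget, even though they're tested, are LEFT ANTI (exclusion: keep the left rows that have no match) and LEFT SEMI (keep the left rows that have a match, without duplicating them).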

Recall from [Reading the evidence — Query Profile & Spark UI](/lessons/s6-query-profile/) / [Job & environment configuration — compute and Spark tuning](/lessons/s1-job-env-config/): a broadcast join ships a small dimension to every executor so the big fact isn't shuffled (/*+ BROADCAST(dim) */ or let AQE do it).
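The takeaways shorthand "ANTI = exclusion, SEMI = has a match, no dup" can be checked against a plain-Python model of the three join types on a single key. This is a sketch, not Spark code; the helper names and the customer and order data are hypothetical.

```python
def inner_join(left, right_keys):
    """One output row per matching (left row, right key) pair, so matches can multiply rows."""
    return [row for row in left for key in right_keys if row[0] == key]

def left_semi(left, right_keys):
    """Keep each left row at most once if its key has any match on the right."""
    keys = set(right_keys)
    return [row for row in left if row[0] in keys]

def left_anti(left, right_keys):
    """Keep only the left rows whose key has no match on the right."""
    keys = set(right_keys)
    return [row for row in left if row[0] not in keys]

# Hypothetical data: customers as (id, name); orders carry a customer id.
customers = [(1, "ana"), (2, "bo"), (3, "cy")]
order_customer_ids = [1, 1, 3]

joined = inner_join(customers, order_customer_ids)     # ana appears twice
with_orders = left_semi(customers, order_customer_ids)  # ana appears once
without_orders = left_anti(customers, order_customer_ids)
```

The semi and anti results together partition the left side: every customer lands in exactly one of them, and neither carries any column from the right.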

◆ Aggregations beyond GROUP BY
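The takeaways summarise this dial as "ROLLUP = hierarchy, CUBE = all combos". A minimal plain-Python sketch of which grouping sets each one produces makes the difference countable; the function names and the column names `region` and `store` are hypothetical.

```python
from itertools import combinations

def rollup_sets(cols):
    """Grouping sets for ROLLUP(cols): each prefix of the column list, down to the grand total."""
    return [tuple(cols[:n]) for n in range(len(cols), -1, -1)]

def cube_sets(cols):
    """Grouping sets for CUBE(cols): every subset of the columns, including the grand total."""
    return [combo for n in range(len(cols), -1, -1) for combo in combinations(cols, n)]

cols = ["region", "store"]
rollup = rollup_sets(cols)  # hierarchy: (region, store) -> (region) -> ()
cube = cube_sets(cols)      # all combinations, so (store,) alone appears too
```

For n grouping columns ROLLUP yields n + 1 grouping sets and CUBE yields 2^n, which is why CUBE gets expensive quickly as columns are added.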

◆ Extracting date parts (a small tested trap)

To get the date from a timestamp: to_date(ts), date(ts), or CAST(ts AS DATE). To get a component (day/month/year number): date_part('day', ts), EXTRACT(DAY FROM ts), dayofmonth(ts). Tell: date_part('day', ts) returns the day number, so it's not "extracting the date" — that's the odd-one-out in a "which is NOT valid for extracting the date" question.
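The same split between "the date" and "a component of the date" exists in Python's datetime, which makes a handy mental analogue. This is an illustration only, not Spark code, and the timestamp is a hypothetical value.

```python
from datetime import datetime

ts = datetime(2024, 3, 15, 9, 30)

whole_date = ts.date()  # analogue of to_date(ts) or CAST(ts AS DATE): a full calendar date
day_number = ts.day     # analogue of date_part('day', ts) or dayofmonth(ts): just a number
```

The first result is still a date you can compare or group by; the second is an integer that has lost the month and year.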

Takeaways (rebuild it from these)

  1. Window = per-row value over a group, keeping every row (GROUP BY collapses; window adds a column). PARTITION BY + ORDER BY + frame.
  2. Ties: ROW_NUMBER (all different) · RANK (same, gaps) · DENSE_RANK (same, no gaps → "same value = same tier").
  3. Running totals need explicit ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; always state the frame. LAG/LEAD = previous/next row.
  4. Collision: analytic OVER(...) vs time window(col,"5 min") (+ startTime = offset, slide = window = non-overlapping).
  5. Joins: ANTI = exclusion, SEMI = "has a match, no dup", broadcast = small dim, no shuffle. ROLLUP = hierarchy, CUBE = all combos; higher-order fns for arrays.

Before you move on — say these without scrolling up

  1. You need a per-employee tier but must keep every row — why can't GROUP BY, and what does?
  2. "Same salary → same tier, no skipped numbers" — which ranking function?
  3. What turns a plain SUM(...) OVER(...) into a running total — and the gotcha if you omit it?
  4. OVER(...) vs window(col, "5 minutes") — which is which, and where does "offset by 3 minutes" go?

Next: a specific cleansing transformation with a sharp gotcha — deduplication, and why the fix depends on how far apart the duplicates arrive → [Deduplication — distinct, keep-latest, and the streaming state trap](/lessons/s3-dedup/).
