
Data Transformation, Cleansing & Quality

Deduplication — distinct, keep-latest, and the streaming state trap

Deduplicate data using appropriate techniques for batch and streaming, including watermark-bounded streaming deduplication and idempotent merges.

Real ingestion produces duplicates: a source retries, a file gets reprocessed, an event fires twice. Removing them looks trivial — "remove duplicates" — until you notice the word duplicate hides two decisions, and that the one-line fix that works on a batch quietly destroys a streaming job. This lesson makes both precise.


The spine

Beat 1 — the anchor: two questions before any dedup

Anchor. Before removing duplicates, answer two things. (1) Duplicate by what? — identical whole rows, or "same key, keep one specific survivor"? (2) Is the data bounded (a finite batch you can see all at once) or unbounded (a stream, where "have I seen this before?" needs remembered state)? Every technique is just an answer to those two.

Miss question 2 and you write a dedup that needs infinite memory. That's the trap this lesson exists for.

Beat 2 — batch: identity vs keep-latest

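Identical rows (easy). The whole dataset is in view, so Spark can compare every row. DISTINCT (or df.distinct(), or dropDuplicates() with no arguments) removes exact repeats. dropDuplicates(["user_id"]) keeps one row per key, but which row survives is arbitrary.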
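The two flavours can be modelled in a few lines of plain Python. This is a sketch of the semantics only, not Spark's implementation. The sample rows and the helpers distinct and drop_duplicates_by_key are hypothetical. The last two lines are the point: a key-based dedup that keeps "whichever row it meets first" changes its answer when the input order changes, and Spark makes no promise about that order.

```python
# Plain-Python model of the two batch dedup flavours (not Spark itself).
rows = [
    ("u1", "2024-01-01T10:00", "click"),
    ("u1", "2024-01-01T10:00", "click"),   # exact repeat of the row above
    ("u1", "2024-01-01T12:00", "view"),    # same key, different content
    ("u2", "2024-01-01T09:00", "click"),
]

def distinct(rows):
    """Identity dedup, like DISTINCT / dropDuplicates() with no columns:
    a row is a duplicate only if every column matches."""
    seen, out = set(), []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

def drop_duplicates_by_key(rows, key_index=0):
    """Key dedup, like dropDuplicates(["user_id"]): one row per key.
    This model keeps the first row it meets, so the survivor depends on
    input order, which Spark does not guarantee."""
    seen, out = set(), []
    for row in rows:
        k = row[key_index]
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out

def survivor_ts(result, user_id):
    """Event time of the surviving row for one user."""
    return {r[0]: r[1] for r in result}[user_id]

print(len(distinct(rows)))                                   # 3
print(survivor_ts(drop_duplicates_by_key(rows), "u1"))       # 2024-01-01T10:00
print(survivor_ts(drop_duplicates_by_key(rows[::-1]), "u1")) # 2024-01-01T12:00
```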

Predict: that "arbitrary which one" — when is it not what you want?

Almost always. You usually want a specific survivor (the latest, the most complete), and dropDuplicates(keys) can't express that. This is where the window functions from the previous lesson pay off:

```sql
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_ts DESC) AS rn
  FROM events
) AS ranked
WHERE rn = 1;   -- one row per user_id: the most recent
```

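PARTITION BY the key, ORDER BY newest first, keep ROW_NUMBER() = 1. It has to be ROW_NUMBER. ROW_NUMBER numbers rows uniquely within each partition, so exactly one row survives. RANK and DENSE_RANK give tied rows the same number, so two rows sharing the latest event_ts would both keep 1 (see [Advanced transformations — window functions, joins, aggregations](/lessons/s3-transformations/)). If ties are possible, add a tiebreaker column to the ORDER BY. Without one, ROW_NUMBER picks among the tied rows arbitrarily.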
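To see the ROW_NUMBER versus RANK difference concretely, here is a sketch that runs the same query shape against an in-memory SQLite database from Python's standard library. SQLite stands in for Spark SQL here. Its window-function syntax matches for this query, though it needs SQLite 3.25 or later. The sample rows are made up, including a deliberate tie on event_ts.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id TEXT, event_ts TEXT, action TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("u1", "2024-01-01 10:00", "click"),
        ("u1", "2024-01-01 12:00", "view"),
        ("u1", "2024-01-01 12:00", "buy"),    # ties with the row above on event_ts
        ("u2", "2024-01-01 09:00", "click"),
    ],
)

def latest_per_user(rank_fn):
    """Keep the rows numbered 1 per user_id (newest first) using rank_fn.
    rank_fn is one of the fixed strings used below, never user input."""
    sql = f"""
        SELECT user_id, event_ts, action FROM (
          SELECT *, {rank_fn}() OVER (
            PARTITION BY user_id ORDER BY event_ts DESC) AS rn
          FROM events
        ) AS ranked
        WHERE rn = 1
        ORDER BY user_id
    """
    return conn.execute(sql).fetchall()

print(len(latest_per_user("ROW_NUMBER")))  # 2: exactly one survivor per user
print(len(latest_per_user("RANK")))        # 3: both tied u1 rows get rank 1
```

Which of the two tied u1 rows ROW_NUMBER keeps is not determined by this query. Adding a tiebreaker column to the ORDER BY is what makes the survivor deterministic.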

Beat 3 — streaming: the state trap, and why a watermark is mandatory

Now run the same dedup as a stream.

Predict: dropDuplicates(["id"]) on a stream must recognise a repeat of a row seen an hour ago. What must it remember — and what happens to that memory on an endless stream?

It must remember every key it has ever seen. That "seen" set is state ([Structured Streaming & the state model](/lessons/s1-structured-streaming-state/)), held in the state store. On an unbounded stream it grows without limit until the job runs out of memory or state storage.
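A toy model makes the growth visible. This plain-Python sketch is a stand-in for the state store, not Spark code. The seen set plays the role of dedup state, and its size only ever goes up, one entry per distinct key. The function naive_stream_dedup is hypothetical.

```python
# Plain-Python model of streaming dropDuplicates(["id"]) with no watermark:
# the operator must remember every id forever, because a repeat could
# arrive at any point later in the stream.
def naive_stream_dedup(stream):
    seen = set()                # stands in for the state store
    emitted = []
    state_sizes = []            # size of the state after each record
    for event_id in stream:
        if event_id not in seen:
            seen.add(event_id)
            emitted.append(event_id)
        state_sizes.append(len(seen))
    return emitted, state_sizes

# Ten thousand distinct ids plus one late repeat of the very first id.
stream = list(range(10_000)) + [0]
emitted, sizes = naive_stream_dedup(stream)
print(len(emitted))   # 10000: the late repeat of id 0 was caught...
print(sizes[-1])      # 10000: ...but only because every id is still held
```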

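The fix is the same tool that bounded streaming aggregations: a watermark. A watermark says "records arrive at most N behind the maximum event time seen so far, so state older than that can be forgotten." Call withWatermark on the event-time column. Then dedup with dropDuplicatesWithinWatermark(["id"]) (Spark 3.5+), or with dropDuplicates on a subset that includes the event-time column. Either way, dedup state is capped to a time window.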
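Here is a plain-Python model of what the watermark buys. It rests on simplifying assumptions of this sketch, not Spark's exact behaviour:

- The watermark is recomputed after every record; Spark advances it between micro-batches.
- Dedup is on the id alone, in the spirit of dropDuplicatesWithinWatermark.
- A record older than the watermark is dropped as too late.

The function watermarked_dedup and the sample events are hypothetical.

```python
from datetime import datetime, timedelta

def watermarked_dedup(stream, delay):
    """stream: iterable of (event_id, event_time) pairs.
    Returns (emitted records, peak number of keys held in state)."""
    state = {}                 # event_id -> event_time of the kept record
    max_event_time = None
    emitted, peak_state = [], 0
    for event_id, event_time in stream:
        # Simplification: advance the watermark per record, not per micro-batch.
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - delay
        # Evict keys the watermark has passed; no duplicate of them is expected now.
        for k in [k for k, t in state.items() if t < watermark]:
            del state[k]
        if event_time < watermark:
            continue           # too late: dropped instead of remembered
        if event_id not in state:
            state[event_id] = event_time
            emitted.append((event_id, event_time))
        peak_state = max(peak_state, len(state))
    return emitted, peak_state

T0 = datetime(2024, 1, 1)
h = timedelta(hours=1)
events = [
    ("a", T0),                                  # first sighting of a
    ("a", T0 + 0.5 * h),                        # repeat within 2h: dropped
    ("b", T0 + 3 * h),                          # watermark moves to T0+1h; a evicted
    ("a", T0),                                  # late retry, older than watermark: dropped
    ("a", T0 + 3 * h + timedelta(minutes=10)),  # a again, >2h after the first: kept
]
emitted, _ = watermarked_dedup(events, delay=2 * h)
print([e for e, _ in emitted])   # ['a', 'b', 'a']

# One new id every 10 minutes for ~1 week: state stays bounded by the window.
long_stream = [(f"e{i}", T0 + i * timedelta(minutes=10)) for i in range(1000)]
_, peak_long = watermarked_dedup(long_stream, delay=2 * h)
print(peak_long)                 # 13
```

The last repeat of "a" shows the cost of bounding state. Duplicates further apart in event time than the watermark delay are no longer recognised.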

Lock it. Batch keep-latest = ROW_NUMBER()…=1. Streaming dedup with no watermark = unbounded, ever-growing state. "Streaming + dedup" ⇒ watermark — an answer that omits it is wrong.


The dials (skim now; return when a question needs one)

◆ The streaming pattern the exam favours: dedup-in-batch + insert-only merge

The recommended streaming-dedup shape isn't just dropDuplicates. You deduplicate within each micro-batch, then run an insert-only MERGE into the target:

```sql
MERGE INTO orders t USING microbatch s
  ON t.order_id = s.order_id AND t.order_timestamp = s.order_timestamp
WHEN NOT MATCHED THEN INSERT *   -- existing keys skipped → idempotent
```

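Only WHEN NOT MATCHED appears, so a key already in the target is a no-op. Re-running the batch inserts nothing new, which makes the ingest idempotent (run it twice, get the same result). This matters inside foreachBatch, because after a failure Structured Streaming may replay the same micro-batch. The merge has to be idempotent so that the replay doesn't write duplicates.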
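The dedup-then-insert-only-merge shape can be tried locally. This sketch uses Python's built-in sqlite3 as a stand-in for Delta. SQLite has no MERGE statement, so the insert-only merge is written as the equivalent INSERT ... WHERE NOT EXISTS anti-join. process_batch is a hypothetical stand-in for the function you would pass to foreachBatch, and the order rows are made up.

```python
import sqlite3

# SQLite stands in for the Delta target here; it has no MERGE statement.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, order_timestamp TEXT, amount REAL)")
conn.execute("CREATE TEMP TABLE microbatch (order_id TEXT, order_timestamp TEXT, amount REAL)")

def process_batch(rows):
    """Hypothetical per-micro-batch handler: dedup in batch, then insert-only merge."""
    # Step 1: dedup within the batch on the merge key. Without this, an
    # insert-only MERGE would insert both copies of o1, since neither
    # matches the target yet.
    deduped = list({(r[0], r[1]): r for r in rows}.values())
    conn.execute("DELETE FROM microbatch")
    conn.executemany("INSERT INTO microbatch VALUES (?, ?, ?)", deduped)
    # Step 2: insert-only merge as an anti-join. For a source already
    # deduplicated on the key, this matches
    # MERGE ... WHEN NOT MATCHED THEN INSERT * with the same ON condition.
    conn.execute("""
        INSERT INTO orders
        SELECT s.order_id, s.order_timestamp, s.amount FROM microbatch s
        WHERE NOT EXISTS (
          SELECT 1 FROM orders t
          WHERE t.order_id = s.order_id
            AND t.order_timestamp = s.order_timestamp)
    """)
    conn.commit()

def row_count():
    return conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

batch = [
    ("o1", "2024-01-01 10:00", 20.0),
    ("o1", "2024-01-01 10:00", 20.0),   # in-batch duplicate
    ("o2", "2024-01-01 10:05", 35.0),
]
process_batch(batch)
print(row_count())    # 2
process_batch(batch)  # a restart replays the same micro-batch
print(row_count())    # 2: the replay inserted nothing
```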

◆ The batch trap: dropDuplicates dedups the batch, not the target

A favourite question: a nightly job runs dropDuplicates and then appends. Each write contains unique records, yet those records may duplicate rows already in the target from earlier writes, because dropDuplicates only sees the current batch. To dedup against history you need the insert-only merge, which checks the target. It often runs on a natural composite key (e.g. user_id, review_id, product_id, review_timestamp). The same limit applies in streaming. A watermark that keeps only ~2h of state means duplicates more than ~2h apart in event time may both be retained.
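The append-versus-merge difference across nights shows up in a small plain-Python model. Each row here is just the composite key (user_id, review_id, product_id, review_timestamp). The two nightly batches and the helpers drop_duplicates, append and insert_only_merge are hypothetical.

```python
# Two nightly batches; the second repeats a review already loaded the night before.
night1 = [
    ("u1", "r1", "p1", "2024-01-01"),
    ("u1", "r1", "p1", "2024-01-01"),   # in-batch duplicate
    ("u2", "r2", "p1", "2024-01-01"),
]
night2 = [
    ("u1", "r1", "p1", "2024-01-01"),   # duplicates a row already in the target
    ("u3", "r3", "p2", "2024-01-02"),
]

def drop_duplicates(batch):
    """In-batch dedup only: it never looks at the target."""
    return list(dict.fromkeys(batch))

def append(target, batch):
    target.extend(drop_duplicates(batch))

def insert_only_merge(target, batch):
    """Insert a row only if its composite key is not already in the target."""
    existing = set(target)
    for row in drop_duplicates(batch):
        if row not in existing:
            target.append(row)
            existing.add(row)

appended, merged = [], []
for batch in (night1, night2):
    append(appended, batch)
    insert_only_merge(merged, batch)

print(len(appended))  # 4: review r1 is in the target twice
print(len(merged))    # 3: one row per composite key
```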

◆ Write-level idempotency: txnAppId + txnVersion

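For retry safety across failures, Delta accepts txnAppId and txnVersion as writer options. Inside foreachBatch, txnVersion is commonly set to the micro-batch id. Delta records the pair in the transaction log and silently skips a write whose (appId, version) has already been applied, so a failed-then-retried batch can't double-write.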
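The bookkeeping can be modelled in plain Python. IdempotentTable is a hypothetical toy, not Delta's implementation. It assumes versions only move forward per app id, so it skips any write whose version is at or below the last one recorded for that app id.

```python
class IdempotentTable:
    """Toy model of idempotent writes: remember, per txnAppId, the highest
    txnVersion committed, and skip any write whose version is not newer."""

    def __init__(self):
        self.rows = []
        self.app_versions = {}   # txnAppId -> last committed txnVersion

    def write(self, rows, txn_app_id, txn_version):
        last = self.app_versions.get(txn_app_id)
        if last is not None and txn_version <= last:
            return False         # already applied: silently ignored
        self.rows.extend(rows)
        self.app_versions[txn_app_id] = txn_version
        return True

table = IdempotentTable()
table.write([("o1",)], txn_app_id="orders-stream", txn_version=0)
table.write([("o2",)], txn_app_id="orders-stream", txn_version=1)
# The job fails after committing version 1 and retries the same batch.
retried = table.write([("o2",)], txn_app_id="orders-stream", txn_version=1)
print(retried, len(table.rows))  # False 2
```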

◆ The MERGE limitation that forces dedup

MERGE fails when multiple source rows match the same target row and would update or delete it. The outcome would be ambiguous (which source row wins?), so Delta refuses. The fix is the keep-latest idiom: dedup the source first so each target key matches at most one source row. That's why dedup and merge travel together. (Aside: with schema evolution, a MERGE that adds a new column reads existing rows as NULL for it, with no rewrite, per [How Delta Lake works — the transaction log](/lessons/f2-delta-transaction-log/).)
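Here is a toy version of the rule and its fix in plain Python. merge_upsert and keep_latest are hypothetical helpers, and the target is modelled as a dict keyed by the merge key. Delta's real MERGE does not work this way internally, but the refusal condition and the dedup-first remedy have the same shape.

```python
def merge_upsert(target, source, key):
    """Toy upsert with MERGE's rule: a target row may be matched by at most
    one source row that would update it; otherwise the result is ambiguous."""
    matches = {}
    for row in source:
        if row[key] in target:
            matches.setdefault(row[key], []).append(row)
    for k, rows in matches.items():
        if len(rows) > 1:
            raise ValueError(f"{len(rows)} source rows match target key {k!r}")
    result = dict(target)
    for row in source:
        # Matched keys are updated, new keys inserted. (For repeated new keys
        # this toy keeps the last row; a real MERGE would insert every
        # unmatched source row, another reason to dedup first.)
        result[row[key]] = row
    return result

def keep_latest(source, key, ts):
    """Keep-latest per key: a plain-Python twin of ROW_NUMBER() ... = 1."""
    latest = {}
    for row in source:
        k = row[key]
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row
    return list(latest.values())

target = {"u1": {"user_id": "u1", "ts": 1, "email": "old@example.com"}}
source = [
    {"user_id": "u1", "ts": 2, "email": "a@example.com"},
    {"user_id": "u1", "ts": 3, "email": "b@example.com"},  # same key, later
]

try:
    merge_upsert(target, source, key="user_id")
except ValueError as err:
    print("merge refused:", err)  # merge refused: 2 source rows match target key 'u1'

merged = merge_upsert(target, keep_latest(source, "user_id", "ts"), key="user_id")
print(merged["u1"]["email"])      # b@example.com
```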

◆ Name the collision: dropDuplicates vs MERGE

dropDuplicates dedups on identity/columns, keeps an arbitrary row; MERGE dedups on a key, lets you control the survivor (or skip existing). "Remove exact repeat rows" → dropDuplicates/DISTINCT; "one row per business key, upsert-style, idempotent" → MERGE. Change feed with ordering + SCD? Don't hand-write it — AUTO CDC ([APPLY CHANGES — CDC and SCD, declaratively](/lessons/s1-apply-changes-cdc/)) does dedup + ordering + SCD for you.

Takeaways (rebuild it from these)

  1. Define the duplicate first: by identity (DISTINCT/dropDuplicates) or by key (keep a specific survivor).
  2. Keep-latest per key = ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC), then keep = 1. Use ROW_NUMBER because it's unique per partition.
  3. Streaming dedup needs a watermark — plain dropDuplicates on a stream = unbounded state. Prefer dropDuplicatesWithinWatermark. No watermark ⇒ wrong.
  4. Recommended streaming shape: dedup-in-batch + insert-only MERGE (idempotent; foreachBatch merge must be idempotent). dropDuplicates dedups the batch, not the target — use the merge for cross-history. txnAppId/txnVersion = retry-safe writes.
  5. MERGE errors on multiple source rows matching one target → dedup source first. Collision: dropDuplicates (identity, arbitrary) vs MERGE (key, chosen). Change feeds → AUTO CDC.

Before you move on — say these without scrolling up

  1. The two questions to answer before any dedup.
  2. Keep-latest per key — which window function exactly, and why not RANK?
  3. Streaming dropDuplicates with no watermark — what grows, and what's the fix?
  4. A nightly dropDuplicates-then-append still leaves dupes in the target — why, and what fixes it?

Next: once deduped, we enforce that the data is correct — expectations and constraints, and what each does to a bad row → [Data quality — expectations and constraints, and what happens to a bad row](/lessons/s3-data-quality/).
