Real ingestion produces duplicates: a source retries, a file gets reprocessed, an event fires twice. Removing them looks trivial — "remove duplicates" — until you notice the word duplicate hides two decisions, and that the one-line fix that works on a batch quietly destroys a streaming job. This lesson makes both precise.
The spine
Beat 1 — the anchor: two questions before any dedup
Anchor. Before removing duplicates, answer two things. (1) Duplicate by what? — identical whole rows, or "same key, keep one specific survivor"? (2) Is the data bounded (a finite batch you can see all at once) or unbounded (a stream, where "have I seen this before?" needs remembered state)? Every technique is just an answer to those two.
Miss question 2 and you write a dedup that needs infinite memory. That's the trap this lesson exists for.
Beat 2 — batch: identity vs keep-latest
Identical rows (easy). The whole dataset is in view, so Spark compares every row:
- `SELECT DISTINCT * FROM t` / `df.dropDuplicates()`: removes fully identical rows.
- `df.dropDuplicates(["user_id", "event_date"])`: duplicate by a subset of columns; one row per key, but arbitrary which one.
Predict: that "arbitrary which one" — when is it not what you want?
…
Almost always. You usually want a specific survivor — the latest, the most complete — and dropDuplicates(keys) can't express that. That's where last lesson pays off:
```sql
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_ts DESC) AS rn
  FROM events
) WHERE rn = 1; -- one row per user_id: the most recent
```
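PARTITION BY the key, ORDER BY newest-first, keep ROW_NUMBER() = 1. Use ROW_NUMBER specifically: it assigns a unique number within each partition, so exactly one row survives per key, whereas RANK/DENSE_RANK give tied rows the same rank and would keep all of them (see [Advanced transformations — window functions, joins, aggregations](/lessons/s3-transformations/)). If timestamps can tie, add a tie-breaker column to the ORDER BY so the survivor is deterministic.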
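The difference between ROW_NUMBER and RANK on tied timestamps can be seen in a small plain-Python model. This is only a sketch of the window semantics, not Spark code; the sample rows and the function names `keep_latest_row_number` and `keep_latest_rank` are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows: (user_id, event_ts, payload).
# u1 has two rows tied on its latest timestamp.
events = [
    ("u1", 10, "a"),
    ("u1", 20, "b"),
    ("u1", 20, "c"),
    ("u2", 5, "d"),
]


def keep_latest_row_number(rows):
    """Model of ROW_NUMBER() ... = 1: exactly one survivor per key."""
    survivors = []
    for _, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        newest_first = sorted(group, key=itemgetter(1), reverse=True)
        survivors.append(newest_first[0])  # the row numbered 1
    return survivors


def keep_latest_rank(rows):
    """Model of RANK() ... = 1: every row tied for the top timestamp survives."""
    survivors = []
    for _, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        group = list(group)
        top_ts = max(row[1] for row in group)
        survivors.extend(row for row in group if row[1] == top_ts)
    return survivors


by_row_number = keep_latest_row_number(events)  # one row per user
by_rank = keep_latest_rank(events)              # u1 keeps both tied rows
```

In this model the tied row that wins is fixed by Python's stable sort; in Spark, which tied row gets number 1 is not guaranteed unless the ORDER BY breaks the tie.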
Beat 3 — streaming: the state trap, and why a watermark is mandatory
Now run the same dedup as a stream.
Predict: `dropDuplicates(["id"])` on a stream must recognise a repeat of a row seen an hour ago. What must it remember, and what happens to that memory on an endless stream?
…
It must remember every key it has ever seen — that "seen" set is state ([Structured Streaming & the state model](/lessons/s1-structured-streaming-state/)), held in the state store, and on an unbounded stream it grows forever until the job's memory dies.
The fix is the same tool that bounded streaming aggregations, a watermark: "no record is expected to arrive more than N behind the maximum event time seen so far, so state older than that can be forgotten." That caps dedup state to a time window:
- `df.withWatermark("event_ts", "1 hour").dropDuplicates(["id"])`: keeps the "seen" state about 1 hour back. (The docs warn that state can still grow: Spark can evict dedup state by watermark only when the event-time column is part of the dedup key, and then duplicates carrying different event times no longer match.)
- `dropDuplicatesWithinWatermark(["id"])` (with a watermark set): purpose-built. It dedups within the window even when duplicates' event times differ, and bounds state cleanly. This is the currently recommended streaming-dedup function (DBR 13.3 LTS+).
Lock it. Batch keep-latest = `ROW_NUMBER()…=1`. Streaming dedup with no watermark = unbounded, ever-growing state. "Streaming + dedup" ⇒ watermark; an answer that omits it is wrong.
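To make the state argument concrete, here is a toy plain-Python model of a stateful deduper. It is a sketch of the idea only, not Spark's implementation: the class `WatermarkDeduper`, its `delay` parameter, and the per-record watermark update are assumptions made for illustration (Spark advances the watermark between micro-batches, not per record).

```python
class WatermarkDeduper:
    """Toy dedup state: remembers keys and forgets those older than the watermark."""

    def __init__(self, delay=None):
        self.delay = delay        # allowed lateness, same units as event_ts; None = no watermark
        self.seen = {}            # key -> event_ts of the first record seen (the "state")
        self.max_event_ts = None  # highest event time observed so far

    def process(self, key, event_ts):
        """Return True if the record is emitted, False if dropped (duplicate or too late)."""
        if self.max_event_ts is None or event_ts > self.max_event_ts:
            self.max_event_ts = event_ts
        if self.delay is not None:
            watermark = self.max_event_ts - self.delay
            if event_ts < watermark:
                return False  # older than the watermark: treated as too late
            # Forget keys that can no longer receive a valid duplicate.
            self.seen = {k: ts for k, ts in self.seen.items() if ts >= watermark}
        if key in self.seen:
            return False  # duplicate within the remembered window
        self.seen[key] = event_ts
        return True


unbounded = WatermarkDeduper(delay=None)
bounded = WatermarkDeduper(delay=60)
for ts in range(1000):  # 1000 distinct keys, one per time unit
    unbounded.process(f"k{ts}", ts)
    bounded.process(f"k{ts}", ts)

print(len(unbounded.seen))  # one entry per key ever seen
print(len(bounded.seen))    # only keys inside the watermark window
```

Without a delay the `seen` dictionary holds every key ever processed; with one, its size is capped by how many keys fit inside the window, which is the trade the watermark makes: bounded memory in exchange for ignoring records that arrive too late.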
The dials (skim now; return when a question needs one)
◆ The streaming pattern the exam favours: dedup-in-batch + insert-only merge
The recommended streaming-dedup shape isn't just dropDuplicates — it's de-duplicate within each micro-batch, then insert-only MERGE into the target:
```sql
MERGE INTO orders t USING microbatch s
ON t.order_id = s.order_id AND t.order_timestamp = s.order_timestamp
WHEN NOT MATCHED THEN INSERT * -- existing keys skipped → idempotent
```
Only WHEN NOT MATCHED, so an already-present key is a no-op and re-running inserts nothing new: the ingest is idempotent (run twice, same result). Inside foreachBatch the merge must be idempotent, because a restart can replay the last micro-batch.
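The two halves of that shape can be modelled in plain Python. This is a sketch under stated assumptions, not Delta code: `dedup_within_batch` and `insert_only_merge` are hypothetical helpers, the target is a list of dicts, and the merge matches source rows against the target as it stood when the merge started, which is why duplicates inside the micro-batch have to be removed first.

```python
KEY = ("order_id", "order_timestamp")  # same key as the ON clause above


def key_of(row):
    return tuple(row[col] for col in KEY)


def dedup_within_batch(rows):
    """Keep the first row per key inside one micro-batch."""
    seen, out = set(), []
    for row in rows:
        if key_of(row) not in seen:
            seen.add(key_of(row))
            out.append(row)
    return out


def insert_only_merge(target, source):
    """Model of WHEN NOT MATCHED THEN INSERT *: rows whose key already exists are skipped."""
    existing = {key_of(row) for row in target}  # target as of merge start
    new_rows = [row for row in source if key_of(row) not in existing]
    target.extend(new_rows)
    return len(new_rows)


microbatch = [
    {"order_id": 1, "order_timestamp": 100},
    {"order_id": 1, "order_timestamp": 100},  # duplicate inside the batch
    {"order_id": 2, "order_timestamp": 105},
]

orders = []
first_run = insert_only_merge(orders, dedup_within_batch(microbatch))
replay = insert_only_merge(orders, dedup_within_batch(microbatch))  # simulated restart

# Skipping the in-batch dedup lets the internal duplicate through.
no_dedup_inserted = insert_only_merge([], microbatch)
```

The replay inserts nothing, which is the idempotency property; the last line shows that the merge alone does not remove duplicates that arrive together in the same batch.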
◆ The batch trap: dropDuplicates dedups the batch, not the target
A favourite question: a nightly job does dropDuplicates then appends. Each write contains unique records — but records may duplicate rows already in the target from earlier writes. dropDuplicates only sees the current batch. To dedup against history you need the insert-only merge (which checks the target), often on a natural composite key (e.g. user_id, review_id, product_id, review_timestamp). Likewise a watermark that keeps only ~2h of state means duplicates arriving more than ~2h apart may be retained.
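A small plain-Python model of two nightly runs shows the gap. It is illustrative only: the rows are hypothetical `(user_id, review_id)` tuples and `drop_duplicates` stands in for the DataFrame call.

```python
def drop_duplicates(rows):
    """Remove repeats inside a single batch, keeping first occurrences."""
    seen, out = set(), []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out


night1 = [("u1", "r1"), ("u1", "r1"), ("u2", "r2")]
night2 = [("u2", "r2"), ("u3", "r3")]  # ("u2", "r2") was already written on night 1

# Pattern 1: dedup the batch, then append blindly.
target_append = []
for batch in (night1, night2):
    target_append.extend(drop_duplicates(batch))

# Pattern 2: dedup the batch, then insert only keys the target does not hold yet.
target_merge = []
for batch in (night1, night2):
    existing = set(target_merge)
    target_merge.extend(row for row in drop_duplicates(batch) if row not in existing)

print(target_append.count(("u2", "r2")))  # appears once per night it was delivered
print(target_merge.count(("u2", "r2")))   # appears once overall
```

Each appended batch is internally unique, yet the appended target still holds the repeated review because nothing compared night 2 against what night 1 wrote.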
◆ Write-level idempotency: txnAppId + txnVersion
For retry-safety across failures, Delta accepts txnAppId + txnVersion on the write — it records them in the log and silently ignores a re-applied (appId, version), so a failed-then-retried batch can't double-write.
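The mechanism can be pictured with a toy table that remembers the last version written per application id. This is a plain-Python sketch, not Delta's implementation: the class `IdempotentTable` is hypothetical, and skipping any version at or below the last recorded one is an assumption of this model.

```python
class IdempotentTable:
    """Toy table that ignores a write whose (app id, version) was already applied."""

    def __init__(self):
        self.rows = []
        self.last_version = {}  # txn_app_id -> highest txn_version committed

    def write(self, rows, txn_app_id, txn_version):
        """Append rows unless this version was already committed; return True if written."""
        last = self.last_version.get(txn_app_id)
        if last is not None and txn_version <= last:
            return False  # retry of an already-committed write: silently skipped
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True


table = IdempotentTable()
first = table.write(["a", "b"], txn_app_id="nightly_load", txn_version=7)
retry = table.write(["a", "b"], txn_app_id="nightly_load", txn_version=7)  # retried after a failure
later = table.write(["c"], txn_app_id="nightly_load", txn_version=8)
```

The retry returns without writing, so the rows land once. In a `foreachBatch` writer the micro-batch id is a natural candidate for the version, since a replayed batch arrives with the same id.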
◆ The MERGE limitation that forces dedup
MERGE errors when multiple source rows match the same target row — the outcome would be ambiguous (which source row wins?), so Delta refuses. The fix is the keep-latest idiom: dedup the source first so each target key matches ≤1 source row. That's why dedup and merge travel together. (Aside: with schema evolution, a MERGE adding a new column reads existing rows as NULL for it — no rewrite, per [How Delta Lake works — the transaction log](/lessons/f2-delta-transaction-log/).)
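The refusal and its fix can be sketched in plain Python. This models the rule only and is not Delta's implementation: `merge_upsert` and `keep_latest` are hypothetical helpers, the target is a dict keyed by the merge key, and the `ValueError` stands in for the error MERGE raises.

```python
def keep_latest(rows, key, ts):
    """Keep-latest idiom: one row per key, the one with the highest timestamp."""
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row
    return list(latest.values())


def merge_upsert(target, source, key):
    """Upsert source into target; refuse if several source rows match one target row."""
    match_counts = {}
    for row in source:
        if row[key] in target:
            match_counts[row[key]] = match_counts.get(row[key], 0) + 1
    ambiguous = [k for k, n in match_counts.items() if n > 1]
    if ambiguous:
        # Nothing has been written yet, so the target is left untouched.
        raise ValueError(f"multiple source rows match target keys: {ambiguous}")
    for row in source:
        target[row[key]] = row


target = {"o1": {"order_id": "o1", "status": "new", "ts": 1}}
source = [
    {"order_id": "o1", "status": "paid", "ts": 2},
    {"order_id": "o1", "status": "shipped", "ts": 3},
]

try:
    merge_upsert(target, source, "order_id")
    raised = False
except ValueError:
    raised = True  # two source rows competed for the same target row

# Dedup the source first, then the merge is unambiguous.
merge_upsert(target, keep_latest(source, "order_id", "ts"), "order_id")
```

After the source is reduced to its latest row per key, each target key matches at most one source row and the upsert goes through with the survivor the caller chose.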
◆ Name the collision: dropDuplicates vs MERGE
dropDuplicates dedups on identity/columns, keeps an arbitrary row; MERGE dedups on a key, lets you control the survivor (or skip existing). "Remove exact repeat rows" → dropDuplicates/DISTINCT; "one row per business key, upsert-style, idempotent" → MERGE. Change feed with ordering + SCD? Don't hand-write it — AUTO CDC ([APPLY CHANGES — CDC and SCD, declaratively](/lessons/s1-apply-changes-cdc/)) does dedup + ordering + SCD for you.
Takeaways (rebuild it from these)
- Define the duplicate first: by identity (`DISTINCT`/`dropDuplicates`) or by key (keep a specific survivor).
- Keep-latest per key = `ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC)` then `= 1`; `ROW_NUMBER` because it's unique per partition.
- Streaming dedup needs a watermark: plain `dropDuplicates` on a stream = unbounded state. Prefer `dropDuplicatesWithinWatermark`. No watermark ⇒ wrong.
- Recommended streaming shape: dedup-in-batch + insert-only `MERGE` (idempotent; `foreachBatch` merge must be idempotent). `dropDuplicates` dedups the batch, not the target; use the merge for cross-history. `txnAppId`/`txnVersion` = retry-safe writes.
- MERGE errors on multiple source rows matching one target → dedup source first. Collision: `dropDuplicates` (identity, arbitrary) vs `MERGE` (key, chosen). Change feeds → `AUTO CDC`.
Before you move on — say these without scrolling up
- The two questions to answer before any dedup.
- Keep-latest per key: which window function exactly, and why not `RANK`?
- Streaming `dropDuplicates` with no watermark: what grows, and what's the fix?
- A nightly `dropDuplicates`-then-append still leaves dupes in the target: why, and what fixes it?
Next: once deduped, we enforce that the data is correct — expectations and constraints, and what each does to a bad row → [Data quality — expectations and constraints, and what happens to a bad row](/lessons/s3-data-quality/).