[The one job — and the two axes everything lives on](/lessons/f1-the-one-job/) ended on a cliffhanger: take Way 1 — new records arriving — into its hardest home, streaming. Here we are.
Streaming is Way 1 with the brakes cut: the source never stops. There is no "end of the data" where the engine can finally give up and rebuild the target from scratch. So it has no choice — it must keep the target correct using only the new rows, forever. That single constraint generates this whole lesson. Hold the lesson as one question and everything below is an answer to it:
How do you keep a result correct from new rows alone, without ever looking back at the history?
One orientation before we build. Structured Streaming is the engine — the machine inside Spark that actually does this. You either drive it directly (spark.readStream … .writeStream in a notebook — procedural) or let Lakeflow SDP drive it for you (declarative). Same engine, two steering wheels; SDP runs on top of Structured Streaming, never beside it. So "Structured Streaming vs SDP" is never a versus — it's "which wheel fits the job." [Lakeflow Spark Declarative Pipelines](/lessons/s1-lakeflow-sdp/) is the wheel; this lesson is the engine.
The spine (read this part slowly)
Beat 1 — the one picture: a stream is a table that never stops growing
Structured Streaming does not hand you a new API to memorise. It treats a live stream as a table with rows forever being appended to the bottom, and lets you write the same query you'd write against an ordinary table. Look:
```python
spark.read.table("orders")        # static: answers once
spark.readStream.table("orders")  # streaming: the SAME transformation, answered forever
```
Same logic — the streaming one just never stops answering as new orders land. Name the unit while it's in front of you: that whole running thing — readStream → transform → writeStream, kept alive as data arrives — is a streaming query. One never-ending job.
Lock it. A stream = an unbounded input table; a streaming query keeps its result correct as rows append. That is the entire conceptual leap — everything else is how.
Beat 2 — the problem the unbounded table creates: memory
Now the first real fork, and I want you to answer it before I do.
A new order lands. Your target is revenue per restaurant. You do not re-add ten million old orders — you take this one order and add its amount to that restaurant's running total (f1's anchor: only the work the change demands). So:
Predict: where is that running total kept, between one order arriving and the next?
…
Not in the new row — a row only knows itself. It's in a memory the engine carries forward from batch to batch. That memory is state. You already met the word in [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/); streaming is where it becomes a physical thing with a size and a home.
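Here is a minimal sketch of that carried-forward memory. It is a toy model in plain Python, not Spark's implementation, and the names `recompute` and `apply_batch` and the sample rows are hypothetical. Each micro-batch touches only its own new rows plus the carried-forward dict. The check inside the loop confirms the answer still matches a full rescan of every order so far, which is the promise from Beat 1.

```python
def recompute(all_rows):
    """Batch answer: rescan every order seen so far (what a stream cannot afford)."""
    totals = {}
    for restaurant, amount in all_rows:
        totals[restaurant] = totals.get(restaurant, 0) + amount
    return totals


def apply_batch(state, new_rows):
    """Incremental answer: touch only the new rows, updating the carried-forward state."""
    for restaurant, amount in new_rows:
        state[restaurant] = state.get(restaurant, 0) + amount
    return state


micro_batches = [[("r1", 10), ("r2", 5)], [("r1", 7)], [("r3", 2), ("r2", 1)]]
state = {}     # the memory carried between micro-batches
history = []   # kept here only to check the answer; a real stream would not keep it
for batch in micro_batches:
    apply_batch(state, batch)
    history.extend(batch)
    assert state == recompute(history)   # same answer, without rereading history

print(state)  # {'r1': 17, 'r2': 6, 'r3': 2}
```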
And it cuts every transform into two kinds. Here are two — classify them yourself before reading on:
- Convert each order's timestamp to UTC.
- Count orders per restaurant so far.
The first needs nothing but the row in front of it → stateless. The second needs the count carried from every earlier order → stateful, and that carried-forward count is the state. That's the whole test, and it's worth memorising because a dozen exam questions collapse into it:
The test: to produce the answer for this one record, do I need anything besides the record itself? No → stateless. Yes, I need memory of earlier rows → stateful.
Run it down a column and it never fails you:
| Transform | Needs memory of past rows? | Type |
|---|---|---|
| Convert each order's time to UTC | No | stateless |
| Drop orders with a null price | No | stateless |
| Running count of orders today | Yes — the count so far | stateful |
| Revenue total per restaurant | Yes — each restaurant's total so far | stateful |
"Seen this order_id before?" dedup | Yes — the set of seen ids | stateful |
| Join the orders stream to the payments stream | Yes — unmatched rows waiting for a partner | stateful |
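The test in the table can be made concrete in plain Python. This is a toy sketch, not Spark code. The function and class names and the order fields `ts`, `price` and `order_id` are hypothetical. The two stateless transforms are plain per-row functions. Dedup has to carry a set of every `order_id` it has already passed, and that set is its state.

```python
from datetime import datetime, timedelta, timezone


def to_utc(order):
    """Stateless: the answer depends only on this row."""
    return {**order, "ts": order["ts"].astimezone(timezone.utc)}


def has_price(order):
    """Stateless: keep or drop this row by looking at this row alone."""
    return order["price"] is not None


class Dedup:
    """Stateful: must remember every order_id it has let through (its notepad)."""

    def __init__(self):
        self.seen = set()

    def keep(self, order):
        if order["order_id"] in self.seen:
            return False
        self.seen.add(order["order_id"])
        return True


plus_two = timezone(timedelta(hours=2))
orders = [
    {"order_id": 1, "ts": datetime(2024, 1, 1, 12, 0, tzinfo=plus_two), "price": 9.5},
    {"order_id": 2, "ts": datetime(2024, 1, 1, 12, 5, tzinfo=plus_two), "price": None},
    {"order_id": 1, "ts": datetime(2024, 1, 1, 12, 0, tzinfo=plus_two), "price": 9.5},
]
dedup = Dedup()
clean = [to_utc(o) for o in orders if has_price(o) and dedup.keep(o)]
print([o["order_id"] for o in clean], clean[0]["ts"].hour)  # [1] 10
```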
Three kinds of transform make up the stateful set worth banking together: aggregations, deduplication, stream-stream joins. They're precisely the ones whose notepad can explode. Recall from [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/): the notepad grows with distinct groups (keys), not rows. Revenue per 500 restaurants → ~500 running totals. The same rows counted per customer_id across 50 million customers → a 50-million-entry notepad that kills the job. High-cardinality grouping = the state-explosion trap.
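A scaled-down toy in plain Python shows the notepad tracking keys rather than rows. The 200,000 synthetic rows and 50,000 customers are hypothetical stand-ins for the 50 million above, and `notepad_size` is an illustrative helper, not a Spark API. The same rows grouped two ways give two very different state sizes.

```python
from collections import Counter


def notepad_size(rows, key):
    """Size of a running-sum state keyed by `key`: one entry per distinct key."""
    state = Counter()
    for row in rows:
        state[row[key]] += row["amount"]
    return len(state)


# Synthetic orders: 500 restaurants, 50,000 customers, 200,000 rows.
rows = [{"restaurant": i % 500, "customer": i % 50_000, "amount": 1} for i in range(200_000)]

print(notepad_size(rows, "restaurant"))  # 500 entries
print(notepad_size(rows, "customer"))    # 50000 entries from the very same rows
```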
Lock it. Stateful = "needs memory of earlier rows." The trio: aggregations, dedup, stream-stream joins. State size tracks keys, not rows.
Beat 3 — where state lives, and why a restart doesn't lose it
Quick, because you already own the picture. In [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/) I gave you the desk and the filing cabinet: the engine works at a desk (cluster memory) but continuously copies its notepad into a filing cabinet it never loses. Here are the real names behind that picture:
- the desk's memory is the state store: on Databricks that's typically RocksDB (the state store provider Databricks recommends for stateful streaming), a key-value store kept on the cluster's local disk/memory, deliberately off the JVM heap so a giant notepad can't choke Spark's garbage collection;
- the filing cabinet is the checkpoint in cloud storage, which the engine writes state into as it goes.
So on restart the engine reloads the checkpoint and picks the totals back up exactly where it left off.
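To see why a restart loses nothing, here is a toy in plain Python. It does not use Spark's checkpoint format: the `RevenueQuery` class and the single JSON file are hypothetical simplifications. After each batch, the toy writes two things to the file: how far it has read and its running totals. A fresh instance built after a simulated crash reloads both and carries on.

```python
import json
import os
import tempfile


class RevenueQuery:
    """Toy streaming query: running revenue per restaurant, checkpointed to one JSON file."""

    def __init__(self, checkpoint_path):
        self.path = checkpoint_path
        if os.path.exists(self.path):          # restart: reload progress and state
            with open(self.path) as f:
                saved = json.load(f)
            self.offset, self.totals = saved["offset"], saved["totals"]
        else:                                  # first start: nothing read, empty notepad
            self.offset, self.totals = 0, {}

    def run_batch(self, source):
        for restaurant, amount in source[self.offset:]:   # only rows past the bookmark
            self.totals[restaurant] = self.totals.get(restaurant, 0) + amount
        self.offset = len(source)
        with open(self.path, "w") as f:        # commit progress + state after the batch
            json.dump({"offset": self.offset, "totals": self.totals}, f)


checkpoint = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
source = [("r1", 10), ("r2", 5)]

first = RevenueQuery(checkpoint)
first.run_batch(source)
del first                                      # simulated crash: in-memory state is gone

source.append(("r1", 7))                       # new data lands while the job is down
restarted = RevenueQuery(checkpoint)           # reloads offset=2, totals={'r1': 10, 'r2': 5}
restarted.run_batch(source)
print(restarted.totals)                        # {'r1': 17, 'r2': 5}
```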
Lock it. notepad = state store = RocksDB on the cluster, copied to the checkpoint so it survives a crash/restart.
Beat 4 — the leash: what stops the notepad growing forever
Now the question you should already be uneasy about: the stream never ends, and a stateful query keeps remembering — so doesn't the notepad grow without limit until the job dies?
It would. The watermark is the leash.
First, why the engine can't just drop old state the instant a window's clock passes — late data. An event stamped 12:00 might not arrive until 12:09 (network lag, a retry, a slow phone). Close the 12:00 window the moment the clock hits 12:05 and that straggler is lost forever. So the engine must hold windows open a while. The watermark is the exact rule for how long:
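```python
from pyspark.sql.functions import avg, window

windowed = (df.withWatermark("event_time", "10 minutes")  # don't wait for data more than 10 min behind the latest seen
            .groupBy(window("event_time", "5 minutes"), "device_id")  # 5-minute windows per device
            .agg(avg("temperature")))
```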
The engine tracks the latest event_time it has seen and sets the watermark to that time minus 10 minutes. Once a window's end falls at or before the watermark, the engine declares the window closed, emits its result, and erases those lines from the notepad. So the watermark is two things at once: (a) how much lateness you tolerate, and (b) when the engine is allowed to throw away old state. No watermark → unbounded state; watermark → bounded.
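The rule can be replayed in a toy model. It is plain Python, not Spark's code; times are plain minutes and `run` is a hypothetical name. It mimics two behaviours described above: the watermark is recomputed from the latest event time at the end of each micro-batch, and any window whose end is at or before the watermark is emitted and erased. As a simplifying assumption of the toy, an event older than the current watermark is simply dropped.

```python
WINDOW = 5      # window length in minutes: how time is sliced
LATENESS = 10   # watermark delay in minutes: how long to wait for stragglers


def run(batches):
    """Toy windowed average. `batches` is a list of micro-batches of (minute, device, temp)."""
    state = {}                    # (window_start, device) -> [sum, count]: the notepad
    watermark = float("-inf")
    max_seen = float("-inf")      # latest event time seen so far
    emitted, dropped = [], []
    for batch in batches:
        for minute, device, temp in batch:
            if minute < watermark:              # too late: treated as lost
                dropped.append((minute, device))
                continue
            key = (minute - minute % WINDOW, device)
            acc = state.setdefault(key, [0.0, 0])
            acc[0] += temp
            acc[1] += 1
            max_seen = max(max_seen, minute)
        watermark = max(watermark, max_seen - LATENESS)   # advance once per batch
        for key in sorted(k for k in state if k[0] + WINDOW <= watermark):
            total, count = state.pop(key)      # window closed: emit, then erase
            emitted.append((key, total / count))
    return emitted, dropped, state


emitted, dropped, state = run([
    [(0, "d1", 20.0), (3, "d1", 22.0), (12, "d1", 30.0)],  # watermark -> 2
    [(4, "d1", 24.0), (16, "d1", 31.0)],                    # 4 is late but accepted; watermark -> 6
    [(2, "d1", 99.0)],                                      # 2 < 6: dropped
])
print(emitted)        # [((0, 'd1'), 22.0)]
print(dropped)        # [(2, 'd1')]
print(sorted(state))  # [(10, 'd1'), (15, 'd1')]
```

Note how the 5 and the 10 play different roles: WINDOW only decides which bucket an event lands in, while LATENESS alone decides when a bucket may be erased.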
One separation the exam baits directly: the window (5 min) is how you slice time; the watermark (10 min) is how long you wait before closing a slice. "Maintain state for 10 minutes for late data" is watermark language, not window length. (The method really is withWatermark(eventTime, delayThreshold), e.g. withWatermark("event_time", "10 minutes"); awaitArrival / slidingWindow are invented distractor names.)
Lock it. Watermark = how long you wait for late data = when old state may be dropped. It is the one thing keeping stateful state bounded.
That's the spine. Unbounded table → some transforms need state → state lives in RocksDB and is protected by the checkpoint and bounded by the watermark. If those four beats hold, the rest is dials you set — and they click fast now.
The dials (skim on first read; return when a question needs one)
Everything here is a knob you turn once you understand state. None of it is a new idea — each is a consequence of the spine.
◆ Dial — output mode: what gets written each pass
A streaming query must decide what to emit on each trigger, and statefulness dictates the choice.
| Mode | Writes | Use it for |
|---|---|---|
| Append (default) | only brand-new rows; existing rows never touched | stateless queries, or windowed aggregates after the watermark closes the window |
| Update | only rows that changed since the last trigger | running aggregations that keep changing (a live COUNT/SUM per group) |
| Complete | the entire result table, rewritten every trigger | small aggregations only: every row is rewritten each trigger and old state is never dropped, so it gets expensive |
Predict: a live COUNT(*) per restaurant keeps revising existing rows as orders arrive. Can Append write that? No — Append can only add new rows, never revise one. So a moving aggregation needs Update (or Complete). Append is legal for an aggregation only after the watermark has finalised each window so it can never change again.
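A small toy in plain Python replays three triggers of a per-restaurant running count and records what Complete and Update would write each time. The function names are hypothetical, not Spark API. Restaurant `r1` is written with count 1 and later rewritten with count 2. That revision is exactly what Append has no way to express.

```python
def running_counts(triggers):
    """Yield the per-restaurant running count after each trigger."""
    counts = {}
    for batch in triggers:
        for restaurant in batch:
            counts[restaurant] = counts.get(restaurant, 0) + 1
        yield dict(counts)                      # snapshot of the result table


def complete_output(curr):
    """Complete mode: the entire result table, every trigger."""
    return sorted(curr.items())


def update_output(prev, curr):
    """Update mode: only rows whose value changed since the last trigger."""
    return sorted((k, v) for k, v in curr.items() if prev.get(k) != v)


triggers = [["r1", "r2"], ["r1"], ["r3"]]
prev, complete_log, update_log = {}, [], []
for curr in running_counts(triggers):
    complete_log.append(complete_output(curr))
    update_log.append(update_output(prev, curr))
    prev = curr

print(update_log)        # [[('r1', 1), ('r2', 1)], [('r1', 2)], [('r3', 1)]]
print(complete_log[-1])  # [('r1', 2), ('r2', 1), ('r3', 1)]

# Keys written more than once: rows an Append sink would have to revise.
written = [k for rows in update_log for k, _ in rows]
print(sorted({k for k in written if written.count(k) > 1}))  # ['r1']
```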
◆ Dial — trigger: how often a batch runs (a separate axis from output mode)
| Trigger | Behaviour |
|---|---|
| default (none) | micro-batches as fast as possible, continuously |
| processingTime="30 seconds" | a micro-batch on a fixed clock interval |
| availableNow=True | process all currently-available new data since the last checkpoint, then stop |
| once=True | one batch then stop — deprecated, superseded by availableNow |
| continuous | experimental sub-second mode — not production-ready |
availableNow is streaming machinery doing an incremental batch — wakes, consumes what's new since the checkpoint, writes, stops. That's why "incremental batch" and "streaming" blur on Databricks (same engine; the trigger is the dial). "Process all pending data then stop, cheaply, on a schedule" → availableNow on a scheduled job. Recovery config to bank: a streaming job = new job cluster + unlimited retries + max concurrent runs = 1 (isolated, auto-restart, no two copies fighting one checkpoint).
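Here is a toy of how `availableNow` differs from the deprecated `once`. It is plain Python. The list-as-source, the dict-as-checkpoint and `max_per_batch` are hypothetical stand-ins for a real source, its checkpoint, and a rate limit such as maxFilesPerTrigger. Both triggers drain what was available when the run started and then stop. The difference sketched here is that `availableNow` honours the per-batch limit and commits progress after each batch, while `once` takes everything in a single batch.

```python
def available_now(source, checkpoint, max_per_batch):
    """Toy availableNow: drain everything present at start in rate-limited batches, then stop."""
    end = len(source)                       # data arriving after this point waits for the next run
    batches = []
    while checkpoint["offset"] < end:
        start = checkpoint["offset"]
        stop = min(start + max_per_batch, end)
        batches.append(source[start:stop])
        checkpoint["offset"] = stop         # commit progress after every batch
    return batches


def once(source, checkpoint):
    """Toy once: everything new in a single batch, ignoring any per-batch limit."""
    start, end = checkpoint["offset"], len(source)
    checkpoint["offset"] = end
    return [source[start:end]] if start < end else []


source = list(range(7))
ckpt = {"offset": 2}                                 # an earlier run already consumed rows 0 and 1
print(available_now(source, ckpt, max_per_batch=2))  # [[2, 3], [4, 5], [6]]
print(available_now(source, ckpt, max_per_batch=2))  # []: nothing new, the run just stops
print(once(source, {"offset": 2}))                   # [[2, 3, 4, 5, 6]]
```

Because progress lives in the checkpoint, scheduling the same run again simply picks up where the last one stopped, which is why the trigger, not a separate engine, is what separates incremental batch from streaming.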
◆ The name-trap: two things both called "state", two called "checkpoint"
The single most-tested confusion in streaming — pin it. Every streaming query — even stateless — keeps a checkpoint: a bookmark of how far it read the source (.option("checkpointLocation", …)). A stateful query's checkpoint also holds the notepad. So:
| Word | Meaning A | Meaning B |
|---|---|---|
| state | progress — how far the stream has read (every stream, even stateless) | operation state — the notepad for aggregations/joins/dedup (stateful only) |
| checkpoint | streaming checkpoint — checkpointLocation, holding progress + operation state for crash recovery | Delta-log checkpoint — a periodic Parquet snapshot of a table's transaction log (nothing to do with streaming; see [How Delta Lake works — the transaction log](/lessons/f2-delta-transaction-log/)) |
Two consequences the exam tests:
- "Streaming ingestion uses a checkpoint" does not make it stateful — that's just the progress bookmark. Stateless-with-a-checkpoint is the normal case.
- Each stream needs its own checkpoint directory. Point two streams at one directory and each clobbers the other's place in the book, so neither can recover. (Concurrent writes to the same table are fine; sharing a checkpoint never is.)
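A toy in plain Python shows why sharing a checkpoint breaks recovery. `read_new` and the sample lists are hypothetical. Two streams write their progress bookmark into one dict and overwrite each other. As a result, one stream skips its rows and the other re-reads old ones.

```python
def read_new(source, checkpoint):
    """Toy stream: return rows past the bookmarked offset, then move the bookmark."""
    start = checkpoint.get("offset", 0)
    checkpoint["offset"] = len(source)
    return source[start:]


orders = ["o1", "o2", "o3", "o4", "o5"]
payments = ["p1", "p2"]

shared = {}                             # WRONG: one checkpoint for two streams
a_first = read_new(orders, shared)      # ['o1', ..., 'o5']; bookmark is now 5
b_first = read_new(payments, shared)    # []: starts at orders' offset 5, skips p1 and p2; bookmark is now 2
orders.append("o6")
a_second = read_new(orders, shared)     # ['o3', 'o4', 'o5', 'o6']: re-reads o3 to o5

orders_ckpt, payments_ckpt = {}, {}     # RIGHT: one checkpoint directory per stream
print(read_new(orders, orders_ckpt), read_new(payments, payments_ckpt))
```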
◆ The two streaming joins (opposite machinery)
- Stream–static (stream + a static Delta lookup): the stream drives; the static side is re-read at its latest Delta version every micro-batch ([How Delta Lake works — the transaction log](/lessons/f2-delta-transaction-log/)), so it picks up changes automatically. Nothing is buffered, so it is effectively stateless. Illegal case: a full outer join between a stream and a static table is not supported.
- Stream–stream (both sides streaming, e.g. impressions ↔ clicks): the engine buffers unmatched rows from both sides as state until a partner arrives. To bound it you need both a watermark on each side and a time-range condition (clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour). Without the time bound, state grows forever.
Stream-static leans on the table's current version (cheap); stream-stream leans on the engine's buffered state (watermark + time bound).
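Both joins can be modelled in a toy. This is plain Python, not Spark's join operators, and several details are hypothetical simplifications: all names, one impression and at most one click per ad, a 10-minute lateness on each side, and the assumption that no row arrives behind its side's watermark. The stream-static join just re-reads whatever version of the lookup is current for each micro-batch and keeps nothing. The stream-stream join buffers both sides. It uses the time-range condition plus both watermarks to decide when a buffered row can no longer find a partner.

```python
HOUR = 60       # time-range condition: a click counts within 60 minutes of its impression
LATENESS = 10   # watermark delay on each side, in minutes


def stream_static_join(batches, lookup_at):
    """Stream-static: look up each row in the static table as of this micro-batch; buffer nothing."""
    out = []
    for n, batch in enumerate(batches):
        lookup = lookup_at(n)               # re-read the latest version every micro-batch
        out += [(order, rid, lookup.get(rid)) for order, rid in batch]
    return out


def stream_stream_join(batches):
    """Stream-stream: buffer both sides; each batch is a list of (side, ad_id, minute)."""
    imps, clicks = {}, {}                   # buffered rows per side: the state
    max_imp = max_click = float("-inf")
    matches = []
    for batch in batches:
        for side, ad_id, t in batch:
            if side == "imp":
                max_imp = max(max_imp, t)
                c = clicks.get(ad_id)
                if c is not None and t <= c <= t + HOUR:
                    matches.append((ad_id, t, c))
                imps[ad_id] = t
            else:
                max_click = max(max_click, t)
                i = imps.get(ad_id)
                if i is not None and i <= t <= i + HOUR:
                    matches.append((ad_id, i, t))
                clicks[ad_id] = t
        # End of batch: advance both watermarks, then evict rows that can no longer match.
        imp_wm, click_wm = max_imp - LATENESS, max_click - LATENESS
        imps = {k: t for k, t in imps.items() if t + HOUR >= click_wm}   # latest possible click still ahead
        clicks = {k: t for k, t in clicks.items() if t >= imp_wm}        # a matching impression could still come
    return matches, imps, clicks


versions = [{"r1": "Pizza"}, {"r1": "Pizza Co"}]   # the static table changes between batches
print(stream_static_join([[("o1", "r1")], [("o2", "r1")]], lambda n: versions[n]))
# [('o1', 'r1', 'Pizza'), ('o2', 'r1', 'Pizza Co')]

matches, imps, clicks = stream_stream_join([
    [("imp", "a", 0), ("imp", "b", 5)],
    [("click", "a", 30), ("imp", "c", 100)],
    [("click", "c", 130)],
])
print(matches)       # [('a', 0, 30), ('c', 100, 130)]
print(imps, clicks)  # {'c': 100} {'c': 130}: 'b' never got a click and was evicted
```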
◆ The seat in the medallion flow
- Bronze = a stateless stream (Auto Loader/Kafka → append as-is): a progress checkpoint, no notepad. The "easy" streaming — replicate, don't remember.
- Silver/Gold = the stateful trio (dedup/SCD in silver, aggregations/joins in gold): notepad, watermark, Update mode. Complexity concentrates where a transform needs memory.
- Transforms read the previous layer, not the source (source → bronze → silver → gold). The source may be transient (a Kafka topic typically retains only days of data); bronze is your durable, replayable copy, so a buggy silver step re-runs against bronze.
- Precision: stateless-vs-stateful is a property of the transform, not the layer.
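A toy pipeline in plain Python walks through the flow above. It runs as a one-shot batch for brevity, and the layer functions, sample rows and the "forgot to dedup" bug are hypothetical. Bronze appends as-is and keeps no notepad. Silver dedups, which is stateful. Gold aggregates, which is also stateful. Silver reads bronze rather than the source, so a buggy silver step can be fixed and replayed even after the source has expired.

```python
from collections import Counter

kafka = [("o1", "r1", 10), ("o2", "r2", 5), ("o1", "r1", 10)]   # duplicate o1 delivery
bronze = list(kafka)                 # stateless: append as-is into a durable copy
kafka.clear()                        # the transient source has since expired


def silver_buggy(rows):
    return rows                      # bug: forgot to deduplicate


def silver_fixed(rows):
    seen, out = set(), []            # stateful: remembers order ids already passed
    for order_id, restaurant, amount in rows:
        if order_id not in seen:
            seen.add(order_id)
            out.append((order_id, restaurant, amount))
    return out


def gold(rows):
    totals = Counter()               # stateful: running total per restaurant
    for _, restaurant, amount in rows:
        totals[restaurant] += amount
    return dict(totals)


print(gold(silver_buggy(bronze)))    # {'r1': 20, 'r2': 5}: double-counted
print(gold(silver_fixed(bronze)))    # {'r1': 10, 'r2': 5}: replayed from bronze, source not needed
```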
Takeaways (rebuild it from these)
- A stream is an unbounded table; a streaming query keeps its result correct incrementally — the whole model.
- Stateless vs stateful = "do I need memory of earlier rows?" The stateful trio is aggregations, dedup, stream-stream joins; their notepad grows with distinct keys, not rows.
- State physically = RocksDB, backed up to the checkpoint so it survives restarts. Watermark bounds it — how long you wait for late data = when old state may be dropped.
- Two "state"/"checkpoint" meanings: progress bookmark (every stream) vs operation notepad (stateful only); streaming checkpoint vs Delta-log checkpoint. Each stream needs its own checkpoint directory.
- Output mode follows statefulness (running aggregation → Update); trigger is a separate pace dial (availableNow = incremental batch run as a stream).
Before you move on — say these out loud (don't scroll up)
If one won't come, that's the line to reread — that gap is exactly the "doubt" we're hunting:
- Why is a streaming query forced to be incremental — what did the never-ending source take away?
- The one test that sorts every transform into stateless vs stateful — state it.
- What physically holds state, and what makes it survive a restart?
- A stateful stream's notepad would grow forever — what bounds it, and by what rule?
- A live per-group COUNT keeps changing — which output mode, and why can't Append do it?
From here three doors open onto this same state model: letting the engine own the machinery declaratively (→ [Streaming tables vs materialized views](/lessons/s1-streaming-tables-vs-mv/), [Lakeflow Spark Declarative Pipelines](/lessons/s1-lakeflow-sdp/)), CDC upserts riding on it (→ [APPLY CHANGES — CDC and SCD, declaratively](/lessons/s1-apply-changes-cdc/)), and the two faces of deduplication depending on whether a duplicate lands inside or outside the watermark (→ [Deduplication — distinct, keep-latest, and the streaming state trap](/lessons/s3-dedup/)).