
Developing Code for Data Processing

Structured Streaming & the state model

Compare Spark Structured Streaming and Lakeflow Spark Declarative Pipelines to determine the optimal approach; build reliable batch and streaming pipelines. (Underlies streaming across Sections 1 and 3.)

[The one job — and the two axes everything lives on](/lessons/f1-the-one-job/) ended on a cliffhanger: take Way 1 — new records arriving — into its hardest home, streaming. Here we are.

Streaming is Way 1 with the brakes cut: the source never stops. There is no "end of the data" where the engine can finally give up and rebuild the target from scratch. So it has no choice — it must keep the target correct using only the new rows, forever. That single constraint generates this whole lesson. Hold the lesson as one question and everything below is an answer to it:

How do you keep a result correct from new rows alone, without ever looking back at the history?

One orientation before we build. Structured Streaming is the engine — the machine inside Spark that actually does this. You either drive it directly (spark.readStream … .writeStream in a notebook — procedural) or let Lakeflow SDP drive it for you (declarative). Same engine, two steering wheels; SDP runs on top of Structured Streaming, never beside it. So "Structured Streaming vs SDP" is never a versus — it's "which wheel fits the job." [Lakeflow Spark Declarative Pipelines](/lessons/s1-lakeflow-sdp/) is the wheel; this lesson is the engine.


The spine (read this part slowly)

Beat 1 — the one picture: a stream is a table that never stops growing

Structured Streaming does not hand you a new API to memorise. It treats a live stream as a table with rows forever being appended to the bottom, and lets you write the same query you'd write against an ordinary table. Look:

```python
spark.read.table("orders")        # static: answers once
spark.readStream.table("orders")  # streaming: the SAME transformation, answered forever
```

Same logic — the streaming one just never stops answering as new orders land. Name the unit while it's in front of you: that whole running thing — readStream → transform → writeStream, kept alive as data arrives — is a streaming query. One never-ending job.
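To make "the same query, answered forever" concrete, here is a toy model in plain Python rather than Spark. The order rows, field names and helper functions are hypothetical, and micro-batches are just slices of a list. For a stateless transform, running it once over the whole table or once per arriving batch produces the same rows.

```python
from datetime import datetime, timedelta, timezone

def to_utc(row):
    """Stateless transform: needs nothing but the row itself."""
    return {**row, "ts": row["ts"].astimezone(timezone.utc)}

def run_static(table):
    """Like spark.read: answer once, over everything present now."""
    return [to_utc(row) for row in table]

def run_streaming(micro_batches):
    """Like spark.readStream: the same transform, applied as each batch lands."""
    result = []
    for batch in micro_batches:
        result.extend(to_utc(row) for row in batch)
    return result

# Hypothetical orders stamped in a UTC+05:30 local time zone
local_tz = timezone(timedelta(hours=5, minutes=30))
orders = [
    {"order_id": i, "ts": datetime(2024, 1, 1, 9, i, tzinfo=local_tz)}
    for i in range(6)
]
micro_batches = [orders[0:2], orders[2:5], orders[5:6]]  # rows arriving over time

assert run_static(orders) == run_streaming(micro_batches)
```

Stateful transforms are where this easy equivalence stops being free, which is the subject of the next beat.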

Lock it. A stream = an unbounded input table; a streaming query keeps its result correct as rows append. That is the entire conceptual leap — everything else is how.

Beat 2 — the problem the unbounded table creates: memory

Now the first real fork, and I want you to answer it before I do.

A new order lands. Your target is revenue per restaurant. You do not re-add ten million old orders — you take this one order and add its amount to that restaurant's running total (f1's anchor: only the work the change demands). So:

Predict: where is that running total kept, between one order arriving and the next?

Not in the new row — a row only knows itself. It's in a memory the engine carries forward from batch to batch. That memory is state. You already met the word in [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/); streaming is where it becomes a physical thing with a size and a home.
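A minimal sketch of that memory, assuming a plain Python dict as the state (Spark's state store is far more involved) and hypothetical order fields. Each micro-batch touches only its own rows, yet the carried-forward totals match a full rebuild over all history.

```python
from collections import defaultdict

def fold_batch(state, batch):
    """Add each new order's amount to its restaurant's running total (the state)."""
    for order in batch:
        state[order["restaurant"]] += order["amount"]
    return state

micro_batches = [
    [{"restaurant": "A", "amount": 12}, {"restaurant": "B", "amount": 5}],
    [{"restaurant": "A", "amount": 8}],
    [{"restaurant": "C", "amount": 20}, {"restaurant": "B", "amount": 1}],
]

state = defaultdict(int)        # carried forward from batch to batch
for batch in micro_batches:
    fold_batch(state, batch)    # touches only the rows that just arrived

# For comparison only: a full rebuild over every row ever seen
all_rows = [order for batch in micro_batches for order in batch]
rebuilt = fold_batch(defaultdict(int), all_rows)
```

The incremental path never re-reads old orders; it is correct only because `state` survives between batches.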

And it cuts every transform into two kinds. Classify these two yourself before reading on: converting each order's time to UTC, and keeping a running count of orders.

The first needs nothing but the row in front of it → stateless. The second needs the count carried from every earlier order → stateful, and that carried-forward count is the state. That's the whole test, and it's worth memorising because a dozen exam questions collapse into it:

The test: to produce the answer for this one record, do I need anything besides the record itself? No → stateless. Yes, I need memory of earlier rows → stateful.

Run it down a column and it never fails you:

| Transform | Needs memory of past rows? | Type |
|---|---|---|
| Convert each order's time to UTC | No | stateless |
| Drop orders with a null price | No | stateless |
| Running count of orders today | Yes: the count so far | stateful |
| Revenue total per restaurant | Yes: each restaurant's total so far | stateful |
| "Seen this order_id before?" dedup | Yes: the set of seen ids | stateful |
| Join the orders stream to the payments stream | Yes: unmatched rows waiting for a partner | stateful |
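The test can be read straight off a function's signature. In this plain-Python sketch (hypothetical field names, not Spark's dedup operator), the stateless filter receives only the row, while dedup also receives the set of ids seen so far, which is its state.

```python
def drop_null_price(order):
    """Stateless: the row alone decides whether it survives."""
    return order if order["price"] is not None else None

def dedup(seen_ids, order):
    """Stateful: needs the set of order_ids seen in earlier rows."""
    if order["order_id"] in seen_ids:
        return None
    seen_ids.add(order["order_id"])
    return order

stream = [
    {"order_id": 1, "price": 10},
    {"order_id": 2, "price": None},   # removed by the stateless filter
    {"order_id": 1, "price": 10},     # duplicate: only the state can spot it
    {"order_id": 3, "price": 7},
]

seen_ids = set()   # the state; it persists across rows (and batches)
kept = []
for order in stream:
    order = drop_null_price(order)
    if order is not None:
        order = dedup(seen_ids, order)
    if order is not None:
        kept.append(order)
```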

Three kinds of transform are the stateful set worth banking together: aggregations, deduplication, and stream-stream joins. They are precisely the ones whose notepad can explode. Recall from [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/): the notepad grows with distinct groups (keys), not rows. Revenue per restaurant across 500 restaurants → ~500 running totals. The same rows counted per customer_id across 50 million customers → a 50-million-entry notepad that kills the job. Dedup scales the same way (one entry per distinct id seen); a stream-stream join is the exception, buffering whole rows from both sides until the watermark rules out a partner. High-cardinality grouping = the state-explosion trap.
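A quick count shows why cardinality, not volume, sets the notepad's size. This sketch scales the prose's numbers down (50,000 customers instead of 50 million) and uses synthetic, hypothetical rows; a dict of running sums stands in for the state store.

```python
def state_entries(rows, key):
    """Run a per-key running sum and return how many entries the notepad holds."""
    totals = {}
    for row in rows:
        totals[row[key]] = totals.get(row[key], 0) + row["amount"]
    return len(totals)

# 200,000 synthetic orders spread over 500 restaurants and 50,000 customers
orders = [
    {"restaurant": i % 500, "customer": i % 50_000, "amount": 1}
    for i in range(200_000)
]

per_restaurant = state_entries(orders, "restaurant")       # 500 entries
per_customer = state_entries(orders, "customer")           # 50,000 entries
doubled_rows = state_entries(orders * 2, "restaurant")     # still 500: more rows, same keys
```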

Lock it. Stateful = "needs memory of earlier rows." The trio: aggregations, dedup, stream-stream joins. State size tracks keys, not rows.

Beat 3 — where state lives, and why a restart doesn't lose it

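Quick, because you already own the picture. In [The one job — and the two axes everything lives on](/lessons/f1-the-one-job/) I gave you the desk and the filing cabinet: the engine works at a desk (cluster memory) but continuously copies its notepad into a filing cabinet it never loses. Here are the real names behind that picture: the notepad is the state store, held in RocksDB on the cluster (the desk), and the filing cabinet is the checkpoint, the checkpointLocation directory in durable storage to which that state is copied as micro-batches complete.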

So on restart the engine reloads the checkpoint and picks the totals back up exactly where it left off.
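The restart story can be modelled in a few lines. This is a toy, not Spark's checkpoint format: a JSON file in a temporary directory stands in for checkpointLocation, a dict stands in for the RocksDB state store, and run_batch is a hypothetical helper. It commits progress (the last batch id) and state together, so a batch replayed after a crash is recognised and not double-counted.

```python
import json
import os
import tempfile

def run_batch(checkpoint_dir, batch_id, batch):
    """Reload state from the checkpoint, fold in one batch, commit both together."""
    path = os.path.join(checkpoint_dir, "state.json")
    if os.path.exists(path):
        with open(path) as f:
            ckpt = json.load(f)
    else:
        ckpt = {"last_batch": -1, "totals": {}}
    if batch_id <= ckpt["last_batch"]:
        return ckpt["totals"]            # already committed: skip, don't double-count
    for restaurant, amount in batch:
        ckpt["totals"][restaurant] = ckpt["totals"].get(restaurant, 0) + amount
    ckpt["last_batch"] = batch_id        # progress and state saved in one commit
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(ckpt, f)
    os.replace(tmp_path, path)           # atomic rename is the commit point
    return ckpt["totals"]

micro_batches = [[("A", 10), ("B", 5)], [("A", 3)], [("B", 2), ("C", 7)]]

with tempfile.TemporaryDirectory() as checkpoint_dir:
    run_batch(checkpoint_dir, 0, micro_batches[0])
    run_batch(checkpoint_dir, 1, micro_batches[1])
    # Crash here: nothing in memory survives. The restarted job reads only
    # the checkpoint, and the source replays batch 1 to be safe.
    run_batch(checkpoint_dir, 1, micro_batches[1])
    final = run_batch(checkpoint_dir, 2, micro_batches[2])
```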

Lock it. notepad = state store = RocksDB on the cluster, copied to the checkpoint so it survives a crash/restart.

Beat 4 — the leash: what stops the notepad growing forever

Now the question you should already be uneasy about: the stream never ends, and a stateful query keeps remembering — so doesn't the notepad grow without limit until the job dies?

It would. The watermark is the leash.

First, the reason the engine can't drop a window's state the instant its end time passes: late data. An event stamped 12:00 might not arrive until 12:09 (network lag, a retry, a slow phone). Close the 12:00-12:05 window the moment the clock hits 12:05 and that straggler is lost forever. So the engine must hold windows open for a while. The watermark is the exact rule for how long:

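```python
from pyspark.sql.functions import avg, window
# df: a streaming DataFrame with event_time, device_id and temperature columns
windowed = (df.withWatermark("event_time", "10 minutes")   # don't wait for data more than 10 min behind the latest seen
    .groupBy(window("event_time", "5 minutes"), "device_id")
    .agg(avg("temperature")))
```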

The engine tracks the latest event_time it has seen and keeps the watermark 10 minutes behind it. Once a window's end time falls behind the watermark, the engine declares that window closed, emits its final result, and erases its lines from the notepad. So the watermark is two things at once: (a) how much lateness you tolerate, and (b) when the engine is allowed to throw away old state. No watermark → unbounded state; watermark → bounded.
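Here is the leash as a toy simulation in plain Python, with event times as minutes after 12:00 and constants mirroring the PySpark snippet. It is a simplified model, not Spark's implementation: real Spark advances the watermark between micro-batches and only guarantees to keep data that is within the delay (later data may or may not be dropped), so exact timing differs. Note how the window and the watermark are separate constants: one slices time, the other decides when a slice is evicted.

```python
from collections import defaultdict

WINDOW = 5   # minutes per window, like window("event_time", "5 minutes")
DELAY = 10   # tolerated lateness, like withWatermark("event_time", "10 minutes")

def window_start(t):
    """Tumbling 5-minute window an event time (minutes after 12:00) falls into."""
    return t - t % WINDOW

def run(micro_batches):
    open_windows = defaultdict(list)   # the notepad: window start -> readings
    emitted, dropped = {}, []
    max_seen = float("-inf")           # latest event_time seen so far
    for batch in micro_batches:
        watermark = max_seen - DELAY
        for t, temperature in batch:
            if t < watermark:
                dropped.append(t)      # behind the watermark: too late to count
                continue
            open_windows[window_start(t)].append(temperature)
            max_seen = max(max_seen, t)
        watermark = max_seen - DELAY
        for start in sorted(open_windows):
            if start + WINDOW <= watermark:          # window end is behind the watermark
                readings = open_windows.pop(start)   # evict from the notepad
                emitted[start] = sum(readings) / len(readings)
    return emitted, dict(open_windows), dropped

emitted, still_open, dropped = run([
    [(1, 20.0), (6, 30.0)],    # 12:01 and 12:06
    [(0, 22.0), (12, 31.0)],   # a 12:00 straggler arrives late but within 10 min
    [(21, 25.0)],              # watermark moves to 12:11: windows 12:00 and 12:05 close
    [(2, 99.0)],               # 12:02 arriving now is behind the watermark: dropped
])
```

The straggler stamped 12:00 still lands in its window because the watermark had not passed it; the 12:02 reading arriving after 12:21 was seen does not.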

One separation the exam baits directly: the window (5 min) is how you slice time; the watermark (10 min) is how long you wait before closing a slice. "Maintain state for 10 minutes for late data" is watermark language, not window length. (The method really is withWatermark(eventTimeColumn, lateness); awaitArrival / slidingWindow are invented distractor names.)

Lock it. Watermark = how long you wait for late data = when old state may be dropped. It is the one thing keeping stateful state bounded.

That's the spine. Unbounded table → some transforms need state → state lives in RocksDB and is protected by the checkpoint and bounded by the watermark. If those four beats hold, the rest is dials you set — and they click fast now.


The dials (skim on first read; return when a question needs one)

Everything here is a knob you turn once you understand state. None of it is a new idea — each is a consequence of the spine.

◆ Dial — output mode: what gets written each pass

A streaming query must decide what to emit on each trigger, and statefulness dictates the choice.

| Mode | Writes | Use when |
|---|---|---|
| Append (default) | only brand-new rows; existing rows never touched | stateless queries, or windowed aggregations once the watermark has closed the window |
| Update | only rows that changed since the last trigger | running aggregations that keep changing (a live COUNT/SUM per group) |
| Complete | the entire result table, rewritten every trigger | small aggregations only (re-emitting the whole result every trigger is expensive) |

Predict: a live COUNT(*) per restaurant keeps revising existing rows as orders arrive. Can Append write that? No: Append can only add new rows, never revise one. So a moving aggregation needs Update (or Complete). Append becomes legal for an aggregation only with a watermark, and then each window's row is written once, after the watermark has finalised it, so it can never change again.
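The three modes can be compared by what a sink would receive on one trigger. This plain-Python sketch uses a hypothetical emit helper over a per-restaurant count table; it models the rule, not Spark's sink protocol. One difference from real Spark: an Append query over an unwatermarked aggregation is rejected when the query starts, whereas here the violation only surfaces when a row actually changes.

```python
def emit(mode, before, after):
    """Rows sent to the sink on one trigger, given the result table before and after it."""
    if mode == "complete":
        return dict(after)                                            # whole table, every trigger
    if mode == "update":
        return {k: v for k, v in after.items() if before.get(k) != v}  # changed or new rows
    if mode == "append":
        revised = [k for k in before if k in after and after[k] != before[k]]
        if revised:
            raise ValueError(f"append cannot revise existing rows: {revised}")
        return {k: v for k, v in after.items() if k not in before}     # new rows only
    raise ValueError(f"unknown mode: {mode}")

# Live COUNT(*) per restaurant across two triggers
trigger_1 = {"A": 1, "B": 1}
trigger_2 = {"A": 2, "B": 1, "C": 1}

print(emit("update", trigger_1, trigger_2))    # {'A': 2, 'C': 1}
print(emit("complete", trigger_1, trigger_2))  # {'A': 2, 'B': 1, 'C': 1}
```

Calling `emit("append", trigger_1, trigger_2)` raises, because restaurant A's count was revised.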

◆ Dial — trigger: how often a batch runs (a separate axis from output mode)

| Trigger | Behaviour |
|---|---|
| default (none) | micro-batches as fast as possible, continuously |
| processingTime="30 seconds" | a micro-batch on a fixed clock interval |
| availableNow=True | process all currently-available new data since the last checkpoint, then stop |
| once=True | one batch, then stop; deprecated, superseded by availableNow |
| continuous | experimental sub-second mode; not production-ready |

availableNow is streaming machinery doing an incremental batch — wakes, consumes what's new since the checkpoint, writes, stops. That's why "incremental batch" and "streaming" blur on Databricks (same engine; the trigger is the dial). "Process all pending data then stop, cheaply, on a schedule" → availableNow on a scheduled job. Recovery config to bank: a streaming job = new job cluster + unlimited retries + max concurrent runs = 1 (isolated, auto-restart, no two copies fighting one checkpoint).
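The availableNow pattern (wake, drain what's new since the checkpoint, commit, stop) boils down to an offset kept in the checkpoint. The sketch below is a plain-Python model with a list as the source and a dict as the checkpoint; run_available_now is a hypothetical helper. Real availableNow may split the backlog into several micro-batches to honour rate-limit options, which this toy handles in one go.

```python
def run_available_now(source, checkpoint, sink):
    """Process every row past the committed offset, then stop (one scheduled run)."""
    start = checkpoint.get("offset", 0)
    end = len(source)                  # snapshot of what is available *now*
    if start < end:
        sink.extend(source[start:end])
        checkpoint["offset"] = end     # commit progress so the next run starts here
    return end - start                 # rows handled by this run

source, checkpoint, sink = [], {}, []

source.extend(["o1", "o2", "o3"])
first = run_available_now(source, checkpoint, sink)   # 3 rows, then stop
source.append("o4")
second = run_available_now(source, checkpoint, sink)  # only the 1 new row
third = run_available_now(source, checkpoint, sink)   # nothing new: 0
```

Two copies of this job sharing one `checkpoint` would race on the same offset, which is why the recovery config pins max concurrent runs to 1.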

◆ The name-trap: two things both called "state", two called "checkpoint"

The single most-tested confusion in streaming — pin it. Every streaming query — even stateless — keeps a checkpoint: a bookmark of how far it read the source (.option("checkpointLocation", …)). A stateful query's checkpoint also holds the notepad. So:

| Word | Meaning A | Meaning B |
|---|---|---|
| state | progress: how far the stream has read (every stream, even stateless) | operation state: the notepad for aggregations/joins/dedup (stateful only) |
| checkpoint | streaming checkpoint: checkpointLocation, holding progress + operation state for crash recovery | Delta-log checkpoint: a periodic Parquet snapshot of a table's transaction log (nothing to do with streaming; see [How Delta Lake works — the transaction log](/lessons/f2-delta-transaction-log/)) |

Two consequences the exam tests:

◆ The two streaming joins (opposite machinery)

Stream-static leans on the table's current version (cheap); stream-stream leans on the engine's buffered state (watermark + time bound).
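The two machineries can be contrasted in a toy model. Nothing here is Spark's join implementation: the helpers, the orders/payments/restaurants data and the 15-minute time bound are hypothetical, event times are plain minutes, and only inner joins are modelled. The stream-static join keeps no state and reads the lookup table as it is when each batch runs; the stream-stream join buffers both sides and uses the watermark plus the time bound to decide when a buffered row can never match and may be evicted.

```python
def stream_static_join(batch, restaurants):
    """No state: each micro-batch reads the static table's current contents."""
    return [(order["order_id"], restaurants.get(order["restaurant"])) for order in batch]

def stream_stream_join(micro_batches, bound, delay):
    """Inner-join orders to payments on order_id when 0 <= payment - order <= bound."""
    orders, payments = {}, {}          # buffered state: order_id -> event time
    matches, max_seen = [], float("-inf")
    for batch in micro_batches:
        for side, order_id, t in batch:
            mine, other = (orders, payments) if side == "order" else (payments, orders)
            if order_id in other:
                order_t, pay_t = (t, other[order_id]) if side == "order" else (other[order_id], t)
                if 0 <= pay_t - order_t <= bound:
                    matches.append(order_id)
            mine[order_id] = t
            max_seen = max(max_seen, t)
        watermark = max_seen - delay
        # An order can still match only while a payment within `bound` may arrive;
        # a payment can still match only while an order at or before it may arrive.
        for oid in [k for k, t in orders.items() if t + bound < watermark]:
            del orders[oid]
        for oid in [k for k, t in payments.items() if t < watermark]:
            del payments[oid]
    return matches, len(orders) + len(payments)

restaurants = {"A": "Pizza Place"}
before = stream_static_join([{"order_id": 1, "restaurant": "A"}], restaurants)
restaurants["A"] = "Pizza Palace"      # static table updated between batches
after = stream_static_join([{"order_id": 2, "restaurant": "A"}], restaurants)

matches, buffered = stream_stream_join(
    [
        [("order", 1, 0), ("order", 2, 1)],
        [("payment", 1, 4)],                      # within 15 min of order 1: match
        [("order", 3, 40), ("payment", 2, 41)],   # 40 min after order 2: no match
    ],
    bound=15,
    delay=5,
)
```

The stream-static side picks up the renamed restaurant on the next batch with no state at all; the stream-stream side ends with two rows still buffered, waiting for partners the watermark has not yet ruled out.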

◆ The seat in the medallion flow

Takeaways (rebuild it from these)

  1. A stream is an unbounded table; a streaming query keeps its result correct incrementally — the whole model.
  2. Stateless vs stateful = "do I need memory of earlier rows?" The stateful trio is aggregations, dedup, stream-stream joins; their notepad grows with distinct keys, not rows.
  3. State physically = RocksDB, backed up to the checkpoint so it survives restarts. Watermark bounds it — how long you wait for late data = when old state may be dropped.
  4. Two "state"/"checkpoint" meanings: progress bookmark (every stream) vs operation notepad (stateful only); streaming checkpoint vs Delta-log checkpoint. Each stream needs its own checkpoint directory.
  5. Output mode follows statefulness (running aggregation → Update); trigger is a separate pace dial (availableNow = incremental-batch-as-stream).

Before you move on — say these out loud (don't scroll up)

If one won't come, that's the line to reread — that gap is exactly the "doubt" we're hunting:

  1. Why is a streaming query forced to be incremental — what did the never-ending source take away?
  2. The one test that sorts every transform into stateless vs stateful — state it.
  3. What physically holds state, and what makes it survive a restart?
  4. A stateful stream's notepad would grow forever — what bounds it, and by what rule?
  5. A live per-group COUNT keeps changing — which output mode, and why can't Append do it?

From here three doors open onto this same state model: letting the engine own the machinery declaratively (→ [Streaming tables vs materialized views](/lessons/s1-streaming-tables-vs-mv/), [Lakeflow Spark Declarative Pipelines](/lessons/s1-lakeflow-sdp/)), CDC upserts riding on it (→ [APPLY CHANGES — CDC and SCD, declaratively](/lessons/s1-apply-changes-cdc/)), and the two faces of deduplication depending on whether a duplicate lands inside or outside the watermark (→ [Deduplication — distinct, keep-latest, and the streaming state trap](/lessons/s3-dedup/)).

