2020-05-03

## Watermarks

### Definition

• Assumption: Any event in the streaming data has an associated logical event timestamp.
• Typically this is the time at which the original event occurred.
• Definition: The watermark is a monotonically increasing timestamp of the oldest work not yet completed.
• Completeness: The watermark tells us when it is correct to close a window.
• If the watermark has passed a time T, its monotonic property guarantees that no more processing will occur for events with event times at or before T.
• Visibility
• The watermark cannot advance while an event is stuck in the pipeline.
• When this happens, the stalled watermark helps us locate the source of the problem.
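The definition above can be made concrete with a minimal sketch. The class name `WatermarkTracker`, its methods, and the `source_time` parameter (a lower bound on the event times of data that has not arrived yet) are all hypothetical names chosen for illustration, not part of any real framework.

```python
class WatermarkTracker:
    """Tracks the event time of the oldest work not yet completed."""

    def __init__(self):
        self.in_flight = {}               # message id -> event time
        self.watermark = float("-inf")

    def start(self, msg_id, event_time):
        self.in_flight[msg_id] = event_time

    def finish(self, msg_id):
        del self.in_flight[msg_id]

    def advance(self, source_time):
        """Recompute the watermark from in-flight work.

        source_time is an assumed lower bound (supplied by the source)
        on the event times of data that has not arrived yet.
        """
        oldest = min(self.in_flight.values(), default=source_time)
        candidate = min(oldest, source_time)
        # Monotonic: the watermark never moves backwards.
        self.watermark = max(self.watermark, candidate)
        return self.watermark

    def window_is_complete(self, window_end):
        # The window end is treated as exclusive, so every event in the
        # window has an event time strictly below window_end.
        return self.watermark >= window_end


tracker = WatermarkTracker()
tracker.start("a", 10)
tracker.start("b", 12)
tracker.advance(source_time=15)   # held at 10 by message "a"
tracker.finish("b")
tracker.advance(source_time=20)   # still 10: "a" is stuck
tracker.finish("a")
tracker.advance(source_time=20)   # now 20
```

The stuck message "a" is what keeps the watermark at 10, which is the visibility property: a watermark that stops advancing points at the work that is holding it.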

### Source watermark creation

Watermarks can be perfect or heuristic.

• Perfect watermarks account for all data.
• Heuristic watermarks admit some late-data elements.

#### Perfect watermark creation

This requires perfect knowledge of the input, and is therefore impractical for many real-world problems. The following examples can use perfect watermarks.

• Ingress timestamping
• Use the ingress times as the event times for data items.
• Static sets of time-ordered logs.
• The watermark is just the minimum timestamp of unprocessed records across all partitions.
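For the static, time-ordered log case, the computation might look like the sketch below. The function name `perfect_watermark` and the representation of partitions as sorted lists with a per-partition read offset are assumptions made for illustration.

```python
def perfect_watermark(partitions, offsets):
    """Minimum timestamp of unprocessed records across all partitions.

    partitions: list of timestamp lists, each sorted in ascending order.
    offsets:    number of records already processed in each partition.
    """
    # Each partition is time-ordered, so its oldest unprocessed record
    # is simply the one at the current offset.
    pending_heads = [
        logs[pos] for logs, pos in zip(partitions, offsets) if pos < len(logs)
    ]
    # With nothing left to process, the watermark can advance to infinity.
    return min(pending_heads, default=float("inf"))


partitions = [[1, 4, 9], [2, 3, 10], [5, 6]]
wm = perfect_watermark(partitions, offsets=[1, 2, 0])   # heads are 4, 10, 5
```

Because the set of partitions is fixed and each one is ordered, no record with a timestamp below `wm` can appear later, which is what makes this watermark perfect.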

#### Heuristic watermark creation

Heuristic watermarks can lead to late data, but it is still possible to build highly accurate heuristic watermarks. For example:

• Dynamic sets of time-ordered logs
• Estimate the watermark by tracking the minimum timestamp of unprocessed records in the known set of logs, growth rates, and external knowledge such as network topology and bandwidth.
• No guarantee on in-order delivery.
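To show how a heuristic watermark admits late data, here is a sketch of a different and much simpler heuristic than the one described above: assume events arrive at most `max_skew` time units out of order, and place the watermark that far behind the largest event time seen. The class and parameter names are hypothetical.

```python
class BoundedSkewWatermark:
    """Heuristic: watermark = largest event time seen - assumed max skew."""

    def __init__(self, max_skew):
        self.max_skew = max_skew
        self.max_seen = float("-inf")

    def watermark(self):
        return self.max_seen - self.max_skew

    def observe(self, event_time):
        """Record an event; return True if it arrived behind the watermark."""
        late = event_time < self.watermark()
        self.max_seen = max(self.max_seen, event_time)
        return late


wm = BoundedSkewWatermark(max_skew=5)
wm.observe(100)   # on time, watermark becomes 95
wm.observe(97)    # out of order but within the assumed skew: on time
wm.observe(90)    # behind the watermark: late data
```

If the skew assumption is wrong, as with the event at time 90, the heuristic has already declared that time complete and the element is late.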

There is no one-size-fits-all solution. At least the problem of tracking completeness in a pipeline reduces to the problem of creating a watermark at the source.

### Watermark propagation

A pipeline may have multiple independent stages, and it is meaningful to track a separate watermark for each. Note that the watermarks of later stages are likely to be further in the past than those of earlier stages, since later stages have seen less of the input.

For each stage, we define watermarks at its boundaries. The difference between a stage’s input and output watermarks gives the amount of event-time latency (lag) introduced by that stage.

• Input watermark:
• The minimum of the output watermarks of all its upstream inputs (sources or stages).
• Output watermark:
• The minimum of its input watermark and the event times of all non-late active messages within the stage.
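The two definitions and the resulting lag can be sketched as follows. The function names are hypothetical, and the caller is assumed to have already excluded late messages from `non_late_active_event_times`.

```python
def input_watermark(upstream_output_watermarks):
    """Minimum of the output watermarks of all upstream sources/stages."""
    return min(upstream_output_watermarks)


def output_watermark(input_wm, non_late_active_event_times):
    """Minimum of the input watermark and the event times of all
    non-late messages still active inside the stage."""
    return min([input_wm, *non_late_active_event_times])


def stage_lag(input_wm, output_wm):
    """Event-time lag introduced by the stage."""
    return input_wm - output_wm


in_wm = input_watermark([120, 105, 130])      # slowest upstream is at 105
out_wm = output_watermark(in_wm, [98, 110])   # a buffered message at 98 holds it
lag = stage_lag(in_wm, out_wm)
```

Here the stage introduces 7 units of event-time lag, all of it due to the buffered message with event time 98.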

Within a stage, processing is not monolithic: different components and buffers may exist, and each can have its own watermark. Therefore, the output watermark of the stage may be the minimum of:

• Per-source watermark: for each sending stage.
• Per-external input watermark.
• Per-state component watermark: for each type of state that can be written.
• Per-output buffer watermark: for each receiving stage.
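Tracking watermarks at this granularity also shows which component is holding the stage back. A minimal sketch, assuming the per-component watermarks are available as a dictionary (the component names below are invented for illustration):

```python
def stage_output_watermark(component_watermarks):
    """Return (watermark, holder) for a stage.

    component_watermarks: dict mapping a component name to its watermark.
    The stage's output watermark is the minimum over all components, and
    the holder is the component currently responsible for that minimum.
    """
    holder = min(component_watermarks, key=component_watermarks.get)
    return component_watermarks[holder], holder


wm, holder = stage_output_watermark({
    "source:parse-stage": 120,
    "external-input:queue": 115,
    "state:window-buffer": 95,
    "output-buffer:sink-stage": 110,
})
```

In this example the window buffer state is the oldest component, so it determines the stage's output watermark.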

#### Watermark propagation and output timestamps

Within a stage, processing can be divided into windows in the event-time domain. The output timestamp of each window affects how the output watermark of the stage advances. We have several choices for the output timestamp of a window:

• End of the window
• Allows the smoothest watermark progression.
• Timestamp of the first non-late element
• The most conservative choice.
• Watermark progression is likely to be delayed.
• The output watermark is held until processing of that window is complete.
• Timestamp of a specific element
• May make sense for some special use cases.
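The effect of the first two choices on the output watermark can be sketched as below. All names are hypothetical, and "first non-late element" is interpreted here as the earliest event time among a window's non-late elements, which is an assumption about what "first" means.

```python
from enum import Enum


class OutputTimestamp(Enum):
    END_OF_WINDOW = "end_of_window"
    FIRST_NON_LATE_ELEMENT = "first_non_late_element"


def window_output_timestamp(window_end, non_late_event_times, policy):
    if policy is OutputTimestamp.END_OF_WINDOW:
        return window_end
    if policy is OutputTimestamp.FIRST_NON_LATE_ELEMENT:
        return min(non_late_event_times)
    raise ValueError(f"unsupported policy: {policy}")


def output_watermark_hold(pending_windows, policy):
    """Oldest output timestamp among windows still being processed.

    pending_windows: list of (window_end, non_late_event_times) pairs.
    The stage's output watermark cannot advance past this value.
    """
    return min(
        window_output_timestamp(end, times, policy)
        for end, times in pending_windows
    )


pending = [(60, [12, 30, 45]), (120, [61, 99])]
hold_end = output_watermark_hold(pending, OutputTimestamp.END_OF_WINDOW)
hold_first = output_watermark_hold(pending, OutputTimestamp.FIRST_NON_LATE_ELEMENT)
```

With end-of-window timestamps the hold sits at 60, while the first-element policy holds the watermark back at 12 until the first window is complete.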

#### The tricky case of overlapping windows

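Suppose an element falls into three overlapping (sliding) windows and the output timestamp is the timestamp of the first non-late element. When the first stage completes the first window, the input watermark of the second stage is still held back by the first stage’s output watermark for the second and third windows, so the first window is delayed in the second stage. This delay is unnecessary. Apache Beam has special logic for this case: it ensures that the output timestamp of window N+1 is always greater than the end of window N.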

### Percentile Watermarks

Instead of using the minimum of the event timestamps of active messages, we can consider the entire distribution of event timestamps and use a percentile watermark.

• A 90th-percentile watermark means we are guaranteed to have processed 90% of all events with earlier timestamps.
• Avoids being delayed by outliers.
• A good way to tune the trade-off between the latency of materializing results and the precision of those results.
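One way to read this definition is sketched below: the percentile watermark is taken to be the largest observed event time T such that at least the given percentage of events with timestamps earlier than T have been processed. This is only one possible interpretation, and the function name and the `(event_time, processed)` input format are assumptions.

```python
from itertools import groupby


def percentile_watermark(events, percentile):
    """events: iterable of (event_time, processed) pairs.

    Returns the largest observed event time T such that at least
    `percentile` percent of the events with a timestamp strictly
    earlier than T have been processed (None if there are no events).
    """
    seen = done = 0
    watermark = None
    for t, group in groupby(sorted(events), key=lambda e: e[0]):
        # seen/done count only events strictly earlier than t here.
        if seen == 0 or done * 100 >= percentile * seen:
            watermark = t
        group = list(group)
        seen += len(group)
        done += sum(1 for _, processed in group if processed)
    return watermark


# Events at times 0..19 are processed except a straggler at time 3;
# events at times 20..22 are newly arrived and still unprocessed.
events = [(t, t != 3) for t in range(20)] + [(20, False), (21, False), (22, False)]
strict = percentile_watermark(events, 100)
relaxed = percentile_watermark(events, 90)
```

The 100th-percentile watermark is pinned at the straggler (time 3), matching the minimum-based watermark, while the 90th-percentile watermark moves on to time 21 at the cost of possibly treating the straggler as late.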

### Processing-time watermarks

• An event-time watermark cannot distinguish a delay caused by old data from a delay caused by the system itself (processing being stuck).
• The processing-time watermark is the processing-time timestamp of the oldest active message.
• Can be used at the system-implementation level for tasks such as garbage collection.
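A small sketch of how the two watermarks together separate the two kinds of delay. The function names, the `max_lag` threshold, and the assumption that event time and processing time are expressed in the same units on comparable clocks are all illustrative.

```python
def processing_time_watermark(active_arrival_times, now):
    """Processing-time timestamp of the oldest active message."""
    return min(active_arrival_times, default=now)


def diagnose(event_time_wm, processing_time_wm, now, max_lag):
    """Classify a delay using both watermarks (threshold is an assumption)."""
    if now - processing_time_wm > max_lag:
        # Some message has been sitting in the system for too long.
        return "system delay"
    if now - event_time_wm > max_lag:
        # Messages are flowing promptly; the data itself is old.
        return "old data"
    return "healthy"


now = 1000
old_data = diagnose(
    event_time_wm=400,
    processing_time_wm=processing_time_watermark([990, 995], now),
    now=now,
    max_lag=60,
)
stuck = diagnose(
    event_time_wm=400,
    processing_time_wm=processing_time_watermark([700, 995], now),
    now=now,
    max_lag=60,
)
```

In both cases the event-time watermark lags by the same amount; only the processing-time watermark reveals whether a message is actually stuck.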