Last Updated: Apache Kafka version 4.0 and Responsive SDK version 0.39.0
Learning Objective: By the end of this Kafka Streams 101 lesson you will understand the different types of windows, when to use them, and how to reason about out-of-order and late-arriving events.
Quick Summary
- Understand the three definitions of time: Stream Time, Event Time & Wall Time
- Learn how to aggregate with windows and when to use Tumbling, Hopping, Sliding and Session windows
- Understand TTL (time to live) implementation options
- Be prepared to handle out of order and late arriving data
Concept Overview
Time
Time is a foundational concept in Kafka Streams that influences how records are processed. Kafka Streams uses three definitions of time to give you flexibility and precision in handling event streams: Stream Time, Event Time, and Wall Time.
A quick comparison of the three:
Time Type | Definition | Use |
---|---|---|
Event Time | Timestamp embedded in the event. | Determines “when” that event happened, which influences in which window to place an event. |
Stream Time | The largest Event Time seen by Kafka Streams on a particular partition. | Handles the progression of time, which determines when a window is “closed” and no longer accepting new data. |
Wall Time | System clock on the processing machine. | Not used for semantic operations, but is helpful for punctuations and suppression (covered in other KS101 lessons). |
Here is a visual representation tying them together:
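The relationship between event time and stream time can also be sketched in a few lines of code. This is a simplified illustration of the semantics, not Kafka Streams internals; the timestamps are hypothetical:

```java
// Simplified illustration (not Kafka Streams internals): stream time is
// the maximum event time observed so far on a partition, so it never
// moves backwards even when an out-of-order event arrives.
public class StreamTimeDemo {
    // advance stream time given the next record's event time
    static long advance(long streamTime, long eventTime) {
        return Math.max(streamTime, eventTime);
    }

    public static void main(String[] args) {
        long streamTime = Long.MIN_VALUE;
        for (long eventTime : new long[] {5, 3, 8, 6}) {
            streamTime = advance(streamTime, eventTime);
            System.out.println("event=" + eventTime + " streamTime=" + streamTime);
        }
        // stream time ends at 8: the out-of-order event with time 6 did not move it back
    }
}
```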
Windows, Grace & Late-Arriving Records
Windowing divides a topic into smaller chunks based on time intervals or activity. Kafka Streams supports four types of windows:
Window Type | Definition | When to Use |
---|---|---|
Tumbling | Fixed-size, non-overlapping windows | Computations that result in one action per window (e.g. computing a monthly bill for a customer) |
Hopping | Overlapping windows with a fixed size and hop interval | Insight and analytics to know how a statistic is changing over time (e.g. daily average of ERROR messages, reported at one hour granularity) |
Session | Dynamically sized windows based on gaps of inactivity | Applications that do not have predetermined window sizes (e.g. how many pages a user viewed in one session on your website) |
Sliding | New windows are created each time a record enters or drops out of a defined timespan | Used similarly to hopping windows, but when a perfect granularity of each event is necessary (e.g. anomaly or fraud detection) |
The next thing to understand is how windows and the different versions of time are connected. Windows have two concepts: bounds and a grace period. To understand these concepts let’s focus on Tumbling Windows since they’re the simplest:
Bounds: A tumbling window has a window size and a start timestamp. In the example below, a window size of 5 results in three windows with bounds [0, 4], [5, 9], and [15, 19], where the bounds are inclusive on both ends. Note that there is no data in the range [10, 14], so no window is created there. An event with timestamp 2 falls in [0, 4] and an event with timestamp 18 falls in [15, 19].
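The bounds arithmetic above can be sketched as follows. This is an illustrative snippet rather than Kafka Streams internals, using the inclusive-bounds convention from the example:

```java
// Illustrative only: compute the inclusive bounds of the tumbling window
// an event falls into, given the window size.
public class TumblingBounds {
    // inclusive start of the window containing ts
    static long windowStart(long ts, long size) {
        return (ts / size) * size;
    }

    // inclusive end of the window containing ts
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 5;
        // an event with timestamp 2 falls in [0, 4]
        System.out.println("[" + windowStart(2, size) + ", " + windowEnd(2, size) + "]");
        // an event with timestamp 18 falls in [15, 19]
        System.out.println("[" + windowStart(18, size) + ", " + windowEnd(18, size) + "]");
    }
}
```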
Grace Period: This defines when a window, based on the maximum observed stream time, will no longer accept new entries and the results of its computation will be considered final. In the example below, the grace period is also 5, meaning that the window [0, 4] will no longer accept late-arriving records after stream time 10. The first record with ts=3 is accepted "late" because the current stream time at that point is 8, which is less than 10. The second record with ts=3 is rejected because the current stream time at that point is 18, which is greater than 10.
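One way to express this acceptance check in code, matching the numbers in the example above (an illustrative sketch, not the exact Kafka Streams implementation; `windowEnd` here is the inclusive end of the window):

```java
// Illustrative only: a window stops accepting records once stream time
// passes the window close time (exclusive window end + grace period).
public class GraceCheck {
    // windowEnd is the inclusive end of the window, e.g. 4 for [0, 4]
    static boolean accepts(long windowEnd, long grace, long streamTime) {
        long closeTime = windowEnd + 1 + grace; // [0, 4] with grace 5 closes at 10
        return streamTime <= closeTime;
    }

    public static void main(String[] args) {
        // first late record with ts=3 arrives while stream time is 8: accepted
        System.out.println(accepts(4, 5, 8));  // true
        // second late record with ts=3 arrives while stream time is 18: rejected
        System.out.println(accepts(4, 5, 18)); // false
    }
}
```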
Different Window Types
To simplify the discussion on different windows, we will use illustrations assuming a grace period of zero for the remainder of this section.
Tumbling Windows
We described these in some detail above, but here is a more visual representation of a tumbling window with size 3:
Hopping Windows
Hopping windows are a more general case of tumbling windows. They define a window size and also a customizable “advance” parameter that indicates the start time of the next, potentially overlapping window. With this definition you can see how a tumbling window is a hopping window where the advance is equal to the window size.
Here is a visual example of a hopping window with size 6 and advance 2:
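To see how the advance parameter determines overlap, the sketch below (illustrative only, not Kafka Streams internals) lists the start times of every hopping window that contains a given event, using the same inclusive-bounds convention as the tumbling example:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: with inclusive bounds, an event with timestamp ts
// belongs to every hopping window whose start s satisfies
// s <= ts <= s + size - 1, where s is a multiple of the advance.
public class HoppingMembership {
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        for (long s = 0; s <= ts; s += advance) {
            if (s + size - 1 >= ts) {
                starts.add(s);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 6, advance 2 (as in the diagram): an event at ts=7 falls into
        // the windows starting at 2, 4, and 6
        System.out.println(windowStartsFor(7, 6, 2));
    }
}
```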
Session Windows
Session windows are unique in that they are not defined by a fixed size. Instead, they will continuously expand until the next event seen is more than an inactivity period after the last event seen. Since this can be confusing to understand without a motivating example, consider a streaming application that scores user sessions on a website:
- A session is considered to end when the user has not done anything on the website for over 15 minutes
- There are signals (such as adding an item to cart, clicking on more than 5 items, clicking on a featured item, etc…) that contribute to the session score
This is perfectly modeled using session windows. As long as page view events for the same user keep coming no more than 15 minutes apart, their window will be extended. When the window closes, the final aggregation that computes their score will be emitted. If they visit the site again a couple of hours later, Kafka Streams will initialize a new window for them, as it's considered a new session.
See this visual example below, where the session window has an inactivity period of 10:
And the same diagram without animation:
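The inactivity-gap rule can be sketched as follows. This is illustrative only and assumes events arrive in timestamp order; real session windows also handle out-of-order merges, as described next:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: group in-order event timestamps into sessions,
// starting a new session whenever the gap since the previous event
// exceeds the inactivity period.
public class SessionGrouping {
    static List<List<Long>> sessions(long[] timestamps, long inactivityGap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : timestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > inactivityGap) {
                result.add(current);        // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // inactivity period of 10, as in the diagram above
        System.out.println(sessions(new long[] {0, 5, 12, 30, 35}, 10));
    }
}
```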
Note on out-of-order events in session windows: session windows are unique in that an out-of-order event may merge two windows together. In the example above, if an event comes in with ts=15, windows 1 and 2 would be merged into a single window, because there is no longer an inactivity gap of 10 between any two events in the window. From that point on, the operator will no longer emit outputs for window 1 or window 2, but instead for the newly combined window.
Sliding Windows
Sliding windows are windows that span events such that the difference between the first and last event in the window is under a fixed threshold. They are similar to hopping windows with a fixed size and an advance of 1 ms, except they are implemented in a way that ensures identical windows are not re-computed. In other words, a new sliding window is created each time an event enters or leaves the window.
Below is a visual representation of a sliding window of size 4. Notice how these three events combine to create five different windows.
And the same diagram shown without animation:
Note that in this example there are five windows, containing the following entries:
Window | Bounds | Entries |
---|---|---|
Window 1 | [0,3] | [3] |
Window 2 | [1,4] | [3,4] |
Window 3 | [3,6] | [3,4,6] |
Window 4 | [4,7] | [4,6] |
Window 5 | [5,8] | [6] |
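The windows in the table can be derived mechanically: each event creates a window ending at its own timestamp, and each event's departure creates a window starting just after it (kept only if it contains at least one event). The sketch below (illustrative only, not the Kafka Streams implementation) reproduces the table for size 4 with inclusive bounds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative only: enumerate the sliding windows (inclusive bounds)
// produced by a set of events, for a given window size in time units.
public class SlidingEnumeration {
    static List<long[]> windows(long[] events, long size) {
        TreeSet<Long> starts = new TreeSet<>();
        for (long t : events) {
            starts.add(t - size + 1); // window ending at this event
            starts.add(t + 1);        // window starting after this event leaves
        }
        List<long[]> result = new ArrayList<>();
        for (long s : starts) {
            long end = s + size - 1;
            boolean nonEmpty = false;
            for (long t : events) {
                if (t >= s && t <= end) nonEmpty = true;
            }
            if (nonEmpty) {
                result.add(new long[] {s, end}); // skip empty candidate windows
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // events at 3, 4, 6 with size 4 yield the five windows from the table
        for (long[] w : windows(new long[] {3, 4, 6}, 4)) {
            System.out.println("[" + w[0] + ", " + w[1] + "]");
        }
    }
}
```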
Emitting (Output) Data
In Kafka Streams, by default, data is output from a windowing aggregation N times for every input event, where N is the number of active windows that the input event belongs to. Let's take the sliding window example from above and focus on just one event, the event with t=4:
When this event is processed, Kafka Streams notes that it belongs to two existing windows (windows 2 & 4 - window 3 isn't created until t=6 is processed). This means that when event t=4 is processed, the output of this aggregation will emit two different events: the updated aggregation value for window 2 and window 4.
If, for example, the aggregate was simply counting the number of events in each window, the output for processing t=4 would be window-2: 2 and window-4: 1. Note that we have yet to process t=6, so this event is not reflected in any of the windows. When t=6 is processed, window-3 will be created with a value of 3, window-4 will be updated to 2, and window-5 will be created with a value of 1.
Notice that events are output before a window is closed. This means that, if each event belongs to an average of K overlapping windows, there will be a 1:K fanout from the input to the output topic during a windowed aggregation.
What if you don’t want this fanout and instead are only interested in the final aggregate value of a window? This is a concept known as suppression and we’ll discuss it more in a future Kafka Streams 101 lesson. For now, you can read the documentation here: https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Suppressed.html
Note on Output Serialization
A common “gotcha” with windows is that the output keys are serialized using a special serializer that encodes not just the key, but also the window that it belongs to. The simplest way to read the result of a windowed aggregation is to reuse the KTable<Windowed<K>, VR> that is output from the .aggregate method on a windowed stream.
If you need to read a windowed stream from a separate application, take a look at the org.apache.kafka.streams.kstream.WindowedSerdes class, which contains deserializers that wrap your data deserializers to be able to read the result of windowed aggregations.
Bonus: Time to Live (TTL)
Time to Live (TTL) on stored data is useful when you want to automatically delete records from your state stores after some time has passed. TTL is natively supported in the ResponsiveKafkaStreams SDK (see documentation) but is not available in Apache Kafka Streams. Implementing TTL on data in your state stores using windows is not recommended; instead, we recommend using an out-of-band processor that reads from the beginning of the changelog topic and manually issues tombstones for expired records, stopping when it reaches a timestamp that is not yet expired.
Code Sample Reference
Window Definition
// Define your window - this particular one creates a tumbling window of
// five minutes that does not accept any amount of late
// arriving data. See the JavaDoc of TimeWindows for more details on creating
// other Window configurations
TimeWindows window = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
// Create a new KTable as a result of windowing `stream` by the window defined
// above, and counting how many times a specific key appears within a window of
// five minutes
stream.groupByKey()
.windowedBy(window)
.count();
Configure Timestamp Extractor
// this timestamp extractor will simply skip records with invalid timestamps
// the default extractor will fail if a timestamp is negative
default.timestamp.extractor = org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
Related Error Messages
Skipped Messages
Sample Error:
Skipping record for expired window. topic=[my-input-topic] partition=[10] offset=[30000] timestamp=[1734376015856] window=[1734375960000,1734376020000) expiration=[1734381823146] streamTime=[1734381823146]
Meaning: This message means you have late-arriving data that falls outside the window's retention (window close time = window end + grace period). In this example, the error message contains all the information you need to understand what was going on:
- The timestamp of the event was 1734376015856 (or December 16, 2024 7:06:55 PM).
- The event falls within the window [1734375960000, 1734376020000), but that window is expired.
- The difference between expiration and streamTime is the grace period (in the example above, that is 0). Because 1734381823146 corresponds to December 16, 2024 8:43:43 PM, you can see that the event is about 1.5h too late for that window.
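The arithmetic in that log line can be checked directly. This is an illustrative calculation using the field values from the sample message:

```java
// Illustrative only: reproduce the arithmetic from the sample log line.
public class ExpiredWindowMath {
    public static void main(String[] args) {
        long windowEnd = 1734376020000L;  // exclusive window end from window=[...)
        long expiration = 1734381823146L; // from expiration=[...]
        long streamTime = 1734381823146L; // from streamTime=[...]

        // expiration == streamTime, so the grace period is 0
        long grace = streamTime - expiration;
        System.out.println("grace=" + grace);

        // how long after the window ended did the record arrive?
        long lateMillis = streamTime - windowEnd;
        System.out.println("lateMinutes=" + lateMillis / 60000); // roughly 1.5h late
    }
}
```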
Possible Fixes:
- Extend the grace period to handle delayed events.
- Determine the source for late arriving messages (such as a lagging upstream job) and prevent the messages from being emitted
- Change the window type to better handle late arriving messages
Note: If you see a version of this message that says Topic, partition and offset not known, it's because the event was generated by a punctuator (this topic is covered in a different Kafka Streams 101 lesson).
Invalid Timestamps
Input record {} has invalid (negative) timestamp.
Skipping record due to negative extracted timestamp. topic=[my-topic] partition=[2] offset=[1000] extractedTimestamp=[-120] extractor=[...]
Meaning: Kafka Streams does not handle records with negative timestamps. See the deep dive section on setting timestamp extractors.
Possible Fixes:
- Set a timestamp extractor that will extract the correct timestamps or have a good fallback (such as UsePartitionTimeOnInvalidTimestamp).
Related Metrics
MBean | Metric | Description |
---|---|---|
kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId] | dropped-records-rate | The rate of records dropped. Note that this includes records that are late arriving (after grace) as well as records with null keys and other invalid properties. |
kafka.streams:type=stream-task-metrics,thread-id=[threadId],task-id=[taskId] | dropped-records-total | The total number of records dropped since restarting the Kafka Streams application. |
January 29th, 2025 Update: The original post had some off by one errors and minor inconsistencies. Thanks to Matthias J. Sax for proofing and suggesting the updates!
Almog Gavra
Co-Founder