
Last Updated: Apache Kafka version 4.0 and Responsive SDK version 0.39.0
Learning Objective: By the end of this Kafka Streams 101 lesson you will understand which configurations are most critical for a reliable and efficient deployment.
Quick Summary
- Highlights the most important configurations to look out for
- Discusses the way different configurations relate and impact one another
- Suggests configurations to tweak during an incident
Concept Overview
Kafka Streams is a powerful library for building stream processing applications. However, the default settings may not always align with your requirements. Configuring Kafka Streams correctly is essential to achieve reliability, performance and consistency.
Note that a full configuration reference is available here
There are three types of configurations that go into configuring your application:
- Kafka Streams Configurations: these define the configurations that are specific to the Kafka Streams runtime
- Consumer Configurations: these are configurations that specify the behavior of the Kafka Consumer, which is embedded in Kafka Streams to consume messages from Kafka topics
- Producer Configurations: these are configurations that specify the behavior of the Kafka Producer, which is also embedded in Kafka Streams to produce messages to Kafka topics
Note that for the consumer configurations, Kafka Streams has three different groupings of consumers (main, restore and global); there are also prefixes that apply to all consumers, all producers, admin clients and internal topics:
Grouping | Prefix | Description |
---|---|---|
Main | main.consumer | the default consumer that reads from source topics |
Restore | restore.consumer | used for state restoration and standby tasks |
Global | global.consumer | used to construct GlobalKTables |
All Consumers | consumer | configurations that apply to all consumers |
All Producers | producer | configurations that apply to all producers |
Admin | admin | configurations that apply to the admin client created by Kafka Streams |
Topic | topic | configurations that are applied to the internal topics created (changelog and repartition topics) |
The prefix can then be applied to any specific consumer config (e.g. main.consumer.max.poll.interval.ms) to have it apply only to one of the sub-groups of consumers.
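For example, here is a minimal sketch of how the prefixes combine, using the prefix helper methods on StreamsConfig and a props object like the one shown in the Code Sample Reference at the end of this post (the specific values are illustrative only):
// applies to all consumers (main, restore and global)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
// applies only to the main consumer, overriding the line above for that sub-group
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
// the "topic." prefix applies to internal changelog and repartition topics
props.put(StreamsConfig.topicPrefix("retention.ms"), "86400000");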
The following section outlines the most important configurations and how they relate to the correctness, performance and scalability of your Kafka Streams application.
Required Configurations
application.id
This is required for any Kafka Streams application and is what allows Kafka Streams to distribute work across multiple machines. Any Kafka Streams instance that uses the same application.id will share the same consumer group (the application.id becomes the group.id).
bootstrap.servers
The URI(s) of your Kafka brokers. Make sure that all processes with the same application.id use the same bootstrap servers.
Correctness
Kafka Streams is mostly correct out of the box without needing to tune configurations. That being said, there are some configurations that are important to consider:
processing.guarantee
The processing.guarantee can be either at_least_once or exactly_once_v2. We wrote an entire blog detailing how exactly_once_v2 works and how to improve configuration under it here. The names are pretty descriptive of what they represent:
- at_least_once will make sure that every record in the input topic is processed at least once, but may be processed multiple times (the result of which will be reflected multiple times in the output topic)
- exactly_once_v2 makes certain that input records are processed exactly once, even in the face of failure
Perhaps a less clear question is when you should use each. The first step to answering that is “why not use exactly_once_v2?”. Naturally, there is a tradeoff, and that tradeoff is performance. With exactly_once_v2, processed events aren’t seen by downstream consumers until the Kafka Streams application commits, which happens every commit.interval.ms (this will be discussed below).
The next step is understanding your use case: can you tolerate repeat computation? In many situations, particularly those that compute statistics, the answer to that is yes.
We wrote an extremely detailed blog post about the ins and outs of exactly once semantics, you can read it here.
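For example, a minimal sketch of enabling EOS using the standard StreamsConfig constants (assuming a props object like the one in the Code Sample Reference below):
// default is AT_LEAST_ONCE; exactly_once_v2 trades some latency and throughput for stronger guarantees
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);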
X.exception.handler
NOTE: before 4.0 these configurations were prefixed with default. (e.g. default.X.exception.handler) but were renamed in 4.0
There are three exception handler configurations in Kafka Streams: deserialization.exception.handler, production.exception.handler and processing.exception.handler.
The deserialization handler defaults to “log and fail” behavior, which will log the exception message and fail the application. If you have situations where you can accept dropping records, you may configure this to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler. This allows Kafka Streams to make progress in the face of poison pill messages that can’t be read, but be aware that it may mask real bugs and silently drop data that should not have been ignored.
The production exception handler defaults to “log and fail” behavior, which will log an exception and fail the Kafka Streams instance. We typically recommend keeping the default to avoid the possibility of missing outputs, but if you have a special use case (or want to configure the behavior of the handler) you can always create and specify your own class in the configuration.
The processing exception handler will catch any other exception thrown while processing records. The default also logs and fails.
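As a sketch of overriding the deserialization handler, here the 4.0 config name is passed as a string (before 4.0 you would use the default.-prefixed name mentioned in the note above):
// continue past poison pills instead of failing; beware of silently dropped records
props.put("deserialization.exception.handler",
    org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class.getName());
// the production and processing handlers keep their "log and fail" defaults unless overridden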
Correctness in Failure Scenarios
Kafka Streams relies on the Apache Kafka brokers for durability. The public documentation describes the recommended resiliency configurations well, so this post will just embed that information directly for convenience:
Parameter Name | Corresponding Client | Default value | Consider setting to |
---|---|---|---|
acks | Producer | acks=1 | acks=all |
replication.factor (for broker version 2.3 or older) | Streams | -1 (this sets to the broker default, which may be 3 anyway) | 3 |
min.insync.replicas | Broker | 1 | 2 |
Note that of these configurations, acks, replication.factor and min.insync.replicas are necessary for correctness. Setting any of those incorrectly will potentially risk data loss on a broker failover.
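Here is a sketch of applying the table above from the Streams side (assuming the ProducerConfig and TopicConfig classes from the Kafka clients library; note that min.insync.replicas is ultimately a broker/topic setting, and the topic. prefix shown here only affects the internal topics Kafka Streams creates):
// producer-side durability for writes to output and internal topics
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
// replication factor for internal changelog and repartition topics
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// min ISR for internal topics (TopicConfig is org.apache.kafka.common.config.TopicConfig)
props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);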
Miscellaneous Correctness Configurations
We won’t go into details on all the less-important configurations related to correctness here, but for completeness here are some configurations that you may want to read about if you have more complex requirements:
- default.key.serde / default.value.serde - the default serializers and deserializers to use when processing data
- default.timestamp.extractor - extracts the timestamp from the record
- upgrade.from - see https://www.responsive.dev/blog/topology-upgrades-kafka-streams-101
- max.task.idle.ms - the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions, in order to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams
High Availability
Configuring Kafka Streams for high availability in the face of application or hardware failures requires some careful thought!
num.standby.replicas
The num.standby.replicas configuration ensures high availability for Kafka Streams applications, but is not necessary for correctness. With the default configuration, an application failure will require restoration from Kafka before processing, which may take a significant amount of time for large state stores. We wrote about this in detail in our blog post about why decoupling state from compute in Kafka Streams is critical.
Note that the Responsive SDK does not allow configuring standby replicas since new tasks that use remote storage are immediately caught up.
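For example (a sketch; the right number depends on how much extra capacity you can afford):
// keep one warm copy of each state store on another instance for faster failover
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);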
max.warmup.replicas and acceptable.recovery.lag
The max.warmup.replicas and acceptable.recovery.lag configurations make scaling out smoother. When a new node comes online, Kafka Streams will (by default) not assign it any active processing. Instead, based on the two aforementioned configurations, it will assign the node a number of “warmup tasks” that temporarily function as standbys until their lag on the changelog topic is less than the acceptable.recovery.lag.
We wrote about this configuration in a detailed post on rebalancing, which also covers recommended configuration settings depending on your use case.
Note that the Responsive SDK does not use warmup replicas since new tasks that use remote storage are immediately caught up.
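A sketch of tuning both values (the numbers are illustrative, not recommendations):
// allow up to 4 warmup tasks to move state to new instances concurrently
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 4);
// a task is considered "caught up" once its changelog lag drops below this many records
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);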
max.poll.interval.ms
This is the most important configuration for ensuring high availability and preventing unnecessary rebalances in Kafka Streams. The max.poll.interval.ms setting governs the maximum time Kafka Streams is allowed to take between calls to poll(). If a consumer does not call poll() within this interval, Kafka assumes it has stalled and removes it from the group, triggering a rebalance.
Note that Kafka Streams is intelligent about calling poll. If it notices that more than half of the max.poll.interval.ms has already elapsed, it will proactively stop processing records and attempt to poll again.
If you see frequent rebalances due to missed poll intervals, consider increasing the max.poll.interval.ms from the default of 5 minutes, or dig into your application to understand why processing a batch of records is taking so long.
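For example, a sketch of raising the interval for the main consumer only (the value is illustrative):
// give slow-processing applications 10 minutes between polls before being kicked from the group
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);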
session.timeout.ms and heartbeat.interval.ms
These two configurations determine how Kafka detects node failures. The session.timeout.ms configuration defines the amount of time a Kafka broker will wait for heartbeats from a consumer before considering it dead and triggering a rebalance. Kafka Streams instances send these heartbeats every heartbeat.interval.ms. The default settings (45s and 3s, respectively) are pretty good for most use cases, but you may want to tune session.timeout.ms if:
- (lower) You are more sensitive to failures and cannot tolerate 45s of downtime.
- (higher) You see frequent rebalances caused by heartbeat timeouts that are unexpected.
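A sketch of lowering both for faster failure detection (the values are illustrative; keep heartbeat.interval.ms well below session.timeout.ms, typically around one third):
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 15_000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 5_000);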
state.dir
This configuration determines the directory on disk in which to keep any persisted state. Since Kafka Streams doesn’t know how you have decided to configure your filesystem, it creates a folder in /tmp as the default directory. For obvious reasons, this isn’t great for persisting durable data! If you have stateful applications, setting this to a durable directory is paramount. If you are running on Kubernetes, make sure to use a directory from an attached PVC (such as EBS for EKS on AWS).
Note that if you are using a shared disk across multiple instances (e.g. EBS) it is important that each instance has a unique state.dir so that they don’t clash!
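A sketch, assuming a PVC mounted at /var/lib/kafka-streams and a unique pod hostname available via the HOSTNAME environment variable (both assumptions, not requirements):
// place state on durable storage; suffix with the pod name so instances sharing a volume don't clash
props.put(StreamsConfig.STATE_DIR_CONFIG,
    "/var/lib/kafka-streams/" + System.getenv("HOSTNAME"));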
Miscellaneous Resiliency Configurations
We won’t go into details on all the less-important configurations related to resiliency here, but for completeness here are some configurations that you may want to read about if you are facing frequent rebalances or other availability issues:
- default.timestamp.extractor - allows you to specify something other than the timestamp metadata as the canonical timestamp for a record
- transaction.timeout.ms (producer configuration, if using EOS) - this has a similar effect to session.timeout.ms except it is applied to the producer commit
Performance
Later on in this 101 series we’ll have a more detailed post on performance, but this section will give a quick overview of the configurations that affect performance.
num.stream.threads
The number of instances times the number of stream threads determines how many tasks your application can process in parallel. Note that the number of tasks is limited by the number of input partitions for the processor, so it doesn’t help to configure number(instances) x num.stream.threads to be more than the number of input partitions times the number of subtopologies (see https://www.responsive.dev/blog/topology-upgrades-kafka-streams-101 for details on identifying subtopologies).
That being said, increasing parallelism is the easiest way to improve throughput in Kafka Streams. If you need to process with more parallelism than the number of partitions, you may want to check out the Responsive SDK’s async processing feature, which can process a single task with an additional thread pool of resources.
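For example (a sketch; size this against the number of input partitions and available CPU):
// run 4 stream threads per instance, each processing a subset of the tasks
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);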
statestore.cache.max.bytes
This configuration replaces the old cache.max.bytes.buffering config and sets the maximum in-memory bytes to be used for record caches across all threads. This has performance considerations in two ways:
- A larger cache will make state store lookups more efficient, as more data will be in the cache
- It will buffer aggregations in memory longer before emitting them. Aggregations will not emit an output record for every input record. Instead, they will emit either when the cache is full or when the commit.interval.ms (see below) is reached.
The second point has two further implications. First, it means that you may see the output of an aggregation less often, meaning the latency between getting an input and seeing an output will increase. Second, the fact that fewer events are emitted in total means that there is less overall load on downstream topologies.
The general summary is that for higher throughput, increase the value of this configuration, but for lower latency consider decreasing the cache size.
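A sketch of increasing the cache for throughput-oriented workloads (the value is illustrative):
// 64 MiB of record cache shared across all stream threads
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 64 * 1024 * 1024L);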
commit.interval.ms (when using EOS)
If you’re using processing.guarantee = exactly_once_v2, downstream consumers will not be able to view the outputs of processing until a commit happens, which happens every commit.interval.ms. Note that if you do enable EOS, the default commit.interval.ms will be reduced from 30s to 100ms to mitigate this consideration.
The downside of a more frequent commit interval is that the streams application will spend more time in the “commit” phase of processing. Note that this is significant! For EOS applications, a low commit.interval.ms is the number one cause of throughput degradation. If you can afford higher end-to-end latencies, consider increasing the value of your commit interval.
topology.optimization
This one is somewhat of a “no-brainer”! The default configuration maintains backwards compatibility with older versions by setting the value to NO_OPTIMIZATION. If you are writing a new Kafka Streams application, there is no downside to setting the value to ALL. This will enable various optimizations, like merging repartition topics or reusing the input topic as the changelog for KTables. See this excellent blog for more details on how the topology optimizations work.
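For example:
// enable all topology optimizations for new applications
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);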
Miscellaneous Performance Configurations
We won’t go into details on all the less-important configurations related to performance here, but for completeness here are some configurations that you may want to read about if you are not able to keep up with lag:
- rack.aware.* - a set of configurations that enable rack aware assignment
- linger.ms (producer configuration) - allows the producer to group more data together to minimize network round trips
Furthermore, keep an eye out for our Kafka Streams 101 post on optimizing performance!
Code Sample Reference
There are two common ways to configure Kafka Streams applications. The first adds configurations directly in the code and the other loads property files. Note that you can mix the two approaches, first loading an initial set of configurations from a file and then adding more to the instantiated Properties object.
Configuring Inline Code
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// applies only to the restore consumer (equivalent to "restore.consumer.max.poll.records")
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
// ...
KafkaStreams streams = new KafkaStreams(topology, props);
Configuring With Properties File
// /path/to/config.properties contents:
//
// application.id=my-application-id
// bootstrap.servers=localhost:9092
// # applies only to the restore consumer
// restore.consumer.max.poll.records=1000
// ...
Properties props = new Properties();
try (InputStream inputStream = new FileInputStream("/path/to/config.properties")) {
    props.load(inputStream);
}
// ...
KafkaStreams streams = new KafkaStreams(topology, props);
Almog Gavra
Co-Founder