Kafka Streams 101: The Most Important Configs

Last Updated: Apache Kafka version 4.0 and Responsive SDK version 0.39.0

Learning Objective: By the end of this Kafka Streams 101 lesson you will understand which configurations are most critical for a reliable and efficient deployment.

Quick Summary

  • Highlights the most important configurations to look out for
  • Discusses how different configurations relate to and impact one another
  • Suggests configurations to tweak during an incident

Concept Overview

Kafka Streams is a powerful library for building stream processing applications. However, the default settings may not always align with your requirements. Configuring Kafka Streams correctly is essential to achieve reliability, performance and consistency.

Note that a full configuration reference is available in the official Kafka Streams documentation

There are three types of configurations that go into configuring your application:

  1. Kafka Streams Configurations: these define the configurations that are specific to the Kafka Streams runtime
  2. Consumer Configurations: these are configurations that specify the behavior of the Kafka Consumer, which is embedded in Kafka Streams to consume messages from Kafka topics
  3. Producer Configurations: these are configurations that specify the behavior of the Kafka Producer, which is also embedded in Kafka Streams to produce messages to Kafka topics

Note that Kafka Streams supports a number of configuration prefixes, including three different groupings of consumers:

Grouping      | Prefix           | Description
Main          | main.consumer    | the default consumer, which reads from the stream's source topics
Restore       | restore.consumer | used for state recovery and standby tasks
Global        | global.consumer  | used to construct GlobalKTables
All Consumers | consumer         | configurations that apply to all consumers
All Producers | producer         | configurations that apply to all producers
Admin         | admin            | configurations that apply to the admin clients Kafka Streams creates
Topic         | topic            | configurations applied to the topics Kafka Streams creates (changelog and repartition topics)

The prefix can then be applied to any specific consumer config (e.g. main.consumer.max.poll.interval.ms) to have it apply only to one of the sub-groups of consumers.
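
As a minimal sketch (assuming a Properties object named props, as in the Code Sample Reference at the end of this post), you can also use the prefix helper methods on StreamsConfig rather than hand-writing the prefix strings:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// applies to all consumers (main, restore and global)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 500);
// applies only to the main consumer
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300_000);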

The following section outlines the most important configurations and how they relate to the correctness, performance and scalability of your Kafka Streams application.

Required Configurations

application.id

This is required for any Kafka Streams application and is what allows Kafka Streams to distribute work across multiple machines. Any Kafka Streams instance that uses the same application.id will share the same consumer group (the application.id becomes the group.id).

bootstrap.servers

A list of host:port pairs used to establish the initial connection to your Kafka cluster. Make sure that all processes with the same application.id use the same bootstrap servers.

Correctness

Kafka Streams is mostly correct out of the box without needing to tune configurations. That being said, there are some configurations that are important to consider:

processing.guarantee

The processing.guarantee can be either at_least_once or exactly_once_v2. We wrote an entire blog detailing how exactly_once_v2 works and how to improve configuration under it here. The names are pretty descriptive of what they represent:

  • at_least_once will make sure that every record in the input topic is processed at least once, but may be processed multiple times (the result of which will be reflected multiple times in the output topic)
  • exactly_once_v2 makes certain that input records are processed exactly once, even in the face of failure

Perhaps a less clear question is when you should use each. The first step to answering that is asking “why not use exactly_once_v2?”. Naturally, there is a tradeoff, and that tradeoff is performance. With exactly_once_v2, processed events aren’t seen by downstream consumers until the Kafka Streams application commits, which happens every commit.interval.ms (discussed below).

The next step is understanding your use case: can you tolerate repeat computation? In many situations, particularly those that compute statistics, the answer to that is yes.

We wrote an extremely detailed blog post about the ins and outs of exactly once semantics; you can read it here.
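
Enabling it is a one-line change; PROCESSING_GUARANTEE_CONFIG and EXACTLY_ONCE_V2 are public constants on StreamsConfig:

import org.apache.kafka.streams.StreamsConfig;

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);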

X.exception.handler

NOTE: before 4.0 these configurations carried a default. prefix (e.g. default.X.exception.handler); the prefix was dropped in 4.0

There are three exception handler configurations in Kafka Streams: deserialization.exception.handler, production.exception.handler and processing.exception.handler.

The deserialization handler defaults to “log and fail” behavior, which will log the exception message and fail the application. If you have situations where you can accept dropping records, you may configure this to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler. This allows Kafka Streams to make progress in the face of poison pill messages that can’t be read, but it may also mask real bugs and silently drop data that should not have been ignored.

The production exception handler defaults to “log and fail” behavior, which will log the exception and fail the Kafka Streams instance. We typically recommend keeping this default to avoid the possibility of missing outputs, but if you have a special use case (or want to configure the behavior of the handler) you can always create and specify your own handler class in the configuration.

The processing exception handler catches any other exception thrown while processing records. The default is also to log and fail.
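
As a sketch, switching the deserialization handler to log-and-continue (using the un-prefixed config name from 4.0) looks like this:

import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

// drop unreadable records instead of failing; only do this if data loss
// on poison pills is acceptable for your use case
props.put("deserialization.exception.handler", LogAndContinueExceptionHandler.class.getName());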

Correctness in Failure Scenarios

Kafka Streams relies on the Apache Kafka brokers for durability. The resiliency section of the public documentation describes the recommended configurations well, so this post will just reproduce that information directly for convenience:

Parameter Name                                        | Corresponding Client | Default Value                                   | Consider Setting To
acks                                                  | Producer             | acks=1                                          | acks=all
replication.factor (for broker version 2.3 or older)  | Streams              | -1 (the broker default, which may be 3 anyway)  | 3
min.insync.replicas                                   | Broker               | 1                                               | 2

Note that of these configurations, acks, replication.factor and min.insync.replicas are necessary for correctness. Setting any of those incorrectly will potentially risk data loss on a broker fail-over.
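
A minimal sketch of applying the recommended values from the application side:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
// applies only to the internal topics Kafka Streams creates; source and
// sink topics must be configured on the broker side
props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);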

Miscellaneous Correctness Configurations

We won’t go into details on all the less-important configurations related to correctness here, but for completeness here are some configurations that you may want to read about if you have more complex requirements:

  • default.key.serde / default.value.serde - the default serializers and deserializers to use when processing data
  • default.timestamp.extractor - extracts the timestamp from the record
  • upgrade.from - see (https://www.responsive.dev/blog/topology-upgrades-kafka-streams-101)
  • max.task.idle.ms - the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams

High Availability

Configuring Kafka Streams for high availability in the face of application or hardware failures requires some careful thought!

num.standby.replicas

The num.standby.replicas configuration ensures high availability for Kafka Streams applications, but is not necessary for correctness. With the default configuration, an application failure will require restoration from Kafka before processing, which may take a significant amount of time for large state stores. We wrote about this in detail in our blog post about why decoupling state from compute in Kafka Streams is critical.

Note that the Responsive SDK does not allow configuring standby replicas since new tasks that use remote storage are immediately caught up.
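
For example, a sketch of keeping one standby per stateful task:

// keep one warm copy of each stateful task's state on another instance
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);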

max.warmup.replicas and acceptable.recovery.lag

The max.warmup.replicas and acceptable.recovery.lag configurations make scaling out smoother. When a new node comes online, Kafka Streams will (by default) not assign it any active processing. Instead, based on these two configurations, it will assign the node a number of “warmup tasks” that temporarily function as standbys until their lag on the changelog topic is less than acceptable.recovery.lag.

We wrote about this configuration in a detailed post on rebalancing, as well as recommended configuration settings depending on your use case.

Note that the Responsive SDK does not use warmup replicas since new tasks that use remote storage are immediately caught up.
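
A sketch of tuning both (the values here are illustrative, not recommendations):

props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 4);           // default is 2
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 20_000L); // default is 10,000 records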

max.poll.interval.ms

This is the most important configuration for ensuring high availability and preventing unnecessary rebalances in Kafka Streams. The max.poll.interval.ms setting governs the maximum time Kafka Streams is allowed to take between calls to poll(). If a consumer does not call poll() within this interval, Kafka assumes it has stalled and removes it from the group, triggering a rebalance.

Note that Kafka Streams is intelligent about calling poll. If it notices that more than 1/2 of the max.poll.interval.ms has already elapsed, it will proactively stop processing records and attempt to poll again.

If you see frequent rebalances due to missed poll intervals, consider increasing max.poll.interval.ms from the default of 5 minutes, or dig into your application to understand why processing a batch of records is taking so long.
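
For example (the value is illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;

// 10 minutes, up from the 5 minute default; applies to the main consumer
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);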

session.timeout.ms and heartbeat.interval.ms

These two configurations determine how Kafka detects node failures. The session.timeout.ms configuration defines how long a Kafka broker will wait for heartbeats from a consumer before considering it dead and triggering a rebalance. Kafka Streams instances send these heartbeats every heartbeat.interval.ms. The defaults (45s and 3s, respectively) are good for most use cases, but you may want to tune session.timeout.ms if:

  1. (lower) You are more sensitive to failures and cannot tolerate 45s of downtime.
  2. (higher) You see frequent rebalances caused by heartbeat timeouts that are unexpected.
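
As a sketch for case (1), lowering both for faster failure detection (the values are illustrative; keep heartbeat.interval.ms at no more than one third of session.timeout.ms):

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 15_000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 3_000);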

state.dir

This configuration determines the directory on disk where Kafka Streams keeps its persisted state. Since Kafka Streams doesn’t know how you have configured your filesystem, it defaults to a folder under /tmp. For obvious reasons, this isn’t great for persisting durable data! If you have stateful applications, setting this to a durable directory is paramount. If you are running on Kubernetes, make sure to use a directory from an attached PVC (such as EBS for EKS on AWS).

Note that if using a shared disk across multiple instances (e.g. EBS) it is important that each instance has a unique state.dir so that they don’t clash!
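
A sketch of setting a durable, instance-unique state directory; the mount path and POD_NAME environment variable here are assumptions for illustration:

// POD_NAME is a hypothetical env var, e.g. injected via the Kubernetes
// downward API, used to keep each instance's state directory unique
String podName = System.getenv().getOrDefault("POD_NAME", "instance-0");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/data/kafka-streams/" + podName);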

Miscellaneous Resiliency Configurations

We won’t go into details on all the less-important configurations related to resiliency here, but for completeness here are some configurations that you may want to read about if you are facing frequent rebalances or other availability issues:

  • default.timestamp.extractor - allows you to specify something other than the timestamp metadata as the canonical timestamp for a record
  • transaction.timeout.ms (producer configuration, if using EOS) - this has a similar effect to session.timeout.ms, except it applies to the producer’s transaction commits

Performance

Later on in this 101 series we’ll have a more detailed post on performance, but this section will give a quick glance over the configurations that affect performance.

num.stream.threads

The number of instances times the number of stream threads determines how many tasks you can process in parallel. Note that the number of tasks is limited by the number of input partitions, so there is no benefit to configuring num(instances) x num.stream.threads higher than the number of input partitions times the number of subtopologies (see https://www.responsive.dev/blog/topology-upgrades-kafka-streams-101 for details on identifying subtopologies).

That being said, increasing parallelism is the easiest way to improve throughput in Kafka Streams. If you need more parallelism than the number of partitions allows, you may want to check out the Responsive SDK’s async processing feature, which can process a single task with an additional thread pool.
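
For example (the thread count is illustrative; match it to your instance’s cores and task count):

// with 4 threads, one instance can run up to 4 tasks in parallel
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);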

statestore.cache.max.bytes

This configuration replaces the old cache.max.bytes.buffering config and configures the maximum in-memory bytes to be used for record caches across all threads. This has performance considerations in two ways:

  1. A larger cache will make state store lookups more efficient, as more data will be in the cache
  2. It will buffer aggregation results in memory longer before emitting them. Aggregations do not emit an output record for every input record; instead, they emit either when the cache is full or when commit.interval.ms (see below) elapses.

The second point has two further implications. First, you may see the output of an aggregation less often, meaning the latency between an input arriving and an output appearing will increase. Second, because fewer events are emitted in total, there is less overall load on the downstream topology.

The general summary is that for higher throughput, increase the value of this configuration, but for lower latency consider decreasing the cache size.
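
A sketch (the 64 MB value is illustrative):

// 64 MB, shared across all stream threads on this instance
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 64 * 1024 * 1024L);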

commit.interval.ms (when using EOS)

If you’re using processing.guarantee = exactly_once_v2, downstream consumers will not be able to view the outputs of processing until a commit happens, which occurs every commit.interval.ms. Note that if you do enable EOS, the default commit.interval.ms is reduced from 30s to 100ms to mitigate this.

The downside of a more frequent commit interval is that the Streams application will spend more time in the “commit” phase of processing. Note that this is significant! For EOS applications, a low commit.interval.ms is the number one cause of throughput degradation; if you can afford higher end-to-end latencies, consider increasing your commit interval.
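
For example, a sketch of raising the EOS commit interval from the 100ms default (1s is illustrative; the right value depends on your latency budget):

// trade some end-to-end latency for throughput under EOS
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1_000);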

topology.optimization

This one is somewhat of a “no-brainer”! The default maintains backwards compatibility with older versions by setting the value to none (StreamsConfig.NO_OPTIMIZATION). If you are writing a new Kafka Streams application, there is no downside to setting this to all (StreamsConfig.OPTIMIZE). This enables various optimizations, like merging repartition topics or reusing the input topic as the changelog for KTables. See this excellent blog for more details on how the topology optimizations work.
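
A sketch; note that the same properties must also be passed to StreamsBuilder#build(Properties) for the optimizer to actually rewrite the topology:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

StreamsBuilder builder = new StreamsBuilder();
// ... define your topology on builder ...
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
// pass the props when building so the optimizations are applied
Topology topology = builder.build(props);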

Miscellaneous Performance Configurations

We won’t go into details on all the less-important configurations related to performance here, but for completeness here are some configurations that you may want to read about if you are not able to keep up with lag:

  • rack.aware.* are a set of configurations that enable rack aware assignment
  • linger.ms (for the producer configuration) allows the producer to group more data together to minimize network round trips

Furthermore, keep an eye out for our Kafka Streams 101 post on optimizing performance!

Code Sample Reference

There are two common ways to configure Kafka Streams applications. The first adds configurations directly in code and the other loads property files. Note that you can mix the two approaches, first loading an initial set of configurations from a file and then adding more to the instantiated Properties object.

Configuring in Code

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// applies only to the restore consumer
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
// ...
KafkaStreams streams = new KafkaStreams(topology, props);

Configuring With a Properties File

// /path/to/config.properties contents:
//
// application.id=my-application-id
// bootstrap.servers=localhost:9092
// # applies only to the restore consumer
// restore.consumer.max.poll.records=1000
// ...

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;

Properties props = new Properties();
try (InputStream inputStream = new FileInputStream("/path/to/config.properties")) {
  props.load(inputStream);
}
// ...
KafkaStreams streams = new KafkaStreams(topology, props);


Almog Gavra

Co-Founder
