You can fine-tune Kafka producers using configuration properties to optimize the streaming of data to consumers. Get the tuning right, and even a small adjustment to your producer configuration can make a significant improvement to the way your producers operate.
In this post we’ll discuss typical tuning considerations for Kafka producers.
Optimizing Kafka producers
Obviously, we want our producers to deliver data to Kafka topics as efficiently as possible. But what do we mean by this, and how do we quantify it? Do we base this on the number of messages sent over a set period of time? Or on how producers are set up to handle failure?
Before starting your adventure in optimization, think about your destination. What are the results your are hoping to achieve? Think long enough about this, and you might find competing requirements. For example, by maximizing throughput you might also increase latency.
Be prepared to make adjustments to your adjustments.
How is your producer performing?
It’s only when you have been monitoring the performance of your producers for some time that you can gauge how best to tune their performance.
To begin with, you might start with a basic producer configuration in development as a benchmark. When you start to analyze producer metrics to see how the producers actually perform in typical production scenarios, you can make incremental changes and make comparisons until you hit the sweet spot.
If you want to read more about performance metrics for monitoring Kafka producers, see Kafka’s Producer Sender Metrics.
When you start investigating how you to tune the performance of your producers, look at how your producers perform on average.
For example, broker restarts will have an outsize impact on very high (99%) percentile latencies. So you might concentrate on tuning your producer to achieve a latency target within a narrower bound under more typical conditions.
Basic producer configuration
Before looking at the properties to use for fine-tuning your producer, let’s assume we have a basic configuration.
Something like this.
bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer client.id=my-client compression.type=gzip
This configuration specifies the bootstrap address for connection to the Kafka cluster, and the serializers that transform the key and value of a message from from a String to its corresponding raw byte data representation.
Optionally, it’s good practice to add a unique client ID, which is used to identify the source of requests in logs and metrics.
Compression is useful for improving throughput and reducing the load on storage, but might not be suitable for low latency applications where the cost of compression or decompression could be prohibitive. More on compression later.
In our basic configuration, acknowledgments only confirm that messages have reached the broker, and there is no guarantee on message ordering.
We can change that…
Let’s look at how you can add or change producer configuration properties for fine-tuning.
Here are the properties we’ll consider:
- min.insync.replicas (topics)
We won’t discuss the properties in isolation here, as we’ll be concentrating on how to use them to achieve a certain result.
We’ll look at how you can use a combination of these properties to regulate:
- Data durability to minimize data loss when passing messages
- Ordered delivery of messages
- Reliability guarantees on message transactions involving batches of related messages
- Throughput measured in the number of messages processed over a specific period
- Latency measured in the time it takes for messages reach the broker
It’s quite likely you’ll want to balance throughput and latency targets whilst also minimizing data loss and guaranteeing ordering.
If you want to read more about what each property does, see Kafka’s Producer configs.
Remember, the producer configuration properties you can use will also be driven by the requirements of your application. Avoid any change that breaks a property or guarantee provided by your application.
It you want to reduce the likelihood that messages are lost, use message delivery acknowledgments.
acks=all in your producer configuration to force a partition leader to replicate messages to a certain number of followers before acknowledging that the message request was successfully received.
# ... acks=all # ...
acks=all producer configuration in conjunction with the
min.insync.replicas property for topics. You set the
min.insync.replicas property in the
min.insync.replicas configuration sets the numbers of brokers that need to have logged a message before an acknowledgment is sent to the producer.
apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 1 replicas: 3 config: min.insync.replicas: 2 #...
Using a topic replication factor of 3, and 2 in-sync replicas on other brokers, the producer can continue unaffected if a single broker is unavailable and at least one other broker is in-sync.
If a second broker becomes unavailable, using
acks=all the producer won’t receive acknowledgments and won’t be able to produce more messages.
Because of the additional checks,
acks=all increases the latency between the producer sending a message and receiving acknowledgment. So you will have to consider the trade-off when investigating whether this is the right approach for you.
You have two approaches to guaranteeing the order of message delivery from producers. And they depend, to a large degree, on whether or not you are using
acks=all for data durability.
If you are using
acks=all, you can (and should) enable idempotence for the producer to ensure that messages are delivered only once.
Using idempotence, IDs and sequence numbers are assigned to messages, so that the order is preserved even after a failed delivery.
As idempotence preserves the message ordering, you can speed the process along by increasing the number of in-flight requests you allow at one time using the
Here we see an example configuration showing idempotence enabled, and used with
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647
retries property sets the number of retries when resending a failed message request. While that number might look impressive, what we’re saying is, in effect, “retry forever”.
If you are using
delivery.timeout.msin your producer configuration, producer requests will fail before the number of retries has been used if the timeout expires before a successful acknowledgment. The
delivery.timeout.mssets a limit on the time to wait for an acknowledgment of the success or failure to deliver a message. You can choose to leave
retriesunset and use
delivery.timeout.msto perform a similar function instead.
There is a performance cost to introducing additional checks to the order of delivery.
If you prefer not to use
acks=all and idempotency,
another option is to set the number of in-flight requests to 1 (the default is 5) to preserve ordering.
# ... enable.idempotence=false max.in.flight.requests.per.connection=1 retries=2147483647
In this way you avoid a situation where Message-A fails only to succeed after Message-B was already written to the broker.
Idempotence on its own is useful for exactly once writes to a single partition.
But how do we guarantee the reliability of message delivery for exactly once writes for a set of messages across multiple partitions? We use idempotence again, but combine it with a unique transactional ID defined for the producer. Transactions guarantee that messages using the same transactional ID are produced once, and either all are successfully written to the respective logs or none of them are.
You specify a unique transactional ID in the producer configuration,
and also set the maximum allowed time for transactions in milliseconds before a timeout error is returned. The default is
900000 or 15 minutes.
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=_UNIQUE-ID_ transaction.timeout.ms=900000 # ...
The transactional ID is registered with the Kafka cluster on the first operation, as well as a producer
epoch checkpoint number, which is used to identify the active producer instance.
Why might we want to identify the active producer? Say an application determines that a producer has failed and creates a new producer instance to restart a transaction. If both producers are now sending messages, duplicate records are being created and we have lost our exactly once integrity.
By specifying a transaction ID, if a new producer instance starts, older instances of the producer are identified by their older
epoch number and fenced-off by Kafka so that their messages are not included. This maintains the integrity of the message passing by ensuring that there is only ever one valid producer with the transactional ID. Each
transactional.id should be used for a unique set of topic partitions.
You can map topic partition names to transactional IDs, or compute the transactional ID from the topic partition names using a function that avoids collisions.
Optimizing throughput and latency
Depending on your objective, Kafka offers a number of configuration parameters and techniques for tuning producer performance for throughput and latency.
Usually, the requirement of a system is to satisfy a particular throughput target for a proportion of messages within a given latency. For example, targeting 50,000 messages per second with 95% of messages being acknowledged within 2 seconds.
Batching and buffering messages
Message batching delays sending messages so that more messages destined for the same broker are batched into a single request. Without message batching, messages are sent without delay.
You can use two properties to set batch thresholds:
batch.sizespecifies a maximum batch size in bytes (default 16384)
linger.msspecifies a maximum duration to fill the batch in milliseconds (default 0 or no delay)
Messages are delayed until either of these thresholds are reached.
For example, if we use
linger.ms to add a a 500ms delay, all the messages accumulated in that time are sent in a single request.
linger.ms=500 batch.size=16384 buffer.memory=33554432
If a maximum
batch.size is also used, a request is sent when messages are accumulated up the maximum batch size, or messages have been queued for longer than
linger.ms — whichever comes sooner.
buffer.memory to configure a buffer memory size that must be at least as big as the batch size, and also capable of accommodating buffering, compression and in-flight requests.
Size is important. If the batch threshold is too big for the frequency of the messages produced, you’re adding unnecessary delay to the messages waiting in the send buffer. You’re also allocating more buffer memory than you need. If the batch threshold is too small, larger messages can be delayed.
What you gain in higher throughput you concede with the buffering that adds higher latency to the message delivery. It’s a compromise, so you will need to consider how to strike the right balance.
send() blocking and latency
Batching and buffering also mitigates the impact of
send()blocking on latency.
When your application calls
KafkaProducer.send(), the messages produced are:
- Processed by any interceptors
- Assigned to a partition
- Added to a batch of messages in a per-partition queue
At which point the
send()method returns. So the time
send()is blocked is determined by:
- The time spent in the interceptors, serializers and partitioner
- The compression algorithm used
- The time spent waiting for a buffer to use for compression
Batches will remain in the queue until one of the following occurs:
- The batch is full (according to
- The delay introduced by
- The sender is about to send message batches for other partitions to the same broker, and it is possible to add this batch too
- The producer is being flushed or closed
Compressing message batches
Compressing data batches using the
compression.type property improves throughput and reduces the load on storage, but might not be suitable for low-latency applications where the cost of compression or decompression is prohibitive.
Message compression adds latency in the producer (CPU time spent compressing the messages), but makes requests (and potentially disk writes) smaller, which can increase throughput.
compression.type property to specify a valid compression codec.
You can choose
zstd, each of which have varying compression speeds.
If you think compression is worthwhile, the best type of compression to use will depend on the messages being sent.
Compression is handled on the thread calling
KafkaProducer.send(), so if the latency of this method matters for your application you can add more more threads.
Pipelining might sound like it has something to do with surfing the famous Pipeline reef in Hawaii, but it’s actually the sending of more requests from producers before the response to a previous request has been received.
For pipelining we use our old friend the
You might recall its contribution to ordered delivery earlier in this post.
Moving message requests along more frequently will obviously improve throughput. But there is a point at which you might see less beneficial effects, such as less efficient batching.
Adjusting message delivery waiting time
Improve throughput of your message requests by adjusting the maximum time to wait before a message is delivered and completes a send request.
delivery.timeout.ms property to specify the maximum time in milliseconds to wait for a complete send request.
You can set the value to
MAX_LONG to delegate to Kafka an indefinite number of retries.
You can also direct messages to a specified partition by writing a custom partitioner to replace Kafka’s default, and specify the class name using the
partitioner.class property. A custom partitioner allows you to choose how you map messages to partitions, based on the data in the message.
# ... delivery.timeout.ms=120000 partitioner.class=my-custom-partitioner # ...
Adapt to survive
Fine-tuning your producers helps alleviate performance issues. But don’t be tempted to make a few adjustments and think your work is done.
You should consider fine-tuning as part of a continual optimization process. Monitor Kafka regularly. Look for changing trends in usage and investigate how fine-tuning can help your Kafka deployment adapt. But, as you’ll know, this is only one half of the story.
Next time we’ll look at how you can optimize your consumers.
Be sure to check back for the next installment.