All tests were executed against Kafka 4.3.0 using Dimster.
In the last post we used simulated consumer processing time to reveal how important it is to set an appropriate value for max.poll.records to ensure the consumer parallelism that we expect. With a uniform distribution of messages over partitions, the rule of thumb was a value somewhat lower than:
group.share.partition.max.record.locks / number of consumers per partition
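As a quick sanity check, the rule of thumb can be written as a few lines of Python. This is only a sketch of the arithmetic: the function name is made up for illustration, and the inputs (2000 record locks per partition, 300 consumers over 6 partitions) are the figures used in this series.

```python
def max_poll_records_ceiling(max_record_locks: int, consumers_per_partition: int) -> int:
    """Upper bound on max.poll.records according to the rule of thumb."""
    if consumers_per_partition <= 0:
        raise ValueError("consumers_per_partition must be positive")
    return max_record_locks // consumers_per_partition

# 2000 record locks per partition, 300 consumers spread over 6 partitions
ceiling = max_poll_records_ceiling(2000, 300 // 6)
print(ceiling)  # 40
```

The value of 30 used in the last post sits "somewhat lower" than this ceiling of 40.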
But there’s more to parallel consumption than max.poll.records. The size of producer batches also plays a role when using the default share.acquire.mode (batch_optimized).
Some background on share consumer assignment and share.acquire.mode
Share group members are assigned to partitions like consumer group members are, except that share group assignment allows multiple consumers to be assigned to the same partition.
Share group assignments
If the number of share consumers is less than the partition count, then each consumer will be assigned multiple partitions. If the consumer count matches or exceeds the partition count, then each consumer will be assigned one partition.
Fig 1. Share consumer assignments. Left: consumer count < partition count. Right: consumer count > partition count.
When a consumer is assigned only one partition, it will always be fetching from one broker. If a consumer is assigned multiple partitions, it may fetch from multiple brokers concurrently.
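The two cases can be illustrated with a toy round-robin model. This is not Kafka's share group assignor, only a sketch that reproduces the behaviour described above; the `assign` function and the consumer names are hypothetical.

```python
from collections import defaultdict

def assign(consumers: list[str], partitions: list[int]) -> dict[str, list[int]]:
    """Toy share-group assignment: spread the larger set over the smaller one."""
    assignment = defaultdict(list)
    if len(consumers) < len(partitions):
        # Fewer consumers than partitions: each consumer gets several partitions.
        for i, partition in enumerate(partitions):
            assignment[consumers[i % len(consumers)]].append(partition)
    else:
        # At least as many consumers as partitions: one partition each,
        # with partitions shared between consumers.
        for i, consumer in enumerate(consumers):
            assignment[consumer].append(partitions[i % len(partitions)])
    return dict(assignment)

print(assign(["c1", "c2"], [0, 1, 2, 3]))
# {'c1': [0, 2], 'c2': [1, 3]}
print(assign(["c1", "c2", "c3", "c4", "c5"], [0, 1]))
# {'c1': [0], 'c2': [1], 'c3': [0], 'c4': [1], 'c5': [0]}
```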
share.acquire.mode
There are two values for share.acquire.mode:
- batch_optimized (default)
- record_limit
The Javadoc says the following:
The application chooses between the two modes using the consumer share.acquire.mode configuration property.
If the application sets the property to batch_optimized or does not set it at all, the share consumer fetches records based on batch boundaries which may mean that the number of records returned may exceed the max.poll.records configuration property. The share consumer may also prefetch records and buffer them temporarily awaiting the application's next call to poll(Duration).
If the application sets the property to record_limit, the share consumer fetches no more than max.poll.records records at a time and does not prefetch. This is slower but gives the application tighter control on how many records are fetched and when the acquisition locks begin.
So why two modes?
It comes down to efficiency (batch_optimized) and consumer control (record_limit).
First of all, the sentence “the share consumer fetches records based on batch boundaries” is correct but a little misleading. No matter what mode is used, whole batches are returned to the consumer over the wire. In other words, the data sent over the network is always based on batch boundaries, as the record batch is the unit of data delivery.
What that sentence refers to is what records are acquired by the consumer and returned to the application:
- With batch_optimized, the config max.poll.records is a soft cap. The consumer acquires any batches (in their entirety) that are covered by the offset range determined by max.poll.records. These acquired batches are returned to the consumer, and the consumer returns the records of those batches to the calling application (that invoked consumer.poll(Duration)).
- With record_limit, the config max.poll.records is a strict cap. The consumer only acquires the records that are covered by the offset range determined by max.poll.records (though fewer if fewer records are available). However, the unit of data delivery is the record batch, so the consumer receives whole batches but only returns a specific offset range to the calling application.
For example, in the figure below we have three consumers sending fetch requests with max.poll.records=1 and share.acquire.mode=batch_optimized.
Fig 2. Three consumers fetching with batch_optimized
Despite asking for only one record, each consumer acquires and receives records along batch boundaries. The result of consumer.poll(Duration) for c1 is three records, not one.
If we rerun this scenario with record_limit:
c1 acquires record 0
c2 acquires record 1
c3 acquires record 2
However, the batch is the unit of data delivery, so batch 1 is sent in its entirety to each consumer (the consumer internals only return the acquired records of the batch to the application).
Fig 3. Three consumers fetching with record_limit
This is obviously less efficient… We just sent the same batch three times! Nonetheless, record_limit exists because sometimes that inefficiency over the wire is countered by other concerns (one of which is covered in this post).
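The two scenarios can be replayed with a small simulation. This is a simplified model of the acquisition rules described above, not the broker's actual logic. The `fetch` function is hypothetical, and the layout of three batches with three records each is an assumption (the post only tells us that the first batch holds records 0 to 2).

```python
def fetch(batches, next_offset, max_poll_records, mode):
    """Return (acquired offsets, delivered batches, next free offset)."""
    log_end = batches[-1][1]
    if next_offset > log_end:
        return [], [], next_offset
    wanted_last = min(next_offset + max_poll_records - 1, log_end)
    # Whole batches are the unit of delivery in both modes.
    delivered = [b for b in batches if b[1] >= next_offset and b[0] <= wanted_last]
    if mode == "batch_optimized":
        # Soft cap: acquire every touched batch in its entirety.
        last = delivered[-1][1]
    elif mode == "record_limit":
        # Strict cap: acquire only the requested offset range.
        last = wanted_last
    else:
        raise ValueError(f"unknown mode: {mode}")
    return list(range(next_offset, last + 1)), delivered, last + 1

# Assumed layout: three batches as (first_offset, last_offset) pairs.
batches = [(0, 2), (3, 5), (6, 8)]
for mode in ("batch_optimized", "record_limit"):
    next_offset = 0
    for consumer in ("c1", "c2", "c3"):
        acquired, delivered, next_offset = fetch(batches, next_offset, 1, mode)
        print(mode, consumer, "acquired", acquired, "delivered", delivered)
```

With batch_optimized each consumer acquires a different whole batch. With record_limit each consumer acquires a single record, yet the batch (0, 2) is delivered to all three.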
Another efficiency gain that batch_optimized has is that because each batch is only sent to one consumer, Kafka only needs to do share group housekeeping of the batch as a whole, not each record individually. This reduces CPU and makes metadata more compact. If we get mixed acknowledgments of the batch records (2 success, 1 reject) only then does the record tracking explode the metadata to be per-record.
With record_limit, the housekeeping always tracks state per record, which is more expensive.
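To make the difference in housekeeping concrete, here is a toy model of batch-level versus record-level tracking. It is not Kafka's broker code. The `TrackedBatch` class, its fields, and the "MIXED" marker are all hypothetical and exist only to show why mixed acknowledgments cost more to track.

```python
from dataclasses import dataclass, field

@dataclass
class TrackedBatch:
    first_offset: int
    last_offset: int
    state: str = "ACQUIRED"                         # one state for the whole batch
    per_record: dict = field(default_factory=dict)  # filled only when states diverge

    def acknowledge(self, acks: dict) -> None:
        """acks maps every offset in the batch to 'ACCEPT' or 'REJECT'."""
        outcomes = set(acks.values())
        if len(outcomes) == 1:
            self.state = outcomes.pop()   # still a single batch-level entry
        else:
            self.state = "MIXED"
            self.per_record = dict(acks)  # tracking explodes to one entry per record

    def tracked_entries(self) -> int:
        return len(self.per_record) or 1

uniform = TrackedBatch(0, 2)
uniform.acknowledge({0: "ACCEPT", 1: "ACCEPT", 2: "ACCEPT"})
print(uniform.tracked_entries())  # 1

mixed = TrackedBatch(0, 2)
mixed.acknowledge({0: "ACCEPT", 1: "ACCEPT", 2: "REJECT"})
print(mixed.tracked_entries())  # 3
```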
The final difference between the modes is that in batch_optimized mode, a consumer can send concurrent fetches to all the brokers of its assigned partitions. This further increases the number of records that a consumer might receive as max.poll.records is a soft cap per broker. With record_limit, the consumer sends one fetch at a time, round-robin between the brokers of its partitions. This difference only manifests when the consumer count is less than the partition count. We’ll cover this aspect more in the next post.
Implications for parallel processing
The main implications are that:
- With batch_optimized, the effective consumer parallelism can be impacted by the average number of records per record batch.
- With record_limit, the network throughput will increase in most scenarios as offset ranges are unlikely to align with batch boundaries. If max.poll.records is larger than the average number of records per batch, then each batch may only be delivered twice. The network throughput can increase a lot if max.poll.records is much smaller than the average number of records per record batch.

Don’t worry if that isn’t clear yet. We’ll gather some empirical results next, which should make it clearer.
Let’s test this out with Dimster’s interactive mode, using the same workload as the last post.
Test setup: 300 consumers and 5 ms processing time + larger record batches
In the last post, we calculated that the maximum theoretical consumption rate for 300 consumers with a processing time of 5 ms per message would be 60,000 msg/s. By setting max.poll.records to 30 we reached 55,000 msg/s and then finally reached 60,000 with low end-to-end latency by adding an additional 12 consumers (2 per partition).
So we use the following workload file (no dimensional stuff in this one as we’re going to use live-interaction):
In this test we’re going to make the record batches bigger and see what happens to the consumption rate.
First we start Dimster and ensure it’s handling the 60k msg/s.
./dimster run localBeefy -w run/workloads/run-test/run-sharegroup-proc-time-5ms.yaml
Once it has started and settled in, we see it’s coping well.
If I look at the metrics, the current record batch size is around 5KB with 10 records per batch. The average fetch size is 7KB with 14 records. This means some consumers get 1 record batch per fetch and some get 2 record batches per fetch.
Let’s increase the batch size. To do this we’ll drop to 1 producer and set linger.ms to 10 so that batches fill up to the default batch.size of 16KB.
./dimster run localBeefy --modify --set producersPerTopic=1 kafkaConfig.producerConfig.linger.ms=10
We see that the batch size has risen to the default of 16KB, or 32 records per batch. The consumers should now, on average, receive 32 records per fetch (2 above the max.poll.records).
Fig 4. The record batches sent by the producers increase from 5.5 KB to 16 KB
The coordinator output shows that the consumers are still coping, as expected. With 500-byte records, the number of records returned per fetch will be 32, which is close enough to the max.poll.records of 30 to not impact consumption.
Now let’s double batch.size to 32768. From a separate terminal window to the coordinator output, we’ll run the following:
./dimster run localBeefy --modify --set kafkaConfig.producerConfig.batch.size=32768
We see the batch size increase again in the dashboard.
Fig 5. The record batches sent by the producers increase from 5.5 KB to 16 KB to 32 KB
The coordinator output shows that the consumers are no longer keeping up! Only managing 37K msg/s with a fast growing backlog.
How to fix this?
The problem is that each partition has an inflight budget of 2000 records and each record batch contains 64 records. That allows up to 31 effective consumers per partition (2000 / 64, rounded down), leaving 21 of the 52 consumers on each partition starved at any point in time. This explains the 37K msg/s:
Effective consumers per partition x per consumer rate x number of partitions = consumption rate
--> 31 x 200 x 6 = 37200
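The same arithmetic as a small function, which makes it easy to try other batch sizes. This is a rough estimate only: it assumes every busy consumer holds exactly one whole batch of locks at a time and ignores prefetching and fetch latency. The function and parameter names are hypothetical.

```python
def consumption_rate(record_locks, records_per_batch, consumers_per_partition,
                     partitions, processing_ms):
    """Estimate msg/s when each busy consumer holds one whole batch at a time."""
    per_consumer_rate = 1000 / processing_ms              # msg/s for one consumer
    budget_consumers = record_locks // records_per_batch  # consumers the budget can feed
    effective = min(consumers_per_partition, budget_consumers)
    return effective * per_consumer_rate * partitions

# 312 consumers over 6 partitions = 52 per partition, 5 ms per message
print(consumption_rate(2000, 64, 52, 6, 5))  # 37200.0
print(consumption_rate(2000, 32, 52, 6, 5))  # 62400.0
```

With 32-record batches the budget can feed 62 consumers per partition, so all 52 stay busy and the estimated capacity is above the 60,000 msg/s produce rate.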
We can fix this problem in three ways:
- Set batch.size=16384 in the producer.
- Increase group.share.partition.max.record.locks to create a larger inflight budget.
- Use share.acquire.mode=record_limit.
We already know the default 16KB batch size is ok. Let’s first increase the inflight budget.
Fix 1 - Increasing group.share.partition.max.record.locks
We’ll double the budget and see what happens. First we’ll stop the producers and remove the processing time on the consumers to drain the backlog.
./dimster run localBeefy --modify --set producerRate=0 consumerProcMsMin=0 consumerProcMsMax=0
Next we need to update the broker config and restart the brokers. In run/broker-configs/default.yaml we add:
group.share.partition.max.record.locks: 4000
Then we’ll redeploy Kafka (again from a separate terminal window).
Now we’ll start the producers again and apply the 5 ms processing time to the consumers.
./dimster run localBeefy --modify --set producerRate=60000 consumerProcMsMin=5 consumerProcMsMax=5
We’re in business! The consumers are now coping with the larger batch sizes with this increased inflight budget.
Fix 2 - share.acquire.mode=record_limit
This time we’ll try record_limit.
First let’s walk back that inflight budget change by 1) stopping the producers, 2) commenting out the added line to our broker config, 3) redeploying Kafka.
While the producers are still stopped, I’ll change the consumers to use record_limit:
./dimster run localBeefy --modify --set kafkaConfig.shareConsumerConfig.share.acquire.mode=record_limit
Then start the producers again:
./dimster run localBeefy --modify --set producerRate=60000
In the coordinator, we see that the consumers are now coping with the 60K msg/s.
The reason that record_limit allows the consumers to keep up, despite the larger record batches, is that each consumer is only allocated a max of 30 records per fetch, even though each batch contains 64 records. However, each batch is now being delivered three times as 30 doesn’t align well with 64. We can see this in the Kafka client metrics.
Fig 6. On the left, with the larger inflight budget and batch_optimized. The middle was when we stopped the producers to restart Kafka with the original inflight budget. The right is with record_limit and each batch being sent three times.
We could make this more efficient if we increase max.poll.records to 32 to align with the 64 record batches. If I simply change the max.poll.records to 32, we don’t see much of an improvement as most offset ranges of 32 records will touch two batches. But if we stop the producers, ensure there is no backlog at all, and then set max.poll.records, the fetches will be perfectly aligned.
Fig 7. Left: unaligned fetches with max.poll.records=32 (each batch delivered 3 times). Right: aligned fetches with max.poll.records=32 (each batch delivered 2 times).
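The effect of alignment can be reproduced with a short simulation. It is a sketch under simplifying assumptions: every batch holds exactly the same number of records, and every fetch acquires exactly max.poll.records contiguous offsets. The function name and the `first_fetch_offset` parameter (the offset where the first fetch range happens to start) are hypothetical.

```python
def deliveries_per_batch(records_per_batch, max_poll_records,
                         first_fetch_offset=0, batches=1000):
    """Average number of record_limit fetch ranges that touch each batch."""
    total_records = records_per_batch * batches
    touches = [0] * batches
    start = first_fetch_offset
    while start < total_records:
        end = min(start + max_poll_records, total_records)  # exclusive
        first_batch = start // records_per_batch
        last_batch = (end - 1) // records_per_batch
        # Every batch overlapped by the range is sent over the wire in full.
        for b in range(first_batch, last_batch + 1):
            touches[b] += 1
        start = end
    return sum(touches) / batches

print(deliveries_per_batch(64, 32))                         # 2.0
print(deliveries_per_batch(64, 32, first_fetch_offset=10))  # unaligned
print(deliveries_per_batch(64, 30))
```

Aligned ranges of 32 deliver each 64-record batch exactly twice, while the unaligned run and the max.poll.records=30 run both come out at about three deliveries per batch.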
The lessons of this post
Let’s not over-index on this one case. The purpose of this post was to explain the underlying mechanics and back that up with some empirical benchmarks, sticking with the same workload example as the last post.
What we’ve learned:
- Consumer parallelism is impacted by more than just consumer count and max.poll.records. It is also impacted by:
  - Record batch sizes (determined by the producers)
  - The inflight budget (group.share.partition.max.record.locks)
  - The share consumer config share.acquire.mode
- Record acquisition is along batch boundaries with batch_optimized, and record ranges with record_limit.
- Record batches are the unit of delivery, so record_limit can cause consumer network bandwidth to increase because fetches likely will not align on batch boundaries, causing batches to be delivered at least twice (more if max.poll.records is much smaller than the average number of records per batch).
In the next post we’re going to look a bit closer at record_limit.
ps: you can run this whole scenario with two terminal windows:
Window 1 - kick off the benchmark (using the workload yaml described in the post)
./dimster run localBeefy -w run/workloads/run-test/run-sharegroup-proc-time-5ms.yaml
Window 2 - wait a few minutes then run the following bash script:
Happy testing!