All tests were executed against Kafka 4.2.0 using Dimster (and also validated against 4.3.0).
In the last post we measured the overhead that the mechanics of share groups add, and saw that it is pretty small. We also saw that raw throughput was comparable to consumer groups, and even exceeded consumer group throughput on one test.
In this post we’re going to simulate processing time in the consumers to make these benchmarks more realistic and show the utility of share groups (namely the ability to parallelize processing beyond the partition count).
We’ll see how the following two configurations play an important role in parallelizing consumption with share groups:
max.poll.records (consumer config)
group.share.partition.max.record.locks (broker-side config)
Theoretical max throughput
If we know the average processing time and the number of consumers, we can calculate the theoretical max throughput of a topic:
Consume throughput = per-consumer-rate * number of consumers
where:
Per-consumer-rate = 1000 / processing-time-ms
For example:
If we have 100 consumers, with an average processing time of 5 ms, then the maximum throughput will be 20,000 messages/s.
If we have a topic which peaks at 60K messages/s and our average processing time is 5 ms, we’ll need 300 consumers to handle that. If we use consumer groups we’ll also need 300 partitions.
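As a quick sanity check, the two formulas above can be written as a few lines of Python. This is a minimal sketch: the function names are mine, and it assumes every consumer processes one record at a time with a fixed processing time.

```python
import math


def max_throughput(consumers: int, processing_time_ms: float) -> float:
    """Theoretical max consume throughput in msg/s."""
    per_consumer_rate = 1000 / processing_time_ms  # msg/s for one consumer
    return per_consumer_rate * consumers


def consumers_needed(target_rate: float, processing_time_ms: float) -> int:
    """Smallest consumer count whose theoretical max covers target_rate."""
    return math.ceil(target_rate * processing_time_ms / 1000)


print(max_throughput(100, 5))       # 20000.0
print(consumers_needed(60_000, 5))  # 300
```

Real consumers also pay for fetches and acknowledgements, so treat these numbers as an upper bound rather than a target.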
> Of course we could do some fancy concurrent work inside the consumer to parallelize processing, but that comes with some downsides. Principally, consumer groups track a single position in the log per partition, which doesn’t map well to processing multiple positions concurrently should the consumer encounter problems, abruptly terminate or get reassigned partitions. The ParallelConsumer does some clever tricks to handle this.
With a share group we don’t need 300 partitions. We could have a handful of partitions with a group of 300 consumers and it should handle the load.
Let’s test it out with share group config defaults.
Test setup: 300 consumers and 5 ms processing time
Like last time, I’m going to ensure that load is even across the partitions so that load skew doesn’t pollute the results (I’ll be looking at load skew in a future post). See the last post for how I did that.
We’ll use Dimster’s live interaction feature to model the workload on the fly, to see the impact of changing configurations and consumer counts.
Fig 1. Dimster supports mutating the running workload.
The naïve run: 60K in, 4.8K out
The max theoretical throughput of 300 consumers with 5 ms processing time is 60K msg/s, so we’ll start with producerRate: 60000 and see what happens. Remember, Dimster uses named environments where commands take the format: dimster <command> <env>. My environment is called localBeefy (detailed in the last post).
./dimster run localBeefy -w run/workloads/run-test/ignore-run-sharegroup-proc-time-5m.yaml
Note: I prepend workload files with ignore so they don’t appear as untracked files in Git.
The coordinator output shows:
Straight away we see that consumption is really low at 4800 msg/s, nowhere near 60K msg/s.
Let’s stop the producers and set processing time to 0 ms to allow the consumers to catch up so we can try again. From a separate terminal window:
./dimster run localBeefy --modify --set producerRate=0 consumerProcMsMin=0 consumerProcMsMax=0
==> Sending modify to coordinator pod: benchmark-coordinator-76b87b5887-cf9m8
{"status":"accepted","message":"2 actions applied"}
Viewing the coordinator output in the other terminal window, we wait until the backlog is drained, then slowly ramp up the producer rate to 50K msg/s.
Then from the live interaction terminal:
The coordinator output shows the consumers are now managing 50K msg/s. Much better than 4800 msg/s:
However, it soon degrades, dropping to 20K msg/s and eventually back down to 4800 msg/s.
Fig 2. The ramp up to 50k msg/s, followed by a reduction back down to 4800 msg/s
What on earth is going on? The metrics tell a story.
The max poll size (the number of records the consumer is returned after calling poll) is 500. Most calls to poll return few records, with p50 at 8. But the max tells us some return 500.
Fig 3. The number of records returned by consumer.poll()
The Kafka client metrics show that the average records per fetch response batch climbs toward 450.
Fig 4. The records per fetch start low and grow larger and larger
What we’re seeing is that most consumers aren’t getting very many records, but a tiny number are getting a lot. When the throughput was low and growing, the records per fetch were low (up to 10). But then the average records per batch started creeping up (while the consumption kept dropping) until the average fetch size was around 450 records. The average is high despite only a few fetches being full because most fetch requests sit idle until they can be serviced (by default up to 500 ms).
Fig 5. The average fetch latency creeps up and almost reaches the default fetch.max.wait.ms of 500.
It’s clear that the max.poll.records default of 500 is at play here.
Two regimes: Accidental fair-sharing vs greedy-capture
There is an interesting phenomenon here: at low producer rates, the broker does not have enough available records to fill each consumer’s max.poll.records, so each poll tends to return a small batch. Since many consumers are polling, processing, and acknowledging at roughly the same cadence, the available records get spread across the group. The result is an accidental fair-sharing regime: lots of consumers are active, each processing small batches, and aggregate throughput can approach the theoretical maximum.
But this regime is fragile. It is not guaranteed by the broker-side allocation policy. Once enough records are available to fill large polls, the greedy allocation behavior takes over. A small number of consumers can acquire large batches, occupying the partition’s inflight record budget while the rest of the consumers sit idle. We’ll call this the greedy-capture regime, as a few consumers greedily capture the inflight window.
This regime works as follows. The broker config group.share.partition.max.record.locks determines how many records can be locked/inflight per partition, and defaults to 2000. With the default max.poll.records of 500, a single consumer can acquire 25% of that budget. At 5 ms per record, that one batch takes 2.5 seconds to process. While those records are locked, other fetches may sit idle even as new records arrive.
This creates a feedback loop: larger batches consume more of the inflight budget, queued fetches wait longer, lag builds, and future fetches are more likely to be filled with large batches. Eventually the group collapses into the greedy-capture regime.
We see this in the behavior above. With an inflight budget of 2000 messages and large fetches of 500 records, we only have 4 effective consumers per partition at a time. Across 6 partitions we only have 24 effective consumers each able to process 200 messages a second, resulting in an aggregate 4800 msg/s (exactly the number we’ve seen).
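The arithmetic of the greedy-capture regime can be sketched as follows. This is a simplified model, not the broker’s actual allocation algorithm: it assumes every acquired batch is a full max.poll.records and that each consumer processes records one at a time. The function and parameter names are mine.

```python
def greedy_capture_throughput(
    partitions: int,
    consumers: int,
    max_record_locks: int,
    max_poll_records: int,
    processing_time_ms: float,
) -> float:
    """Aggregate msg/s when every batch fills max_poll_records (simplified model)."""
    # How many full batches fit in one partition's inflight budget.
    batches_per_partition = max_record_locks // max_poll_records
    # There cannot be more busy consumers than consumers in the group.
    effective_consumers = min(consumers, partitions * batches_per_partition)
    return effective_consumers * (1000 / processing_time_ms)


# Defaults used in this post: 6 partitions, 300 consumers, 5 ms per record.
print(greedy_capture_throughput(6, 300, 2000, 500, 5))  # 4800.0
```

In this model the consumer count barely matters once batches are full: the ceiling is set by the inflight budget divided by the batch size.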
Testing the stability of accidental fair-sharing
To test whether the fair-sharing state was actually stable, I tried ramping only to 30K msg/s and holding it there. I left it for ten minutes and it remained stable. Then I restarted the consumers.
./dimster run localBeefy --modify --restart-consumers 300
Sure enough, throughput dropped back down to 4800 msg/s again.
Fig 6. Fair-sharing regime collapses after a consumer restart
Why go on about this accidental fair-sharing?
Because a system with a badly chosen max.poll.records can appear healthy under slowly changing throughput and moderate load, simply because it has entered accidental fair-sharing. I imagine this could trip some people up. Consumption may look fine for a long time, then suddenly degrade, causing some head scratching and stress!
Finding a better max.poll.records
The solution here is simple: reduce max.poll.records.
In theory we should carve up the inflight window between all consumers. So let’s take the configured group.share.partition.max.record.locks and divide it by the number of consumers per partition. In our case, we are using the default of 2000. With 50 consumers per partition, we should set max.poll.records to 2000/50 = 40.
First, let’s drain the backlog.
./dimster run localBeefy --modify --set producerRate=0 consumerProcMsMin=0 consumerProcMsMax=0
Once the backlog is drained, let’s set max.poll.records=40.
./dimster run localBeefy --modify --set kafkaConfig.shareConsumerConfig.max.poll.records=40
This causes all 300 consumers to restart with the new config.
Now we’ll attempt 60K msg/s with 5 ms processing time abruptly, no ramp up.
./dimster run localBeefy --modify --set producerRate=60000 consumerProcMsMin=5 consumerProcMsMax=5
The coordinator output shows:
We’re close at about 55K msg/s consumption, but this soon drops to 45K and remains stable there. It seems that 40 was still too high, as it did not account for the overhead of fetch/response time, the timing of commits, etc.
After dropping max.poll.records to 30, we finally hit 60K msg/s consumption! But the coordinator output shows that end-to-end latency is growing little by little, so it still isn’t quite keeping up.
Let’s add 2 more consumers per partition (300 -> 312 consumers).
./dimster run localBeefy --modify --set consumersPerGroup=312
The coordinator output now shows that end-to-end latency has dropped and stabilized.
At this point, the benchmark has a few minutes left. We can discard all the cumulative latency histograms so we record the last minutes with this stable configuration.
./dimster run localBeefy --modify --discard-stats
The final e2e latency distribution for 10 or so minutes with 312 consumers and max.poll.records=30 is:
Fig 7. End-to-end latency distribution of 60K msg/s, 5 ms processing time and 312 consumers
The rule of thumb for max.poll.records
Rules of thumb:
Set max.poll.records by taking group.share.partition.max.record.locks and dividing it by the number of consumers per partition. Then set it somewhat lower to leave room for timing variance, uneven fetch timing, partition skew, and transient backlog.
If you have very long processing time (over 1 second) you can even drop max.poll.records to 1, as the cost of a fetch is dwarfed by the processing time.
You can also try increasing the group.share.partition.max.record.locks (max of 10000) which will allow for a larger inflight budget and be more forgiving of a suboptimal max.poll.records.
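The first rule can be turned into a small helper for picking a starting value. This is a sketch under stated assumptions: the function name is mine, and the headroom factor of 0.75 is my own choice, picked only because it reproduces the 40 -> 30 adjustment that worked in this test. It is not a Kafka recommendation, so measure and adjust for your own workload.

```python
import math


def suggest_max_poll_records(
    max_record_locks: int,
    consumers: int,
    partitions: int,
    headroom: float = 0.75,  # assumed safety factor, not a Kafka setting
) -> int:
    """Starting point for max.poll.records in a share group."""
    consumers_per_partition = consumers / partitions
    # Fair share of one partition's inflight budget per consumer.
    fair_share = max_record_locks / consumers_per_partition
    # Shave some off for timing variance and never go below 1.
    return max(1, math.floor(fair_share * headroom))


print(suggest_max_poll_records(2000, 300, 6, headroom=1.0))  # 40
print(suggest_max_poll_records(2000, 300, 6))                # 30
```

Raising group.share.partition.max.record.locks increases the fair share proportionally, which is why a larger inflight budget is more forgiving.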
Validation: Benchmarking with tuned max.poll.records
Now armed with a good rule-of-thumb, we’ll run two scenarios with Dimster’s explore limits mode, a benchmark mode for finding the highest sustainable throughput (see the last post for how it works):
Fig 8. All test points achieved 57,000 msg/s while staying under p75, 100 ms end-to-end latency.
All 5 workloads achieved 57K msg/s, just short of the max theoretical throughput (likely due to the latency constraint of explore mode). Adding some more consumers would be enough to reach 60K msg/s.
Next, with 1 ms processing time.
Fig 9. Share groups with 12+ partitions reached 95% of the theoretical max consumption throughput.
Share groups with 12, 30 and 60 partitions did best, reaching 95% of the max theoretical throughput. The reason 6 partitions fared a little worse is likely due to contention over the inflight window (6 * 2000 records). The higher partition tests had a larger window across the same number of consumers.
I expect the consumer groups could have gotten higher throughput, just not within the latency target of the test (100 ms, p75, based on the worst partition).
Final thoughts
First of all, I hope you see how useful live interaction using Dimster is! You can mutate a live workload to explore the impact of changing client configurations, producer rate, the number of producers and consumers, all on the fly. Once you have a topology you want to record stats for, clear the stats, set a new running time and get all the usual Dimster results.
You can download the results from this blog post:
Regarding Kafka, the important lesson is that share groups change the parallelism bottleneck. With consumer groups, it’s the partition count. With share groups, it’s easy to think it simply comes down to the number of consumers, but it’s a little more complicated than that. Parallelism is determined by the inflight budget and the size of fetch requests.
Setting max.poll.records carefully might seem obvious, but I think it could trip people up for a few reasons:
Defaults can come into play easily, especially for people without a lot of Kafka experience.
The greedy behavior is not necessarily obvious (especially in terms of message queues in general).
Synthetic benchmarks with 0 processing time will miss this (4 consumers per partition can handle whatever you throw at the partition). Only once you add processing time to a benchmark does the relationship between the inflight budget and fetch size become apparent.
This greedy algorithm makes max.poll.records a very important configuration for share groups and the default of 500 is arguably the wrong value for share groups. It would be nice for a future version of Kafka to offer an alternative which enforces fair-sharing broker-side. I’ve posted this sentiment to the dev mailing list.
Next: max.poll.records isn’t the only config that determines the size of consumer fetches! In the next post we’ll look at the role of the following (in terms of how they can affect consumer fetch sizes and therefore the parallelism of share groups):
Producer batch sizes.
Share group consumer config
share.acquire.mode: batch_optimized (default, used in this post) or record_limit