Kora - Serverless Kafka - ASDS Chapter 2

The Architecture of Serverless Data Systems introduction chapter.

This is the second instalment of the Architecture of Serverless Data Systems series. In the first chapter, I covered DynamoDB, and I will refer back to it where comparison and contrast are interesting.

Kora is the multi-tenant serverless Kafka engine inside Confluent Cloud. It was designed to offer virtual Kafka clusters on top of shared physical clusters, based on a heavily modified version of Apache Kafka. Today, as little as 20% of the code is shared with the open-source version as the demands of large-scale multi-tenant systems diverge from the needs of single-tenant clusters.

The goal of Kora was to avoid stamping out cookie-cutter single-tenant Kafka clusters, which would miss out on the economic and reliability benefits of large-scale multi-tenancy. This architecture is evolving fast, and a year or two from now, this description will likely be stale as we continue to disaggregate the architecture from the original Kafka monolith.

Architecture overview

The Kafka client protocol (which I will refer to as the Kafka API) is basically a storage API of a sequential nature. This aspect of the workload is beneficial in many ways for building out a multi-tenant system with good tenant isolation. As I described in the DynamoDB deep-dive, low variability per-request workloads can be easier to build around, and the sequential pattern of access provides additional benefits. However, the API also presents some challenges, such as being a session-based protocol with persistent TCP connections to brokers and the fact that partitions are exposed to clients. These aspects of the API add challenges to building a multi-tenant system around it.

The architecture of Kora is one of a light proxy layer over a fast fault-tolerant cache that offloads data asynchronously to cloud object storage as the primary data store. As I described in the series introduction, cloud object storage presents a large opportunity for reducing storage costs but comes with the significant downsides of high request latency and an economic model that punishes small requests (making it even harder to build systems with even moderate latency requirements).

Fig 1. Choices for integrating (or not) cheap, durable cloud object storage.

Kora has been able to integrate object storage without paying the latency cost because of the sequential nature of its workload. DynamoDB chose not to use S3 as a primary data store because it combines a random access workload with predictable low-latency performance as a core tenet of the product. Any table row can be read at any time and even sophisticated caching would not prevent some requests from getting blocked behind a much slower synchronous S3 request.

Kora does not have this problem for a number of reasons:

  • Produce latency is not impacted by object storage because a synchronous write only hits the fast fault-tolerant cache layer.

  • Low end-to-end latency is achieved because recent data is always stored in the fast fault-tolerant cache layer and can be served either from memory or block storage (SSD).

  • Historical data (written more than roughly 15-30 minutes ago) is likely to be found only in object storage. Historical reads are still sequential in nature, and therefore, prefetching is an extremely effective technique to avoid fetch requests being forced to wait for higher latency object storage requests.

  • The fast fault-tolerant cache allows for the asynchronous offloading of data in large data batches, which is in line with the economic model of cloud object storage APIs (fewer larger requests over more numerous small requests).
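The read path described in the points above can be sketched as a toy model. This is a minimal sketch under stated assumptions, not Kora's implementation: the `TieredLog` class, the segment ids, and the `prefetch_depth` parameter are all hypothetical, object storage is simulated with a dictionary, and prefetching is done inline where a real system would do it asynchronously.

```python
from dataclasses import dataclass, field


@dataclass
class TieredLog:
    """Toy model of one partition: recent segments local, older ones remote."""
    local: dict[int, bytes] = field(default_factory=dict)       # segment id -> data on the broker
    remote: dict[int, bytes] = field(default_factory=dict)      # segment id -> data in object storage
    prefetched: dict[int, bytes] = field(default_factory=dict)  # read-ahead buffer
    prefetch_depth: int = 2          # hypothetical: segments to read ahead
    remote_reads: int = 0            # simulated object storage GETs
    blocking_remote_reads: int = 0   # GETs that a fetch request had to wait for

    def _get_remote(self, seg: int) -> bytes:
        self.remote_reads += 1
        return self.remote[seg]

    def fetch(self, seg: int) -> bytes:
        if seg in self.local:
            return self.local[seg]               # recent data: served by the broker
        if seg in self.prefetched:
            data = self.prefetched.pop(seg)      # historical data, already read ahead
        else:
            self.blocking_remote_reads += 1      # cold start of a historical read
            data = self._get_remote(seg)
        # Reads are sequential, so warm the next few segments.
        for nxt in range(seg + 1, seg + 1 + self.prefetch_depth):
            if nxt in self.remote and nxt not in self.local and nxt not in self.prefetched:
                self.prefetched[nxt] = self._get_remote(nxt)
        return data
```

In this model, a consumer that reads an old partition from the beginning waits on object storage only for the first segment. Every later historical segment is already in the read-ahead buffer by the time it is requested.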

This fault-tolerant cache layer is composed of a large number of Kora brokers (based originally on Apache Kafka brokers) which replicate writes across multiple brokers (and availability zones) before acknowledging back to the client.

Fig 2. Kora brokers act as a fault-tolerant cache in front of cloud object storage as the primary data store for streams.

Just like with DynamoDB, the unit of distribution is the replicated partition, and each partition has a leader replica and multiple follower replicas. The Kafka replication protocol, which Kora also implements, is a leader-follower protocol where all writes must go through the leader but reads can be serviced by any replica if the client has configured that (known as follower fetching).

Logical clusters over physical clusters

Kora offers logical clusters (known as LKCs), which are serviced by larger physical clusters (PKCs). I will use the terms tenant and LKC interchangeably.

Fig 3. The separation of components into disaggregated proxy, compute and storage layers.

Tenant isolation is logical, with various strategies employed to avoid tenant interference. Kora ensures security isolation of LKCs by integrating a namespacing mechanism with authentication via API keys. Every cluster resource, including topics, consumer groups, and ACLs, is annotated with a unique logical cluster-ID, serving as a distinct identifier for each LKC. To seamlessly incorporate this namespacing for clients accessing a logical cluster, an interceptor within the broker dynamically annotates requests with the logical cluster-ID, linking it to a client connection during the authentication process.

Fig 4. Request interception to annotate resources with LKC ids.
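The namespacing idea can be illustrated with a small sketch. It assumes a hypothetical `Connection` object that learns its logical cluster-ID at authentication time and a hypothetical `<lkc-id>_<topic>` naming format. The real annotation format and interceptor API are not described here, so treat every name below as illustrative.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Request:
    api: str
    topic: str


class Connection:
    """Hypothetical broker-side connection state."""

    def __init__(self, api_keys: dict[str, str]):
        self._api_keys = api_keys   # API key -> logical cluster-ID
        self.lkc_id = None

    def authenticate(self, api_key: str) -> None:
        # The logical cluster-ID is bound to the connection once, at authentication.
        self.lkc_id = self._api_keys[api_key]

    def intercept(self, request: Request) -> Request:
        if self.lkc_id is None:
            raise PermissionError("connection is not authenticated")
        # Assumed format: prefix the tenant-visible name with the cluster-ID.
        return replace(request, topic=f"{self.lkc_id}_{request.topic}")
```

Two tenants that both create a topic called `orders` end up with two distinct internal resources, and neither client needs to know the prefix exists.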

Additionally, data is not co-located in shared files on disk or in cloud object storage, which prevents data corruption from causing reads to cross tenant boundaries.

Performance isolation is covered in more detail in the multi-tenancy section.

The Kafka workload - topics and partitions

Topics are distributed, partitioned, and replicated streams of records. As with all data systems that scale beyond a single server, sharding is used to split one large thing into many smaller things that can be distributed over a group of servers.

One area where the Kafka API diverges from DynamoDB (beyond streaming vs a KV API) is that this partitioning is exposed to the clients. This is also a feature of other open-source and proprietary event streaming systems (such as Apache Pulsar and Amazon Kinesis).

Fig 5. The Kafka API includes partitions (which can be abstracted by client libraries).

Producers must discover the set of partitions of the topic and which servers host those partitions. The producer uses a partitioning strategy to distribute writes across those servers, with strategies such as hash-based partitioning, round-robin, and sticky partitions. Consumers must register themselves as consumers of a given topic and go through a coordination process where each gets assigned a disjoint subset of the partitions to fetch from. This means that producers and consumers open persistent connections to the servers that host the partitions they wish to write/read from.
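The three partitioning strategies mentioned above can be sketched as follows. This is an illustration rather than any client library's code: CRC32 stands in for whatever hash function a real client uses, and the `batch_size` trigger for the sticky strategy is a simplifying assumption.

```python
import itertools
import random
import zlib


def hash_partition(key: bytes, num_partitions: int) -> int:
    """Same key always maps to the same partition (CRC32 is a stand-in hash)."""
    return zlib.crc32(key) % num_partitions


class RoundRobinPartitioner:
    """Spread records evenly by cycling through the partitions."""

    def __init__(self, num_partitions: int):
        self._cycle = itertools.cycle(range(num_partitions))

    def partition(self) -> int:
        return next(self._cycle)


class StickyPartitioner:
    """Stay on one partition until a batch is full, then pick another at random."""

    def __init__(self, num_partitions: int, batch_size: int, seed: int = 0):
        self._n = num_partitions
        self._batch_size = batch_size
        self._rng = random.Random(seed)
        self._current = self._rng.randrange(num_partitions)
        self._sent = 0

    def partition(self) -> int:
        if self._sent == self._batch_size:
            self._current = self._rng.randrange(self._n)
            self._sent = 0
        self._sent += 1
        return self._current
```

Hash-based partitioning preserves per-key ordering, at the cost of skew when the key distribution is uneven. Round-robin and sticky partitioning spread load evenly, and the sticky variant produces larger batches per partition.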

This immediately presents a problem, as distributing the load of a topic’s partitions across a large number of brokers can result in large numbers of persistent connections. These high connection counts are inefficient for both the clients and the brokers. This problem is resolved in Kora as brokers are deployed in a cellular architecture where each LKC is deployed to a single cell which results in a reduced number of brokers per LKC. The partitions of any given topic are therefore spread across a smaller number of brokers (those of a single cell), resulting in lower connection counts.
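A rough back-of-the-envelope model shows why limiting an LKC to one cell helps. The function below gives an upper bound under a simplifying assumption: every client talks to every partition leader of the topic, which is closer to producer behavior than to consumers in a group. The client, partition, and broker counts are hypothetical.

```python
def max_connections(clients: int, partitions: int, brokers: int) -> int:
    """Upper bound on connections: each client connects to every broker that
    hosts at least one partition leader, and leaders can occupy at most
    min(partitions, brokers) distinct brokers."""
    return clients * min(partitions, brokers)


# Hypothetical workload: 500 producers writing to a 60-partition topic.
whole_cluster = max_connections(clients=500, partitions=60, brokers=120)
single_cell = max_connections(clients=500, partitions=60, brokers=12)
```

With these made-up numbers the bound drops from 30,000 connections to 6,000 when the same partitions are confined to a 12-broker cell.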

A single PKC can scale linearly with the number of cells as inter-broker replication traffic is limited to brokers within the same cell. The metadata component lives outside of these cells and can be scaled separately from the brokers.

Fig 6. Cells are independent of each other.

Metadata management

PKC (and LKC) metadata is durably stored in a fault-tolerant Raft-based metadata component called KRaft which Kora inherits from Apache Kafka. The “controller” is the single-threaded state machine that sits on top of the replicated Raft log. In Apache Kafka, the controller is mostly busy with coordinating the creation/deletion of topics, partition leader elections, and balancing partition leadership. Kora extends the controller to include additional roles, such as coordinating distributed rate limiting based on load distribution across the cluster.

KRaft is deployed as a separate cluster from that of the brokers. The brokers themselves act as Raft learners, fetching committed records of the metadata Raft log and actuating the commands locally (such as creating a topic partition or a replica assuming leadership).

Fig 7. KRaft controllers and Kora brokers.

Each Kora broker stores a sequentially consistent copy of the metadata locally. This reduces the burden on the KRaft controllers as broker restarts do not require the broker to synchronize all cluster resource metadata again as it did with ZooKeeper.
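The learner behavior can be sketched as replaying a committed log from a locally remembered offset. This is a simplified model with hypothetical record types (`create_topic`, `delete_topic`) and a plain list standing in for the replicated metadata log. It only illustrates why a restarted broker does not need to reload everything.

```python
class BrokerMetadataCache:
    """Hypothetical local copy of cluster metadata kept by one broker."""

    def __init__(self):
        self.topics: dict[str, int] = {}   # topic -> partition count
        self.next_offset = 0               # persisted locally in a real broker
        self.records_applied = 0

    def catch_up(self, committed_log: list[tuple[str, str, int]]) -> None:
        # Apply only the committed records this broker has not seen yet, in log order.
        for offset in range(self.next_offset, len(committed_log)):
            op, topic, partitions = committed_log[offset]
            if op == "create_topic":
                self.topics[topic] = partitions
            elif op == "delete_topic":
                self.topics.pop(topic, None)
            self.next_offset = offset + 1
            self.records_applied += 1
```

Because records are applied in log order, every broker passes through the same sequence of metadata states. A broker that comes back after a restart resumes from `next_offset` instead of fetching the full metadata set again.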

Multi-tenancy

Over-subscription of tenants is key to achieving high resource utilization in multi-tenant systems. This presents the challenge that any given broker could get overloaded if multiple tenants all increased their load at the same time. Both load that is focused on a small number of brokers (hotspots) and load peaks that concentrate load over a short time period present challenges.

Kora employs data placement and data movement to minimize hotspots in localized areas of a PKC that could result in throttling. Finally, some throttling may be required to prevent one tenant from using too many resources of a given broker. We’ll look at these strategies now.

Distributed Rate-limiting

Ideally, load across partitions would be even, partitions would be perfectly distributed across brokers, and there would be no load spikes. But none of these are true; load can skew across partitions, and tenants can experience transient load spikes on top of more gradual load patterns such as diurnal or seasonal changes in load. While Kora uses data rebalancing to avoid hotspots (which we’ll cover further down), data movement alone isn’t enough. Tenants can try to use more than their allotted quotas, load spikes may transiently exceed quotas, and multiple tenants on the same broker may experience load spikes at the same time exceeding the resources of the underlying broker.

Such a system needs an effective throttling mechanism that:

  • Allows for transient load spikes without throttling but prevents aggregate tenant-level load from exceeding its tenant-level quota.

  • Prevents performance degradation caused by broker overload by enforcing broker-level quotas. High utilization can be ok, but when the load causes saturation, where various buffers in the software and OS get choked, throughput can drop significantly and latencies soar. Broker-level throttling prevents this ungraceful degradation.

Throttling can occur whenever a tenant exceeds its tenant-level quota or an underlying broker exceeds one of its own broker-level quotas.

Unnecessary throttling is throttling that occurs when the throughput of a given LKC is within the provisioned bounds, but a hotspot has caused localized throttling on one or two brokers. Moving partitions away from the hot brokers would relieve the hotspot. However, given that the Kafka protocol is based on persistent connections to the brokers that host the partition leaders, moving partitions between brokers is disruptive to clients. This creates a tension between even data distribution and client stability, so a balance between the two competing concerns is needed. To work around this issue (and that of a constantly moving load distribution), the distributed rate-limiting mechanism is designed to accommodate a certain amount of unequal tenant load distribution. We’ll look at this mechanism now.

Dynamic tenant-level quotas

Throttling occurs in the Kora brokers based on dynamic quotas assigned by a centralized Shared Quota service. This service is responsible for calculating and disseminating broker-tenant bandwidth quota pairs, which change according to load distribution and broker-level load. Brokers periodically share per-tenant and per-broker bandwidth consumption and throttling information with the shared quota service. This service determines how much bandwidth to allocate to each broker-tenant pair based on its global view of consumption and broker load. Periodically, the service disseminates the latest broker-tenant bandwidth quotas to the brokers, which then enforce those quotas locally. As long as the aggregate load across tenants is relatively balanced, individual tenant skew can be accommodated without unnecessary throttling.

Fig 8. Dynamic quota system for distributed rate limiting.
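The report-and-recalculate cycle can be sketched as below. The allocation formula is an assumption made for illustration: each tenant's aggregate quota is split across brokers in proportion to the bandwidth the tenant recently consumed on each one. The actual formula used by the shared quota service is not given here, and this sketch ignores broker-level load entirely. The class and method names are hypothetical.

```python
from collections import defaultdict


class SharedQuotaService:
    """Toy central service that turns usage reports into broker-tenant quotas."""

    def __init__(self, tenant_quotas: dict[str, float]):
        self.tenant_quotas = tenant_quotas   # tenant -> aggregate bandwidth quota
        self.usage = defaultdict(dict)       # tenant -> {broker: reported bandwidth}

    def report(self, broker: str, tenant: str, bandwidth: float) -> None:
        """Called periodically by each broker with its per-tenant consumption."""
        self.usage[tenant][broker] = bandwidth

    def recalculate(self) -> dict[tuple[str, str], float]:
        """Return (broker, tenant) -> quota; assumed proportional-to-usage split."""
        quotas = {}
        for tenant, by_broker in self.usage.items():
            total = sum(by_broker.values())
            for broker, used in by_broker.items():
                share = used / total if total > 0 else 1 / len(by_broker)
                quotas[(broker, tenant)] = self.tenant_quotas[tenant] * share
        return quotas
```

Because the split follows where the tenant's load actually lands, a tenant with skewed partitions gets a larger quota on its busy broker, while the sum of its per-broker quotas still equals its aggregate quota.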

Handling bursts

Transient bursts would get throttled if the tenant-broker quotas were hard limits. The dynamic quota design is reactive to changes in load distribution and, therefore, may only act after throttling has already taken place. This is not ideal for customers as they would experience regular unnecessary throttling. To avoid this, the rate-limiting algorithm must allow for bursts and sudden load distribution changes that do not violate the aggregate tenant bandwidth limits or broker limits.

DynamoDB uses the Token Bucket algorithm to permit bursts as long as long-term throughput remains within the provisioned amount. Kora uses a different algorithm known as lazy throttling.

The shared quota service has a global view of a tenant’s bandwidth consumption. When that consumption is below a configured proportion of its quota (such as using less than 80% of its maximum aggregate bandwidth), the service places no per-broker bandwidth quotas on that tenant. That tenant can spike throughput in the short term (though is still subject to broker-level throttling) until the shared quota service recalculates the quotas and decides that the tenant needs explicit per-broker quotas as it is nearing or has exceeded its aggregate quota. 
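The difference between hard per-broker quotas and the lazy approach can be shown with a small comparison. The function and its parameters are hypothetical, and the 0.8 threshold simply reuses the 80% example from the text.

```python
from typing import Optional


def throttled_brokers(
    usage: dict[str, float],             # broker -> tenant bandwidth on that broker
    per_broker_quota: dict[str, float],  # broker -> tenant quota on that broker
    tenant_quota: float,                 # tenant's aggregate quota
    lazy_threshold: Optional[float] = None,
) -> set[str]:
    """Return the brokers on which this tenant would be throttled."""
    total = sum(usage.values())
    if lazy_threshold is not None and total < lazy_threshold * tenant_quota:
        return set()   # lazy: no per-broker quotas are in force for this tenant
    return {b for b, used in usage.items() if used > per_broker_quota[b]}


quota = {"b1": 50.0, "b2": 50.0}
skewed = {"b1": 60.0, "b2": 10.0}        # 70 of 100 in aggregate, but skewed

hard = throttled_brokers(skewed, quota, tenant_quota=100.0)
lazy = throttled_brokers(skewed, quota, tenant_quota=100.0, lazy_threshold=0.8)
```

With hard limits the tenant is throttled on `b1` even though it uses only 70% of its aggregate quota. With the lazy rule it is left alone until its aggregate consumption approaches the quota. Broker-level protection is a separate mechanism and is not modeled here.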

The SLO on per-tenant throttling is no more than 5 minutes of throttling per week (0.05%). With this lazy dynamic quota approach, 99.9% of tenants fall within this SLO.
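As a quick check of the percentage quoted above, the throttling budget can be computed directly:

```python
MINUTES_PER_WEEK = 7 * 24 * 60           # 10,080 minutes
throttle_budget_minutes = 5

fraction = throttle_budget_minutes / MINUTES_PER_WEEK
print(f"{fraction:.4%}")                 # prints 0.0496%, i.e. roughly 0.05%
```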

Protecting brokers from overload

Transient load spikes and skewed load can cause a broker to receive more requests than it can handle while still providing low-latency operations. Broker load is not just a function of bandwidth but also of the number of connections, the number of partitions, the connection rate, and so on. While any given streaming workload has relatively low variability in cost across its requests, there can be larger differences across workloads. Two workloads on opposite ends of the cost spectrum are “few clients, few partitions, large record batches” and “many clients, many partitions, tiny record batches”.

Each broker monitors its load and throttles all tenants proportionally to their quotas before it reaches a stressful level of load. Additionally, brokers share this throttling information with the shared quota service so it can take into account per-broker load when it recalculates broker-tenant quota pairs.
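Throttling tenants in proportion to their quotas can be sketched as a single scaling step. This is an illustrative model that treats broker capacity as one bandwidth number, which is a simplification given that broker load also depends on connections, partitions, and so on. The function name and the figures are hypothetical.

```python
def broker_limits(capacity: float, tenant_quotas: dict[str, float]) -> dict[str, float]:
    """Scale every tenant's limit by the same factor so the total fits the broker."""
    total = sum(tenant_quotas.values())
    if total == 0:
        return dict(tenant_quotas)
    scale = min(1.0, capacity / total)   # never raise a limit above the tenant's quota
    return {tenant: quota * scale for tenant, quota in tenant_quotas.items()}


# Over-subscribed broker: quotas add up to 150 but only 75 is safely available.
limits = broker_limits(capacity=75.0, tenant_quotas={"lkc-a": 100.0, "lkc-b": 50.0})
```

Here both tenants are cut to half of their quota (`lkc-a` to 50, `lkc-b` to 25), so the tenant with the larger quota keeps the larger share and the broker stays below its limit.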

Broker-level throttling should be temporary as this should either trigger a data rebalancing or even a cluster expansion of the PKC if the aggregate load requires it.

Data placement

If multiple partitions tend to experience similar load changes at the same time, we can say that their workloads are correlated. Ideally, we wouldn’t place those correlated partitions on the same brokers.

Correlated workloads across partitions tend to come in the order of topic, tenant, and region:

  • When the aggregate load of a topic increases, the load across each individual partition increases.

  • The workload across topics of the same tenant tends to be more correlated than the workload of other tenants.

  • Finally, region-based correlations exist, such as time of day and seasonal events. 

Ideally, Kora would place tenants, their topics, and partitions to maximize the number of uncorrelated workloads per broker, thus ensuring that the load peaks across tenants cancel each other out and the peak-to-average difference is low. However, there are also competing concerns, such as minimizing connection counts per broker which increase CPU load on the brokers.
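The benefit of co-locating workloads whose peaks do not coincide can be shown with a toy calculation. The sinusoidal "diurnal" load shape and its numbers are made up for illustration, and the second pairing uses two tenants exactly twelve hours out of phase, which is the most favorable case rather than a typical one.

```python
import math


def peak_to_average(load: list[float]) -> float:
    return max(load) / (sum(load) / len(load))


def diurnal(phase_hours: int) -> list[float]:
    """Hypothetical hourly load over one day, peaking once per 24 hours."""
    return [10 + 8 * math.sin(2 * math.pi * (h + phase_hours) / 24) for h in range(24)]


# Two tenants on one broker whose loads peak at the same hour...
in_phase = [a + b for a, b in zip(diurnal(0), diurnal(0))]
# ...versus two tenants whose peaks are twelve hours apart.
out_of_phase = [a + b for a, b in zip(diurnal(0), diurnal(12))]

ratio_in_phase = peak_to_average(in_phase)
ratio_out_of_phase = peak_to_average(out_of_phase)
```

The in-phase pair has a peak-to-average ratio of about 1.8, so the broker must be sized for almost twice the average load. The out-of-phase pair has a ratio of about 1.0 because the peaks cancel out.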

Kora balances the need to spread out a tenant across many brokers with the need to reduce connection counts. Currently, Confluent Cloud supports logical clusters up to a throughput of 250 MB/s ingress and 750 MB/s egress, which covers the vast majority of workloads. These LKCs are placed in a single cell, which is large enough such that the cell can typically handle the aggregate load of its LKCs without resorting to too frequent throttling or data movement.

Due to this one-cell-per-tenant limitation of the current LKC placement logic, larger workloads must be provisioned with dedicated hardware. A dedicated LKC is a logical cluster scheduled on a PKC that hosts no other LKCs. This dedicated PKC can scale the number of compute instances (and brokers) to the needs of the LKC (reaching many GB/s). These larger workloads are fewer in number and PKC elastic scaling mitigates the need for overprovisioning for peak loads. However, ideally, even the very large workloads would run in the multi-tenant infrastructure to benefit from the same cost savings and elasticity as the small to medium-sized workloads.

The solution to adding large workloads to the multi-tenant infrastructure is a more sophisticated LKC placement algorithm. In the future, LKCs will be able to expand across cells, maintaining a balance between connection counts, workload correlation, and tenant density per broker. Cells will continue to be self-contained, with no replication occurring across cells, meaning that each partition must only be placed in a single cell. However, LKCs with a large number of topics will distribute those topics over multiple cells and very high throughput topics with potentially hundreds of partitions could also span multiple cells.

Fig 9. LKCs spread over multiple cells such that any given partition remains in a single cell.

As aggregate load across the cluster increases, new cells can be added to the PKC, which results in linear increases in capacity and performance.

Heat management (load distribution)

Load distribution in distributed data systems is harder than for stateless systems, where a load balancer can route requests to nodes without limitations, as any node can process any request. Kora and DynamoDB share the trait that any given write can only be serviced by a single storage node out of the entire fleet. Partitions are spread over three Kora brokers and only one broker hosts the leader replica at a time. Reads also typically only go to the leader replica, though follower fetching can be enabled to allow consumers to read from follower replicas in their availability zone, spreading out load and reducing cross-AZ charges.

Hotspots can develop where a single broker can experience elevated traffic, causing throttling and higher latencies. Managing this heat proactively across a PKC, before throttling is required, is key to delivering predictable performance. 

Hotspots can develop for a number of reasons:

  • Correlated load changes.

  • Degraded brokers can result in partition leadership centralizing on a subset of brokers.

  • Unlucky combination of transient load spikes.

Kora includes a component called Self-Balancing-Clusters (SBC) which is based on Cruise Control (a widely used Kafka cluster management tool). SBC operates based on a prioritized list of goals with objectives like balancing disk usage, network throughput, and distribution of partitions across brokers. Ultimately, SBC generates a set of partition replica movements which the KRaft controller then puts into action by leveraging the Kafka replication protocol’s partition reassignment algorithm. This algorithm reconfigures a partition’s replicas while maintaining availability and durability.

Fig 10. Online reconfiguration of replicas, such as for replacing a degraded or dead broker; or for moving replicas across brokers.
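To give a feel for what "generating a set of partition movements" means, here is a deliberately simple greedy planner with a single goal: even out one load metric across brokers. It is not the SBC or Cruise Control algorithm, which work from a prioritized list of goals. The function name, inputs, and the `max_moves` cap are all assumptions.

```python
def plan_moves(
    placement: dict[str, str],   # partition -> broker currently hosting it
    load: dict[str, float],      # partition -> load (e.g. MB/s)
    brokers: list[str],
    max_moves: int = 10,
) -> list[tuple[str, str, str]]:
    """Return (partition, source, destination) moves that reduce load imbalance."""
    placement = dict(placement)  # work on a copy
    moves = []
    for _ in range(max_moves):
        totals = {b: 0.0 for b in brokers}
        for partition, broker in placement.items():
            totals[broker] += load[partition]
        hot = max(totals, key=totals.get)
        cold = min(totals, key=totals.get)
        gap = totals[hot] - totals[cold]
        # Only moves with 0 < load < gap shrink the hot/cold difference.
        candidates = [p for p, b in placement.items() if b == hot and 0 < load[p] < gap]
        if not candidates:
            break
        partition = max(candidates, key=load.get)
        placement[partition] = cold
        moves.append((partition, hot, cold))
    return moves
```

Capping the number of moves reflects the trade-off discussed earlier: every movement is disruptive to connected clients, so a planner should stop once further moves bring little benefit.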

Another component of Kora detects degraded disks, which unfortunately, are relatively common in cloud environments. Degraded disks not only cause performance issues on a specific broker but can cause partition leadership to concentrate on a few brokers over time. Actively detecting degraded disks and replacing those brokers is an important part of balancing load effectively.

The key to fast and efficient data movement is keeping as little state locally as possible. As I described at the top, Kora brokers form a high-performance fault-tolerant cache that sits in front of high latency, high durability, and low-cost cloud object storage. Because the Kora brokers are simply a durable caching and serving layer, they keep as small an amount of data locally as possible.

Kora brokers offload data to cloud object storage aggressively, balancing multiple needs:

  1. Brokers need to keep recent data in this cache for consumers (to ensure low end-to-end latency).

  2. The platform needs to reduce storage costs by only keeping a small amount of data on the relatively expensive storage drives.

  3. Per-partition data stored only in this caching layer must be as small as possible to make data movement fast and efficient.

Kora usually moves partitions from brokers with high load to brokers with low load within the same cell. However, as aggregate load across a cell grows, an entire LKC may need to be moved from one cell to another. The Kafka replication protocol supports arbitrary member changes where the original brokers and destination brokers are completely disjoint, similar to Raft’s joint consensus reconfiguration algorithm. Cross-cell movements are typically more disruptive, so there is a balance between tenant density per cell and latency impacts on cross-cell movement.
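A reassignment to a disjoint set of brokers can be pictured as "expand, catch up, then shrink". The sketch below only models the sequence of replica sets. It leaves out leader election, in-sync replica tracking, and failure handling, and the `catch_up` callback is a hypothetical stand-in for copying the partition's local data to a new replica.

```python
from typing import Callable


def reassign(
    current: list[str],
    target: list[str],
    catch_up: Callable[[str], None],
) -> list[list[str]]:
    """Return the replica sets a partition passes through during reassignment."""
    steps = [list(current)]
    adding = [b for b in target if b not in current]
    steps.append(list(current) + adding)   # old and new replicas coexist
    for broker in adding:
        catch_up(broker)                   # new replica copies the log until in sync
    steps.append(list(target))             # old replicas are dropped last
    return steps


caught_up: list[str] = []
steps = reassign(["b1", "b2", "b3"], ["b4", "b5", "b6"], caught_up.append)
```

At no point does the partition have fewer replicas than before the move started, which is how availability and durability are preserved. The less data each replica holds locally, the shorter the catch-up phase.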

The plans to allow tenants to span multiple cells will reduce this tension as individual topics will be moved from one cell to another rather than the entire LKC.

Similarities and differences to DynamoDB

Kora and DynamoDB share some common traits with regard to multi-tenancy:

  1. They use the same resource-sharing model across all components: shared processes with logical isolation.

  2. High resource utilization is achieved by over-subscribing tenants, given that the aggregate load of all tenants falls far below the aggregate throughput quotas assigned to all tenants. 

  3. Any given write can only go to one node out of the entire fleet (as both use leader-based replication). 

  4. The unit of data distribution is the partition.

  5. Partitions can be migrated across storage nodes (called brokers in the case of Kora) to minimize hotspots.

However, there are also a few differences:

  1. DynamoDB avoids unnecessary throttling due to load skew by enforcing aggregate table throughput in the proxy layer (with its Global Admission Control) and moving data to avoid hotspots. Kora enforces aggregate throughput at the broker level but uses a lazy distributed rate-limiting design which allows for skewed load distribution without throttling. Kora also uses data movement to avoid hotspots which could cause throttling.

  2. DynamoDB, with its request-oriented protocol, can move data transparently, whereas the Kafka API, with its persistent connections and exposed partitions, makes data movement more disruptive. Thus there is a balance between data balancing and client stability that must be taken into account.

Conclusions

Kora and Apache Kafka are evolving. There are some exciting features that are in the ideation phase for Apache Kafka that will also be implemented by Kora.

Splitting and merging partitions will help with heat management as skewed partition loads can be handled on the storage layer more effectively. Currently, the onus is on the producers to ensure that load is evenly distributed over partitions. Partition strategies based on random partitioning with some degree of stickiness work relatively well and key-based partitioning works as long as the key distribution is relatively uniform. However, being able to handle partition skew or even aggregate topic-level load changes by splitting and merging will be very useful.

Partition-less Kafka is also attractive as it would both simplify the client protocol and potentially make data movement less disruptive. It would also make splitting and merging transparent to clients, just as it is with DynamoDB. Sharding will always be necessary, so partitions are here to stay, but how these are exposed in the client protocol is another thing entirely.

Technology is changing, the capabilities of the cloud are changing, and with it, so too will the architecture of cloud data systems as they evolve to take advantage of improved networking speeds, better compute, and low-cost object storage.

Kora’s architecture is not static and a year or two from now, I will need to rewrite much of this deep-dive. For one, it is trending towards more and more disaggregation, enabled by modern networking speeds, a better understanding in the industry of how to build large-scale disaggregated systems, and more sophisticated abstractions offered by the major CSPs. Currently, Kora prioritizes the needs of low-latency event streams using an architecture based on a fault-tolerant cache (brokers) in front of cloud object storage. But not all streams are equal and Kora will continue to change to accommodate the needs of different workloads as well as improve the scalability, elasticity, and performance of its multi-tenant workloads.