I will be writing a series of blog posts about Apache Pulsar, including some Kafka vs Pulsar posts. First up though I will be running some chaos tests on a Pulsar cluster like I have done with RabbitMQ and Kafka to see what failure modes it has and its message loss scenarios.
I will try to do this by either exploiting design defects, implementation bugs or poor configuration on the part of the admin or developer.
In this post we’ll go through the Apache Pulsar design so that we can better design the failure scenarios. This post is not for people who want to understand how to use Apache Pulsar, but for those who want to understand how it works. I have struggled to write a clear overview of its architecture in a way that is simple and easy to understand. I appreciate any feedback on this write-up.
The main claims that I am interested in are:
guarantees of no message loss (if recommended configuration applied and your whole data center doesn't burn to the ground)
strong ordering guarantees
predictable read and write latency
Apache Pulsar chooses consistency over availability, as do its sister projects BookKeeper and ZooKeeper. Every effort is made to give strong consistency.
We'll be taking a look at Pulsar's design to see if those claims are valid. In the next post we'll put the implementation of that design to the test. I won’t cover geo-replication in this post; we’ll look at that another day and just focus on a single cluster here.
Multiple layers of abstraction
Apache Pulsar has the high level concept of topics and subscriptions and at its lowest level data is stored in binary files which interleave data from multiple topics distributed across multiple servers. In between are a myriad of details and moving parts. I personally find it easier to understand the Pulsar architecture if I separate it out into different layers of abstraction, so that’s what I’ll do in this post.
Let's take a journey down the layers.
Layer 1 - Topics, Subscriptions and Cursors
This is not a post about messaging architectures that you can build with Apache Pulsar. We’ll just cover the basics of what topics, subscriptions and cursors are, without going into any depth about the wider messaging patterns that Pulsar enables.
Messages are stored in topics. A topic, logically, is a log structure with each message being at an offset. Apache Pulsar uses the term Cursor to describe the tracking of offsets. Producers send their messages to a given topic and Pulsar guarantees that once the message has been acknowledged it won’t be lost (bar some super bad catastrophe or poor configuration).
A consumer consumes messages from a topic via a subscription. A subscription is a logical entity that keeps track of the cursor (the current consumer offset) and also provides some extra guarantees depending on the subscription type:
Exclusive Subscription - Only one consumer can read the topic via the subscription at a time
Shared Subscription - Competing consumers can read the topic via the same subscription at the same time.
Fail-Over Subscription - Active/Backup pattern for consumers. If the active consumer dies, then the backup takes over. But there are never two active consumers at the same time.
One topic can have multiple attached subscriptions. The subscriptions do not contain the data, only meta-data and a cursor.
Pulsar provides both queueing and log semantics by allowing consumers to treat a Pulsar topic like a queue that deletes messages after being acknowledged by a consumer, or like a log where consumers can rewind their cursor if they want to. Underneath, the storage model is the same: a log.
If no data retention policy is set on a topic (via its namespace) then a message is deleted once the cursors of all attached subscriptions have passed its offset, that is, once it has been acknowledged on all subscriptions attached to that topic.
However, if a data retention policy covers the topic, then acknowledged messages are kept until they pass the policy boundary (size of topic, time in topic) and are removed after that.
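To make the deletion rule concrete, here is a minimal Python sketch. It assumes each subscription's cursor can be reduced to a single flat offset up to which everything is acknowledged, and that the retention policy can be reduced to the highest offset that has already fallen outside the retention window. Both simplifications, and the function names, are mine rather than Pulsar's. Because ledgers are deleted as a unit, a second helper shows that physical deletion lags behind the logical boundary.

```python
from typing import Dict, List, Optional, Tuple


def deletable_offset(cursors: Dict[str, int],
                     retention_limit: Optional[int] = None) -> int:
    """Highest topic offset that no longer needs to be kept.

    cursors: subscription name -> offset up to which that subscription has
             acknowledged every message (assumes at least one subscription).
    retention_limit: highest offset that has fallen outside the retention
             window (size or time), or None when no retention policy applies.
    """
    acked_by_all = min(cursors.values())  # the slowest subscription decides
    if retention_limit is None:
        return acked_by_all
    # Retention only keeps *acknowledged* messages around for longer.
    return min(acked_by_all, retention_limit)


def deletable_ledgers(closed_ledgers: List[Tuple[int, int, int]], upto: int) -> List[int]:
    """closed_ledgers: (ledger_id, first_offset, last_offset) of closed ledgers.

    Ledgers are deleted as a unit, so a ledger only qualifies once its
    last offset is at or below `upto`.
    """
    return [lid for lid, _first, last in closed_ledgers if last <= upto]
```

With cursors at 120 and 45 and no retention, everything up to offset 45 is logically deletable, yet the ledger holding offsets 0 to 49 stays on disk until the slower subscription passes offset 49.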
Messages can also be sent with an expiration. These messages are deleted if they exceed the TTL while still unacknowledged. This means that they can be deleted before any consumer gets the chance to read them. Expiration only applies to unacknowledged messages and therefore fits more into the queuing semantics side of things.
TTLs apply to each subscription separately, meaning that “deletion” is a logical deletion. The actual deletion will occur later according to what happens in other subscriptions and any data retention policy.
Consumers acknowledge their messages either one by one, or cumulatively. Cumulative acknowledgement is better for throughput but can cause duplicate message processing after a consumer failure. Cumulative acknowledgement is not available for shared subscriptions, because it is based on the offset and competing consumers do not process offsets in order. However, the consumer API does allow for batched acknowledgements that end up with the same number of acks but with fewer RPC calls. This can improve throughput for competing consumers on a shared subscription.
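The duplicate-processing trade-off can be shown with a small hypothetical model. This is not the Pulsar client API, although the Java client does expose `acknowledge` and `acknowledgeCumulative` on its consumer. A consumer has received messages 0 to 9, processed 0 to 6, and then crashed; the question is which messages get redelivered.

```python
from typing import Iterable, List, Set


def redelivered_individual(received: Iterable[int], acked: Set[int]) -> List[int]:
    # Every message is acked on its own, so only the unacked ones come back.
    return [m for m in received if m not in acked]


def redelivered_cumulative(received: Iterable[int], last_cumulative_ack: int) -> List[int]:
    # A cumulative ack on message N acknowledges everything up to and including N.
    return [m for m in received if m > last_cumulative_ack]


received = list(range(10))
processed = set(range(7))  # 0..6 were processed before the crash

# Individual acks sent right after processing each message:
print(redelivered_individual(received, processed))   # [7, 8, 9]
# Cumulative ack sent every five messages, so the last one covered message 4:
again = redelivered_cumulative(received, 4)
print(sorted(set(again) & processed))                # duplicates: [5, 6]
```

With cumulative acks, messages 5 and 6 are processed a second time. Individual acks avoid that, at the cost of one ack per message.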
Finally there are partitioned topics, similar to Kafka's topics. The difference is that the partitions in Pulsar are also topics. Just like with Kafka, a producer can send messages round-robin, use a hashing algorithm or choose a partition explicitly.
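Here is a hypothetical router that illustrates the three producer-side choices. CRC32 is an arbitrary stand-in for the client's real hash function. The `-partition-N` naming helper reflects my understanding of how Pulsar names the per-partition topics. Treat both as assumptions.

```python
import itertools
import zlib
from typing import Optional


def partition_topic(topic: str, index: int) -> str:
    # Each partition of a partitioned topic is itself an ordinary topic.
    return f"{topic}-partition-{index}"


class PartitionRouter:
    """Hypothetical router; CRC32 stands in for the real client's hash."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def route(self, key: Optional[str] = None, partition: Optional[int] = None) -> int:
        if partition is not None:                  # explicit choice by the producer
            if not 0 <= partition < self.num_partitions:
                raise ValueError(f"no partition {partition}")
            return partition
        if key is not None:                        # same key -> same partition
            return zlib.crc32(key.encode()) % self.num_partitions
        return next(self._round_robin)             # no key: spread the load
```

Hashing a key always lands on the same partition, which is what keeps per-key ordering when a topic is partitioned.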
That was a whirlwind introduction to the high-level concepts, we’ll now delve deeper. Remember this is not a primer on Apache Pulsar from 10,000 feet but a look at how it all works underneath from 1000 feet.
Layer 2 - Logical Storage Model
Now Apache BookKeeper enters the scene. I will talk about BookKeeper in the context of Apache Pulsar, though BookKeeper is a general purpose log storage solution.
First of all, BookKeeper stores data across a cluster of nodes. Each BookKeeper node is called a Bookie. Secondly, Apache ZooKeeper is used by both Pulsar and BookKeeper for storing meta-data and monitoring node health.
A topic is in fact a stream of Ledgers. A Ledger is a log in its own right. So we compose a parent log (the topic) from a sequence of child logs (Ledgers).
Ledgers are appended to a topic, and entries (messages or groups of messages) are appended to Ledgers. Ledgers, once closed, are immutable. Ledgers are deleted as a unit: we cannot delete individual entries, only whole ledgers.
Ledgers themselves are also broken down into Fragments. Fragments are the smallest unit of distribution across a BookKeeper cluster (depending on your perspective, striping might invalidate that claim).
Topics are a Pulsar concept. Ledgers, Fragments and Entries are BookKeeper concepts, though Pulsar understands and works with ledgers and entries.
Each Ledger (consisting of one or more Fragments) can be replicated across multiple BookKeeper nodes (Bookies) for both redundancy and read performance. Each Fragment is replicated across a different set of Bookies (if enough Bookies exist).
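One way to picture this metadata is as plain data structures. The sketch below is hypothetical. It roughly mirrors my understanding of BookKeeper ledger metadata, which is a sorted map from the first entry id of each fragment to that fragment's ensemble. Finding the bookies for an entry is then a lookup of the last fragment that starts at or before it.

```python
from bisect import bisect_right
from dataclasses import dataclass, field
from typing import List


@dataclass
class Fragment:
    first_entry: int        # first entry id stored in this fragment
    ensemble: List[str]     # bookies chosen when the fragment was created


@dataclass
class Ledger:
    ledger_id: int
    fragments: List[Fragment] = field(default_factory=list)  # ordered by first_entry

    def ensemble_for(self, entry_id: int) -> List[str]:
        # The owning fragment is the last one that starts at or before entry_id.
        starts = [f.first_entry for f in self.fragments]
        idx = bisect_right(starts, entry_id) - 1
        if idx < 0:
            raise KeyError(entry_id)
        return self.fragments[idx].ensemble


@dataclass
class Topic:
    name: str
    ledgers: List[Ledger] = field(default_factory=list)  # the topic is a log of ledgers
```

Because each fragment carries its own ensemble, two fragments of the same ledger can live on different bookies.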
Each Ledger has three key configurations:
Ensemble Size (E)
Write Quorum Size (Qw)
Ack Quorum Size (Qa)
These configurations are applied at the Topic level, which Pulsar then sets on the BookKeeper Ledgers/Fragments of the topic.
Note: "Ensemble" means the actual list of Bookies that will be written to. Ensemble size is an instruction to Pulsar to say how big an ensemble it should create. Note that you will need at least E Bookies available for writes. By default, bookies are picked randomly from the list of available bookies (each Bookie registers itself in ZooKeeper).
There's also the option to configure rack-awareness, by marking Bookies to belong to specific racks. A rack can be a logical construct (e.g. an availability zone in a cloud environment). With a rack-aware policy, the BookKeeper client of the Pulsar broker will try to pick Bookies from different racks. It's also possible to plug in a custom policy to perform a different type of selection.
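Here is a simplified, hypothetical rack-aware selection policy. It shows the idea only and is not BookKeeper's actual placement code. Bookies are grouped by rack, and picks rotate across racks so that an ensemble spans as many racks as possible before any rack is reused. `bookie_racks` and `pick_ensemble` are illustrative names.

```python
import random
from collections import defaultdict
from typing import Dict, List, Optional


def pick_ensemble(bookie_racks: Dict[str, str], size: int,
                  rng: Optional[random.Random] = None) -> List[str]:
    """Pick `size` bookies (the ensemble size E), spreading them across racks."""
    if rng is None:
        rng = random.Random()
    if size > len(bookie_racks):
        raise ValueError("not enough bookies available to cover E")
    by_rack: Dict[str, List[str]] = defaultdict(list)
    for bookie, rack in sorted(bookie_racks.items()):
        by_rack[rack].append(bookie)
    racks = sorted(by_rack)
    rng.shuffle(racks)                          # random rack order
    for members in by_rack.values():
        rng.shuffle(members)                    # random bookie within each rack
    ensemble: List[str] = []
    while len(ensemble) < size:                 # one bookie per rack per pass
        for rack in racks:
            if by_rack[rack] and len(ensemble) < size:
                ensemble.append(by_rack[rack].pop())
    return ensemble
```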
Ensemble Size (E) governs the size of the pool of Bookies available for that Ledger to be written to by Pulsar. Each Fragment may have a different ensemble: the broker selects a set of Bookies when it creates the fragment, but the ensemble will always be the size indicated by E. There must be enough write-available Bookies to cover E.
Write Quorum (Qw) is the number of actual Bookies that Pulsar will write an entry to. It can be equal to or smaller than E.
When Qw is smaller than E then we get striping which distributes reads/writes in such a way that each Bookie need only serve a subset of read/write requests. Striping can increase total throughput and lower latency.
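Striping is easiest to see by computing which bookies receive each entry. This sketch assumes a round-robin schedule in which entry n goes to Qw consecutive ensemble members, starting at position n mod E. I believe this is how BookKeeper's default distribution schedule behaves, but treat the exact formula as an assumption.

```python
from collections import Counter
from typing import List


def write_set(entry_id: int, ensemble: List[str], qw: int) -> List[str]:
    """Bookies that receive `entry_id`: Qw consecutive ensemble members
    starting at entry_id mod E."""
    e = len(ensemble)
    if not 1 <= qw <= e:
        raise ValueError("need 1 <= Qw <= E")
    return [ensemble[(entry_id + i) % e] for i in range(qw)]


def load_per_bookie(num_entries: int, ensemble: List[str], qw: int) -> Counter:
    # How many of the first `num_entries` entries each bookie has to store.
    return Counter(b for n in range(num_entries) for b in write_set(n, ensemble, qw))
```

With E = 3 and Qw = 2, each bookie stores 4 of every 6 entries. With E = 5 and Qw = 2, each stores only 4 of every 10, and this is where the throughput gain of striping comes from. With Qw = E, every bookie stores every entry.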
Ack Quorum (Qa) is the number of Bookies that must acknowledge the write, for the Pulsar broker to send its acknowledgement to its client. In practice it would either be:
(Qa == Qw) or
(Qa == Qw -1) ---> This will improve latency by ignoring the slowest bookie.
Ultimately, every bookie must receive the write. But if we always wait for all bookies to respond we can get spiky latency and unappealing tail latencies. Pulsar promises predictable latencies after all.
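The latency effect of Qa can be expressed as an order statistic: the broker waits for the Qa-th fastest response among the Qw bookies of the write set. Below is a minimal sketch with made-up latencies (the numbers are illustrative, not measurements). It also checks the constraint 1 <= Qa <= Qw <= E implied above.

```python
from typing import List


def validate_quorums(e: int, qw: int, qa: int) -> None:
    # Constraints from the text: Qa <= Qw <= E, all at least 1.
    if not 1 <= qa <= qw <= e:
        raise ValueError(f"invalid quorums E={e}, Qw={qw}, Qa={qa}")


def ack_latency(write_set_latencies_ms: List[float], qa: int) -> float:
    """The broker can ack once the Qa-th fastest bookie of the write set has
    responded, so the latency it sees is the Qa-th smallest response time."""
    n = len(write_set_latencies_ms)
    validate_quorums(n, n, qa)
    return sorted(write_set_latencies_ms)[qa - 1]


print(ack_latency([2.0, 3.0, 40.0], qa=3))  # Qa == Qw: waits for the 40 ms straggler
print(ack_latency([2.0, 3.0, 40.0], qa=2))  # Qa == Qw - 1: 3.0
```

A single slow bookie dominates the ack latency when Qa == Qw, but is ignored when Qa == Qw - 1.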
A Ledger is created when it is a new topic or when roll-over occurs. Roll-over is the concept of creating a new Ledger when either:
a Ledger size or time limit has been reached
ownership (by a Pulsar broker) of a Ledger changes (more on that later).
A Fragment is created when:
a new Ledger is created
when a Bookie in the current Fragment ensemble returns an error or times out when a write occurs.
When a bookie cannot serve a write then the Pulsar broker gets busy creating a new fragment and making sure the write gets acknowledged by Qw bookies. It’s like the Terminator, it won’t stop until that message is persisted.
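Here is a minimal sketch of the ensemble change that creates a new fragment. It uses a fragment map from first entry id to ensemble. The choice of replacement bookie (the first available one outside the ensemble) and the function name are assumptions; the real client applies its placement policy here. `next_entry` is assumed to be the first entry not yet fully acknowledged, so that entry and all later ones are re-sent to the new ensemble.

```python
from typing import Dict, List


def replace_failed_bookie(fragments: Dict[int, List[str]], failed: str,
                          next_entry: int, available: List[str]) -> Dict[int, List[str]]:
    """Return updated fragment metadata after `failed` errors on a write.

    Existing fragments are left untouched (they are immutable). A new fragment
    starting at `next_entry` keeps the healthy bookies and swaps the failed
    one for an available bookie that is not already in the ensemble.
    """
    current = fragments[max(fragments)]
    candidates = [b for b in available if b not in current]
    if not candidates:
        raise RuntimeError("not enough bookies to form a new ensemble of size E")
    new_ensemble = [candidates[0] if b == failed else b for b in current]
    updated = dict(fragments)
    updated[next_entry] = new_ensemble
    return updated
```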
Insight #1: Increase E to optimize for latency and throughput. Increase Qw for redundancy at the cost of write throughput. Increase Qa to increase the durability of acknowledged writes at the increased risk of extra latency and longer tail latencies.
Insight #2: E and Qw are not a list of Bookies. They simply indicate how large the pool of Bookies that can serve a given Ledger is. Pulsar will use E and Qw in the moment that it creates a new Ledger or Fragment. Each Fragment has a fixed set of Bookies in its ensemble that will never change.
Insight #3: Adding new Bookies does not mean manual rebalancing needs to be performed. Automatically, those new Bookies will be candidates for new Fragments. After joining the cluster, new Bookies will be written to immediately upon new fragments/ledgers being created. Each Fragment can be stored on a different subset of Bookies in the cluster! We do not couple Topics or Ledgers to a given Bookie or set of Bookies.
Let’s stop and take stock. This is a very different and more complex model to Kafka. With Kafka each partition replica is stored in its entirety on a single broker. The partition replica is comprised of a series of segment and index files. This blog post nicely describes it.
The great thing about the Kafka model is that it is simple and fast. All reads and writes are sequential. The bad thing is that a single broker must have enough storage to cope with that replica, so very large replicas can force you to have very large disks. The second downside is that rebalancing partitions when you grow your cluster becomes necessary. This can be painful and requires good planning and execution to pull it off without any hitches.
Returning to the Pulsar + BookKeeper model. The data of a given topic is spread across multiple Bookies. The topic has been split into Ledgers, the Ledgers into Fragments and, with striping, into calculable subsets of fragment ensembles. When you need to grow your cluster, just add more Bookies and they’ll start getting written to when new fragments are created. No more Kafka-style rebalancing required. However, reads and writes now have to jump around a bit between Bookies. We’ll see further down this post how Pulsar manages this and still does it fast.
But now each Pulsar broker needs to keep track of the Ledgers and Fragments that each Topic is comprised of. This meta-data is stored in ZooKeeper and if you lose that then you’re in serious trouble.
In the storage layer we've written a topic evenly across a BookKeeper cluster. We've avoided the pitfalls of coupling Topic replicas to specific nodes. Where Kafka topics are like sticks of Toblerone, our Pulsar topics are like a gas expanding to fill the available space. This avoids painful rebalancing.
Layer 2 - Pulsar Brokers and Topic Ownership
Also in Layer 2 of my abstraction layers we have the Pulsar Brokers. Pulsar brokers hold no persistent state that cannot be safely lost; they are separated from the storage layer. A BookKeeper cluster does not perform replication by itself: each Bookie is just a follower that is told what to do by a leader, the leader being a Pulsar broker. Each topic is owned by a single Pulsar broker, and that broker serves all reads and writes of that topic.
When a Pulsar broker receives a write, it will perform that write against the ensemble of the current Fragment of that Topic. Remember that if no striping occurs the ensemble of each entry is the same as the fragment ensemble. If striping occurs then each entry has its own ensemble which is a subset of the fragment ensemble.
In a normal situation there will be a single Fragment in the current Ledger. Once Qa bookies have acknowledged the write, the Pulsar broker will send an acknowledgement to the producer client.
An acknowledgement can only be sent if all prior messages have also been Qa acknowledged. If for a given message, a Bookie responds with an error or does not respond at all, then the broker will create a new Fragment on a new ensemble of Bookies (that does not include the problem Bookie).
Note that the broker will only wait for Qa acks from the bookies.
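The in-order acknowledgement rule can be sketched as a small piece of broker-side bookkeeping. This is a hypothetical model, not Pulsar's code. Each pending entry counts bookie acks, and entries are released to the producer strictly in order, once they and all earlier entries have Qa acks. The highest released id is what the post later calls the Last Add Confirmed entry.

```python
from typing import Dict, List


class PendingAdds:
    """An entry is acked to the producer only when it and every earlier
    entry have collected Qa bookie acks."""

    def __init__(self, qa: int):
        self.qa = qa
        self.bookie_acks: Dict[int, int] = {}   # entry id -> bookie acks so far
        self.last_add_confirmed = -1            # highest entry acked to the producer

    def send(self, entry_id: int) -> None:
        self.bookie_acks[entry_id] = 0

    def on_bookie_ack(self, entry_id: int) -> List[int]:
        """Record one bookie ack; return the entries now ackable to the producer."""
        if entry_id in self.bookie_acks:        # late acks for released entries are ignored
            self.bookie_acks[entry_id] += 1
        released = []
        nxt = self.last_add_confirmed + 1
        while self.bookie_acks.get(nxt, 0) >= self.qa:
            del self.bookie_acks[nxt]
            self.last_add_confirmed = nxt
            released.append(nxt)
            nxt += 1
        return released
```

If entry 1 reaches Qa before entry 0, its ack is held back until entry 0 also has Qa acks. Then both are released together.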
Reads also go through the owner. The broker, being the single entry point for a given topic, knows up to which offset has been safely persisted to BookKeeper. It only needs to read from a single Bookie to serve a read. We’ll see in Layer 3 how it uses caching to serve many reads from its in-memory cache rather than sending reads to BookKeeper.
Pulsar Broker health is monitored by ZooKeeper. When a broker fails or becomes unavailable (to ZooKeeper) an ownership change occurs. A new broker becomes the topic owner and all clients are now directed to read/write to this new broker.
BookKeeper has a critically important functionality called Fencing. Fencing allows BookKeeper to guarantee that only one writer (Pulsar broker) can be writing to a ledger.
It works as follows:
The current Pulsar broker (B1) that has ownership of topic X is deemed dead or unavailable (via ZooKeeper).
Another broker (B2) updates the state of the current ledger of topic X to IN_RECOVERY from OPEN.
B2 sends a fence message to all bookies of the current fragment of the ledger and waits for (Qw-Qa)+1 responses. Once this number of responses is received, the ledger is fenced. The old broker, if it is in fact still alive, can no longer make writes as it will not be able to get Qa acknowledgements (due to fencing exception responses).
B2 then asks each bookie in the fragment ensemble for its last acknowledged entry. It takes the most recent entry id and then starts reading forward from that point. It ensures that all entries from that point on (which may not have been previously acknowledged to the Pulsar broker) get replicated to Qw bookies. Once B2 cannot read and replicate any more entries, the ledger is fully recovered.
B2 changes the state of the ledger to CLOSED
B2 can now accept writes and opens a new ledger.
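Here is a toy model of the fencing step (step 3). It assumes E = Qw = 3 (no striping) and Qa = 2, and it leaves out the ZooKeeper state changes and the recovery reads of step 4. It also shows why (Qw - Qa) + 1 fence responses are enough: once that many bookies are fenced, at most Qa - 1 unfenced bookies remain in the write set, so the old owner can never collect Qa acks. The class and function names are hypothetical.

```python
from typing import List, Set, Tuple


class Bookie:
    def __init__(self, name: str):
        self.name = name
        self.fenced: Set[int] = set()                 # ledger ids fenced on this bookie
        self.entries: Set[Tuple[int, int]] = set()    # (ledger id, entry id) stored

    def fence(self, ledger_id: int) -> bool:
        self.fenced.add(ledger_id)
        return True

    def add_entry(self, ledger_id: int, entry_id: int) -> bool:
        if ledger_id in self.fenced:
            return False            # fencing exception: this writer is rejected
        self.entries.add((ledger_id, entry_id))
        return True


def fence_ledger(bookies: List[Bookie], reachable: List[bool],
                 ledger_id: int, qw: int, qa: int) -> bool:
    """New owner (B2): fence every reachable bookie; succeed on (Qw - Qa) + 1 responses."""
    responses = sum(b.fence(ledger_id) for b, up in zip(bookies, reachable) if up)
    return responses >= (qw - qa) + 1


def old_owner_write(bookies: List[Bookie], ledger_id: int, entry_id: int, qa: int) -> bool:
    """Old owner (B1): it may only ack its producer if Qa bookies accept the entry."""
    acks = sum(b.add_entry(ledger_id, entry_id) for b in bookies)
    return acks >= qa
```

In the scenario where B2 can reach only b1 and b2, fencing still succeeds. Bookie b3 then accepts the old owner's entry, but that is harmless: the old owner never reaches Qa acks, so the entry is never acknowledged to its producer.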
The great thing about this architecture is that by making the leaders (the Pulsar brokers) stateless, split-brain is trivially taken care of by BookKeeper's fencing functionality. There is no split-brain, no divergence, no data loss.
Layer 2 - Cursor Tracking
Each subscription stores a cursor. The cursor is the current offset in the log. Subscriptions store their cursor in BookKeeper in ledgers. This makes cursor tracking scalable just like topics.
Layer 3 - Bookie Storage
Ledgers and Fragments are logical constructs which are maintained and tracked in ZooKeeper. Physically, the data is not stored in files that correspond to Ledgers and Fragments. The actual implementation of storage in BookKeeper is pluggable and Pulsar uses a storage implementation called DbLedgerStorage by default.
When a write to a Bookie occurs, first that message is written to a journal file. This is a write-ahead log (WAL) and it helps BookKeeper avoid data loss in the event of a failure. It is the same mechanism by which relational databases achieve their durability guarantees.
The write is also made to the Write Cache. The Write Cache accumulates writes and periodically sorts and flushes them to disk in Entry Log files. Writes are sorted so that entries of the same ledger are placed together which improves read performance. If the entries are written in strict temporal order then reads will not benefit from a sequential layout on disk. By aggregating and sorting we achieve temporal ordering at the ledger level which is what we care about.
The Write Cache also writes the entries to RocksDB which stores an index of the location of each entry. It simply maps (ledgerId, entryId) to (entryLogId, offset in the file).
Reads hit the Write Cache first as the write cache has the latest messages. If there is a cache miss then it hits the Read Cache. If there is a second cache-miss then the Read Cache looks up the location of the requested entry in RocksDB and then reads that entry in the correct Entry Log file. It performs a read-ahead and updates the Read Cache so that following requests are more likely to get a cache hit. These two layers of caching mean that reads are generally served from memory.
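Here is a toy Python model of a bookie's storage that ties the Write Cache, RocksDB index, Entry Log files and Read Cache together. It is a simplification under stated assumptions, not DbLedgerStorage. A dict plays the role of RocksDB, each Python list plays the role of an entry log file, the journal is omitted, and the caches never evict.

```python
from typing import Dict, List, Optional, Tuple

Key = Tuple[int, int]  # (ledgerId, entryId)


class ToyLedgerStorage:
    def __init__(self, read_ahead: int = 2):
        self.write_cache: Dict[Key, bytes] = {}
        self.read_cache: Dict[Key, bytes] = {}
        self.entry_logs: List[List[Tuple[Key, bytes]]] = []  # list index = entryLogId
        self.index: Dict[Key, Tuple[int, int]] = {}          # key -> (entryLogId, offset)
        self.read_ahead = read_ahead

    def add(self, ledger_id: int, entry_id: int, data: bytes) -> None:
        # The journal write that precedes this is left out of the sketch.
        self.write_cache[(ledger_id, entry_id)] = data

    def flush(self) -> None:
        # Sorting groups each ledger's entries together in the new entry log.
        log_id = len(self.entry_logs)
        log: List[Tuple[Key, bytes]] = []
        for key in sorted(self.write_cache):
            self.index[key] = (log_id, len(log))
            log.append((key, self.write_cache[key]))
        self.entry_logs.append(log)
        self.write_cache.clear()

    def read(self, ledger_id: int, entry_id: int) -> Optional[bytes]:
        key = (ledger_id, entry_id)
        if key in self.write_cache:          # 1. newest entries
            return self.write_cache[key]
        if key in self.read_cache:           # 2. recently read or read-ahead entries
            return self.read_cache[key]
        location = self.index.get(key)       # 3. index lookup, then entry log read
        if location is None:
            return None
        log_id, offset = location
        # Read-ahead: the following entries of the same ledger sit right after this one.
        for k, d in self.entry_logs[log_id][offset:offset + 1 + self.read_ahead]:
            if k[0] == ledger_id:
                self.read_cache[k] = d
        return self.entry_logs[log_id][offset][1]
```

After a flush, the entries of ledger 1 sit next to each other even if they arrived interleaved with ledger 2. A single read of entry (1, 0) therefore also pulls (1, 1) and (1, 2) into the Read Cache.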
BookKeeper allows you to isolate disk IO from reads and writes. Writes are all written sequentially to the Journal file that can be stored on a dedicated disk and are committed in groups for even greater throughput. After that no other disk IO is synchronous from the point of view of the writer. Data is just written to memory buffers.
Asynchronously on background threads, the Write Cache performs bulk writes to Entry Log files and RocksDB, which typically share a separate disk of their own. So there is one disk for synchronous writes (the journal file) and another disk for asynchronous, optimized writes and all reads.
On the read side, readers are served from either the Read Cache or from the Entry Log files and RocksDB.
Also take into account that writes can saturate the ingress network bandwidth and reads can saturate the egress network bandwidth, but they do not affect each other.
This elegantly isolates reads from writes at a disk and network level.
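The journal's group commit lets a bookie acknowledge only durable writes while still keeping throughput high, because many entries share one fsync. Below is a minimal sketch under assumptions, using only the Python standard library. The record format and the class are hypothetical, and BookKeeper's journal is far more involved.

```python
import os
import tempfile
from typing import Callable, List, Tuple


class GroupCommitJournal:
    """Toy write-ahead journal: append entries, fsync once per group,
    and only then acknowledge every entry in the group."""

    def __init__(self, path: str):
        self._f = open(path, "ab")
        self._pending: List[Tuple[bytes, Callable[[], None]]] = []
        self.fsyncs = 0

    def append(self, record: bytes, on_durable: Callable[[], None]) -> None:
        self._pending.append((record, on_durable))

    def commit(self) -> int:
        """Write and fsync all pending records, then fire their callbacks."""
        if not self._pending:
            return 0
        for record, _ in self._pending:
            self._f.write(len(record).to_bytes(4, "big") + record)  # length-prefixed
        self._f.flush()                  # Python buffer -> OS
        os.fsync(self._f.fileno())       # OS page cache -> disk
        self.fsyncs += 1
        group, self._pending = self._pending, []
        for _, on_durable in group:
            on_durable()                 # ack only after the data is on disk
        return len(group)

    def close(self) -> None:
        self._f.close()


journal_path = os.path.join(tempfile.mkdtemp(), "journal")
journal = GroupCommitJournal(journal_path)
acked: List[int] = []
for i in range(5):
    journal.append(b"e%d" % i, lambda i=i: acked.append(i))
journal.commit()  # one fsync, then all five callbacks fire
journal.close()
```

Five entries are acknowledged after a single fsync, and none of them is acknowledged before it.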
Layer 3 - Pulsar Broker Caching
Each topic has a single broker that acts as owner. All reads and writes go through that broker. This provides many benefits.
Firstly, the broker can cache the log tail in memory meaning that the broker can serve tailing readers itself without the need for BookKeeper. This avoids paying the cost of a network round-trip and a possible disk read on a Bookie.
The broker is also aware of the id of the Last Add Confirmed entry. It can track which message is the last safely persisted message.
When the broker does not have the message in its cache it will request the data from one Bookie in the ensemble of the Fragment of that message. This means that the difference in read serving performance between tail readers and catch-up readers is large. Tail readers can be served from memory on the Pulsar broker whereas a catch-up reader may have to incur the cost of an extra network round trip and multiple disk reads if neither the Write nor Read Cache have the data.
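Here is a sketch of the owner broker's read path. It assumes a fixed-size cache of the most recent entries and a hypothetical `read_from_bookie` callback, which stands for a read from one bookie of the entry's write set. Entries past the Last Add Confirmed id are never served.

```python
from collections import OrderedDict
from typing import Callable


class BrokerTopicCache:
    def __init__(self, capacity: int, read_from_bookie: Callable[[int], bytes]):
        self.capacity = capacity
        self.cache: "OrderedDict[int, bytes]" = OrderedDict()
        self.last_add_confirmed = -1
        self.read_from_bookie = read_from_bookie  # hypothetical: one network round trip
        self.bookie_reads = 0

    def on_entry_persisted(self, entry_id: int, data: bytes) -> None:
        # Called once the entry has Qa acks; keep only the most recent entries.
        self.cache[entry_id] = data
        self.last_add_confirmed = entry_id
        while len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

    def read(self, entry_id: int) -> bytes:
        if entry_id > self.last_add_confirmed:
            raise LookupError("not yet safely persisted")
        if entry_id in self.cache:               # tailing reader: served from memory
            return self.cache[entry_id]
        self.bookie_reads += 1                   # catch-up reader: go to a bookie
        return self.read_from_bookie(entry_id)
```

A tailing reader asking for the last few entries never leaves the broker. A catch-up reader asking for an older entry costs a bookie read.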
So we’ve covered from a high level the logical and physical representation of messages, as well as the different actors in a Pulsar cluster and their relationships with each other. There is plenty of detail that has not been covered but we’ll leave that as an exercise for another day.
Next up we’ll cover how an Apache Pulsar cluster ensures that messages are sufficiently replicated after node failures.
When a bookie fails, all the ledgers that have fragments on that bookie are now under replicated. Recovery is the process of "rereplicating" fragments to ensure the replication factor (Qw) is maintained for each ledger.
There are two types of recovery: manual or automatic. The rereplication protocol is the same for both, but Automatic Recovery uses an in-built failed node detection mechanism that registers rereplication tasks to be performed. The manual process requires manual intervention.
We'll focus on the Auto Recovery mode.
Auto Recovery can be run from a dedicated set of servers or hosted on the Bookies, in the AutoRecoveryMain process. One of the auto-recovery processes gets elected as Auditor. The role of the Auditor is to detect downed bookies and then:
Read the full ledger list from ZK and find the ledgers hosted on the failed bookie.
For each ledger it will create a rereplication task in the /underreplicated znode in ZooKeeper.
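The Auditor's job reduces to a set computation. In this hypothetical sketch, dicts replace the ZooKeeper ledger list and the /underreplicated znode. Given each ledger's fragment ensembles and the set of live bookies, it returns the ledgers that need a rereplication task, together with their missing bookies.

```python
from typing import Dict, List, Set


def audit(ledger_ensembles: Dict[int, List[List[str]]],
          live_bookies: Set[str]) -> Dict[int, Set[str]]:
    """Map each ledger that needs a rereplication task to the bookies it has lost."""
    tasks: Dict[int, Set[str]] = {}
    for ledger_id, ensembles in ledger_ensembles.items():
        missing = {b for ens in ensembles for b in ens if b not in live_bookies}
        if missing:
            tasks[ledger_id] = missing
    return tasks
```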
If the Auditor node fails then another node gets promoted as the Auditor. The Auditor is a thread in the AutoRecoveryMain process.
The AutoRecoveryMain process also has a thread that runs a Replication Task Worker. Each worker watches the /underreplicated znode for tasks.
On seeing a task it will try to lock it. If it is not able to acquire the lock, it will move on to the next task.
If it does manage to acquire a lock it then:
Scans the ledger for fragments which its local bookie is not a member of
For each matching fragment, it replicates the data from another bookie to its own bookie, updates ZooKeeper with the new ensemble and the fragment is marked as fully replicated.
If the ledger has remaining underreplicated fragments then the lock is released. If all fragments are fully replicated, the task is deleted from /underreplicated.
If a fragment does not have an end entry id then the replication task waits and checks again. If the fragment still has no end entry id, the task fences the ledger before rereplicating the fragment.
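Here is a matching sketch of one Replication Task Worker, again with in-memory stand-ins. A shared set plays the role of the ZooKeeper locks, and "copying" a fragment is reduced to putting the local bookie into the failed bookie's slot of the ensemble. The handling of fragments without an end entry id (the fencing case) is left out. All names are hypothetical.

```python
from typing import Dict, List, Set


def run_worker(task_queue: List[int], locks: Set[int],
               ledgers: Dict[int, List[List[str]]], failed: Set[str],
               local_bookie: str) -> List[int]:
    """Process rereplication tasks; return the ledger ids whose tasks completed."""
    completed = []
    for ledger_id in list(task_queue):
        if ledger_id in locks:
            continue                          # another worker holds it: move on
        locks.add(ledger_id)
        try:
            ensembles = ledgers[ledger_id]
            for i, ens in enumerate(ensembles):
                if (local_bookie in ens) or not (failed & set(ens)):
                    continue                  # already a member, or nothing to fix
                # "Copy" from a surviving member into the failed bookie's slot.
                lost = next(b for b in ens if b in failed)
                ensembles[i] = [local_bookie if b == lost else b for b in ens]
            if not any(failed & set(ens) for ens in ensembles):
                task_queue.remove(ledger_id)  # fully replicated: delete the task
                completed.append(ledger_id)
        finally:
            locks.discard(ledger_id)          # release the lock either way
    return completed
```

Here, a ledger whose lock is held elsewhere, or one whose fragments already include the local bookie, stays in the queue for another worker to pick up.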
Therefore, with Auto Recovery mode, a Pulsar cluster is able to fully manage the details of replication to ensure the correct replication factor for each ledger. The admin must just ensure that the right number of bookies are deployed.
ZooKeeper is required by both Pulsar and BookKeeper. If a Pulsar node loses visibility of all ZooKeeper nodes then it stops accepting reads and writes and restarts itself. This is a precaution to ensure that the cluster cannot enter an inconsistent state.
This does mean that if ZooKeeper goes down, everything becomes unavailable and that all Pulsar node caches will be wiped. Therefore upon resumption of service there could in theory be a latency spike due to all reads going to BookKeeper.
Each topic has an owner broker
Each topic is logically broken down into Ledgers, Fragments and Entries
Fragments are distributed across the bookie cluster. There is no coupling of a given topic to a given bookie(s).
Fragments can be striped across multiple bookies.
When a Pulsar broker fails, ownership of the topics of that broker fail-over to another broker. Fencing avoids two brokers that might believe themselves the owner from actually writing to the current topic ledger at the same time.
When a bookie fails, auto recovery (if enabled) will automatically perform “rereplication” of the data to other bookies. If disabled, a manual process can be initiated
Brokers cache the log tail allowing them to serve tailing readers very efficiently
Bookies use a journal to provide guarantees on failure. The journal can be used to recover data not yet written to Entry Log files at the time of the failure.
Entries of all topics are interleaved in Entry Log files. A lookup index is kept in RocksDB.
Bookies serve reads as follows: Write Cache -> Read Cache -> Entry Log files
Bookies can isolate read IO from write IO via separate disks for journal files, Entry Log files and RocksDB.
ZooKeeper stores all meta-data for both Pulsar and BookKeeper. If ZooKeeper is unavailable Pulsar is unavailable.
Storage can be scaled out separately from the Pulsar brokers. If storage is the bottleneck then simply add more bookies and they will start taking on load without the need for rebalancing.
Some Initial Thoughts on Potential Data Loss
Let’s look at the RabbitMQ and Kafka acknowledged write message loss scenarios and see if they apply to Pulsar.
RabbitMQ split-brain with either Ignore or Autoheal mode.
The losing side of the partition loses any messages delivered since the partition began that were not consumed.
Split-brain on the storage layer is theoretically not possible with Apache Pulsar.
Apache Kafka, acks=1 and broker with leader replica dies.
Fail-over to a follower in the ISR occurs with potential message loss as an ack was sent once the leader had persisted the message but potentially before the follower was able to fetch it.
Apache Pulsar has no leader storage node. Given a replication factor (Qw) of 2 or more, there simply is no way for a single node failure to cause message loss of an open ledger (closed ledgers could still lose messages, see further below).
Closed ledgers are more exposed, and there are far more of them. The auto-recovery process cannot repair a ledger if Qa bookies of its ensemble have been permanently lost. Therefore having a Qa of 1 is a dangerous setting. Open ledgers are not a problem as the broker will simply end up creating a new fragment and taking care of safely writing the message. But closed ledgers can only be repaired by bookies. So we can only guarantee no message loss of closed ledgers when fewer than Qa bookies are permanently lost. So think hard about your Qa setting.
Let’s consider some scenarios.
Scenario 1 (Closed ledger). E = 3, Qw =2, Qa = 2. The broker sends the write to two bookies. Bookie 1 and Bookie 2 return an ack to the broker who then sends an ack to its client. After a while the ledger is closed and a new one started. Now to produce message loss we’ll need both the Bookie 1 and Bookie 2 to fail. If any single bookie dies then the Auto Recovery protocol will kick in. (Note the use of Qa=2. Qa=1 could make the ledger unrecoverable).
Scenario 2 (Open ledger). E = 3, Qw =2, Qa = 1. The broker sends the write to two bookies. Bookie 1 returns an ack to the broker who then sends an ack to its client. Bookie 2 has not responded yet. Now to produce message loss we’ll need both the broker and Bookie 1 to fail and Bookie 2 to not have successfully made the write. If only Bookie 1 dies, then the broker will still end up writing the message to a second bookie in the end (in a new fragment).
The only way a failure of a single node could cause message loss is if the Qw is 1 (which means no redundancy) or for Qa to be 1 meaning auto-recovery cannot work. So if you want to avoid message loss, make sure you have redundancy (Qw >= 2 and Qa >= 2) .
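The rule of thumb above fits in a one-line check: acknowledged entries of a closed ledger are only guaranteed safe while fewer than Qa bookies of their write set are permanently lost. The check encodes the post's worst-case reasoning, in which only Qa bookies are guaranteed to hold an acknowledged entry. It does not model BookKeeper itself.

```python
from typing import List, Set


def acked_entry_at_risk(write_set: List[str], qa: int, permanently_lost: Set[str]) -> bool:
    """Worst case for an acknowledged entry of a closed ledger: it may be lost
    once Qa or more bookies of its write set are gone for good."""
    lost = sum(1 for b in write_set if b in permanently_lost)
    return lost >= qa
```

Scenario 1 (write set Bookie 1 and Bookie 2, Qa = 2) survives any single permanent bookie loss. The same write set with Qa = 1 does not.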
Apache Kafka. Node with leader partition is isolated from ZooKeeper.
This causes short-term split-brain in Kafka.
With acks=1, the leader will continue to accept writes until it realizes it cannot talk to ZooKeeper at which point it will stop accepting writes. Meanwhile a follower got promoted to leader. Any messages persisted to the original leader during that time are lost when the original leader becomes a follower.
With acks=all, if the followers fall behind and get removed from the ISR, then the ISR consists only of the leader. Then the leader becomes isolated from ZooKeeper and continues to accept acks=all messages for a short while even after a follower got promoted to leader. The messages received in that short time window are lost when the leader becomes a follower.
Apache Pulsar cannot have split-brain of the storage layer. If the current broker owner gets isolated from ZooKeeper, or suffers a long GC, or its VM gets suspended, and another broker becomes the owner, then still only one broker can write to the topic. The new owner will fence off the ledger and prevent the original leader from making any writes that could get lost.
Apache Kafka. Acks=all with leader failure.
The followers fall behind and get removed from the ISR. Now the ISR consists of a single replica. The leader continues to ack messages even with acks=all. The leader dies. All unreplicated messages are lost.
Apache Pulsar uses a quorum based approach where this cannot happen. An ack can only be sent once Qa bookies have persisted the message to disk.
Apache kafka - Simultaneous Loss of Power (Data Center Outage)
Kafka will acknowledge a message once written to memory. It fsyncs to disk periodically. When a data center suffers a power loss, all servers could go offline at the same time. A message might only be in memory on all replicas. That message is now lost.
Apache Pulsar only acks a message once Qa bookies have acknowledged the message. A bookie only acknowledges an entry once it is persisted to its journal file on disk. Simultaneous power loss to all servers should not lose messages unless multiple disk failures also occur.
So far Apache Pulsar is looking pretty robust. We’ll have to see how it fares in the chaos testing.
There are more details that I have either missed out or don’t yet know about. Apache Pulsar is significantly more complicated than Apache Kafka in terms of its protocols and storage model.
The two stand-out features of a Pulsar cluster are:
Separation of brokers from storage, combined with BookKeeper's fencing functionality, elegantly avoids split-brain scenarios that could provoke data loss.
Breaking topics into ledgers and fragments, and distributing those across a cluster, allows Pulsar clusters to scale out with ease. New data automatically starts getting written to new bookies. No rebalancing is required.
Plus I haven’t even gotten to geo-replication and tiered storage which are also amazing features.
My feeling is that Pulsar and BookKeeper are part of the next generation of data streaming systems. Their protocols are well thought out and rather elegant. But with added complexity comes added risk of bugs. In the next post we’ll start chaos testing an Apache Pulsar cluster and see if we can identify weaknesses in the protocols, and any implementation bugs or anomalies.