Building A "Simple" Distributed System - The Implementation

In previous posts we’ve identified the requirements for our distributed resource allocation library, including one invariant (No Double Resource Access) that must hold 100% of the time no matter what and another (All Resources Evenly Accessed) that must hold after a successful rebalancing of resources. We documented a protocol that describes how nodes interact with a central registry to achieve the requirements, including how they deal with all conceived failure scenarios. Then we built a TLA+ specification and used the model checker to verify the designed protocol, identifying a defect in the process.

In this post we’ll tackle the implementation and in the next we’ll look at testing.

Thinking Before Coding

Everything up till now has been thinking before coding. There is one last bit of design work that we can choose to do before writing that first line of code.

I started my career in 2006 as a junior C++ developer in a small team building a stock trading platform. It would have been called a microservice architecture, as the system was built of multiple small services that formed a topology with Tibco RV as the underlying messaging layer.

We had rigorous standards because the cost of failure was high. My mentor taught me how to think about complex, highly concurrent systems that needed to be highly robust. One practice we followed was that before we ever built a new service we would always design it first with diagrams and some descriptions. I’ve continued that practice when building complex concurrent systems and that is what we’ll do in this post.

Additionally, by diagramming it all out we can map the implementation design onto the high level protocol. Why go to all the effort of the protocol and TLA+ specification if we can’t easily compare the implementation to them?

In our protocol and TLA+ specification we talk about nodes and the registry. In our implementation we’ll be talking about Rebalanser clients and Apache ZooKeeper.

The Library API

Before diagramming out the internals, let’s define the interface that an end programmer is going to use when integrating Rebalanser into their application.

This is the C# API (which will be pretty much identical to the Java one):

event EventHandler OnUnassignment;
event EventHandler<OnAssignmentArgs> OnAssignment;
event EventHandler<OnAbortedArgs> OnAborted;
Task StartAsync(string resourceGroup, ClientOptions clientOptions);
Task StopAsync();

It’s a simple API. Methods for starting the Rebalanser client in a particular group and stopping it. The ClientOptions class has properties for configuring auto-recovery. Then we have three event handlers for receiving notifications about resource assignment, unassignment and when a non-recoverable error has occurred. This is all that is needed in order to get consumer group functionality or single active consumer, active/back-up etc.
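To make the shape of the API concrete, here is a minimal sketch of how an application might wire into it, written in Java (the Java API will be nearly identical to the C# one above). The `RebalanserClient` stub and its listener-style registration methods are illustrative stand-ins, not the real library:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the Rebalanser client. The real library would
// drive these callbacks from ZooKeeper events; here one assignment is
// simulated so the wiring can be exercised.
class RebalanserClient {
    private Consumer<List<String>> onAssignment = r -> {};
    private Runnable onUnassignment = () -> {};
    private Consumer<String> onAborted = e -> {};

    void onAssignment(Consumer<List<String>> handler) { this.onAssignment = handler; }
    void onUnassignment(Runnable handler) { this.onUnassignment = handler; }
    void onAborted(Consumer<String> handler) { this.onAborted = handler; }

    void start(String resourceGroup) { onAssignment.accept(List.of("queue-1", "queue-2")); }
    void stop() { onUnassignment.run(); }
}

public class ApiSketch {
    public static void main(String[] args) {
        RebalanserClient client = new RebalanserClient();
        client.onAssignment(resources -> System.out.println("Start consuming: " + resources));
        client.onUnassignment(() -> System.out.println("Stop consuming"));
        client.onAborted(error -> System.out.println("Non-recoverable: " + error));
        client.start("my-group");
        client.stop();
    }
}
```

The application only reacts to assignment and unassignment notifications; all the coordination happens inside the library.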

There is a fair amount behind it though which is where we start doing some further thinking and designing before writing a lot of code.

Apache ZooKeeper

I chose to use Apache ZooKeeper for the consensus/meta-data service (the registry). I chose it purely because I’ve used it so many times with Apache Kafka (and recently also Apache Pulsar and Apache BookKeeper), but I had never built my own code against it. It was time to learn how to program against it for myself.

Apache ZooKeeper is a hierarchical data store that is often compared to a file system. A file system is made up of folders and files, the folders forming a hierarchy. ZooKeeper has znodes where each znode acts as both a folder and a file. In other words, znodes can store data and can have child znodes.

Znodes can be permanent or ephemeral. Ephemeral znodes only stick around while the session of the ZK client that created them is active. The ZK client library sends periodic heartbeats that let ZooKeeper know the client is still alive. Once those heartbeats stop arriving, either because the host application failed or due to loss of connectivity, the session expires and all ephemeral znodes of that client get removed. This is very useful for keeping the list of clients and for our barriers. We need both client and barrier znodes to get removed if the client fails or becomes unreachable. If not, a failed client could block a resource forever and keep getting allocated resources in each rebalancing.

Ephemeral znodes can also be sequential, which means they get appended with a monotonic id. We’ll make all our client znodes ephemeral sequential with the format c_{id}. So for example, c_000000001, c_000000002 etc. Whichever client has the lowest id is the current leader.
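The leader check can be sketched as pure logic over the child znode names (the helper names here are illustrative, not the library's actual code):

```java
import java.util.Comparator;
import java.util.List;

// Sketch: among the children of /{group}/clients, the znode with the
// lowest sequence number is the leader.
public class LeaderCheck {
    // Parse the monotonic id from a name like "c_000000002".
    static int sequenceOf(String znode) {
        return Integer.parseInt(znode.substring(znode.lastIndexOf('_') + 1));
    }

    static boolean isLeader(String myZnode, List<String> allClients) {
        String lowest = allClients.stream()
                .min(Comparator.comparingInt(LeaderCheck::sequenceOf))
                .orElseThrow();
        return lowest.equals(myZnode);
    }
}
```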

ZooKeeper linearizes all changes into one global sequence and so every write gets its own monotonic id (Zxid). Additionally, each znode has a version number, and this version number gets incremented on every update to that znode. It also has a cversion number that increments every time a change is made to its children. We’ll be making use of these versioning capabilities to ensure correctness of some operations.

We can also list all children of a znode, in a single operation, which we’ll make use of to list the current nodes and resources.

Apache ZooKeeper has the concept of a watch. We can ask ZooKeeper to notify us if a znode is added, updated or removed. We can also request notifications if the children of a znode change. We’ll be using watches for all the “monitor” activities specified in our protocol.

The hierarchy of znodes:

   /{group name} (permanent)
      /clients (permanent)
         /c_000000000 (ephemeral sequential)
      /resources (permanent - data: stores allocations map)
         /{res id 1} (permanent)
         /{res id 2}
         /{res id n}
      /barriers (permanent)
         /{res id 1} (ephemeral)
         /{res id 2}
         /{res id n}
      /term (permanent)

Each Rebalanser group has its own znode with its children being clients, resources, barriers and term.

State Machine

The Rebalanser library is made up of a state machine that manages the establishment of a ZK session, assuming a role, shutting down on request and handling both recoverable and unrecoverable failures.

Fig 1. The state machine.

Leadership and Client Watches

When a client starts up, it starts a ZK session and gets an ephemeral sequential id. It requests the children of the clients znode and sees if it has the lowest id. If it has the lowest id it will become the leader, else it will become a follower.

Followers place a watch on the znode of the client with the next smallest id. So with three clients 0-2:

  • c_000000002 will place a watch on c_000000001

  • c_000000001 will place a watch on c_000000000 (the leader).

If c_000000000 shuts down or its session expires, the c_000000000 ephemeral znode will be removed and c_000000001 will be notified. c_000000001 will see that it is the new leader and change role.

But if instead c_000000001 shuts down or its session expires, then c_000000002 gets notified and it will see that it does not have the smallest id, and so puts a watch on the next lowest id - c_000000000.

By only watching the next lowest node, we avoid herd behaviour anytime a client znode is removed. There is only ever one next in line to the throne at any given moment.
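The watch-target rule can be sketched as follows; `nextSmallest` is a hypothetical helper that returns empty when the client is the leader (who watches no sibling):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch of the "watch the next smallest sibling" rule that avoids herd
// behaviour: each follower watches only the client immediately ahead of it.
public class WatchTarget {
    static int seq(String znode) {
        return Integer.parseInt(znode.substring(znode.lastIndexOf('_') + 1));
    }

    // Returns the sibling this client should watch, or empty if it is the leader.
    static Optional<String> nextSmallest(String myZnode, List<String> allClients) {
        return allClients.stream()
                .filter(c -> seq(c) < seq(myZnode))
                .max(Comparator.comparingInt(WatchTarget::seq));
    }
}
```

When the watched sibling disappears, the client re-runs this selection against the current children, which yields either a new sibling to watch or leadership.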

The leader does not watch any specific client znode, instead it watches the term znode to see if it ever gets deposed. The lowest id method of choosing a leader should not ever allow the situation where we have two connected nodes believing they are the leader. However, out of an abundance of caution (because bugs happen), we use the term znode to detect this scenario should it ever happen.

When a client becomes a leader, it makes a write to the term znode (which increments its version) and places a data watch on it. If somehow another client thought it should be leader, it will increment the term version and the current leader will get notified of it. In this situation, the current leader will abdicate immediately.
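The fencing idea reduces to a simple version comparison (names here are illustrative; the real client would receive the observed version from the data-watch event on the term znode):

```java
// Sketch of the term-znode fencing check. When a client becomes leader it
// writes to /{group}/term and remembers the resulting znode version; any
// later data-watch event carrying a higher version means another client
// has claimed leadership, so the current leader must abdicate.
public class TermFence {
    private final int myTermVersion;

    TermFence(int versionAfterMyWrite) {
        this.myTermVersion = versionAfterMyWrite;
    }

    boolean shouldAbdicate(int observedTermVersion) {
        return observedTermVersion > myTermVersion;
    }
}
```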

A more likely scenario is that the current leader gets isolated from ZooKeeper and its session expires. On expiry, its znode will be removed and a follower will get notified and become the new leader. In this situation we have two leaders. The original leader does not know its session has expired; it only gets informed of its expired session on reconnection, which may be too late to act. The original leader cannot take any actions, such as performing a rebalancing, because it cannot talk to ZooKeeper.

It is still accessing its resources, however, and it needs to self-expire so that it can stop accessing any resources before a new rebalancing takes place. In fact self-expiry was discussed in the previous post on formal verification and it affects both leaders and followers. If a client does not self-expire before ZooKeeper expires it, then our No Double Resource Access invariant will be violated.

Leader State

A Rebalanser client spends most of its time in either the Leader or Follower role. Both roles have multiple concurrent activities to perform, which complicates things a bit. To reduce that complexity, the control logic is entirely single-threaded, which simplifies the code.

We have the following concurrent activities:

  • Control logic, which exists as an event loop that other concurrent logic can send messages to via an in memory queue.

  • ZK events can occur at any time. There are connectivity events (connected, disconnected, expired etc) and watch events (node added, node modified, node deleted, children changed). The underlying ZK library that Rebalanser uses invokes an event handler on each event occurring, and the event handler then posts these as messages to the in memory queue as they occur.

  • The host application can request a controlled shutdown at any time.

  • A rebalancing can take anywhere from a couple of seconds to a few minutes. The event loop cannot wait for a rebalancing to complete as it may receive a shutdown request, or session expiry, or watch event in the meantime that it should respond to. So a rebalancing gets executed asynchronously. If the rebalancing fails it can post its status to the event loop via the queue.

All decision making is performed in the single-threaded event loop. It can decide to exit due to a failure, a loss of leadership or a request to shutdown. It can kick off rebalancings and abort in progress rebalancing.
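The control logic described above might be sketched as a single thread draining an in-memory queue. The message names and exit conditions here are simplified illustrations, not the library's actual types:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the single-threaded control loop: ZK watch handlers, the async
// rebalancing task and the host application all post messages onto one
// in-memory queue, and all decisions are made by the thread draining it.
public class EventLoopSketch {
    enum Message { ZK_CONNECTED, ZK_DISCONNECTED, CHILDREN_CHANGED, REBALANCING_FAILED, SHUTDOWN_REQUESTED }

    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();

    // Called from any thread (watch handlers, rebalancing task, application).
    public void post(Message m) { queue.add(m); }

    // Drains messages until one arrives that requires exiting the role.
    public Message runUntilExit() {
        while (true) {
            final Message m;
            try {
                m = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Message.SHUTDOWN_REQUESTED;
            }
            switch (m) {
                case CHILDREN_CHANGED -> startRebalancingAsync();
                case ZK_CONNECTED, ZK_DISCONNECTED -> { /* start/stop the self-expiry timer */ }
                case REBALANCING_FAILED, SHUTDOWN_REQUESTED -> { return m; }
            }
        }
    }

    void startRebalancingAsync() { /* runs on another thread in the real client */ }
}
```

Because rebalancing runs asynchronously, the loop stays responsive to expiry, shutdown and watch events even mid-rebalancing.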

Fig 2. Leader state

The event loop uses a timer to detect when it should self-expire. The self-expiry limit must be set far shorter than the ZooKeeper session timeout in order to be reliable; however, if made too short it could introduce cluster instability. Whenever the event loop receives a Disconnected message it starts the timer. A Connected message stops and resets the timer. If the timer reaches the configured self-expiry limit then the loop exits and returns an error code to the parent state machine.
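The timer logic can be sketched as pure, clock-injected state (names are illustrative; injecting the clock keeps the logic testable):

```java
// Sketch of the self-expiry check. The limit must be well below the ZK
// session timeout so the client stops touching resources before ZooKeeper
// expires its session.
public class SelfExpiryTimer {
    private final long limitMillis;
    private long disconnectedAt = -1;  // -1 means currently connected

    SelfExpiryTimer(long limitMillis) { this.limitMillis = limitMillis; }

    // A Disconnected message starts the timer (if not already started).
    void onDisconnected(long nowMillis) { if (disconnectedAt < 0) disconnectedAt = nowMillis; }

    // A Connected message stops and resets the timer.
    void onConnected() { disconnectedAt = -1; }

    boolean shouldSelfExpire(long nowMillis) {
        return disconnectedAt >= 0 && (nowMillis - disconnectedAt) >= limitMillis;
    }
}
```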

Other failures that are anticipated and handled are:

  • Certain failures reading and writing to znodes. If a znode that should exist does not, or any other type of discrepancy occurs, the rebalancing logic posts a Potential Inconsistent State message and the event loop exits with that code. The state machine either reverts to no role (closing its ZK session) or closes if the application has configured no auto-recovery.

  • The session expires. This should not happen, as we should self-expire first; however, a long GC pause or some other kind of pause (such as pausing the VM) could make it happen.

  • Fatal error: the OnUnassignment or OnAssignment event handler, which contains application code written by a developer who uses the library, could throw an uncontrolled exception. In this case Rebalanser performs a controlled shutdown and invokes the OnAborted event handler.

Follower State

Like the leader state, we have the same concurrent activities, with an event loop as the control logic.

Fig 3. Follower State

The leader and follower states are very similar. The main differences are:

  • The follower has a data watch on the resources znode and a watch on the next smallest sibling node. The resources znode contains the resource allocation map.

  • Rebalancing involves reading the allocations map from the resources znode rather than writing to it.

A Short Note on the Resources Znode and its Children

An administrator registers each resource as a separate child znode of the resources znode. So when an admin wants to add a new resource to a group, they simply add a new znode. This triggers the leader’s child watch on the resources znode and a rebalancing commences.

The leader writes the resource allocations as the data payload of the resources znode. This triggers the data watch set by all followers, which triggers their part of the rebalancing.

This structure allows administrators to make changes that only the leader reacts to; the leader then makes changes that only followers react to.
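The posts so far don't pin down the exact allocation algorithm the leader uses when writing the allocations map. As one sketch, a simple round-robin deal satisfies the All Resources Evenly Accessed invariant: no resource goes to two clients, and per-client counts differ by at most one. The class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of computing the allocations map the leader writes as the data
// payload of the resources znode. Resources are dealt out round-robin.
public class Allocator {
    static Map<String, List<String>> allocate(List<String> clients, List<String> resources) {
        Map<String, List<String>> allocations = new HashMap<>();
        clients.forEach(c -> allocations.put(c, new ArrayList<>()));
        for (int i = 0; i < resources.size(); i++) {
            // Deal resource i to client i mod (number of clients).
            allocations.get(clients.get(i % clients.size())).add(resources.get(i));
        }
        return allocations;
    }
}
```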

Cancellation and Short Disconnections

So far we’ve seen how the leader and follower states manage the various events and actions that can occur. We’ve also seen the main state machine that governs the overall state of the system. This maps onto our protocol nicely.

But there are a couple more details that we should cover. The first is that no matter what the library is doing, it should be cancellable at any time. A cancellation is initiated by the application calling its Stop function. The cancellation needs to reach all executing code and be reacted to quickly.

A second requirement is that the library handle short-term disconnections without executing the revert-to-no-role logic. That means we need to either perform retries or pause during short disconnections, but either of these techniques needs to be cancellable.

In order to reduce complexity, all calls to ZooKeeper go through a layer that handles retries for us.

Fig 4. Close propagates cancellations through the code. All ZooKeeper operations perform retries which are cancellable.
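A retry layer of this kind might be sketched as below. The `Supplier`-based wrapper and the `AtomicBoolean` cancellation flag are illustrative stand-ins (the Java analogue of the C# cancellation token), not the library's actual code:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Sketch of the retry layer every ZK call goes through: transient failures
// are retried with a delay, and a shared cancellation flag is checked before
// each attempt so a Stop request propagates quickly.
public class RetryLayer {
    public static <T> T withRetries(Supplier<T> operation, int maxAttempts, long delayMillis,
                                    AtomicBoolean cancelled) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (cancelled.get()) throw new IllegalStateException("operation cancelled");
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;  // e.g. a transient connection-loss error
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during retry", ie);
                }
            }
        }
        throw last;  // retries exhausted: surface the last failure
    }
}
```

Centralising retries in one wrapper means the role logic never has to distinguish a brief disconnection from a healthy connection, and cancellation is honoured between attempts.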

Language Specific Notes

The diagrams and descriptions so far are fairly detailed but they are not so specific that they are coupled to a specific language. That said, all my career has been spent in OO languages such as C#, Java, C++ and Python. So I may see the world through that lens.

The decision to use Apache ZooKeeper, however, is most definitely part of the implementation design. Etcd and Consul have concepts similar enough to Apache ZooKeeper’s that the design would not change too greatly if we decided later to implement Rebalanser with them instead. We could also use a relational store, but the design would have to deviate much further for that to work.

We could implement the above in Java, C++, C# or Python and the design should be compatible. We just need to be able to kick off code asynchronously, in a thread or some language specific alternative, and to be able to pass some kind of cancellation/completion signal down through the layers.

My own implementation is in C# where I use a separate Task for the state machine, the role and each rebalancing. Cancellation is handled by passing cancellation tokens and checking if the token has been cancelled before executing the different operations. Once the root cancellation token has been cancelled by the application invoking the StopAsync method, a chain of actions occur where all concurrent Tasks cleanly shutdown their operations.

Final Thoughts

Diagramming out my design makes sense to me; it has multiple benefits:

  • I can visually compare my implementation to my high level protocol to make sure that they are equivalent. The implementation design is more detailed and less clean than the protocol, so having that higher level protocol first helps understanding.

  • It forces me to think about how I manage concurrency and state.

  • It saves me time later as I already have a rough blueprint to follow.

  • It also makes it easier for someone else joining the project to understand the code.

  • Different language implementations can benefit from a shared model. Defects in the design can be fixed, benefitting all language implementations.

You can see the source code of my ZooKeeper C# implementation here. It is not totally finished as I still have some code clean-up to do, plus I am considering some API changes and changes to the protocol before it gets released.

So now we have a protocol, a TLA+ specification and an implementation design. Overkill? Perhaps, or perhaps not. This project is on the simpler side of things (for a distributed component) and is used in this series to demonstrate ways of thinking about distributed systems programming. Maybe it is overkill for this project… maybe.

But there are reasons to go to this length with this kind of library:

  • Even simple distributed systems can be evil with all kinds of unforeseeable design and implementation defects. Building distributed systems is hard.

  • Other people are relying on you, trusting that you have built a reliable library. Who knows what they’ll use it for. It’s your job as the library developer to make it as reliable as you can.

  • Shipping a code library with a bug is a bigger deal than if it were just in your own application. People need to update to a new version which is more work and some people might never do it.

  • More people than just yourself will be affected by any bugs.

  • More bugs means more GitHub issues!

So personally, I’m going to do everything I can to improve the quality/reliability of the library. And remember folks, this series is a case study about building a simple distributed system, don’t get too hung up on the specifics of Rebalanser.

Testing is to the implementation what TLA+ was to the protocol. Just because we have verified the protocol doesn’t mean that the implementation is sound. We need to test the s**t out of this code. We need to try and break it, over and over again and ensure that it reacts in a predictable way no matter what gets thrown at it. That’s what we’ll look at in the next post.

Links to the rest of this series:

Banner Image Credit: Iztok Bončina/ESO. Link to image.