Serverless ClickHouse Cloud - ASDS Chapter 5 (part 1)

See The Architecture of Serverless Data Systems introduction chapter to find the other serverless data systems in this series. This chapter covers the first system of group 3 - the analytics database group.

ClickHouse is an open-source, column-oriented, distributed (real-time) OLAP database management system. ClickHouse Cloud offers serverless ClickHouse clusters, billed according to the compute and storage resources consumed.

Before we jump into the architecture of ClickHouse Cloud, it’s worth understanding the architecture of a standard ClickHouse cluster first. The storage architecture of ClickHouse plays a critical role in how the serverless version was designed. Part 1 of this chapter will focus on the standard, fully open-source ClickHouse, and part 2 will focus on the serverless architecture of ClickHouse Cloud.

A mental model of the standard ClickHouse architecture

This section aims to build a mental model of the architecture of ClickHouse that we’ll refer back to when covering the serverless design.

ClickHouse chose a different architecture from its peers, such as Apache Druid and Apache Pinot. Those systems opted for a disaggregated architecture, with different node types fulfilling various functions, such as ingestion, query routing, query execution, and data storage. ClickHouse nodes are monolithic by comparison: each node takes on its share of all duties except for consensus/coordination, which is reserved for Apache ZooKeeper or, more recently, ClickHouse Keeper. From a deployment and tyre-kicking perspective, this is a more straightforward architecture to understand and get started with (fewer components to deploy).

At the core of the ClickHouse node is the storage engine, which comes in various flavors, with MergeTree and ReplicatedMergeTree being the predominant ones. The MergeTree engine is in fact a family of engines, with variants optimized for different workloads: non-aggregated vs aggregated tables and insert-heavy vs update-heavy workloads. Tables using the MergeTree engine are not replicated, though sharding and storage tiering to cloud object storage are available. For higher durability and availability, the ReplicatedMergeTree engine is recommended.

MergeTree-based storage engine

At the center of ClickHouse is the MergeTree family of storage engines:

  • MergeTree

  • ReplacingMergeTree

  • SummingMergeTree

  • AggregatingMergeTree

  • CollapsingMergeTree

  • VersionedCollapsingMergeTree

We’ll focus on the basics of the non-replicated MergeTree engine (and variants) before we look at the ReplicatedMergeTree family.

The basic idea of this storage engine is to write new data in immutable blocks called parts and then asynchronously merge these small data parts into larger parts up to a certain size. Writing small blocks is good for writes but bad for reads, so background merging creates larger parts to optimize read performance. This is reminiscent of an LSM tree engine, though there are key differences. While parts of lower levels are merged into deeper levels, they do not actually form an LSM-Tree-like multi-level structure that queries traverse in level order, and no write-ahead log exists.

A part is a self-contained subset of a table’s data and indexes that includes various files. Data is stored in a columnar format, either in a single file or one file per column. The directory of a single part contains:

  • Column data files. Parts can be in compact mode, where all column data is stored in a single .bin file, or wide mode, with one .bin file per column.

  • The primary index comprises an idx file and a mark file (see below).

  • Optional partitioning-key MinMax index.

  • Optional secondary indexes (with an idx and mark file per secondary index).

  • Metadata files.

  • Projections. Similar to the concept of a materialized view but stored as a hidden table in a sub-directory, with all the same types of files as the parent table (bin, idx, etc). Projections are covered later.
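
As an aside, the part layout can be inspected at runtime via the system.parts table. The following query is a minimal sketch (the table name my_table is hypothetical) that lists the active parts of a table, including whether each part is in compact or wide mode:

SELECT name, partition, part_type, rows, active
FROM system.parts
WHERE table = 'my_table' AND active;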

Indexes are part-scoped data structures; in other words, you can think of a ClickHouse primary or secondary index as a set of mini-indexes with one index per part. However, note that individual parts can grow to multiple GBs in size after several merges. This differs from how a traditional monolithic B-Tree-based database organizes its indexes and is fundamental to understanding how ClickHouse works.

Each ClickHouse table has a primary key and a sorting key. If the user omits the primary key in the CREATE TABLE statement, the sorting key acts as the primary key. Data is organized on disk within a part according to the sorting key order. The primary index is a sparse index - it does not include every row contained in the part. Instead, it creates one index entry for every Nth row (N defaults to 8192), and each block of N rows is called a granule. The index entries are known as marks. A granule is the smallest block of data that the ClickHouse query engine reads from disk. Much like a block device might read 4 KB blocks, ClickHouse reads granules as the smallest unit of data (in a vectorized fashion). These vectorized reads of granules are extremely fast, as the deserialization cost is low due to the in-memory and on-disk representations being, in many cases, the same. Granules can also be compressed to reduce their on-disk size.
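
As a minimal sketch (table and column names are my own), this is how the sorting key and granule size are declared; index_granularity defaults to 8192 rows and rarely needs changing:

CREATE TABLE hits
(UserID UInt32,
 URL String,
 EventTime DateTime)
ENGINE = MergeTree
ORDER BY (UserID, URL)
SETTINGS index_granularity = 8192;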

Secondary indexes are also sparse, and marks can span multiple granules. Sparse indexes allow a ClickHouse node to cache all part indexes of a table in memory, even for huge datasets, which is necessary for fast query execution. However, there is always a limit, and the administrator must balance the need for indexes against the additional demands on limited memory.

To locate data in column data files (.bin files), each index file is accompanied by a mark file, which includes pointers from index key marks to offsets in column data files.

ClickHouse also supports partitioning keys, which further subdivide parts according to the supplied key. This allows for more efficient queries and operations to drop data subsets based on the partitioning key. The partitioning key is incorporated into the part name, allowing query execution to filter parts simply by the part directory names. A very low cardinality key is desirable as this optimization only works well when the number of partitions is small; otherwise, the number of parts increases dramatically, reducing the efficiency of reads and the background merge process.
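
As a hedged sketch (table and column names are my own), a low-cardinality partitioning key such as the event month might be declared like this, after which whole partitions can be dropped cheaply:

CREATE TABLE events
(EventTime DateTime,
 UserID UInt32)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventTime)
ORDER BY (UserID, EventTime);

ALTER TABLE events DROP PARTITION 202301;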

Part names and merging

Part directories follow the naming convention <partition>_<min_block>_<max_block>_<level>_<version>, where the version suffix is only included if the part has been modified by an ALTER TABLE query (see mutations below). When a part is created, it is at level 0 and is assigned a block number from an auto-incrementing counter. Parts can only be merged with parts that have adjacent block numbers. When two level 0 parts are merged, the new part has a level of 1. The original level 0 parts are eventually garbage collected.

Original parts:
- all_1_1_0
- all_2_2_0
- all_3_3_0
- all_4_4_0
- all_5_5_0
- all_6_6_0
- all_7_7_0
- all_8_8_0

After merges ->
- all_1_4_1
- all_5_8_1

If the table has two partitions, then we might see the following:

Original parts
- 1_1_1_0
- 2_2_2_0
- 1_3_3_0
- 2_4_4_0
- 1_5_5_0
- 2_6_6_0
- 1_7_7_0
- 2_8_8_0

After merges ->
- 1_1_7_1
- 2_2_8_1

With partitions, adjacent block numbers within a partition may have gaps.

The MergeTree engine selects parts for merging based on a set of heuristics. It must balance competing concerns, such as:

  • Limiting write amplification (merging large parts regularly or merging large parts with small parts will incur higher write amplification).

  • Limiting the number of parts (large numbers of parts will negatively impact read performance).

The merge selection algorithm must trade off these concerns to achieve balanced read and write performance. Additionally, the amount of background work must be balanced against foreground work. In Chapter 4 - CockroachDB, we saw how its sophisticated admission control was responsible for balancing foreground query execution with background LSM tree compaction (among other duties). That chapter describes the need to keep read amplification under control by ensuring that background compactions keep up with foreground inserts and updates:

“Read amplification refers to this process of having to check multiple files. The more files the average read must scan, the higher the read amplification and the less efficient reads become. Compaction aims to reduce the number of SSTables that must be searched by consolidating these SSTables through the merging of overlapping ranges of data. This reduces the number of SSTables and improves read performance.” - Chapter 4, part 2, Serverless CockroachDB.

ClickHouse has the same requirement to ensure that background part merging keeps up with foreground part creation. The primary mechanism ClickHouse uses to protect itself from read amplification is capping the number of active parts allowed for any given partition (and also in aggregate). 

There are two thresholds that kick in at the partition level:

  1. Once the first threshold, parts_to_delay_insert, is reached, the server starts to artificially slow down inserts, giving background merges a chance to catch up. Defaults to 1000.

  2. Once the second threshold, parts_to_throw_insert, is reached, the server actively rejects insert and update requests. Defaults to 3000.

The aggregate part count can also be limited by the threshold max_parts_in_total, which, once breached, will cause the server(s) to reject inserts and updates.
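
These thresholds are MergeTree-level settings that can be overridden per table. A minimal sketch (the table name is my own, and the defaults quoted above are version-dependent):

CREATE TABLE ingest_heavy
(EventTime DateTime,
 Payload String)
ENGINE = MergeTree
ORDER BY EventTime
SETTINGS parts_to_delay_insert = 1000,
         parts_to_throw_insert = 3000,
         max_parts_in_total = 100000;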

To ensure that background merging can keep up with new part creation, it is recommended that ClickHouse applications perform fewer, larger bulk inserts rather than more frequent, smaller ones. The part creation rate is something that users of ClickHouse should always be cognizant of.

Data organization and indexes

Both the data and indexes are organized into the set of parts that make up the table. The naming and merging of those parts are based, in part, on insertion order (due to the block number) rather than primary key/sorting key order. Any given index file only has knowledge of the data within its own part, and there is no ordering across the index files as a whole. A ClickHouse index is a fragmented, part-scoped data structure, unlike a B-Tree-based index in a traditional relational database.

An index seek within one index file uses binary search, which is very fast. However, primary keys are not unique in ClickHouse and two rows with an identical primary key value can end up in different parts. An index seek on a given primary key value will need to inspect the indexes of all parts of a table in order to return all rows that match the key. This is why background merging is so important, as the cost of a read is directly linked to the number of parts (read amplification). Binary search is logarithmic in time complexity, which means that one giant index will perform better than a multitude of tiny indexes. The need to inspect all parts is one clear way that the MergeTree engine differs from an LSM tree, where an index seek will search SSTables in level order and stop on the first match, assuming the last version is wanted, because a typical LSM tree enforces uniqueness of the primary key.

ClickHouse indexes are sparse, which makes them small relative to the data and able to fit in memory. This allows for fast seeking and scanning despite the need to search many indexes (up to a point). When partitioning is used, part pruning can be employed, which reduces the number of index files that the query must inspect, as long as the query uses a predicate based on the partitioning key. All this work can be parallelized for faster execution.

More on sparse, part-scoped indexes

Unlike a traditional OLTP database, which has an index entry for every row, a sparse index only has an entry for a subset of rows. This means that, unlike with a traditional B-Tree-based index, the row data (from the bin files) must always be read, as most of the indexed column data is not included in the index itself.

Query engines over traditional B-Tree-based primary indexes (clustered indexes) can use optimizations such as:

  • Avoiding the need to drop down to the data pages of the leaf level if the query contains only the index columns.

  • Secondary indexes can include “covering columns” in the index itself to avoid the need to perform lookups against the data pages of the clustered index.

Those read optimizations are not possible with a sparse index. The sparse index is specially designed for very large datasets where keeping a B-Tree index of every row in memory is not possible. 

On the flip side, there are a number of benefits to this data architecture of parts and sparse indexes. Firstly, writes do not incur the cost of modifying a single giant B-Tree-based index. Instead, small indexes are built from scratch on the creation of each new part. For insert queries, this index creation cost is on the hot path and directly visible to the calling application, but due to the small size of new parts, the cost is low. For new parts created by merges, which may be many times larger than level 0 parts, this cost is unseen, except when the background work is large enough to impact foreground work.

Secondly, the ClickHouse engine can process a query in a highly parallelizable fashion:

  • The first stage is potential part pruning, if the table utilizes partitions and the query uses the partitioning key column in its WHERE clause.

  • Next, the primary (or secondary) index of each selected part is searched for relevant granules.

  • Finally, the selected granules are streamed into the query engine from the data (bin) files, where the engine applies the SQL operations to the data.

Each stage of this pipeline is parallelizable across multiple threads, and with distributed tables, this can even go further by parallelizing across servers.
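
To see where parts and granules get eliminated, ClickHouse can annotate a query plan with the results of its index analysis. A minimal sketch, reusing the hypothetical hits table from earlier:

EXPLAIN indexes = 1
SELECT count()
FROM hits
WHERE UserID = 123456;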

Secondary indexes, projections, and materialized views

Queries that do not align with the primary key are likely to result in a table scan, which for very large datasets is a very expensive operation. ClickHouse provides some additional features for speeding up such queries:

  • Secondary indexes.

  • Projections.

  • Materialized views.

I’ll start with secondary indexes, as they are a common feature across the database ecosystem. A secondary index is created with a certain granularity that allows the query engine to skip granules - hence it is called a data-skipping index. With a granularity of 4, a secondary index will have one entry covering every 4 granules. There are a few index types available: MinMax, Set, and several Bloom-filter-based types.

Secondary indexes are somewhat limited in ClickHouse because they are both sparse and scoped to the part. As noted above, a sparse index requires that the data files always be read, but these data files are organized in sorting key order. ClickHouse illustrates the limitations of secondary indexes in its excellent overview of primary indexes in ClickHouse. The example given is a table with a primary key of (UserID, URL).

CREATE TABLE table1
(UserID UInt32,
 URL String,
 EventTime DateTime)
ENGINE = MergeTree
PRIMARY KEY (UserID, URL)
ORDER BY (UserID, URL, EventTime)

If a secondary index is created over the URL column with a granularity of 4, using a MinMax index type, then the secondary index stores the min and max URL values covering 4 granules at a time (potentially around 32k rows). Because the data is sorted primarily by UserID, the URL range of each such block is likely to be very wide, so the index is unlikely to be useful to the query engine, which is more likely to opt for a table scan.
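
For reference, such a skipping index could be declared on the table1 example roughly as follows (the index name url_minmax is my own); MATERIALIZE INDEX builds the index for parts that already exist:

ALTER TABLE table1 ADD INDEX url_minmax URL TYPE minmax GRANULARITY 4;
ALTER TABLE table1 MATERIALIZE INDEX url_minmax;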

However, there are more powerful options that trade greater disk space and lower write efficiency for greater read efficiency: the projection and the materialized view.

A projection is a generic word for a derivative of an original dataset. For example, any time we run a SELECT query that reorders the data, filters columns, filters rows, or does some kind of aggregation, we have created a projection of the original data. ClickHouse can materialize such a projection as a hidden table stored alongside the parent table in each part. Using the example with the primary key of (UserID, URL), we can create a projection within the table that uses a sorting key of (URL, UserID).

CREATE TABLE table1
 (UserID UInt32,
  URL String,
  EventTime DateTime,
  PROJECTION by_url (
    SELECT *
    ORDER BY (URL, UserID)))
ENGINE = MergeTree
PRIMARY KEY (UserID, URL)
ORDER BY (UserID, URL, EventTime)

This particular projection essentially doubles the data stored, as it keeps all columns without any aggregation or filtering; it simply reorders the rows into (URL, UserID) order. It is stored as a hidden table in a sub-directory of each part and has all the usual files (primary index, bin files, etc). However, for queries that filter by URL, the speed-up will be huge, as binary search over the projection's primary index is now possible. Commonly, projections are used for pre-aggregation to reduce the cost of reads that need aggregated data.

Materialized views are another option with some similarities to projections. The main difference is that a materialized view is explicitly a different table from the parent. You query such a table by name, and its data and indexes are stored in their own parts. It is essentially just another table, but one whose parts are created based on changes to the parent table.
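
As a hedged sketch (the view and alias names are my own), a materialized view that pre-aggregates page views per URL from the table1 example might look like the following; the SummingMergeTree target collapses the partial counts at merge time. Note that only rows inserted after the view is created flow into it unless it is explicitly backfilled.

CREATE MATERIALIZED VIEW url_views
ENGINE = SummingMergeTree
ORDER BY URL
AS SELECT URL, count() AS views
FROM table1
GROUP BY URL;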

The cost of projections and materialized views needs to be balanced against the greater efficiency of reads. Because data is essentially duplicated, the cost of writes increases significantly: not only must more data be written and possibly replicated, but the indexes of these additional tables must also be built. This is a classic compromise that exists in all databases that offer secondary indexes and materialized views.

In general, ClickHouse provides the best performance when queries align well with the first column of the primary key, but there are a number of ways of creating additional primary keys (via projections and materialized views). See the ClickHouse write-up on sparse primary indexes to learn more.

Mutations

So far we’ve focused on data insertion, but rows can also be updated using the ALTER TABLE … UPDATE/DELETE … WHERE statement. This results in an asynchronous (by default) mutation task, which identifies the affected parts and rewrites them with an incremented version number. This can be an expensive set of operations as updating/deleting a single row can cause an entire part to be rewritten (which could be several GB in size).

Mutations are executed asynchronously by default so that as many mutations as possible can be grouped into the same rewrite of a modified part - thus reducing the cost.
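
A minimal sketch of mutations against the table1 example; progress can be monitored via the system.mutations table:

ALTER TABLE table1 DELETE WHERE UserID = 42;
ALTER TABLE table1 UPDATE URL = '' WHERE UserID = 42;

SELECT command, is_done
FROM system.mutations
WHERE table = 'table1';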

Alternatives to mutations do exist (and are recommended):

  • Instead of deletes, use partitions and drop partitions using the ALTER TABLE … DROP PARTITION… statement. This removes that entire partition from the table efficiently.

  • Instead of deletes, use Time-To-Live (TTL) combined with partitioning.

  • Instead of updates, use a specialized MergeTree engine such as the ReplacingMergeTree engine, which removes duplicate entries, keeping the most recent one. This allows updates to be performed via inserts. Because duplicate removal is performed during part merging, duplicates can exist for a period of time or even indefinitely. For example, parts may reach the upper size limit and no longer get merged, so duplicates in these parts are never eliminated. The FINAL keyword modifier can be used in queries, which forces the query execution itself to perform a final deduplication so that duplicates are not returned to the caller. Some of ClickHouse’s underlying implementation does leak through to the SQL layer, with tricks such as FINAL required. The use of FINAL can impact performance due to the merge-on-read it incurs.

MergeTree engine variants

Different workloads require different MergeTree engine variants. As explained above, an update-heavy workload may perform poorly with the standard MergeTree engine but more efficiently with the ReplacingMergeTree (though deduplicating always adds some cost).

  • ReplacingMergeTree: Deduplicates rows with the same sorting key during part merges. Optionally, a version column (such as a datetime column) can be specified to determine which row of a set of duplicates survives; otherwise, the last inserted row wins. Useful for update-heavy workloads (see the sketch after this list).

  • SummingMergeTree: When merging, combines rows of the same sorting key, summing any numerical column values. Useful for simple count-based aggregations grouped on the sorting key.

  • AggregatingMergeTree: Combines rows with the same sorting key by merging their aggregate function states during part merges; commonly used as the target of materialized views that maintain pre-aggregated data.

  • CollapsingMergeTree: Allows you to do updates by inserting two rows: one cancel row and one current state row. The downside of this engine is that the INSERT query must include the last row inserted (that must be cancelled) and the new state of the row that should replace it.

  • VersionedCollapsingMergeTree: Adds a numeric version column that tells the merge logic the order of precedence. The CollapsingMergeTree algorithm is difficult to reason about when rows arrive out of order, and this versioned variant simplifies the logic and avoids certain race conditions at merge time.
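
To make the ReplacingMergeTree pattern concrete, here is a hedged sketch (table and column names are my own) where updates are expressed as inserts, a version column decides which duplicate survives, and FINAL forces deduplication at read time:

CREATE TABLE user_profiles
(UserID UInt32,
 Name String,
 UpdatedAt DateTime)
ENGINE = ReplacingMergeTree(UpdatedAt)
ORDER BY UserID;

INSERT INTO user_profiles VALUES (1, 'old name', now() - 3600);
INSERT INTO user_profiles VALUES (1, 'new name', now());

SELECT * FROM user_profiles FINAL WHERE UserID = 1;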

Storage configurations

ClickHouse uses an abstraction of disks and storage volumes, where a single volume is composed of an ordered set of disks. This abstraction extends beyond multiple block devices to encompass various storage types, including S3. Storage policies group volumes (and their disks) and define more advanced configurations such as storage tiering, where recent “hot” data is stored on faster local disks and “cold” data is moved to slower, more economical storage such as S3. I try not to refer to cloud object storage as S3, but this is how it is referred to in ClickHouse.

With this disk/volume/policy abstraction, the administrator can choose from a number of combinations: keeping all data on local disks, writing only to S3, using tiered storage, or even writing part metadata to local disks and the parts themselves to S3.
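
As a hedged sketch, assuming the server configuration defines a storage policy named 'tiered' with a local "hot" volume and an S3-backed volume named 'cold' (all of these names are my own), a table could tier month-old data like this:

CREATE TABLE page_views
(EventTime DateTime,
 Payload String)
ENGINE = MergeTree
ORDER BY EventTime
TTL EventTime + INTERVAL 30 DAY TO VOLUME 'cold'
SETTINGS storage_policy = 'tiered';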

The ReplicatedMergeTree engine has some drawbacks with this disk abstraction and S3, as it ends up duplicating the S3 data once for each replica. To avoid this issue, a “zero-copy” variant of the replication mechanism was created and is detailed further down.

Distributed architecture

The MergeTree engine is not distributed, though it can tier data to S3 as already mentioned. Therefore, a table based solely on the MergeTree engine can only be served by a single ClickHouse server. There are two options for going distributed (which can be combined):

  • Sharding, for when your data doesn’t fit on a single machine.

  • Replication (ReplicatedMergeTree engine), for when you want high availability and higher durability (though it helps with scaling too).

Sharding

Sharding divides the data over multiple servers, resulting in a shared-nothing architecture where each server hosts a disjoint subset of the table data.

A ClickHouse table can be sharded across multiple servers by:

  1. Defining a cluster of servers in the configuration file.

  2. Creating a normal table with the MergeTree engine.

  3. Creating a distributed table with the Distributed table engine (specifying the cluster, the underlying MergeTree table, and the sharding key column). This type of table does not actually store any data. Instead, it defines a distributed query processing layer that sits above the per-server MergeTree engines (see the sketch after this list).
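
A hedged sketch of step 3 (the cluster, database, and table names are my own; the cluster must already be defined in the server configuration):

CREATE TABLE events_all AS events_local
ENGINE = Distributed(my_cluster, default, events_local, cityHash64(UserID));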

Sharding has a number of benefits:

  • Very large datasets (which are too big for a single machine) can be spread across multiple machines.

  • Conceivably, sharding could also improve read performance when parts are stored on S3. Generally speaking, replication is the preferred method of improving read throughput: the ReplicatedMergeTree engine is leaderless for reads and writes, so reads can be spread across multiple replicas without the need to federate the query across servers (which adds some overhead). However, as we’ll see further down, there are some issues with combining replication with S3 in open-source ClickHouse (at the time of writing).

  • Each shard only holds a fraction of the data, so merging results in fewer parts being maintained per shard, which improves read performance as any given shard needs to read a smaller number of parts. As I described earlier: binary search is logarithmic in time complexity, which means that one giant index will perform better than a multitude of tiny indexes.

Of course, these benefits are tempered by the additional overhead of fanning out the query and gathering the results (as per the Universal Scalability Law), but presumably every workload has a sweet spot for the hardware configuration it is deployed on.

Replication and high-availability

Sharding alone does not add redundancy or high availability; for that, we need the ReplicatedMergeTree engine. This engine replicates parts across a set of replicas, allowing both reads and writes to be directed at any replica.

Replicas disseminate the existence of new parts to their peers via ZooKeeper or ClickHouse Keeper. When a replica learns of a new part on a peer replica, it fetches that part from the peer. The job of merging parts is distributed across the replicas and coordinated via compare-and-swap operations on the version of the data-parts metadata in Keeper, allowing replicas to merge parts concurrently without conflicting. The existence of newly merged parts, and of the now inactive source parts, is disseminated via the keeper in the same way.
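
A hedged sketch of a replicated table definition (the Keeper path and the {shard}/{replica} macros are assumed to be configured on each server):

CREATE TABLE events_local
(UserID UInt32,
 EventTime DateTime)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_local', '{replica}')
ORDER BY (UserID, EventTime);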

Replication is also the recommended approach to increase read throughput as the read load can be spread evenly across the replicas. Sharding and replication can be combined to further scale out processing power.

Tiering to cloud object storage

As described earlier, both the MergeTree and ReplicatedMergeTree engines can tier cold data to cloud object storage. The ReplicatedMergeTree engine currently has some limitations with regard to tiering, which I will briefly discuss now.

By default, each replica of a ReplicatedMergeTree table will tier its data to object storage, duplicating the data. To avoid this, a zero-copy mechanism can be employed where only one copy of the data is uploaded. The ReplicatedMergeTree engine with zero-copy only replicates part metadata between replicas rather than the parts themselves, which are instead downloaded from object storage. The part metadata includes which files belong to the part and the locations of these files in object storage.
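
At the time of writing, zero-copy replication is opt-in via a MergeTree-level setting; as a hedged sketch (the exact setting name and its experimental status are version-dependent):

ALTER TABLE events_local MODIFY SETTING allow_remote_fs_zero_copy_replication = 1;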

ClickHouse reports that this zero-copy mode of the ReplicatedMergeTree engine has a few downsides, which seem to mostly come down to the keeper becoming a bottleneck for part dissemination on larger clusters and the short window where part metadata remains without redundancy on the source replica’s local disk. It is not clear if/when this will be addressed in the open-source ClickHouse project, but it is plausible that the current issues with the zero-copy architecture can be overcome. This GitHub issue seems to be the place to go if you’re interested in improving ClickHouse object storage integration for (Replicated)MergeTree engines.

Compression

ClickHouse uses its own columnar storage format and applies compression techniques similar to those of other columnar file formats such as Parquet and ORC.

These compression techniques fall into two categories:

  • Lightweight compression codecs, which leverage patterns in the values of a given column. These codecs include dictionary encoding, delta encoding, Huffman encoding, Gorilla, etc.

  • Block compression algorithms such as LZ4 and Zstd.

Compression can be configured to use neither, one, or both types of compression. ClickHouse has a very interesting blog post with a case study applying different compression techniques. Surprisingly, plain old Zstd alone produced strong results in terms of compressed size.
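
Per-column codecs are declared in the table definition and can chain a lightweight codec with a block codec; a hedged sketch (table and column names are my own):

CREATE TABLE sensor_metrics
(Time DateTime CODEC(DoubleDelta, ZSTD),
 SensorID UInt32,
 Value Float64 CODEC(Gorilla, ZSTD))
ENGINE = MergeTree
ORDER BY (SensorID, Time);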

How much mileage you get from compression will depend a lot on your data and the hardware of your cluster. Local NVMe storage is so fast that block compression may actually be detrimental to some workloads (due to the extra CPU overhead), whereas for clusters that store data on high-throughput, high-latency cloud object storage, any use of compression will likely boost performance significantly. ClickHouse notes that reading compressed data isn't always slower than a simple memcpy of uncompressed data when the hardware has enough cores, and chose LZ4 as the default compression codec for this reason. These fascinating slides go into more detail.

Next…

With that mental model built up, it’s time to delve into the ClickHouse Cloud serverless architecture in part 2.