Exploring the use of Hash Trees for Data Synchronization - Part 1

In this post we'll explore a relational database replication strategy that you can use when standard database replication is not an option – no built-in replication feature, no log shipping, no mirroring and so on. The approaches outlined below only work with a master-slave model where all writes go to the master. Conflict resolution is not addressed in this article.

We’ll cover phase one of a two-phase approach:

1. Generate and compare hash trees to identify blocks of rows that have discrepancies
2. For each block with a different hash value, identify and import the individual changes (inserts, updates and deletes)

This post is really about exploring the approach rather than the implementation details and detailed performance metrics. I may share some code and metrics in a later post if people are interested.

What got me working on this problem?

I have recently been tasked with fixing an ETL process that synchronizes a master database with a read-only replica. We cannot use standard database replication because the master database is owned and operated by a partner who will not offer replication, log shipping or any other SQL Server feature for transferring data.

The legacy ETL used a complex strategy to detect new, updated and deleted data while maintaining inter-table consistency. I won't go into that here, but suffice it to say it was broken and millions of discrepancies existed between the two databases. So I decided on an entirely new strategy and looked to distributed data stores for inspiration.


Cassandra and Hash Trees (Merkle Trees)

Cassandra allows a keyspace to be partitioned into shards and each shard to have N replicas. Each replica can receive writes and must replicate those writes to the other replicas. To detect and repair inconsistencies between replicas, Cassandra nodes exchange hash trees (Merkle trees) with each other. The benefit of this strategy is that large volumes of data can be compared with relatively few bytes being transferred over the network.

Cassandra builds a perfect, compact binary hash tree of a fixed depth which it can share with its replica neighbours. At its base level a hash tree holds the hashes of individual rows, and each parent node is the hash of its two child nodes. A full tree could get pretty big, so Cassandra compacts it, maintaining only a limited depth with groups of rows hashed together at the base.

The Cassandra nodes exchange these trees, use them to identify the data inconsistencies between them, and then exchange only the data that actually differs. It is a bit more complex than this, as conflicting changes need to be taken into account, but we won't cover that here.

There is a great description of how that works here: http://distributeddatastore.blogspot.com.es/2013/07/cassandra-using-merkle-trees-to-detect.html

Hash Trees with SQL Server

That is great for Cassandra, but SQL Server has no online hash tree feature, so instead I must periodically generate a hash tree for each table and exchange those. I cannot get the SQL Server databases to exchange these hash trees with each other and act on them, so I need a third party to request, receive and compare the trees - my C# application.

In my case I have about 300 GB of data that needs to be synchronized between the two SQL Server 2008 R2 databases. The master database hosted by our partner receives all writes and our replica is just a read-only copy of that. 

SQL Server offers us two checksum functions: CHECKSUM and BINARY_CHECKSUM. I had to discard CHECKSUM immediately as it generates different checksums (hashes) for the same content under different collations. The collations used in the master and the replica are not the same, and in my case I could not change our collation. That left me with BINARY_CHECKSUM. It is important to note that some datatypes, including text and image, are ignored by BINARY_CHECKSUM; in my case, however, all the column types are covered by the function, so we are good to go.

Getting a single hash value for a block of rows 

Next we need a way of using BINARY_CHECKSUM to produce a single value over a block of rows. In C# I might go for an iterative hash function, whereby the hash value of row N is the hash of the columns of row N combined with the hash value of row N - 1, and the hash value of the last row becomes the block hash value. But with a relational database we are limited in how we can approach this and still get good performance: SQL is a set-oriented language, so no loops if you want performance. The simplest way of doing it as a set is to hash each row and sum the hashes. This may not be suitable for all datasets, but in my case the chance of two different blocks having the same hash sum is vanishingly small.

SELECT SUM(CAST(BINARY_CHECKSUM(*) AS BIGINT)) AS BlockCheckSum
FROM MyTable
WHERE …

I now have a way of getting block hash values; next I need a way of grouping rows into blocks. This is where I will start looking at two contrasting methods of grouping rows and building hash trees.

After some experimentation I found two ways of generating and comparing hash trees. The first method, which I call the Bottom Up approach, most closely matches the classic hash tree; the second is an Iterative Top Down approach. The performance of both depends on your application's ability to run asynchronous code in parallel.

Method 1 – Bottom Up

To build our perfect, compact hash tree we need to generate N blocks for the bottom level of the tree, and N must be a power of 2. For example, we could choose 4096 blocks, which gives a hash tree with a depth of 12 (2^12 = 4096).

We need blocks that are comparable between the two databases; to achieve that, each block must map to the same key range in both databases. The record for Bob must sit in the same block in both databases because we will later be comparing the databases block for block.

This rules out using the ROW_NUMBER() function to generate row numbers as a partition space. ROW_NUMBER would probably generate different row numbers in the two databases, so the user Bob could have row number 999 in database 1 and 1000 in database 2, putting him in block 1-999 in database 1 but block 1000-1999 in database 2.

In fact, the grouping cannot be determined by either database; it has to be determined by the application. In my case I was lucky: all tables are either large with an auto-incrementing bigint primary key, or small with a character-based primary key. For the small character-based tables I simply treated the whole table as a single block. For the large tables I took the minimum id and the maximum id across both source and target and partitioned that key range into 4096 blocks. This is perfect, as the blocks align exactly in both databases, meaning Bob is guaranteed to fall in the same block on each side.
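As a rough sketch of the application-side partitioning (my own illustration rather than the post's actual code - KeyRangeBlock, FromId and ToId are hypothetical names, and minId/maxId are assumed to have already been read from both databases):

using System;
using System.Collections.Generic;

public class KeyRangeBlock
{
    public long FromId { get; set; }
    public long ToId { get; set; }
}

public static class KeyRangePartitioner
{
    // Splits the combined key range [minId, maxId] into blockCount contiguous
    // blocks. Because both databases are given the same minId and maxId,
    // the blocks line up exactly on both sides.
    public static List<KeyRangeBlock> Partition(long minId, long maxId, int blockCount)
    {
        var blocks = new List<KeyRangeBlock>(blockCount);
        long totalRange = maxId - minId + 1;

        for (int i = 0; i < blockCount; i++)
        {
            long from = minId + (long)(totalRange * (double)i / blockCount);
            long to = (i == blockCount - 1)
                ? maxId
                : minId + (long)(totalRange * (double)(i + 1) / blockCount) - 1;
            blocks.Add(new KeyRangeBlock { FromId = from, ToId = to });
        }

        return blocks;
    }
}

Calling Partition(minId, maxId, 4096) then gives the leaf ranges that both block-hash queries will be grouped by.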

To efficiently generate 4096 blocks I discarded the idea of issuing 4096 queries; instead I used bulk copy to insert all the ranges into a temporary table and then performed a join onto that temporary table.

SELECT SUM(CAST(BINARY_CHECKSUM(*) AS BIGINT)) AS BlockCheckSum
    ,S.FromId
    ,S.ToId
FROM MyTable T
JOIN #MyTempTable AS S ON T.MyId >= S.FromId
    AND T.MyId <= S.ToId
GROUP BY S.FromId
    ,S.ToId

To perform a bulk copy into a temporary table and then run a separate query against that table, simply use the same open SqlConnection for both operations; a local temporary table is scoped to the session, so the second query can see it as long as the connection stays open.
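Here is a hedged sketch of what that could look like on the application side, reusing the KeyRangeBlock type from the earlier sketch. The table and column names follow the query above; everything else is illustrative rather than the post's actual code, and the key point is that the temp table creation, the bulk copy and the block-hash query all run on the same open connection:

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;

public static class BlockCheckSums
{
    public static DataTable GetBlockCheckSums(string connectionString, List<KeyRangeBlock> blocks)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();

            // 1. Create the session-scoped temp table.
            using (var create = new SqlCommand(
                "CREATE TABLE #MyTempTable (FromId BIGINT NOT NULL, ToId BIGINT NOT NULL)", connection))
            {
                create.ExecuteNonQuery();
            }

            // 2. Bulk copy the block ranges into it over the same connection.
            var ranges = new DataTable();
            ranges.Columns.Add("FromId", typeof(long));
            ranges.Columns.Add("ToId", typeof(long));
            foreach (var block in blocks)
                ranges.Rows.Add(block.FromId, block.ToId);

            using (var bulkCopy = new SqlBulkCopy(connection))
            {
                bulkCopy.DestinationTableName = "#MyTempTable";
                bulkCopy.WriteToServer(ranges);
            }

            // 3. Run the block-hash query; it still sees #MyTempTable because
            //    the connection, and therefore the session, is unchanged.
            const string sql = @"
SELECT SUM(CAST(BINARY_CHECKSUM(*) AS BIGINT)) AS BlockCheckSum, S.FromId, S.ToId
FROM MyTable T
JOIN #MyTempTable AS S ON T.MyId >= S.FromId AND T.MyId <= S.ToId
GROUP BY S.FromId, S.ToId";

            var results = new DataTable();
            using (var command = new SqlCommand(sql, connection))
            using (var adapter = new SqlDataAdapter(command))
            {
                adapter.Fill(results);
            }
            return results;
        }
    }
}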

That brings back N blocks that form the lowest level of my hash tree. Then I simply build the tree from the bottom up with a recursive method in C#, which gives me two hash trees I can use to compare the databases.
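The post doesn't include the tree-building code, but a bottom-up build over the leaf block hashes can be sketched roughly as follows; HashTreeNode and the way the two child hashes are combined are my own illustrative choices, again reusing KeyRangeBlock from earlier:

using System.Collections.Generic;

public class HashTreeNode
{
    public long Hash { get; set; }
    public HashTreeNode Left { get; set; }
    public HashTreeNode Right { get; set; }
    public KeyRangeBlock Block { get; set; }   // only set on leaf nodes
}

public static class HashTreeBuilder
{
    // The input is the list of leaf nodes (one per block, in key order).
    // Its count must be a power of two (e.g. 4096) to get a perfect tree.
    public static HashTreeNode Build(IList<HashTreeNode> level)
    {
        if (level.Count == 1)
            return level[0];                   // the root

        var parents = new List<HashTreeNode>(level.Count / 2);
        for (int i = 0; i < level.Count; i += 2)
        {
            parents.Add(new HashTreeNode
            {
                Left = level[i],
                Right = level[i + 1],
                // Combine the two child hashes; any deterministic combination
                // works as long as both sides use the same one.
                Hash = unchecked(level[i].Hash * 31 + level[i + 1].Hash)
            });
        }
        return Build(parents);
    }
}

Comparing two such trees is then a matter of walking both from the root and descending only into children whose hashes differ; the leaves you end up at identify the mismatching key ranges.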

Too Slow

The cost of hashing the rows turned out to be less than the cost of the join. So I went for a Parallel Bottom Up approach: I partitioned the table into multiple key-range partitions and then ran the Bottom Up approach for each partition in parallel, creating multiple hash trees. Because each block maps to the same key range in both databases, I can compare each of my hash trees separately, in parallel, and unify the results at the end.
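A hedged sketch of that parallel variant, where buildTreeForPartitionAsync is a hypothetical delegate that runs the temp-table join for one key-range partition and builds that partition's tree bottom up:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelBottomUp
{
    // Builds one hash tree per key-range partition, with all partitions
    // being hashed against the database concurrently.
    public static async Task<HashTreeNode[]> BuildTreesAsync(
        IEnumerable<KeyRangeBlock> partitions,
        Func<KeyRangeBlock, Task<HashTreeNode>> buildTreeForPartitionAsync)
    {
        var tasks = partitions.Select(buildTreeForPartitionAsync).ToList();
        return await Task.WhenAll(tasks);
    }
}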

The parallel approach made it much quicker, but it still took up to five minutes to finish for my test table of 15 million rows and it hammered the database.

I started playing with the Top Down approach.

Method 2 – Iterative Top Down

I partition the table into 10 blocks by key range and hash each range, producing 10 block hashes. I compare each block and, for the ones that differ, I partition them into 10 sub-blocks and hash and compare each of those. I continue until the size of the blocks falls below a configured size (row count).

Figure 2 depicts the comparison using a 5-block partition. The number in each block is the hash value.

Figure 3 shows how blocks that differ are broken down into sub-blocks and so on, creating a partial, uneven hash tree that is generated iteratively. The numbers in each block refer to the row count.

For this I need one database query per block, so to make it fast I issue the queries for all blocks in the same branch asynchronously, in parallel.
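Putting that together, a rough sketch of the top-down descent might look like the following. The delegates for hashing a block in source and target, counting its rows and splitting it into sub-blocks are hypothetical stand-ins for the real database calls and partitioning logic, and KeyRangeBlock is reused from earlier:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TopDownComparer
{
    public static async Task<List<KeyRangeBlock>> FindDifferingBlocksAsync(
        KeyRangeBlock block,
        int fanOut,                                      // e.g. 10 sub-blocks per level
        int maxBlockRowCount,                            // e.g. the 25,000-row Phase 2 threshold
        Func<KeyRangeBlock, Task<long?>> hashSourceBlockAsync,
        Func<KeyRangeBlock, Task<long?>> hashTargetBlockAsync,
        Func<KeyRangeBlock, Task<long>> countRowsAsync,
        Func<KeyRangeBlock, int, IEnumerable<KeyRangeBlock>> splitBlock)
    {
        var differing = new List<KeyRangeBlock>();

        // Hash this block in both databases concurrently.
        var sourceTask = hashSourceBlockAsync(block);
        var targetTask = hashTargetBlockAsync(block);
        await Task.WhenAll(sourceTask, targetTask);

        if (sourceTask.Result == targetTask.Result)
            return differing;                            // identical block, stop descending

        if (await countRowsAsync(block) <= maxBlockRowCount)
        {
            differing.Add(block);                        // small enough to hand to Phase 2
            return differing;
        }

        // Split the block and compare all sub-blocks in this branch in parallel.
        var childTasks = splitBlock(block, fanOut)
            .Select(child => FindDifferingBlocksAsync(
                child, fanOut, maxBlockRowCount,
                hashSourceBlockAsync, hashTargetBlockAsync, countRowsAsync, splitBlock))
            .ToList();

        foreach (var childResult in await Task.WhenAll(childTasks))
            differing.AddRange(childResult);

        return differing;
    }
}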

Iteration 1

Break the table down into ten blocks and hash each one in both databases.
Compare each block. Block 2 has a different hash in source and target.

Iteration 2

Break Block 2 into ten blocks and hash each block in source and target.
Block hashes 2.4 and 2.7 are different.

Iteration 3a

Partition Block 2.4 into ten blocks and hash each block in source and target.
Block 2.4.1 has a different hash in source and target. The size of this block is below the configured Phase 2 block size of 25,000 rows, so add it to the list of blocks to pass to Phase 2.

Iteration 3b

Partition Block 2.7 into ten blocks and hash each block in source and target.
Block 2.7.5 has a different hash in source and target. The size of this block is below the configured Phase 2 block size of 25,000 rows, so add it to the list of blocks to pass to Phase 2.

Result

Two blocks of roughly 20,000 rows each, both below the configured block size, ready to be passed to Phase 2.

This approach was three times faster than the Bottom Up approach, which seems counterintuitive: with the Top Down approach a particular row can end up being hashed multiple times, whereas with the Bottom Up approach each row is guaranteed to be hashed exactly once. However, not paying the price of the join won the day.

Further tests showed that the performance gain only holds when there are relatively few discrepancies. When a large number of discrepancies exist, distributed evenly across the data set, the iteratively generated hash tree can grow deep, and the total number of rows hashed can end up many times larger than the row count of the table, because a row that sits under a differing branch is re-hashed once per level of descent.

In my case this synchronization will run every 5 minutes, so the number of changes to replicate is relatively low and the Iterative Top Down approach works best for me.

Performance Tricks

I found some ways of improving performance 10x to 20x, which probably made this whole project viable. Hashing the entire database every five minutes might not be viable due to the size of the dataset, so the tricks I list below are all about avoiding hashing the entire dataset each time.

Pre-Compute the Row Hashes

If you need a strategy like this one then you won't have control over the master database, but you do have control over the replica. At the very least you can pre-compute the row-level hashes in the replica on insert and update. This gives a big performance improvement, as the only cost left at comparison time on the replica is summing the stored hashes.
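As an illustration of the idea (my own sketch, not the post's implementation), one option on the replica is a persisted computed column holding each row's checksum so that the block query only has to sum it. The table and column names are hypothetical, and BINARY_CHECKSUM(*) is not allowed in a computed column definition, so the columns are listed explicitly:

public static class ReplicaHashColumn
{
    // One-time DDL on the replica: the checksum is computed whenever a row is
    // inserted or updated rather than at comparison time. The column list and
    // order must match what BINARY_CHECKSUM(*) sees on the master, otherwise
    // the block hashes will never agree.
    public const string AddRowCheckSumColumn = @"
ALTER TABLE dbo.MyTable
ADD RowCheckSum AS BINARY_CHECKSUM(Col1, Col2, Col3) PERSISTED;";

    // The replica's block-hash query then just sums the stored values.
    public const string BlockCheckSumQuery = @"
SELECT SUM(CAST(RowCheckSum AS BIGINT)) AS BlockCheckSum
FROM dbo.MyTable
WHERE MyId >= @FromId AND MyId <= @ToId;";
}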

Trustworthy Modified Dates

If you are lucky you'll have modified dates in all your tables, and those dates will ALWAYS be updated when any change is made to the data. If devs make cheeky data changes in production, or update commands do not always set the modified date, then forget this one.

Instead of BINARY_CHECKSUM(*) use BINARY_CHECKSUM(ModifiedDate). The performance improvement will depend on the number of bytes in your average row, but you will likely see a dramatic gain. Be very careful though: we are essentially summing the hashes of these dates, so a summed hash collision - and therefore a false negative, a missed discrepancy - becomes more likely. Whether this is viable really depends on your data.

Created Dates and No Changes on Old Data

If you are fortunate, no data gets changed past a certain age and all tables have a created date. In that case, limit the rows that form the hash trees to the time period in which changes can take place. Maybe only 9 million rows of your 126-million-row table could possibly be changed.

Tweak the Partition Counts and Block Sizes

You will likely speed up the hash tree generation through effective use of concurrency. That means async and await if you have .NET 4.5 or higher, or the TPL for .NET 4.0. Increasing the partition count on each branch only improves performance up to a certain number of partitions, so experiment and find out what is most effective for you.

I found that making the generation of all blocks concurrent created too many database connections (70 tables with 10 blocks at the top level, plus any deeper levels), so I made the block generation at the top level of each hash tree concurrent and the lower levels sequential. The problem with this approach is that if you have one super-large table with lots of changes, the other tables may finish quickly while the behemoth takes much longer. In that specific case you want to distribute the concurrency more evenly over the total set of blocks being generated, rather than by table and top level. I think the TPL Dataflow library could be quite useful for managing that kind of concurrency control, as you can iteratively feed blocks into a single dataflow pipeline and cap the number of blocks being generated concurrently at any one time; see the sketch below.
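For completeness, here is a minimal sketch of what that single pipeline could look like with TPL Dataflow, under my own assumptions rather than the post's code. An ActionBlock caps how many blocks are hashed concurrently, a counter of outstanding blocks decides when the pipeline is complete, and getSubBlocksToRecheckAsync is a hypothetical delegate that hashes one block in both databases and returns any sub-blocks that still need checking:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;   // requires the TPL Dataflow NuGet package

public static class BlockHashPipeline
{
    public static async Task RunAsync(
        IReadOnlyCollection<KeyRangeBlock> topLevelBlocks,
        Func<KeyRangeBlock, Task<IReadOnlyList<KeyRangeBlock>>> getSubBlocksToRecheckAsync,
        int maxConcurrentBlocks)
    {
        int pending = topLevelBlocks.Count;
        ActionBlock<KeyRangeBlock> pipeline = null;

        pipeline = new ActionBlock<KeyRangeBlock>(async block =>
        {
            // Sub-blocks that still differ go back into the same pipeline, so the
            // concurrency limit applies across all tables and all levels at once.
            foreach (var subBlock in await getSubBlocksToRecheckAsync(block))
            {
                Interlocked.Increment(ref pending);
                pipeline.Post(subBlock);
            }

            // When no block has outstanding work left, the pipeline is done.
            if (Interlocked.Decrement(ref pending) == 0)
                pipeline.Complete();
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrentBlocks });

        if (pending == 0)
        {
            pipeline.Complete();                         // nothing to do
        }
        else
        {
            foreach (var block in topLevelBlocks)
                pipeline.Post(block);
        }

        await pipeline.Completion;
    }
}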

The block size will also affect performance, though probably more in Phase 2, so we'll look at block sizes in Part 2.

EDIT

I originally intended this to be a two-part series in which the second part would share code snippets of key parts of the logic. I have since decided to create a generic synchronization tool based on this approach. This series will continue in the "Building Synkronizr" series that starts here.