In this post we'll take a look at what Change Data Capture (CDC) is and how we can use it to get data from SQL Server into Redshift in either a near real-time streaming fashion or more of a batched approach.
CDC is a SQL Server Enterprise feature and so not available to everyone. Also there are vendors that sell automated change data capture extraction and load into Redshift, such as Attunity and that may be your best option. But if you can't or don't want to pay for another tool on top of your SQL Server Enterprise license then this post may help you.
Introduction to CDC
SQL Server's CDC feature reads the transaction log and writes the changes to CDC tables where your application or SSIS pipeline can read them. It runs as a SQL Agent job and there is a little latency between a write occuring and those changes appearing in your CDC tables.
Each table that you activate CDC on gets a respective CDC table created. So if you capture changes across 10 tables then you'll have 10 CDC tables to read from.
Validity Windows, LSNs and SeqVals
Each change is recorded with every column of the row plus some meta data columns:
a 10 byte (commit) Log Sequence Number (LSN) which all changes of a single transaction share. Think of it as a transaction id. If a transaction cover changes to multiple tables, then you'll see that LSN shared across records in those multiple CDC tables.
a 10 byte SeqVal which is a sequence number to order the changes within a given transaction.
an Operation. 1 = delete, 2 = insert, 3=before update, 4=after update. That means each update creates two records to show you the before and after
an Update Mask which shows which columns were updated in a bit mask fashion
By default only three days of data is kept in a CDC table. A validity window is maintained with a high watermark LSN and a low watermark LSN. Only data between these two LSNs should ever be read. Data gets eliminated periodically once its LSN is lower than the low watermark.
At first, each table will have a different lowest valid LSN you can read from due to when CDC was activated on them and when changes came in. Over time the windows should converge on a common window for all. We'll see that later on.
Although we may have multiple CDC tables, there is only one transaction log. The transaction log is a single sequence of changes with an ever increasing LSN for each transaction logged. So although this linearized log is split up into multiple tables, we can still get a consistent snapshot of the data across multiple queries.
We'll be using some SQL Server functions for getting the valid low and high LSNs and extracting data from the tables:
sys.fn_cdc_get_max_lsn() returns the high watermark LSN
sys.fn_cdc_get_min_lsn('your cdc table name') returns the lowest valid LSN for that table. There is a global low watermark, but each table will have its own smallest LSN that will be equal to or higher than the low water mark.
[cdc].[fn_cdc_get_all_changes_dbo_Person] returns all changes between two LSNs for the Person tracked table.
Note that we never query the CDC tables directly.
So far what we don't have are the dates and times of these changes. That is where the cdc.lsn_time_mapping table comes in. It records the start and and time of each LSN.
So we can join onto that table if we also want the associated time of the change.
Getting Data into Redshift
One efficient way to get data into Redshift is to upload the data to S3 as delimited files, then using the Redshift COPY command to do bulk insertions of the data. In our scenario we have inserts, updates and deletes so it is not an insertion only scenario.
The way we'll do this is that we'll create batches of changes and upload them to S3 as documents. Each window will consist of an Upsert and a Delete document per table. With these two files per table we can perform a merge operation:
Create upsert temp table matching target table
Create delete temp table matching target table
COPY upsert S3 document rows into upsert temp table
COPY delete S3 document rows into delete temp table
Delete all rows in target table that matches upsert and delete temp tables
Perform insert into target table from upsert temp table
In this way we perform a transactionally safe bulk update on the target table. We perform this on each table in the window.
Let's look at an example. Our Person table is as follows:
create table "person" ( personid int not null distkey sortkey, firstname nvarchar(200) not null, surname nvarchar(200) not null, dateofbirth date not null, primary key(personid) );
A delimited file might look like (it must be in the same order as the columns of the target Redshift table):
1|Jayden|Rogers|1988-01-16 2|Isaac|Trynski|1976-08-23 3|Mason|George|1992-06-12 4|Oliver|Ellis|1956-12-02 5|Ryan|Davis|1986-02-02
Once we have our upsert and delete S3 documents uploaded we can run the following script to perform the merge.
-- step 1 COPY to staging table create temp table person_stage_upsert (like person); create temp table person_stage_delete (like person); COPY "person_stage_upsert" FROM 's3://your_s3_upsert_doc_path_here' iam_role 'your_role_arn_here' delimiter '|' region 'eu-west-1'; COPY "person_stage_delete" FROM 's3://your_s3_delete_doc_path_here' iam_role 'your_role_arn_here' delimiter '|' region 'eu-west-1'; -- step 2 delete and insert from staging begin transaction; delete from person using person_stage_upsert where person.personid = person_stage_upsert.personid; delete from person using person_stage_delete where person.personid = person_stage_delete.personid; insert into person select * from person_stage_upsert; end transaction;
Note that we end up inserting all the data to be deleted, which is inefficient. I will improve this later on.
By the end of the query we'll have either updated all the data or none and left Redshift in a consistent state. But ensuring data consistency is more complex than just using a transaction in Redshift. In order to guarantee global consistency of the Redshift database we need to ensure that each window contains a globally consistent set of changes. This is easy to accomplish with small to medium size databases, but may be impossible with databases that generate a high velocity of changes. We'll dive deeper into data consistency in a minute.
Streaming (Micro-Batch) or Batch
We can move closer to a streaming model by having very small windows or closer to batch by using much larger windows. For efficiency we like larger batches. We are storing the changes in S3 then uploading the data to Redshift, but those changes can live in S3 forever. Next time we need a Redshift cluster we can bootstrap it from the S3 documents. That is when you really want big S3 documents.
You could of course have both. You could be uploading changes in micro-batches and inserting the data into Redshift and have a secondary process aggregating those small S3 documents into larger ones for future data loads of other clusters. You could even put Kafka in between your CDC reader and S3. That way you can have a micro-batch Kafka consumer and a large batch consumer reading from the same topic. The micro-batch consumer is uploading to Redshift via small S3 documents and the large batch consumer is only uploading large S3 diucments for future use.
All or Net Changes
We can use and configure CDC so that in each query we make, either all changes are returned or just the net changes are returned. If we choose All then if multiple changes are made to a given record then we see all those changes.
With Net changes, we get one net change that represents the combined changes in one change record. Note that only tables with a primary key or unique index can use net changes.
Let's see an example. We'll perform an insert and update in a single transaction. Then we'll get the changes of the most recent LSN which will contain both operations as they were both executed in the same transaction.
BEGIN TRAN; INSERT INTO [dbo].[Person]([PersonId],[FirstName],[Surname],[DateOfBirth]) VALUES (1895, 'Jose','Manchego','19880808') UPDATE [dbo].[Person] SET DateOfBirth = '19700228' WHERE PersonId = 1895 Commit TRAN;
Get the changes.
DECLARE @last_lsn binary (10) SET @last_lsn = (SELECT TOP 1 __$start_lsn FROM [cdc].[dbo_Person_CT] ORDER BY __$start_lsn DESC) SELECT * FROM [cdc].[fn_cdc_get_all_changes_dbo_Person](@last_lsn, @last_lsn, 'all') SELECT * FROM [cdc].[fn_cdc_get_net_changes_dbo_Person](@last_lsn, @last_lsn, 'all')
We see that fn_cdc_get_all_changes_dbo_Person returns both operations and fn_cdc_get_net_changes_dbo_Person returns an insert operation with the updated date of birth.
Also note with the Net results that there is no __$seqval column now as each primary key has only one record and therefore the ordering within the transaction is irrelevant to data consistency.
If you only want to replicate data then Net changes might be better to avoid transferring unneeded data. But if you also want to use this data as an audit log or event store with all individual change events then All would be a better choice.
Data Consistency Strategies
There are different strategies that balance scalability, latency and transactional consistency in different ways. Strategy 1 has the best consistency model and the worst scalability, where Strategy 3 has the best scalability with the least strong consistency model.
Strategy 1 - CDC to Redshift with Global Transactional Safety
One SQL Server transaction can span multiple changes across multiple tables. If transaction A involves changes to: Person 1, Person 2, PersonAddress 34, PersonAddress 40, then in order to maintain global transactional consistency, we'll need to write those changes to Redshift, in a single Redshift transaction.
Starting at the transaction log, this is easy as the transaction log is a single linearized log of changes. We find those four changes in a sequence next to each other. But when consuming those changes via SQL Server CDC, those changes are distributed across two tables. Now imagine a transaction that touches 10 tables. This presents us with a problem.
We could simply iterate over the known LSNs found in the lsn_time_mappings table, and query each CDC table for the corresponding changes. But this would be prohibitively slow and inefficient,
We could read large ranges of transactions across all tables and then perform grouping and ordering in memory, but very large transactions could be too much to fit into memory safely.
In the end, I ended up with the following processing pipeline:
1 - CDC Reader Tasks
Each CDC table gets its own CDC Reader Task that reads from the table in batches. It pushes its changes onto a BlockCollection which acts as a bounded capacity buffer in a producer/consumer pattern. It always loads data efficiently in batches and pushes that data onto the buffer. When the buffer reaches capacity, the reader continues to try every 100 milliseconds until its already loaded batch is fully on the buffer. Then it loads a fresh batch.
Each CDC Reader Task runs concurrently.
2 - Transaction Grouping Task
The Transaction Grouping Task runs concurrently alongside the other Tasks. It starts by peeking at the head of each buffer and chooses the lowest LSN it finds. It then starts efficiently pulling data from the buffers who have that LSN at the head. It pulls until no more buffers have that LSN to offer any more. It then packages those changes up into a Transaction Group and pushes it onto another bounded capacity buffer. Then it peeks the head of each buffer and selects the lowest LSN and repeats.
This allows us to very efficiently pool changes of the same LSN, even if there are many thousand changes of a given LSN in one table and only handful in a few other scattered tables, or if every transaction only ever has a single row change in a single table.
Secondly, we configure a maximum transaction group size. When a transaction group exceeds the threshold, we mark it as a multi-part transaction and push it onto the outgoing buffer. Each part gets a part number indicating its ordinal position in the transaction.
3 - Redshift Uploader
This Task also runs concurrently with the rest of the Tasks. It pulls from the Transaction Group buffer and periodically performs uploads to S3 and performs the merge operation to get the data into Redshift.
To cope with the very large transactions it writes each part of a multi-part transaction immediately to S3. Once it receives the last part, it performs the copy and merge operation on all of the S3 documents of that transaction in a single Redshift transaction.
This means we can support large transactions without needing to store them all in memory at the same time.
This might be overkill for your scenario. You might not care about applying the changes to Redshift in the same linearized transactions as SQL Server. But it's nice to know that we can have it, with decent performance up to a certain limit. Although we use concurrency to read from tables efficiently at the same time, this is still a one host solution. Your database might have too great a velocity of changes to support this strategy.
That said, you could easily replace the in memory buffers with Kafka and the Tasks with processes hosted on multiple servers. The theory would be the same on the whole.
Strategy 2 - CDC to Redshift with Table Level Transactional Safety
In this strategy we respect transactions at the level of the table. Each table is read, grouped and uploaded in isolation, along transactional boundaries. We can take the same architecture as strategy 1 except we now have a process per table and each process has only a single CDC Reader Task.
The benefit of this approach is that we scale better than strategy 1. But now we lose transactional consistency across tables. For example we might get a PersonAddress get written before the corresponding Person gets written.
Strategy 3 - CDC to Redshift with Change Order Safety
Here we also read each table in isolation, but now we perform no grouping of changes by LSN.
This eliminates that buffering pipeline of tasks and is replaced by a very simply paging and upload logic.
Read a 1000 changes of the Person table and upload to Redshift
Read a 1000 changes of the Person table and upload to Redshift
Read a 1000 changes of the Person table and upload to Redshift
You get the idea. The benefit of this approach is the simplicity. We don't care about large transactions at all. We do guarantee that changes are written to Redshift in the correct order (within the scope of a single table), but those changes may get written crossing transactional boundaries.
For example. Let's say as a user, I delete my delivery address and add a new one one and those two changes are written to SQL Server in a single transaction. We might end up performing the delete in one Redshift transaction, and the insert in the next. For a short time window I would not have an address in Redshift. How important that kind of scenario is depends on your data and company.
Building a CDC to Redshift Pipeline in .NET Core
I am currently working on a .NET Core implementation of this architecture. It is in Github here. It is a work in progress and once it is ready I will write a new post. It has many interesting tidbits that I find interesting.
It consists of:
a couple of core libraries that handle the various CDC reading strategies, including the buffering architecture of Strategy 1. The libraries also offer a full database load and also state persistence so if there is a crash or whatever, it can carry on from where it left off.
a couple of example applications that use the core libraries to read CDC and export to Redshift and Kafka. I will make more.
Docker support. Example docker-compose.yml files for standing up the various strategies with a docker-compose up command.
If you are interested then keep an eye on the Github repo. Currently documentation is virtually zero. It is in alpha.