While CockroachDB is an excellent system of record, it also needs to coexist with other systems. For example, you might want to keep your data mirrored in full-text indexes, analytics engines, or big data pipelines.
This page walks you through a demonstration of how to use an enterprise changefeed to stream row-level changes to Snowflake, an online analytical processing (OLAP) database.
Snowflake is optimized for INSERTs and batch rewrites over streaming updates. This means that CockroachDB changefeeds are unable to send UPDATEs and DELETEs to Snowflake. If this is necessary, additional setup (not covered in this tutorial) can allow entire tables to be replaced in batch.
Before you begin
Before you begin, make sure you have:
- Admin access to a CockroachCloud account
- Write access to an AWS S3 bucket
  Note: This tutorial uses AWS S3 for cloud storage, but Snowflake also supports Azure. Snowflake does not support GCS yet.
- Read and write access to a Snowflake cluster
Step 1. Create a cluster
If you have not done so already, create a cluster.
Step 2. Configure your cluster
Connect to the built-in SQL shell as a user with Admin privileges, replacing the placeholders in the client connection string with the correct username, password, and path to the ca.crt file:
$ cockroach sql \
--url='postgres://<username>:<password>@<global host>:26257?sslmode=verify-full&sslrootcert=certs/ca.crt'
Note: If you haven't connected to your CockroachCloud cluster before, see Connect to your CockroachCloud Cluster for information on how to initially connect.
Enable rangefeeds:
> SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING
Step 3. Create a database
In the built-in SQL shell, create a database called cdc_test:
> CREATE DATABASE cdc_test;
CREATE DATABASE
Set it as the default:
> SET DATABASE = cdc_test;
SET
Step 4. Create tables
Before you can start a changefeed, you need to create at least one table for the changefeed to target. The targeted table's rows are referred to as the "watched rows".
Let's create a table called order_alerts to target:
> CREATE TABLE order_alerts (
id INT PRIMARY KEY,
name STRING
);
CREATE TABLE
Step 5. Create an S3 bucket in the AWS Console
Every change to a watched row is emitted as a record in a configurable format (i.e., JSON for cloud storage sinks). To configure an AWS S3 bucket as the cloud storage sink:
Log in to your AWS S3 Console.
Create an S3 bucket, called changefeed-example, where streaming updates from the watched tables will be collected.
The name of the S3 bucket is needed when you create your changefeed. Be sure to have a set of IAM credentials with write access on the S3 bucket that will be used during changefeed setup.
Step 6. Create an enterprise changefeed
Back in the built-in SQL shell, create an enterprise changefeed:
> CREATE CHANGEFEED FOR TABLE order_alerts
INTO 'experimental-s3://changefeed-example?AWS_ACCESS_KEY_ID=<KEY>&AWS_SECRET_ACCESS_KEY=<SECRET_KEY>'
WITH
updated,
resolved='10s';
job_id
+--------------------+
000000000000000000
(1 row)
Be sure to replace the placeholders with your AWS key ID and AWS secret key.
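AWS secret keys can contain characters such as / and +, which carry special meaning in a URL query string, so it is likely safest to percent-encode the credentials before pasting them into the sink URI. The following Python sketch shows one way to do that. The helper name s3_sink_uri and the credentials are made up for illustration; only the URI layout comes from the statement above.

```python
from urllib.parse import quote


def s3_sink_uri(bucket, key_id, secret_key):
    """Build an experimental-s3:// sink URI with percent-encoded credentials.

    Hypothetical helper: it only mirrors the URI layout used in this step.
    """
    # safe="" makes quote() encode "/" as well, which it leaves alone by default.
    return (
        f"experimental-s3://{bucket}"
        f"?AWS_ACCESS_KEY_ID={quote(key_id, safe='')}"
        f"&AWS_SECRET_ACCESS_KEY={quote(secret_key, safe='')}"
    )


# Made-up credentials, for illustration only.
uri = s3_sink_uri("changefeed-example", "AKIAEXAMPLE", "abc/def+ghi=")
print(uri)
```

The printed string can be pasted between the single quotes of the INTO clause.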
If your changefeed is running but data is not displaying in your S3 bucket, you might have to debug your changefeed.
Step 7. Insert data into the tables
In the built-in SQL shell, insert data into the order_alerts table that the changefeed is targeting:
> INSERT INTO order_alerts VALUES (1, 'Order received'), (2, 'Order processed');
INSERT 2
Navigate back to the S3 bucket to confirm that the data is now streaming to the bucket. A new directory should display on the Overview tab.
Note: If your changefeed is running but data is not displaying in your S3 bucket, you might have to debug your changefeed.
Step 8. Configure Snowflake
Log in to Snowflake as a user with read and write access to a cluster.
Navigate to the Worksheet view.
Create a table to store the data to be ingested:
> CREATE TABLE order_alerts ( changefeed_record VARIANT );
This will store all of the data in a single VARIANT column as JSON. Because the column holds valid JSON, you can access its fields and query the column as if it were a table.
Run the statement.
In the Worksheet, create a stage called cdc_stage, which tells Snowflake where your data files reside in S3:
> CREATE STAGE cdc_stage url='s3://changefeed-example/' credentials=(aws_key_id='<KEY>' aws_secret_key='<SECRET_KEY>') file_format = (type = json);
Be sure to replace the placeholders with your AWS key ID and AWS secret key.
In the Worksheet, create a Snowpipe called cdc_pipe, which tells Snowflake to auto-ingest data:
> CREATE PIPE cdc_pipe auto_ingest = TRUE as COPY INTO order_alerts FROM @cdc_stage;
Note: Currently, auto-ingest in Snowflake only works with AWS and Azure. Snowflake does not support GCS yet.
In the Worksheet, view the Snowpipe:
> SHOW PIPES;
Copy the ARN of the SQS queue for your stage (displays in the notification_channel column). You will use this information to configure the S3 bucket.
Step 9. Configure the S3 bucket
Configure an event notification for the S3 bucket. Use the following parameters:
- Name: Name of the event notification (e.g., Auto-ingest Snowflake).
- Events: Select All object create events.
- Send to: Select SQS Queue from the drop-down.
- SQS: Select Add SQS queue ARN from the drop-down.
- SQS queue ARN: Paste the SQS queue ARN from the SHOW PIPES output (from Step 8).
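For readers who script their bucket setup instead of using the console, the parameters above correspond roughly to an S3 notification configuration document. The Python sketch below only builds and prints that document and does not call AWS. The ARN is a placeholder for the notification_channel value from Step 8, and the mapping from the console fields to these keys is an assumption to verify against the AWS documentation.

```python
import json

# Placeholder: the notification_channel value copied from SHOW PIPES in Step 8.
SQS_QUEUE_ARN = "<SQS_QUEUE_ARN>"

# Assumed mapping of the console fields onto S3's notification configuration:
#   Name            -> Id
#   Events          -> Events ("All object create events" is s3:ObjectCreated:*)
#   SQS queue ARN   -> QueueArn
notification_configuration = {
    "QueueConfigurations": [
        {
            "Id": "Auto-ingest Snowflake",
            "QueueArn": SQS_QUEUE_ARN,
            "Events": ["s3:ObjectCreated:*"],
        }
    ]
}

print(json.dumps(notification_configuration, indent=2))
```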
Navigate back to Snowflake.
Ingest the data from your stage:
> ALTER PIPE cdc_pipe refresh;
To view the data in Snowflake, query the order_alerts table:
> SELECT * FROM order_alerts;
The ingested rows will display in the Results panel. It may take a few minutes for the data to load into the Snowflake cluster.
Your changefeed is now streaming to Snowflake.
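Each row of the Snowflake table holds one changefeed record as JSON. The Python sketch below illustrates the shape such a record is assumed to have for the first row inserted in Step 7, and how its fields nest. The sample line and its timestamp are hypothetical, and the real envelope may carry additional fields.

```python
import json

# Hypothetical record for the row (1, 'Order received'). The "updated" field is
# assumed to be present because the changefeed was created WITH updated.
line = (
    '{"after": {"id": 1, "name": "Order received"}, '
    '"key": [1], "updated": "1600000000000000000.0000000000"}'
)

record = json.loads(line)
row = record["after"]        # the row's state after the change
order_id = row["id"]
order_name = row["name"]
updated = record["updated"]  # timestamp of the change, kept as a string

print(order_id, order_name, updated)
```

In Snowflake, the same nesting can be followed with the colon path notation on the VARIANT column, for example changefeed_record:after:name.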
Known limitations
- Snowflake cannot filter streaming updates by table. Because of this, we recommend creating a changefeed that watches only one table.
- Snowpipe is unaware of CockroachDB resolved timestamps. This means CockroachDB transactions will not be loaded atomically and partial transactions can briefly be returned from Snowflake.
- Auto-ingest in Snowflake only works with AWS and Azure. Snowflake does not support GCS yet.
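To illustrate the resolved-timestamp limitation, the Python sketch below shows how a downstream reader could hide rows newer than the latest resolved timestamp, so that partially loaded transactions are not exposed. It assumes the reader has both row records and resolved markers available as parsed JSON objects, and that timestamps are strings of the form <wall time>.<logical counter>. The function names and sample values are hypothetical.

```python
def parse_hlc(ts):
    """Split a '<wall>.<logical>' timestamp string into a comparable tuple."""
    wall, _, logical = ts.partition(".")
    return int(wall), int(logical or 0)


def rows_up_to_resolved(records):
    """Return row records whose 'updated' is at or below the newest resolved mark."""
    resolved = [parse_hlc(r["resolved"]) for r in records if "resolved" in r]
    if not resolved:
        return []  # nothing is known to be complete yet
    watermark = max(resolved)
    return [
        r for r in records
        if "after" in r and parse_hlc(r["updated"]) <= watermark
    ]


# Hypothetical stream: one row before the resolved mark, one after it.
records = [
    {"after": {"id": 1, "name": "Order received"}, "updated": "100.0000000000"},
    {"resolved": "150.0000000000"},
    {"after": {"id": 2, "name": "Order processed"}, "updated": "200.0000000000"},
]

visible_ids = [r["after"]["id"] for r in rows_up_to_resolved(records)]
print(visible_ids)
```

Only the first row is reported, because the second row's timestamp is above the newest resolved mark.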
General change data capture known limitations
- Changefeeds only work on tables with a single column family (which is the default for new tables).
- Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
- Many DDL queries (including TRUNCATE and DROP TABLE) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed.
- Changefeeds cannot be backed up or restored.
- Partial or intermittent sink unavailability may impact changefeed stability; however, ordering guarantees will still hold for as long as a changefeed remains active.
- Changefeeds cannot be altered. To alter, cancel the changefeed and create a new one with updated settings from where it left off.
- Additional target options will be added, including partitions and ranges of primary key rows.
- Changefeeds do not pick up data ingested with the IMPORT INTO statement.
- Using a cloud storage sink only works with JSON and emits newline-delimited JSON files.
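A newline-delimited JSON file holds one complete JSON object per line, not a single JSON array, so it has to be parsed line by line. A minimal Python sketch of reading such a file follows. The sample content is hypothetical and the helper name read_ndjson is made up for illustration.

```python
import json


def read_ndjson(text):
    """Yield one parsed object per non-empty line of newline-delimited JSON."""
    for line in text.splitlines():
        line = line.strip()
        if line:  # tolerate blank lines and a trailing newline
            yield json.loads(line)


# Hypothetical file content, shaped like the rows inserted in Step 7.
sample = (
    '{"after": {"id": 1, "name": "Order received"}}\n'
    '{"after": {"id": 2, "name": "Order processed"}}\n'
)

names = [rec["after"]["name"] for rec in read_ndjson(sample)]
print(names)
```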