drrtuy's board

Nov 16, 2018

Getting data from Postgres into Columnstore in realtime with Debezium and Kafka

There are different workload types in the database world that usually can't all be handled by a single DBMS. Let's say your OLTP is processed by a number of Postgres installations working in a sharded setup. If you often need to run analytical queries over data from all the shards, you don't have many choices. One of them is to upload the data into an analytical DBMS, e.g. MariaDB Columnstore, which handles analytical workloads best.

This how-to is based on my recent talk at the HighLoad++ 2018 conference about Change Data Capture and databases. I will show you how to set up streaming of data changes from Postgres into Columnstore in almost no time. Please note that you must have docker and docker-compose installed on a physical or virtual machine with at least 16 GB of RAM.

The goal

The goal I am pursuing here is a simple one. I want every data change made to a table in Postgres 10 to eventually end up in a corresponding table that lives in MariaDB Columnstore 1.1.6-1.

The way

Schema overview

The setup uses a Kafka event queue as an intermediate storage for change events. So our first subtask is getting data from PG into Kafka. To do so I will use the protobuf plugin[1] for PG that extracts logical changes from the WAL, and the Debezium plugin[2] for Kafka Connect that streams changes from protobuf into a Kafka topic. The second subtask is getting data from Kafka into Columnstore in the most efficient way. This is where the Kafka Avro Data Adapter[3] comes into the game. I will use it as a Kafka consumer that stores events in Columnstore as records. The DA does this in the most efficient way using the Columnstore Bulk Write API[4]. There are two notes on the schema:

  • the numbers on the schema show the order of steps a data change takes to get from PG into Columnstore
  • Confluent Schema Registry[5] is an auxiliary but very important service that doesn't sit directly on the data path but is actively used by Kafka Connect and later by the Data Adapter. Without it I would have to deserialize data changes from a JSON document that follows the Postgres schema and changes along with it.
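
One prerequisite is worth calling out explicitly: logical decoding with the decoderbufs plugin must be enabled on the Postgres side. I rely on the assumption that the Postgres image used in the compose file already ships preconfigured for this, but once the pg container is up you can verify it with two standard SHOW commands:

# Sanity check after the pg container starts: expect wal_level = logical and
# decoderbufs among the preloaded libraries.
sudo docker-compose exec pg psql -U postgres -c "SHOW wal_level;"
sudo docker-compose exec pg psql -U postgres -c "SHOW shared_preload_libraries;"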

The players

There is a pair of interesting open-source projects I use in this setup:

  • MariaDB Columnstore is an analytical engine for MariaDB[6]
  • Red Hat Debezium is an open-source Change Data Capture framework[7] that is based on the ideas of Confluent's Bottled Water[8]

Run, Forrest, run

To speed up the setup I will use docker-compose together with a number of useful shell scripts from my repository. Get into the repo after you clone it:

[drrtuy@intmacsta git]$ git clone https://github.com/drrtuy/hl2018-cdc.git
Cloning into 'hl2018-cdc'...
remote: Enumerating objects: 38, done.
remote: Counting objects: 100% (38/38), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 38 (delta 9), reused 35 (delta 9), pack-reused 0
Unpacking objects: 100% (38/38), done.
[drrtuy@intmacsta git]$ cd ./hl2018-cdc/
[drrtuy@intmacsta hl2018-cdc]$

The starting order is important, so you should start one container at a time with 5-second pauses. Let's begin with ZK, which Kafka needs for a number of purposes.
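
By the way, once you know the order works, the whole startup sequence could be scripted. Here is a rough sketch that uses the service names from my docker-compose.yml (the da service is left out on purpose, since it is started only after the connector is configured):

# Rough sketch: bring the services up one at a time with 5-second pauses.
for svc in zk pg kafka connect schema mcs; do
    sudo docker-compose up -d "$svc"
    sleep 5
done

For this how-to, though, I start each container by hand so I can check its logs between the steps.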

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d zk
Creating volume "hl2018-cdc_mysql" with default driver
Creating volume "hl2018-cdc_kafka-data" with default driver
Creating volume "hl2018-cdc_kafka-config" with default driver
Creating volume "hl2018-cdc_zk" with default driver
Creating volume "hl2018-cdc_mcs" with default driver
Creating hl2018-cdc_zk_1 ... done

Let's check the ZooKeeper logs after 5 seconds to make sure we miss no errors. I will show only a snippet of the actual output because it takes a lot of space. The log output on your side may differ, but there must be no errors in it.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs zk
Attaching to hl2018-cdc_zk_1
zk_1       | Starting up in standalone mode
zk_1       | ZooKeeper JMX enabled by default
zk_1       | Using config: /zookeeper/conf/zoo.cfg
zk_1       | 2018-11-22 12:36:54,549 - INFO  [main:QuorumPeerConfig@134] - Reading configuration 
...
zk_1       | 2018-11-22 12:36:54,586 - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181

The next one to start is Postgres 10. Its log output is huge and not very important, so I skip most of it as well.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d pg
Creating hl2018-cdc_pg_1 ... done
[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs pg
Attaching to hl2018-cdc_pg_1
pg_1       | The files belonging to this database system will be owned by user "postgres".
pg_1       | This user must also own the server process.
...
pg_1       | 2018-11-22 12:46:10.506 GMT [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
pg_1       | 2018-11-22 12:46:10.506 GMT [1] LOG:  listening on IPv6 address "::", port 5432
pg_1       | 2018-11-22 12:46:10.523 GMT [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
pg_1       | 2018-11-22 12:46:10.556 GMT [59] LOG:  database system was shut down at 2018-11-22 12:46:10 GMT
pg_1       | 2018-11-22 12:46:10.577 GMT [1] LOG:  database system is ready to accept connections

The almighty Kafka goes next. The logs are even longer than before, so I cut them without regret. But pay attention to any ERROR that may appear when Kafka starts. The setup is a many-headed beast, so the sooner you catch errors the better, because it saves you time searching for the root cause of a misconfiguration later.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d kafka
hl2018-cdc_zk_1 is up-to-date
Creating hl2018-cdc_kafka_1 ... done
[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs kafka
Attaching to hl2018-cdc_kafka_1
kafka_1    | WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
kafka_1    | Using ZOOKEEPER_CONNECT=zk:2181
kafka_1    | Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.24.0.4:9092
...
kafka_1    | 2018-11-22 12:48:02,392 - INFO  [main:Logging$class@66] - [TransactionCoordinator id=1] Starting up.
kafka_1    | 2018-11-22 12:48:02,394 - INFO  [TxnMarkerSenderThread-1:Logging$class@66] - [Transaction Marker Channel Manager 1]: Starting
kafka_1    | 2018-11-22 12:48:02,394 - INFO  [main:Logging$class@66] - [TransactionCoordinator id=1] Startup complete.
kafka_1    | 2018-11-22 12:48:02,419 - INFO  [/config/changes-event-process-thread:Logging$class@66] - [/config/changes-event-process-thread]: Starting
kafka_1    | 2018-11-22 12:48:02,428 - INFO  [main:AppInfoParser$AppInfo@109] - Kafka version : 1.1.0
kafka_1    | 2018-11-22 12:48:02,429 - INFO  [main:AppInfoParser$AppInfo@110] - Kafka commitId : fdcf75ea326b8e07
kafka_1    | 2018-11-22 12:48:02,432 - INFO  [main:Logging$class@66] - [KafkaServer id=1] started
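
A quick way to scan these verbose logs for trouble is a plain grep over the docker-compose output; nothing in this command is specific to my setup:

# Scan a service's logs for problems; replace kafka with any other service name.
sudo docker-compose logs kafka | grep -iE 'error|fatal|exception'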

Our next container runs the Kafka Connect framework that extracts CDC records from the source Postgres. Connect is the absolute winner in the number of useless log records.
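
I bring it up the same way as the others before looking at its logs (the service is simply called connect in the compose file):

sudo docker-compose up -d connect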

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs connect
Attaching to hl2018-cdc_connect_1
connect_1  | Plugins are loaded from /kafka/connect
connect_1  | Using the following environment variables:
connect_1  |       GROUP_ID=1
connect_1  |       CONFIG_STORAGE_TOPIC=my_connect_configs
connect_1  |       OFFSET_STORAGE_TOPIC=my_connect_offsets
connect_1  |       BOOTSTRAP_SERVERS=kafka:9092
connect_1  |       REST_HOST_NAME=172.24.0.5
connect_1  |       REST_PORT=8083
connect_1  |       ADVERTISED_HOST_NAME=172.24.0.5
...
connect_1  | 2018-11-26 18:41:27,655 - INFO  [DistributedHerder:DistributedHerder@842] - Starting connectors and tasks using config offset -1
connect_1  | 2018-11-26 18:41:27,655 - INFO  [DistributedHerder:DistributedHerder@852] - Finished starting connectors and tasks

We will run Confluent Schema Registry and Columnstore in one go:

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d schema mcs
hl2018-cdc_zk_1 is up-to-date
hl2018-cdc_kafka_1 is up-to-date
Starting hl2018-cdc_schema_1 ... done
Creating hl2018-cdc_mcs_1    ... done

I still have the Kafka Avro Data Adapter to run, but I'll do it later.

Postgres side - the source

I will make a simple table to experiment with and call it int_table. Note that the table must have a primary key.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec pg bash
root@404536a51cf3:/# 
root@404536a51cf3:/# psql -U postgres
psql (10.0)
Type "help" for help.
postgres=# create table int_table(i bigint primary key, fl float, t text);
CREATE TABLE

Columnstore side - the destination

Now I create a similar int_table in Columnstore to capture the change events. For the sake of this how-to the change events will contain a number of service fields, so the int_table schema for Columnstore differs from the PG one.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec mcs bash
[root@a4edfdce9fca /]# mcsmysql test
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 11
Server version: 10.2.17-MariaDB-log Columnstore 1.1.6-1

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [test]> create table test.int_table( i bigint, fl double, t text, `before` text, after text, source text, op char(1), ts_ms bigint) engine=columnstore;
Query OK, 0 rows affected (1.05 sec)

It could take a while for Columnstore to start, so if you have problems running mcsmysql, just relax for a couple of minutes while it starts.
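
If you'd rather not guess, a small wait loop works too; this is just a sketch that polls mcsmysql until Columnstore answers:

# Poll Columnstore every 10 seconds until it accepts queries.
until sudo docker-compose exec mcs mcsmysql -e 'select 1' test; do
    sleep 10
done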

Connect configuration

So now we have a pack of services to configure. Let's start with the first subtask from "The way" section and set up CDC from Postgres into Kafka. At this point I add the Debezium Postgres plugin for Kafka Connect as a source connector and call it int_table. The Debezium plugin connects to the protobuf plugin installed in Postgres. The plugin creates a topic in Kafka. It also saves Avro schemas in the Schema Registry, but I will describe the details later. The file pg_int_table.curl contains a number of settings that are described in more detail here[10]. With my second command I check the list of available connectors.
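
To give you an idea of what pg.add_connector.sh actually posts, here is a rough reconstruction of the connector configuration; the authoritative version lives in pg_int_table.curl in the repo, and the hostnames, credentials and server name below are my guesses rather than the exact values:

# Illustrative only: roughly what gets POSTed to the Connect REST API (port 8083,
# see REST_PORT in the connect logs). Exact values live in pg_int_table.curl.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "int_table",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "decoderbufs",
    "database.hostname": "pg",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "int_table",
    "table.whitelist": "public.int_table"
  }
}'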

[drrtuy@intmacsta hl2018-cdc]$ ./pg.add_connector.sh pg_int_table.curl 
[drrtuy@intmacsta hl2018-cdc]$ ./list_connectors.sh 
[
    "int_table"
]
[drrtuy@intmacsta hl2018-cdc]$

Whether your output is the same or differs, it's time to check the logs of the Connect service.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs connect
Attaching to hl2018-cdc_connect_1
...
connect_1  | 2018-11-26 20:16:25,649 - INFO  [pool-6-thread-2:BaseSourceTask@40] - Starting PostgresConnectorTask with configuration:
connect_1  | 2018-11-26 20:16:25,649 - INFO  [pool-6-thread-2:BaseSourceTask@42] -    connector.class = io.debezium.connector.postgresql.PostgresConnector
...
connect_1  | 2018-11-27 17:19:20,215 - INFO  [pool-6-thread-1:PostgresSchema@97] - REPLICA IDENTITY for 'public.int_table' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
connect_1  | 2018-11-27 17:19:20,224 - INFO  [pool-6-thread-1:Threads$2@247] - Creating thread debezium-postgresconnector-int_table-records-stream-producer

Here I see that everything is fine and the Debezium plugin will stream changes from int_table into the Kafka topic created for this purpose.
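
If you want to see the topic itself, the Kafka CLI tools inside the kafka container can list it; the /kafka path and the serverName.schema.table topic naming are assumptions on my side, matching the images and defaults I used:

# List topics; with defaults the connector's topic should look like
# <server.name>.public.int_table.
sudo docker-compose exec kafka /kafka/bin/kafka-topics.sh --zookeeper zk:2181 --list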

Running MariaDB Kafka Avro Data adapter

Finally I run the Kafka Avro Data Adapter to pull events from the Kafka topic and put them into Columnstore, converting types on the fly. To do so I simply create the container.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d da
Creating hl2018-cdc_da_1 ... done

At the moment the daemon writes errors to the container's stdout, so I would use docker-compose logs for debugging or monitoring. The output isn't very concise, but it shows the Avro types used for the Kafka events. This Data Adapter[3] uses the MCS Bulk Write API and the Schema Registry under the hood, but the hood is big and these components go unnoticed. It's worth noting that this is a customized installation of the code from the adapter's git repository. And if you want to play more with it, you could comment out the entrypoint for the da container.
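
For manual experiments there is also a way that doesn't require editing the compose file: override the entrypoint for a one-off run and you get a shell in the da container instead of the daemon (assuming the image ships bash):

# One-off shell inside the adapter container; the daemon itself is not started.
sudo docker-compose run --rm --entrypoint bash da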

On Confluent Schema Registry

The Registry[5] is used by a number of components in this setup and makes a DevOps engineer's life easier. At startup the Debezium connector saves two Avro schemas in the Registry:

  • the table's primary key schema
  • the table's value schema, which is an Avro representation of all the table's columns

The Data Adapter also uses the Registry to obtain the current Avro schema for the table's value. Using that schema, the DA deserializes Kafka events into Columnstore records.
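
You can peek at what the connector registered through the Registry's REST API. The exact subject names depend on the topic name, so treat the ones in the comment as an illustration; port 8081 is the Registry default and I assume it is reachable from the host:

# List registered subjects; expect a -key and a -value subject per topic,
# e.g. something like int_table.public.int_table-key / ...-value.
curl http://localhost:8081/subjects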

Stream INSERT changes

Now I run a number of INSERTs in Postgres and look at the changes streamed into Columnstore.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec pg bash
root@404536a51cf3:/# psql -U postgres
psql (10.0)
Type "help" for help.

postgres=# insert into int_table values (25, 0.05, 'test1');
INSERT 0 1
postgres=# insert into int_table values (42, 0.15, 'test2');
INSERT 0 1
postgres=# insert into int_table values (67, 1.25, 'test3');
INSERT 0 1

And here is the Columnstore side after that:

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec mcs bash
[root@a4edfdce9fca /]# mcsmysql test
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 11
Server version: 10.2.17-MariaDB-log Columnstore 1.1.6-1

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [test]> select * from int_table;
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t      | before | after                                                                       | source | op   | ts_ms         |
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
|   25 | 0.05 | test1  | NULL   | {"i": 25, "fl": {"double": 0.050000000000000003}, "t": {"string": "test1"}} | NULL   | c    | 1543339686004 |
|   42 | 0.15 | test2  | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "test2"}}  | NULL   | c    | 1543339691291 |
|   67 | 1.25 | test3  | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "test3"}}                 | NULL   | c    | 1543339705614 |
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
3 rows in set (0.08 sec)

Stream UPDATE changes

Here is the UPDATE statement I will play with on the Postgres side. It touches 2 records, and I will get the same number of new records in Columnstore.

postgres=# update  int_table set t = 'updated' where i >= 42;
UPDATE 2

And here is what I get on the Columnstore side:

MariaDB [test]> select * from int_table;
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t        | before | after                                                                        | source | op   | ts_ms         |
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
|   25 | 0.05 | test1    | NULL   | {"i": 25, "fl": {"double": 0.050000000000000003}, "t": {"string": "test1"}}  | NULL   | c    | 1543339686004 |
|   42 | 0.15 | test2    | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "test2"}}   | NULL   | c    | 1543339691291 |
|   67 | 1.25 | test3    | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "test3"}}                  | NULL   | c    | 1543339705614 |
|   42 | 0.15 | updated  | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated"}} | NULL   | u    | 1543340276854 |
|   67 | 1.25 | updated  | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated"}}                | NULL   | u    | 1543340276855 |
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
5 rows in set (0.09 sec)

Take a look at the last two records, which have NULLs in the before column. This column could contain a JSON document with the int_table column values before the UPDATE, similar to what I got in after. This is where the REPLICA IDENTITY setting[9] comes into the game. I will set it on the Postgres side for int_table and run the UPDATE again.

postgres=# alter table int_table replica identity full;
ALTER TABLE
postgres=# update  int_table set t = 'updated2' where i >= 42;
UPDATE 2

There is a pair of new records in int_table's Columnstore alter ego.

MariaDB [test]> select * from int_table;
+------+------+-----------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t         | before                                                                        | after                                                                          | source | op   | ts_ms         |
+------+------+-----------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------+------+---------------+
...
|   42 | 0.15 | updated2  | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated"}}  | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated2"}}  | NULL   | u    | 1543340725509 |
|   67 | 1.25 | updated2  | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated"}}                 | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated2"}}                 | NULL   | u    | 1543340725510 |
+------+------+-----------+-------------------------------------------------------------------------------+--------------------------------------------------------------------------------+--------+------+---------------+
7 rows in set (0.08 sec)

Changing REPLICA IDENTITY to full has a number of drawbacks, though. But if I needed previous values for all the columns, I would give this setting a try.
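
If you are not sure which replica identity a table currently has, the pg_class catalog stores it; d means default, f means full, i an index, n nothing:

# Check int_table's current REPLICA IDENTITY setting.
sudo docker-compose exec pg psql -U postgres -c "select relreplident from pg_class where relname = 'int_table';"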

Summing this all up

Columnstore is good for analytical and heavy reporting workloads, while Postgres is good for transactional workloads. And if you ever need to stream changes from PG into Columnstore, you can take a shortcut and adopt this solution with Debezium.

Links

  1. https://github.com/debezium/postgres-decoderbufs
  2. https://debezium.io/docs/connectors/postgresql/
  3. https://github.com/mariadb-corporation/mariadb-columnstore-data-adapters/tree/master/kafka-avro-adapter
  4. https://mariadb.com/kb/en/library/columnstore-bulk-write-sdk/
  5. https://docs.confluent.io/current/schema-registry/docs/index.html
  6. https://mariadb.com/kb/en/library/about-mariadb-columnstore/
  7. https://debezium.io/
  8. https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/
  9. https://debezium.io/docs/connectors/postgresql/#replica-identity
  10. https://debezium.io/docs/connectors/postgresql/#connector-properties