drrtuy's board

Jun 09, 2019

MDB ColumnStore and ClickHouse performance comparison

Introduction

ColumnStore and ClickHouse have a lot in common. Both are column-oriented engines for analytical workloads and both lean heavily on the MPP concept, so they share almost the same niche. That is why I'm always asked at conferences about a performance comparison between CS and CH. Until very recently there was not much to read in the wild on this matter except a couple of slightly outdated tests[1][2]. There was also a series of comparisons presented at the last Percona Live 19[3], but in March 2019 I wasn't aware of those talks yet, so I decided to conduct my own comparison against CH, which I personally find the best of its kind b/c this state-of-the-art engine exploits lots of concepts that any modern analytical engine needs.

Setup

I wanted a comparison, not a stress test, so I used my MacBook Pro 2018 running Arch Linux as the hardware. On top of it I put Docker to separate the CS and CH environments. CS was a custom 1.2 release build for Ubuntu 18.04[15] that included the MCOL-498 feature[4]. CH was a release build of commit 9158e0. Configuration-wise, I increased DBBC.NumBlocksPct in CS's config to 80%, set Lang to UTF-8 and DBRoot1.PreallocSpace to OFF. CH was good enough w/o any additional changes in my environment, but feel free to add a note if I missed some important setting here. To avoid unexpected memory performance degradation I disabled swap before I ran the tests.
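
For reference, this is roughly how I apply such CS settings; a minimal sketch that assumes the stock 1.2 layout under /usr/local/mariadb/columnstore and shows only the NumBlocksPct tweak, the other two values are edited the same way in their own sections of Columnstore.xml.

# Sketch: bump DBBC.NumBlocksPct to 80% by editing Columnstore.xml in place.
CS_CONF=/usr/local/mariadb/columnstore/etc/Columnstore.xml
sudo sed -i 's|<NumBlocksPct>[^<]*</NumBlocksPct>|<NumBlocksPct>80</NumBlocksPct>|' "$CS_CONF"

# CS has to be restarted to pick the new values up.
mcsadmin restartSystem y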

Dataset

There are a number of analytical data sets available on the Net, but I appreciate Yandex's data set the most[5] b/c it comes from Yandex.Metrics, the largest web-analytics service in Russia. There was a remarkable talk[6] given by Alexey Milovidov about the origins of this data set. I decided to use only one table of the pair, namely hits, because of the queries I used for the comparison. I also had to change the structure of the hits table to avoid array data types that CS doesn't support now. Here are the DDL statements for CS[7] and CH[8]. After I created the table I ingested the final dataset[9] into hits. It is worth noting that the CH-specific DDL file[8] contains both CREATE TABLE and INSERT statements that can be used to extract the needed columns from the original hits table.
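
If you want to repeat the run, fetching and unpacking the dataset[9] looks roughly like this; a sketch that assumes the S3 object is publicly readable and that aws-cli and xz-utils are installed locally.

# Fetch the prepared dataset[9] and unpack it; add --no-sign-request to the
# aws call if you have no AWS credentials configured.
aws s3 cp s3://datasetyandex/hits_partial_col_set.csv.lzma .
unlzma hits_partial_col_set.csv.lzma   # leaves hits_partial_col_set.csv next to it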

Data ingestion

When it comes down to data ingestion, cpimport is the tool to use in CS. I uploaded the data setting both readers and writers to 8 threads and allocating 10MB for the read buffer. Here is the command for data ingestion in CS.

cpimport -r8 -w8 -E'"' -c10000000   -e20000 -s',' test hits ./hits_partial_col_set.csv

To parallelize data ingestion in CH I had to split the data file. Playing with the parallelization factor I came up with the number 4, which happened to be the best in my case. And here are the commands that I used to ingest data into CH.

split -n l/4 ./hits_partial_col_set.csv hits_partial_col_set_
for i in $(echo 'abcd' | fold -w1); do echo hits_partial_col_set_a$i; time clickhouse-client --query "INSERT INTO test.h1 FORMAT CSV" --max_insert_block_size=100000 < hits_partial_col_set_a$i & done

CS spent 100 seconds ingesting the data set and its table took 1.9GB of disk space, whilst CH spent 125 seconds and its table took 2.5GB of disk space.
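
As a quick sanity check after both loads I compared the row counts; a sketch, with each command run inside the respective container.

# Row count in ColumnStore, using the MariaDB client shipped with CS.
mcsmysql -e "SELECT count(*) FROM test.hits;"

# Row count in ClickHouse.
clickhouse-client --query "SELECT count() FROM test.hits"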

Queries

I used the queries from CH's benchmark page[10] but I changed them a bit b/c of the differences b/w CS and CH:

  • uniqExact() was used instead of uniq() for CH
  • all queries with ORDER BY/LIMIT are wrapped in subqueries for CS
  • replaced the time period lower and upper limits with new ones for both CS and CH
  • replaced CounterID for both CS and CH

Here are the lists for CS[11] and for CH[12] that I got in the end.

Methodology

I ran every query twice: first with a cold cache and then with a prewarmed cache. To clean the caches I restarted CS and dropped the OS caches for CH, b/c the latter utilizes the Page Cache extensively while the former uses its own block cache. Here[13] is the scenario log for CS and here[14] is the one for CH.
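
The cache-cleaning step between runs boils down to something like the sketch below; in my single-server install I restart CS via mcsadmin, and the drop_caches write for CH needs root.

# Before a cold CS run: restart ColumnStore so its own block cache starts empty.
mcsadmin restartSystem y

# Before a cold CH run: flush the OS page cache that ClickHouse relies on.
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches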

Results

# | Query in CS | Query in CH | ColumnStore (cold s. / warm s.) | ClickHouse (cold s. / warm s.)
1
SELECT count(*) FROM test.hits;
SELECT count() FROM test.hits;
(0.111 s.) (0.080 s.) (0.075 s.) (0.008 s.)
2
SELECT count(*) FROM test.hits WHERE AdvEngineID != 0;
SELECT count() FROM test.hits WHERE AdvEngineID != 0;
(0.146 s.) (0.081 s.) (0.151 s.) (0.003 s.)
3
SELECT sum(AdvEngineID), count(*), avg(ResolutionWidth) FROM test.hits;
SELECT sum(AdvEngineID), count(), avg(ResolutionWidth) FROM test.hits;
(0.330 s.) (0.264 s.) (0.171 s.) (0.024 s.)
4
SELECT sum(UserID) FROM test.hits;
SELECT sum(UserID) FROM test.hits;
(0.240 s.) (0.178 s.) (0.067 s.) (0.016 s.)
5
SELECT distinct UserID FROM test.hits;
SELECT uniqExact(UserID) FROM test.hits;
(0.353 s.) (0.297 s.) (0.122 s.) (0.035 s.)
6
SELECT distinct SearchPhrase FROM test.hits;
SELECT uniqExact(SearchPhrase) FROM test.hits;
(0.844 s.) (0.773 s.) (0.275 s.) (0.111 s.)
7
SELECT min(EventDate), max(EventDate) FROM test.hits;
SELECT min(EventDate), max(EventDate) FROM test.hits;
(0.279 s.) (0.258 s.) (0.171 s.) (0.015 s.)
8
SELECT * FROM (SELECT AdvEngineID, count(*) FROM test.hits WHERE AdvEngineID != 0 GROUP BY AdvEngineID ORDER BY count(*) DESC) a;
SELECT AdvEngineID, count() FROM test.hits WHERE AdvEngineID != 0 GROUP BY AdvEngineID ORDER BY count() DESC;
(0.136 s.) (0.078 s.) (0.170 s.) (0.007 s.)
9
SELECT * FROM ( SELECT distinct UserID as u, regionid FROM test.hits GROUP BY regionid ORDER BY u DESC LIMIT 10 )a ;
SELECT RegionID, uniqExact(UserID) AS u FROM test.hits GROUP BY RegionID ORDER BY u DESC LIMIT 10;
(0.370 s.) (0.272 s.) (0.262 s.) (0.059 s.)
10
SELECT * FROM (SELECT RegionID, sum(AdvEngineID), count(*) AS c, avg(ResolutionWidth), count(distinct UserID) FROM test.hits GROUP BY RegionID ORDER BY c DESC LIMIT 10) a;
SELECT RegionID, sum(AdvEngineID), count() AS c, avg(ResolutionWidth), uniqExact(UserID) FROM test.hits GROUP BY RegionID ORDER BY c DESC LIMIT 10;
(0.628 s.) (0.518 s.) (0.230 s.) (0.070 s.)
11
SELECT * FROM (SELECT distinct UserID AS u, MobilePhoneModel FROM test.hits WHERE MobilePhoneModel != '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10) a;
SELECT MobilePhoneModel, uniqExact(UserID) AS u FROM test.hits WHERE MobilePhoneModel != '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
(0.539 s.) (0.475 s.) (0.213 s.) (0.040 s.)
12
SELECT * FROM (SELECT distinct UserID AS u, MobilePhone, MobilePhoneModel FROM test.hits WHERE MobilePhoneModel != '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10) a;
SELECT MobilePhone, MobilePhoneModel, uniqExact(UserID) AS u FROM test.hits WHERE MobilePhoneModel != '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
(0.561 s.) (0.443 s.) (0.215 s.) (0.047 s.)
13
SELECT * FROM (SELECT SearchPhrase, count(*) AS c FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10) a;
SELECT SearchPhrase, count() AS c FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
(0.539 s.) (0.476 s.) (0.171 s.) (0.094 s.)
14
SELECT * FROM (SELECT distinct UserID AS u, SearchPhrase FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10) a;
SELECT SearchPhrase, uniqExact(UserID) AS u FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
(0.627 s.) (0.522 s.) (0.252 s.) (0.077 s.)
15
SELECT * FROM (SELECT SearchEngineID, SearchPhrase, count(*) AS c FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10) a;
SELECT SearchEngineID, SearchPhrase, count() AS c FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
(0.644 s.) (0.474 s.) (0.183 s.) (0.107 s.)
16
SELECT * FROM(SELECT UserID, count(*) FROM test.hits GROUP BY UserID ORDER BY count(*) DESC LIMIT 10) a;
SELECT UserID, count() FROM test.hits GROUP BY UserID ORDER BY count() DESC LIMIT 10;
(0.312 s.) (0.258 s.) (0.194 s.) (0.050 s.)
17
SELECT * FROM(SELECT UserID, SearchPhrase, count(*) FROM test.hits GROUP BY UserID, SearchPhrase ORDER BY count(*) DESC LIMIT 10) a;
SELECT UserID, SearchPhrase, count() FROM test.hits GROUP BY UserID, SearchPhrase ORDER BY count() DESC LIMIT 10;
(0.947 s.) (0.830 s.) (0.316 s.) (0.163 s.)
18
SELECT * FROM(SELECT UserID, SearchPhrase, count(*) FROM test.hits GROUP BY UserID, SearchPhrase LIMIT 10) a;
SELECT UserID, SearchPhrase, count() FROM test.hits GROUP BY UserID, SearchPhrase LIMIT 10;
(0.915 s.) (0.783 s.) (0.322 s.) (0.158 s.)
19
SELECT * FROM(SELECT UserID, extract( minute from EventTime) AS m, SearchPhrase, count(*) FROM test.hits GROUP BY UserID, m, SearchPhrase ORDER BY count(*) DESC LIMIT 10) a;
SELECT UserID, toMinute(EventTime) AS m, SearchPhrase, count() FROM test.hits GROUP BY UserID, m, SearchPhrase ORDER BY count() DESC LIMIT 10;
(2.383 s.) (2.057 s.) (0.673 s.) (0.460 s.)
20
SELECT UserID FROM test.hits WHERE UserID = 1711847583975330569;
SELECT UserID FROM test.hits WHERE UserID = 12345678901234567890;
(0.151 s.) (0.081 s.) (0.054 s.) (0.012 s.)
21
SELECT count(*) FROM test.hits WHERE URL LIKE '%metrika%';
SELECT count() FROM test.hits WHERE URL LIKE '%metrika%';
(3.717 s.) (3.717 s.) (0.262 s.) (0.215 s.)
22
SELECT * FROM(SELECT SearchPhrase, min(URL), count(*) AS c FROM test.hits WHERE URL LIKE '%metrika%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10) a;
SELECT SearchPhrase, any(URL), count() AS c FROM test.hits WHERE URL LIKE '%metrika%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
(1.093 s.) (0.629 s.) (0.405 s.) (0.195 s.)
23
SELECT * FROM(SELECT min(UserID),SearchPhrase, min(URL), min(Title), count(*) AS c FROM test.hits WHERE Title LIKE '%Яндекс%' AND URL NOT LIKE '%.yandex.%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10) a;
SELECT SearchPhrase, any(URL), any(Title), count() AS c, uniqExact(UserID) FROM test.hits WHERE Title LIKE '%Яндекс%' AND URL NOT LIKE '%.yandex.%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
(2.339 s.) (1.415 s.) (0.647 s.) (0.436 s.)
24
SELECT * FROM(SELECT * FROM test.hits WHERE URL LIKE '%metrika%' ORDER BY EventTime LIMIT 10) a;
SELECT * FROM test.hits WHERE URL LIKE '%metrika%' ORDER BY EventTime LIMIT 10;
(7.356 s.) (3.491 s.) (1.464 s.) (0.954 s.)
25
SELECT * FROM(SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY EventTime LIMIT 10) a;
SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY EventTime LIMIT 10;
(0.548 s.) (0.419 s.) (0.213 s.) (0.042 s.)
26
SELECT * FROM(SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY SearchPhrase LIMIT 10) a;
SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY SearchPhrase LIMIT 10;
(0.484 s.) (0.400 s.) (0.198 s.) (0.032 s.)
27
SELECT * FROM(SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY EventTime, SearchPhrase LIMIT 10) a;
SELECT SearchPhrase FROM test.hits WHERE SearchPhrase != '' ORDER BY EventTime, SearchPhrase LIMIT 10;
(0.548 s.) (0.433 s.) (0.195 s.) (0.045 s.)
28
SELECT * FROM(SELECT CounterID, avg(length(URL)) AS l, count(*) AS c FROM test.hits WHERE URL != '' GROUP BY CounterID HAVING c > 100000 ORDER BY l DESC LIMIT 25) a;
SELECT CounterID, avg(length(URL)) AS l, count() AS c FROM test.hits WHERE URL != '' GROUP BY CounterID HAVING c > 100000 ORDER BY l DESC LIMIT 25;
(2.415 s.) (2.150 s.) (0.436 s.) (0.208 s.)
29
SELECT * FROM(SELECT Referer, avg(length(Referer)) AS l, count(*) AS c FROM test.hits WHERE Referer != '' GROUP BY 1 HAVING c > 100000 ORDER BY l DESC LIMIT 25) a;
SELECT domainWithoutWWW(Referer) AS key, avg(length(Referer)) AS l, count() AS c, any(Referer) FROM test.hits WHERE Referer != '' GROUP BY key HAVING c > 100000 ORDER BY l DESC LIMIT 25;
(3.130 s.) (2.712 s.) (0.600 s.) (0.375 s.)
30
SELECT sum(ResolutionWidth), sum(ResolutionWidth + 1), sum(ResolutionWidth + 2), sum(ResolutionWidth + 3), sum(ResolutionWidth + 4), sum(ResolutionWidth + 5), sum(ResolutionWidth + 6), sum(ResolutionWidth + 7), sum(ResolutionWidth + 8), sum(ResolutionWidth + 9), sum(ResolutionWidth + 10), sum(ResolutionWidth + 11), sum(ResolutionWidth + 12), sum(ResolutionWidth + 13), sum(ResolutionWidth + 14), sum(ResolutionWidth + 15), sum(ResolutionWidth + 16), sum(ResolutionWidth + 17), sum(ResolutionWidth + 18), sum(ResolutionWidth + 19), sum(ResolutionWidth + 20), sum(ResolutionWidth + 21), sum(ResolutionWidth + 22), sum(ResolutionWidth + 23), sum(ResolutionWidth + 24), sum(ResolutionWidth + 25), sum(ResolutionWidth + 26), sum(ResolutionWidth + 27), sum(ResolutionWidth + 28), sum(ResolutionWidth + 29), sum(ResolutionWidth + 30), sum(ResolutionWidth + 31), sum(ResolutionWidth + 32), sum(ResolutionWidth + 33), sum(ResolutionWidth + 34), sum(ResolutionWidth + 35), sum(ResolutionWidth + 36), sum(ResolutionWidth + 37), sum(ResolutionWidth + 38), sum(ResolutionWidth + 39), sum(ResolutionWidth + 40), sum(ResolutionWidth + 41), sum(ResolutionWidth + 42), sum(ResolutionWidth + 43), sum(ResolutionWidth + 44), sum(ResolutionWidth + 45), sum(ResolutionWidth + 46), sum(ResolutionWidth + 47), sum(ResolutionWidth + 48), sum(ResolutionWidth + 49), sum(ResolutionWidth + 50), sum(ResolutionWidth + 51), sum(ResolutionWidth + 52), sum(ResolutionWidth + 53), sum(ResolutionWidth + 54), sum(ResolutionWidth + 55), sum(ResolutionWidth + 56), sum(ResolutionWidth + 57), sum(ResolutionWidth + 58), sum(ResolutionWidth + 59), sum(ResolutionWidth + 60), sum(ResolutionWidth + 61), sum(ResolutionWidth + 62), sum(ResolutionWidth + 63), sum(ResolutionWidth + 64), sum(ResolutionWidth + 65), sum(ResolutionWidth + 66), sum(ResolutionWidth + 67), sum(ResolutionWidth + 68), sum(ResolutionWidth + 69), sum(ResolutionWidth + 70), sum(ResolutionWidth + 71), sum(ResolutionWidth + 72), sum(ResolutionWidth + 73), sum(ResolutionWidth + 74), sum(ResolutionWidth + 75), sum(ResolutionWidth + 76), sum(ResolutionWidth + 77), sum(ResolutionWidth + 78), sum(ResolutionWidth + 79), sum(ResolutionWidth + 80), sum(ResolutionWidth + 81), sum(ResolutionWidth + 82), sum(ResolutionWidth + 83), sum(ResolutionWidth + 84), sum(ResolutionWidth + 85), sum(ResolutionWidth + 86), sum(ResolutionWidth + 87), sum(ResolutionWidth + 88), sum(ResolutionWidth + 89) FROM test.hits;
SELECT sum(ResolutionWidth), sum(ResolutionWidth + 1), sum(ResolutionWidth + 2), sum(ResolutionWidth + 3), sum(ResolutionWidth + 4), sum(ResolutionWidth + 5), sum(ResolutionWidth + 6), sum(ResolutionWidth + 7), sum(ResolutionWidth + 8), sum(ResolutionWidth + 9), sum(ResolutionWidth + 10), sum(ResolutionWidth + 11), sum(ResolutionWidth + 12), sum(ResolutionWidth + 13), sum(ResolutionWidth + 14), sum(ResolutionWidth + 15), sum(ResolutionWidth + 16), sum(ResolutionWidth + 17), sum(ResolutionWidth + 18), sum(ResolutionWidth + 19), sum(ResolutionWidth + 20), sum(ResolutionWidth + 21), sum(ResolutionWidth + 22), sum(ResolutionWidth + 23), sum(ResolutionWidth + 24), sum(ResolutionWidth + 25), sum(ResolutionWidth + 26), sum(ResolutionWidth + 27), sum(ResolutionWidth + 28), sum(ResolutionWidth + 29), sum(ResolutionWidth + 30), sum(ResolutionWidth + 31), sum(ResolutionWidth + 32), sum(ResolutionWidth + 33), sum(ResolutionWidth + 34), sum(ResolutionWidth + 35), sum(ResolutionWidth + 36), sum(ResolutionWidth + 37), sum(ResolutionWidth + 38), sum(ResolutionWidth + 39), sum(ResolutionWidth + 40), sum(ResolutionWidth + 41), sum(ResolutionWidth + 42), sum(ResolutionWidth + 43), sum(ResolutionWidth + 44), sum(ResolutionWidth + 45), sum(ResolutionWidth + 46), sum(ResolutionWidth + 47), sum(ResolutionWidth + 48), sum(ResolutionWidth + 49), sum(ResolutionWidth + 50), sum(ResolutionWidth + 51), sum(ResolutionWidth + 52), sum(ResolutionWidth + 53), sum(ResolutionWidth + 54), sum(ResolutionWidth + 55), sum(ResolutionWidth + 56), sum(ResolutionWidth + 57), sum(ResolutionWidth + 58), sum(ResolutionWidth + 59), sum(ResolutionWidth + 60), sum(ResolutionWidth + 61), sum(ResolutionWidth + 62), sum(ResolutionWidth + 63), sum(ResolutionWidth + 64), sum(ResolutionWidth + 65), sum(ResolutionWidth + 66), sum(ResolutionWidth + 67), sum(ResolutionWidth + 68), sum(ResolutionWidth + 69), sum(ResolutionWidth + 70), sum(ResolutionWidth + 71), sum(ResolutionWidth + 72), sum(ResolutionWidth + 73), sum(ResolutionWidth + 74), sum(ResolutionWidth + 75), sum(ResolutionWidth + 76), sum(ResolutionWidth + 77), sum(ResolutionWidth + 78), sum(ResolutionWidth + 79), sum(ResolutionWidth + 80), sum(ResolutionWidth + 81), sum(ResolutionWidth + 82), sum(ResolutionWidth + 83), sum(ResolutionWidth + 84), sum(ResolutionWidth + 85), sum(ResolutionWidth + 86), sum(ResolutionWidth + 87), sum(ResolutionWidth + 88), sum(ResolutionWidth + 89) FROM test.hits;
(14.983 s.) (15.201 s.) (0.913 s.) (0.759 s.)
31
SELECT * FROM(SELECT SearchEngineID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10) a;
SELECT SearchEngineID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits WHERE SearchPhrase != '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
(0.610 s.) (0.393 s.) (0.250 s.) (0.073 s.)
32
SELECT * FROM(SELECT WatchID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits WHERE SearchPhrase != '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10) a;
SELECT WatchID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits WHERE SearchPhrase != '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
(0.711 s.) (0.511 s.) (0.303 s.) (0.130 s.)
33
SELECT * FROM(SELECT WatchID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10) a;
SELECT WatchID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM test.hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
(3.860 s.) (3.877 s.) (1.413 s.) (1.130 s.)
34
SELECT * FROM(SELECT URL, count(*) AS c FROM test.hits GROUP BY URL ORDER BY c DESC LIMIT 10) a;
SELECT URL, count() AS c FROM test.hits GROUP BY URL ORDER BY c DESC LIMIT 10;
(2.782 s.) (2.447 s.) (0.863 s.) (0.635 s.)
35
SELECT * FROM(SELECT 1, URL, count(*) AS c FROM test.hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10) a;
SELECT 1, URL, count() AS c FROM test.hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
(2.745 s.) (2.445 s.) (0.870 s.) (0.621 s.)
36
SELECT * FROM(SELECT ClientIP AS x, ClientIP - 1, ClientIP - 2, ClientIP - 3, count(*) AS c FROM test.hits GROUP BY x, x - 1, x - 2, x - 3 ORDER BY c DESC LIMIT 10) a;
SELECT ClientIP AS x, x - 1, x - 2, x - 3, count() AS c FROM test.hits GROUP BY x, x - 1, x - 2, x - 3 ORDER BY c DESC LIMIT 10;
(0.631 s.) (0.560 s.) (0.332 s.) (0.139 s.)
37
select * from (SELECT URL, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate >= '2014-03-01' AND EventDate <= '2014-03-31' AND DontCountHits =0 AND Refresh =0 AND URL != '' and URL IS NOT NULL GROUP BY URL ORDER BY PageViews DESC LIMIT 10) a;
SELECT URL, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2014-03-23') AND NOT DontCountHits AND NOT Refresh AND notEmpty(URL) GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
(1.366 s.) (0.852 s.) (0.195 s.) (0.004 s.)
38
SELECT * FROM (SELECT Title, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate >= '2014-03-01' AND EventDate <= '2014-03-31' AND NOT DontCountHits AND NOT Refresh AND Title IS NOT NULL GROUP BY Title ORDER BY PageViews DESC LIMIT 10) a;
SELECT Title, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2014-03-23') AND NOT DontCountHits AND NOT Refresh AND notEmpty(Title) GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
(0.386 s.) (0.210 s.) (0.091 s.) (0.004 s.)
39
SELECT * FROM (SELECT URL, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate >= '2014-03-01' AND EventDate <= '2014-03-31' AND NOT Refresh AND IsLink AND NOT IsDownload GROUP BY URL ORDER BY PageViews DESC LIMIT 1000) a;
SELECT URL, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2014-03-23') AND NOT Refresh AND IsLink AND NOT IsDownload GROUP BY URL ORDER BY PageViews DESC LIMIT 1000;
(0.258 s.) (0.138 s.) (0.178 s.) (0.008 s.)
40
SELECT * FROM (SELECT TraficSourceID, SearchEngineID, AdvEngineID, IF((SearchEngineID = 0 AND AdvEngineID = 0),Referer,'') AS Src, URL AS Dst, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate between '2014-03-01' AND '2014-03-31' AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000) a;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, ((SearchEngineID = 0 AND AdvEngineID = 0) ? Referer : '') AS Src, URL AS Dst, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2014-03-23') AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000;
(0.405 s.) (0.199 s.) (0.200 s.) (0.014 s.)
41
SELECT * FROM (SELECT WindowClientWidth, WindowClientHeight, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate between '2014-03-01' AND '2014-03-31' AND NOT Refresh AND NOT DontCountHits AND URLHash = 8759892953031403647 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000) a;
SELECT WindowClientWidth, WindowClientHeight, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2014-03-23') AND NOT Refresh AND NOT DontCountHits AND URLHash = halfMD5('http://yandex.ru/') GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000;
(0.233 s.) (0.113 s.) (0.190 s.) (0.005 s.)
42
SELECT * FROM (SELECT EventTime AS Minute, count(*) AS PageViews FROM test.hits WHERE CounterID = 63 AND EventDate between '2014-03-01' and '2014-03-31' AND NOT Refresh AND NOT DontCountHits GROUP BY Minute ORDER BY Minute) a;
SELECT toStartOfMinute(EventTime) AS Minute, count() AS PageViews FROM test.hits WHERE CounterID = 93229 AND EventDate >= toDate('2014-03-17') AND EventDate <= toDate('2013-07-02') AND NOT Refresh AND NOT DontCountHits GROUP BY Minute ORDER BY Minute;
(0.301 s.) (0.182 s.) (0.093 s.) (0.008 s.)

Conclusion

As you can see, CS is beaten by CH in most cases. The most notable reasons for that on the CS side are:

  • Lack of a parallel ORDER BY facility. We are working on it though, and hopefully the project converges 3Q-ish 2019. Here[15] is the issue to follow up on
  • Very inefficient filtering of variable-size strings. We are attacking this from a number of sides, first introducing segment file support for wide (>8 byte) data types. JFYI, CS now uses a Dictionary for storing data types wider than 8 bytes and this imposes a number of performance penalties. I will write more on this another day
  • CS badly lacks NULL bitmaps. To store a NULL or an empty value we use tokens that are MAX_VALUE-1 and MAX_VALUE-2 of the range available for the data type. To check a column value for NULL, CS literally extracts the value and compares it with a magic constant, and that is ugly. We have a project in the CS roadmap to eliminate this inefficiency though

However, CH is a great example of a performance-centered approach to building analytical engines. I must confess that when it boils down to performance I have CH in mind as a golden exemplar. Let's look into the queries where the timings differ the most. I will mostly look into CS problems rather than CH advantages.

  • Query 21. This one does simple scans using a regexp. CS utilizes the regcomp/regexec pair to do the search and this approach isn't very good at times. We have a different approach in mind that uses trigrams + counting Bloom filters to produce a fingerprint of a string.
  • Query 24. This query suffers from inefficient regex handling, outdated Dictionary scanning algorithms and a single-threaded sort.
  • Query 28. String scanning and the single-threaded sort both take an enormous share of the total time.
  • Query 29. Same as previous.
  • Query 30. The most significant difference and the most interesting case. It is a mix of reason number 3 from the list above combined with the fact that CS uses the Volcano processing model, thus processing the set row by row, which is very inefficient in the analytical workload world. We have plans to overcome this limitation using both vectorization and JIT for the most intensively working loops.

Any comments and/or suggestions are welcome, and thanks for reading. And as a spoiler: I'm going to post an additional comparison concerned mostly with JOINs, so stay tuned.

Links

  1. https://www.percona.com/blog/2017/03/17/column-store-database-benchmarks-mariadb-columnstore-vs-clickhouse-vs-apache-spark/
  2. https://linuxjedi.co.uk/2017/09/21/correcting-mariadb-columnstore-benchmarks/#more-485
  3. https://www.percona.com/live/19/sessions/opensource-column-store-databases-mariadb-columnstore-vs-clickhouse
  4. https://jira.mariadb.org/browse/MCOL-498
  5. https://github.com/yandex/ClickHouse/blob/master/dbms/tests/instructions/developer_instruction_ru.md#%D1%82%D0%B5%D1%81%D1%82%D0%BE%D0%B2%D1%8B%D0%B5-%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D0%B5 *Russian text
  6. https://www.highload.ru/spb/2019/abstracts/4808
  7. https://drrtuy.zerothree.su/CS-ddl.txt
  8. https://drrtuy.zerothree.su/CH-ddl.txt
  9. s3://datasetyandex/hits_partial_col_set.csv.lzma
  10. https://clickhouse.yandex/benchmark.html
  11. https://drrtuy.zerothree.su/CS-testing-queries.txt
  12. https://drrtuy.zerothree.su/CH-testing-queries.txt
  13. https://drrtuy.zerothree.su/CS-testing-raw-results.txt
  14. https://drrtuy.zerothree.su/CH-testing-raw-results.txt
  15. s3://mcol498/mariadb-columnstore-1.2.4-1-bionic.amd64.deb.tar.gz

Jan 21, 2019

How to save time working with MariaDB Columnstore code.

I work a lot with the MariaDB Columnstore source code and I used to spend much time just starting/restarting MCS. I usually don't need a cluster but a single-server installation, so there are ways to optimize the starting/restarting routine. Before this commit a cluster restart took about 3 minutes and now it takes only 6 seconds. Moreover, I can get my stdout/stderr output without much trouble now because the streams go neatly into the place that I configured. For details look here. Today I will share a number of scripts I use to speed the re-/starting routine up, but all credit for the huge savings of precious time goes to the author of the commit. And by the way, here is the repo you could grab the scripts from.

Presetup

All the scripts mentioned here use a number of variables that must be set for your environment. The variables are at the very beginning of the scripts.

Initial installation

I use run_cs_with_oam_skiped for this purpose. A word of warning: this one DELETES the MCS installation. The logic behind it is simple: it compiles/installs both the server and the MCS engine code and calls a number of scripts that create the initial system catalog. I advise you to go through the comments in the script itself before you run it. Here are the interesting variables that one must set before using the script. And yes, I omit the self-explanatory variables:

  • MCS_TMP_DIR=/tmp/columnstore_tmp_files ( This one points to a directory that contains temporary columnstore installation files. It shouldn't be changed. )
  • MCS_SCRIPTS_REPO_PREFIX=/git/cs-docker-tools ( This one points to the top directory of the scripts repository )

The example output for this script is very long so I also omit it. The ps output looks different when you run MCS using the script.

root@4bbf704f05b8:/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine# ps -auxfw
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     29369  0.0  0.0  20288  2756 pts/0    Ss   Jan16   0:00 bash
root      3293  0.0  0.0  38372  3536 pts/0    R+   09:13   0:00  \_ ps -auxfw
root         1  0.0  0.0  20048  1824 ?        Ss   Jan14   0:05 /bin/bash /scripts/idle.sh
syslog   22073  0.0  0.0 263036  3784 ?        Ssl  Jan18   0:00 /usr/sbin/rsyslogd
root      3056  0.0  0.2 12226744 41860 pts/0  S<l  08:29   0:00 /usr/local/mariadb/columnstore/bin/PrimProc
root      3060  0.0  0.0 907272 12360 pts/0    Sl   08:29   0:00 /usr/local/mariadb/columnstore/bin/controllernode
root      3065  0.0  0.1 275408 18176 pts/0    Sl   08:29   0:00 /usr/local/mariadb/columnstore/bin/workernode DBRM_Worker1
root      3112  0.0  0.2 1202380 32696 pts/0   S<l  08:29   0:00 /usr/local/mariadb/columnstore/bin/ExeMgr
root      3115  0.0  0.2 423020 40160 pts/0    Sl   08:29   0:00 /usr/local/mariadb/columnstore/bin/WriteEngineServer
root      3142  0.0  0.2 428416 43736 pts/0    Sl   08:29   0:00 /usr/local/mariadb/columnstore/bin/DMLProc
root      3145  0.0  0.2 279164 36396 pts/0    Sl   08:29   0:00 /usr/local/mariadb/columnstore/bin/DDLProc
mysql     3148  0.0  2.5 2961452 418788 pts/0  Sl   08:29   0:01 /usr/local/mariadb/columnstore/mysql/bin/mysqld --basedir=/usr/local/mariadb/columnstore/mysql/ --datadir=/usr/local/mariadb/columnstore/mysql/db --plugin-dir=/usr/local/mar
root      3291  0.0  0.0   6176   848 ?        S    09:13   0:00 sleep 60

Fast reinstall

If I only add some debugging output, it isn't worth it to delete and reinstall the whole MCS; it is better to stop MCS, call make install, and then start MCS again. I use sreinstall for this. It gracefully stops MCS, compiles and installs from the preset source directory and then starts MCS. This method works only if I installed MCS using the method described in the previous section. Here is the example output that I trimmed.

root@4bbf704f05b8:/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine# sreinstall 1
Saved to /usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves
Sending SIGTERM
2019-01-21  9:18:15 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld (initiated by: unknown): Normal shutdown
2019-01-21  9:18:15 0 [Note] InnoDB: FTS optimize thread exiting.
2019-01-21  9:18:15 0 [Note] Event Scheduler: Purging the queue. 0 events
2019-01-21  9:18:15 0 [Note] InnoDB: Starting shutdown...
...
-- Up-to-date: /usr/local/mariadb/columnstore/bin/WriteEngineServer
-- Up-to-date: /usr/local/mariadb/columnstore/bin/cpimport.bin
-- Up-to-date: /usr/local/mariadb/columnstore/bin/cpimport
This is dumb. DMLProc seems to need it.
Starting...
loading BRM from /usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves
OK.
Successfully loaded BRM snapshot
Successfully replayed 0 BRM transactions
workernode PID = 3909
controllernode PID = 3912
PrimProc PID = 3915
ExeMgr PID = 3971
WriteEngineServer PID = 3974
DMLProc PID = 4002
DDLProc PID = 4005
2019-01-21  9:18:21 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld (mysqld 10.3.11-MariaDB-debug) starting as process 4008 ...
190121  9:18:21 Columnstore: Started; Version: 1.2.2-1
190121  9:18:21 Columnstore: Started; Version: 1.2.2-1
2019-01-21  9:18:21 0 [Note] InnoDB: !!!!!!!! UNIV_DEBUG switched on !!!!!!!!!
2019-01-21  9:18:21 0 [Note] InnoDB: Mutexes and rw_locks use GCC atomic builtins
2019-01-21  9:18:21 0 [Note] InnoDB: Uses event mutexes
2019-01-21  9:18:21 0 [Note] InnoDB: Compressed tables use zlib 1.2.11
2019-01-21  9:18:21 0 [Note] InnoDB: Number of pools: 1
2019-01-21  9:18:21 0 [Note] InnoDB: Using SSE2 crc32 instructions
2019-01-21  9:18:21 0 [Note] InnoDB: Initializing buffer pool, total size = 128M, instances = 1, chunk size = 128M
2019-01-21  9:18:21 0 [Note] InnoDB: Completed initialization of buffer pool
2019-01-21  9:18:21 0 [Note] InnoDB: If the mysqld execution user is authorized, page cleaner thread priority can be changed. See the man page of setpriority().
2019-01-21  9:18:21 0 [Note] InnoDB: 128 out of 128 rollback segments are active.
2019-01-21  9:18:21 0 [Note] InnoDB: Creating shared tablespace for temporary tables
2019-01-21  9:18:21 0 [Note] InnoDB: Setting file './ibtmp1' size to 12 MB. Physically writing the file full; Please wait ...
2019-01-21  9:18:21 0 [Note] InnoDB: File './ibtmp1' size is now 12 MB.
2019-01-21  9:18:21 0 [Note] InnoDB: Waiting for purge to start
2019-01-21  9:18:21 0 [Note] InnoDB: 10.3.11 started; log sequence number 1656714; transaction id 49
2019-01-21  9:18:21 0 [Note] InnoDB: Loading buffer pool(s) from /usr/local/mariadb/columnstore/mysql/db/ib_buffer_pool
2019-01-21  9:18:21 0 [Note] InnoDB: Buffer pool(s) load completed at 190121  9:18:21
2019-01-21  9:18:21 0 [Note] Plugin 'FEEDBACK' is disabled.
2019-01-21  9:18:21 0 [Note] Server socket created on IP: '::'.
2019-01-21  9:18:21 0 [Note] Reading of all Master_info entries succeded
2019-01-21  9:18:21 0 [Note] Added new Master_info '' to hash table
2019-01-21  9:18:21 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld: ready for connections.
Version: '10.3.11-MariaDB-debug'  socket: '/usr/local/mariadb/columnstore/mysql/lib/mysql/mysql.sock'  port: 3306  Columnstore 1.2.3-1
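
Conceptually, sreinstall boils down to the rough sketch below; the directory name is a placeholder, and the real script in the repo relies on its own preset variables and does more checks.

#!/bin/bash
# Rough sketch of the sreinstall flow; ENGINE_BUILD_DIR is a placeholder for
# the preset MCS engine build directory used by the real script.
ENGINE_BUILD_DIR=/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine

stopcs                                # gracefully stop MCS and save the BRM state
make -C "$ENGINE_BUILD_DIR" install   # install the freshly built binaries
startcs                               # start MCS again with the new binaries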

Start/stop

Let's say ExeMgr has crashed and I don't want to reinstall but just gracefully restart MCS. I use the startcs/stopcs pair for this. And as I said before, I can have my stdout/stderr output now, and startcs has a variable (LOG_PREFIX) that points to a directory where all the logging goes. Here is an example output.

root@4bbf704f05b8:/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine# stopcs
Saved to /usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves
Sending SIGTERM
2019-01-21  9:25:21 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld (initiated by: unknown): Normal shutdown
2019-01-21  9:25:21 0 [Note] InnoDB: FTS optimize thread exiting.
2019-01-21  9:25:21 0 [Note] Event Scheduler: Purging the queue. 0 events
2019-01-21  9:25:21 0 [Note] InnoDB: Starting shutdown...
2019-01-21  9:25:21 0 [Note] InnoDB: Dumping buffer pool(s) to /usr/local/mariadb/columnstore/mysql/db/ib_buffer_pool
2019-01-21  9:25:21 0 [Note] InnoDB: Buffer pool(s) dump completed at 190121  9:25:21
2019-01-21  9:25:23 0 [Note] InnoDB: Shutdown completed; log sequence number 1656723; transaction id 50
2019-01-21  9:25:23 0 [Note] InnoDB: Removed temporary tablespace data file: "ibtmp1"
2019-01-21  9:25:23 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld: Shutdown complete

Sending SIGKILL
Clearing SHM
root@4bbf704f05b8:/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine# startcs
This is dumb. DMLProc seems to need it.
Starting...
loading BRM from /usr/local/mariadb/columnstore/data1/systemFiles/dbrm/BRM_saves
OK.
Successfully loaded BRM snapshot
Successfully replayed 0 BRM transactions
workernode PID = 4094
controllernode PID = 4097
PrimProc PID = 4100
ExeMgr PID = 4156
WriteEngineServer PID = 4159
DMLProc PID = 4186
DDLProc PID = 4189
2019-01-21  9:25:29 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld (mysqld 10.3.11-MariaDB-debug) starting as process 4192 ...
190121  9:25:29 Columnstore: Started; Version: 1.2.2-1
190121  9:25:29 Columnstore: Started; Version: 1.2.2-1
2019-01-21  9:25:29 0 [Note] InnoDB: !!!!!!!! UNIV_DEBUG switched on !!!!!!!!!
2019-01-21  9:25:29 0 [Note] InnoDB: Mutexes and rw_locks use GCC atomic builtins
2019-01-21  9:25:29 0 [Note] InnoDB: Uses event mutexes
2019-01-21  9:25:29 0 [Note] InnoDB: Compressed tables use zlib 1.2.11
2019-01-21  9:25:29 0 [Note] InnoDB: Number of pools: 1
2019-01-21  9:25:29 0 [Note] InnoDB: Using SSE2 crc32 instructions
2019-01-21  9:25:29 0 [Note] InnoDB: Initializing buffer pool, total size = 128M, instances = 1, chunk size = 128M
2019-01-21  9:25:29 0 [Note] InnoDB: Completed initialization of buffer pool
2019-01-21  9:25:29 0 [Note] InnoDB: If the mysqld execution user is authorized, page cleaner thread priority can be changed. See the man page of setpriority().
2019-01-21  9:25:30 0 [Note] InnoDB: 128 out of 128 rollback segments are active.
2019-01-21  9:25:30 0 [Note] InnoDB: Creating shared tablespace for temporary tables
2019-01-21  9:25:30 0 [Note] InnoDB: Setting file './ibtmp1' size to 12 MB. Physically writing the file full; Please wait ...
2019-01-21  9:25:30 0 [Note] InnoDB: File './ibtmp1' size is now 12 MB.
2019-01-21  9:25:30 0 [Note] InnoDB: Waiting for purge to start
2019-01-21  9:25:30 0 [Note] InnoDB: 10.3.11 started; log sequence number 1656723; transaction id 49
2019-01-21  9:25:30 0 [Note] InnoDB: Loading buffer pool(s) from /usr/local/mariadb/columnstore/mysql/db/ib_buffer_pool
2019-01-21  9:25:30 0 [Note] InnoDB: Buffer pool(s) load completed at 190121  9:25:30
root@4bbf704f05b8:/git/mariadb-columnstore-server/d1-mariadb-columnstore-engine# 2019-01-21  9:25:30 0 [Note] Plugin 'FEEDBACK' is disabled.
2019-01-21  9:25:30 0 [Note] Server socket created on IP: '::'.
2019-01-21  9:25:30 0 [Note] Reading of all Master_info entries succeded
2019-01-21  9:25:30 0 [Note] Added new Master_info '' to hash table
2019-01-21  9:25:30 0 [Note] /usr/local/mariadb/columnstore/mysql/bin/mysqld: ready for connections.
Version: '10.3.11-MariaDB-debug'  socket: '/usr/local/mariadb/columnstore/mysql/lib/mysql/mysql.sock'  port: 3306  Columnstore 1.2.3-1

Conclusion

This handful of scripts helps me speed up development. I hope you find them useful too.


Nov 16, 2018

Getting data from Postgres into Columnstore in realtime with Debezium and Kafka

There are different workload types in the database world that usually can't be handled by one DBMS. Let's say you have OLTP processed by a number of Postgres installations working in a sharded setup. If you often need to run an analytical query using data from all the shards, you don't have many choices. One of them is to upload the data into an analytical DBMS, e.g. Columnstore, which handles analytical workloads best.

This how-to is based on my recent talk at the HighLoad++ 2018 conference about Change Data Capture and databases. I will show you how to set up streaming of data changes from Postgres into Columnstore in almost no time. Please note that you must have docker and docker-compose installed on a physical or virtual machine with at least 16 GB of RAM.

The goal

The goal I am pursuing here is a simple one. I want every data change made to a table in Postgres 10 to eventually get into a corresponding table that lives in MariaDB Columnstore 1.1.6-1.

The way

Schema overview

The setup uses the Kafka event queue as an intermediate storage for change events. So our first subtask is: how do we get data from PG into Kafka? To do so I will use the protobuf plugin[1] for PG that extracts logical changes from the WAL, and the Debezium plugin[2] for Kafka Connect that streams changes from protobuf into a Kafka topic. The second subtask is: how do we get data from Kafka into Columnstore in the most efficient way? Here the Kafka Avro Data Adapter[3] comes into the game. I will use it as a Kafka consumer that stores events into Columnstore as records. This DA does it in the most efficient way using the Columnstore Bulk Write API[4]. There are two notes on the schema:

  • the numbers on the schema show the order of the steps a data change takes to get from PG into Columnstore
  • Confluent Schema Registry[5] is an auxiliary but very important service that doesn't sit directly on the data path but is actively used by Kafka Connect and later by the Data Adapter. Without it I would have to deserialize data changes from a JSON document that follows the Postgres schema and changes along with it.

The players

There is a pair of interesting OpenSource projects I use in this setup:

  • MariaDB Columnstore is an analytical engine for MariaDB[6]
  • RedHat Debezium is an OpenSource Change Data Capture framework[7] that is based on Confluent Bottled Water ideas[8]

Run Forest, run

To speed up the setup I will use docker-compose together with a number of useful shell scripts from my repository. Get into the repo after you clone it:

[drrtuy@intmacsta git]$ git clone https://github.com/drrtuy/hl2018-cdc.git
Cloning into 'hl2018-cdc'...
remote: Enumerating objects: 38, done.
remote: Counting objects: 100% (38/38), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 38 (delta 9), reused 35 (delta 9), pack-reused 0
Unpacking objects: 100% (38/38), done.
[drrtuy@intmacsta git]$ cd ./hl2018-cdc/
[drrtuy@intmacsta hl2018-cdc]$

The starting order is important, so you should start one container at a time with 5-second pauses. Let's begin with ZK, which is needed by Kafka for different purposes.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d zk
Creating volume "hl2018-cdc_mysql" with default driver
Creating volume "hl2018-cdc_kafka-data" with default driver
Creating volume "hl2018-cdc_kafka-config" with default driver
Creating volume "hl2018-cdc_zk" with default driver
Creating volume "hl2018-cdc_mcs" with default driver
Creating hl2018-cdc_zk_1 ... done

Let's check the ZooKeeper logs after 5 seconds so as not to miss any errors. I will take only a snippet from the actual output because it takes a lot of space. The log output on your side could be different, but there must be no errors in it.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs zk
Attaching to hl2018-cdc_zk_1
zk_1       | Starting up in standalone mode
zk_1       | ZooKeeper JMX enabled by default
zk_1       | Using config: /zookeeper/conf/zoo.cfg
zk_1       | 2018-11-22 12:36:54,549 - INFO  [main:QuorumPeerConfig@134] - Reading configuration 
...
zk_1       | 2018-11-22 12:36:54,586 - INFO  [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181

Next one to start is Postgres 10. The log output is huge and not very important so I also skip most of its content.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d pg
Creating hl2018-cdc_pg_1 ... done
[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs pg
Attaching to hl2018-cdc_pg_1
pg_1       | The files belonging to this database system will be owned by user "postgres".
pg_1       | This user must also own the server process.
...
pg_1       | 2018-11-22 12:46:10.506 GMT [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
pg_1       | 2018-11-22 12:46:10.506 GMT [1] LOG:  listening on IPv6 address "::", port 5432
pg_1       | 2018-11-22 12:46:10.523 GMT [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
pg_1       | 2018-11-22 12:46:10.556 GMT [59] LOG:  database system was shut down at 2018-11-22 12:46:10 GMT
pg_1       | 2018-11-22 12:46:10.577 GMT [1] LOG:  database system is ready to accept connections

The almighty Kafka goes next. The logs are even longer than before so I cut them without regret. But pay attention to any ERROR that could happen when Kafka starts. The setup is a many-headed beast, so the sooner you catch errors the better, because it saves you from searching for the misconfiguration root cause later.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d kafka
hl2018-cdc_zk_1 is up-to-date
Creating hl2018-cdc_kafka_1 ... done
[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs kafka
Attaching to hl2018-cdc_kafka_1
kafka_1    | WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
kafka_1    | Using ZOOKEEPER_CONNECT=zk:2181
kafka_1    | Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.24.0.4:9092
...
kafka_1    | 2018-11-22 12:48:02,392 - INFO  [main:Logging$class@66] - [TransactionCoordinator id=1] Starting up.
kafka_1    | 2018-11-22 12:48:02,394 - INFO  [TxnMarkerSenderThread-1:Logging$class@66] - [Transaction Marker Channel Manager 1]: Starting
kafka_1    | 2018-11-22 12:48:02,394 - INFO  [main:Logging$class@66] - [TransactionCoordinator id=1] Startup complete.
kafka_1    | 2018-11-22 12:48:02,419 - INFO  [/config/changes-event-process-thread:Logging$class@66] - [/config/changes-event-process-thread]: Starting
kafka_1    | 2018-11-22 12:48:02,428 - INFO  [main:AppInfoParser$AppInfo@109] - Kafka version : 1.1.0
kafka_1    | 2018-11-22 12:48:02,429 - INFO  [main:AppInfoParser$AppInfo@110] - Kafka commitId : fdcf75ea326b8e07
kafka_1    | 2018-11-22 12:48:02,432 - INFO  [main:Logging$class@66] - [KafkaServer id=1] started

Our next container runs the Kafka Connect framework that extracts CDC records from the source Postgres. Connect is the absolute winner in the number of useless log records.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs connect
Attaching to hl2018-cdc_connect_1
connect_1  | Plugins are loaded from /kafka/connect
connect_1  | Using the following environment variables:
connect_1  |       GROUP_ID=1
connect_1  |       CONFIG_STORAGE_TOPIC=my_connect_configs
connect_1  |       OFFSET_STORAGE_TOPIC=my_connect_offsets
connect_1  |       BOOTSTRAP_SERVERS=kafka:9092
connect_1  |       REST_HOST_NAME=172.24.0.5
connect_1  |       REST_PORT=8083
connect_1  |       ADVERTISED_HOST_NAME=172.24.0.5
...
connect_1  | 2018-11-26 18:41:27,655 - INFO  [DistributedHerder:DistributedHerder@842] - Starting connectors and tasks using config offset -1
connect_1  | 2018-11-26 18:41:27,655 - INFO  [DistributedHerder:DistributedHerder@852] - Finished starting connectors and tasks

We will run Confluent Schema Registry and Columnstore in one go:

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d schema mcs
hl2018-cdc_zk_1 is up-to-date
hl2018-cdc_kafka_1 is up-to-date
Starting hl2018-cdc_schema_1 ... done
Creating hl2018-cdc_mcs_1    ... done

I still have Kafka Avro Data Adapter to run but I'll do it later.
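
For repeat runs, the whole ordered start above can be scripted in one go; a sketch that uses the same compose service names and the same 5-second pauses, still leaving da for later.

# Start the infrastructure containers in order with 5-second pauses;
# the da container is intentionally left for later.
for svc in zk pg kafka connect schema mcs; do
    sudo docker-compose up -d "$svc"
    sleep 5
done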

Postgres side - the source

I will make a simple table to experiment with and call it int_table. You should note that the table must have a primary key.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec pg bash
root@404536a51cf3:/# 
root@404536a51cf3:/# psql -U postgres
psql (10.0)
Type "help" for help.
postgres=# create table int_table(i bigint primary key, fl float, t text);
CREATE TABLE

Columnstore side - the destination

Now I create a similar int_table in Columnstore to capture the change events. For the sake of the how-to the change events contain a number of service fields, so the int_table schema for Columnstore differs from the PG's one.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec mcs bash
[root@a4edfdce9fca /]# mcsmysql test
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 11
Server version: 10.2.17-MariaDB-log Columnstore 1.1.6-1

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [test]> create table test.int_table( i bigint, fl double, t text, `before` text, after text, source text, op char(1), ts_ms bigint) engine=columnstore;
Query OK, 0 rows affected (1.05 sec)

It could take a while to start Columnstore, so if you have problems running mcsmysql just relax for a couple of minutes while it starts.

Connect configuration

So now we have a pack of services to configure. Let's start with the first subtask from the "The way" section and set up CDC from Postgres into Kafka. At this point I add the Debezium Postgres plugin for Kafka Connect as a source connector and call it int_table. The Debezium plugin connects to the protobuf plugin installed in Postgres. The plugin creates a topic in Kafka. It also saves Avro schemas in the Schema Registry, but I will describe the details later. The file pg_int_table.curl contains a number of settings that are described here[10] in more detail. With my second command I check the list of available connectors.
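
Under the hood the helper scripts used below simply talk to the Kafka Connect REST API; here is a sketch of what they do. The connector property values are illustrative only, the real ones live in pg_int_table.curl and are explained in [10], and I assume the Connect REST port 8083 is published to the host.

# Register the source connector: POST its config to the Kafka Connect REST API.
# The property values below are illustrative placeholders.
curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "int_table",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "pg",
    "table.whitelist": "public.int_table"
  }
}'

# Listing the registered connectors is just a GET on the same endpoint.
curl -s http://localhost:8083/connectors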

[drrtuy@intmacsta hl2018-cdc]$ ./pg.add_connector.sh pg_int_table.curl 
[drrtuy@intmacsta hl2018-cdc]$ ./list_connectors.sh 
[
    "int_table"
]
[drrtuy@intmacsta hl2018-cdc]$

Whether your output is the same or differs, it's time to check the logs of the Connect service.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose logs connect
Attaching to hl2018-cdc_connect_1
...
connect_1  | 2018-11-26 20:16:25,649 - INFO  [pool-6-thread-2:BaseSourceTask@40] - Starting PostgresConnectorTask with configuration:
connect_1  | 2018-11-26 20:16:25,649 - INFO  [pool-6-thread-2:BaseSourceTask@42] -    connector.class = io.debezium.connector.postgresql.PostgresConnector
...
connect_1  | 2018-11-27 17:19:20,215 - INFO  [pool-6-thread-1:PostgresSchema@97] - REPLICA IDENTITY for 'public.int_table' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
connect_1  | 2018-11-27 17:19:20,224 - INFO  [pool-6-thread-1:Threads$2@247] - Creating thread debezium-postgresconnector-int_table-records-stream-producer

Here I see that everything is fine and the Debezium plugin will stream changes from int_table into the Kafka topic created for this purpose.
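
To double-check that the topic really appeared I can list the topics from inside the Kafka container; a sketch that assumes the debezium/kafka image layout with the Kafka tools under /kafka/bin.

# List Kafka topics to confirm Debezium created one for public.int_table.
sudo docker-compose exec kafka /kafka/bin/kafka-topics.sh --zookeeper zk:2181 --list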

Running MariaDB Kafka Avro Data adapter

In the end I will run the Kafka Avro Data Adapter to suck events from the Kafka topic and put them into Columnstore, converting types on the fly. To do so I simply create the container.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose up -d da
Creating hl2018-cdc_da_1 ... done

ATM the daemon puts its errors into the container's stdout, so I would use docker-compose logs for debugging or monitoring. The output isn't very concise, but it shows the Avro types used for the Kafka events. This Data Adapter[3] uses the MCS Bulk Write API and the Schema Registry under the hood, but the hood is big and these components go unnoticed. It's worth noting that this is a customized installation of the git repository code. And if you want to play more with it, you could comment out the entrypoint for the da container and run it manually.

On Confluent Schema Registry

The Registry[5] is used by a number of components in this setup and makes DevOps' lives easier. At the start the Connect Debezium connector saves two Avro schemas in the Registry:

  • the table's primary key schema
  • the table's value schema, which is an Avro representation of all the table's columns

The Data Adapter also uses the Registry to obtain the actual Avro schema for the table's value. Using the schema the DA deserializes Kafka events into Columnstore RECORDS.
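
You can peek at what exactly got registered; a sketch that assumes the Registry's default port 8081 is published to the host by the schema service.

# List the Avro subjects the connector registered (a key and a value schema
# per captured table).
curl -s http://localhost:8081/subjects

# The latest value schema for a subject can then be fetched with
# curl -s http://localhost:8081/subjects/<subject>/versions/latest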

Stream INSERT changes

Now I run a number of INSERTs in Postgres and look at the changes streamed into the Columnstore.

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec pg bash
root@404536a51cf3:/# psql -U postgres
psql (10.0)
Type "help" for help.

postgres=# insert into int_table values (25, 0.05, 'test1');
INSERT 0 1
postgres=# insert into int_table values (42, 0.15, 'test2');
INSERT 0 1
postgres=# insert into int_table values (67, 1.25, 'test3');
INSERT 0 1

And in the Columnstore after that:

[drrtuy@intmacsta hl2018-cdc]$ sudo docker-compose exec mcs bash
[root@a4edfdce9fca /]# mcsmysql test
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 11
Server version: 10.2.17-MariaDB-log Columnstore 1.1.6-1

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [test]> select * from int_table;
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t      | before | after                                                                       | source | op   | ts_ms         |
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
|   25 | 0.05 | test1  | NULL   | {"i": 25, "fl": {"double": 0.050000000000000003}, "t": {"string": "test1"}} | NULL   | c    | 1543339686004 |
|   42 | 0.15 | test2  | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "test2"}}  | NULL   | c    | 1543339691291 |
|   67 | 1.25 | test3  | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "test3"}}                 | NULL   | c    | 1543339705614 |
+------+------+--------+--------+-----------------------------------------------------------------------------+--------+------+---------------+
3 rows in set (0.08 sec)

Stream UPDATE changes

Here is the UPDATE statement I will play with on the Postgres side. It touches 2 RECORDS and I will get the same number of RECORDS in Columnstore.

postgres=# update  int_table set t = 'updated' where i >= 42;
UPDATE 2

And here is what I get on the Columnstore side.

MariaDB [test]> select * from int_table;
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t        | before | after                                                                        | source | op   | ts_ms         |
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
|   25 | 0.05 | test1    | NULL   | {"i": 25, "fl": {"double": 0.050000000000000003}, "t": {"string": "test1"}}  | NULL   | c    | 1543339686004 |
|   42 | 0.15 | test2    | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "test2"}}   | NULL   | c    | 1543339691291 |
|   67 | 1.25 | test3    | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "test3"}}                  | NULL   | c    | 1543339705614 |
|   42 | 0.15 | updated  | NULL   | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated"}} | NULL   | u    | 1543340276854 |
|   67 | 1.25 | updated  | NULL   | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated"}}                | NULL   | u    | 1543340276855 |
+------+------+----------+--------+------------------------------------------------------------------------------+--------+------+---------------+
5 rows in set (0.09 sec)

Take a look at the last two RECORDS that have NULLs in the before column. This column could contain a JSON document with the int_table column values before the UPDATE, similar to what I got in after. This is where the REPLICA IDENTITY setting[9] comes into the game. I will set it on the Postgres side for int_table and run the UPDATE again.

postgres=# alter table int_table replica identity full;
ALTER TABLE
postgres=# update  int_table set t = 'updated2' where i >= 42;
UPDATE 2

There is a pair of new RECORDS in Columnstore's int_table alter ego.

MariaDB [test]> select * from int_table;
+------+------+-----------+------------------------------------------------------------------------------+-------------------------------------------------------------------------------+--------+------+---------------+
| i    | fl   | t         | before                                                                       | after                                                                         | source | op   | ts_ms         |
+------+------+-----------+------------------------------------------------------------------------------+-------------------------------------------------------------------------------+--------+------+---------------+
...
|   42 | 0.15 | updated2  | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated"}} | {"i": 42, "fl": {"double": 0.14999999999999999}, "t": {"string": "updated2"}} | NULL   | u    | 1543340725509 |
|   67 | 1.25 | updated2  | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated"}}                | {"i": 67, "fl": {"double": 1.25}, "t": {"string": "updated2"}}                | NULL   | u    | 1543340725510 |
+------+------+-----------+------------------------------------------------------------------------------+-------------------------------------------------------------------------------+--------+------+---------------+
7 rows in set (0.08 sec)

Changing REPLICA IDENTITY to full has a number of drawbacks though. But if I needed the previous values for all the columns I would try this setting.
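
A quick way to see which REPLICA IDENTITY a table currently has is to look at pg_class.relreplident, where 'd' means default and 'f' means full; a sketch run against the pg container.

# Check the current REPLICA IDENTITY of int_table: 'd' = default, 'f' = full.
sudo docker-compose exec pg psql -U postgres -c \
  "SELECT relreplident FROM pg_class WHERE relname = 'int_table';"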

Summing this all up

Columnstore is good for analytical and heavy reporting workloads, while Postgres is good for transactional workloads. And if you ever need to stream changes from PG into Columnstore you can take the shortcut and adopt the solution with Debezium.

Links

  1. https://github.com/debezium/postgres-decoderbufs
  2. https://debezium.io/docs/connectors/postgresql/
  3. https://github.com/mariadb-corporation/mariadb-columnstore-data-adapters/tree/master/kafka-avro-adapter
  4. https://mariadb.com/kb/en/library/columnstore-bulk-write-sdk/
  5. https://docs.confluent.io/current/schema-registry/docs/index.html
  6. https://mariadb.com/kb/en/library/about-mariadb-columnstore/
  7. https://debezium.io/
  8. https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/
  9. https://debezium.io/docs/connectors/postgresql/#replica-identity
  10. https://debezium.io/docs/connectors/postgresql/#connector-properties