Change Data Capture (CDC) MySQL menggunakan Debezium

debezium

Arsitektur Debezium untuk Change Data Capture (CDC) DBMS Mysql. (Foto : vladmihalcea )

 

Debezium adalah salah satu tools yang dapat membantu database kita menjadi event streams, dengan begitu aplikasi dapat melihat dan merespon secara langsung saat terjadi perubahan setiap baris pada database. Debezium berjalan di atas Apache Kafka dan menyediakan Kafka Connect yang dapat menghubungkan Kafka ke berbagai macam DBMS. Debezium akan menyimpan log perubahan data di Kafka logs, dimana consumer berasal. Artinya kita bisa dengan mudah meng-consume semua event log dengan benar dan lengkap walaupun aplikasi kita mati atau crash.

 

Running Debezium di Docker

Pada eksperimen kali ini, kita akan menjalankan Debezium di local. Terdapat 4 docker container yang akan berjalan. Tiga diantaranya container service Debezium (Zookeeper, Kafka dan Connector ) dan satu lagi container MySQL yang sudah terdapat example database. Jika container dihapus, maka semua data dan konfigurasi hilang.

Pertama, pastikan docker engine dan docker compose versi terakhir sudah terinstal dan sudah Running. Buat sebuah folder bernama debezium yang berisi file  docker-compose.yaml dan  .env.  

Salin script debezium-examples ini ke dalam file docker-compose.yaml. Selanjutnya isi file .env dengan konfigurasi versi debezium yang akan kita pakai seperti script dibawah ini

DEBEZIUM_VERSION=0.7 

Buka Command Line Interface (CLI) dan arahkan ke folder debezium lalu bangkitkan docker compose dengan memberikan perintah

$ cd ~/debezium
$ docker-compose up 

Tunggu proses pull image selesai dan semua container started.

 

Register Debezium’s Kafka Connect

Cara paling mudah memeriksa apakah semua service debezium sudah berjalan sempurna adalah dengan melakukan request HTTP GET ke endpoint connectornya.

$ curl -H "Accept:application/json" localhost:8083/

Jika Kafka servicenya sudah Running, respon yang akan diterima adalah seperti dibawah ini

{“version":"1.0.0","commit":"cb8625948210849f"}

Respon diatas memberikan informasi bahwa versi Kafka Connect yang kita gunakan adalah versi 1.0.0. Selanjutnya kita harus memeriksa Connector yang terdaftar dengan cara

curl -H "Accept:application/json" localhost:8083/connectors/

perintah diatas seharusnya mengeluarkan respon array kosong seperti dibawah ini

[]

respon diatas menandakan bahwa service Kafka Connect sudah berjalan dan dapat saling berkomunikasi, namun saat ini belum ada konektornya.

Untuk membuat konektor kita hanya cukup melakukan POST request ke API Kafka Connect dengan alamat endpoint localhost:8083/connectors/ yang berisi konfigurasi database dan kafka topic yang akan kita buat. Berikut contoh json kafka connectnya

{
 "name": "inventory-connector",
 "config": {
 "connector.class": "io.debezium.connector.mysql.MySqlConnector",
 "tasks.max": "1",
 "database.hostname": "mysql",
 "database.port": "3306",
 "database.user": "debezium",
 "database.password": "dbz",
 "database.server.id": "184054",
 "database.server.name": "dbserver1",
 "database.whitelist": "inventory",
 "database.history.kafka.bootstrap.servers": "kafka:9092",
 "database.history.kafka.topic": "dbhistory.inventory"
 }
}

Json diatas mendeskripsikan bahwa nama konektor yang akan dibuat adalah inventory-connector dengan userdb MySQL debezium  dan password dbz. Nama server database untuk kafka adalah dbserver1 menjadikan semua tabel di database inventory sebagai kafka topicnya.

Jika sudah benar, lakukan HTTP request dibawah ini

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

Dibawah ini adalah JSON response jika pendaftaran kafka connector baru yang berhasil dibuat.

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
        "name": "inventory-connector"
    },
    "tasks": [],
    "type": null
}

Yay!. Kafka connector berhasil kita buat.

 

Pengujian Kafka Consumer

Tahap selanjutnya yaitu pengujian produce & consume kafka topics. Pada bagian ini kita harus masuk ke exec mode docker container kafka dan mysql. Di exec mode container kafka, kita akan melakukan aktifitas consuming topic.

Pada docker container mysql, kita akan mencoba melakukan operasi insert, update dan delete pada suatu tabel. Nantinya, consumer kafka akan mendapatkan informasi log baru berupa json yang berisi aktifitas yang terjadi pada topik yang di-Subscribe.

Buka tab baru di CLI anda lalu periksa nama container yang digenerate oleh docker engine dengan cara

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES

ec9ee4e478e7        debezium/connect:0.7         "https://static.kudo.co.id/blog/engineering/docker-entrypoin..."   39 seconds ago      Up 38 seconds       8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp                                         debezium_connect_1

9a7457ec6419        debezium/kafka:0.7           "https://static.kudo.co.id/blog/engineering/docker-entrypoin..."   40 seconds ago      Up 39 seconds       8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp                                                   debezium_kafka_1

2a1e763839b5        debezium/example-mysql:0.7   "docker-entrypoint..."   3 minutes ago       Up 40 seconds       0.0.0.0:3306->3306/tcp                                                                       debezium_mysql_1

b23c8313da49        debezium/zookeeper:0.7       "https://static.kudo.co.id/blog/engineering/docker-entrypoin..."   3 minutes ago       Up 40 seconds       0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, 9779/tcp   debezium_zookeeper_1

Dari matriks yang dihasilkan perintah docker ps, kita dapat mengetahui nama container untuk kafka adalah debezium_kafka_1 dan container mysqlnya bernama debezium_mysql_1. Agar masuk ke mode exec container kafka, kita hanya cukup berikan perintah

$ docker exec -it debezium_kafka_1 bash

di mana debezium_kafka_1 adalah nama docker container yang dituju.

Sebelum consume kafka topic, ada baiknya cek terlebih dahulu nama-nama kafka topic berhasil dibuat pada saat register debezium kafka connect sebelumnya. Berikan perintah dibawah ini untuk mendapatkan list kafka topic yang sudah ada

$ ./bin/kafka-topics.sh --zookeeper zookeeper:2181 —-list

output dari perintah tersebut akan menampilkan nama-nama kafka topic kurang lebih seperti dibawah ini

__consumer_offsets
connect-status
dbhistory.inventory
dbserver1
dbserver1.inventory.customers
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
my_connect_configs
my_connect_offsets

Jika kita perhatikan, setiap nama topic yang dihasilkan debezium sesuai dengan konfigurasi yang kita daftarkan ke debezium connector. Format nama topic yang dihasilkan adalah [server_name].[database_name].[table_name].

Setelah mengetahui nama topicnya, kita bisa langsung men-consume topicnya. Sebagai contoh, kita ingin men-consume topic dbserver1.inventory.customers yang dapat memberikan informasi berbagai macam update pada tabel customers.

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic dbserver1.inventory.customers

Sekarang kita sudah subscribe kafka topic  dbserver1.inventory.customers. Artinya kita akan mendapat sebuah message setiap ada update yang terjadi di tabel customer meski berbeda server / host. Hal ini bisa kita implementasikan pada berbagai studi kasus seperti singkronisasi data customer, mengaitkan tabel customer dengan tabel order dan berbagai contoh kasus lainnya. Pada laman wiki kafka, terdapat library kafka client yang ditulis dalam berbagai macam bahasa pemrograman. Klik disini untuk melihat informasi lengkapnya.

Tahap terakhir yaitu menguji kafka consumer dengan cara melakukan aktifitas transaksi di database inventory. Langkah-langkah pengujiannya yaitu masuk ke exec mode docker container mysql.

Buka tab CLI baru dan tuliskan perintah

$ docker exec -it debezium_mysql_1 bash

Setelah masuk ke mode exec docker container mysql, login ke MySQL Monitor agar dapat menjalankan perintah-perintah sql.

# mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory

Berikan beberapa sql query agar kita mengetahui message yang diberikan jika terdapat perubahan pada database.

Operasi Insert

mysql> INSERT INTO customers VALUES (default, "Dendi", "Abdul Rohim", “dendi.abdul@kudo.co.id");

Pada cosole consumer, kita akan mendapatkan message berformat json seperti dibawah ini

{
 "schema": {
 "type": "struct",
 "fields": [
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "before"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "after"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "string",
 "optional": false,
 "field": "name"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "server_id"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "ts_sec"
 },
 {
 "type": "string",
 "optional": true,
 "field": "gtid"
 },
 {
 "type": "string",
 "optional": false,
 "field": "file"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "pos"
 },
 {
 "type": "int32",
 "optional": false,
 "field": "row"
 },
 {
 "type": "boolean",
 "optional": true,
 "field": "snapshot"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "thread"
 },
 {
 "type": "string",
 "optional": true,
 "field": "db"
 },
 {
 "type": "string",
 "optional": true,
 "field": "table"
 }
 ],
 "optional": false,
 "name": "io.debezium.connector.mysql.Source",
 "field": "source"
 },
 {
 "type": "string",
 "optional": false,
 "field": "op"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "ts_ms"
 }
 ],
 "optional": false,
 "name": "dbserver1.inventory.customers.Envelope"
 },
 "payload": {
 "before": null,
 "after": {
 "id": 1005,
 "first_name": "Dendi",
 "last_name": "Abdul Rohim",
 "email": "dendi.abdul@kudo.co.id"
 },
 "source": {
 "name": "dbserver1",
 "server_id": 223344,
 "ts_sec": 1514347265,
 "gtid": null,
 "file": "mysql-bin.000003",
 "pos": 364,
 "row": 0,
 "snapshot": null,
 "thread": 18,
 "db": "inventory",
 "table": "customers"
 },
 "op": "c",
 "ts_ms": 1514347265336
 }
}

Perhatikan properti payload diatas, terdapat properti before berisi null dan properti after berisi data yang baru saja kita insert. Selain itu, properti “op” memberikan value “c” yang berarti telah terjadi operasi Create pada tabel customers di database.

Operasi Update

mysql > UPDATE customers SET email="dendi.abdul@grab.com" WHERE id=1005;

Pada cosole consumer, kita akan mendapatkan message berformat json seperti dibawah ini

{
 "schema": {
 "type": "struct",
 "fields": [
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "before"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "after"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "string",
 "optional": false,
 "field": "name"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "server_id"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "ts_sec"
 },
 {
 "type": "string",
 "optional": true,
 "field": "gtid"
 },
 {
 "type": "string",
 "optional": false,
 "field": "file"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "pos"
 },
 {
 "type": "int32",
 "optional": false,
 "field": "row"
 },
 {
 "type": "boolean",
 "optional": true,
 "field": "snapshot"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "thread"
 },
 {
 "type": "string",
 "optional": true,
 "field": "db"
 },
 {
 "type": "string",
 "optional": true,
 "field": "table"
 }
 ],
 "optional": false,
 "name": "io.debezium.connector.mysql.Source",
 "field": "source"
 },
 {
 "type": "string",
 "optional": false,
 "field": "op"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "ts_ms"
 }
 ],
 "optional": false,
 "name": "dbserver1.inventory.customers.Envelope"
 },
 "payload": {
 "before": {
 "id": 1005,
 "first_name": "Dendi",
 "last_name": "Abdul Rohim",
 "email": "dendi.abdul@kudo.co.id"
 },
 "after": {
 "id": 1005,
 "first_name": "Dendi",
 "last_name": "Abdul Rohim",
 "email": "dendi.abdul@grab.com"
 },
 "source": {
 "name": "dbserver1",
 "server_id": 223344,
 "ts_sec": 1514359327,
 "gtid": null,
 "file": "mysql-bin.000003",
 "pos": 686,
 "row": 0,
 "snapshot": null,
 "thread": 18,
 "db": "inventory",
 "table": "customers"
 },
 "op": "u",
 "ts_ms": 1514359327437
 }
}

Berbeda dengan operasi insert, properti before pada objek payload berisi kolom dan value sebelum baris tersebut diupdate. Message operasi update ini ditandai juga dengan data json “op”:”u”.

Operasi Delete

mysql> DELETE FROM customers WHERE id=1005;

Pada console consumer kita akan mendapatkan message

{
 "schema": {
 "type": "struct",
 "fields": [
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "before"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "int32",
 "optional": false,
 "field": "id"
 },
 {
 "type": "string",
 "optional": false,
 "field": "first_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "last_name"
 },
 {
 "type": "string",
 "optional": false,
 "field": "email"
 }
 ],
 "optional": true,
 "name": "dbserver1.inventory.customers.Value",
 "field": "after"
 },
 {
 "type": "struct",
 "fields": [
 {
 "type": "string",
 "optional": false,
 "field": "name"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "server_id"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "ts_sec"
 },
 {
 "type": "string",
 "optional": true,
 "field": "gtid"
 },
 {
 "type": "string",
 "optional": false,
 "field": "file"
 },
 {
 "type": "int64",
 "optional": false,
 "field": "pos"
 },
 {
 "type": "int32",
 "optional": false,
 "field": "row"
 },
 {
 "type": "boolean",
 "optional": true,
 "field": "snapshot"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "thread"
 },
 {
 "type": "string",
 "optional": true,
 "field": "db"
 },
 {
 "type": "string",
 "optional": true,
 "field": "table"
 }
 ],
 "optional": false,
 "name": "io.debezium.connector.mysql.Source",
 "field": "source"
 },
 {
 "type": "string",
 "optional": false,
 "field": "op"
 },
 {
 "type": "int64",
 "optional": true,
 "field": "ts_ms"
 }
 ],
 "optional": false,
 "name": "dbserver1.inventory.customers.Envelope"
 },
 "payload": {
 "before": {
 "id": 1005,
 "first_name": "Dendi",
 "last_name": "Abdul Rohim",
 "email": "dendi.abdul@grab.com"
 },
 "after": null,
 "source": {
 "name": "dbserver1",
 "server_id": 223344,
 "ts_sec": 1514359486,
 "gtid": null,
 "file": "mysql-bin.000003",
 "pos": 1053,
 "row": 0,
 "snapshot": null,
 "thread": 18,
 "db": "inventory",
 "table": "customers"
 },
 "op": "d",
 "ts_ms": 1514359486651
 }
}

Pada operasi delete, properti payload after akan berisi null dan “op”:”d”.

 

Kesimpulan

Debezium adalah platform sistem terdistribusi yang mengubah database menjadi event stream. Debezium dibagun diatas Apache Kafka dan menyediakan Kafka Connect yang kompatibel dengan DBMS seperti MySQL, Postgres dan MongoDB. Dengan kafka, sistem menjadi lebih scalable, minimum dependency, dan fault-tolerant cluster. Setiap aplikasi yang men-Subscribe sebuah topic kafka akan mendapatkan message JSON berisi informasi perubahan yang terjadi pada database. Dibantu dengan library kafka client, kita dapat membuat aksi programatis untuk berbagai kondisi perubahan pada database yang kita monitor meskipun di server lain.

Semoga artikel ini bermanfaat untuk anda. Jika ingin berdiskusi atau memberikan saran silahkan hubungi melalui email. Saya akan segera membalasnya. Cheers 🥂.

Comments

comments

dendi.abdul

You Might Also Like

No Comment

Comments are closed here.