Real-Time Feeding of a Data Lake with PostgreSQL and Debezium

PostgreSQL Users Group Belgium

Julien RIOU

February 13, 2024



Speaker


Summary


Who are we?

OVHcloud universes


Internal databases

OVHcloud universes pointing to PostgreSQL


Statistics


Cluster example

Cluster example


Mutualized environments

Mutualized databases


Analytics needs


Mix of workloads

This is fine

  • Online Transactional Processing (OLTP)
  • Online Analytical Processing (OLAP)



Data Lake


Data Lake


Extract Transform Load (ETL)

Extract data from a source

Transform data to an expected format

Load data into a destination


Sqoop

MOZG

“mózg” = “brain” in Polish


Sqoop

  • Extract with SELECT * FROM... WHERE id BETWEEN...
  • Load into Hive

DEPRECATED


ETL’s limitations


Change Data Capture (CDC)

Detect changes happening in databases


Triggers

Execute actions on events
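
A minimal sketch of trigger-based change capture, assuming a hypothetical audit_log changelog table and an orders table to watch:

-- Hypothetical changelog table filled by the trigger
CREATE TABLE audit_log (
    id         bigserial PRIMARY KEY,
    table_name text NOT NULL,
    operation  text NOT NULL,
    row_data   jsonb,
    changed_at timestamptz NOT NULL DEFAULT now()
);

-- Record every INSERT, UPDATE and DELETE as a JSON document
CREATE OR REPLACE FUNCTION audit_changes() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, operation, row_data)
        VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(OLD));
    ELSE
        INSERT INTO audit_log (table_name, operation, row_data)
        VALUES (TG_TABLE_NAME, TG_OP, to_jsonb(NEW));
    END IF;
    RETURN NULL; -- the return value is ignored for AFTER triggers
END;
$$ LANGUAGE plpgsql;

-- Attach the trigger to the table to capture
CREATE TRIGGER orders_audit
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION audit_changes();

Every write then pays the cost of the trigger inside its own transaction.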


Triggers


LISTEN/NOTIFY

Publication and subscription system (pub/sub)

NOTIFY channel [ , payload ]
LISTEN channel
  • https://www.postgresql.org/docs/current/sql-notify.html
  • https://www.postgresql.org/docs/current/sql-listen.html
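
A minimal usage sketch (channel name and payload are illustrative):

-- Session A: subscribe to a channel
LISTEN table_changes;

-- Session B: publish an event, optionally with a payload
NOTIFY table_changes, '{"table": "orders", "op": "INSERT"}';

-- The same thing from a function or trigger
SELECT pg_notify('table_changes', '{"table": "orders", "op": "INSERT"}');

Notifications are only delivered to sessions currently listening: a consumer that is disconnected misses events.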

LISTEN/NOTIFY


Logical replication

Decode events based on Write-Ahead Logs (WAL)
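
A quick way to see logical decoding at work, using the built-in test_decoding output plugin (requires wal_level = logical; the orders table is hypothetical):

-- Create a logical replication slot
SELECT pg_create_logical_replication_slot('demo_slot', 'test_decoding');

-- Perform a write...
INSERT INTO orders (id) VALUES (42);

-- ...then consume the decoded changes from the WAL
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);

-- Drop the slot when done: an abandoned slot retains WAL forever
SELECT pg_drop_replication_slot('demo_slot');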


Debezium

CDC


Debezium


CDC to Kafka


Demo



Snapshot and streaming


Snapshots


Snapshot


Streaming


Streaming


How to configure Debezium

Kafka Connect REST API:

curl -XPOST -H 'Content-Type: application/json' -d @payload.json -s localhost:8083/connectors

Payload example

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.ApplicationName": "debezium",
  "database.dbname": "test",
  "database.hostname": "${file:/path/to/secrets/test:hostname}",
  "database.password": "${file:/path/to/secrets/test:password}",
  "database.port": "${file:/path/to/secrets/test:port}",
  "database.ssl.mode": "required",
  "database.user": "${file:/path/to/secrets/test:user}",
  "decimal.handling.mode": "double",
  "field.name.adjustment.mode": "avro",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
  "heartbeat.interval.ms": "1000",
  "incremental.snapshot.chunk.size": "100000",
  "name": "test",
  "plugin.name": "pgoutput",
  "producer.override.compression.type": "lz4",
  "publication.name": "debezium_test",
  "sanitize.field.names": "true",
  "schema.name.adjustment.mode": "avro",
  "signal.data.collection": "public.ovh_debezium_signal",
  "slot.name": "debezium_test",
  "status.update.interval.ms": "1000",
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000",
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3",
  "topic.heartbeat.prefix": "namespace.debezium.heartbeat",
  "topic.prefix": "namespace.test",
  "transforms": "schema,public",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
  "transforms.schema.replacement": "namespace.test.$1.$2",
  "transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
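
This payload references two helper tables that must exist on the source database. A minimal DDL sketch, following the column layout documented for Debezium signaling tables; the heartbeat table only needs a single row for heartbeat.action.query to update:

-- Signaling table (signal.data.collection), used for example
-- to trigger incremental snapshots
CREATE TABLE public.ovh_debezium_signal (
    id   varchar(42) PRIMARY KEY,
    type varchar(32) NOT NULL,
    data varchar(2048)
);

-- Heartbeat table updated by heartbeat.action.query
CREATE TABLE public.ovh_debezium_heartbeat (
    ts timestamptz NOT NULL
);
INSERT INTO public.ovh_debezium_heartbeat (ts) VALUES (now());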

Automation


ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml

Debezium endpoint


Replica promotion


Password security

{
  "database.password": "${file:/path/to/secrets/test:password}"
}
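
The ${file:...} indirection is resolved by Kafka's FileConfigProvider, which has to be enabled in the Kafka Connect worker configuration; a minimal sketch (the secrets file itself is a plain key=value properties file):

# connect-distributed.properties
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

This keeps the actual passwords out of the connector configuration returned by the REST API.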

Database connection lost

io.debezium.DebeziumException: Couldn't obtain encoding for database test

A manual restart of the connector is required.
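
The restart can be done through the same Kafka Connect REST API; a sketch for the test connector from the payload example:

# Check connector and task state
curl -s localhost:8083/connectors/test/status

# Restart the connector, then its task if it remains FAILED
curl -XPOST -s localhost:8083/connectors/test/restart
curl -XPOST -s localhost:8083/connectors/test/tasks/0/restart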


Table inheritance


Table inheritance

test=> select count(*) from events;
   count    
------------
 1314817483
(1 row)
test=> select count(*) from only events;
 count 
-------
     0
(1 row)

https://github.com/debezium/debezium/pull/5159

  • Snapshots select data from the parent table, which includes the rows of all child tables
  • Those rows are never emitted under the parent by logical replication
  • Rows captured during the snapshot are therefore never deleted (see the sketch below)
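
A minimal sketch of the inheritance layout behind those two queries, with a hypothetical monthly child table:

-- Parent table: holds no rows itself
CREATE TABLE events (
    id bigint NOT NULL,
    ts timestamptz NOT NULL
);

-- Child table inherits the parent's columns
CREATE TABLE events_2024_01 (
    CHECK (ts >= '2024-01-01' AND ts < '2024-02-01')
) INHERITS (events);

INSERT INTO events_2024_01 (id, ts) VALUES (1, '2024-01-15');

SELECT count(*) FROM events;      -- 1: parent plus children
SELECT count(*) FROM ONLY events; -- 0: the parent alone is empty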

Topic routing (SMT)

Table name + regex = topic name

  • SMT = Single Message Transformation
  • Remove “public” schema on PostgreSQL
  • Remove double schema names on MySQL

Tables with the same structure

Topic routing same schema

{
  "transforms": "events",
  "transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
  "transforms.events.replacement": "namespace.database.public.events"
}

Remove “public” schema

Topic routing remove public schema

{
  "transforms": "public",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1"
}

Multiple transformations can be defined using a comma-separated list of transformation names:

{
  "transforms": "events,public"
}

TOAST

The Oversized-Attribute Storage Technique

UPDATE without modifying the TOASTed attribute:

{
  "op": "u",
  "before": null,
  "after": {
    "namespace.database.public.toasty.Value": {
      "id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
      "updated_at": {
        "string": "2024-01-17T10:53:34.929630Z"
      },
      "clob": {
        "string": "__debezium_unavailable_value"
      }
...
  • https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-toasted-values
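
One documented workaround is to change the table's replica identity so that complete old rows are written to the WAL, at the cost of larger WAL records (toasty is the table from the event above):

-- Log full old rows on UPDATE and DELETE so unchanged TOASTed
-- columns appear in change events instead of the placeholder
ALTER TABLE toasty REPLICA IDENTITY FULL;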

TOAST


Databases with low writes

low writes


Databases with low writes

low writes simplified


Heartbeats

{
  "heartbeat.interval.ms": "1000",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
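
On a quiet database the slot's confirmed position stops advancing and PostgreSQL retains the corresponding WAL; the heartbeat generates regular traffic so the slot keeps moving. A sketch to monitor the WAL retained per slot:

-- WAL retained by each replication slot, in bytes
SELECT slot_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS retained_bytes
FROM pg_replication_slots;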

Kafka


Features


Partitioning and replication

{
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3"
}

Number of partitions

Kafka supports a maximum of about 200,000 partitions per cluster


Number of partitions

Topic routing:

  • 14 topics
  • 252 partitions
  • -99.88 %


Compaction

{
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000"
}

Compression

{
  "producer.override.compression.type": "lz4"
}

The zstd algorithm is considered for the future, but it is not a priority.


From Kafka to the Data Lake


Spark replication


Spark replication


Other uses


The future


Do you really need CDC?

Before opening up your laptop and installing everything needed to run Debezium on your infrastructure, make sure that you really need change data capture. It can be expensive, hard to set up, and complex to maintain. Go through simpler options first, like creating an asynchronous replica to run reporting queries. Only when the simpler solutions have been ruled out should you consider setting up CDC.


Questions