Real-Time Feeding of a Data Lake with PostgreSQL and Debezium


PostgreSQL Users Group Belgium

Julien RIOU

February 13, 2024

Speaker

Summary

  • Who are we?
  • Internal databases
  • Data Lake
  • ETL
  • CDC
  • Other uses
  • The future

Who are we?

OVHcloud universes

Internal databases

OVHcloud universes pointing to PostgreSQL

Statistics

  • 3 DBMS (MySQL, MongoDB, PostgreSQL)
  • 7 autonomous infrastructures worldwide
  • 500+ servers
  • 2000+ databases
  • 100+ clusters
  • Highly secure environments

Cluster example


Shared environments

Shared databases

Analytics needs

  • Billing
  • Revenue
  • Enterprise strategy
  • KPIs
  • Fraud detection
  • Electrical consumption
  • Metadata analysis (from JIRA)
  • Detecting support teams' working hours

Mix of workloads

This is fine

Data Lake

  • 3 environments (production, integration, sandbox)
  • 2 geographical regions (OVH Group, OVH US)
  • 900 TB


Extract Transform Load (ETL)

Extract data from a source

Transform data to an expected format

Load data into a destination

Sqoop

MOZG

Sqoop

  • Extract with SELECT * FROM... WHERE id BETWEEN...
  • Load into Hive

DEPRECATED

ETL’s limitations

  • Mix of workloads
  • Direct access to the databases
  • Lagging data

Change Data Capture (CDC)

Detect changes happening in databases

Triggers

Execute actions on events

  • Capture/log/audit tables
  • Function inserting OLD and NEW values (see the sketch below)
  • AFTER INSERT OR UPDATE OR DELETE trigger
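
A minimal sketch of this pattern, with hypothetical accounts and accounts_audit tables:

CREATE TABLE accounts_audit (
    changed_at timestamptz NOT NULL DEFAULT now(),
    operation  text NOT NULL,
    old_row    jsonb,
    new_row    jsonb
);

-- Function inserting OLD and NEW values into the audit table
CREATE FUNCTION accounts_audit_fn() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO accounts_audit (operation, old_row) VALUES (TG_OP, to_jsonb(OLD));
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO accounts_audit (operation, old_row, new_row)
        VALUES (TG_OP, to_jsonb(OLD), to_jsonb(NEW));
    ELSE
        INSERT INTO accounts_audit (operation, new_row) VALUES (TG_OP, to_jsonb(NEW));
    END IF;
    RETURN NULL; -- the return value is ignored for AFTER triggers
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER accounts_audit_trg
    AFTER INSERT OR UPDATE OR DELETE ON accounts
    FOR EACH ROW EXECUTE FUNCTION accounts_audit_fn();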

Triggers

  • Impact on write performance
  • Relations visible to the users
  • Retention policy managed by users themselves

LISTEN/NOTIFY

Publication and subscription system (pub/sub)

NOTIFY channel [ , payload ]
LISTEN channel
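
For example, with an illustrative channel name and payload:

-- Consumer session
LISTEN data_changes;

-- Producer session (the payload is optional)
NOTIFY data_changes, 'accounts:42:update';
-- Equivalent form, usable inside functions and with computed payloads:
SELECT pg_notify('data_changes', 'accounts:42:update');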

LISTEN/NOTIFY

  • Impact on write performance
  • Lost events when disconnected

Logical replication

Decode events from the Write-Ahead Log (WAL)

  • CREATE PUBLICATION
  • CREATE SUBSCRIPTION
  • Replication slot
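
A minimal sketch with hypothetical names (Debezium uses the publication and a replication slot directly instead of a SUBSCRIPTION):

-- On the source database
CREATE PUBLICATION analytics_pub FOR TABLE accounts, orders;

-- On a subscriber (implicitly creates a replication slot on the publisher)
CREATE SUBSCRIPTION analytics_sub
    CONNECTION 'host=primary dbname=test user=replicator'
    PUBLICATION analytics_pub;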

Debezium

CDC

Debezium

  • Kafka Connect source connector
  • Compatible with PostgreSQL and many RDBMS
  • Real-time thanks to logical replication
  • Standard message format with Apache Avro
  • Open source (Apache License, Version 2.0)

CDC to Kafka

Demo

Snapshot and streaming

Snapshots

  • Initial copy (once)
  • On-demand snapshots with signal tables (see the example below)
  • Partial snapshots
    • *.include.list
    • *.exclude.list
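
For example, an incremental snapshot of one table can be requested by inserting a row into the signal table (public.ovh_debezium_signal in the configuration shown later; the id and table name are illustrative):

INSERT INTO public.ovh_debezium_signal (id, type, data)
VALUES ('adhoc-2024-02-13-1',
        'execute-snapshot',
        '{"data-collections": ["public.accounts"], "type": "incremental"}');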

Snapshot

Streaming

  • Based on logical replication
  • Detect DML statements (INSERT, UPDATE, DELETE)
  • Primary key highly recommended
  • pgoutput plugin
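
The replication slot retains WAL until the connector confirms it; a monitoring sketch, assuming the slot name used later in the configuration (debezium_test):

SELECT slot_name, active, restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'debezium_test';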

Streaming

How to configure Debezium

Kafka Connect REST API:

curl -XPOST -H 'Content-Type: application/json' -d @payload.json -s localhost:8083/connectors

(POST /connectors expects {"name": ..., "config": {...}}; the flat configuration below can be sent as-is with PUT /connectors/<name>/config.)

Payload example

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.ApplicationName": "debezium",
  "database.dbname": "test",
  "database.hostname": "${file:/path/to/secrets/test:hostname}",
  "database.password": "${file:/path/to/secrets/test:password}",
  "database.port": "${file:/path/to/secrets/test:port}",
  "database.ssl.mode": "required",
  "database.user": "${file:/path/to/secrets/test:user}",
  "decimal.handling.mode": "double",
  "field.name.adjustment.mode": "avro",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
  "heartbeat.interval.ms": "1000",
  "incremental.snapshot.chunk.size": "100000",
  "name": "test",
  "plugin.name": "pgoutput",
  "producer.override.compression.type": "lz4",
  "publication.name": "debezium_test",
  "sanitize.field.names": "true",
  "schema.name.adjustment.mode": "avro",
  "signal.data.collection": "public.ovh_debezium_signal",
  "slot.name": "debezium_test",
  "status.update.interval.ms": "1000",
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000",
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3",
  "topic.heartbeat.prefix": "namespace.debezium.heartbeat",
  "topic.prefix": "namespace.test",
  "transforms": "schema,public",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
  "transforms.schema.replacement": "namespace.test.$1.$2",
  "transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}

Automation

Ansible logo

ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml

Debezium endpoint

Replica promotion

  • Patroni advances replication slot position on all replicas
  • Transparent for Debezium

Password security

  • Custom vault microservice (HTTPS)
  • Copy secrets to local files
  • Reference to local files using FileConfigProvider

{
  "database.password": "${file:/path/to/secrets/test:password}"
}

Database connection lost

io.debezium.DebeziumException: Couldn't obtain encoding for database test

Manual restart of the connector (POST /connectors/{name}/restart on the Kafka Connect REST API)

Table inheritance

  • events
    • events_2023_10
    • events_2023_11
    • events_2023_12
    • events_2024_01
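
A sketch of this layout with hypothetical columns (old-style partitioning by inheritance; rows live in the children, the parent stays empty):

CREATE TABLE events (
    id         bigint NOT NULL,
    created_at timestamptz NOT NULL,
    payload    jsonb
);

CREATE TABLE events_2024_01 (
    CHECK (created_at >= '2024-01-01' AND created_at < '2024-02-01')
) INHERITS (events);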

Table inheritance

test=> select count(*) from events;
   count    
------------
 1314817483
(1 row)
test=> select count(*) from only events;
 count 
-------
     0
(1 row)

https://github.com/debezium/debezium/pull/5159

Topic routing (SMT)

Table name + regex = topic name

  • Tables with the same structure
  • Standardization of table names on all DBMS
  • Reduce number of partitions on Kafka

Tables with the same structure

Topic routing same schema

{
  "transforms": "events",
  "transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
  "transforms.events.replacement": "namespace.database.public.events"
}

Remove “public” schema

Topic routing remove public schema

{
  "transforms": "public",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1"
}

TOAST

The Oversized-Attribute Storage Technique

UPDATE without modifying the attribute

{
  "op": "u",
  "before": null,
  "after": {
    "namespace.database.public.toasty.Value": {
      "id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
      "updated_at": {
        "string": "2024-01-17T10:53:34.929630Z"
      },
      "clob": {
        "string": "__debezium_unavailable_value"
      }
...

TOAST

  • ALTER TABLE... REPLICA IDENTITY DEFAULT
    • Value replaced by unavailable.value.placeholder
  • ALTER TABLE... REPLICA IDENTITY FULL
    • Value included
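
A sketch, using the toasty table from the event above:

-- DEFAULT: unchanged TOASTed columns arrive as the placeholder value
ALTER TABLE toasty REPLICA IDENTITY DEFAULT;

-- FULL: old row images are written to the WAL, so values are always included
-- (at the cost of larger WAL records)
ALTER TABLE toasty REPLICA IDENTITY FULL;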

Databases with low writes

low writes

Databases with low writes

low writes simplified

Heartbeats

{
  "heartbeat.interval.ms": "1000",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
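
On a database with few writes, the slot's confirmed position barely advances and the primary keeps retaining WAL; periodic heartbeat writes keep it moving. The action query above assumes a single-row table, for example:

CREATE TABLE ovh_debezium_heartbeat (ts timestamptz NOT NULL DEFAULT now());
INSERT INTO ovh_debezium_heartbeat DEFAULT VALUES;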

Kafka

  • Queue system
  • 3 clusters dedicated to CDC
    • Europe (5,000 to 15,000 events per second)
    • Canada (2,000 events per second)
    • USA
  • 20 brokers
  • PCI DSS

Features

  • Partitioning
  • Replication
  • Compaction
  • Compression

Partitioning and replication

{
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3"
}

Number of partitions

  • 1 group of 80 databases
    • Country + Universe
  • 140 tables per database
  • 1 table = 18 partitions (6 partitions × replication factor 3)
  • Estimated 201,600 partitions (80 databases × 140 tables × 18)

Number of partitions

  • With topic routing: 14 topics
  • 14 × 18 = 252 partitions
  • -99.88 % compared to the estimate

Compaction

  • Retention policy
  • Keep only the last value of each key identifying a message
  • Full copy of the database on Kafka
  • History of changes kept for 7 days (tombstones)

{
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000"
}

Compression

{
  "producer.override.compression.type": "lz4"
}

From Kafka to the Data Lake

Spark replication


Other uses

  • Caching system
  • Asynchronous tasks

The future

  • Hortonworks Data Platform (HDP), the open-source Hadoop distribution, is deprecated

Do you really need CDC?

Questions
