Alimentation d’un Data Lake en temps réel grâce à PostgreSQL et Debezium

Meetup PostgreSQL Lille

Gwendoline ROTROU, Julien RIOU

25 Janvier 2024



Speakers

Gwendoline Rotrou
Julien Riou

Sommaire


Qui sommes-nous ?

OVHcloud logo OVHcloud univers


Bases de données interne

OVHcloud univers pointant vers PostgreSQL


Quelques chiffres


Exemple de cluster

cluster example


Environnement mutualisé

mutualized databases


Besoin d’analyse


Mélange de workloads

This is fine

  • Online Transactional Processing (OLTP)
  • Online Analytical Processing (OLAP) Source de l’image

Data Lake


Data Lake


Extract Transform Load (ETL)

Extraction de la donnée depuis une source

Transformation de la donnée pour correspondre au format attendu

Chargement dans une destination


Sqoop

MOZG

“mózg” = “cerveau” en polonais.


Sqoop

  • Extraction des données
    • SELECT * FROM... WHERE id BETWEEN...
  • Chargement des données
    • Hive

DEPRECATED


Limitations des ETL


Change Data Capture (CDC)

Détection des changements dans une base de données

ETL sous stéroïde.


Triggers

Déclenche des actions en fonction d’événements

  • Table de capture/log/audit
  • Fonction qui insère OLD et NEW
  • Trigger AFTER INSERT OR UPDATE OR DELETE

Triggers


LISTEN/NOTIFY

Système de publication et de souscription

  • Trigger qui publie sur un canal avec NOTIFY
  • Clients qui souscrivent au canal avec LISTEN

LISTEN/NOTIFY


Réplication logique

Décodage d’événements grâce aux Write-Ahead Logs (WAL)

  • CREATE PUBLICATION
  • CREATE SUBSCRIPTION
  • Slot de réplication

Debezium

CDC


Debezium


CDC TO KAFKA


Demo



Snapshot et streaming


Snapshots


snapshot


Streaming


streaming


Comment configurer Debezium

API REST de Kafka Connect

curl -XPOST -d @payload.json -s localhost:8083/connectors

Exemple de payload

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.ApplicationName": "debezium",
  "database.dbname": "test",
  "database.hostname": "${file:/path/to/secrets/test:hostname}",
  "database.password": "${file:/path/to/secrets/test:password}",
  "database.port": "${file:/path/to/secrets/test:port}",
  "database.ssl.mode": "required",
  "database.user": "${file:/path/to/secrets/test:user}",
  "decimal.handling.mode": "double",
  "field.name.adjustment.mode": "avro",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
  "heartbeat.interval.ms": "1000",
  "incremental.snapshot.chunk.size": "100000",
  "name": "test",
  "plugin.name": "pgoutput",
  "producer.override.compression.type": "lz4",
  "publication.name": "debezium_test",
  "sanitize.field.names": "true",
  "schema.name.adjustment.mode": "avro",
  "signal.data.collection": "public.ovh_debezium_signal",
  "slot.name": "debezium_test",
  "status.update.interval.ms": "1000",
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000",
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3",
  "topic.heartbeat.prefix": "namespace.debezium.heartbeat",
  "topic.prefix": "namespace.test",
  "transforms": "schema,public",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
  "transforms.schema.replacement": "namespace.test.$1.$2",
  "transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}

Comment nous configurons Debezium

Ansible logo

ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml

Où se connecte Debezium


Promotion d’un réplica


Sécurité des mots de passe

{
  "database.password": "${file:/path/to/secrets/test:password}"
}

Perte de connexion à la base de données

Redémarrage manuel du connecteur

io.debezium.DebeziumException: Couldn't obtain encoding for database test

Tables héritées


Tables héritées

test=> select count(*) from events;
   count    
------------
 1314817483
(1 row)
test=> select count(*) from only events;
 count 
-------
     0
(1 row)

https://github.com/debezium/debezium/pull/5159

  • Snapshots prennant en compte les lignes de la table principale
  • Lignes qui ne proviennent pas de la réplication logique
  • Lignes donc jamais supprimmées

Topic routing (SMT)

Transformation d’un nom de table en un nom de topic à l’aide de regex

  • Tables partageant le même schéma
  • Uniformisation des noms de topics entre les DBMS
  • Réduire le nombre de partitions sur Kafka
  • SMT = Single Message Transformation
  • Retrait du schéma public sur PostgreSQL
  • Déduplication du nom de base de données sur MySQL

Tables avec le même schéma

topic routing same schema

{
  "transforms": "events",
  "transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
  "transforms.events.replacement": "namespace.database.public.events",
}

Retrait du schéma public

topic routing remove public schema

{
  "transforms": "public",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
}

Combinaison de plusieurs transformations possible avec

{
  "transforms": "events,public"
}

TOAST

Technique de stockage des attributs trop grands

UPDATE sans changement de l’attribut

{
  "op": "u",
  "before": null,
  "after": {
    "namespace.database.public.toasty.Value": {
      "id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
      "updated_at": {
        "string": "2024-01-17T10:53:34.929630Z"
      },
      "clob": {
        "string": "__debezium_unavailable_value"
      }
...
  • The Oversized-Attribute Storage Technique (TOAST)
  • https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-toasted-values

TOAST


Bases avec peu d’écriture

low writes


Bases avec peu d’écriture

low writes simplified


Heartbeats

{
  "heartbeat.interval.ms": "1000",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}

Kafka

  • queue = file en francais

Fonctionnalités


Partitionnement et réplication

{
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3"
}
  • Partitionnement : découpe d’une grosse pièce en plusieurs plus petites
  • Réplication : copie de ces pièces en plusieurs exemplaires

Nombre de partitions

Nombre de partitions

Topic routing

14 topics

252 partitions

-99.88 %


Compaction

{
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000"
}

Compression

{
  "producer.override.compression.type": "lz4"
}

De Kafka vers le Data Lake


Replication spark


Replication spark


Autres utilisations


La suite


Avez-vous vraiment besoin du CDC ?


Questions