Alimentation d’un Data Lake en temps réel grâce à PostgreSQL et Debezium

intro_logo

Meetup PostgreSQL Lille

Gwendoline ROTROU, Julien RIOU

25 Janvier 2024

Speakers

Gwendoline Rotrou
Julien Riou

Sommaire

  • Qui sommes-nous ?
  • Bases de données interne
  • Data Lake
  • ETL
  • CDC
  • Autre utilisations
  • La suite

Qui sommes-nous ?

OVHcloud logo OVHcloud univers

Bases de données interne

OVHcloud univers pointant vers PostgreSQL

Quelques chiffres

  • 3 DBMS (MySQL, MongoDB, PostgreSQL)
  • 7 infrastructures autonomes dans le monde entier
  • 500+ serveurs
  • 2000+ bases de données
  • 100+ clusters
  • Environnements hautement sécurisés

Exemple de cluster

cluster example

Environnement mutualisé

mutualized databases

Besoin d’analyse

  • Recette
  • Stratégie de l’entreprise
  • KPIs
  • Détection de fraude
  • Consommation électrique
  • Exploitation des meta données (Jira)
  • Calcul du temps de travail pour le support

Mélange de workloads

This is fine

Data Lake

  • 3 environnements (production, integration, sandbox)
  • 2 zones géographiques (OVH Group, OVH US)
  • 900 TB

Data Lake

Extract Transform Load (ETL)

Extraction de la donnée depuis une source

Transformation de la donnée pour correspondre au format attendu

Chargement dans une destination

Sqoop

MOZG

Sqoop

  • Extraction des données
    • SELECT * FROM... WHERE id BETWEEN...
  • Chargement des données
    • Hive

DEPRECATED

Limitations des ETL

  • Mélange de workload
  • Accès en direct aux bases de données
  • Données en retard

Change Data Capture (CDC)

Détection des changements dans une base de données

Triggers

Déclenche des actions en fonction d’événements

  • Table de capture/log/audit
  • Fonction qui insère OLD et NEW
  • Trigger AFTER INSERT OR UPDATE OR DELETE

Triggers

  • Impact sur les performances en écriture
  • Relations visibles pour les utilisateurs
  • Rétention assurée par les utilisateurs eux-même

LISTEN/NOTIFY

Système de publication et de souscription

  • Trigger qui publie sur un canal avec NOTIFY
  • Clients qui souscrivent au canal avec LISTEN

LISTEN/NOTIFY

  • Impact sur les performances en écriture
  • Perte d’événement si les souscrits se déconnectent

Réplication logique

Décodage d’événements grâce aux Write-Ahead Logs (WAL)

  • CREATE PUBLICATION
  • CREATE SUBSCRIPTION
  • Slot de réplication

Debezium

CDC

Debezium

  • Connecteur Kafka Connect de type source
  • Compatible avec PostgreSQL et d’autres DBMS
  • Temps réel grâce à la réplication logique
  • Format de message standard avec Apache Avro
  • Open source (Apache License, Version 2.0)

CDC TO KAFKA

Demo

Snapshot et streaming

Snapshots

  • Copie initiale (1 fois)
  • Copies à la demande avec les signal tables
  • Copies partielles
    • *.include.list
    • *.exclude.list

snapshot

Streaming

  • Réplication logique
  • Capture les DML (INSERT, UPDATE, DELETE)
  • Clé primaire ou unique vivement recommandé
  • Plugin pgoutput

streaming

Comment configurer Debezium

API REST de Kafka Connect

curl -XPOST -d @payload.json -s localhost:8083/connectors

Exemple de payload

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.ApplicationName": "debezium",
  "database.dbname": "test",
  "database.hostname": "${file:/path/to/secrets/test:hostname}",
  "database.password": "${file:/path/to/secrets/test:password}",
  "database.port": "${file:/path/to/secrets/test:port}",
  "database.ssl.mode": "required",
  "database.user": "${file:/path/to/secrets/test:user}",
  "decimal.handling.mode": "double",
  "field.name.adjustment.mode": "avro",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
  "heartbeat.interval.ms": "1000",
  "incremental.snapshot.chunk.size": "100000",
  "name": "test",
  "plugin.name": "pgoutput",
  "producer.override.compression.type": "lz4",
  "publication.name": "debezium_test",
  "sanitize.field.names": "true",
  "schema.name.adjustment.mode": "avro",
  "signal.data.collection": "public.ovh_debezium_signal",
  "slot.name": "debezium_test",
  "status.update.interval.ms": "1000",
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000",
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3",
  "topic.heartbeat.prefix": "namespace.debezium.heartbeat",
  "topic.prefix": "namespace.test",
  "transforms": "schema,public",
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
  "transforms.schema.replacement": "namespace.test.$1.$2",
  "transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}

Comment nous configurons Debezium

Ansible logo

ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml

Où se connecte Debezium

Promotion d’un réplica

  • Patroni gère l’avancement des slots de réplication sur tous des noeuds
  • Transparent pour Debezium

Sécurité des mots de passe

  • Coffre fort interne via HTTPS
  • Copie des secrets dans des fichiers locaux
  • Référence à ces fichiers avec FileConfigProvider
{
  "database.password": "${file:/path/to/secrets/test:password}"
}

Perte de connexion à la base de données

Redémarrage manuel du connecteur

io.debezium.DebeziumException: Couldn't obtain encoding for database test

Tables héritées

  • events
    • events_2023_10
    • events_2023_11
    • events_2023_12
    • events_2024_01

Tables héritées

test=> select count(*) from events;
   count    
------------
 1314817483
(1 row)
test=> select count(*) from only events;
 count 
-------
     0
(1 row)

https://github.com/debezium/debezium/pull/5159

Topic routing (SMT)

Transformation d’un nom de table en un nom de topic à l’aide de regex

  • Tables partageant le même schéma
  • Uniformisation des noms de topics entre les DBMS
  • Réduire le nombre de partitions sur Kafka

Tables avec le même schéma

topic routing same schema

{
  "transforms": "events",
  "transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
  "transforms.events.replacement": "namespace.database.public.events",
}

Retrait du schéma public

topic routing remove public schema

{
  "transforms": "public",
  "transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
  "transforms.public.replacement": "namespace.test.$1",
}

TOAST

Technique de stockage des attributs trop grands

UPDATE sans changement de l’attribut

{
  "op": "u",
  "before": null,
  "after": {
    "namespace.database.public.toasty.Value": {
      "id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
      "updated_at": {
        "string": "2024-01-17T10:53:34.929630Z"
      },
      "clob": {
        "string": "__debezium_unavailable_value"
      }
...

TOAST

  • ALTER TABLE... REPLICA IDENTITY DEFAULT
    • Valeur remplacée par unavailable.value.placeholder
  • ALTER TABLE... REPLICA IDENTITY FULL
    • Valeur incluse

Bases avec peu d’écriture

low writes

Bases avec peu d’écriture

low writes simplified

Heartbeats

{
  "heartbeat.interval.ms": "1000",
  "heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}

Kafka

  • Système de queue
  • 3 clusters pour le CDC
    • Europe (5,000 à 15,000 événements par seconde)
    • Canada (2,000 événements par seconde)
    • USA
  • 20 brokers
  • PCI DSS

Fonctionnalités

  • Partitionnement
  • Réplication
  • Compaction
  • Compression

Partitionnement et réplication

{
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3"
}

Nombre de partitions

  • 1 regroupement de 80 bases de données
    • Pays + “Univers”
  • 140 tables par base
  • 1 table = 18 partitions (partitions (6) * replication factor (3))
  • Total estimé de 201,600 partitions

Nombre de partitions

Topic routing

14 topics

252 partitions

-99.88 %

Compaction

  • Mécanisme de rétention
  • Garde la dernière valeur de chaque ligne
  • Permet une copie identique entièrement sur Kafka
  • 7 jours avant suppression définitive (tombstone)
{
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.delete.retention.ms": "604800000"
}

Compression

{
  "producer.override.compression.type": "lz4"
}

De Kafka vers le Data Lake

Replication spark

Replication spark

Autres utilisations

  • Gestion de cache
  • Gestion de tâches asynchrones

La suite

  • Hortonworks Data Platform (HDP) Open Source déprécié

Avez-vous vraiment besoin du CDC ?

Questions

// reveal.js plugins