Alimentation d’un Data Lake en temps réel grâce à PostgreSQL et Debezium
Meetup PostgreSQL Lille
25 Janvier 2024
Speakers
|
|
Sommaire
- Qui sommes-nous ?
- Bases de données interne
- Data Lake
- ETL
- CDC
- Autre utilisations
- La suite
Qui sommes-nous ?
Bases de données interne
Quelques chiffres
- 3 DBMS (MySQL, MongoDB, PostgreSQL)
- 7 infrastructures autonomes dans le monde entier
- 500+ serveurs
- 2000+ bases de données
- 100+ clusters
- Environnements hautement sécurisés
Exemple de cluster
Environnement mutualisé
Besoin d’analyse
- Recette
- Stratégie de l’entreprise
- KPIs
- Détection de fraude
- Consommation électrique
- Exploitation des meta données (Jira)
- Calcul du temps de travail pour le support
Mélange de workloads
- Online Transactional Processing (OLTP)
- Online Analytical Processing (OLAP) Source de l’image
Data Lake
- 3 environnements (production, integration, sandbox)
- 2 zones géographiques (OVH Group, OVH US)
- 900 TB
Data Lake
Extract Transform Load (ETL)
Extraction de la donnée depuis une source
Transformation de la donnée pour correspondre au format attendu
Chargement dans une destination
Sqoop
“mózg” = “cerveau” en polonais.
Sqoop
- Extraction des données
SELECT * FROM... WHERE id BETWEEN...
- Chargement des données
- Hive
Limitations des ETL
- Mélange de workload
- Accès en direct aux bases de données
- Données en retard
Change Data Capture (CDC)
Détection des changements dans une base de données
ETL sous stéroïde.
Triggers
Déclenche des actions en fonction d’événements
- Table de capture/log/audit
- Fonction qui insère
OLD
etNEW
- Trigger
AFTER INSERT OR UPDATE OR DELETE
Triggers
- Impact sur les performances en écriture
- Relations visibles pour les utilisateurs
- Rétention assurée par les utilisateurs eux-même
LISTEN/NOTIFY
Système de publication et de souscription
- Trigger qui publie sur un canal avec
NOTIFY
- Clients qui souscrivent au canal avec
LISTEN
LISTEN/NOTIFY
- Impact sur les performances en écriture
- Perte d’événement si les souscrits se déconnectent
Réplication logique
Décodage d’événements grâce aux Write-Ahead Logs (WAL)
CREATE PUBLICATION
CREATE SUBSCRIPTION
- Slot de réplication
Debezium
Debezium
- Connecteur Kafka Connect de type source
- Compatible avec PostgreSQL et d’autres DBMS
- Temps réel grâce à la réplication logique
- Format de message standard avec Apache Avro
- Open source (Apache License, Version 2.0)
Demo
Snapshot et streaming
Snapshots
- Copie initiale (1 fois)
- Copies à la demande avec les signal tables
- Copies partielles
*.include.list
*.exclude.list
Streaming
- Réplication logique
- Capture les DML (
INSERT
,UPDATE
,DELETE
) - Clé primaire ou unique vivement recommandé
- Plugin
pgoutput
Comment configurer Debezium
API REST de Kafka Connect
curl -XPOST -d @payload.json -s localhost:8083/connectors
Exemple de payload
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.ApplicationName": "debezium",
"database.dbname": "test",
"database.hostname": "${file:/path/to/secrets/test:hostname}",
"database.password": "${file:/path/to/secrets/test:password}",
"database.port": "${file:/path/to/secrets/test:port}",
"database.ssl.mode": "required",
"database.user": "${file:/path/to/secrets/test:user}",
"decimal.handling.mode": "double",
"field.name.adjustment.mode": "avro",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
"heartbeat.interval.ms": "1000",
"incremental.snapshot.chunk.size": "100000",
"name": "test",
"plugin.name": "pgoutput",
"producer.override.compression.type": "lz4",
"publication.name": "debezium_test",
"sanitize.field.names": "true",
"schema.name.adjustment.mode": "avro",
"signal.data.collection": "public.ovh_debezium_signal",
"slot.name": "debezium_test",
"status.update.interval.ms": "1000",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000",
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3",
"topic.heartbeat.prefix": "namespace.debezium.heartbeat",
"topic.prefix": "namespace.test",
"transforms": "schema,public",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
"transforms.schema.replacement": "namespace.test.$1.$2",
"transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
Comment nous configurons Debezium
ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml
Où se connecte Debezium
- Sur le primaire pour utiliser un slot de réplication
- Postgres 16 highlight: Logical decoding on standby (video)
Promotion d’un réplica
- Patroni gère l’avancement des slots de réplication sur tous des noeuds
- Transparent pour Debezium
Sécurité des mots de passe
- Coffre fort interne via HTTPS
- Copie des secrets dans des fichiers locaux
- Référence à ces fichiers avec FileConfigProvider
{
"database.password": "${file:/path/to/secrets/test:password}"
}
Perte de connexion à la base de données
Redémarrage manuel du connecteur
io.debezium.DebeziumException: Couldn't obtain encoding for database test
Tables héritées
events
events_2023_10
events_2023_11
events_2023_12
events_2024_01
Tables héritées
test=> select count(*) from events;
count
------------
1314817483
(1 row)
test=> select count(*) from only events;
count
-------
0
(1 row)
https://github.com/debezium/debezium/pull/5159
- Snapshots prennant en compte les lignes de la table principale
- Lignes qui ne proviennent pas de la réplication logique
- Lignes donc jamais supprimmées
Topic routing (SMT)
Transformation d’un nom de table en un nom de topic à l’aide de regex
- Tables partageant le même schéma
- Uniformisation des noms de topics entre les DBMS
- Réduire le nombre de partitions sur Kafka
- SMT = Single Message Transformation
- Retrait du schéma public sur PostgreSQL
- Déduplication du nom de base de données sur MySQL
Tables avec le même schéma
{
"transforms": "events",
"transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter"
"transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
"transforms.events.replacement": "namespace.database.public.events",
}
Retrait du schéma public
{
"transforms": "public",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter"
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
}
Combinaison de plusieurs transformations possible avec
{
"transforms": "events,public"
}
TOAST
Technique de stockage des attributs trop grands
UPDATE
sans changement de l’attribut
{
"op": "u",
"before": null,
"after": {
"namespace.database.public.toasty.Value": {
"id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
"updated_at": {
"string": "2024-01-17T10:53:34.929630Z"
},
"clob": {
"string": "__debezium_unavailable_value"
}
...
- The Oversized-Attribute Storage Technique (TOAST)
- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-toasted-values
TOAST
ALTER TABLE... REPLICA IDENTITY DEFAULT
- Valeur remplacée par
unavailable.value.placeholder
- Valeur remplacée par
ALTER TABLE... REPLICA IDENTITY FULL
- Valeur incluse
Bases avec peu d’écriture
Bases avec peu d’écriture
Heartbeats
{
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
Kafka
- Système de queue
- 3 clusters pour le CDC
- Europe (5,000 à 15,000 événements par seconde)
- Canada (2,000 événements par seconde)
- USA
- 20 brokers
- PCI DSS
- queue = file en francais
Fonctionnalités
- Partitionnement
- Réplication
- Compaction
- Compression
Partitionnement et réplication
{
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3"
}
- Partitionnement : découpe d’une grosse pièce en plusieurs plus petites
- Réplication : copie de ces pièces en plusieurs exemplaires
Nombre de partitions
- 1 regroupement de 80 bases de données
- Pays + “Univers”
- 140 tables par base
- 1 table = 18 partitions (
partitions
(6) *replication factor
(3)) - Total estimé de 201,600 partitions
Nombre de partitions
Topic routing
14 topics
252 partitions
-99.88 %
Compaction
- Mécanisme de rétention
- Garde la dernière valeur de chaque ligne
- Permet une copie identique entièrement sur Kafka
- 7 jours avant suppression définitive (tombstone)
{
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000"
}
Compression
{
"producer.override.compression.type": "lz4"
}
De Kafka vers le Data Lake
Autres utilisations
- Gestion de cache
- Gestion de tâches asynchrones
La suite
- Hortonworks Data Platform (HDP) Open Source déprécié