Real-Time Feeding of a Data Lake with PostgreSQL and Debezium
PostgreSQL Users Group Belgium
February 13, 2024
Speaker
- Julien RIOU
- Open Source DBA
- https://julien.riou.xyz
- @jriou@hachyderm.io
Summary
- Who are we?
- Internal databases
- Data Lake
- ETL
- CDC
- Other uses
- The future
Who are we?
Internal databases
Statistics
- 3 DBMS (MySQL, MongoDB, PostgreSQL)
- 7 autonomous infrastructures worldwide
- 500+ servers
- 2000+ databases
- 100+ clusters
- Highly secure environments
Cluster example
Mutualized (shared) environments
Analytics needs
- Billing
- Revenue
- Enterprise strategy
- KPIs
- Fraud detection
- Electrical consumption
- Metadata analysis (from JIRA)
- Detection of support teams' working hours
Mix of workloads
- Online Transactional Processing (OLTP)
- Online Analytical Processing (OLAP)
Data Lake
- 3 environments (production, integration, sandbox)
- 2 geographical regions (OVH Group, OVH US)
- 900 TB
Data Lake
Extract Transform Load (ETL)
Extract data from a source
Transform data to an expected format
Load data into a destination
Sqoop
“mózg” = “brain” in Polish
Sqoop
- Extract with SELECT * FROM ... WHERE id BETWEEN ...
- Load into Hive
ETL’s limitations
- Mix of workloads
- Direct access to the databases
- Lagging data
Change Data Capture (CDC)
Detect changes happening in databases
Triggers
Execute actions on events
- Capture/log/audit tables
- Function inserting the OLD and NEW values from an AFTER INSERT OR UPDATE OR DELETE trigger (sketched below)
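A minimal sketch of such an audit trigger, assuming a hypothetical public.orders table:

-- Hypothetical audit table capturing one row per change
CREATE TABLE orders_audit (
    changed_at timestamptz NOT NULL DEFAULT now(),
    operation  text NOT NULL,
    old_row    jsonb,
    new_row    jsonb
);

CREATE FUNCTION orders_audit_fn() RETURNS trigger AS $$
BEGIN
    -- OLD is NULL for INSERT, NEW is NULL for DELETE
    INSERT INTO orders_audit (operation, old_row, new_row)
    VALUES (TG_OP, to_jsonb(OLD), to_jsonb(NEW));
    RETURN NULL;  -- return value is ignored for AFTER triggers
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_audit_trg
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION orders_audit_fn();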
Triggers
- Impact on write performance
- Relations visible to the users
- Retention policy managed by users themselves
LISTEN/NOTIFY
Publication and subscription system (pub/sub)
NOTIFY channel [ , payload ]
LISTEN channel
- https://www.postgresql.org/docs/current/sql-notify.html
- https://www.postgresql.org/docs/current/sql-listen.html
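A sketch with two sessions; the channel name and payload are made up:

-- Session A: subscribe to a channel
LISTEN orders_channel;

-- Session B: publish an event, optionally with a payload
NOTIFY orders_channel, '{"id": 42, "op": "insert"}';

-- Equivalent form, callable from functions and triggers
SELECT pg_notify('orders_channel', '{"id": 42, "op": "insert"}');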
LISTEN/NOTIFY
- Impact on write performance
- Lost events when disconnected
Logical replication
Decode events based on Write-Ahead Logs (WAL)
CREATE PUBLICATION
CREATE SUBSCRIPTION
- Replication slot
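A minimal sketch, assuming a hypothetical public.orders table on the source and a reachable destination cluster:

-- Source: publish changes of selected tables
CREATE PUBLICATION my_publication FOR TABLE public.orders;

-- Destination: subscribing creates a replication slot on the
-- source and starts streaming the decoded changes
CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=source dbname=test user=replicator'
    PUBLICATION my_publication;

-- Source: inspect the replication slots
SELECT slot_name, plugin, active FROM pg_replication_slots;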
Debezium
Debezium
- Source Kafka Connect connector
- Compatible with PostgreSQL and many RDBMS
- Real-time thanks to logical replication
- Standard message format with Apache Avro
- Open source (Apache License, Version 2.0)
Demo
Snapshot and streaming
Snapshots
- Initial copy (once)
- On-demand snapshots with signal tables (see the sketch below)
- Partial snapshots with the *.include.list / *.exclude.list options
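A sketch of an on-demand incremental snapshot request, using the signal table name from the configuration shown later; the column sizes are an assumption based on the Debezium documentation:

-- Signal table read by the connector (id, type, data)
CREATE TABLE public.ovh_debezium_signal (
    id   varchar(64) PRIMARY KEY,
    type varchar(32) NOT NULL,
    data varchar(2048)
);

-- Ask for an incremental snapshot of a single table
-- (gen_random_uuid() is built in since PostgreSQL 13)
INSERT INTO public.ovh_debezium_signal (id, type, data)
VALUES (gen_random_uuid()::text,
        'execute-snapshot',
        '{"data-collections": ["public.orders"], "type": "incremental"}');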
Streaming
- Based on logical replication
- Detect DML statements (INSERT, UPDATE, DELETE)
- Primary key highly recommended (see the catalog query below)
- pgoutput plugin
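Since UPDATE and DELETE events are keyed by the primary key, a catalog query such as this sketch can spot tables without one before enabling CDC:

-- List ordinary tables that have no primary key constraint
SELECT c.oid::regclass AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (SELECT 1
                  FROM pg_constraint con
                  WHERE con.conrelid = c.oid AND con.contype = 'p');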
How to configure Debezium
Kafka Connect REST API:
curl -XPOST -H 'Content-Type: application/json' -d @payload.json -s localhost:8083/connectors
Payload example
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.ApplicationName": "debezium",
"database.dbname": "test",
"database.hostname": "${file:/path/to/secrets/test:hostname}",
"database.password": "${file:/path/to/secrets/test:password}",
"database.port": "${file:/path/to/secrets/test:port}",
"database.ssl.mode": "required",
"database.user": "${file:/path/to/secrets/test:user}",
"decimal.handling.mode": "double",
"field.name.adjustment.mode": "avro",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
"heartbeat.interval.ms": "1000",
"incremental.snapshot.chunk.size": "100000",
"name": "test",
"plugin.name": "pgoutput",
"producer.override.compression.type": "lz4",
"publication.name": "debezium_test",
"sanitize.field.names": "true",
"schema.name.adjustment.mode": "avro",
"signal.data.collection": "public.ovh_debezium_signal",
"slot.name": "debezium_test",
"status.update.interval.ms": "1000",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000",
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3",
"topic.heartbeat.prefix": "namespace.debezium.heartbeat",
"topic.prefix": "namespace.test",
"transforms": "schema,public",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
"transforms.schema.replacement": "namespace.test.$1.$2",
"transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
Automation
ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml
Debezium endpoint
- Replication slot on the primary
- Postgres 16 highlight: Logical decoding on standby (video)
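An inactive or stuck slot retains WAL on the primary, so its lag is worth monitoring; a sketch using the slot name from the payload example:

-- How much WAL the Debezium slot still retains
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
                                      confirmed_flush_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'debezium_test';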
Replica promotion
- Patroni advances replication slot position on all replicas
- Transparent for Debezium
Password security
- Custom vault microservice (HTTPS)
- Copy secrets to local files
- Reference to local files using FileConfigProvider
{
"database.password": "${file:/path/to/secrets/test:password}"
}
Database connection lost
io.debezium.DebeziumException: Couldn't obtain encoding for database test
Manual restart of the connector
Table inheritance
events
events_2023_10
events_2023_11
events_2023_12
events_2024_01
Table inheritance
test=> select count(*) from events;
count
------------
1314817483
(1 row)
test=> select count(*) from only events;
count
-------
0
(1 row)
https://github.com/debezium/debezium/pull/5159
- Snapshots select data from the parent table (child rows included)
- Those rows are not included in the logical replication stream
- Rows are never deleted from the topic
Topic routing (SMT)
Table name + regex = topic name
- Tables with the same structure
- Standardization of table names on all DBMS
- Reduce number of partitions on Kafka
- SMT = Single Message Transformation
- Remove “public” schema on PostgreSQL
- Remove double schema names on MySQL
Tables with the same structure
{
"transforms": "events",
"transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
"transforms.events.replacement": "namespace.database.public.events"
}
Remove “public” schema
{
"transforms": "public",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1"
}
Multiple transformations can be defined using a comma-separated list of transformation names:
{
"transforms": "events,public"
}
TOAST
The Oversized-Attribute Storage Technique
UPDATE without modifying the TOASTed attribute
{
"op": "u",
"before": null,
"after": {
"namespace.database.public.toasty.Value": {
"id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
"updated_at": {
"string": "2024-01-17T10:53:34.929630Z"
},
"clob": {
"string": "__debezium_unavailable_value"
}
...
- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-toasted-values
TOAST
- ALTER TABLE ... REPLICA IDENTITY DEFAULT: value replaced by unavailable.value.placeholder (the __debezium_unavailable_value marker seen above)
- ALTER TABLE ... REPLICA IDENTITY FULL: value included
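For example, using the toasty table from the payload above:

-- Log all old column values, including TOASTed ones,
-- at the cost of larger WAL records
ALTER TABLE public.toasty REPLICA IDENTITY FULL;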
Databases with low writes
Databases with low writes
Heartbeats
{
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
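If the captured database receives few writes, the connector has nothing to acknowledge and the replication slot keeps retaining WAL produced by the other databases of the cluster; the heartbeat query generates a periodic write to keep the slot moving. A minimal sketch of the heartbeat table matching the query above (the single-column layout is an assumption):

-- Single-row table touched by heartbeat.action.query
CREATE TABLE public.ovh_debezium_heartbeat (
    ts timestamptz NOT NULL DEFAULT now()
);
INSERT INTO public.ovh_debezium_heartbeat DEFAULT VALUES;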
Kafka
- Queue system
- 3 clusters dedicated to CDC
- Europe (5,000 to 15,000 events per second)
- Canada (2,000 events per second)
- USA
- 20 brokers
- PCI DSS
Features
- Partitioning
- Replication
- Compaction
- Compression
Partitioning and replication
{
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3"
}
- Partitioning: slice a large topic into smaller pieces
- Replication: maintain copies of a partition
Number of partitions
- 1 group of 80 databases
- Country + Universe
- 140 tables per database
- 1 table = 18 partitions (6 partitions × 3 replication factor)
- Estimation: 80 databases × 140 tables × 18 = 201,600 partitions
Number of partitions
With topic routing:
- 14 topics
- 252 partitions (14 topics × 18)
- -99.88 % compared to the 201,600 estimate
Compaction
- Retention policy
- Keep only the last value of each key identifying a message
- Full copy of the database on Kafka
- History of changes kept for 7 days (tombstones)
{
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000"
}
Compression
{
"producer.override.compression.type": "lz4"
}
The zstd algorithm is an option for the future, but it's not a priority.
From Kafka to the Data Lake
Other uses
- Caching system
- Asynchronous tasks
The future
- Hortonworks Data Platform (HDP) Open Source is deprecated
Do you really need CDC?
Before opening your laptop and installing everything needed to run Debezium on your infrastructure, make sure you really need change data capture. It can be expensive, hard to set up, and complex to maintain. Go through simpler steps first, like creating an asynchronous replica to run reporting queries on. Then, once the simpler solutions have been ruled out, consider setting up CDC.