Meetup PostgreSQL Lille
Gwendoline ROTROU, Julien RIOU
25 janvier 2024
|
|
Extraction de la donnée depuis une source
Transformation de la donnée pour correspondre au format attendu
Chargement dans une destination
SELECT * FROM... WHERE id BETWEEN...
Détection des changements dans une base de données
Déclenche des actions en fonction d’événements
OLD et NEW
AFTER INSERT OR UPDATE OR DELETE
Système de publication et de souscription
NOTIFY
LISTEN
Décodage d’événements grâce aux Write-Ahead Logs (WAL)
CREATE PUBLICATION
CREATE SUBSCRIPTION
*.include.list
*.exclude.list
(INSERT, UPDATE, DELETE)
pgoutput
API REST de Kafka Connect
curl -XPOST -d @payload.json -s localhost:8083/connectors
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.ApplicationName": "debezium",
"database.dbname": "test",
"database.hostname": "${file:/path/to/secrets/test:hostname}",
"database.password": "${file:/path/to/secrets/test:password}",
"database.port": "${file:/path/to/secrets/test:port}",
"database.ssl.mode": "required",
"database.user": "${file:/path/to/secrets/test:user}",
"decimal.handling.mode": "double",
"field.name.adjustment.mode": "avro",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
"heartbeat.interval.ms": "1000",
"incremental.snapshot.chunk.size": "100000",
"name": "test",
"plugin.name": "pgoutput",
"producer.override.compression.type": "lz4",
"publication.name": "debezium_test",
"sanitize.field.names": "true",
"schema.name.adjustment.mode": "avro",
"signal.data.collection": "public.ovh_debezium_signal",
"slot.name": "debezium_test",
"status.update.interval.ms": "1000",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000",
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3",
"topic.heartbeat.prefix": "namespace.debezium.heartbeat",
"topic.prefix": "namespace.test",
"transforms": "schema,public",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
"transforms.schema.replacement": "namespace.test.$1.$2",
"transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml
{
"database.password": "${file:/path/to/secrets/test:password}"
}
Redémarrage manuel du connecteur
io.debezium.DebeziumException: Couldn't obtain encoding for database test
events
events_2023_10
events_2023_11
events_2023_12
events_2024_01
test=> select count(*) from events;
count
------------
1314817483
(1 row)
test=> select count(*) from only events;
count
-------
0
(1 row)
Transformation d’un nom de table en un nom de topic à l’aide de regex
{
"transforms": "events",
"transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
"transforms.events.replacement": "namespace.database.public.events"
}
{
"transforms": "public",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1"
}
Technique de stockage des attributs trop grands
UPDATE sans changement de l’attribut
{
"op": "u",
"before": null,
"after": {
"namespace.database.public.toasty.Value": {
"id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
"updated_at": {
"string": "2024-01-17T10:53:34.929630Z"
},
"clob": {
"string": "__debezium_unavailable_value"
}
...
ALTER TABLE... REPLICA IDENTITY DEFAULT
unavailable.value.placeholder
ALTER TABLE... REPLICA IDENTITY FULL
{
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
{
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3"
}
(partitions (6) * replication factor (3))
Topic routing
14 topics
252 partitions
-99.88 %
{
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000"
}
{
"producer.override.compression.type": "lz4"
}