PostgreSQL Users Group Belgium
Julien RIOU
February 13, 2024
Extract data from a source
Transform data to an expected format
Load data into a destination
SELECT * FROM... WHERE id BETWEEN...
Detect changes happening in databases
Execute actions on events
OLD and NEW values
AFTER INSERT OR UPDATE OR DELETE trigger
Publication and subscription system (pub/sub)
NOTIFY channel [ , payload ]
LISTEN channel
Decode events based on Write-Ahead Logs (WAL)
CREATE PUBLICATION
CREATE SUBSCRIPTION
*.include.list
*.exclude.list
(INSERT, UPDATE, DELETE)
pgoutput plugin
Kafka Connect REST API:
curl -XPOST -d @payload.json -s localhost:8083/connectors
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.ApplicationName": "debezium",
"database.dbname": "test",
"database.hostname": "${file:/path/to/secrets/test:hostname}",
"database.password": "${file:/path/to/secrets/test:password}",
"database.port": "${file:/path/to/secrets/test:port}",
"database.ssl.mode": "required",
"database.user": "${file:/path/to/secrets/test:user}",
"decimal.handling.mode": "double",
"field.name.adjustment.mode": "avro",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
"heartbeat.interval.ms": "1000",
"incremental.snapshot.chunk.size": "100000",
"name": "test",
"plugin.name": "pgoutput",
"producer.override.compression.type": "lz4",
"publication.name": "debezium_test",
"sanitize.field.names": "true",
"schema.name.adjustment.mode": "avro",
"signal.data.collection": "public.ovh_debezium_signal",
"slot.name": "debezium_test",
"status.update.interval.ms": "1000",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000",
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3",
"topic.heartbeat.prefix": "namespace.debezium.heartbeat",
"topic.prefix": "namespace.test",
"transforms": "schema,public",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
"transforms.schema.replacement": "namespace.test.$1.$2",
"transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml
{
"database.password": "${file:/path/to/secrets/test:password}"
}
io.debezium.DebeziumException: Couldn't obtain encoding for database test
Manual restart of the connector
events
events_2023_10
events_2023_11
events_2023_12
events_2024_01
test=> select count(*) from events;
count
------------
1314817483
(1 row)
test=> select count(*) from only events;
count
-------
0
(1 row)
Table name + regex = topic name
{
"transforms": "events",
"transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
"transforms.events.replacement": "namespace.database.public.events"
}
{
"transforms": "public",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1"
}
The Oversized-Attribute Storage Technique
UPDATE without modifying the attribute
{
"op": "u",
"before": null,
"after": {
"namespace.database.public.toasty.Value": {
"id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
"updated_at": {
"string": "2024-01-17T10:53:34.929630Z"
},
"clob": {
"string": "__debezium_unavailable_value"
}
...
ALTER TABLE... REPLICA IDENTITY DEFAULT
unavailable.value.placeholder
ALTER TABLE... REPLICA IDENTITY FULL
{
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
{
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3"
}
(partitions (6) * replication factor (3))
Topic routing
14 topics
252 partitions
-99.88 %
{
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000"
}
{
"producer.override.compression.type": "lz4"
}