Перейти к содержанию

8. CDC и Debezium


CDC (Change Data Capture) превращает изменения из OLTP в поток событий для дальнейшей обработки.

Debezium — популярный CDC-инструмент в связке с Kafka Connect: он читает изменения БД из логических журналов и публикует события в Kafka через Connect.


Термины и сущности

Термин Определение
CDC Change Data Capture: доставка изменений данных (INSERT/UPDATE/DELETE) как событий.
Logical replication (PostgreSQL) Механизм публикации изменений на уровне логических событий (через replication slot’ы).
binlog (MySQL) Журнал изменений MySQL, который Debezium может читать для CDC.
Debezium connector Конфигурация Debezium, подключающаяся к БД и публикующая события в Kafka.
Schema evolution Изменение схемы данных (DDL) со временем: совместимость сообщений и версий схем.
Backpressure Ситуация, когда downstream не успевает: накапливаются задержки, растёт consumption ресурсов.
Schema Registry Компонент (обычно Confluent), который хранит и версионирует схемы (Avro/Protobuf/JSON Schema).

Архитектура CDC в “production-логике”

Упрощённая последовательность:

1) включаем логическое чтение изменений в БД, 2) Debezium connector подписывается на изменения, 3) Kafka Connect доставляет события в Kafka, 4) consumer’ы применяют бизнес-логику, 5) изменения схемы (DDL) совместимы и версионированы.

Production best practice:

  • планируйте совместимость схем заранее: DDL в OLTP должен быть “совместим с пайплайном”.

Практика: включить logical replication в PostgreSQL

Точные шаги зависят от вашей версии и политики безопасности, но производственная логика обычно такая:

-- 1) Включите параметры в postgresql.conf (понадобится restart):
-- wal_level = logical
-- max_replication_slots = 10
-- max_wal_senders = 10

-- 2) Role для репликации
CREATE ROLE debezium_replication WITH REPLICATION LOGIN PASSWORD 'REPLACE_ME';

-- 3) Publication: какие таблицы/набор событий доступны для логической репликации
CREATE PUBLICATION dbserver_pub FOR ALL TABLES;

Комментарий:

  • replication slot’ы удерживают WAL, пока прогресс connector’а не продвинется,
  • поэтому monitoring задержки CDC критичен: он напрямую влияет на размер WAL и стабильность Postgres.

Практика: создать Debezium connector через REST API Kafka Connect

Ниже пример POST-запроса на создание connector’а. Это production-minded скелет: параметры источника, publication, режим snapshot и параллелизм.

Поля backend для истории схем зависят от вашей версии Debezium/Connect — ниже они опущены, но должны быть настроены по вашему стандарту.

curl -X POST http://kafka-connect:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "debezium-postgres",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium_replication",
      "database.dbname": "appdb",

      "plugin.name": "pgoutput",
      "publication.name": "dbserver_pub",

      "snapshot.mode": "initial",
      "tasks.max": "2"
    }
  }'

Production best practices:

  • snapshot планируйте в окна с контролируемой нагрузкой на OLTP,
  • задачи/параллелизм (tasks.max) выбирайте под throughput downstream, иначе lag начнёт расти,
  • после DDL — проверьте совместимость схем на consumer’ах.

Schema evolution: совместимость, а не “угадывание”

Когда БД меняет схему:

  • могут появляться новые поля,
  • меняются типы,
  • добавляются/убираются таблицы и ключи.

Production best practices:

  • используйте стратегию совместимости схем (backward/forward) и версионируйте сообщения,
  • применяйте Schema Registry как единый источник схем,
  • обновляйте consumer’ы и schema compatibility в согласованном порядке.

Мини-практика: проверить совместимость (concept)

curl -X POST http://schema-registry:8081/compatibility/subjects/<subject>/versions/latest

Exactly-once vs at-least-once в CDC

В CDC чаще всего реальность такая:

  • доставка в Kafka — ближе к at-least-once (дубликаты возможны),
  • а “exactly-once” на уровне бизнеса достигается идемпотентностью обработки.

Production best practices:

  • dedup по стабильному ключу события (например, LSN/txid + идентификатор строки/изменения),
  • commit progress делайте только после успешного apply_change,
  • для критичных интеграций используйте outbox-паттерн на стороне приложения (если применимо).

Backpressure и lag CDC: как реагировать

Если лаг растёт:

  • проверьте downstream (DB/HTTP) на slow paths,
  • увеличьте параллелизм consumer’ов/worker’ов (не только в Kafka),
  • убедитесь, что connector жив и replication slot’ы не “застряли”,
  • проверьте совместимость схем (DDL мог сломать consumer’а).

Мини-код: идемпотентная обработка событий (Python-псевдокод)

Цель — не применять одно и то же событие дважды.

processed_event_ids = set()

for msg in consumer:
    event_id = msg.headers.get("event-id")  # пример: идемпотентный ключ события
    if event_id in processed_event_ids:
        continue

    apply_change(msg.value())
    processed_event_ids.add(event_id)

    # commit делайте после успешного apply_change
    consumer.commit()

Комментарий:

  • в production дедуп ключи лучше хранить в durable store (БД/Redis с TTL), а не в памяти,
  • commit/offset progress связывайте с успешностью apply_change.

Production checklist

Что проверить Зачем
logical replication включён и доступен Debezium чтобы события вообще появлялись
Lag CDC не растёт устойчиво чтобы не раздувать WAL и не накапливать очередь
Schema evolution совместима чтобы DDL не ломал консьюм
Consumer обработка идемпотентная чтобы дубликаты не ломали бизнес
Есть план реакции на backpressure чтобы MTTR был быстрым