Skip to content
PПромтбук
RUEN
04Архитектура

Transactional outbox pattern

Outbox table в одной транзакции с данными, async publisher, дедупликация на consumer, обработка poison messages.

Спроектируй transactional outbox для {{domain}}. Брокер: {{broker}}.

Базовая аксиома: «записать в БД И опубликовать в Kafka» — не атомарно. БД commit'нула, broker timeout'нул — событие потеряно, consumers ничего не знают. БД не успела commit'нуть, broker принял — событие опубликовано про несуществующий заказ. Outbox решает это, разделяя «обещание опубликовать» и «реально опубликовано».

1. Проблема dual write

// Анти-паттерн
async function createOrder(input) {
  const order = await db.orders.insert(input);     // ① БД
  await kafka.publish('order.created', order);      // ② Broker
  return order;
}

Failure modes:

  • ① succeeded, ② failed (network, broker down) → order есть, события нет
  • ① succeeded, ② failed после publish но до response → ретрай дублирует
  • ② succeeded, ① rollback'нулся → событие про несуществующий order
  • Сервис упал между ① и ② → молчаливая потеря

Никакой retry/circuit breaker не делает эти две операции атомарными — они в разных системах.

2. Идея outbox

Записать событие в ту же БД что и бизнес-данные, в той же транзакции:

BEGIN;
  INSERT INTO orders (...);
  INSERT INTO outbox (event_type, payload, ...);
COMMIT;

Атомарно. Если commit прошёл — есть и order, и outbox запись. Если rollback — нет ни того, ни другого.

Отдельный процесс — outbox publisher — читает outbox table, публикует в broker, помечает как sent. Это уже single-system retry-able операция.

3. Outbox table

CREATE TABLE outbox (
  id              BIGSERIAL PRIMARY KEY,
  aggregate_type  TEXT NOT NULL,           -- 'order', 'payment'
  aggregate_id    TEXT NOT NULL,           -- 'ord_abc123' — для ordering
  event_type      TEXT NOT NULL,           -- 'order.created', 'order.cancelled'
  payload         JSONB NOT NULL,          -- сам event
  headers         JSONB,                   -- trace_id, tenant_id
  created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at    TIMESTAMPTZ,             -- NULL = не опубликовано
  attempt_count   INTEGER NOT NULL DEFAULT 0,
  last_error      TEXT
);

-- Индекс для publisher'а: только unpublished, по порядку
CREATE INDEX outbox_unpublished_idx
  ON outbox (created_at)
  WHERE published_at IS NULL;

Ключевое:

  • aggregate_id — позволяет shardить publishing по агрегату (события одного order идут в порядке)
  • payload JSONB, не TEXT — index, query, валидация
  • published_at IS NULL partial index — main query SELECT WHERE published_at IS NULL бежит по маленькому индексу

4. Publisher — два подхода

4.1 Polling publisher (проще)

async function publishLoop() {
  while (true) {
    const batch = await db.query(`
      SELECT * FROM outbox
      WHERE published_at IS NULL
      ORDER BY created_at
      LIMIT 100
      FOR UPDATE SKIP LOCKED   -- важно для нескольких publisher'ов
    `);

    for (const row of batch) {
      try {
        await broker.publish(row.event_type, row.payload, {
          headers: { ...row.headers, outbox_id: row.id },
          partitionKey: row.aggregate_id,
        });
        await db.query(
          'UPDATE outbox SET published_at = now() WHERE id = $1',
          [row.id]
        );
      } catch (err) {
        await db.query(
          'UPDATE outbox SET attempt_count = attempt_count + 1, last_error = $2 WHERE id = $1',
          [row.id, String(err)]
        );
      }
    }

    if (batch.length === 0) await sleep(500);
  }
}

Плюсы: просто, портабельно, работает с любой БД. Минусы: latency 200-500ms (polling interval), нагрузка на БД от постоянных SELECT.

FOR UPDATE SKIP LOCKED — критично если запущено N publisher replicas: каждая берёт свой batch без конфликтов.

4.2 CDC publisher (Debezium / Postgres logical replication)

Publisher слушает WAL Postgres, видит INSERT в outbox, публикует. Не нужен polling.

Плюсы: low latency (~50ms), нет polling load. Минусы: операционная сложность (Kafka Connect, Debezium), требует logical replication, ещё один сервис.

Выбор:

  • < 100 events/sec → polling, не парься
  • 1000 events/sec или latency критична → CDC

5. Дедупликация на consumer side

Outbox publisher может публиковать at-least-once — упал между broker.publish() и UPDATE published_at → следующая итерация опубликует снова. Consumer обязан быть idempotent.

Способ: каждое событие имеет event_id (= outbox.id или UUID). Consumer хранит processed_event_ids:

async function handleOrderCreated(event) {
  const seen = await db.query(
    'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING event_id',
    [event.id]
  );
  if (seen.rowCount === 0) return;  // дубликат — skip

  // … бизнес-логика
}

Альтернатива — natural idempotency: INSERT ON CONFLICT DO NOTHING на бизнес-данных. Не всегда возможно.

processed_events cleanup: TTL 7-30 дней, после этого вероятность дубликата ≈ 0.

6. Ordering — когда важно

В Kafka topic с N партициями events с одинаковым partitionKey идут в порядке. Если bus = order events:

partitionKey = aggregate_id (order_id)

→ Все события одного order — в одной partition → strict order. События разных order'ов — параллельно.

В RabbitMQ — single queue, single consumer per consumer group. Сложнее.

В SQS — FIFO queue с MessageGroupId.

7. Poison messages

Событие не публикуется (broker отвергает schema, payload corrupted, retry'и не помогают). Без обработки — застрянет навечно, блокирует следующие события того же aggregate'а.

-- Если attempt_count > 10 — отправить в dead letter
UPDATE outbox SET published_at = now(), dlq = true
WHERE id = $1 AND attempt_count >= 10;

DLQ варианты:

  • Отдельная outbox_dlq таблица — оператор разбирает руками
  • Topic order-events.dlq в Kafka — отдельный consumer пишет в БД для алертов
  • Alert в Slack/PagerDuty при появлении в DLQ

Никогда: не пропускать события молча. Лучше остановить publisher и алертить, чем потерять.

8. Schema evolution

Outbox payload меняется со временем. Правила:

  • Include schema_version в headers
  • Consumer должен обрабатывать N и N-1 версии одновременно (rolling deploy)
  • Никогда не remove поле — только deprecate. Удалить через 6 месяцев когда все consumers обновились
  • Breaking change → новый event_type (order.created.v2)

9. Cleanup outbox

Опубликованные записи не нужны — но не удаляй сразу:

  • Debug — увидеть «было ли событие отправлено»
  • Replay — re-emit в новый consumer

Решение:

  • Хранить published_at IS NOT NULL 7-30 дней
  • Cron: DELETE FROM outbox WHERE published_at < now() - interval '30 days'
  • Если нужен audit log на годы — отдельная events_archive (партиционирована по месяцу)

10. Что мониторить

  • Outbox lag: MAX(now() - created_at) WHERE published_at IS NULL → если > 30 секунд — алерт
  • Unpublished count: COUNT(*) WHERE published_at IS NULL → растёт → publisher не справляется или упал
  • DLQ size: COUNT(*) WHERE dlq = true → > 0 → нужен оператор
  • Publish rate: events/sec
  • Consumer lag: на стороне broker'а

Anti-patterns

  • db.insert() + broker.publish() в одной функции без outbox — гарантированная потеря событий когда что-то падает
  • ❌ Outbox в другой БД (или другом DB connection pool) — теряется атомарность, той же проблемой не отличается от dual write
  • ❌ Publisher без FOR UPDATE SKIP LOCKED — две replica берут один батч → дубли
  • ❌ DELETE из outbox сразу после publish — потерял debug, replay невозможен
  • ❌ Consumer без дедупликации — at-least-once delivery + retry = duplicate processing
  • ❌ Удаление поля из payload без schema_version — старый consumer падает на новых событиях
  • ❌ Poison message stuck forever без DLQ — outbox растёт, lag огромный, никто не знает почему
  • ❌ Один publisher на всё → SPOF; падает → события копятся часами
  • ❌ Polling каждые 10ms «для latency» → DB CPU 100% на пустых SELECT
  • ❌ Ordering предполагается, но partitionKey не выставлен → события разбегаются по партициям → out-of-order

В конце

  • Outbox table schema (с partial index на unpublished)
  • Publisher: polling vs CDC, с обоснованием
  • Consumer dedup: event_id + processed table или natural idempotency
  • Ordering strategy (partition key = aggregate_id)
  • Poison/DLQ flow
  • Schema evolution policy
  • Metrics + alerts (lag, unpublished count, DLQ size)
  • Cleanup retention
К подразделу «Архитектура»
Похожие промты