Transactional outbox pattern
Outbox table в одной транзакции с данными, async publisher, дедупликация на consumer, обработка poison messages.
Спроектируй transactional outbox для {{domain}}. Брокер: {{broker}}.
Базовая аксиома: «записать в БД И опубликовать в Kafka» — не атомарно. БД commit'нула, broker timeout'нул — событие потеряно, consumers ничего не знают. БД не успела commit'нуть, broker принял — событие опубликовано про несуществующий заказ. Outbox решает это, разделяя «обещание опубликовать» и «реально опубликовано».
1. Проблема dual write
// Анти-паттерн
async function createOrder(input) {
const order = await db.orders.insert(input); // ① БД
await kafka.publish('order.created', order); // ② Broker
return order;
}
Failure modes:
- ① succeeded, ② failed (network, broker down) → order есть, события нет
- ① succeeded, ② failed после publish но до response → ретрай дублирует
- ② succeeded, ① rollback'нулся → событие про несуществующий order
- Сервис упал между ① и ② → молчаливая потеря
Никакой retry/circuit breaker не делает эти две операции атомарными — они в разных системах.
2. Идея outbox
Записать событие в ту же БД что и бизнес-данные, в той же транзакции:
BEGIN;
INSERT INTO orders (...);
INSERT INTO outbox (event_type, payload, ...);
COMMIT;
Атомарно. Если commit прошёл — есть и order, и outbox запись. Если rollback — нет ни того, ни другого.
Отдельный процесс — outbox publisher — читает outbox table, публикует в broker, помечает как sent. Это уже single-system retry-able операция.
3. Outbox table
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- 'order', 'payment'
aggregate_id TEXT NOT NULL, -- 'ord_abc123' — для ordering
event_type TEXT NOT NULL, -- 'order.created', 'order.cancelled'
payload JSONB NOT NULL, -- сам event
headers JSONB, -- trace_id, tenant_id
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ, -- NULL = не опубликовано
attempt_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Индекс для publisher'а: только unpublished, по порядку
CREATE INDEX outbox_unpublished_idx
ON outbox (created_at)
WHERE published_at IS NULL;
Ключевое:
aggregate_id— позволяет shardить publishing по агрегату (события одного order идут в порядке)payloadJSONB, не TEXT — index, query, валидацияpublished_at IS NULLpartial index — main querySELECT WHERE published_at IS NULLбежит по маленькому индексу
4. Publisher — два подхода
4.1 Polling publisher (проще)
async function publishLoop() {
while (true) {
const batch = await db.query(`
SELECT * FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED -- важно для нескольких publisher'ов
`);
for (const row of batch) {
try {
await broker.publish(row.event_type, row.payload, {
headers: { ...row.headers, outbox_id: row.id },
partitionKey: row.aggregate_id,
});
await db.query(
'UPDATE outbox SET published_at = now() WHERE id = $1',
[row.id]
);
} catch (err) {
await db.query(
'UPDATE outbox SET attempt_count = attempt_count + 1, last_error = $2 WHERE id = $1',
[row.id, String(err)]
);
}
}
if (batch.length === 0) await sleep(500);
}
}
Плюсы: просто, портабельно, работает с любой БД. Минусы: latency 200-500ms (polling interval), нагрузка на БД от постоянных SELECT.
FOR UPDATE SKIP LOCKED — критично если запущено N publisher replicas: каждая берёт свой batch без конфликтов.
4.2 CDC publisher (Debezium / Postgres logical replication)
Publisher слушает WAL Postgres, видит INSERT в outbox, публикует. Не нужен polling.
Плюсы: low latency (~50ms), нет polling load. Минусы: операционная сложность (Kafka Connect, Debezium), требует logical replication, ещё один сервис.
Выбор:
- < 100 events/sec → polling, не парься
-
1000 events/sec или latency критична → CDC
5. Дедупликация на consumer side
Outbox publisher может публиковать at-least-once — упал между broker.publish() и UPDATE published_at → следующая итерация опубликует снова. Consumer обязан быть idempotent.
Способ: каждое событие имеет event_id (= outbox.id или UUID). Consumer хранит processed_event_ids:
async function handleOrderCreated(event) {
const seen = await db.query(
'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING event_id',
[event.id]
);
if (seen.rowCount === 0) return; // дубликат — skip
// … бизнес-логика
}
Альтернатива — natural idempotency: INSERT ON CONFLICT DO NOTHING на бизнес-данных. Не всегда возможно.
processed_events cleanup: TTL 7-30 дней, после этого вероятность дубликата ≈ 0.
6. Ordering — когда важно
В Kafka topic с N партициями events с одинаковым partitionKey идут в порядке. Если bus = order events:
partitionKey = aggregate_id (order_id)
→ Все события одного order — в одной partition → strict order. События разных order'ов — параллельно.
В RabbitMQ — single queue, single consumer per consumer group. Сложнее.
В SQS — FIFO queue с MessageGroupId.
7. Poison messages
Событие не публикуется (broker отвергает schema, payload corrupted, retry'и не помогают). Без обработки — застрянет навечно, блокирует следующие события того же aggregate'а.
-- Если attempt_count > 10 — отправить в dead letter
UPDATE outbox SET published_at = now(), dlq = true
WHERE id = $1 AND attempt_count >= 10;
DLQ варианты:
- Отдельная
outbox_dlqтаблица — оператор разбирает руками - Topic
order-events.dlqв Kafka — отдельный consumer пишет в БД для алертов - Alert в Slack/PagerDuty при появлении в DLQ
Никогда: не пропускать события молча. Лучше остановить publisher и алертить, чем потерять.
8. Schema evolution
Outbox payload меняется со временем. Правила:
- Include
schema_versionв headers - Consumer должен обрабатывать N и N-1 версии одновременно (rolling deploy)
- Никогда не remove поле — только deprecate. Удалить через 6 месяцев когда все consumers обновились
- Breaking change → новый event_type (
order.created.v2)
9. Cleanup outbox
Опубликованные записи не нужны — но не удаляй сразу:
- Debug — увидеть «было ли событие отправлено»
- Replay — re-emit в новый consumer
Решение:
- Хранить
published_at IS NOT NULL7-30 дней - Cron:
DELETE FROM outbox WHERE published_at < now() - interval '30 days' - Если нужен audit log на годы — отдельная
events_archive(партиционирована по месяцу)
10. Что мониторить
- Outbox lag:
MAX(now() - created_at) WHERE published_at IS NULL→ если > 30 секунд — алерт - Unpublished count:
COUNT(*) WHERE published_at IS NULL→ растёт → publisher не справляется или упал - DLQ size:
COUNT(*) WHERE dlq = true→ > 0 → нужен оператор - Publish rate: events/sec
- Consumer lag: на стороне broker'а
Anti-patterns
- ❌
db.insert()+broker.publish()в одной функции без outbox — гарантированная потеря событий когда что-то падает - ❌ Outbox в другой БД (или другом DB connection pool) — теряется атомарность, той же проблемой не отличается от dual write
- ❌ Publisher без
FOR UPDATE SKIP LOCKED— две replica берут один батч → дубли - ❌ DELETE из outbox сразу после publish — потерял debug, replay невозможен
- ❌ Consumer без дедупликации — at-least-once delivery + retry = duplicate processing
- ❌ Удаление поля из payload без schema_version — старый consumer падает на новых событиях
- ❌ Poison message stuck forever без DLQ — outbox растёт, lag огромный, никто не знает почему
- ❌ Один publisher на всё → SPOF; падает → события копятся часами
- ❌ Polling каждые 10ms «для latency» → DB CPU 100% на пустых SELECT
- ❌ Ordering предполагается, но
partitionKeyне выставлен → события разбегаются по партициям → out-of-order
В конце
- Outbox table schema (с partial index на unpublished)
- Publisher: polling vs CDC, с обоснованием
- Consumer dedup:
event_id+ processed table или natural idempotency - Ordering strategy (partition key = aggregate_id)
- Poison/DLQ flow
- Schema evolution policy
- Metrics + alerts (lag, unpublished count, DLQ size)
- Cleanup retention
Таксономия событий
Названия событий и параметров так, чтобы аналитик через год не плакал.
Multi-agent: координатор и специалисты
Архитектура из координатора и специализированных агентов: передача контекста, дедупликация, race conditions.
Новый subagent или новый skill: что выбрать
Decision tree: создавать ли отдельного агента или достаточно skill. Критерии — контекст, переиспользование, frequency, complexity.