Skip to content
PПромтбук
RUEN
07Пайплайны

Дизайн DAG'ов в Airflow без боли

Дробление tasks, dependencies, retries, SLA, sensors vs polling, как не сделать DAG-of-doom и когда смотреть на Prefect/Dagster.

Действуй как Airflow-практик с шрамами от продакшена. Спроектируй оркестрацию для: {{pipeline_purpose}}. Стек: {{tools}}.

Принципы дробления tasks

Один task = одна идемпотентная операция, которая укладывается в ≤30 минут. Если дольше — дроби. Почему:

  • Retry дешевле (не перевыполняешь успешное)
  • Параллелизация очевиднее
  • Логи читаемы (один task — один контекст)
  • Падение видно точечно: "load_orders failed", а не "etl_v2 failed на каком-то шаге"

Плохо: task_run_everything PythonOperator на 400 строк. Хорошо: extract_orders → load_orders → validate_orders → aggregate_orders.

Dependencies — задавай явно и минимально

extract >> load >> [validate, snapshot]
[validate, snapshot] >> aggregate

Не используй set_downstream в цикле без необходимости — теряется визуальная схема. TaskGroup для логической группировки (не SubDAG — это deprecated и ломает scheduler).

Retries & timeouts

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
    "execution_timeout": timedelta(minutes=30),
}

Правила:

  • retries=0 для тасков, которые не идемпотентны (избежишь повторов с побочными эффектами)
  • exponential backoff обязателен для внешних API — иначе DDoS-ишь свой же сервис
  • execution_timeout всегда — иначе зависший worker блокирует слот навсегда

SLA — это soft-deadline, не таймаут

sla=timedelta(hours=2) шлёт алерт, если task не закончился к dagrun.start + 2h. НЕ останавливает выполнение. Используй для бизнес-обещаний ("отчёт готов к 9:00"), не для технических.

Алерт через sla_miss_callback, не через email (он бесполезен в проде).

Sensors vs polling — главный источник проблем

Sensor блокирует worker-слот, пока ждёт. На большом DAG'е sensors съедают пул и блокируют всё.

Правильно:

  • mode='reschedule' вместо 'poke' — освобождает слот между проверками
  • Deferrable Operators (Airflow 2.2+) — async, нулевая нагрузка на worker
  • Или вообще не sensor, а event-driven: triggerer + дёрнуть DAG при событии (Pub/Sub, Kafka, S3 event)
# Плохо — съедает слот 2 часа
FileSensor(filepath="/data/ready.flag", poke_interval=30)

# Хорошо
FileSensor(filepath="/data/ready.flag", mode="reschedule", poke_interval=300)

# Ещё лучше
S3KeySensorAsync(bucket_key="s3://bucket/ready.flag")  # deferrable

Как избежать DAG-of-doom

Признаки:

  • 200+ tasks в одном файле
  • 5+ TaskGroup глубины
  • Render занимает >5 секунд (видно в UI)
  • Любое изменение требует понимания всего DAG'а

Лечение:

  • Один DAG = одна бизнес-цель, один owner, одна каденция
  • Связи между DAG'ами — TriggerDagRunOperator или Dataset (Airflow 2.4+, event-driven)
  • Для миграций, бекфиллов — отдельные DAG'и, не флаги в основном

Когда смотреть в сторону Prefect / Dagster

| Проблема | Куда смотреть | | Динамические workflow'ы (граф зависит от данных) | Prefect (нативно), Dagster (через asset graph) | | Сильная типизация I/O между задачами | Dagster (assets + IO managers) | | Низкая когнитивная нагрузка для аналитиков | Prefect (Pythonic API) | | Lineage + observability из коробки | Dagster | | Уже работает, команда знает | Оставайся на Airflow, не мигрируй ради миграции |

Вывод

## DAG: {{pipeline_purpose}}
schedule: '0 * * * *'
catchup: False
max_active_runs: 1
tags: [domain, owner]

## Task graph (mermaid)
graph LR
  extract --> load --> validate
  validate --> [snapshot, aggregate]

## Параметры по task'ам
| Task | Operator | retries | timeout | mode |

## SLA контракт
| Task / DAG | SLA | Алерт-канал |

## Идемпотентность
Каждый task должен быть безопасен для перезапуска. Опиши как для каждого.

Anti-patterns

  • ❌ Sensor в mode='poke' на 4 часа — съест все слоты пула
  • ❌ XCom для больших данных — XCom не для DataFrame, грузи в S3 и передавай путь
  • catchup=True без понимания — после паузы DAG запускает сотни прогонов разом и кладёт инфру
  • ❌ Логика в default_args через мутирующий объект — все DAG'и шарят ссылку, баги ловишь годами
  • ❌ Бизнес-логика прямо в PythonOperator вместо вызова отдельного модуля — невозможно покрыть unit-тестами
К подразделу «Пайплайны»
Похожие промты