Действуй как Airflow-практик с шрамами от продакшена. Спроектируй оркестрацию для: {{pipeline_purpose}}. Стек: {{tools}}.
Принципы дробления tasks
Один task = одна идемпотентная операция, которая укладывается в ≤30 минут. Если дольше — дроби. Почему:
- Retry дешевле (не перевыполняешь успешное)
- Параллелизация очевиднее
- Логи читаемы (один task — один контекст)
- Падение видно точечно: "load_orders failed", а не "etl_v2 failed на каком-то шаге"
Плохо: task_run_everything PythonOperator на 400 строк.
Хорошо: extract_orders → load_orders → validate_orders → aggregate_orders.
Dependencies — задавай явно и минимально
extract >> load >> [validate, snapshot]
[validate, snapshot] >> aggregate
Не используй set_downstream в цикле без необходимости — теряется визуальная схема. TaskGroup для логической группировки (не SubDAG — это deprecated и ломает scheduler).
Retries & timeouts
default_args = {
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(hours=1),
"execution_timeout": timedelta(minutes=30),
}
Правила:
- retries=0 для тасков, которые не идемпотентны (избежишь повторов с побочными эффектами)
- exponential backoff обязателен для внешних API — иначе DDoS-ишь свой же сервис
- execution_timeout всегда — иначе зависший worker блокирует слот навсегда
SLA — это soft-deadline, не таймаут
sla=timedelta(hours=2) шлёт алерт, если task не закончился к dagrun.start + 2h. НЕ останавливает выполнение. Используй для бизнес-обещаний ("отчёт готов к 9:00"), не для технических.
Алерт через sla_miss_callback, не через email (он бесполезен в проде).
Sensors vs polling — главный источник проблем
Sensor блокирует worker-слот, пока ждёт. На большом DAG'е sensors съедают пул и блокируют всё.
Правильно:
mode='reschedule'вместо'poke'— освобождает слот между проверкамиDeferrable Operators(Airflow 2.2+) — async, нулевая нагрузка на worker- Или вообще не sensor, а event-driven: triggerer + дёрнуть DAG при событии (Pub/Sub, Kafka, S3 event)
# Плохо — съедает слот 2 часа
FileSensor(filepath="/data/ready.flag", poke_interval=30)
# Хорошо
FileSensor(filepath="/data/ready.flag", mode="reschedule", poke_interval=300)
# Ещё лучше
S3KeySensorAsync(bucket_key="s3://bucket/ready.flag") # deferrable
Как избежать DAG-of-doom
Признаки:
- 200+ tasks в одном файле
- 5+ TaskGroup глубины
- Render занимает >5 секунд (видно в UI)
- Любое изменение требует понимания всего DAG'а
Лечение:
- Один DAG = одна бизнес-цель, один owner, одна каденция
- Связи между DAG'ами —
TriggerDagRunOperatorилиDataset(Airflow 2.4+, event-driven) - Для миграций, бекфиллов — отдельные DAG'и, не флаги в основном
Когда смотреть в сторону Prefect / Dagster
| Проблема | Куда смотреть | | Динамические workflow'ы (граф зависит от данных) | Prefect (нативно), Dagster (через asset graph) | | Сильная типизация I/O между задачами | Dagster (assets + IO managers) | | Низкая когнитивная нагрузка для аналитиков | Prefect (Pythonic API) | | Lineage + observability из коробки | Dagster | | Уже работает, команда знает | Оставайся на Airflow, не мигрируй ради миграции |
Вывод
## DAG: {{pipeline_purpose}}
schedule: '0 * * * *'
catchup: False
max_active_runs: 1
tags: [domain, owner]
## Task graph (mermaid)
graph LR
extract --> load --> validate
validate --> [snapshot, aggregate]
## Параметры по task'ам
| Task | Operator | retries | timeout | mode |
## SLA контракт
| Task / DAG | SLA | Алерт-канал |
## Идемпотентность
Каждый task должен быть безопасен для перезапуска. Опиши как для каждого.
Anti-patterns
- ❌ Sensor в
mode='poke'на 4 часа — съест все слоты пула - ❌ XCom для больших данных — XCom не для DataFrame, грузи в S3 и передавай путь
- ❌
catchup=Trueбез понимания — после паузы DAG запускает сотни прогонов разом и кладёт инфру - ❌ Логика в
default_argsчерез мутирующий объект — все DAG'и шарят ссылку, баги ловишь годами - ❌ Бизнес-логика прямо в PythonOperator вместо вызова отдельного модуля — невозможно покрыть unit-тестами
Декомпозиция задачи на агентов
Разбить большую задачу на параллельных независимых агентов с чёткими интерфейсами.
Последовательный пайплайн агентов
Когда параллель не подходит: цепочка агентов с явными контрактами между шагами.
Когда применять subagent
Критерии: контекст, изоляция, параллель. Типовые шаблоны вызова и анти-кейсы.