What is Airflow?
Core Components
- Web server: UI 담당 웹 서버
- Scheduler: 워크플로우 스케줄링
- Metastore: 메타데이터가 저장되는 데이터 베이스
- Executor: 작업이 어디서 실행될지 정의
- Worker: 작업이 실행되는 프로세스
DAG
dag_id
: 유니크한 dag 이름start_date
: dag가 처음 스케줄되는 시간.datetime
모듈로 정의schedule_interval
: 스케줄되는 간격. cron으로 정의하거나"@daily"
와 같이 정의default_args
owner
:str
,"airflow"
email_on_failure
:bool
email_on_retry
:bool
email
:str
retries
:int
retry_delay
:datetime.timedelta
catchup
: airflow가 트리거하지 않았던 날짜에 대해 실행 여부,False
로 하는 것이 좋음
DAGrun
- 스케줄러가 DAGrun object 생성
- 주어진 시간의 dag 정보를 담고 있는 인스턴스
- 실행될 task들을 가지고 있음
- 원자성, 멱등성
Operator
- Operator = Task
- 실행하고자 하는 작업을 캡슐화했다고 생각하면 된다.
task_id
: 하나의 dag내에서 유니크한 이름을 가져야 한다.- 종류
- Action Operator: 실행 (bash)
- Transfer Operator: 전송 (mysql, postgres)
- Sensor Operator: 감지
How Airflow works?
One Node Architecture
- Web server가 metastore에서 메타데이터 정보를 가져온다.
- Schduler가 Metastore와 Executor에서 DAG, 작업을 트리거한다.
- Executor가 작업 업데이트를 metastore에 완료되었다고 업데이트한다.
- Executor에는 큐가 있는데, 실행이 정해진 순서대로 되게 한다.
Multi Nodes Architecture (Celery)
- Node1에는 웹서버, 스케줄러, 익스큐터가 있음
- Node2에는 메타스토어와 큐가 있음. 큐는 rabbit MQ나 redis 같은 서비스를 사용함
- 워커 노드들에는 Airflow 워커들이 있음
- 실행 방식은 1 노드 구조와 유사하나, 익스큐터는 외부 큐에 작업을 푸시한다.
- 큐 내부에 있는 작업은 워커에 의해 풀 된다.
folder dags
- dags 폴더에 파일이 저장
- 웹 서버와 스케쥴러가 이를 파싱함
- 스케쥴러가 메타스토어에 dagrun object를 생성
- dag가 실행이 되어야 할 경우 스케줄러가 TaskInstance object를 메타스토어에 스케줄함
- TaskInstance를 익스큐터에 보냄
- 실행 중에 메타스토어의 정보를 실행중으로 업데이트
- 완료되면 메타스토어의 정보를 완료로 업데이트
- dagrun이 종료되었는지 검증
- 웹서버가 metastore의 정보를 ui에 업데이트
caution
- dag에 파일이 업데이트 되면
dag_dir_list_interval
주기 후 (기본 5분) UI상에서 확인 가능하다. min_file_process_interval
(기본 30초) dag를 파싱하는 시간. dag 코드가 업데이트 되도 해당 시간 후 반영된다.
Commands
useful commands
airflow run
: 하나의 task 인스턴스 실행airflow list_dags
: dag 목록airflow dag_state
: dag 상태airflow task_state
: task 상태airflow test
: 테스트
test
$ airflow tasks test DAG_ID TASK_ID DATE
dependencies
- task 디펜던시 줄 때 줄바꾸기
|
|
DAG Details
date options
start_date
- DAG의 task들이 언제부터 트리거되고 스케줄되는지 시간을 정의.
- ex) 2019-03-01로 정의했다면, 2019년 3월 1일 자정에 스케줄됨
- python의
datetime
모듈로 정의 가능 - 과거나 미래로 설정 가능함
- 미래로 설정 시, 해당 시간이 될 때까지 기다림
- 과거로 설정 시, 기다리지 않고 실행 가능. 그러나
catchup=False
로 주지 않으면 과거로 설정한 날짜로 부터schedule_interval
마다 task instance가 실행되므로 주의
datetime.now()
와 같이 동적으로 할당하지 말 것
schedule_interval
start_date
의 최소값으로 부터 트리거되는 시간 간격- 같은 dag에서도 task 별로
start_date
를 따로 줄 수 있기 때문에 최소값으로 정의됨 - 그러나 같은 dag내 task들은 같은
start_date
를 쓰는 것이 좋음
- 같은 dag에서도 task 별로
- cron expression이나
datetime.timedelta
모듈로 정의 가능- cron을 쓰는 것이 더 정확한 표현을 할 수 있으므로 cron을 사용하는 것이 좋음
execution_date
- dag가 실행된 시각이 아님
start_date
-schedule_interval
end_date
- dag/task가 더이상 스케줄되지 않는 시간
- 기본값은 None
Backfill
- 실행되지 않았던 dag/task를 실행하는 기능
catchup=True
로 설정 시 수행catchup=False
로 설정 시 실행되지 않은 가장 마지막 dag/task만 실행하도록 되어 있음- CLI를 통해 실행 가능. airflow 공식문서
depends_on_past
- task 레벨에서 정의
default_args
에 정의 해서 모든 task에도 적용 가능- 이전 dagrun의 특정 task가 실패했다면, 이번 dagrun에서 그 task가 실행되는 것을 막을 수 있음
wait_for_downstream
- task 레벨에서 정의
default_args
에 정의 해서 모든 task에도 적용 가능wait_for_downtstream
이 정의된 task의 downstream task들이 이전 dagrun에서 완료될 때 까지 이번 dagrun 대기
DAGs folder structure
1. Zip
- dag 파일은 zip 파일 root에 위치해야함
- 모듈 디펜던시가 필요하면 virtualenv와 pip 사용
2. DAGBag
- DAG 모음. 폴더 구조로 dag를 다룬다.
- dev/staging/prod와 같이 환경 분리에 이점이 있다
- dagbag이 깨지면 airflow UI상에서 에러가 뜨지 않고 웹서버 로그로만 확인 가능하므로 주의
3. .airflowignore
- .gitignore와 유사
- 모든 dags 폴더에 넣는 것이 좋음
Failure Detection
DAGs
dagrun_timeout
: dagrun이 타임아웃되는 시간. 스케줄된 dag만 해당하며(수동 실행은 해당되지 않음) 실행중인 dag의 수가max_active_runs
와 일치하는 경우에만 해당sla_miss_callback
on_failure_callback
on_success_callback
Tasks
email
email_on_failure
email_on_retry
retries
retry_delay
retry_exponential_backoff
max_retry_delay
execution_timeout
on_failure_callback
on_success_callback
on_retry_callback
Test
DAG validation tests
- 유효한지
- cycle이 없는지
- default arguments가 잘 설정됐는지
DAG/Pipeline Definition Tests
- task의 숫자가 맞는지
- (로직이 아닌) task가 잘 정의 됐는지
- task의 upstream, downstream 디펜던시가 잘 정의됐는지
Unit tests
- 로직 체크
- operator가 잘 동작되는지만 체크
- 복잡한 로직을 airflow가 하게 두지 마라 (airflow는 오케스트레이션 툴이다)
Integration tests
- task가 데이터를 잘 교환하는지
- task의 input 체크
- 여러 task간 의존성 체크
End to End Pipeline tests
- 결과가 올바른지
- 전체 로직 체크
- 성능 체크
Local Executor
- 병렬 실행 가능 (개발시 local executor 사용 권장)
parallelism = 0
: unlimitparallelism > 0
: limit- 코어수 - 1로 설정하는 것을 권장
dag_cuncurrency
와max_active_runs_per_dag
옵션에 따라 dag간 task 실행 순서를 조정할 수 있다.dag_cuncurrency
: dag내에서 동시에 실행 가능한 task의 수max_active_runs_per_dag
: 동시에 실행 시킬 수 있는 dag 수(backfill 일 때 주로 신경쓸 듯)
SubDAGs
- 유사한 DAG를 하나의 그룹으로 묶어 UI상에서 마치 하나의 DAG인 것 처럼 표시할 수 있음
SubDagOperator
사용- 기본 Executor는
SequentialExecutor
- 기본 Executor는
- main DAG가 모든 subDAG 들을 task로 관리
- Airflow UI는 오직 main DAG만 표시
- subDAG는 부모 DAG와 동일한 시각에 스케줄되어야 함. 그렇지 않으면 예상치 못한 결과를 낳을 수 있음
- 데드락이 발생할 수 있음
Branching
- DAG가 특정 task의 결과에 따라 경로를 선택할 수 있게끔 하는 것
BranchPythonOperator
BranchPythonOperator
의 결과가 task_c를 반환하고BranchPythonOperator
의 downstream으로 task_a, task_b, task_c가 있다면 task_c를 실행하고, a와 b는 스킵depends_on_past=True
로 지정시, a와 b는 실패 상태로 뜨기 때문에 다음 DAGrun은 실행되지 않음BranchPythonOperator
에 empty path를 주면 의도하지 않은 결과를 줄 수 있기 때문에 반드시 주는 것이 좋다.- path를 스킵하고 싶다면 (task를 끝내고 싶다면) dummy task를 줘서 끝내자
- 마지막 task도 skipped로 뜨는 것을 막고 싶다면 마지막 task operator에
trigger_rule='one_success'
추가
Trigger Rule
depends_on_past
와 사용 가능- upstream task중 skipped task가 있으면
all_success
와all_failed
는 skipped 상태로 표시됨
Kinds
all_success
: upstream task가 모두 성공하면 runall_failed
: upstream task가 모두 실패하면 runall_done
: upstream task가 성공 유무와 관련없이 끝나면 runone_failed
: upstream task가 하나라도 실패해야 runone_success
: upstream task가 하나라도 성공해야 runnone_failed
: upstream task가 failed가 없어야 runnone_skipped
: upstream task가 skipped가 없어야 rundummy
Variables
- metadata DB에 저장되는 값
- Key, value, Is encrypted로 구성
- JSON 형식 가능
Templating
- placeholder
{{}}
를 사용하여 값을 대체 - Jinja template
Macros
XCOMs
- task 간 메세지 공유
- key, value, timestamp로 구성
- value는 가벼워야 한다. (성능 문제)
xcom_push(key, value)
로 metadata DB에 푸시- key:
returned_value
, value: 리턴값
- key:
xcom_pull(key)
로 받기- key를 명시하지 않으면
returned_value
가 기본값
- key를 명시하지 않으면
TriggerDagRunOperator
- 다른 DAG(컨트롤러)의 조건에 따라 특정 DAG(타겟)를 시작하게 함
- branch나 subdag로는 너무 복잡해질때 사용
- 컨트롤러는 타겟이 종료될 때 까지 기다리지 않음
- 컨트롤러와 타겟은 독립적임
- 컨트롤러의 히스토리에 관한 시각화는 제공되지 않음
- 두 dag모두 스케줄되어야 함
- 타겟 interval은 None이어야 함
- 두 dag간에 메세지를 주고받을 수 있음(xcoms 대체)
ExternalTaskSensor
- DAG간 종속성 줄 때 사용
- 예를 들어 DAG1(
t1 >> t2 >> t3
), DAG2(t3 >> t4 >> t5
)가 있을 때 t3가 완료되면 (DAG1이 완료되면) DAG2가 실행되도록 한다. (t4부터) - 두 DAG는같은 스케줄이어야 함 (또는
execution_delta
나execution_date_fn
파라미터 사용) TriggerDagRunOperator
와 쓰면 고장남- 스케줄 인터벌 None이기 때문
Logging
airflow.cfg
의base_log_folder
: log 저장 경로fab_logging_level
: flask app builder의 로깅 수준. (flask 기반 웹서버)
Metrics
- Counters: 실패한 task 수
- Gauges: queued task 수
- Timers: task 완료까지의 밀리초