Featured image of post Airflow 정리

Airflow 정리

Airflow 기본 요소 및 내용 정리

What is Airflow?

Core Components

  • Web server: UI 담당 웹 서버
  • Scheduler: 워크플로우 스케줄링
  • Metastore: 메타데이터가 저장되는 데이터 베이스
  • Executor: 작업이 어디서 실행될지 정의
  • Worker: 작업이 실행되는 프로세스

DAG

  • dag_id: 유니크한 dag 이름
  • start_date: dag가 처음 스케줄되는 시간. datetime 모듈로 정의
  • schedule_interval: 스케줄되는 간격. cron으로 정의하거나 "@daily"와 같이 정의
  • default_args
    • owner: str, "airflow"
    • email_on_failure: bool
    • email_on_retry: bool
    • email: str
    • retries: int
    • retry_delay: datetime.timedelta
  • catchup: airflow가 트리거하지 않았던 날짜에 대해 실행 여부, False로 하는 것이 좋음

DAGrun

  • 스케줄러가 DAGrun object 생성
  • 주어진 시간의 dag 정보를 담고 있는 인스턴스
  • 실행될 task들을 가지고 있음
  • 원자성, 멱등성

Operator

  • Operator = Task
  • 실행하고자 하는 작업을 캡슐화했다고 생각하면 된다.
  • task_id: 하나의 dag내에서 유니크한 이름을 가져야 한다.
  • 종류
    • Action Operator: 실행 (bash)
    • Transfer Operator: 전송 (mysql, postgres)
    • Sensor Operator: 감지

How Airflow works?

One Node Architecture

  1. Web server가 metastore에서 메타데이터 정보를 가져온다.
  2. Schduler가 Metastore와 Executor에서 DAG, 작업을 트리거한다.
  3. Executor가 작업 업데이트를 metastore에 완료되었다고 업데이트한다.
  4. Executor에는 큐가 있는데, 실행이 정해진 순서대로 되게 한다.

Multi Nodes Architecture (Celery)

  1. Node1에는 웹서버, 스케줄러, 익스큐터가 있음
  2. Node2에는 메타스토어와 큐가 있음. 큐는 rabbit MQ나 redis 같은 서비스를 사용함
  3. 워커 노드들에는 Airflow 워커들이 있음
  4. 실행 방식은 1 노드 구조와 유사하나, 익스큐터는 외부 큐에 작업을 푸시한다.
  5. 큐 내부에 있는 작업은 워커에 의해 풀 된다.

folder dags

  1. dags 폴더에 파일이 저장
  2. 웹 서버와 스케쥴러가 이를 파싱함
  3. 스케쥴러가 메타스토어에 dagrun object를 생성
  4. dag가 실행이 되어야 할 경우 스케줄러가 TaskInstance object를 메타스토어에 스케줄함
  5. TaskInstance를 익스큐터에 보냄
  6. 실행 중에 메타스토어의 정보를 실행중으로 업데이트
  7. 완료되면 메타스토어의 정보를 완료로 업데이트
  8. dagrun이 종료되었는지 검증
  9. 웹서버가 metastore의 정보를 ui에 업데이트

caution

  • dag에 파일이 업데이트 되면 dag_dir_list_interval 주기 후 (기본 5분) UI상에서 확인 가능하다.
  • min_file_process_interval (기본 30초) dag를 파싱하는 시간. dag 코드가 업데이트 되도 해당 시간 후 반영된다.

Commands

useful commands

  • airflow run: 하나의 task 인스턴스 실행
  • airflow list_dags: dag 목록
  • airflow dag_state: dag 상태
  • airflow task_state: task 상태
  • airflow test: 테스트

test

  • $ airflow tasks test DAG_ID TASK_ID DATE

dependencies

  • task 디펜던시 줄 때 줄바꾸기
1
2
task1 >> task2 >> task3
task3 >> task4 >> task5

DAG Details

date options

start_date

  • DAG의 task들이 언제부터 트리거되고 스케줄되는지 시간을 정의.
    • ex) 2019-03-01로 정의했다면, 2019년 3월 1일 자정에 스케줄됨
  • python의 datetime 모듈로 정의 가능
  • 과거나 미래로 설정 가능함
    • 미래로 설정 시, 해당 시간이 될 때까지 기다림
    • 과거로 설정 시, 기다리지 않고 실행 가능. 그러나 catchup=False로 주지 않으면 과거로 설정한 날짜로 부터 schedule_interval마다 task instance가 실행되므로 주의
  • datetime.now()와 같이 동적으로 할당하지 말 것

schedule_interval

  • start_date의 최소값으로 부터 트리거되는 시간 간격
    • 같은 dag에서도 task 별로 start_date를 따로 줄 수 있기 때문에 최소값으로 정의됨
    • 그러나 같은 dag내 task들은 같은 start_date를 쓰는 것이 좋음
  • cron expression이나 datetime.timedelta 모듈로 정의 가능
    • cron을 쓰는 것이 더 정확한 표현을 할 수 있으므로 cron을 사용하는 것이 좋음

execution_date

  • dag가 실행된 시각이 아님
  • start_date - schedule_interval

end_date

  • dag/task가 더이상 스케줄되지 않는 시간
  • 기본값은 None

Backfill

  • 실행되지 않았던 dag/task를 실행하는 기능
  • catchup=True로 설정 시 수행
  • catchup=False로 설정 시 실행되지 않은 가장 마지막 dag/task만 실행하도록 되어 있음
  • CLI를 통해 실행 가능. airflow 공식문서

depends_on_past

  • task 레벨에서 정의
  • default_args에 정의 해서 모든 task에도 적용 가능
  • 이전 dagrun의 특정 task가 실패했다면, 이번 dagrun에서 그 task가 실행되는 것을 막을 수 있음

wait_for_downstream

  • task 레벨에서 정의
  • default_args에 정의 해서 모든 task에도 적용 가능
  • wait_for_downtstream이 정의된 task의 downstream task들이 이전 dagrun에서 완료될 때 까지 이번 dagrun 대기

DAGs folder structure

1. Zip

  • dag 파일은 zip 파일 root에 위치해야함
  • 모듈 디펜던시가 필요하면 virtualenv와 pip 사용

2. DAGBag

  • DAG 모음. 폴더 구조로 dag를 다룬다.
  • dev/staging/prod와 같이 환경 분리에 이점이 있다
  • dagbag이 깨지면 airflow UI상에서 에러가 뜨지 않고 웹서버 로그로만 확인 가능하므로 주의

3. .airflowignore

  • .gitignore와 유사
  • 모든 dags 폴더에 넣는 것이 좋음

Failure Detection

DAGs

  • dagrun_timeout: dagrun이 타임아웃되는 시간. 스케줄된 dag만 해당하며(수동 실행은 해당되지 않음) 실행중인 dag의 수가 max_active_runs와 일치하는 경우에만 해당
  • sla_miss_callback
  • on_failure_callback
  • on_success_callback

Tasks

  • email
  • email_on_failure
  • email_on_retry
  • retries
  • retry_delay
  • retry_exponential_backoff
  • max_retry_delay
  • execution_timeout
  • on_failure_callback
  • on_success_callback
  • on_retry_callback

Test

DAG validation tests

  • 유효한지
  • cycle이 없는지
  • default arguments가 잘 설정됐는지

DAG/Pipeline Definition Tests

  • task의 숫자가 맞는지
  • (로직이 아닌) task가 잘 정의 됐는지
  • task의 upstream, downstream 디펜던시가 잘 정의됐는지

Unit tests

  • 로직 체크
  • operator가 잘 동작되는지만 체크
  • 복잡한 로직을 airflow가 하게 두지 마라 (airflow는 오케스트레이션 툴이다)

Integration tests

  • task가 데이터를 잘 교환하는지
  • task의 input 체크
  • 여러 task간 의존성 체크

End to End Pipeline tests

  • 결과가 올바른지
  • 전체 로직 체크
  • 성능 체크

Local Executor

  • 병렬 실행 가능 (개발시 local executor 사용 권장)
  • parallelism = 0: unlimit
  • parallelism > 0: limit
    • 코어수 - 1로 설정하는 것을 권장
  • dag_cuncurrencymax_active_runs_per_dag 옵션에 따라 dag간 task 실행 순서를 조정할 수 있다.
    • dag_cuncurrency: dag내에서 동시에 실행 가능한 task의 수
    • max_active_runs_per_dag: 동시에 실행 시킬 수 있는 dag 수(backfill 일 때 주로 신경쓸 듯)

SubDAGs

  • 유사한 DAG를 하나의 그룹으로 묶어 UI상에서 마치 하나의 DAG인 것 처럼 표시할 수 있음
  • SubDagOperator 사용
    • 기본 Executor는 SequentialExecutor
  • main DAG가 모든 subDAG 들을 task로 관리
  • Airflow UI는 오직 main DAG만 표시
  • subDAG는 부모 DAG와 동일한 시각에 스케줄되어야 함. 그렇지 않으면 예상치 못한 결과를 낳을 수 있음
  • 데드락이 발생할 수 있음

Branching

  • DAG가 특정 task의 결과에 따라 경로를 선택할 수 있게끔 하는 것
  • BranchPythonOperator
  • BranchPythonOperator의 결과가 task_c를 반환하고 BranchPythonOperator의 downstream으로 task_a, task_b, task_c가 있다면 task_c를 실행하고, a와 b는 스킵
  • depends_on_past=True로 지정시, a와 b는 실패 상태로 뜨기 때문에 다음 DAGrun은 실행되지 않음
  • BranchPythonOperator에 empty path를 주면 의도하지 않은 결과를 줄 수 있기 때문에 반드시 주는 것이 좋다.
    • path를 스킵하고 싶다면 (task를 끝내고 싶다면) dummy task를 줘서 끝내자
  • 마지막 task도 skipped로 뜨는 것을 막고 싶다면 마지막 task operator에 trigger_rule='one_success'추가

Trigger Rule

  • depends_on_past와 사용 가능
  • upstream task중 skipped task가 있으면 all_successall_failed는 skipped 상태로 표시됨

Kinds

  • all_success: upstream task가 모두 성공하면 run
  • all_failed: upstream task가 모두 실패하면 run
  • all_done: upstream task가 성공 유무와 관련없이 끝나면 run
  • one_failed: upstream task가 하나라도 실패해야 run
  • one_success: upstream task가 하나라도 성공해야 run
  • none_failed: upstream task가 failed가 없어야 run
  • none_skipped: upstream task가 skipped가 없어야 run
  • dummy

Variables

  • metadata DB에 저장되는 값
  • Key, value, Is encrypted로 구성
  • JSON 형식 가능

Templating

  • placeholder {{}} 를 사용하여 값을 대체
  • Jinja template

Macros

XCOMs

  • task 간 메세지 공유
  • key, value, timestamp로 구성
  • value는 가벼워야 한다. (성능 문제)
  • xcom_push(key, value)로 metadata DB에 푸시
    • key: returned_value, value: 리턴값
  • xcom_pull(key)로 받기
    • key를 명시하지 않으면 returned_value가 기본값

TriggerDagRunOperator

  • 다른 DAG(컨트롤러)의 조건에 따라 특정 DAG(타겟)를 시작하게 함
  • branch나 subdag로는 너무 복잡해질때 사용
  • 컨트롤러는 타겟이 종료될 때 까지 기다리지 않음
  • 컨트롤러와 타겟은 독립적임
  • 컨트롤러의 히스토리에 관한 시각화는 제공되지 않음
  • 두 dag모두 스케줄되어야 함
  • 타겟 interval은 None이어야 함
  • 두 dag간에 메세지를 주고받을 수 있음(xcoms 대체)

ExternalTaskSensor

  • DAG간 종속성 줄 때 사용
  • 예를 들어 DAG1(t1 >> t2 >> t3), DAG2(t3 >> t4 >> t5)가 있을 때 t3가 완료되면 (DAG1이 완료되면) DAG2가 실행되도록 한다. (t4부터)
  • 두 DAG는같은 스케줄이어야 함 (또는 execution_deltaexecution_date_fn 파라미터 사용)
  • TriggerDagRunOperator와 쓰면 고장남
    • 스케줄 인터벌 None이기 때문

Logging

  • airflow.cfgbase_log_folder: log 저장 경로
  • fab_logging_level: flask app builder의 로깅 수준. (flask 기반 웹서버)

Metrics

  • Counters: 실패한 task 수
  • Gauges: queued task 수
  • Timers: task 완료까지의 밀리초
Licensed under CC BY-NC-SA 4.0
Hugo로 만듦
JimmyStack 테마 사용 중