Airflow
프로그래밍 언어(파이썬, Python)으로 작업 흐름(Workflows)을 관리, 모니터하는 플랫폼으로 더 자세한 내용은 링크를 참고하세요.
설치
공식 가이드를 따라 설치를 진행합니다.
설치는
Dependencies 설치
sudo apt-get update -y
sudo apt-get install -y --no-install-recommends
freetds-bin
krb5-user
ldap-utils
libsasl2-2
libsasl2-modules
libssl1.1
locales
lsb-release
sasl2-bin
sqlite3
unixodbc
postgresql
python3-pip
python3-testresources
환경 변수 설정
export AIRFLOW_HOME=~/airflow
export AIRFLOW_VERSION="2.1.2"
export PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
export PATH=$PATH:/home/ubuntu/.local/bin
Airflow 설치
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
DB 초기화
사용자 생성
# initialize the database
airflow db init
airflow users create
--username admin
--firstname Peter
--lastname Parker
--role Admin
--email spiderman@superhero.org
사용자 생성 시 비밀번호를 입력할 수 있습니다.
설정 파일 수정
sed -i "103s/True/False/" ~/airflow/airflow.cfg
샘플을 보여주지 않겠다는 뜻이에요.
Web 서버 시작
스케쥴러 시작
# start the web server, default port is 8080
airflow webserver --port 8080
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
Webserver 실행 시 -D 옵션을 추가하여, Daemon으로 실행할 수 있습니다.
DAG
Airflow는 작업의 관계를 DAG(방향성 비순환 그래프, Directed Acyclic Graphs)로 표현합니다.
튜토리얼
파이프라인 작성
링크에서 제공하는 예제 파이프라인을 만들어봅니다. 자세한 설명도 있으니 참고하세요.
파일 생성
# Create a file
vi ~/airflow/dags/tutorial.py
내용 입력
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t1.doc_md = dedent(
"""
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beggining of the DAG
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
t1 >> [t2, t3]
내용을 복사할 수 있습니다.
파이프라인 테스트
앞에서 생성한 파일을 테스트하여 문제가 없는지 확인합니다.
python3 ~/airflow/dags/tutorial.py
파이프라인 분석
Airflow 파이프라인은 파이썬 스크립트로 작성하며, Airflow DAG 객체를 정의합니다.
스크립트를 작성할 때 가장 먼저 하는 일은 필요한 라이브러리 불러오기(Importing) 입니다.
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
코드를 작성하면서 계속 바뀌는 부분 중 한 곳 입니다. 상황에 따라 필요한 라이브러리가 바뀔 수도 있겠죠?
작업(Task)를 생성할 때 사용할 기본 매개변수를 Dictionary 타입으로 정의합니다.
모든 작업 생성자(Constructor)에 Arguments를 명시적으로 전달할 수 있습니다.
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
더 많은 정보는 airflow.models.BaseOperator에서 확인하세요.
지금부터는 작업을 연결하는 DAG 객체가 필요합니다.
객체에 수 많은 DAG를 구별할 수 있는 고유한 식별자
앞서 정의한 Argument Dictionary 추가합니다. 그리고
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
튜토리얼에서 생성한 DAG의 고유한 식별자는
클래스로 생성한 객체를 인스턴스라고 하며,
작업의 고유한 식별자는
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
작업은 반드시
Airflow는
Jinja Templating을 사용하면 Template이 Rendering될 때 변수 및 표현들을 실제 값으로 변환합니다.
<!DOCTYPE html>
<html lang="en">
<head>
<title>My Webpage</title>
</head>
<body>
<ul id="navigation">
{% for item in navigation %}
<li><a href="{{ item.href }}">{{ item.caption }}</a></li>
{% endfor %}
</ul>
<h1>My Webpage</h1>
{{ a_variable }}
{# a comment #}
</body>
</html>
Jinja Template 소개
실제 Airflow 튜토리얼에 적용한 코드를 살펴보면,
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)
그리고 하나의 작업을 더 추가했습니다.
지금까지 총 세 개의 작업
만약 우리가 작업 사이의 의존 관계를 정의를 한다면, 작업의 메소드
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
파이프라인 그리기
명령어로 간단히 DAG의 의존성 정보를 나타냅니다.
의존성 정보 확인
airflow tasks list tutorial --tree
# Expected Output
<Task(BashOperator): print_date>
<Task(BashOperator): sleep>
<Task(BashOperator): templated>
테스트
임의의 날짜를 대입하여 실제 작업 인스턴스를 동작해봅니다.
첫번째 작업 실행
airflow tasks test tutorial print_date 2015-06-01
# Expected Output
[2021-08-10 07:55:07,803] {subprocess.py:74} INFO - Output:
[2021-08-10 07:55:07,805] {subprocess.py:78} INFO - Tue Aug 10 07:55:07 UTC 2021
[2021-08-10 07:55:07,805] {subprocess.py:82} INFO - Command exited with return c ode 0
[2021-08-10 07:55:07,818] {taskinstance.py:1204} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date =20210810T075507, end_date=20210810T075507
두번째 작업 실행
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
# Expected output
[2021-08-10 08:01:27,404] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'sleep 5']
[2021-08-10 08:01:27,408] {subprocess.py:74} INFO - Output:
[2021-08-10 08:01:32,412] {subprocess.py:82} INFO - Command exited with return code 0
[2021-08-10 08:01:32,428] {taskinstance.py:1204} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20150601T000000, start_date=20210810T080127, end_date=20210810T080132
생성한 작업이 정상적으로 동작하네요.
이와 유사하게
Backfill
테스트까지 정상적으로 진행됐으면, 드디어 Backfill을 진행할 때 입니다.
의존 관계에 따라 작업을 진행하고, 파일로 로그를 남기며 데이터베이스에 상태 정보를 저장합니다.
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial
--start-date 2015-06-01
--end-date 2015-06-07
# Expected output
[2021-08-10 08:11:15,786] {backfill_job.py:377} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-10 08:11:15,792] {backfill_job.py:831} INFO - Backfill done. Exiting.
Backfill을 성공적으로 수행하고, 종료하였습니다.
여기까지 Airflow 튜토리얼이었습니다.
마지막으로,
끝까지 읽어주신 모든 분들께 감사드립니다.
다음 글 보기
이전 글 보기