[Airflow] 설치 및 튜토리얼


Airflow


프로그래밍 언어(파이썬, Python)으로 작업 흐름(Workflows)을 관리, 모니터하는 플랫폼으로 더 자세한 내용은 링크를 참고하세요.


설치


공식 가이드를 따라 설치를 진행합니다.


설치는 Ubuntu 20.04에서 진행하였습니다.


Dependencies 설치

sudo apt-get update -y
sudo apt-get install -y --no-install-recommends
        freetds-bin
        krb5-user
        ldap-utils
        libsasl2-2
        libsasl2-modules
        libssl1.1
        locales
        lsb-release
        sasl2-bin
        sqlite3
        unixodbc
        postgresql
        python3-pip
        python3-testresources

환경 변수 설정

export AIRFLOW_HOME=~/airflow
export AIRFLOW_VERSION="2.1.2"
export PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
export PATH=$PATH:/home/ubuntu/.local/bin

Airflow 설치

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

DB 초기화

사용자 생성

# initialize the database
airflow db init
airflow users create
    --username admin
    --firstname Peter
    --lastname Parker
    --role Admin
    --email spiderman@superhero.org

사용자 생성 시 비밀번호를 입력할 수 있습니다.


설정 파일 수정

sed -i "103s/True/False/" ~/airflow/airflow.cfg

샘플을 보여주지 않겠다는 뜻이에요.


Web 서버 시작

스케쥴러 시작

# start the web server, default port is 8080
airflow webserver --port 8080
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page

Webserver 실행 시 -D 옵션을 추가하여, Daemon으로 실행할 수 있습니다.


DAG


Airflow는 작업의 관계를 DAG(방향성 비순환 그래프, Directed Acyclic Graphs)로 표현합니다.


튜토리얼


파이프라인 작성


링크에서 제공하는 예제 파이프라인을 만들어봅니다. 자세한 설명도 있으니 참고하세요.


파일 생성

# Create a file
vi ~/airflow/dags/tutorial.py

내용 입력

from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )
    dag.doc_md = __doc__  # providing that you have a docstring at the beggining of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )
    t1 >> [t2, t3]

내용을 복사할 수 있습니다.


파이프라인 테스트


앞에서 생성한 파일을 테스트하여 문제가 없는지 확인합니다.

python3 ~/airflow/dags/tutorial.py

파이프라인 분석


Airflow 파이프라인은 파이썬 스크립트로 작성하며, Airflow DAG 객체를 정의합니다.


스크립트를 작성할 때 가장 먼저 하는 일은 필요한 라이브러리 불러오기(Importing) 입니다.

from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

코드를 작성하면서 계속 바뀌는 부분 중 한 곳 입니다. 상황에 따라 필요한 라이브러리가 바뀔 수도 있겠죠?


작업(Task)를 생성할 때 사용할 기본 매개변수를 Dictionary 타입으로 정의합니다.


모든 작업 생성자(Constructor)에 Arguments를 명시적으로 전달할 수 있습니다.

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

더 많은 정보는 airflow.models.BaseOperator에서 확인하세요.


지금부터는 작업을 연결하는 DAG 객체가 필요합니다.


객체에 수 많은 DAG를 구별할 수 있는 고유한 식별자 dag_id를 문자열로 정의합니다.


앞서 정의한 Argument Dictionary 추가합니다. 그리고 Schedule_interval도 정의하구요.

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:

튜토리얼에서 생성한 DAG의 고유한 식별자는 tutorial이군요.


클래스로 생성한 객체를 인스턴스라고 하며, 작업(Tasks)은 Operator의 인스턴스입니다.


작업의 고유한 식별자는 task_id이며, 작업의 첫 번째 Argument로 추가합니다.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)
t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

작업은 반드시 task_idowner Arguemnt를 포함하거나 상속받아야합니다. 그렇지 않으면 문제가 발생(Raise an exception)합니다.


Airflow는 Jinja Templating, 사전 정의된 매개변수(Parameters)매크로(Macro) 등을 지원합니다.


Jinja Templating을 사용하면 Template이 Rendering될 때 변수 및 표현들을 실제 값으로 변환합니다.


Template engine은 괄호 사이에 있는 변수나 표현들을 결과로 반환합니다.

<!DOCTYPE html>
<html lang="en">
<head>
    <title>My Webpage</title>
</head>
<body>
    <ul id="navigation">
    {% for item in navigation %}
        <li><a href="{{ item.href }}">{{ item.caption }}</a></li>
    {% endfor %}
    </ul>
    <h1>My Webpage</h1>
    {{ a_variable }}
    {# a comment #}
</body>
</html>

Jinja Template 소개


실제 Airflow 튜토리얼에 적용한 코드를 살펴보면, templated_command에 {% %} 블록, 매개변수, 함수 등 다양한 문법이 적용되어 있습니다.

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
)

그리고 하나의 작업을 더 추가했습니다.


지금까지 총 세 개의 작업 T1, T2, T3을 생성했습니다. 작업들 사이의 의존 관계는 존재하지 않습니다.


만약 우리가 작업 사이의 의존 관계를 정의를 한다면, 작업의 메소드 set_downstream & set_upstream를 활용하거나 연산자 >> & << 등으로 나타냅니다.

t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

파이프라인 그리기


명령어로 간단히 DAG의 의존성 정보를 나타냅니다.


의존성 정보 확인

airflow tasks list tutorial --tree
# Expected Output
<Task(BashOperator): print_date>
    <Task(BashOperator): sleep>
    <Task(BashOperator): templated>

테스트


임의의 날짜를 대입하여 실제 작업 인스턴스를 동작해봅니다.


첫번째 작업 실행

airflow tasks test tutorial print_date 2015-06-01
# Expected Output
[2021-08-10 07:55:07,803] {subprocess.py:74} INFO - Output:
[2021-08-10 07:55:07,805] {subprocess.py:78} INFO - Tue Aug 10 07:55:07 UTC 2021
[2021-08-10 07:55:07,805] {subprocess.py:82} INFO - Command exited with return c                                                                  ode 0
[2021-08-10 07:55:07,818] {taskinstance.py:1204} INFO - Marking task as SUCCESS.                                                                   dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date                                                                  =20210810T075507, end_date=20210810T075507

두번째 작업 실행

# testing sleep
airflow tasks test tutorial sleep 2015-06-01
# Expected output
[2021-08-10 08:01:27,404] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'sleep 5']
[2021-08-10 08:01:27,408] {subprocess.py:74} INFO - Output:
[2021-08-10 08:01:32,412] {subprocess.py:82} INFO - Command exited with return code 0
[2021-08-10 08:01:32,428] {taskinstance.py:1204} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20150601T000000, start_date=20210810T080127, end_date=20210810T080132

생성한 작업이 정상적으로 동작하네요.


airflow tasks test 명령어는 작업 인스턴스를 로컬에서 실행하며, 의존 관계와 무관하게 동작합니다. 또한 데이터베이스에 상태를 전달하지 않습니다.


이와 유사하게 airflow dags test [dag_id] [execution_date] 명령어를 통해 DAG를 테스트해볼 수 있습니다. 이 때는 의존 관계에 따라 작업을 진행하지만, 데이터베이스에 상태를 전달하지는 않습니다.


Backfill


테스트까지 정상적으로 진행됐으면, 드디어 Backfill을 진행할 때 입니다.


의존 관계에 따라 작업을 진행하고, 파일로 로그를 남기며 데이터베이스에 상태 정보를 저장합니다.


strat_dateend_date를 명시하여 작업 인스턴스가 스케쥴에 맞춰 수행할 수 있도록 합니다.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial
    --start-date 2015-06-01
    --end-date 2015-06-07
# Expected output
[2021-08-10 08:11:15,786] {backfill_job.py:377} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-08-10 08:11:15,792] {backfill_job.py:831} INFO - Backfill done. Exiting.

Backfill을 성공적으로 수행하고, 종료하였습니다.


여기까지 Airflow 튜토리얼이었습니다.


마지막으로,

끝까지 읽어주신 모든 분들께 감사드립니다.


다음 글 보기

이전 글 보기

댓글 달기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다