[Airflow] Operator 소개


Operator


Airflow 공식 홈페이지 소개에 따르면 Operator는 이론적으로 멱등성을 특징으로 하는 단일 작업(Task)으로 소개하고 있습니다.


사용 안내서를 살펴보면, 기본적으로 제공하는 Operator를 볼 수 있습니다.


그 중 BashOperator, PythonOperator를 살펴보도록 합니다.


BashOperator


Bash Shell 명령어를 실행합니다. 자세한 내용은 BashOperator에서 확인할 수 있습니다.

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
)

run_after_loop 작업의 결과는 아래의 결과와 똑같습니다.

echo 1
# Expected output
1

다양한 명령어를 사용하여 응용할 수 있습니다.


정적인 명령어 사용과 더불어 Jinja templates를 활용하여 동적인 표현 사용이 가능합니다.

also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)

Jinja templates에서 사용할 수 있는 Macros / Variables는 Macros reference 에서 확인합니다.


{{ run_id }} 또는 {{ dag_run }} 과 같이 이미 정의된 변수 Jinja templates 변수는 Default Variables입니다.


사용자가 별도의 값을 지정하지 않아도, 실행 시 해당 값을 자동으로 입력하여 결과값으로 출력합니다.

# Expected Output
run_id=backfill__2015-06-06T00:00:00+00:00 | dag_run=<DagRun bashop @ 2015-06-06 00:00:00+00:00:
backfill__2015-06-06T00:00:00+00:00, externally triggered: False>

현재 실행 중인 DAG의 ID와 DAG 정보를 나타냅니다.


명령어에 escaping 문자가 포함이 되어있다면, dag_run conf 파일을 사용합니다.

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "here is the message: '$message'"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

env 키워드를 사용하여 메세지를 전달하는 것이 올바른 방법입니다.


아래처럼 bash_command에 포함하는 것은 올바르지 않습니다.

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '{{ dag_run.conf["message"] if dag_run else "" }}'"',
)

PythonOperator


파이썬으로 작성한 코드를 실행합니다. 자세한 내용은 PythonOperator에서 확인할 수 있습니다.


기본적으로 PythonOperator 인스턴스로 print_context 함수를 두번째 Argument로 전달합니다.

from pprint import pprint
def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)

Python callable에 전달할 추가 Argument는 op_argsop_kwargs 키워드를 사용합니다.

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
    )
    run_this >> task

마지막으로,

끝까지 읽어주신 모든 분들께 감사드립니다.


다음 글 보기

이전 글 보기

댓글 달기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다