Operator
Airflow 공식 홈페이지 소개에 따르면
사용 안내서를 살펴보면, 기본적으로 제공하는 Operator를 볼 수 있습니다.
그 중
BashOperator
Bash Shell 명령어를 실행합니다. 자세한 내용은 BashOperator에서 확인할 수 있습니다.
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
)
echo 1
# Expected output
1
다양한 명령어를 사용하여 응용할 수 있습니다.
정적인 명령어 사용과 더불어
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
사용자가 별도의 값을 지정하지 않아도, 실행 시 해당 값을 자동으로 입력하여 결과값으로 출력합니다.
# Expected Output
run_id=backfill__2015-06-06T00:00:00+00:00 | dag_run=<DagRun bashop @ 2015-06-06 00:00:00+00:00:
backfill__2015-06-06T00:00:00+00:00, externally triggered: False>
현재 실행 중인 DAG의 ID와 DAG 정보를 나타냅니다.
명령어에 escaping 문자가 포함이 되어있다면,
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "here is the message: '$message'"',
env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
env 키워드를 사용하여 메세지를 전달하는 것이 올바른 방법입니다.
아래처럼
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: '{{ dag_run.conf["message"] if dag_run else "" }}'"',
)
PythonOperator
파이썬으로 작성한 코드를 실행합니다. 자세한 내용은 PythonOperator에서 확인할 수 있습니다.
기본적으로 PythonOperator 인스턴스로
from pprint import pprint
def print_context(ds, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
python_callable=print_context,
)
Python callable에 전달할 추가 Argument는
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
)
run_this >> task
마지막으로,
끝까지 읽어주신 모든 분들께 감사드립니다.
다음 글 보기
이전 글 보기