Airflow 설치에 대한 포스팅은 다음에서 확인할 수 있다. 혹시 설치를 아직 안 했다면 다음 포스팅을 참고하자.
Airflow에서는 파이프라인을 정의하기 위해서 DAG를 사용한다. DAG란 Directred Acyclic Graph의 약자로 방향성 있는 비순환 그래프라고 불린다. 각각의 Task들의 Workflow를 만들 수 있는데 예를 들어, ETL 파이프라인을 구축한다고 한다면 각각의 Task는 Extract, Transform, Load 일 것이고 다음과 같은 화살표를 가진 DAG를 만들 수 있다.
이런 식으로 각각의 Task들은 의존성(화살표)을 가지고 수행을 한다. 그렇다면 이 DAG는 어떻게 만들어야 하는 걸까? 기본적으로 파이썬 코드를 통해 작성할 수 있다.
DAG 폴더
먼저 DAG을 작성한 파이썬 파일들은 $AIRFLOW_HOME/dags 디렉토리에 저장하는 것을 기본으로 하고 있다. 처음에는 dags 디렉토리가 없기 때문에 생성을 해주면 된다. 이 DAG 파일들의 디렉토리는 airflow.cfg 에서 변경할 수 있다.
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = ./dags
DAG 작성하기
이제 DAG를 작성해보자. dags 폴더 내에 hello_dag.py 를 작성해보자.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_airflow() -> None:
print("airflow")
# with 구문으로 DAG 정의
with DAG(
dag_id="hello_airflow", # DAG의 식별자용 아이디
description="My First DAG", # DAG에 대해 설명
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작
schedule_interval=timedelta(days=1), # 1일을 주기로 실행
) as dag:
# Task를 정의
# bash 커맨드로 echo hello 를 실행
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
owner="HongGilDong", # 이 작업의 오너. 보통 작업을 담당하는 사람 이름을 넣는다.
retries=3, # 이 Task가 실패한 경우, 3번 재시도
retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분
)
# Task를 정의
# python 함수 print_airflow()를 실행
t2 = PythonOperator(
task_id="print_airflow",
python_callable=print_airflow,
owner="HongGilDong",
retries=3,
retry_delay=timedelta(minutes=5),
)
# Task 의존성(순서)를 정하는 부분이다.
# t1 -> t2
t1 >> t2
DAG 정의
- dag_id : DAG를 구분하는 식별자
- description : DAG에 대한 설명을 적어놓는 부분
- start_date : DAG의 실행 날짜를 정하는 부분
- 보통은 datetime.datetime을 통해 자세한 날짜를 작성하는 것이 일반적이지만 이해를 쉽게 하기 위해 days_ago를 사용하였다.
- schedule_interval : DAG의 실행 주기를 정하는 부분
- 보통은 CRON EXPRESSION을 사용해서 정의하지만 이해를 쉽게 하기 위해 timedelta를 사용하였다.
이 외에도 다양한 파라미터들이 존재하고 있는데, 그에 대한 정보는 공식 문서에서 확인할 수 있다.
Task 정의
Task는 Airflow에서 지원하는 여러 Operator 들을 이용해서 만들 수 있다. 여기서는 PythonOperator와 BashOperator를 사용한다.
- BashOperator : Bash Shell 명령을 사용할 수 있는 오퍼레이터
- bash_command : Bash로 "echo hello"라는 커맨드를 실행한다.
- PythonOperator : 파이썬으로 작성된 코드를 사용할 수 있는 오퍼레이터
- python_callable : print_airflow() 함수를 실행한다.
- 이 외에 공통적인 파라미터
- owner : 실행하는 Task의 소유자. 보통 담당자의 이름을 넣는다.
- retries : Task의 실행을 실패할 시 재시도하는 횟수.
- retry_delay : 재시도할 때, 재시도의 시간 간격.
다양한 Operator들이 존재하고 있고, 그에 대한 설명과 파라미터들은 공식 문서에서 확인할 수 있다.
Airflow 기본 Operator 목록
Airflow Provider Operator 목록
Task 의존성 정의
정의한 Task들의 의존성을 >> 오퍼레이터를 통해 정한다.
- t1 >> t2
- t1의 실행이 완료되면 t2를 실행시킨다.
- t1이 실패한다면 t2는 실행시키지 않는다.
default_args 활용하기
위의 Task들을 정의할 때 각 오퍼레이터에서 중복되는 부분을 확인할 수 있다. 그럴 땐 가독성을 위해 default_args 파라미터를 활용할 수 있는데, 이것을 통해 깔끔하게 DAG를 정의할 수 있다.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_airflow() -> None:
print("airflow")
# with 구문으로 DAG 정의
with DAG(
dag_id="hello_airflow", # DAG의 식별자용 아이디
description="My First DAG", # DAG에 대해 설명
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작
schedule_interval=timedelta(days=1), # 1일을 주기로 실행
default_args = {
"owner": "HongGilDong",
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
) as dag:
# Task를 정의
# bash 커맨드로 echo hello 를 실행
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
)
# Task를 정의
# python 함수 print_airflow()를 실행
t2 = PythonOperator(
task_id="print_airflow",
python_callable=print_airflow,
)
# Task 의존성(순서)를 정하는 부분이다.
# t1 -> t2
t1 >> t2
Dict 객체를 파라미터로 넘기게 되면 중복을 제거할 수 있다. 이렇게 DAG 작성을 완료했다.
DAG 실행하기
localhost:8080에 접속하면 hello_airflow라는 DAG가 생성된 것을 확인할 수 있다.
처음 DAG들을 확인하면 example DAG들이 많은 것을 확인할 수 있는데, airflow.cfg 에서 안보이게 설정할 수 있다.
# Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment load_examples = False
load_examples를 True에서 False로 바꿔주면 된다.
hello_airflow 옆에 토글 버튼을 활성화 시키면 DAG가 활성화된다. 그러면 조금 뒤에 DAG가 실행될 것이다.
이렇게 두 개의 초록색 바를 확인할 수 있는데 DAG가 두 번 RUN 됐다는 것이다. 이것에 대한 이해를 하려면 DAG를 작성할 때 사용한 파라미터들을 살펴봐야 한다.
- start_date
- 위 DAG에서는 start_date를 days_ago(2)라고 설정을 하였는데 이것은 DAG 정의 기준 2일 전 0시를 말하는 것이다.
- schedule_interval
- schedule_interval이 timedelta(days=1)로 하루마다 DAG가 RUN 되도록 스케줄링을 한다는 의미다.
DAG 정의 일 기준으로 2일 전 0시 부터 1일만큼 시간이 지나면 DAG가 RUN이 된다. 그래서 총 2번이 실행된 것이다. start_date라고 start_date 시간에 실행되는 것이 아닌 인터벌만큼의 시간이 지난 후 실행이 된다. Airflow를 처음 접해보면 헷갈릴 수 있는 부분이므로 잘 숙지하도록 하자. 여기까지 Airflow DAG를 작성하는 것을 살펴보았다.
'Data > Airflow' 카테고리의 다른 글
[Airflow] EC2 -> EKS 마이그레이션 (0) | 2024.08.01 |
---|---|
[Airflow] Airflow 성능 관련 설정 값 정리 (0) | 2023.12.19 |
[Airflow] DAG CI/CD 구축기 (0) | 2023.06.21 |
[Airflow] 로그 기록 시간대 문제 (0) | 2023.04.05 |
[Airflow] Apache Airflow 설치하기 (0) | 2022.05.16 |