데이터 파이프라인을 구축할 때 이 Workflow를 관리하기 위한 많은 오픈 소스 플랫폼들이 있는데 그중 인기 있고 많은 기업에서 사용 중인 Apache Airflow를 설치를 한 번 해보겠다. 현재 Airflow는 윈도우에서의 설치를 지원하지 않아 윈도우의 WSL을 활용하여 설치해보겠다.
실행 환경
- Python 3.8.x
- Windows 10 WSL2
가상 환경 만들기
# airflow를 설치할 디렉토리 생성
$ mkdir airflow-project
# 디렉토리로 이동
$ cd airflow-project
# 가상 환경 생성
$ python -m venv .venv
# 가상 환경 활성화
$ source .venv/bin/activate
# 활성화 완료 시
(.venv) $
Apache Airflow 설치하기
# pip upgrade
$ pip install --upgrade pip
# airflow 설치
$ pip install apache-airflow==2.3.0
Apache Airflow의 최신 버전은 https://github.com/apache/airflow 에서 확인할 수 있다.
Apache Airflow 배포하기
Airflow를 설치하였으니 이제 배포하는 작업을 해보겠습니다.
- 프로젝트 루트 경로 설정
# 현재 경로를 프로젝트 루트 경로로 지정
$ export AIRFLOW_HOME=.
- Airflow Meta Database 초기화
$ airflow db init
- DAG, 해당 실행 및 사용자, 역할 및 연결과 같은 기타 Airflow 구성에 대한 메타데이터를 저장한다.
- Airflow는 default로 Sequential Executor를 사용하는데 따로 Meta Database를 지정하지 않을 시 SQLite를 사용한다.
airflow.exceptions.AirflowConfigException: Cannot use relative path: `sqlite:///./airflow.db` to connect to sqlite. Please use absolute path such as `sqlite:////tmp/airflow.db`. 이러한 에러 메시지가 나온다면, airflow.cfg 파일에서 sql_alchemy_conn = sqlite:///./airflow.db 이 부분이 상대 경로로 되어 있어서 그런 것이므로 절대 경로로 airflow.db의 경로를 작성해줘야 한다.
- 관리자 계정 생성
# Airflow 웹 접속할 때 필요.
$ airflow users create \
--username admin \
--password 1234 \
--firstname Hong \
--lastname Gildong \
--role Admin \
--email example@example.com
- Webserver 실행
$ airflow webserver --port 8080
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
[2022-05-16 19:47:44 +0900] [133] [INFO] Starting gunicorn 20.1.0
[2022-05-16 19:47:47 +0900] [133] [INFO] Listening at: http://0.0.0.0:8080 (133)
[2022-05-16 19:47:47 +0900] [133] [INFO] Using worker: sync
[2022-05-16 19:47:47 +0900] [135] [INFO] Booting worker with pid: 135
[2022-05-16 19:47:47 +0900] [136] [INFO] Booting worker with pid: 136
[2022-05-16 19:47:47 +0900] [137] [INFO] Booting worker with pid: 137
[2022-05-16 19:47:47 +0900] [138] [INFO] Booting worker with pid: 138
- Airflow의 웹 UI로 Meta Database로 부터 DAG 정보를 읽어와 DAG 정보 및 DAG Run의 상태를 확인하고 실행할 수 있다.
- Airflow는 웹 서버는 기본 포트로 8080을 사용한다.
여기까지 진행 후 localhost:8080 으로 접속 후 관리자 계정으로 로그인 시 airflow 웹페이지를 확인할 수 있다.
웹 UI에서 다음과 같은 경고메시지들을 확인할 수 있는데 의미는 다음과 같다.
- Scheduler가 실행이 안되있음.
- Metadata DB로 SQLite를 production에서 사용하지 말라. 대신, Postgres나 MySQL을 사용해라.
- SequentialExecutor를 production에서 사용하지 말라.
여기서 첫 번째 메시지는 스케줄러를 실행하면 없어진다. 나머지에 대한 것은 다음에 포스팅하겠다.
- Scheduler 실행
$ source .venv/bin/activate
(.venv) $ export AIRFLOW_HOME=.
# scheduler 실행
(.venv) $ airflow scheduler
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2022-05-16 19:53:47,019] {scheduler_job.py:693} INFO - Starting the scheduler
[2022-05-16 19:53:47,019] {scheduler_job.py:698} INFO - Processing each file at most -1 times
[2022-05-16 19:53:47,032] {executor_loader.py:106} INFO - Loaded executor: SequentialExecutor
[2022-05-16 19:53:47,034] {manager.py:156} INFO - Launched DagFileProcessorManager with pid: 170
[2022-05-16 19:53:47,035] {scheduler_job.py:1218} INFO - Resetting orphaned tasks for active dag runs
[2022-05-16 19:53:47,047] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
[2022-05-16 19:53:47,063] {manager.py:399} WARNING - Because we cannot use more than 1 thread (parsing_processes = 2) when using sqlite. So we set parallelism to 1.
[2022-05-16 19:53:47 +0900] [169] [INFO] Starting gunicorn 20.1.0
[2022-05-16 19:53:47 +0900] [169] [INFO] Listening at: http://0.0.0.0:8793 (169)
[2022-05-16 19:53:47 +0900] [169] [INFO] Using worker: sync
[2022-05-16 19:53:47 +0900] [171] [INFO] Booting worker with pid: 171
[2022-05-16 19:53:47 +0900] [172] [INFO] Booting worker with pid: 172
- Airflow의 가장 중요한 부분으로, 다양한 DAG Run과 Task들을 스케쥴링 및 오케스트레이션 한다.
- 하나의 DAG Run이 전체 시스템을 압도하지 않도록 각 DAG Run의 실행 횟수를 제한한다.
- Meta Database에 DAG 정보 및 DAG Run에 대해 저장한다.
그리고 웹 UI를 확인해보면 첫 번째 경고 메시지가 사라진 것을 확인할 수 있다.
여기까지 Airflow 설치와 배포까지 마무리를 해보겠다. 다음에는 DAG 작성에 대해 정리해봐야겠다.
'Data > Airflow' 카테고리의 다른 글
[Airflow] EC2 -> EKS 마이그레이션 (0) | 2024.08.01 |
---|---|
[Airflow] Airflow 성능 관련 설정 값 정리 (0) | 2023.12.19 |
[Airflow] DAG CI/CD 구축기 (0) | 2023.06.21 |
[Airflow] 로그 기록 시간대 문제 (0) | 2023.04.05 |
[Airflow] DAG란? (0) | 2022.05.22 |