개요
기존 AWS에서 CeleryExecutor 를 활용하여 EC2 기반으로 Airflow 를 운영하였고 이번에 DevOps 팀에서 EKS를 구축하게 되면서 이를 기회 삼아 KubernetesExecutor 를 활용하여 EKS 기반으로 Airflow 를 구축하고 기존 Airflow 의 DAG 들의 이관을 계획하게 되었습니다. 이번 포스팅에서 기존 방식은 어떤 문제가 있었고 어떻게 EKS에 Airflow 를 구축하였고 EKS 기반으로 옮기게 되면서 얻게 된 장단점에 대해 써보겠습니다.
기존 방식과 문제점
기존에는 Airflow 가 제공하는 CeleryExecutor 를 통해 여러 EC2 인스턴스를 활용하여 Master-Worker 구조의 클러스터를 구축했었습니다. 먼저, CeleryExecutor 의 방식에 대해 간단히 설명하겠습니다.
CeleryExecutor
먼저, Executor 라는 개념이 생소할 수 있으니 간단히 설명하겠습니다.
간단히 말해서 Executor 란 작업자(Worker)들에게 작업을 실행시키는 실행자 역할을 합니다. 해당 Executor 는 Airflow Scheduler 에 종속되어 있으며, Airflow 는 여러 가지 Executor 를 지원합니다. 대표적으로, LocalExecutor, CeleryExecutor, KubernetesExecutor 들이 있습니다.
CeleryExecutor 는 Airflow 가 Cluster 를 지원하기 위해 만들어진 Executor 로, 여러 Node 와 통신을 해야하기 때문에 중간에 Worker 들에게 시킬 Job 들을 저장하고 각 Worker 들이 해당 Job 들을 가져가서 실행하기 위해 Broker 를 사용하게 됩니다. CeleryExecutor 는 RabbitMQ, Redis 등을 지원하는데 이 중 Redis 를 채택했으며, 이를 통해 Airflow 클러스터를 구성하였습니다.
문제점
CeleryExecutor 방식은 인스턴스가 구성되어 있다면 간단하게 구축할 수 있다는 장점이 있습니다. 추가적인 인프라가 Redis 하나 정도 추가되기 때문에 어렵지 않게 구축할 수 있습니다. 다만, 클러스터를 구축하게 되면서 생기는 Challenge 가 생기게 됩니다.
1. 리소스 및 비용 문제
Scheduler 가 Task Job 들을 Worker Node 에 배치할 때 기준은 각 Worker 노드에서 몇 개의 Job 이 실행 중인지를 판단하고 배치하기 때문에 특정 Task 가 리소스를 많이 사용하여 여유 있는 Worker 가 존재하여도 가용 리소스가 적은 Worker 로 배치될 수 있습니다.
실제로 운영하면서 특정 Task 들이 리소스를 많이 사용하는데 가용 리소스가 적은 Worker 로 배치되어 해당 Worker 가 리소스 과부하로 인해 Stop 되거나 Task 들이 실패하는 경우가 종종 발생하였습니다.
이는 Worker 를 Scale up 하거나 Scale out 하는 방향으로 어느 정도 해결할 수 있지만, Airflow 의 특성 상 DAG 가 많이 동작하는 시간대가 있거나 거의 동작하지 않는 시간대가 존재하고 있어 리소스의 낭비가 이루어 지기 때문에 비용 최적화 측면에서 애매한 상황이 발생하게 됩니다.
2. 인프라 관리 문제
Airflow 는 각 Master 노드와 Worker 노드에 동일한 configuration과 DAG 코드를 가지고 있어야 합니다. 실제로 Worker 가 Task 를 실행하는데 DAG 코드가 다르다면 예상치 못한 동작이나 에러가 발생할 수 있기 때문에 각 노드별로 신경을 써줘야 합니다. 노드가 Scale out 되면 될수록 점점 관리해야 될 포인트가 늘어나며 이는 인프라 차원에서 부담감이 늘어납니다.
문제를 해결 하기 위한 방안
위와 같은 문제를 해결하기 위한 방법은 여러 가지 있겠지만 마침 EKS 환경이 구성되어 있고 Airflow 가 쿠버네티스 환경에서 운영될 수 있도록 KubernetesExecutor 를 제공하니 이를 활용하여 문제를 해결 하기로 결정했습니다.
KubernetesExecutor 를 사용하면 위와 같이 Broker 가 필요 없어지고 동적으로 Worker Pod 를 생성하여 Job 을 실행시키는 방식으로 동작하게 됩니다. 통일된 이미지를 사용하여 동일한 환경에서 DAG를 실행시킬 수 있게 해주고 특정 Task 에 필요한 리소스를 지정하면 이에 맞춰 동적으로 Worker Pod 가 생성되기 때문에, 리소스에 크게 제한 없이 Job 을 실행할 수 있으며 또한, 동적으로 필요할 때만 Worker Pod 를 통해 리소스를 사용하기 때문에 비용적으로도 최적화가 가능해 집니다.
여기까지 기존의 문제점과 쿠버네티스로 옮기면 좋을 장점에 대해 설명드렸고 이제 본격적으로 EKS에 Airflow 를 어떻게 구축했는 지 설명드리도록 하겠습니다.
EKS 기반 Airflow 구축
Airflow 는 공식 Helm Chart 를 제공하고 있는데 해당 chart 의 values.yaml 을 조작하여 쉽게 Airflow 를 구성할 수 있습니다. 그래서 해당 Chart 를 베이스로 Airflow 를 구성하면서 여러 가지 쿠버네티스 생태계에서 쓰면 좋은 Tool 들을 결합하였고 이를 통해 확장성 있는 구조의 Airflow 를 구축하였습니다. 먼저 활용한 기술 스택에 대해 설명드리겠습니다.
- Airflow Chart : https://github.com/apache/airflow/tree/helm-chart/1.13.1/chart
- AWS : EKS, S3, Parameter Store, RDS, ECR
- Deploy : ArgoCD, Helm
- Auto Scaling : Karpenter
- Monitoring : Grafana, Prometheus
회사의 주요 클라우드로 AWS 를 사용하고 있었기 때문에 DevOps 팀의 도움을 받아 Airflow 를 구축하였습니다. 먼저 쿠버네티스 클러스터로 Amazon EKS 를 사용하였고 Airflow 의 기능들을 AWS의 리소스(S3, Parameter Store, RDS, ECR)과 결합하여 사용하였습니다. 그리고 배포 자동화를 위해 ArgoCD와 Helm 을 이용하였고 Karpenter 를 통해 리소스 사용량에 비례하여 노드의 확장과 축소를 자유롭게 할 수 있게 구성하였습니다. 또한, Prometheus 와 Grafana 를 사용 중이었기에 Node Exporter 와 Statsd Exporter 를 통해 Airflow 에 대한 모니터링을 구성하였습니다.
위와 같이 기존 구성에서 환경이 변경되면서 추가적인 설정과 개발이 필요한 부분이 생기게 되었는데 기존과 비교했을 때, 어떻게 변화하였고 이를 어떻게 구축 하였는지에 대해서 설명해 보겠습니다.
DAG 코드 동기화
기존에는 각각의 Git Repository 에 있는 DAG 코드에 대해서 배포 파이프라인을 구축하여 S3의 Bucket 으로 배포하였고 각 클러스터 노드마다 s3와 sync 하는 컨테이너를 배치하여 코드를 동기화 하였습니다. 이에 관해선 아래 포스팅을 참고 바랍니다.
위와 같이 운영하면서 여러 Repository 에 대한 관리가 어려워졌고 그래서 하나의 Repository 로 통합하는 과정을 한 번 가졌었습니다. 이를 기반으로 S3 배포 파이프라인을 제거하고 사이드카 패턴으로 git-sync 컨테이너를 사용하여 각 Pod 마다 DAG 코드를 동기화하도록 변경하였습니다. git-sync 설정을 하는 방법은 다음과 같습니다.
- SSH 통신을 통하여 코드를 가져오기 때문에 SSH Key 를 생성해 줍니다.
ssh-keygen -t rsa -b 4096
- 쿠버네티스의 Secret 에 Private Key 를 등록해 줍니다.
kubectl create secret generic airflow-ssh-secret \ --from-file=gitSshKey=/path/to/.ssh/airflow_ssh_key \ -n airflow
- 이후 DAG 코드가 존재하는 Git Repository 에 생성한 SSH Key의 Public Key 를 등록해 줍니다.
- Secret 에 등록한 SSH Key를 사용하도록 values.yaml 파일에 설정해 줍니다.
# values.yaml
...
dags:
gitSync:
enabled: true
repo: your/git/repository
branch: main
depth: 1
subPath: "dags"
wait: 60
sshKeySecret: airflow-ssh-secret
- subPath 의 경우, Git 을 통해 Repository 를 가져오게 되는데 해당 Repository 기준으로 어떤 경로로 dags 파일을 읽어들일 것인지 지정하는 옵션입니다.
Logging
기존에는 Task 실행에 대한 로그가 파일 시스템에 기록되는 구조였는데 이로 인해 가끔 Disk Full 로 인해 Node 가 내려가는 상황이 발생했었습니다. 그리고 Kubernetes 환경의 경우 Worker Pod 가 Task 를 수행하고 종료되면 Pod 가 삭제되면서 Task 의 수행 로그들을 확인할 수 없는 문제가 있는데 이때 해당 로그들을 따로 저장해줘야 합니다. 그래서 Log 를 PV에 연결시켜서 보존할 수 있는데 이전 경험 때문에 PV를 사용하지 않고 S3에 저장하는 방법을 사용했습니다.
Airflow 는 S3로 Remote Logging 하는 것을 지원하고 있어 이를 활용하였습니다. 간단하게 values.yaml 파일을 조작하면 가능합니다.
config:
...
logging:
remote_logging: 'True'
logging_level: 'INFO'
remote_base_log_folder: 's3://your-bucket-name/' # Specify the S3 bucket used for logging
# remote_log_conn_id: 'aws_conn' # serviceAccount 설정이 되어 있으면 필요 X
delete_worker_pods: 'False'
encrypt_s3_logs: 'True'
이를 통해 S3로 로그가 저장되면서 Pod가 내려가도 Web UI에서 로그를 확인할 수 있고 저장 공간에 제약이 없는 S3를 통해 로그 관리가 더욱 수월해 졌습니다.
Secrets
기존에는 Web UI 에서 variables 와 connections 를 조작하여 사용하여 Secret 한 정보들을 관리하여 사용하였습니다. DevOps 팀에서는 Parameter Store 를 통해 Secret 한 정보를 관리하고 있어 이에 통합하기 위해 Airflow 의 secrets 설정으로 Parameter Store 에서 variables 와 connections 를 관리하도록 변경하였습니다.
config
secrets:
backend: airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs: '{ "connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "region_name": "ap-northeast-2"}'
Auto Scaling
기존에는 EC2 인스턴스로 Cluster 가 구성되어 있어 Task 들이 각 Worker 노드에 배치되어 실행되었는데 Worker 노드의 리소스 사용량에 비례하여 Scale Up 이나 Scale Out 을 사람이 판단하여 수동으로 진행해야 했습니다.
쿠버네티스 환경으로 옮기면서 Karpenter 라는 Auto Scaler 를 사용할 수 있게 되었고 이를 통해 각 Pod 들의 nodeSelector 를 설정하여 요청하는 리소스에 비례하여 자동적으로 노드를 확장하도록 설계하였습니다. 특히, Worker 노드의 경우 Task 가 필요할 때만 각 Pod 들이 생성되는데 요청하는 리소스에 비례하여 필요하면 노드를 프로비저닝하고 실행되는 Worker Pod 가 적어지면 다시 노드를 축소하여 비용 절약이 가능해 집니다. 그래서 사용자 입장에서는 리소스에 대해 좀 더 자유롭게 DAG를 작성할 수 있습니다.
values.yaml 에서 각 Component(Scheduler, Webserver, Triggerer, Worker, Statsd)에 nodeSelector 를 설정하여 각 Pod 별로 Auto Scaling 을 할 수 있습니다. EKS의 경우 노드 인스턴스 타입으로 spot 과 on-demand 가 있는데 spot 이 더 저렴하지만 해당 인스턴스는 언제든지 내려갈 수 있다는 점이 있습니다. 그래서 Airflow 의 경우 Batch Job 이 많기 때문에 Worker 의 경우는 spot 을 할 시 Task 를 수행하다가 해당 인스턴스가 내려가면 안되기 때문에 on-demand 타입을 사용하도록 하였고 나머지는 spot 타입을 사용하도록 설정하였습니다.
# Worker 설정
workers:
...
nodeSelector:
karpenter.sh/capacity-type: on-demand
kubernetes.io/arch: amd64
provisioner: karpenter
application: airflow
scheduler:
...
nodeSelector:
karpenter.sh/capacity-type: spot
kubernetes.io/arch: amd64
provisioner: karpenter
application: airflow
그래서 해당 인프라를 제대로 사용하기 위해서는 코드 레벨에서 리소스 지정이 필요합니다. Airflow 는 Operator 를 통해 Task 를 생성하는데 Operator 의 parameter 중에 쿠버네티스 환경에서 실행되는 Worker Pod 의 리소스를 지정할 수 있도록 지원합니다. 이를 사용자 입장에서 리소스를 쉽게 지정할 수 있게 하기 위해 다음과 같이 공용 모듈을 개발했습니다.
@dataclass
class DynamicResource:
"""
요청할 CPU와 메모리 자원을 입력합니다.
예시:
- CPU: "500m"
- 메모리: "500Mi"
- CPU의 기본 단위는 m으로, 1000m은 1 코어입니다.
- 메모리의 기본 단위는 Mi로, MB와 동일합니다.
Args:
request_cpu (str): 요청할 CPU 자원
request_memory (str): 요청할 메모리 자원
limit_cpu (Optional[str]): 제한할 CPU 자원 (선택 사항)
limit_memory (Optional[str]): 제한할 메모리 자원 (선택 사항)
Raises:
ValueError: 요청한 자원이 제한된 자원보다 큰 경우 발생
"""
request_cpu: str
request_memory: str
limit_cpu: Optional[str] = None
limit_memory: Optional[str] = None
def __post_init__(self):
# 자원 요청 값 검증
_request_cpu = int("".join(re.findall(r"\d+", self.request_cpu)))
_request_memory = int("".join(re.findall(r"\d+", self.request_memory)))
_limit_cpu = int("".join(re.findall(r"\d+", self.limit_cpu))) if self.limit_cpu else None
_limit_memory = int("".join(re.findall(r"\d+", self.limit_memory))) if self.limit_memory else None
if (_request_cpu and _limit_cpu) and (_request_cpu > _limit_cpu):
raise ValueError("request_cpu가 limit_cpu보다 클 수 없습니다")
if (_request_memory and _limit_memory) and _request_memory > _limit_memory:
raise ValueError("request_memory가 limit_memory보다 클 수 없습니다")
def assign_operator_resources(operator: BaseOperator, resource: DynamicResource) -> BaseOperator:
"""
KubernetesPodOperator가 아닌 Operator들도 자원 할당이 가능하도록 설정합니다.
Args:
operator (BaseOperator): Airflow Operator
resource (DynamicResource): 할당할 자원
Returns:
BaseOperator: 자원이 할당된 Operator
예시:
t1 = assign_operator_resources(
PythonOperator(
task_id="t1",
python_callable=...,
...
),
resource=DynamicResource(request_cpu="100m", request_memory="128Mi", limit_cpu="300m", limit_memory="256Mi"),
)
"""
operator.executor_config = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
limits={"cpu": resource.limit_cpu, "memory": resource.limit_memory},
requests={"cpu": resource.request_cpu, "memory": resource.request_memory},
),
)
],
)
)
}
return operator
위 모듈을 통해 Operator 에 다음과 같이 적용 시킬 수 있습니다.
with dag:
# Resource 설정
resource = DynamicResource(
request_cpu="100m",
request_memory="128Mi",
limit_cpu="300m",
limit_memory="256Mi"
)
# PythonOperator에 Resource 할당
t1 = assign_operator_resources(
PythonOperator(
task_id="t1",
python_callable=lambda: print("Hello, World!"),
dag=dag
),
resource=resource
)
이를 통해 각 Worker Pod 들이 해당 리소스들을 할당 받게 되고 Karpenter 가 요청 Pod 의 리소스를 보고 노드에 대해 확장을 진행할 수 있습니다. 해당 부분은 쏘카의 기술 블로그를 참고 하였습니다.
배포 구성
기존에는 Airflow 를 설치할 때, 따로 자동화를 구성하지는 않았습니다. EC2 인스턴스에 접근해서 airflow-cli 를 통해 Airflow 를 기동시키는 방식으로 구성하였었습니다. 이것의 단점은 각 Node 마다 Airflow 설정이 동기화 되야 하기 때문에 설정을 변경할 때마다 각 Node 마다 구성을 똑같이 유지해줘야 합니다. 실수로 구성이 틀어지면 예기치 못한 상황이 발생할 수도 있어 불편한 부분이 있었습니다.
Airflow 의 환경을 통일하고 배포를 자동화하기 위해 Airflow 의 Base 가 되는 image 를 자동으로 빌드하도록 파이프라인을 구성하였고 Chart 를 통해 Airflow 를 관리하고 배포하도록 변경하여 ArgoCD가 Airflow 의 values.yaml 파일을 가지고 있는 저장소를 바라보도록 하여 Git을 통해 변경이 발생하면 이를 배포된 Airflow 와 비교하여 Helm 을 통해 반영하도록 자동화 하였습니다.
마무리
처음 EKS 환경에서 Airflow 라는 application 을 배포하면서 여러 가지 설정을 많이 해보면서 시행착오를 많이 겪었습니다. 그래서 처음엔 어려웠지만 DevOps 팀의 도움을 받아 확장성 있는 아키텍처를 구축하면서 뿌듯했던 시간 이었던 것 같습니다. EC2에서 EKS로 마이그레이션 한 이후 더욱 더 안정적인 운영이 가능해졌고 사용자 입장에서는 본인이 필요한 리소스만큼 할당해서 자유롭게 데이터 파이프라인을 구축할 수 있게 되었습니다.
앞으로 운영하면서 Airflow 를 단순 ETL 툴이 아닌 Workflow 관리 플랫폼으로 확장해 나가기 위한 기반을 마련했고 이를 중심으로 데이터 플랫폼을 만들어 나가야 할 것 같습니다.
'Data > Airflow' 카테고리의 다른 글
[Airflow] Airflow 성능 관련 설정 값 정리 (0) | 2023.12.19 |
---|---|
[Airflow] DAG CI/CD 구축기 (0) | 2023.06.21 |
[Airflow] 로그 기록 시간대 문제 (0) | 2023.04.05 |
[Airflow] DAG란? (0) | 2022.05.22 |
[Airflow] Apache Airflow 설치하기 (0) | 2022.05.16 |