필요 설정 파일
- Airflow
- airflow.cfg
- Fluentd
- fluentd.conf
- Elastalert
- airflow.yaml
설정
Dag Run Log
Airflow
# airflow.cfg
[logging]
log_format = time:%%(asctime)s filename:%%(filename)s:%%(lineno)d levelname:%%(levelname)s message:%%(message)s
log_filename_template = dag/{{ ti.dag_id }}/{{ logical_date.now().year }}-{{ logical_date.now().month }}-{{ logical_date.now().day }}.log
- log_format
- Dag를 실행시킨 후 해당 Dag에서 발생하는 로그의 포맷을 설정하는 부분.
- mapping 변수
- asctime : 로그 발생 시간
- filename : Dag Task를 실행하는 파이썬 파일
- lineno : filename 파일의 코드 줄 번호
- levelname : Logging Level
- message : 발생 메시지
- log_filename_template
- Dag의 로그 파일 이름
- logical_date라는 객체의 now()를 사용한 이유
- DateTime 클래스를 상속받은 logical_date의 now() 메소드는 현지 시간대 기준으로의 현재 시간을 가진 DateTime 객체를 반환하기 때문에, 로그가 발생할 때 시간을 기준으로 로그를 찍기 위하여 사용.
- 로그
time:2023-02-27T10:15:35.834+0900 filename:taskinstance.py:1851 levelname:ERROR message:Task failed with exception
Traceback (most recent call last):
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ec2-user/da_efk_study/airflow/dags/raise_error_dag.py", line 9, in raise_error
result = 1/0
ZeroDivisionError: division by zero
Fluentd
<source>
# 플러그인
@type tail
# tail plugin으로 가져올 로그 파일 위치
path /var/log/airflow/dag/*/*.log
# pos 파일 위치
pos_file /var/log/airflow/td-agent/airflow.dag.log.pos
# fluentd 이벤트 태그 지정
tag airflow.dag
# multiline parsing 을 사용할 때, 매칭되는 로그가 버퍼에 들어간 후, 다음 매칭되는 로그가 버퍼링 되기 전까지 최대 5초까지 대기 후 함께 flush
multiline_flush_interval 5s
<parse>
@type multiline
format_firstline /^time:/
format1 /^time:(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+0900) filename:(?<filename>[^:]+:\d+) levelname:(?<levelname>\w+) message:(?<message>.*)/
time_key time
time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>
</source>
- multiline parser 를 사용.
- 위의 Airflow 로그처럼 message에 ‘\n’ 즉, 줄 바꿈 문자가 포함되어 있는 경우, 로그가 다중 라인을 갖기 때문에 이를 위한 플러그인으로 multiline 플러그인을 사용.
Scheduler Log
Airflow
- Dag Log 설정 그대로 따라감.
Fluentd
# fluentd.conf
# scheduler
<source>
@type tail
path /var/log/airflow/scheduler/*/*.log
pos_file /var/log/airflow/td-agent/airflow.scheduler.log.pos
<parse>
@type ltsv
delimiter ' '
time_key time
time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>
tag airflow.scheduler
</source>
- ltsv parse 플러그인 사용.
- delimiter를 '\t' 대신에 ' '(2 space) 사용.
- LTSV(Labeled Tab-Separated Values) : Tab 키로 구분된 Key:Value 형태의 포맷을 의미.
- Fluentd에서는 여러 가지 format에 대한 parser를 지원하는데, parser마다의 처리 속도가 다름.
- LTSV > JSON > RE 순으로 처리 속도가 빠르다.
- 벤치마크 결과 (Apache 2 combined 형식의 log 10만회 parsing 했을 경우)
집계 방법 | 시간(초) | 초당처리건수 |
최장일치 정규표현 | 20.6 | 4,854 |
최단일치 정규표현 | 1.5 | 66,667 |
JSON 형식에 대한 format json | 0.75 | 133,333 |
LTSV 형식에 대한 format ltsv | 0.39 | 256,410 |
Webserver Log
Airflow
# airflow.cfg
[webserver]
access_logformat = time:%%(t)s IP:%%(h)s request:%%(r)s status:%%(s)s bytes:%%(b)s url:%%(f)s UA:%%(a)s
- mapping
- t : 시간
- h : 접속 host IP
- r : request 정보
- s : HTTP Status
- b : request bytes
- f : url
- a : 접속 환경
- Access 로그
time:[27/Feb/2023:10:01:59 +0900] IP:106.240.29.163 request:GET /static/appbuilder/js/jquery-latest.js HTTP/1.1 status:304 bytes:0 url:http://3.37.234.22:8080/home UA:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36
- Error 로그
[2023-02-23 11:55:08 +0900] [8253] [INFO] Starting gunicorn 20.1.0
[2023-02-23 11:55:08 +0900] [8253] [INFO] Listening at: http://0.0.0.0:8080 (8253)
[2023-02-23 11:55:08 +0900] [8253] [INFO] Using worker: sync
[2023-02-23 11:55:08 +0900] [8297] [INFO] Booting worker with pid: 8297
[2023-02-23 11:55:08 +0900] [8298] [INFO] Booting worker with pid: 8298
[2023-02-23 11:55:08 +0900] [8299] [INFO] Booting worker with pid: 8299
[2023-02-23 11:55:08 +0900] [8300] [INFO] Booting worker with pid: 8300
Fluentd
# fluent.conf
# webserver access
<source>
@type tail
path /var/log/airflow/webserver/access.log
pos_file /var/log/airflow/td-agent/airflow.webserver.access.log.pos
tag airflow.webserver.access
<parse>
@type ltsv
delimiter ' '
time_key time
time_format '[%d/%b/%Y:%H:%M:%S %z]'
</parse>
</source>
# webserver error
<source>
@type tail
path /var/log/airflow/webserver/error.log
pos_file /var/log/airflow/td-agent/airflow.webserver.error.log.pos
tag airflow.webserver.error
<parse>
@type regexp
expression /^\[(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \+0900)\] \[(?<pid>\d+)\] \[(?<levelname>\w+)\] (?<message>.*)$/
time_key time
time_format %Y-%m-%d %H:%M:%S %z
</parse>
</source>
- access, error 두 가지로 나눈 이유
- access 로그는 포맷을 설정할 수 있지만, error 로그는 포맷 설정이 불가능.
- access의 경우, LTSV 형식으로 포맷을 정의해 놓았기 때문에, 그것을 ltsv parser로 파싱 후 수집
- error의 경우, 정해진 형식이 있어 그것을 RE로 파싱 후 수집
Fluentd 출력
<match airflow.*.**>
@type copy
<store>
@type elasticsearch
host elasticsearch
port 9200
# default로 index 생성에 utc 시간대로 설정되어 있음.
utc_index false
# index 이름은 fluentd.{tag}-{dateformat}
logstash_format true
logstash_prefix fluentd.${tag}
logstash_dateformat %Y%m%d
</store>
</match>
Elastalert
# airflow.yaml
# ES HOST
es_host: elasticsearch
# ES port
es_port: 9200
# 메시지 이름
name: AIRFLOW ERROR
# Type
type: frequency
# 모니터링할 ES index pattern
index: fluentd.airflow.*
# 매칭된 이벤트의 수가 value 값 이상이면 alert
num_events: 1
# 감지할 이벤트의 시간 길이, 쿼리를 날렸을 때 감지할 시간 단위
timeframe:
minutes: 1
# 어떤 쿼리를 날릴 것인지
filter:
- query:
query_string:
query: "levelname:ERROR"
alert:
- slack:
# slack webhook URL
slack_webhook_url: "https://hooks.slack.com/services/..."
# slack 메시지를 날릴 유저의 이름
slack_username_override: "AIRFLOW_ALERT"
# slack 채널
slack_channel_override: "elastalert"
# slack 메시지의 이모지
slack_emoji_override: ":ghost:"
# 오류 메시지 앞에 붙는 메시지
slack_text_string: "Airflow Error"
# "alert_text_only"를 지정하지 않으면, 해당 ES index의 필드 값들이 모두 나옴.
alert_text_type: "alert_text_only"
# slack 오류 메시지 필드 구성
slack_alert_fields:
# 메시지에 표시할 이름
- title: Index
# 매핑할 데이터 필드 이름
value: _index
# 메시지를 작은 블록으로 표현할 것인지 (긴 텍스트면 이쁘게 보이지 않음.)
short: True
- title: Time
value: "@timestamp"
short: True
- title: Logging Level
value: levelname
short: True
- title: Message
value: message
- title: num_hits
value: num_hits
short: True
- title: num_matches
value: num_matches
short: True
결과

'Backend' 카테고리의 다른 글
FastAPI로 Serving API 구축하기 (0) | 2023.05.27 |
---|---|
Locust 동작 방식 (1) | 2023.04.15 |
Fluentd 기본 개념 (1) | 2023.03.14 |
필요 설정 파일
- Airflow
- airflow.cfg
- Fluentd
- fluentd.conf
- Elastalert
- airflow.yaml
설정
Dag Run Log
Airflow
# airflow.cfg
[logging]
log_format = time:%%(asctime)s filename:%%(filename)s:%%(lineno)d levelname:%%(levelname)s message:%%(message)s
log_filename_template = dag/{{ ti.dag_id }}/{{ logical_date.now().year }}-{{ logical_date.now().month }}-{{ logical_date.now().day }}.log
- log_format
- Dag를 실행시킨 후 해당 Dag에서 발생하는 로그의 포맷을 설정하는 부분.
- mapping 변수
- asctime : 로그 발생 시간
- filename : Dag Task를 실행하는 파이썬 파일
- lineno : filename 파일의 코드 줄 번호
- levelname : Logging Level
- message : 발생 메시지
- log_filename_template
- Dag의 로그 파일 이름
- logical_date라는 객체의 now()를 사용한 이유
- DateTime 클래스를 상속받은 logical_date의 now() 메소드는 현지 시간대 기준으로의 현재 시간을 가진 DateTime 객체를 반환하기 때문에, 로그가 발생할 때 시간을 기준으로 로그를 찍기 위하여 사용.
- 로그
time:2023-02-27T10:15:35.834+0900 filename:taskinstance.py:1851 levelname:ERROR message:Task failed with exception
Traceback (most recent call last):
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ec2-user/da_efk_study/airflow/dags/raise_error_dag.py", line 9, in raise_error
result = 1/0
ZeroDivisionError: division by zero
Fluentd
<source>
# 플러그인
@type tail
# tail plugin으로 가져올 로그 파일 위치
path /var/log/airflow/dag/*/*.log
# pos 파일 위치
pos_file /var/log/airflow/td-agent/airflow.dag.log.pos
# fluentd 이벤트 태그 지정
tag airflow.dag
# multiline parsing 을 사용할 때, 매칭되는 로그가 버퍼에 들어간 후, 다음 매칭되는 로그가 버퍼링 되기 전까지 최대 5초까지 대기 후 함께 flush
multiline_flush_interval 5s
<parse>
@type multiline
format_firstline /^time:/
format1 /^time:(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+0900) filename:(?<filename>[^:]+:\d+) levelname:(?<levelname>\w+) message:(?<message>.*)/
time_key time
time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>
</source>
- multiline parser 를 사용.
- 위의 Airflow 로그처럼 message에 ‘\n’ 즉, 줄 바꿈 문자가 포함되어 있는 경우, 로그가 다중 라인을 갖기 때문에 이를 위한 플러그인으로 multiline 플러그인을 사용.
Scheduler Log
Airflow
- Dag Log 설정 그대로 따라감.
Fluentd
# fluentd.conf
# scheduler
<source>
@type tail
path /var/log/airflow/scheduler/*/*.log
pos_file /var/log/airflow/td-agent/airflow.scheduler.log.pos
<parse>
@type ltsv
delimiter ' '
time_key time
time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>
tag airflow.scheduler
</source>
- ltsv parse 플러그인 사용.
- delimiter를 '\t' 대신에 ' '(2 space) 사용.
- LTSV(Labeled Tab-Separated Values) : Tab 키로 구분된 Key:Value 형태의 포맷을 의미.
- Fluentd에서는 여러 가지 format에 대한 parser를 지원하는데, parser마다의 처리 속도가 다름.
- LTSV > JSON > RE 순으로 처리 속도가 빠르다.
- 벤치마크 결과 (Apache 2 combined 형식의 log 10만회 parsing 했을 경우)
집계 방법 | 시간(초) | 초당처리건수 |
최장일치 정규표현 | 20.6 | 4,854 |
최단일치 정규표현 | 1.5 | 66,667 |
JSON 형식에 대한 format json | 0.75 | 133,333 |
LTSV 형식에 대한 format ltsv | 0.39 | 256,410 |
Webserver Log
Airflow
# airflow.cfg
[webserver]
access_logformat = time:%%(t)s IP:%%(h)s request:%%(r)s status:%%(s)s bytes:%%(b)s url:%%(f)s UA:%%(a)s
- mapping
- t : 시간
- h : 접속 host IP
- r : request 정보
- s : HTTP Status
- b : request bytes
- f : url
- a : 접속 환경
- Access 로그
time:[27/Feb/2023:10:01:59 +0900] IP:106.240.29.163 request:GET /static/appbuilder/js/jquery-latest.js HTTP/1.1 status:304 bytes:0 url:http://3.37.234.22:8080/home UA:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36
- Error 로그
[2023-02-23 11:55:08 +0900] [8253] [INFO] Starting gunicorn 20.1.0
[2023-02-23 11:55:08 +0900] [8253] [INFO] Listening at: http://0.0.0.0:8080 (8253)
[2023-02-23 11:55:08 +0900] [8253] [INFO] Using worker: sync
[2023-02-23 11:55:08 +0900] [8297] [INFO] Booting worker with pid: 8297
[2023-02-23 11:55:08 +0900] [8298] [INFO] Booting worker with pid: 8298
[2023-02-23 11:55:08 +0900] [8299] [INFO] Booting worker with pid: 8299
[2023-02-23 11:55:08 +0900] [8300] [INFO] Booting worker with pid: 8300
Fluentd
# fluent.conf
# webserver access
<source>
@type tail
path /var/log/airflow/webserver/access.log
pos_file /var/log/airflow/td-agent/airflow.webserver.access.log.pos
tag airflow.webserver.access
<parse>
@type ltsv
delimiter ' '
time_key time
time_format '[%d/%b/%Y:%H:%M:%S %z]'
</parse>
</source>
# webserver error
<source>
@type tail
path /var/log/airflow/webserver/error.log
pos_file /var/log/airflow/td-agent/airflow.webserver.error.log.pos
tag airflow.webserver.error
<parse>
@type regexp
expression /^\[(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \+0900)\] \[(?<pid>\d+)\] \[(?<levelname>\w+)\] (?<message>.*)$/
time_key time
time_format %Y-%m-%d %H:%M:%S %z
</parse>
</source>
- access, error 두 가지로 나눈 이유
- access 로그는 포맷을 설정할 수 있지만, error 로그는 포맷 설정이 불가능.
- access의 경우, LTSV 형식으로 포맷을 정의해 놓았기 때문에, 그것을 ltsv parser로 파싱 후 수집
- error의 경우, 정해진 형식이 있어 그것을 RE로 파싱 후 수집
Fluentd 출력
<match airflow.*.**>
@type copy
<store>
@type elasticsearch
host elasticsearch
port 9200
# default로 index 생성에 utc 시간대로 설정되어 있음.
utc_index false
# index 이름은 fluentd.{tag}-{dateformat}
logstash_format true
logstash_prefix fluentd.${tag}
logstash_dateformat %Y%m%d
</store>
</match>
Elastalert
# airflow.yaml
# ES HOST
es_host: elasticsearch
# ES port
es_port: 9200
# 메시지 이름
name: AIRFLOW ERROR
# Type
type: frequency
# 모니터링할 ES index pattern
index: fluentd.airflow.*
# 매칭된 이벤트의 수가 value 값 이상이면 alert
num_events: 1
# 감지할 이벤트의 시간 길이, 쿼리를 날렸을 때 감지할 시간 단위
timeframe:
minutes: 1
# 어떤 쿼리를 날릴 것인지
filter:
- query:
query_string:
query: "levelname:ERROR"
alert:
- slack:
# slack webhook URL
slack_webhook_url: "https://hooks.slack.com/services/..."
# slack 메시지를 날릴 유저의 이름
slack_username_override: "AIRFLOW_ALERT"
# slack 채널
slack_channel_override: "elastalert"
# slack 메시지의 이모지
slack_emoji_override: ":ghost:"
# 오류 메시지 앞에 붙는 메시지
slack_text_string: "Airflow Error"
# "alert_text_only"를 지정하지 않으면, 해당 ES index의 필드 값들이 모두 나옴.
alert_text_type: "alert_text_only"
# slack 오류 메시지 필드 구성
slack_alert_fields:
# 메시지에 표시할 이름
- title: Index
# 매핑할 데이터 필드 이름
value: _index
# 메시지를 작은 블록으로 표현할 것인지 (긴 텍스트면 이쁘게 보이지 않음.)
short: True
- title: Time
value: "@timestamp"
short: True
- title: Logging Level
value: levelname
short: True
- title: Message
value: message
- title: num_hits
value: num_hits
short: True
- title: num_matches
value: num_matches
short: True
결과

'Backend' 카테고리의 다른 글
FastAPI로 Serving API 구축하기 (0) | 2023.05.27 |
---|---|
Locust 동작 방식 (1) | 2023.04.15 |
Fluentd 기본 개념 (1) | 2023.03.14 |