DateTime 클래스를 상속받은 logical_date의 now() 메소드는 현지 시간대 기준으로의 현재 시간을 가진 DateTime 객체를 반환하기 때문에, 로그가 발생할 때 시간을 기준으로 로그를 찍기 위하여 사용.
로그
time:2023-02-27T10:15:35.834+0900 filename:taskinstance.py:1851 levelname:ERROR message:Task failed with exception
Traceback (most recent call last):
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/ec2-user/da_efk_study/airflow/venv/lib64/python3.8/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ec2-user/da_efk_study/airflow/dags/raise_error_dag.py", line 9, in raise_error
result = 1/0
ZeroDivisionError: division by zero
Fluentd
<source>
# 플러그인
@type tail
# tail plugin으로 가져올 로그 파일 위치
path /var/log/airflow/dag/*/*.log
# pos 파일 위치
pos_file /var/log/airflow/td-agent/airflow.dag.log.pos
# fluentd 이벤트 태그 지정
tag airflow.dag
# multiline parsing 을 사용할 때, 매칭되는 로그가 버퍼에 들어간 후, 다음 매칭되는 로그가 버퍼링 되기 전까지 최대 5초까지 대기 후 함께 flush
multiline_flush_interval 5s
<parse>
@type multiline
format_firstline /^time:/
format1 /^time:(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}\+0900) filename:(?<filename>[^:]+:\d+) levelname:(?<levelname>\w+) message:(?<message>.*)/
time_key time
time_format %Y-%m-%dT%H:%M:%S.%N%z
</parse>
</source>
multiline parser 를 사용.
위의 Airflow 로그처럼 message에 ‘\n’ 즉, 줄 바꿈 문자가 포함되어 있는 경우, 로그가 다중 라인을 갖기 때문에 이를 위한 플러그인으로 multiline 플러그인을 사용.
access의 경우, LTSV 형식으로 포맷을 정의해 놓았기 때문에, 그것을 ltsv parser로 파싱 후 수집
error의 경우, 정해진 형식이 있어 그것을 RE로 파싱 후 수집
Fluentd 출력
<match airflow.*.**>
@type copy
<store>
@type elasticsearch
host elasticsearch
port 9200
# default로 index 생성에 utc 시간대로 설정되어 있음.
utc_index false
# index 이름은 fluentd.{tag}-{dateformat}
logstash_format true
logstash_prefix fluentd.${tag}
logstash_dateformat %Y%m%d
</store>
</match>
Elastalert
# airflow.yaml
# ES HOST
es_host: elasticsearch
# ES port
es_port: 9200
# 메시지 이름
name: AIRFLOW ERROR
# Type
type: frequency
# 모니터링할 ES index pattern
index: fluentd.airflow.*
# 매칭된 이벤트의 수가 value 값 이상이면 alert
num_events: 1
# 감지할 이벤트의 시간 길이, 쿼리를 날렸을 때 감지할 시간 단위
timeframe:
minutes: 1
# 어떤 쿼리를 날릴 것인지
filter:
- query:
query_string:
query: "levelname:ERROR"
alert:
- slack:
# slack webhook URL
slack_webhook_url: "https://hooks.slack.com/services/..."
# slack 메시지를 날릴 유저의 이름
slack_username_override: "AIRFLOW_ALERT"
# slack 채널
slack_channel_override: "elastalert"
# slack 메시지의 이모지
slack_emoji_override: ":ghost:"
# 오류 메시지 앞에 붙는 메시지
slack_text_string: "Airflow Error"
# "alert_text_only"를 지정하지 않으면, 해당 ES index의 필드 값들이 모두 나옴.
alert_text_type: "alert_text_only"
# slack 오류 메시지 필드 구성
slack_alert_fields:
# 메시지에 표시할 이름
- title: Index
# 매핑할 데이터 필드 이름
value: _index
# 메시지를 작은 블록으로 표현할 것인지 (긴 텍스트면 이쁘게 보이지 않음.)
short: True
- title: Time
value: "@timestamp"
short: True
- title: Logging Level
value: levelname
short: True
- title: Message
value: message
- title: num_hits
value: num_hits
short: True
- title: num_matches
value: num_matches
short: True