Airflow

Apache Airflow는 데이터 파이프라인을 구축, 관리, 모니터링하기 위한 오픈소스 플랫폼이다.

Airflow는 복잡한 데이터 파이프라인을 효율적으로 관리할 수 있게 해주는 강력한 도구이다.
데이터 엔지니어링 분야에서 널리 사용되며, 지속적으로 발전하고 있는 플랫폼이다.

기본적인 DAG(Directed Acyclic Graph) 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# DAG 기본 설정
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['alert@example.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# DAG 정의
dag = DAG(
    'data_processing_pipeline',
    default_args=default_args,
    description='데이터 처리 파이프라인',
    schedule_interval='0 0 * * *'  # 매일 자정에 실행
)

# 태스크 함수 정의
def extract_data(**context):
    # 데이터 추출 로직
    raw_data = {'data': 'extracted_value'}
    context['task_instance'].xcom_push(key='raw_data', value=raw_data)

def transform_data(**context):
    # 데이터 변환 로직
    raw_data = context['task_instance'].xcom_pull(key='raw_data')
    transformed_data = {'data': f"transformed_{raw_data['data']}"}
    context['task_instance'].xcom_push(key='transformed_data', value=transformed_data)

def load_data(**context):
    # 데이터 적재 로직
    transformed_data = context['task_instance'].xcom_pull(key='transformed_data')
    print(f"Loading data: {transformed_data}")

# 태스크 생성
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    provide_context=True,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    provide_context=True,
    dag=dag
)

# 태스크 의존성 설정
extract_task >> transform_task >> load_task

Airflow의 주요 특징

  1. Python 기반: DAG(Directed Acyclic Graph)를 Python 코드로 정의할 수 있어 유연성과 확장성이 뛰어나다.
  2. 스케줄링: 복잡한 워크플로우를 쉽게 스케줄링할 수 있다.
  3. 모니터링: 웹 인터페이스를 통해 작업 실행 상태를 실시간으로 모니터링할 수 있다.
  4. 확장성: 다양한 외부 시스템과 쉽게 통합할 수 있다.

Airflow의 주요 구성 요소

  1. DAG (Directed Acyclic Graph):

    • 작업 흐름을 정의하는 핵심 개념이다.
    • 태스크 간의 의존성을 표현합니다.
    • 작업(Task)간의 실행 순서를 나타낸다.
  2. Operator:

    • 실제 작업을 수행하는 객체이다.
    • PythonOperator, BashOperator 등 다양한 유형이 있다.
    • 예를 들어, BashOperator는 Bash 명령을, PythonOperator는 Python 함수를 실행한다.
  3. Task:

    • DAG 내에서 실행되는 개별 작업 단위.
    • Operator를 사용하여 정의된다.
  4. Scheduler:

    • DAG를 주기적으로 실행하고 관리한다.
  5. Executor:

    • 태스크를 실제로 실행하는 컴포넌트이다.
    • LocalExecutor, CeleryExecutor, KubernetesExecutor 등 다양한 종류가 있다.
  6. Web Server:

    • DAG와 태스크의 상태를 시각화하는 UI를 제공한다.

Airflow를 사용하여 데이터 파이프라인 구축하기:

  1. 환경 설정:
    • Airflow는 Python 기반이므로, Python 환경이 필요하다. 가상환경을 설정하고 필요한 패키지를 설치한다.
    • Airflow 설치 시, 데이터베이스 초기화와 사용자 생성을 진행한다.
  2. DAG 정의:
    • DAG 객체를 생성하고, 시작 날짜(start_date), 스케줄 간격(schedule_interval) 등을 설정한다.
    • 작업(Task)을 정의하고, 각 작업 간의 의존성을 설정한다.
  3. 작업(Task) 정의:
    • Operator를 사용하여 작업을 정의한다. 예를 들어, PythonOperator를 사용하여 데이터 추출, 변환, 로드 작업을 수행할 수 있다.
  4. DAG 실행 및 모니터링:
    • DAG 파일을 지정된 폴더에 저장한 후, Airflow 웹 UI를 통해 DAG를 활성화하고 실행할 수 있다.
    • 웹 UI에서 작업의 상태, 실행 로그 등을 확인하고 모니터링할 수 있다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract():
    # 데이터 추출 로직
    pass

def transform():
    # 데이터 변환 로직
    pass

def load():
    # 데이터 로드 로직
    pass

default_args = {
    'start_date': datetime(2023, 1, 1),
    'schedule_interval': '@daily',
}

with DAG('etl_pipeline', default_args=default_args, catchup=False) as dag:
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )

    extract_task >> transform_task >> load_task

Airflow의 장점

  1. 유연성: Python 코드로 복잡한 워크플로우를 쉽게 표현할 수 있다.
  2. 확장성: 다양한 외부 시스템과 통합이 가능하다.
  3. 가시성: 웹 UI를 통해 작업 상태를 쉽게 모니터링할 수 있다.
  4. 재실행 용이성: 실패한 작업을 쉽게 재실행할 수 있다.

주의사항

  1. 리소스 관리: 대규모 워크플로우에서는 리소스 관리에 주의해야 한다.
  2. 버전 관리: DAG 코드의 버전 관리가 중요하다.
  3. 보안: 민감한 정보 처리 시 보안에 주의해야 한다.

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator

# 조건부 브랜칭을 위한 함수
def check_data_quality(**context):
    data_quality = context['task_instance'].xcom_pull(key='quality_score')
    if data_quality > 0.9:
        return 'process_high_quality_data'
    else:
        return 'process_low_quality_data'

# 고급 데이터 파이프라인 DAG
with DAG(
    'advanced_data_pipeline',
    default_args=default_args,
    schedule_interval='@daily'
) as dag:

    # S3 버킷 생성
    create_bucket = S3CreateBucketOperator(
        task_id='create_s3_bucket',
        bucket_name='my-data-bucket',
        region_name='ap-northeast-2'
    )

    # PostgreSQL 테이블 생성
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',
        sql="""
            CREATE TABLE IF NOT EXISTS processed_data (
                id SERIAL PRIMARY KEY,
                data_value VARCHAR(100),
                processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """
    )

    # 데이터 품질 확인
    check_quality = BranchPythonOperator(
        task_id='check_data_quality',
        python_callable=check_data_quality
    )

    # 고품질 데이터 처리
    process_high_quality = BashOperator(
        task_id='process_high_quality_data',
        bash_command='echo "Processing high quality data"'
    )

    # 저품질 데이터 처리
    process_low_quality = BashOperator(
        task_id='process_low_quality_data',
        bash_command='echo "Processing low quality data"'
    )

    # 태스크 의존성 설정
    create_bucket >> create_table >> check_quality
    check_quality >> [process_high_quality, process_low_quality]

참고 및 출처