인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Airflow

[Airflow] DAG

Suda_777 2025. 9. 3. 00:37

목차

    반응형

    1. DAG 개념

    • Directed : Task가 방향성을 가진다 (선후 관계가 있음)
    • Acyclic : 순환이 없다.
    • Graph : Task의 관계를 나타낸 구조

     


    2. DAG 정의

    • dag_id : DAG 이름 (고유 식별자)
    • start_date : 기준 시각. 과거로 두면 catchup이 True일 때 과거 분량이 밀린다. 학습 중에는 보통 catchup을 False로 둔다.
    • schedule_interval : 실행주기, 크론 문자열이나 프리셋을 쓴다. 예, @once, @hourly, @daily, 0 9 * * *.
    • catchup: True면 과거 실행분을 밀린 것까지 수행
    • default_args: 공통 인자 (owner, retries, retry_delay 등)
    • tags: DAG 그룹핑 및 검색 태그
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    
    default_args = {
        "owner": "data",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    }
    
    with DAG(
        dag_id="etl_daily",
        start_date=datetime(2025, 9, 1),
        schedule_interval="@daily",
        catchup=False,
        default_args=default_args,
        tags=["example", "etl"],
    ) as dag:
    	...

     


    3. Operator와 Task

    3.1. 개념

    Operator

    • Task 객체를 만드는 클래스
    • 어떤 일을 할지 정의해 둠

    Task
    어떤 일을 하는 객체(인스턴스)

     


    3.2. Operator의 종류

    3.2.1. 기본 Operator

    PythonOperator

    • airflow와 같은 인터프리터를 공유하므로, 정말 간단한 작업만 하기 적합함
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    
    def my_task():
        print("Hello from Airflow task in Python venv")
    
    with DAG(
        dag_id="simple_python_dag",
        start_date=datetime(2025, 8, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        
        run_python = PythonOperator(
            task_id="run_python_task",
            python_callable=my_task,
        )

     

    BashOperator

    • Bash 실행시 사용
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from datetime import datetime
    
    with DAG("example_dag", start_date=datetime(2025, 8, 1), schedule_interval="@daily") as dag:
        # Operator를 이용해 Task 생성
        task1 = BashOperator(
            task_id="print_date",
            bash_command="date"
        )
        task2 = BashOperator(
            task_id="print_hello",
            bash_command="echo 'Hello Airflow'"
        )
    
        task1 >> task2  # 실행 순서 정의

     

    PythonVirtualenvOperator

    • 파이썬 가상환경을 만들어 실행. 환경 생성으로 좀 느릴 수 있음
    from airflow.operators.python import VirtualenvOperator
    
    def my_virtualenv_task():
        import requests
        print("requests version:", requests.__version__)
    
    run_in_venv = VirtualenvOperator(
        task_id="venv_task",
        python_callable=my_virtualenv_task,
        requirements=["requests==2.31.0"],  # venv에만 설치됨
        system_site_packages=False,
    )

     

    DockerOperator

    • 간단한 작업에 사용 가능, 독립적인 실행환경 제공, 난이도 쉬움
    from airflow.operators.docker_operator import DockerOperator
    
    task = DockerOperator(
        task_id="run_docker_task",
        image="python:3.9",
        command="python script.py",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge"
    )

     

    KubernetesPodOperator

    • 팀 단위로 다수의 파이프라인을 확장하는 환경에서 사용

     


    4. 테스크 의존성 정의

    즉, DAG 안에서 실행 순서를 직관적으로 표현할 수 있도록 Airflow에서 특별히 지원하는 문법

    task1 >> task2

    → task1이 끝난 뒤 task2 실행 (task1 → task2)

    from airflow.operators.bash import BashOperator
    from airflow import DAG
    from datetime import datetime
    
    with DAG("example", start_date=datetime(2025, 9, 1), schedule_interval="@daily", catchup=False) as dag:
        t1 = BashOperator(task_id="task1", bash_command="echo 1")
        t2 = BashOperator(task_id="task2", bash_command="echo 2")
        t3 = BashOperator(task_id="task3", bash_command="echo 3")
    
        # 실행 순서 정의
        t1 >> t2 >> t3   # task1 → task2 → task3
    반응형

    'Airflow' 카테고리의 다른 글

    [Airflow] 설치 방법 (로컬 환경)  (1) 2025.08.25
    [Airflow] 개요  (2) 2025.08.25