목차
반응형
1. DAG 개념
- Directed : Task가 방향성을 가진다 (선후 관계가 있음)
- Acyclic : 순환이 없다.
- Graph : Task의 관계를 나타낸 구조
2. DAG 정의
- dag_id : DAG 이름 (고유 식별자)
- start_date : 기준 시각. 과거로 두면 catchup이 True일 때 과거 분량이 밀린다. 학습 중에는 보통 catchup을 False로 둔다.
- schedule_interval : 실행주기, 크론 문자열이나 프리셋을 쓴다. 예, @once, @hourly, @daily, 0 9 * * *.
- catchup: True면 과거 실행분을 밀린 것까지 수행
- default_args: 공통 인자 (owner, retries, retry_delay 등)
- tags: DAG 그룹핑 및 검색 태그
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
"owner": "data",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="etl_daily",
start_date=datetime(2025, 9, 1),
schedule_interval="@daily",
catchup=False,
default_args=default_args,
tags=["example", "etl"],
) as dag:
...
3. Operator와 Task
3.1. 개념
Operator
- Task 객체를 만드는 클래스
- 어떤 일을 할지 정의해 둠
Task
어떤 일을 하는 객체(인스턴스)
3.2. Operator의 종류
3.2.1. 기본 Operator
PythonOperator
- airflow와 같은 인터프리터를 공유하므로, 정말 간단한 작업만 하기 적합함
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_task():
print("Hello from Airflow task in Python venv")
with DAG(
dag_id="simple_python_dag",
start_date=datetime(2025, 8, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
run_python = PythonOperator(
task_id="run_python_task",
python_callable=my_task,
)
BashOperator
- Bash 실행시 사용
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG("example_dag", start_date=datetime(2025, 8, 1), schedule_interval="@daily") as dag:
# Operator를 이용해 Task 생성
task1 = BashOperator(
task_id="print_date",
bash_command="date"
)
task2 = BashOperator(
task_id="print_hello",
bash_command="echo 'Hello Airflow'"
)
task1 >> task2 # 실행 순서 정의
PythonVirtualenvOperator
- 파이썬 가상환경을 만들어 실행. 환경 생성으로 좀 느릴 수 있음
from airflow.operators.python import VirtualenvOperator
def my_virtualenv_task():
import requests
print("requests version:", requests.__version__)
run_in_venv = VirtualenvOperator(
task_id="venv_task",
python_callable=my_virtualenv_task,
requirements=["requests==2.31.0"], # venv에만 설치됨
system_site_packages=False,
)
DockerOperator
- 간단한 작업에 사용 가능, 독립적인 실행환경 제공, 난이도 쉬움
from airflow.operators.docker_operator import DockerOperator
task = DockerOperator(
task_id="run_docker_task",
image="python:3.9",
command="python script.py",
docker_url="unix://var/run/docker.sock",
network_mode="bridge"
)
KubernetesPodOperator
- 팀 단위로 다수의 파이프라인을 확장하는 환경에서 사용
4. 테스크 의존성 정의
즉, DAG 안에서 실행 순서를 직관적으로 표현할 수 있도록 Airflow에서 특별히 지원하는 문법
task1 >> task2
→ task1이 끝난 뒤 task2 실행 (task1 → task2)
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime
with DAG("example", start_date=datetime(2025, 9, 1), schedule_interval="@daily", catchup=False) as dag:
t1 = BashOperator(task_id="task1", bash_command="echo 1")
t2 = BashOperator(task_id="task2", bash_command="echo 2")
t3 = BashOperator(task_id="task3", bash_command="echo 3")
# 실행 순서 정의
t1 >> t2 >> t3 # task1 → task2 → task3
반응형
'Airflow' 카테고리의 다른 글
[Airflow] 설치 방법 (로컬 환경) (1) | 2025.08.25 |
---|---|
[Airflow] 개요 (2) | 2025.08.25 |