Fastapi

[FastAPI] 비동기(Asynchronous)프로그래밍 (4)

Suda_777 2024. 8. 24. 20:13
반응형

1. 비동기(Asynchronous)프로그래밍의 개념

비동기 작업은 작업이 시작된 후 완료되기를 기다리지 않고, 그 사이에 다른 작업을 수행할 수 있는 프로그래밍 방식입니다. 이는 동기식 작업과는 달리 여러 작업을 병렬로 처리할 수 있게 해줍니다.

비동기 작업은 보통 async 및 await 키워드를 사용하여 정의됩니다.

  • async: 비동기 함수(코루틴) 정의한다.
  • await: 비동기 함수 내에서 시간이 오래 걸리는 작업을 수행할 때 사용한다.
    • File input/output 작업 (대용량 파일을 읽거나 쓰는 작업)
    • 데이터베이스 쿼리
    • 웹 서버: 많은 수의 클라이언트 요청을 동시에 처리해야 하는 경우

2. 예시 코드

2.1. 비동기 함수(코루틴 정의)

1. def 앞에 async를 붙여준다.

2. 시간이 오래 걸리는 작업에 await 키워드를 붙여준다.

from pydantic import BaseModel

# Pydantic 모델 정의
class UserSchema(BaseModel):
    id: int
    name: str

    class Config:
        orm_mode = True  # SQLAlchemy ORM 객체와 호환되도록 설정


# id가 1인 사용자를 가져오는 비동기 함수
async def get_user_by_id(user_id: int):    
    # 데이터베이스를 사용사기 위한 세션
    async with AsyncSessionLocal() as session:
        query = select(User).where(User.id == user_id)  # 쿼리 정의
        result = await session.execute(query)  # 쿼리 실행 (await 사용)
        user = result.scalar_one()  # 값을 가져옴
        return UserSchema.from_orm(user)

2.2. 비동기 함수 실행

2.2.1. 다른 비동기 함수 내에서 실행

  • await 을 이용해 실행한다.
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/users/{user_id}", response_model=UserSchema)
async def read_user(user_id: int):
    user = await get_user_by_id(user_id)
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user

 

2.2.2. Task를 사용한 방법 1

  • task = asyncio.create_task() #메소드를 이용해 Task를 정의한다.
  • await task  # 위에서 정의한 task를 await와 함께, 언제까지 기다릴지 정의한다.
import asyncio

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(1)
    print("Task 2 completed")

async def main():
    t1 = asyncio.create_task(task1())  # Task 1 실행
    t2 = asyncio.create_task(task2())  # Task 2 실행
    await t1  # Task 1의 완료를 기다림
    await t2  # Task 2의 완료를 기다림

asyncio.run(main())

 

위 코드에서는 t1과 t2가 동시에 실행되기 시작한다.

그리고, await t1 에서 t1이 모두 실행될 때 까지 기다린다.

이후에 await t2 에서 t2가 모두 실행될 때 까지 기다린다.

 

아래는 FastAPI에서 사용하는 예시코드 이다.

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def fetch_data_from_service_a():
    await asyncio.sleep(2)  # 외부 서비스 A에서 데이터 가져오기
    return "Data from service A"

async def fetch_data_from_service_b():
    await asyncio.sleep(3)  # 외부 서비스 B에서 데이터 가져오기
    return "Data from service B"

@app.get("/aggregate-data/")
async def aggregate_data():
    task_a = asyncio.create_task(fetch_data_from_service_a())
    task_b = asyncio.create_task(fetch_data_from_service_b())
    
    # 두 작업의 결과를 동시에 기다림
    result_a = await task_a
    result_b = await task_b
    
    return {"service_a": result_a, "service_b": result_b}

 

2.2.3. Task를 이용한 실행방법 2 (백그라운드에서 코드 동작하기)

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def long_running_task():
    await asyncio.sleep(10)  # 10초 동안 작업 수행
    print("Task completed")

@app.get("/run-task/")
async def run_task(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_running_task)  # 백그라운드에서 작업 수행
    return {"message": "Task is running in the background"}

 

위 코드는 background_tasks.add_task() 메소드를 이용해 Task에  비동기 함수를 추가한다.

백그라운드 동작을 하기 때문에 Task를 실행시작과 동시에, "Task is running in the background"가 출려된다.

 

2.2.4. 최상위 코드에서 실행

최상위 코드에서는 asyncio.run() 메소드를 이용해 비동기 함수를 실행한다.

import asyncio

async def example_async_function():
    return "Hello, Async World!"

# 메인 함수
async def main():
    result = await example_async_function()
    print(result)

# 비동기 루프 실행
asyncio.run(main())

 

2.2.6. 여러개의 비동기 함수 동시에 실행

asyncio.gather() 메소드 사용

import asyncio

async def task1():
    await asyncio.sleep(2)
    return "Task 1 result"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 result"

async def main():
    results = await asyncio.gather(task1(), task2())
    print(results)

asyncio.run(main())

 

2.2.5. 콜백 실행

비동기 함수가 실행 완료 되었을 때, 후속 작업 추가

1. 루프를 생성한다.

2. Task를 만든다

3. 콜백 함수를 추가한다.

import asyncio

async def long_running_task():
    await asyncio.sleep(2)
    return "Task completed"

def callback(future):
    print(f"Callback: {future.result()}")

async def main():
    loop = asyncio.get_event_loop()

    # 비동기 작업을 Future로 실행
    future = loop.create_task(long_running_task())

    # 콜백 함수 등록
    future.add_done_callback(callback)

    await future  # Future의 완료를 기다림

asyncio.run(main())
반응형