인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Fastapi

[FastAPI] SQLAlchemy 상세 - Create/Insert (6-1)

Suda_777 2024. 12. 31. 22:04

목차

    반응형

    현재 글에서는 SQLAlchemy 2.0 기준으로 글을 작성하겠다.

     

    CRUD는 크게 두가지 방법이 있다.

    • ORM 방식
      • 객체 중심적으로 작업해야 하는 경우
      • 데이터베이스와 python 객체의 상태 동기화가 필요한 경우
    • SQL 방식
      • 대량의 데이터를 처리하거나 성능이 중요한 경우
      • ORM이 필요 없는 아주 간단한 작업

    두가지 방법을 비교하며 작성하겠다.

     

    이번 시간에는 Insert 에 대해 공부해 보자.

     


    1. ORM 방식

    1.1. ORM 기본 방법

    • 사용 함수: db.add()
    • 단점: 대량 데이터 삽입에는 비효율적일 수 있음(대량 작업에서는 SQL 방식 추천).
    from sqlalchemy.ext.asyncio import AsyncSession
    from models import User  # ORM 모델
    from database import get_db
    
    
    async def insert_user(name: str, email: str, db: AsyncSession):
        # 1. 모델 객체 생성
        new_user = User(name=name, email=email)
    
        # 2. 세션에 추가
        db.add(new_user)
    
        await db.commit()
        await db.refresh(new_user)
        return new_user

     

    1.2. ORM 중복 데이터 처리

    1.2.1. 수동 확인

    데이터가 존재하는지 수동으로 확인한다.

    stmt = select(User).where(User.email == "example@example.com")
    result = await db.execute(stmt)
    existing_user = result.scalars().first()
    
    if not existing_user:
        new_user = User(name="John", email="example@example.com")
        db.add(new_user)
        await db.commit()
    else:
        print("User already exists!")

     

    1.2.2. merge() 함수 사용

    merge() 함수는 기존 값이 존재하면 update 하고, 없으면 insert를 한다

    new_user = User(id=1, name="John", email="example@example.com")
    merged_user = db.merge(new_user)  # 기존 데이터를 업데이트하거나 새로운 데이터를 추가
    await db.commit()

     

     


    2. SQL 방식

    2.1. 단일 데이터 삽입

    • 사용 함수: insert()
    • 설명: 단일 데이터 삽입
    from sqlalchemy import insert
    from models import User  # SQLAlchemy ORM 모델
    from database import get_db  # 세션 의존성
    
    # 비동기 함수 예제
    async def insert_user_sql(name: str, email: str, db):
        # 1. INSERT 문 생성
        stmt = insert(User).values(name=name, email=email)
    
        await db.execute(stmt)
        await db.commit()

     

    2.2. 다량의 데이터 삽입

    • 사용 함수: insert()
    • 설명: list[dict] 형식으로 데이터를 다량으로 받는다.
    async def insert_multiple_users_sql(users: list[dict], db):
        # 1. INSERT 문 생성
        stmt = insert(User)
    
        await db.execute(stmt, users)
        await db.commit()
    async def main():
        async with get_db() as db:
            users = [
                {"name": "Alice", "email": "alice@example.com"},
                {"name": "Bob", "email": "bob@example.com"}
            ]
            await insert_multiple_users_sql(users, db)

     

    2.3. 중복 데이터 처리

    2.3.1. 중복 발생시 작업 중지

    postgresql 기준

    • import 시 postgresql을 불러온다
    • on_conflict_do_nothing() 메서드 사용
    from sqlalchemy.dialects.postgresql import insert
    
    stmt = (
        insert(User)
        .values(id=1, name="John", email="example@example.com")
        .on_conflict_do_nothing(index_elements=["id"])  # 'id' 중복 시 아무 작업도 하지 않음
    )

     

    mysql 기준

    • import 시 mysql을 불러온다
    • on_duplicate_key_update() 메서드 사용
    from sqlalchemy.dialects.mysql import insert
    
    stmt = (
    	insert(User)
            .values(id=id, name=name, email=email)
            .on_duplicate_key_update(
                name=name,  # 중복 발생 시 업데이트할 필드
                email=email
            )
        )

     

    2.3.2. 중복 발생시 업데이트

     

    postgresql 기준

    • on_conflict_do_update() 메서드 사용
    stmt = (
        insert(User)
        .values(id=1, name="John", email="new_email@example.com")
        .on_conflict_do_update(
            index_elements=["id"],  # 'id' 중복 시 처리
            set_={"email": "new_email@example.com"}  # 업데이트할 컬럼
        )
    )

     

    mysql 기준

    • on_duplicate_key_update() 메서드 사용
    stmt = (
            insert(User)
            .values(id=id, name=name, email=email)
            .on_duplicate_key_update(
                name=name,  # 중복 발생 시 업데이트할 필드
                email=email
            )
        )

     


    3. FastAPI 예시

    @app.post("/users/")
    async def create_user(name: str, email: str, db: AsyncSession = Depends(get_db)):
        stmt = (
            insert(User)
            .values(name=name, email=email)
            .on_conflict_do_nothing(index_elements=["email"])  # 'email' 중복 시 무시
        )
        result = await db.execute(stmt)
        await db.commit()
    
        if result.rowcount == 0:
            raise HTTPException(status_code=400, detail="User already exists")
    
        return {"message": "User created successfully"}

     

    반응형