인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Fastapi

[FastAPI] Sqlalchemy와 CRUD (6)

Suda_777 2024. 12. 31. 21:40

목차

    반응형

     

    1. SQLAlchemy

    1.1.SQLAlchemy란?

    SQLAlchemy는 Python의 ORM(객체 관계 매핑) 라이브러리로, 데이터베이스와 Python 객체 간의 변환을 쉽게 할 수 있게 도와줍니다.

     

    ORM(객체 관계 매핑, Object-Relational Mapping)은 프로그래밍 언어의 객체지향적 패러다임과 관계형 데이터베이스의 테이블 간의 데이터를 변환하는 기술입니다. 쉽게 말해, ORM은 데이터베이스의 테이블과 프로그래밍 언어에서 사용하는 객체를 매핑하여 데이터베이스 작업을 코드 내에서 객체를 다루는 것처럼 편리하게 수행할 수 있도록 해줍니다.

     

    1.2. 데이터베이스별 비교

    이번 글에서는 Postgresql을 기준으로 글을 작성하겠습니다.

     

    SQLAlchemy를 사용할 때, 사용하는 데이터베이스가 달라지더라도 대부분의 기본적인 사용법은 동일합니다. SQLAlchemy는 데이터베이스 독립적인 ORM이므로, 모델 정의, 데이터베이스 세션 관리, CRUD 작업 등 기본적인 작업은 데이터베이스 종류와 상관없이 같은 방식으로 처리됩니다. 그러나 데이터베이스 종류에 따라 몇 가지 차이점이 있을 수 있습니다.

     

    각 데이터베이스별 sqlalchemy 사용에 있어서 달라지는 점 요약

    • 데이터베이스 별로 데이터 타입이 다른 경우가 있음(예: msyql - VARCHAR, postgresql - TEXT)
    • 각 데이터베이스가 지원하는 고유한 기능
    • 각 데이터베이스별 SQL 구문 차이

    2. 데이터베이스 설정

    2.1. 데이터베이스 URL 설정

    데이터베이스 URL은 데이터베이스에 연결하기 위한 주소이다. 이 URL은 다음과 같은 구성 요소로 이루어져 있다.

    • DBMS (데이터베이스 관리 시스템): 사용하는 데이터베이스의 종류 (예: SQLite, PostgreSQL, MySQL 등).
    • 사용자명과 비밀번호: 데이터베이스에 접근하기 위한 사용자 인증 정보.
    • 호스트 및 포트: 데이터베이스 서버가 실행 중인 호스트의 주소와 포트 번호.
    • 데이터베이스 이름: 연결하고자 하는 데이터베이스의 이름.

    예시

    • username: PostgreSQL 사용자명
    • password: PostgreSQL 비밀번호
    • localhost: 데이터베이스가 실행 중인 호스트
    • 5432: PostgreSQL의 기본 포트 번호
    • mydatabase: 사용할 데이터베이스 이름
    postgresql://user:password@localhost:5432/mydatabase

    2.2. 데이터베이스 연결 구성

    • create_engine() : 이 함수는 데이터베이스와의 연결을 설정하는 역할. 이 함수는 데이터베이스 URL을 입력으로 받아 데이터베이스와의 연결 객체를 생성한다.
    • sessionmaker() : 이 함수는 engine을 입력으로 받아, 데이터베이스와의 세션을 만들어 준다.
    • Base = declarative_base() : sqlalchemy의 모델이 상속받아 사용할 기본 클래스
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    # 데이터베이스 URL
    DATABASE_URL = "postgresql://username:password@localhost:5432/mydatabase"
    
    # 데이터베이스 엔진 생성
    engine = create_engine(DATABASE_URL)
    
    # 세션 로컬 생성
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    # 베이스 클래스 생성
    Base = declarative_base()

     

    코드 설명

    • engine: SQLAlchemy에서 데이터베이스와의 연결을 관리하는 객체입니다.
    • SessionLocal: 데이터베이스 세션을 관리하기 위한 세션 로컬 클래스입니다.
    • Base: 모든 ORM 모델이 상속받아야 하는 기본 클래스입니다.

    3. SQLAlchemy 모델 정의

    3.1. 기본 모델 정의하기

    PostgreSQL 데이터베이스에서 사용할 테이블을 나타내는 모델을 정의합니다.

    • 데이터베이스 설정에서 만든 Base 클래스를 상속받는다.
    • __tablename__ 에 테이블의 이름을 넣어준다.
    • Columns 을 이용해 컬럼을 만들어준다.
      • Column의 첫 번째 인자는 해당 컬럼의 데이터 타입: Integer, String, Text, Boolean, DateTime, Float, BigInteger
      • primary_key
      • nullable
      • unique
      • default: 기본값 지정
      • index: index=True로 설정하면 해당 컬럼에 인덱스를 생성하여 데이터 조회 속도를 향상
      • autoincrement: autoincrement=True로 설정하면, 정수형 컬럼의 값이 자동으로 증가하도록 설정할 수 있습니다. 보통 기본 키와 함께 사용
      • server_default: 데이터베이스 수준에서 기본값을 설정할 때 사용
      • onupdate: 행이 업데이트될 때 자동으로 설정할 값을 지정
      • comment: 컬럼에 대한 주석

    SQLAlchemy Model 예시 코드

    from sqlalchemy import Column, Integer, String, Boolean, DateTime
    from datetime import datetime
    
    class User(Base):
        __tablename__ = 'users'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50), nullable=False)
        email = Column(String(120), unique=True, nullable=False, index=True)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

     

    3.2. 외래키 (Foreign Key) 정의하기

    • ForeignKey: 다른 테이블의 컬럼을 참조하여 외래 키 제약을 설정
      • 테이블명.컬럼명 형태로 어떤 컬럼을 참조할지 정의함
      • 외래키를 정의할 때에는 자식 테이블에 정의해 준다.
    from sqlalchemy import ForeignKey
    
    owner_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    • relationship : 테이블 간의 관계를 정의하는 데 사용. 외래키 관계를 관리해준다.
      • 테이블1과 테이블2 서로를 relationship으로 참조, 양쪽 테이블 모두 정의해 줘야 함.
      • 1:1, 1:N 관계
        • one-to-one 관계를 명시: uselist=False
        • one-to-many 관계를 명시: uselist=True
      • cascade
        • "save-update": 부모가 저장되면 자식도 저장됨 (기본값).
        • "delete": 부모 삭제 시 자식도 삭제.
        • "all": 모든 작업에 대해 부모-자식 관계를 동기화.
    from sqlalchemy.orm import relationship
    
    name = relationship("상대 클래스이름",
    	back_populates="상대클래스의 relationship 객체이름",
    	cascade="all, delete"
    )

     

    3.3. 1:1 관계 예시

    from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
    from sqlalchemy.orm import relationship
    from datetime import datetime
    
    class User(Base):
        __tablename__ = 'users'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50), nullable=False)
        email = Column(String(120), unique=True, nullable=False, index=True)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
        
        # Relationship to the Post table
        posts = relationship("Post", back_populates="owner")
    
    class Post(Base):
        __tablename__ = 'posts'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        title = Column(String(100), nullable=False)
        content = Column(String, nullable=False)
        created_at = Column(DateTime, default=datetime.utcnow)
        
        # ForeignKey to reference the User table
        owner_id = Column(Integer, ForeignKey('users.id'), nullable=False)
        
        # Relationship to the User table
        owner = relationship("User", back_populates="posts")

     

    3.4. N:N 관계 예시

    N:M 관계에 있어서는 중간 테이블(association table)을 만들어야한다.

    from sqlalchemy import Table
    
    association_table = Table('association', Base.metadata,
        Column('student_id', Integer, ForeignKey('students.id')),
        Column('course_id', Integer, ForeignKey('courses.id'))
    )
    
    class Student(Base):
        __tablename__ = 'students'
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String, nullable=False)
    
        courses = relationship("Course", secondary=association_table, back_populates="students")
    
    class Course(Base):
        __tablename__ = 'courses'
        id = Column(Integer, primary_key=True, autoincrement=True)
        title = Column(String, nullable=False)
    
        students = relationship("Student", secondary=association_table, back_populates="courses")

     

    아래 코드를 실행하면, 정의된 모든 모델을 기반으로 데이터베이스에 테이블을 생성한다.

    Base.metadata.create_all(bind=engine)

     


    4. CRUD 작업 구현

    4.1. 데이터베이스 세션

    • 세션(Session): SQLAlchemy에서 세션은 데이터베이스와의 연결을 나타내며, 데이터베이스에 대한 모든 작업(CRUD 작업 포함)은 이 세션을 통해 이루어집니다. 세션은 데이터베이스 작업을 추적하고, 이를 하나의 트랜잭션으로 관리합니다.
    • 트랜잭션(Transaction): 세션은 트랜잭션을 관리합니다. 트랜잭션은 일련의 데이터베이스 작업을 하나의 단위로 묶어, 작업이 모두 성공적으로 완료되면 커밋(commit)하고, 실패하면 롤백(rollback)하는 역할을 합니다.

    4.1.1. 세션 생성하기

    sessionmaker() : 세션 생성 함수

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    # 데이터베이스 엔진 생성
    engine = create_engine("postgresql://username:password@localhost/dbname")
    
    # 세션 로컬 생성
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    • autocommit=False: 자동으로 트랜잭션이 커밋되지 않도록 설정합니다. 즉, 명시적으로 commit()을 호출해야 데이터가 저장됩니다.
    • autoflush=False: 쿼리 실행 전에 자동으로 세션이 플러시(flush)되지 않도록 설정합니다. 플러시란, 세션 내의 변경사항이 데이터베이스에 반영되는 과정을 말합니다.
    • bind=engine: 세션이 사용할 데이터베이스 엔진을 설정합니다.

    4.1.2. 세션 사용

    FastAPI와 같은 웹 프레임워크에서는 보통 의존성 주입을 통해 세션을 관리

    from fastapi import Depends, FastAPI
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    # 세션 생성 함수
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    # 예시 엔드포인트: 데이터베이스에 새로운 사용자 추가
    @app.post("/users/")
    def create_user(name: str, email: str, db: Session = Depends(get_db)):
        db_user = User(name=name, email=email)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)  # 데이터베이스에서 새로 추가된 정보를 가져와 객체를 갱신합니다.
        return db_user
    • get_db 함수: 이 함수는 요청이 들어올 때마다 새로운 세션을 생성하고, 작업이 끝나면 세션을 종료(닫기)합니다. FastAPI의 Depends를 사용하여 이 함수를 엔드포인트에서 의존성 주입으로 사용합니다.
    • db.add(): 새 객체를 세션에 추가합니다.
    • db.commit(): 세션 내의 모든 변경사항을 데이터베이스에 커밋합니다.
    • db.refresh(): 데이터베이스에서 새로 추가된 정보를 객체에 반영합니다.
    • db.close(): 작업이 끝나면 세션을 종료하여 자원을 해제합니다.

    4.2.  Create 하기

    Create은 데이터베이스에 새로운 데이터를 추가하는 작업

    SQL 문의 Insert에 해당 된다.

    4.2.1. 일반적인  작업

    Create 작업을 위해 데이터베이스에 저장할 새로운 객체를 생성.

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String, nullable=False)
        email = Column(String, unique=True, nullable=False)
    • db.add(객체) : 데이터를 insert함
    • db.commit() : commit
    • db.refresh() : 데이터베이스에서 자동으로 생성되는 값(예: id, created_at 등)이 모델 객체에 반영되지 않을 때 db.refresh()를 호출하여 해당 값을 동기화합니다.
    new_user = User(name="John Doe", email="johndoe@example.com")
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    db.close()

     

    4.2.2. FastAPI에서의 Create 작업

    FastAPI에서 insert 작업은 Post를 이용한다.

    from fastapi import FastAPI, Depends
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    @app.post("/users/")
    def create_user(name: str, email: str, db: Session = Depends(get_db)):
        new_user = User(name=name, email=email)
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        return new_user

     

    4.3. Read 작업

    read 작업은 sql의 select문에 해당하기 때문에 고급 기술을 사용하기 위해서는 상당한 공부가 필요하므로 여기서는 기본적인 것만 공부하고 넘어가도록 한다.

    4.3.1. 기본 조회 작업

    • select() : 함수로 쿼리를 작성
    • result = db.execute() : 쿼리 실행
    • result.scalars() : ORM 모델 객체나 특정 컬럼 값을 가져옴
      • select(User)와 같은 ORM 쿼리를 사용할 때, 결과는 User 객체를 가져옴
      • select(User.name)와 같이 특정 컬럼만 선택한 경우, scalars()는 그 컬럼 값만 반환
    • all() : 결과를 리스트 형태로 변환
    from sqlalchemy import select
    
    stmt = select(User)
    result = db.execute(stmt)  # 쿼리 실행
    users = result.scalars().all()
    
    db.close()

    4.3.2. where 문

    where() 메소드를 사용한다

    stmt = select(User).where(User.name == "John Doe")
    result = db.execute(stmt)
    user = result.scalars().all()
    
    db.close()

     

    4.3.4. FastAPI에서의 Read 작업

    FastAPI 에서 Read 작업은 Get을 이용해 준다.

    @app.get("/users/{user_id}")
    def read_user(user_name: int, db: Session = Depends(get_db)):
        stmt = select(User).where(User.name == user_name)
        result = db.execute(stmt)
        user = result.scalars().all()
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user

     

    4.4. Update 작업

    4.4.1. 기본 Update 작업

    • update() : 함수를 사용한다. 
    • values() : 함수에 변경할 값을 넣어준다.
    from sqlalchemy import update
    
    stmt = (
        update(User)  # User 테이블 업데이트
        .where(User.name == "John Doe")  # 조건: 이름이 "John Doe"
        .values(email="john.doe@example.com")  # 수정할 값
    )
    
    db.execute(stmt)
    db.commit()
    db.close()

     

    4.4.2. FastAPI에서의 Update 예시

    • @app.put():자원의 모든 속성을 업데이트해야 하는 경우 사용.
    • @app.patch() : 자원의 일부 속성만 업데이트할 때 사용.

    아래 예시코드는 patch를 사용해, 데이터의 일부만 수정합니다.

    • dict(exclude_unset=True): 값이 없는 필드는 제외하고 딕셔너리로 변경함.
    # Pydantic 모델 정의
    class UserUpdate(BaseModel):
        name: str
        email: str
    
    @app.patch("/users/{user_id}")
    def partial_update_user(user_id: int, user_update: UserUpdate, db: Session = Depends(get_db)):
        # 업데이트할 필드만 동적으로 설정
        update_data = user_update.dict(exclude_unset=True)
    
        stmt = update(User).where(User.id == user_id).values(**update_data)
        result = db.execute(stmt)
        db.commit()
    
        # 업데이트 결과 확인
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="User not found")
    
        return {"id": user_id, **update_data}

     

    4.5. Delete 작업

    4.5.1. 기본 Delete 작업

    delete() 함수를 사용한다.

    from sqlalchemy import delete
    
    stmt = delete(User).where(User.id == 1)
    result = db.execute(stmt)
    db.commit()
    db.close()

    4.5.2. FastAPI에서의 Delete 예시

    @app.delete 를 사용한다.

    @app.delete("/users/{user_id}")
    def delete_user(user_id: int, db: Session = Depends(get_db)):
        stmt = delete(User).where(User.id == user_id)
        result = db.execute(stmt)
        db.commit()
    
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="User not found")
        
        return {"message": f"User with id {user_id} deleted successfully"}

    5. 비동기 SQLAlchemy

    asyncpg: 비동기 SQLAlchemy와 함께 사용되는 비동기 PostgreSQL 드라이버

    어떤 부분에서 await를 사용하는지 확인하자!

    5.1. 비동기 SQLAlchemy의 주요 구성 요소

    • create_async_engine() 함수를 사용하여 비동기 데이터베이스 연결을 생성
    • sessionmaker대신 async_sessionmaker()를 사용
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.ext.asyncio import async_sessionmaker
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.declarative import declarative_base
    
    # 데이터베이스 연결
    DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydatabase"
    engine = create_async_engine(DATABASE_URL)
    
    # 모델 베이스
    Base = declarative_base()
    
    # 세션 생성기
    Async_session = async_sessionmaker(engine)
    
    # 의존성 주입
    async def get_session() -> AsyncSession:
        async with Async_session() as session:
            yield session

     

    5.2. 비동기 엔드포인트 작성

    • async def create_user: 엔드포인트 함수를 비동기 함수로 정의합니다.
    • async with async_session() as session: 세션을 열고 닫는 작업을 비동기로 처리하는 이유는 데이터베이스와의 상호작용이 I/O 바운드 작업이기 때문
    • await db.commit(): 트랜잭션 커밋을 비동기적으로 수행합니다.
    • await db.refresh(new_user): 데이터베이스에서 객체를 갱신합니다.
    • await db.execute(): SQLAlchemy의 execute 메서드를 비동기적으로 실행하여 쿼리를 수행합니다.
    from fastapi import FastAPI, Depends, HTTPException, status
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.future import select
    from sqlalchemy import update, delete
    from pydantic import BaseModel
    from models import User  # SQLAlchemy User 모델
    from database import get_db  # AsyncSession 의존성
    
    app = FastAPI()
    
    # Pydantic 모델 정의
    class UserCreate(BaseModel):
        name: str
        email: str
    
    class UserUpdate(BaseModel):
        email: str
    
    
    # **데이터베이스 작업 함수 정의**
    async def add_user(db: AsyncSession, user: UserCreate):
        new_user = User(**user.dict())
        db.add(new_user)
        await db.commit()
        await db.refresh(new_user)
        return new_user
    
    
    async def get_user_by_id(db: AsyncSession, user_id: int):
        result = await db.execute(select(User).where(User.id == user_id))
        return result.scalars().first()
    
    
    async def update_user(db: AsyncSession, user_id: int, email: str):
        stmt = (
            update(User)
            .where(User.id == user_id)
            .values(email=email)
            .execution_options(synchronize_session="fetch")
        )
        result = await db.execute(stmt)
        await db.commit()
        return result.rowcount  # 업데이트된 행의 수 반환
    
    
    async def delete_user(db: AsyncSession, user_id: int):
        stmt = (
            delete(User)
            .where(User.id == user_id)
            .execution_options(synchronize_session="fetch")
        )
        result = await db.execute(stmt)
        await db.commit()
        return result.rowcount  # 삭제된 행의 수 반환
    
    
    # **FastAPI 엔드포인트 정의**
    
    @app.post("/users/", status_code=status.HTTP_200_OK)
    async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
        new_user = await add_user(db, user)
        return {"message": "User created successfully", "user": new_user}
    
    
    @app.get("/users/{user_id}", status_code=status.HTTP_200_OK)
    async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
        user = await get_user_by_id(db, user_id)
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user
    
    
    @app.put("/users/{user_id}", status_code=status.HTTP_200_OK)
    async def update_user_endpoint(user_id: int, user_update: UserUpdate, db: AsyncSession = Depends(get_db)):
        updated_rows = await update_user(db, user_id, user_update.email)
        if updated_rows == 0:
            raise HTTPException(status_code=404, detail="User not found")
        return {"message": "User updated successfully"}
    
    
    @app.delete("/users/{user_id}", status_code=status.HTTP_200_OK)
    async def delete_user_endpoint(user_id: int, db: AsyncSession = Depends(get_db)):
        deleted_rows = await delete_user(db, user_id)
        if deleted_rows == 0:
            raise HTTPException(status_code=404, detail="User not found")
        return {"message": "User deleted successfully"}

     

    반응형