인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Fastapi

[FastAPI] Sqlalchemy와 CRUD (6)

Suda_777 2024. 9. 1. 02:51

목차

    반응형

     

    1. SQLAlchemy

    1.1.SQLAlchemy란?

    SQLAlchemy는 Python의 ORM(객체 관계 매핑) 라이브러리로, 데이터베이스와 Python 객체 간의 변환을 쉽게 할 수 있게 도와줍니다.

     

    ORM(객체 관계 매핑, Object-Relational Mapping)은 프로그래밍 언어의 객체지향적 패러다임과 관계형 데이터베이스의 테이블 간의 데이터를 변환하는 기술입니다. 쉽게 말해, ORM은 데이터베이스의 테이블과 프로그래밍 언어에서 사용하는 객체를 매핑하여 데이터베이스 작업을 코드 내에서 객체를 다루는 것처럼 편리하게 수행할 수 있도록 해줍니다.

     

    1.2. 데이터베이스별 비교

    이번 글에서는 Postgresql을 기준으로 글을 작성하겠습니다.

     

    SQLAlchemy를 사용할 때, 사용하는 데이터베이스가 달라지더라도 대부분의 기본적인 사용법은 동일합니다. SQLAlchemy는 데이터베이스 독립적인 ORM이므로, 모델 정의, 데이터베이스 세션 관리, CRUD 작업 등 기본적인 작업은 데이터베이스 종류와 상관없이 같은 방식으로 처리됩니다. 그러나 데이터베이스 종류에 따라 몇 가지 차이점이 있을 수 있습니다.

     

    각 데이터베이스별 sqlalchemy 사용에 있어서 달라지는 점 요약

    • 데이터베이스 별로 데이터 타입이 다른 경우가 있음(예: msyql - VARCHAR, postgresql - TEXT)
    • 각 데이터베이스가 지원하는 고유한 기능
    • 각 데이터베이스별 SQL 구문 차이

    2. 데이터베이스 설정

    2.1. 데이터베이스 URL 설정

    데이터베이스 URL은 데이터베이스에 연결하기 위한 주소이다. 이 URL은 다음과 같은 구성 요소로 이루어져 있다.

    • DBMS (데이터베이스 관리 시스템): 사용하는 데이터베이스의 종류 (예: SQLite, PostgreSQL, MySQL 등).
    • 사용자명과 비밀번호: 데이터베이스에 접근하기 위한 사용자 인증 정보.
    • 호스트 및 포트: 데이터베이스 서버가 실행 중인 호스트의 주소와 포트 번호.
    • 데이터베이스 이름: 연결하고자 하는 데이터베이스의 이름.

    예시

    • username: PostgreSQL 사용자명
    • password: PostgreSQL 비밀번호
    • localhost: 데이터베이스가 실행 중인 호스트
    • 5432: PostgreSQL의 기본 포트 번호
    • mydatabase: 사용할 데이터베이스 이름
    postgresql://user:password@localhost:5432/mydatabase

    2.2. 데이터베이스 연결 구성

    SQLAlchemy의 create_engine 함수는 데이터베이스와의 연결을 설정하는 역할.

    이 함수는 데이터베이스 URL을 입력으로 받아 데이터베이스와의 연결 객체를 생성한다.

    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    # 데이터베이스 URL
    DATABASE_URL = "postgresql://username:password@localhost:5432/mydatabase"
    
    # 데이터베이스 엔진 생성
    engine = create_engine(DATABASE_URL)
    
    # 세션 로컬 생성
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    # 베이스 클래스 생성
    Base = declarative_base()

     

    • engine: SQLAlchemy에서 데이터베이스와의 연결을 관리하는 객체입니다.
    • SessionLocal: 데이터베이스 세션을 관리하기 위한 세션 로컬 클래스입니다.
    • Base: 모든 ORM 모델이 상속받아야 하는 기본 클래스입니다.

    3. SQLAlchemy 모델 정의

    PostgreSQL 데이터베이스에서 사용할 테이블을 나타내는 모델을 정의합니다.

    • 데이터베이스 설정에서 만든 Base 클래스를 상속받는다.
    • __tablename__ 에 테이블의 이름을 넣어준다.
    • 각 클래스변수는 컬럼에 대응된다.
    • Columns 을 이용해 컬럼을 만들어준다.
      • Column의 첫 번째 인자는 해당 컬럼의 데이터 타입: Integer, String, Text, Boolean, DateTime, Float, BigInteger
      • primary_key
      • nullable
      • unique
      • default: 기본값 지정
      • index: index=True로 설정하면 해당 컬럼에 인덱스를 생성하여 데이터 조회 속도를 향상
      • autoincrement: autoincrement=True로 설정하면, 정수형 컬럼의 값이 자동으로 증가하도록 설정할 수 있습니다. 보통 기본 키와 함께 사용
      • server_default: 데이터베이스 수준에서 기본값을 설정할 때 사용
      • onupdate: 행이 업데이트될 때 자동으로 설정할 값을 지정
      • comment: 컬럼에 대한 주석
    • ForeignKey: 다른 테이블의 컬럼을 참조하여 외래 키 제약을 설정
      • 테이블명.컬럼명 형태로 어떤 컬럼을 참조할지 정의함
    • relationship : 테이블 간의 관계를 정의하는 데 사용. 외래키 관계를 관리해준다.
      • 관계가 있는 상대 클래스의 클래스명이 문자열로 들어감
      •  back_populates
        • 두 테이블 간의 관계를 명시적으로 정의할 때 사용됩니다.
        • 관계가 있는 상대방의 relationship 객체의 이름이 문자열로 들어감

     

    일반 모델 예시

    from sqlalchemy import Column, Integer, String, Boolean, DateTime
    from datetime import datetime
    
    class User(Base):
        __tablename__ = 'users'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50), nullable=False)
        email = Column(String(120), unique=True, nullable=False, index=True)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

     

    1:1 관계 예시

    from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
    from sqlalchemy.orm import relationship
    from datetime import datetime
    
    class User(Base):
        __tablename__ = 'users'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50), nullable=False)
        email = Column(String(120), unique=True, nullable=False, index=True)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
        
        # Relationship to the Post table
        posts = relationship("Post", back_populates="owner")
    
    class Post(Base):
        __tablename__ = 'posts'
        
        id = Column(Integer, primary_key=True, autoincrement=True)
        title = Column(String(100), nullable=False)
        content = Column(String, nullable=False)
        created_at = Column(DateTime, default=datetime.utcnow)
        
        # ForeignKey to reference the User table
        owner_id = Column(Integer, ForeignKey('users.id'), nullable=False)
        
        # Relationship to the User table
        owner = relationship("User", back_populates="posts")

     

    N:M 관계에 있어서는 중간 테이블(association table)을 만들어야한다.

    from sqlalchemy import Table
    
    association_table = Table('association', Base.metadata,
        Column('student_id', Integer, ForeignKey('students.id')),
        Column('course_id', Integer, ForeignKey('courses.id'))
    )
    
    class Student(Base):
        __tablename__ = 'students'
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String, nullable=False)
    
        courses = relationship("Course", secondary=association_table, back_populates="students")
    
    class Course(Base):
        __tablename__ = 'courses'
        id = Column(Integer, primary_key=True, autoincrement=True)
        title = Column(String, nullable=False)
    
        students = relationship("Student", secondary=association_table, back_populates="courses")

     

    아래 코드를 실행하면, 정의된 모든 모델을 기반으로 데이터베이스에 테이블을 생성한다.

    Base.metadata.create_all(bind=engine)

     


    4. CRUD 작업 구현

    4.1. 데이터베이스 세션

    • 세션(Session): SQLAlchemy에서 세션은 데이터베이스와의 연결을 나타내며, 데이터베이스에 대한 모든 작업(CRUD 작업 포함)은 이 세션을 통해 이루어집니다. 세션은 데이터베이스 작업을 추적하고, 이를 하나의 트랜잭션으로 관리합니다.
    • 트랜잭션(Transaction): 세션은 트랜잭션을 관리합니다. 트랜잭션은 일련의 데이터베이스 작업을 하나의 단위로 묶어, 작업이 모두 성공적으로 완료되면 커밋(commit)하고, 실패하면 롤백(rollback)하는 역할을 합니다.

    4.1.1. 세션 생성하기

    세션은 sessionmaker를 사용하여 생성

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    # 데이터베이스 엔진 생성
    engine = create_engine("postgresql://username:password@localhost/dbname")
    
    # 세션 로컬 생성
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    • autocommit=False: 자동으로 트랜잭션이 커밋되지 않도록 설정합니다. 즉, 명시적으로 commit()을 호출해야 데이터가 저장됩니다.
    • autoflush=False: 쿼리 실행 전에 자동으로 세션이 플러시(flush)되지 않도록 설정합니다. 플러시란, 세션 내의 변경사항이 데이터베이스에 반영되는 과정을 말합니다.
    • bind=engine: 세션이 사용할 데이터베이스 엔진을 설정합니다.

    4.1.2. 세션 사용

    세션을 사용하여 데이터베이스 작업을 수행.

    FastAPI와 같은 웹 프레임워크에서는 보통 의존성 주입을 통해 세션을 관리

    from fastapi import Depends, FastAPI
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    # 세션 생성 함수
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    # 예시 엔드포인트: 데이터베이스에 새로운 사용자 추가
    @app.post("/users/")
    def create_user(name: str, email: str, db: Session = Depends(get_db)):
        db_user = User(name=name, email=email)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)  # 데이터베이스에서 새로 추가된 정보를 가져와 객체를 갱신합니다.
        return db_user
    • get_db 함수: 이 함수는 요청이 들어올 때마다 새로운 세션을 생성하고, 작업이 끝나면 세션을 종료(닫기)합니다. FastAPI의 Depends를 사용하여 이 함수를 엔드포인트에서 의존성 주입으로 사용합니다.
    • db.add(): 새 객체를 세션에 추가합니다.
    • db.commit(): 세션 내의 모든 변경사항을 데이터베이스에 커밋합니다.
    • db.refresh(): 데이터베이스에서 새로 추가된 정보를 객체에 반영합니다.
    • db.close(): 작업이 끝나면 세션을 종료하여 자원을 해제합니다.

    4.2. Create 작업

    Create 작업은 데이터베이스에 새로운 데이터를 추가하는 작업

     

    예시코드를 위한 모델 예시코드

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String, nullable=False)
        email = Column(String, unique=True, nullable=False)

     

     

    4.2.1. 일반적인 create 작업

    Create 작업을 위해 데이터베이스에 저장할 새로운 객체를 생성. 그리고 객체 추가

    add() 메서드를 사용함

    new_user = User(name="John Doe", email="johndoe@example.com")
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

     

    4.2.2. FastAPI에서의 create 작업

    from fastapi import FastAPI, Depends
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    @app.post("/users/")
    def create_user(name: str, email: str, db: Session = Depends(get_db)):
        new_user = User(name=name, email=email)
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        return new_user

     

    4.3. Read 작업

    read 작업은 sql의 select문에 해당하기 때문에 고급 기술을 사용하기 위해서는 상당한 공부가 필요하므로 여기서는 기본적인 것만 공부하고 넘어가도록 합니다.

    4.3.1. 기본 조회 작업

    query() 메서드를 사용하여 조회작업을 함.

    • all(): 쿼리의 결과로 모든 레코드를 가져와서 리스트로 반환
    db.query(User).all()

    4.3.2. 필터링

    filter() 메소드를 사용한다

    db.query(User).filter(User.email == email).first()

    4.3.3. Join 연산

    db.query(User).filter(User.id == user_id).options(joinedload(User.posts)).first()

     

    4.3.4. FastAPI에서의 Read 작업

    from fastapi import FastAPI, Depends
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    @app.get("/users/{user_id}")
    def read_user(user_id: int, db: Session = Depends(get_db)):
        user = db.query(User).filter(User.id == user_id).first()
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user

     

    4.4. Update 작업

    4.4.1. 기본 Update 작업

    먼저, query() 메서드를 사용하여 수정할 데이터를 검색한다.

    다음으로, 조회한 객체의 속성을 변경한다.

    마지막으로 세션에 변경 사항을 반영한다.

    # 데이터 조회
    user = db.query(User).filter(User.id == user_id).first()
    
    # 이메일 주소를 새로운 값으로 수정
    user.email = new_email
    
    # 세션에 반영
    db.commit()

     

    4.4.2. FastAPI에서의 Update 예시

    • @app.put(): HTTP PUT 요청을 처리함. 주로 데이터를 전체적으로 업데이트(update)할 때 사용. 요청에서 지정된 리소스의 모든 필드를 업데이트해야 하며, 지정되지 않은 필드는 기본값이나 빈 값으로 대체될 수 있다.
    • status_code=status.HTTP_200_OK: 이 엔드포인트가 성공적으로 처리되었을 때, 기본적으로 200 OK 상태 코드를 반환
    def get_user_by_id(db: Session, user_id: int):
        return db.query(User).filter(User.id == user_id).first()
    from fastapi import FastAPI, Depends, HTTPException, status
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    @app.put("/users/{user_id}", status_code=status.HTTP_200_OK)
    def update_user(user_id: int, new_email: str, db: Session = Depends(get_db)):
        user = get_user_by_id(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        user.email = new_email
        try:
            db.commit()
        except:
            db.rollback()
            raise HTTPException(status_code=400, detail="Could not update user")
        
        return {"message": "Success"}

     

    4.5. Delete 작업

    4.5.1. 기본 Delete 작업

    먼저, 삭제할 데이터를 조회한다.

    다음으로 세션에 삭제할 객체를 추가한다.

    마지막으로 세션에 변경 사항을 반영한다.

    user = db.query(User).filter(User.id == user_id).first()
    db.delete(user)
    db.commit()

    4.5.2. FastAPI에서의 Delete 예시

    def get_user_by_id(db: Session, user_id: int):
        return db.query(User).filter(User.id == user_id).first()
    from fastapi import FastAPI, Depends, HTTPException, status
    from sqlalchemy.orm import Session
    
    app = FastAPI()
    
    @app.delete("/users/{user_id}", status_code=status.HTTP_200_OK)
    def delete_user(user_id: int, db: Session = Depends(get_db)):
        user = get_user_by_id(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        db.delete(user)
        db.commit()
        
        return {"message": "User deleted successfully"}

    5. 비동기 SQLAlchemy

    SQLAlchemy 1.4 버전부터 비동기 기능이 도입

    asyncpg: 비동기 SQLAlchemy와 함께 사용되는 비동기 PostgreSQL 드라이버

    5.1. 비동기 SQLAlchemy의 주요 구성 요소

    • create_async_engine 함수를 사용하여 비동기 데이터베이스 연결을 생성
    • echo=True: SQLAlchemy가 실행하는 SQL 쿼리를 출력하도록 설정합니다(디버깅 용도).
    • sessionmaker를 AsyncSession과 함께 사용
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    from sqlalchemy.orm import sessionmaker
    
    DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydatabase"
    
    async_engine = create_async_engine(DATABASE_URL, echo=True)
    
    async_session = sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False
    )

     

    5.2. 비동기 엔드포인트 작성

    • with 구문을 이용해 close() 메소드를 더이상 사용할 필요가 없음
    • async def create_user: 엔드포인트 함수를 비동기 함수로 정의합니다.
    • async with async_session() as session: 세션을 열고 닫는 작업을 비동기로 처리하는 이유는 데이터베이스와의 상호작용이 I/O 바운드 작업이기 때문
    • await db.commit(): 트랜잭션 커밋을 비동기적으로 수행합니다.
    • await db.refresh(new_user): 데이터베이스에서 객체를 갱신합니다.
    • await db.execute(): SQLAlchemy의 execute 메서드를 비동기적으로 실행하여 쿼리를 수행합니다.
    from fastapi import FastAPI, Depends
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.future import select
    
    app = FastAPI()
    
    async def get_db():
        async with async_session() as session:
            yield session
    
    @app.post("/users/", status_code=status.HTTP_200_OK)
    async def create_user(name: str, email: str, db: Session = Depends(get_db)):
        new_user = User(name=name, email=email)
        db.add(new_user)
        await db.commit()
        await db.refresh(new_user)
        return {"message": "User created successfully"}
    
    @app.get("/users/{user_id}", status_code=status.HTTP_200_OK)
    async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
        result = await db.execute(select(User).filter(User.id == user_id))
        user = result.scalars().first()
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user
        
    @app.put("/users_async/{user_id}", status_code=status.HTTP_200_OK)
    async def update_user_async(user_id: int, new_email: str, db: AsyncSession = Depends(get_db_async)):
        result = await db.execute(select(User).filter(User.id == user_id))
        user = result.scalars().first()
        
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        user.email = new_email
        await db.commit()  # 비동기 커밋
        return {"message": "User updated successfully"}
        
    @app.delete("/users_async/{user_id}", status_code=status.HTTP_200_OK)
    async def delete_user_async(user_id: int, db: AsyncSession = Depends(get_db_async)):
        result = await db.execute(select(User).filter(User.id == user_id))
        user = result.scalars().first()
        
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        
        await db.delete(user)  # 비동기 삭제
        await db.commit()  # 비동기 커밋
        return {"message": "User deleted successfully"}

     

     

     

     

     

     

     

     

     

     

    반응형

    'Fastapi' 카테고리의 다른 글

    [FastAPI] JWT 기반 인증 (7)  (0) 2024.09.01
    [FastAPI] Pydantic 사용법 (5)  (0) 2024.08.31
    [FastAPI] 비동기(Asynchronous)프로그래밍 (4)  (0) 2024.08.24
    [FastAPI] HTML 사용해 보기 (3)  (0) 2024.08.22
    [FastAPI] get post (2)  (0) 2024.08.17