인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Fastapi

[FastAPI] SQLAlchemy 상세 - Read/Select (6-2)

Suda_777 2025. 1. 1. 02:24

목차

    반응형

     

    select 방식에서는 굳이 복잡하게 SQL 방식을 사용할 필요는 없다.

    대신, 이번 페이지에서는 select의 다양한 기능에 대해 알아보자

    1. 기본 사용 방법

    기본적인 실행 순서는 아래와 같다.

    1. 쿼리 생성: select() 함수 이용
    2. 조건 추가 : .where() 를 사용해 필터 조건 추가
    3. 쿼리 실행 : db.execute() 로 쿼리를 실행
    4. 결과 처리

    아래는 result = db.execute() 코드 실행 이후에 결과처리를 위한 메서드 이다.

    메서드 반환값 설명
    .fetchall() 행의 리스트(list of Row objects) 결과의 모든 행을 반환,
    메모리 효율성이 중요하거나 단순 데이터 추출에 적합
    .fetchone() 첫 번째 행(Row object) 결과의 첫 번째 행(row)만 반환
    .scalar() 첫 번째 컬럼 값(scalar value) 결과의 첫 번째 행에서 첫 번째 컬럼 값만 반환
    .scalars() 스칼라 값 또는 ORM 객체의 제너레이터. 데이터를 한 번에 처리하지 않고, 필요할 때 한 개씩 값을 가져옴, 
    코드가 간결하고 Pythonic

     

    scalars() 메서드는 제너레이터 이기 때문에, 추가적으로 사용하는 메소드가 있음

    메서드 반환값 설명
    .scalars().all() 리스트 (list of ORM객체) 모든 결과를 가져올 때
    .scalars().first() ORM 객체 단일 결과 또는 첫 번째 행만 가져올 때
    .scalars().one() ORM 객체 결과가 정확히 하나만 있어야 하는 경우
    .scalars().one_or_none() 단일 객체 또는 None 결과가 없거나 하나만 있어야 하는 경우
    .scalars().fetchmany(size) 리스트 지정된 개수만큼 결과를 가져올 때
    .scalars().partitions() 리스트들 결과를 페이징 처리하거나 부분적으로 처리할 때

     

    all() 예시코드

    stmt = select(User)
    result = await db.execute(stmt)
    users = result.scalars().all()
    
    # ORM 객체를 딕셔너리 형태로 변환
    user_dicts = [{"id": user.id, "name": user.name, "email": user.email} for user in users]
    print(user_dicts)

     

    first() 예시코드

    stmt = select(User).where(User.email == "john@example.com")
    result = await db.execute(stmt)
    user = result.scalars().first()
    
    if user:
        print(user.name)  # 단일 결과의 속성 접근
    else:
        print("No user found")

     

    팁: scalars() 함수를 주로 가장 많이 사용한다.

     


    2. Where 사용 방법

    2.1. 단일 조건

    stmt = select(User).where(User.name == "John Doe")

     

    2.2. 연산자

    연산자 의미 사용 예시
    == 같다 User.id == 1
    != 다르다 User.name != "John Doe"
    < 작다 User.age < 30
    > 크다 User.age > 18
    <= 작거나 같다 User.age <= 30
    >= 크거나 같다 User.age >= 18

     

    2.3. 기타 연산

    LIKE

    stmt = select(User).where(User.email.like("%example.com"))

     

    In / Not In 

    stmt = select(User).where(User.id.in_([1, 2, 3]))
    stmt = select(User).where(User.id.notin_([1, 2, 3]))

     

    Is / Is Not

    stmt = select(User).where(User.email.is_(None))
    stmt = select(User).where(User.email.isnot(None))

     

    Contains

    stmt = select(User).where(User.email.contains("@example.com"))

     

    Between

    stmt = select(User).where(User.age.between(18, 30))

     

    Is Null 조건

    stmt = select(User).where(User.email == None)  # 또는 User.email.is_(None)

     

    2.4. and, or, not

    and는 , 로 구분한다.

    stmt = select(User).where(User.name == "John Doe", User.email.like("%example.com"))

    or 은 or_() 를 사용한다.

    from sqlalchemy import or_
    
    stmt = select(User).where(or_(User.name == "John Doe", User.name == "Jane Doe"))

     

    not은 ~ 연산자를 사용한다.

    stmt = select(User).where(~(User.name == "John Doe"))

     

    2.5. 동적 where 문 설정

    from sqlalchemy.future import select
    
    async def get_filtered_users(filters: dict, db):
        # 조건 리스트 생성
        conditions = []
        if "name" in filters:
            conditions.append(User.name.like(f"%{filters['name']}%"))
        if "email" in filters:
            conditions.append(User.email.like(f"%{filters['email']}%"))
        if "age_min" in filters:
            conditions.append(User.age >= filters["age_min"])
        if "age_max" in filters:
            conditions.append(User.age <= filters["age_max"])
    
        # 동적으로 조건 추가
        stmt = select(User).where(*conditions)
        result = await db.execute(stmt)
        return result.scalars().all()

     

     


    3. order by

    3.1. 오름차순 / 내림차순

    오름차순

    stmt = select(User).order_by(User.name.asc())

     

    내림차순

    stmt = select(User).order_by(User.name.desc())

     

    3.2. 다중 컬럼

    stmt = select(User).order_by(User.name.asc(), User.age.desc())

     

    3.3. 동적 정렬 설정

    클라이언트로부터 정렬 조건을 입력받는 경우

    - 동적 컬럼 설정

    - 동적 정렬 방향 설정

    def get_ordered_users(order_by_field: str, ascending: bool, db: AsyncSession):
        field = getattr(User, order_by_field)  # 동적으로 컬럼 선택
        order = field.asc() if ascending else field.desc()  # 정렬 방향 설정
        stmt = select(User).order_by(order)
        result = await db.execute(stmt)
        return result.scalars().all()
    
    # 호출
    users = await get_ordered_users("age", False, db)  # 나이 내림차순

    4. limit, offset, distinct

    limit

    stmt = select(User).limit(10)

     

    offset (limit과 함께 사용가능)

    stmt = select(User).limit(10).offset(10)

     

    distinct

    stmt = select(User.email).distinct()

     


    5. SQL 함수 호출

    SQLAlchemy의 func를 사용하여 SQL 함수 호출.

     

    count 함수

    from sqlalchemy import func
    
    stmt = select(func.count(User.id))

     


    6. 서브 쿼리

    SELECT 내부에 또 다른 SELECT 사용.

    subquery = select(User.id).where(User.age > 18).subquery()
    
    stmt = select(Post).where(Post.user_id.in_(subquery))

     


    7. Group by 와 Having

    7.1. group by와 함수 사용

     

    COUNT

    from sqlalchemy import func
    
    stmt = select(User.age, func.count(User.id)).group_by(User.age)

     

    SUM

    stmt = select(User.department, func.sum(User.salary)).group_by(User.department)

     

    AVG

    stmt = select(User.department, func.avg(User.salary)).group_by(User.department)

     

    MAX

    stmt = select(User.department, func.max(User.salary)).group_by(User.department)

     

    MIN

    stmt = select(User.department, func.min(User.salary)).group_by(User.department)

     

    표준편차

    stmt = select(User.department, func.stddev(User.salary)).group_by(User.department)

     

    분산

    stmt = select(User.department, func.variance(User.salary)).group_by(User.department)

    7.2. Having 사용

    stmt = (
        select(User.age, func.count(User.id))
        .group_by(User.age)
        .having(func.count(User.id) > 2)
    )

     


    8. Union

    여러 SELECT 쿼리를 결합.

    stmt1 = select(User).where(User.age < 18)
    stmt2 = select(User).where(User.name.like("John%"))
    
    union_stmt = stmt1.union(stmt2)

     


    9. Join

    SQLAlchemy의 Join 연산 방법은 두가지 방법이 있다.

    • join() 메서드
    • ORM의 관계 이용

     

    Join을 공부하기 전에, 테이블의 별칭을 만드는 방법은 아래와 같다.

    aliased() 함수를 이용하면 된다.

    from sqlalchemy.orm import aliased
    
    user_alias = aliased(User)
    
    stmt = select(user_alias).where(user_alias.name == "John")

     

    9.1. Join 메서드 이용

    inner join

    SELECT * FROM users JOIN posts ON users.id = posts.user_id
    stmt = select(User, Post).join(Post, User.id == Post.user_id)

     

    left outer join

    SELECT * FROM users LEFT OUTER JOIN posts ON users.id = posts.user_id
    stmt = select(User, Post).outerjoin(Post, User.id == Post.user_id)

     

    9.2. 관계형 ORM JOIN

    SQLAlchemy ORM 모델에서 관계(relationship)를 정의하여 JOIN을 간단히 구현

     

    관계 정의 예제

    from sqlalchemy.orm import relationship
    
    class User(Base):
        __tablename__ = "users"
        id = Column(Integer, primary_key=True)
        name = Column(String)
        
        posts = relationship("Post", back_populates="user")
    
    class Post(Base):
        __tablename__ = "posts"
        id = Column(Integer, primary_key=True)
        title = Column(String)
        user_id = Column(Integer, ForeignKey("users.id"))
        
        user = relationship("User", back_populates="posts")

     

    ORM Join 사용

    stmt = select(User).join(User.posts)

     

    9.3. 여러 테이블 JOIN

    stmt = (
        select(User.name, Post.title, Comment.content)
        .join(Post, User.id == Post.user_id)
        .join(Comment, Post.id == Comment.post_id)
    )

     

    9.4. 동적 Join

    def dynamic_join(base_table, join_table, condition, db):
        stmt = select(base_table, join_table).join(join_table, condition)
        result = await db.execute(stmt)
        return result.all()
    
    # 호출 예제
    data = await dynamic_join(User, Post, User.id == Post.user_id, db)

     

    9.5. JOIN 결과 반환 처리

    • 튜플 형식
      • 기본적으로 select(User, Post)는 (User 객체, Post 객체) 형식으로 반환.
    stmt = select(User, Post).join(Post, User.id == Post.user_id)
    result = await db.execute(stmt)
    
    # 결과: 튜플(User, Post) 반환
    for user, post in result:
        print(user.name, post.title)
    • 필드 값만 가져오기
      • select(User.name, Post.title)와 같은 방식으로 특정 필드만 반환.
    stmt = select(User.name, Post.title).join(Post, User.id == Post.user_id)
    result = await db.execute(stmt)
    
    # 결과: 튜플(User.name, Post.title) 반환
    for user_name, post_title in result:
        print(user_name, post_title)

     

    • ORM 객체 방식
      • scalars()를 사용하면 특정 테이블 반환
      • relationship로 정의한 객체를 불러와 다른 테이블에 접근
    stmt = select(User).options(joinedload(User.posts))
    result = await db.execute(stmt)
    
    users = result.scalars().all()
    for user in users:
        print(user.name, [post.title for post in user.posts])

     

    반응형