목차
select 방식에서는 굳이 복잡하게 SQL 방식을 사용할 필요는 없다.
대신, 이번 페이지에서는 select의 다양한 기능에 대해 알아보자
1. 기본 사용 방법
기본적인 실행 순서는 아래와 같다.
- 쿼리 생성: select() 함수 이용
- 조건 추가 : .where() 를 사용해 필터 조건 추가
- 쿼리 실행 : db.execute() 로 쿼리를 실행
- 결과 처리
아래는 result = db.execute() 코드 실행 이후에 결과처리를 위한 메서드 이다.
메서드 | 반환값 | 설명 |
.fetchall() | 행의 리스트(list of Row objects) | 결과의 모든 행을 반환, 메모리 효율성이 중요하거나 단순 데이터 추출에 적합 |
.fetchone() | 첫 번째 행(Row object) | 결과의 첫 번째 행(row)만 반환 |
.scalar() | 첫 번째 컬럼 값(scalar value) | 결과의 첫 번째 행에서 첫 번째 컬럼 값만 반환 |
.scalars() | 스칼라 값 또는 ORM 객체의 제너레이터. | 데이터를 한 번에 처리하지 않고, 필요할 때 한 개씩 값을 가져옴, 코드가 간결하고 Pythonic |
scalars() 메서드는 제너레이터 이기 때문에, 추가적으로 사용하는 메소드가 있음
메서드 | 반환값 | 설명 |
.scalars().all() | 리스트 (list of ORM객체) | 모든 결과를 가져올 때 |
.scalars().first() | ORM 객체 | 단일 결과 또는 첫 번째 행만 가져올 때 |
.scalars().one() | ORM 객체 | 결과가 정확히 하나만 있어야 하는 경우 |
.scalars().one_or_none() | 단일 객체 또는 None | 결과가 없거나 하나만 있어야 하는 경우 |
.scalars().fetchmany(size) | 리스트 | 지정된 개수만큼 결과를 가져올 때 |
.scalars().partitions() | 리스트들 | 결과를 페이징 처리하거나 부분적으로 처리할 때 |
all() 예시코드
stmt = select(User)
result = await db.execute(stmt)
users = result.scalars().all()
# ORM 객체를 딕셔너리 형태로 변환
user_dicts = [{"id": user.id, "name": user.name, "email": user.email} for user in users]
print(user_dicts)
first() 예시코드
stmt = select(User).where(User.email == "john@example.com")
result = await db.execute(stmt)
user = result.scalars().first()
if user:
print(user.name) # 단일 결과의 속성 접근
else:
print("No user found")
팁: scalars() 함수를 주로 가장 많이 사용한다.
2. Where 사용 방법
2.1. 단일 조건
stmt = select(User).where(User.name == "John Doe")
2.2. 연산자
연산자 | 의미 | 사용 예시 |
== | 같다 | User.id == 1 |
!= | 다르다 | User.name != "John Doe" |
< | 작다 | User.age < 30 |
> | 크다 | User.age > 18 |
<= | 작거나 같다 | User.age <= 30 |
>= | 크거나 같다 | User.age >= 18 |
2.3. 기타 연산
LIKE
stmt = select(User).where(User.email.like("%example.com"))
In / Not In
stmt = select(User).where(User.id.in_([1, 2, 3]))
stmt = select(User).where(User.id.notin_([1, 2, 3]))
Is / Is Not
stmt = select(User).where(User.email.is_(None))
stmt = select(User).where(User.email.isnot(None))
Contains
stmt = select(User).where(User.email.contains("@example.com"))
Between
stmt = select(User).where(User.age.between(18, 30))
Is Null 조건
stmt = select(User).where(User.email == None) # 또는 User.email.is_(None)
2.4. and, or, not
and는 , 로 구분한다.
stmt = select(User).where(User.name == "John Doe", User.email.like("%example.com"))
or 은 or_() 를 사용한다.
from sqlalchemy import or_
stmt = select(User).where(or_(User.name == "John Doe", User.name == "Jane Doe"))
not은 ~ 연산자를 사용한다.
stmt = select(User).where(~(User.name == "John Doe"))
2.5. 동적 where 문 설정
from sqlalchemy.future import select
async def get_filtered_users(filters: dict, db):
# 조건 리스트 생성
conditions = []
if "name" in filters:
conditions.append(User.name.like(f"%{filters['name']}%"))
if "email" in filters:
conditions.append(User.email.like(f"%{filters['email']}%"))
if "age_min" in filters:
conditions.append(User.age >= filters["age_min"])
if "age_max" in filters:
conditions.append(User.age <= filters["age_max"])
# 동적으로 조건 추가
stmt = select(User).where(*conditions)
result = await db.execute(stmt)
return result.scalars().all()
3. order by
3.1. 오름차순 / 내림차순
오름차순
stmt = select(User).order_by(User.name.asc())
내림차순
stmt = select(User).order_by(User.name.desc())
3.2. 다중 컬럼
stmt = select(User).order_by(User.name.asc(), User.age.desc())
3.3. 동적 정렬 설정
클라이언트로부터 정렬 조건을 입력받는 경우
- 동적 컬럼 설정
- 동적 정렬 방향 설정
def get_ordered_users(order_by_field: str, ascending: bool, db: AsyncSession):
field = getattr(User, order_by_field) # 동적으로 컬럼 선택
order = field.asc() if ascending else field.desc() # 정렬 방향 설정
stmt = select(User).order_by(order)
result = await db.execute(stmt)
return result.scalars().all()
# 호출
users = await get_ordered_users("age", False, db) # 나이 내림차순
4. limit, offset, distinct
limit
stmt = select(User).limit(10)
offset (limit과 함께 사용가능)
stmt = select(User).limit(10).offset(10)
distinct
stmt = select(User.email).distinct()
5. SQL 함수 호출
SQLAlchemy의 func를 사용하여 SQL 함수 호출.
count 함수
from sqlalchemy import func
stmt = select(func.count(User.id))
6. 서브 쿼리
SELECT 내부에 또 다른 SELECT 사용.
subquery = select(User.id).where(User.age > 18).subquery()
stmt = select(Post).where(Post.user_id.in_(subquery))
7. Group by 와 Having
7.1. group by와 함수 사용
COUNT
from sqlalchemy import func
stmt = select(User.age, func.count(User.id)).group_by(User.age)
SUM
stmt = select(User.department, func.sum(User.salary)).group_by(User.department)
AVG
stmt = select(User.department, func.avg(User.salary)).group_by(User.department)
MAX
stmt = select(User.department, func.max(User.salary)).group_by(User.department)
MIN
stmt = select(User.department, func.min(User.salary)).group_by(User.department)
표준편차
stmt = select(User.department, func.stddev(User.salary)).group_by(User.department)
분산
stmt = select(User.department, func.variance(User.salary)).group_by(User.department)
7.2. Having 사용
stmt = (
select(User.age, func.count(User.id))
.group_by(User.age)
.having(func.count(User.id) > 2)
)
8. Union
여러 SELECT 쿼리를 결합.
stmt1 = select(User).where(User.age < 18)
stmt2 = select(User).where(User.name.like("John%"))
union_stmt = stmt1.union(stmt2)
9. Join
SQLAlchemy의 Join 연산 방법은 두가지 방법이 있다.
- join() 메서드
- ORM의 관계 이용
Join을 공부하기 전에, 테이블의 별칭을 만드는 방법은 아래와 같다.
aliased() 함수를 이용하면 된다.
from sqlalchemy.orm import aliased
user_alias = aliased(User)
stmt = select(user_alias).where(user_alias.name == "John")
9.1. Join 메서드 이용
inner join
SELECT * FROM users JOIN posts ON users.id = posts.user_id
stmt = select(User, Post).join(Post, User.id == Post.user_id)
left outer join
SELECT * FROM users LEFT OUTER JOIN posts ON users.id = posts.user_id
stmt = select(User, Post).outerjoin(Post, User.id == Post.user_id)
9.2. 관계형 ORM JOIN
SQLAlchemy ORM 모델에서 관계(relationship)를 정의하여 JOIN을 간단히 구현
관계 정의 예제
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
posts = relationship("Post", back_populates="user")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String)
user_id = Column(Integer, ForeignKey("users.id"))
user = relationship("User", back_populates="posts")
ORM Join 사용
stmt = select(User).join(User.posts)
9.3. 여러 테이블 JOIN
stmt = (
select(User.name, Post.title, Comment.content)
.join(Post, User.id == Post.user_id)
.join(Comment, Post.id == Comment.post_id)
)
9.4. 동적 Join
def dynamic_join(base_table, join_table, condition, db):
stmt = select(base_table, join_table).join(join_table, condition)
result = await db.execute(stmt)
return result.all()
# 호출 예제
data = await dynamic_join(User, Post, User.id == Post.user_id, db)
9.5. JOIN 결과 반환 처리
- 튜플 형식
- 기본적으로 select(User, Post)는 (User 객체, Post 객체) 형식으로 반환.
stmt = select(User, Post).join(Post, User.id == Post.user_id)
result = await db.execute(stmt)
# 결과: 튜플(User, Post) 반환
for user, post in result:
print(user.name, post.title)
- 필드 값만 가져오기
- select(User.name, Post.title)와 같은 방식으로 특정 필드만 반환.
stmt = select(User.name, Post.title).join(Post, User.id == Post.user_id)
result = await db.execute(stmt)
# 결과: 튜플(User.name, Post.title) 반환
for user_name, post_title in result:
print(user_name, post_title)
- ORM 객체 방식
- scalars()를 사용하면 특정 테이블 반환
- relationship로 정의한 객체를 불러와 다른 테이블에 접근
stmt = select(User).options(joinedload(User.posts))
result = await db.execute(stmt)
users = result.scalars().all()
for user in users:
print(user.name, [post.title for post in user.posts])
'Fastapi' 카테고리의 다른 글
[FastAPI] SQLAlchemy 상세 - Delete (6-4) (0) | 2025.01.01 |
---|---|
[FastAPI] SQLAlchemy 상세 - Update(6-3) (0) | 2025.01.01 |
[FastAPI] SQLAlchemy 상세 - Create/Insert (6-1) (0) | 2024.12.31 |
[FastAPI] Sqlalchemy와 CRUD (6) (1) | 2024.12.31 |
[FastAPI] FastAPI에서 Redis 사용하기 (3) | 2024.11.20 |