목차
반응형
현재 글에서는 SQLAlchemy 2.0 기준으로 글을 작성하겠다.
CRUD는 크게 두가지 방법이 있다.
- ORM 방식
- 객체 중심적으로 작업해야 하는 경우
- 데이터베이스와 python 객체의 상태 동기화가 필요한 경우
- SQL 방식
- 대량의 데이터를 처리하거나 성능이 중요한 경우
- ORM이 필요 없는 아주 간단한 작업
두가지 방법을 비교하며 작성하겠다.
이번 시간에는 Insert 에 대해 공부해 보자.
1. ORM 방식
1.1. ORM 기본 방법
- 사용 함수: db.add()
- 단점: 대량 데이터 삽입에는 비효율적일 수 있음(대량 작업에서는 SQL 방식 추천).
from sqlalchemy.ext.asyncio import AsyncSession
from models import User # ORM 모델
from database import get_db
async def insert_user(name: str, email: str, db: AsyncSession):
# 1. 모델 객체 생성
new_user = User(name=name, email=email)
# 2. 세션에 추가
db.add(new_user)
await db.commit()
await db.refresh(new_user)
return new_user
1.2. ORM 중복 데이터 처리
1.2.1. 수동 확인
데이터가 존재하는지 수동으로 확인한다.
stmt = select(User).where(User.email == "example@example.com")
result = await db.execute(stmt)
existing_user = result.scalars().first()
if not existing_user:
new_user = User(name="John", email="example@example.com")
db.add(new_user)
await db.commit()
else:
print("User already exists!")
1.2.2. merge() 함수 사용
merge() 함수는 기존 값이 존재하면 update 하고, 없으면 insert를 한다
new_user = User(id=1, name="John", email="example@example.com")
merged_user = db.merge(new_user) # 기존 데이터를 업데이트하거나 새로운 데이터를 추가
await db.commit()
2. SQL 방식
2.1. 단일 데이터 삽입
- 사용 함수: insert()
- 설명: 단일 데이터 삽입
from sqlalchemy import insert
from models import User # SQLAlchemy ORM 모델
from database import get_db # 세션 의존성
# 비동기 함수 예제
async def insert_user_sql(name: str, email: str, db):
# 1. INSERT 문 생성
stmt = insert(User).values(name=name, email=email)
await db.execute(stmt)
await db.commit()
2.2. 다량의 데이터 삽입
- 사용 함수: insert()
- 설명: list[dict] 형식으로 데이터를 다량으로 받는다.
async def insert_multiple_users_sql(users: list[dict], db):
# 1. INSERT 문 생성
stmt = insert(User)
await db.execute(stmt, users)
await db.commit()
async def main():
async with get_db() as db:
users = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"}
]
await insert_multiple_users_sql(users, db)
2.3. 중복 데이터 처리
2.3.1. 중복 발생시 작업 중지
postgresql 기준
- import 시 postgresql을 불러온다
- on_conflict_do_nothing() 메서드 사용
from sqlalchemy.dialects.postgresql import insert
stmt = (
insert(User)
.values(id=1, name="John", email="example@example.com")
.on_conflict_do_nothing(index_elements=["id"]) # 'id' 중복 시 아무 작업도 하지 않음
)
mysql 기준
- import 시 mysql을 불러온다
- on_duplicate_key_update() 메서드 사용
from sqlalchemy.dialects.mysql import insert
stmt = (
insert(User)
.values(id=id, name=name, email=email)
.on_duplicate_key_update(
name=name, # 중복 발생 시 업데이트할 필드
email=email
)
)
2.3.2. 중복 발생시 업데이트
postgresql 기준
- on_conflict_do_update() 메서드 사용
stmt = (
insert(User)
.values(id=1, name="John", email="new_email@example.com")
.on_conflict_do_update(
index_elements=["id"], # 'id' 중복 시 처리
set_={"email": "new_email@example.com"} # 업데이트할 컬럼
)
)
mysql 기준
- on_duplicate_key_update() 메서드 사용
stmt = (
insert(User)
.values(id=id, name=name, email=email)
.on_duplicate_key_update(
name=name, # 중복 발생 시 업데이트할 필드
email=email
)
)
3. FastAPI 예시
@app.post("/users/")
async def create_user(name: str, email: str, db: AsyncSession = Depends(get_db)):
stmt = (
insert(User)
.values(name=name, email=email)
.on_conflict_do_nothing(index_elements=["email"]) # 'email' 중복 시 무시
)
result = await db.execute(stmt)
await db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=400, detail="User already exists")
return {"message": "User created successfully"}
반응형
'Fastapi' 카테고리의 다른 글
[FastAPI] SQLAlchemy 상세 - Update(6-3) (0) | 2025.01.01 |
---|---|
[FastAPI] SQLAlchemy 상세 - Read/Select (6-2) (1) | 2025.01.01 |
[FastAPI] Sqlalchemy와 CRUD (6) (1) | 2024.12.31 |
[FastAPI] FastAPI에서 Redis 사용하기 (3) | 2024.11.20 |
[FastAPI] SSO 로그인과 OAuth 2.0 (0) | 2024.11.16 |