인공지능 개발자 수다(유튜브 바로가기) 자세히보기

Fastapi

[FastAPI] SSO 로그인과 OAuth 2.0

Suda_777 2024. 11. 16. 23:15

목차

    반응형

    1.  SSO 로그인

    • SSO(Single Sign-On)는 사용자가 한 번의 로그인으로 여러 애플리케이션이나 시스템에 접근할 수 있도록 해주는 인증 방식
    • 중앙 집중식 인증
    • 주로 OAuth 2.0과 OpenID Connect와 같은 프로토콜을 사용
    • 사용자는 먼저 IdP에 로그인하고, 해당 로그인 정보가 포함된 인증 토큰(보통 JWT)을 받는다.

    일반적으로 여러 서비스가 붙어있는 거대한 환경에서 쓴다.

     


    2.  OAuth 2.0

    2.1. OAuth 2.0 개념

    • 권한 위임(Authorization Delegation)
      • 유저는 비밀번호 등의 자격증명을 애플리케이션에 직접 제공하지 않고, 신뢰할 수 있는 권한 서버(Authorization Server)를 통해 자원에 접근할 권한을 위임함
    • 역할(Role)
      • Resource Owner (자원 소유자): 사용자를 의미하며, 자원의 소유권을 가지고 있는 주체
      • Client (클라이언트 애플리케이션): 자원에 접근하려는 애플리케이션입
      • Resource Server (자원 서버): 자원을 호스팅하고, 클라이언트 요청을 처리
      • uthorization Server (권한 서버): 사용자를 인증하고, 클라이언트에 액세스 토큰을 발급
    • 토큰(Token)
      • ccess Token (액세스 토큰): 제한된 시간 동안 특정 자원에 접근할 수 있는 권한을 나타냅니다.
      • Refresh Token (리프레시 토큰): 액세스 토큰이 만료된 경우, 새 액세스 토큰을 발급받을 때 사용됩니다.
    • 흐름
      • 사용자가 클라이언트 애플리케이션에 접근
      • 사용자가 권한 서버에서 인증
      • Authorization Code 발급
        • 권한 서버 ---(Authorization Code) --> 클라이언트 애플리케이션
      • Authorization Code로 Access Token 요청
        • 클라이언트 애플리케이션 ---(Authorization Code, Access Token을 요청) --> 권한 서버
        • 클라이언트 애플리케이션 <---(Access Token) -- 권한 서버
        • 요청: 애플리케이션의 클라이언트 ID, 비밀키(Client Secret)가 포함됨
      • Resource Server 접근
        • 클라이언트 애플리케이션 ---(Access Token, 자원 요청) -->  자원 서버

     

    요약하자면. 

    • 사용자가 권한 서버에서 인증
    • 클라이언트가 Authorization Code를 권한서버에 요청
    • 클라이언트가 Access Token을 요청
    • 클라이언트가 자원 서버에 자원 요청

    2.2. OAuth 2.0 사용 예시

    2.2.1. 사용자가 권한 서버에서 인증

    • REDIRECT_URI
      • 클라이언트가 인증 코드를 받는 위치 (클라이언트 URL), 사전에 권한 서버에 등록해야 함
      • 백엔드 서버가 클라이언트 역할을 할 수 있음. 프론트 엔드도 할 수 있음.
      • 예: FastAPI 애플리케이션의 특정 경로가 REDIRECT_URI로 등록
      • 예: http://host:8000/callback
    • 파라미터
      • 클라이언트 ID
      • Redirect URL
      • scope: 권한의 범위를 정의하는 매개변수
      • state: 클라이언트가 생성한 고유한 값으로, 요청 시 함께 전송, 권한 서버가 state 값을 그대로 응답에 포함해 돌려줌, 클라이언트는 응답의 state 값이 요청 시의 값과 일치하는지 확인해 요청의 무결성을 검증
    • 권한 서버에 파라미터 보내기
      • 쿼리 파라미터 형식: 권한서버 URL?key=value
      • RedirectResponse를 이용하면 자동으로 get 요청을 보냄
    from fastapi import FastAPI, Request, HTTPException, Depends
    from fastapi.responses import RedirectResponse
    import httpx
    import uuid
    
    app = FastAPI()
    
    CLIENT_ID = "your-client-id"
    CLIENT_SECRET = "your-client-secret"
    REDIRECT_URI = "http://localhost:8000/callback"
    TOKEN_ENDPOINT = "https://auth.example.com/oauth2/token"
    AUTHORIZATION_SERVER = "https://auth.example.com/oauth2/authorize"
    
    # 임시로 state 값을 저장할 딕셔너리 (실제 구현에서는 Redis나 세션 사용)
    state_storage = {}
    
    @app.get("/login")
    def login():
        # 고유한 state 값 생성
        state = str(uuid.uuid4())
        state_storage[state] = True  # state 값 저장
    
        # 권한 서버로 리디렉션
        params = {
            "response_type": "code",
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "scope": "openid profile email",
            "state": state,
        }
        authorization_url = f"{AUTHORIZATION_SERVER}?{urllib.parse.urlencode(params)}"
        return RedirectResponse(url=authorization_url)

     

    2.2.2. Authorization Code를 수신, Access Token을 요청

     

    REDIRECT_URI을 FastAPI로 돌려달라고 요청했으니까 해당 URL로 받아 작업을 처리한다.

    그리고 일반적으로 Authorization Code를 수신, Access Token을 요청은 하나의 API에서 진행한다.

     

    step1. 권한 정보(Authorization Code) 수신

    • code와 state을 받는다.
    • state을 검증한다

    step2. Access Token 요청

    • 파라미터
      • code: 받은 권한 코드를 그대로 돌려주면 됨
      • redirect_uri: 권한 서버에 등록된 리디렉션 URI(요청 시와 동일해야 함). 로그인때 사용했던 값 똑같이 사용
      • client_id: 클라이언트 아이디
      • client_secret: 클라이언트를 인증하는 비밀 키
    • post 형식으로 요청
      • post형식으로 요청하면 결과로 Access Token을 받는다.
    • return
      • 여기서는 예시이기 때문에 그냥 return 했지만, 토큰은 따로 저장해 줘야 한다.
    @app.get("/callback")
    async def callback(request: Request):
        # 1. Authorization Code와 State 수신
        code = request.query_params.get("code")
        state = request.query_params.get("state")
    
        if not code or not state:
            raise HTTPException(status_code=400, detail="Authorization code or state is missing")
    
        if state not in state_storage:
            raise HTTPException(status_code=400, detail="Invalid or expired state")
        del state_storage[state]  # 검증 후 삭제 (state는 일회용)
    
        # 2. Access Token 요청
        token_request_data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": REDIRECT_URI,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        }
    
        async with httpx.AsyncClient() as client:
            token_response = await client.post(TOKEN_ENDPOINT, data=token_request_data)
            if token_response.status_code != 200:
                raise HTTPException(status_code=token_response.status_code, detail=token_response.text)
    
            token_data = token_response.json()
    
        # 3. Access Token 반환
        return {
            "access_token": token_data.get("access_token"),
            "id_token": token_data.get("id_token"),
            "token_type": token_data.get("token_type"),
            "expires_in": token_data.get("expires_in"),
        }

     

     

    2.2.3. 토큰 관리

    1. redis에 저장

    유효기간을 지정해, 이후에는 삭제할 수 있다.

    token_data = {"access_token": "example_token"}
    
    # 사용자 ID와 Access Token을 Redis에 저장
    user_id = "user123"  # 사용자 식별자 (예: 로그인 ID)
    redis_client.set(user_id, token_data["access_token"], ex=3600)  # 1시간 유효 기간 설정
    
    return {"message": "Token saved in Redis"}

     

    2. 웹브라우저, 모바일에서 관리

    그냥 return 에 토큰을 보내면 된다.

    일반적으로 FastAPI에서는 웹브라우저, 모바일에서 관리하도록 한다.

    토큰의 유효시간 검증은, 클라이언트에서 요청을 보낼 때 실행한다. (callback에서는 작업하지 않음)

    return {
        "access_token": token_data.get("access_token"),
        "id_token": token_data.get("id_token"),
        "token_type": token_data.get("token_type"),
        "expires_in": token_data.get("expires_in"),
    }

     

    2.2.4. 토큰 재발급

    토큰이 만료되었을 때에는 토큰을 재발급 받아야 한다.

    예시에서는 JWT 토큰을 기준으로 예외처리를 했으나, 이부분은 수정해서 쓰면 되겠다.

    @app.post("/refresh")
    def refresh_access_token(refresh_token: str):
        try:
            payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
            user_id = payload.get("sub")
    
            # 새로운 Access Token 생성
            new_access_token = create_access_token({"sub": user_id}, timedelta(minutes=15))
            return {"access_token": new_access_token}
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Refresh token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid refresh token")
    반응형