인공지능 개발자 수다(유튜브 바로가기) 자세히보기

자연어처리/LangGraph

[LangGraph] Human-in-the-loop & Time Travel

Suda_777 2025. 8. 11. 01:54

목차

    반응형

     

    langgraph 버전: 0.6.4

    내용 출처 : LangGrapn Docs

     

    LangGraph

    LangGraph Trusted by companies shaping the future of agents – including Klarna, Replit, Elastic, and more – LangGraph is a low-level orchestration framework for building, managing, and deploying long-running, stateful agents. Get started Install LangGr

    langchain-ai.github.io

     

    1. 개요

    실무에서는 “AI가 알아서 다 한다”보다 중간에 사람이 개입하거나, 과거 시점으로 되돌려 재실행해야 할 순간이 많아요.

    LangGraph는 이를 위해 두 가지를 제공합니다.

    • Human-in-the-loop(HITL): 실행 도중 일시 정지(Interrupt) → 사람이 확인/수정 → 그 지점부터 재개. 
    • Time Travel: 이미 저장된 체크포인트를 골라 그 시점으로 복원하거나 **분기(fork)**해서 다른 결과를 실험. 

     

    두 기능 모두 체크포인터(예: MemorySaver)가 필요하고, 실행 시 thread_id를 반드시 넘겨야 합니다.  

     


    2. Human-in-the-loop(HITL)

    2.1. 개념

    • 정적 인터럽트: interrupt_before / interrupt_after특정 노드 전/후에서 반드시 멈추기.
    • 동적 인터럽트: 노드 내부에서 상황에 따라 interrupt() 호출(필요할 때만 멈춤).
    • 멈춘 순간의 상태는 체크포인트로 저장되고, 같은 thread_id로 다시 호출하면 멈춘 지점부터 이어집니다.

     


    2.2. 예시 코드

    2.2.1. 정적 인터럽트 예시

    • interrupt_before=["review_and_finalize"]로 해당 노드 직전에 정지.
    • 멈춘 후, 같은 thread_id {"approved": True/False}를 넣어 재개.
    • 인터럽트/정지·재개의 기본 개념은 공식 가이드와 동일합니다.
    from typing import TypedDict
    from langgraph.graph import StateGraph, END
    from langgraph.checkpoint.memory import MemorySaver
    
    class State(TypedDict):
        text: str
        draft: str
        approved: bool
        final: str
    
    def write_draft(state: State):
        # 초안 생성
        return {"draft": f"[DRAFT] {state['text']}", "approved": False}
    
    def review_and_finalize(state: State):
        # 사람이 승인했다면 확정, 아니면 다시 수정하거나 종료
        if state.get("approved"):
            return {"final": state["draft"].replace("[DRAFT]", "[FINAL]")}
        return {"final": "[REJECTED]"}
    
    # 그래프 구성
    g = StateGraph(State)
    g.add_node("write_draft", write_draft)
    g.add_node("review_and_finalize", review_and_finalize)
    g.set_entry_point("write_draft")
    g.add_edge("write_draft", "review_and_finalize")
    g.add_edge("review_and_finalize", END)
    
    # 체크포인터 필수
    app = g.compile(checkpointer=MemorySaver())
    
    thread = {"configurable": {"thread_id": "hitl-demo-1"}}
    
    # 1) 초안까지 실행하고 '검토 노드' 직전에 멈춤
    for ev in app.stream(
        {"text": "가격 인하 공지문 작성"},
        config=thread,
        interrupt_before=["review_and_finalize"],  # <-- 여기서 일시 정지
    ):
        # 초안(draft) 생성 이벤트만 흘러옴
        pass
    
    # (사람 개입) 초안을 확인하고 승인 여부를 정함:
    human_decision = True  # 실제 앱이라면 UI에서 받는다
    
    # 2) 같은 thread_id로 재개하면서 사람 입력을 상태에 주입
    result = app.invoke({"approved": human_decision}, config=thread)
    print(result["final"])
    # True면 [FINAL] ..., False면 [REJECTED]

     

    2.2.2. 동적 인터럽트 예시

    노드 안에서 interrupt()를 직접 호출해 “필요한 순간”에만 멈추는 방식

    1. 노드 내부에서 interrupt() 호출 → 실행 일시정지
    2. 체크포인터 + thread_id 필수 → 멈춘 지점 상태가 저장됨
    3. Command(resume=...)로 재개 → 사람이 준 값을 그 노드의 반환값으로 이어받음
    from typing import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.types import Command, interrupt
    from langgraph.checkpoint.memory import MemorySaver
    
    class State(TypedDict):
        input: str
        note: str
    
    def step_1(state: State):
        # 평소처럼 처리
        return {"note": f"draft for: {state['input']}"}
    
    def ask_human(state: State):
        # ⬇️ 동적 인터럽트: 필요한 순간에만 사람 입력을 기다림
        human_text = interrupt("승인/수정 메모를 입력해 주세요:")
        # 재개 시 이 값이 함수의 반환값처럼 사용됨
        return {"note": state["note"] + f" | human: {human_text}"}
    
    def finalize(state: State):
        return {"note": state["note"] + " | FINALIZED"}
    
    # 그래프 구성
    g = StateGraph(State)
    g.add_node("step_1", step_1)
    g.add_node("ask_human", ask_human)
    g.add_node("finalize", finalize)
    g.add_edge(START, "step_1")
    g.add_edge("step_1", "ask_human")
    g.add_edge("ask_human", "finalize")
    g.add_edge("finalize", END)
    
    app = g.compile(checkpointer=MemorySaver())
    cfg = {"configurable": {"thread_id": "demo-hitl-dynamic"}}
    
    # 1) 실행: ask_human에서 interrupt()가 호출되면 여기서 멈춤
    for ev in app.stream({"input": "가격 인하 공지 초안"}, config=cfg, stream_mode="updates"):
        # 마지막 이벤트에 '__interrupt__'가 보이면 멈춘 상태
        print(ev)
    
    # 2) 재개: 사람이 입력한 값을 Command(resume=...)로 전달
    out = app.invoke(Command(resume="문구 OK, 날짜만 9/1로 수정"), config=cfg)
    print(out["note"])
    # 예) "draft for: 가격 인하 공지 초안 | human: 문구 OK... | FINALIZED"

     


    3. Time Travel

    3.1. 개념

    Time Travel은 이미 저장된 실행 이력에서 원하는 체크포인트를 찾아 그 지점으로 복원하거나,

    상태를 조금 바꿔 다른 가지로 분기하는 기능입니다.

     

     

    핵심 메서드

    • get_state_history(config) : 특정 thread_id체크포인트 이력을 나열
    • get_state(config) : 특정 체크포인트의 스냅샷 조회
    • update_state(config, updates) : 특정 체크포인트에서 분기(fork)하여 상태를 수정

     

    이 작업은 모두 체크포인터가 설정돼 있어야 가능합니다.

     


    3.2. 예시 코드

    이력 조회 → 체크포인트 선택 → 분기라는 3단계

    과거 지점에 checkpoint_id로 진입 후, update_state()로 약간 수정하고 invoke()하면 새 가지가 생김

     

    과거 시점 관련 변수

    • thread_id = “어떤 대화나 워크플로우 세션인가?” (대화방/세션 ID 같은 것)
    • checkpoint_id = “그 세션에서 저장된 어느 시점인가?” (버전/스텝 ID 같은 것)
    • checkpoint_ns = “그 체크포인트를 어떤 서브그래프/작업 공간에서 만들었는가?” (폴더/서브디렉토리 느낌)

     

    그래프 생성 & 첫 실행

    from typing import TypedDict
    from langgraph.graph import StateGraph, END
    from langgraph.checkpoint.memory import MemorySaver
    
    class State(TypedDict):
        text: str
        step: int
        log: list[str]
    
    def step1(state: State):
        # step1: 기본 로그만 추가
        return {"step": 1, "log": state.get("log", []) + ["step1"]}
    
    def step2(state: State):
        # ✅ step2가 text를 사용하도록 변경
        #   - 원본 실행: "hello"
        #   - fork 실행: "hello (forked)"
        return {
            "step": 2,
            "log": state["log"] + [f"step2:text={state.get('text','<none>')}"]
        }
    
    g = StateGraph(State)
    g.add_node("step1", step1)
    g.add_node("step2", step2)
    g.set_entry_point("step1")
    g.add_edge("step1", "step2")
    g.add_edge("step2", END)
    
    app = g.compile(checkpointer=MemorySaver())
    base_cfg = {"configurable": {"thread_id": "travel-1"}}
    
    # 최초 실행
    result1 = app.invoke({"text": "hello", "log": []}, config=base_cfg)
    print("base:", result1["log"])
    # 예) base: ['step1', 'step2:text=hello']

     

    이력 조회

    history = list(app.get_state_history(base_cfg))
    
    # 'step1' 완료 시점 체크포인트 찾기
    cp_id = cp_ns = None
    for snap in history:
        if snap.values.get("step") == 1:
            conf = snap.config["configurable"]
            cp_id = conf.get("checkpoint_id")
            cp_ns = conf.get("checkpoint_ns", "")
            break
    
    for snap in history:
        print(snap)

     

    특정 분기에서 다시 실행

    # (3) 분기 지점에서 상태 업데이트 → update_state가 새 config를 반환!
    fork_cfg = {
        "configurable": {
            "thread_id": base_cfg["configurable"]["thread_id"],
            "checkpoint_id": cp_id,
            "checkpoint_ns": cp_ns,
        }
    }
    
    # ⬇️ 반환값을 반드시 받아서 사용
    fork_cfg = app.update_state(fork_cfg, {"text": "hello (forked)"})
    
    # 새 체크포인트에서 재실행
    result2 = app.invoke(None, config=fork_cfg)
    print("fork:", result2["log"])
    # 기대: ['step1', 'step2:text=hello (forked)']

     


    4. 코드 실행

    https://colab.research.google.com/drive/1DU3CcsVGP4enIytGzsvp7hBc6rgbTe6J?usp=sharing

     

    [LangGraph] Human-in-the-loop & Time Travel.ipynb

    Colab notebook

    colab.research.google.com

     

    반응형