ElGraph (ElGraph v0.3.0)

Copy Markdown View Source

Graph-first 에이전트 오케스트레이션의 코어: 상태 채널 + 노드 + 엣지로 그래프를 선언하고, superstep 루프로 실행한다. 설계 전문은 docs/SPEC.md.

예제

graph =
  ElGraph.new()
  |> ElGraph.state(:messages, default: [], reducer: {ElGraph.Reducers, :append, []})
  |> ElGraph.add_node(:agent, &MyApp.Agent.call/2)
  |> ElGraph.add_node(:tools, &MyApp.Tools.call/2)
  |> ElGraph.add_edge(:tools, :agent)
  |> ElGraph.add_conditional_edge(:agent, &MyApp.Router.route/1)
  |> ElGraph.compile(entry: :agent)

{:ok, state} = ElGraph.invoke(graph, %{messages: [user_msg]})

노드는 (state, ctx)를 받아 상태 부분 업데이트 맵을 반환한다. durable 그래프(체크포인트 재개)의 노드는 MFA 또는 원격 캡처(&Mod.fun/2)를 권장한다.

Summary

Functions

조건부 엣지를 추가한다. router(state) -> 노드이름 | :end.

고정 엣지를 추가한다. to는 노드 이름 또는 :end.

노드를 추가한다. run은 MFA {m, f, extra_args} 또는 2-인자 함수.

그래프를 검증하고 실행 가능한 형태로 확정한다. 유효하지 않으면 ElGraph.CompileError.

그래프를 실행하고 최종 상태를 반환한다.

빈 그래프를 만든다.

체크포인트에서 실행을 재개한다 (SPEC §3.5).

상태 키(채널)를 선언한다.

그래프를 실행하며 이벤트를 lazy 스트림으로 반환한다 (SPEC §3.7).

Functions

add_conditional_edge(graph, from, router)

@spec add_conditional_edge(ElGraph.Graph.t(), atom(), ElGraph.Graph.router()) ::
  ElGraph.Graph.t()

조건부 엣지를 추가한다. router(state) -> 노드이름 | :end.

라우터는 순수해야 한다(SPEC §3.3) — 재개·리플레이 시 재평가된다.

add_edge(graph, from, to)

@spec add_edge(ElGraph.Graph.t(), atom(), atom()) :: ElGraph.Graph.t()

고정 엣지를 추가한다. to는 노드 이름 또는 :end.

add_node(graph, name, run, opts \\ [])

노드를 추가한다. run은 MFA {m, f, extra_args} 또는 2-인자 함수.

옵션

  • :input — 노드에 전달할 상태 키 목록 (input projection, SPEC §3.4)
  • :timeout — 노드 실행 시간 상한(ms, 기본 :infinity). 초과 시 {:error, {:node_timeout, node, ms}} — 병렬 형제의 완료된 쓰기는 보존된다

compile(graph, opts \\ [])

@spec compile(
  ElGraph.Graph.t(),
  keyword()
) :: ElGraph.Graph.t()

그래프를 검증하고 실행 가능한 형태로 확정한다. 유효하지 않으면 ElGraph.CompileError.

검증 항목(SPEC §3.3): entry 존재, 엣지/라우터 대상 존재, reducer 형태(MFA/원격 캡처), :input 키 선언 여부, 도달 불가 노드(정적 엣지만 있는 그래프에 한해).

invoke(graph, input, opts \\ [])

@spec invoke(ElGraph.Graph.t(), map() | keyword(), keyword()) ::
  {:ok, map()} | {:error, term()} | {:interrupted, map()}

그래프를 실행하고 최종 상태를 반환한다.

옵션

  • :max_steps — superstep 상한 (기본 25). 초과 시 {:error, {:max_steps_exceeded, _}}
  • :thread_id — 실행 식별자 (기본 자동 생성). 체크포인트/재개의 키
  • :event_sinkElGraph.Ctx.emit/2 이벤트를 받을 pid
  • :checkpointer{module, config}. 지정 시 초기 상태와 매 superstep 후 체크포인트 저장
  • :interrupt_before — 노드 목록. 해당 노드 진입 전에 {:interrupted, info} 반환 (:checkpointer 필수)
  • :durability — 체크포인트 영속 시점 (SPEC §3.5). :sync(기본, 매 step 동기) / :async(순서 보장 writer에 비동기, 반환 전 flush) / :exit(완료·인터럽트만 영속, 가장 빠름)

new()

@spec new() :: ElGraph.Graph.t()

빈 그래프를 만든다.

resume(graph, opts)

@spec resume(
  ElGraph.Graph.t(),
  keyword()
) :: {:ok, map()} | {:error, term()} | {:interrupted, map()}

체크포인트에서 실행을 재개한다 (SPEC §3.5).

마지막 체크포인트의 상태와 활성 노드에서 이어가며, 부분 실패한 superstep의 pending writes가 있으면 완료된 노드를 재실행하지 않는다.

필수 옵션: :checkpointer ({module, config}), :thread_id. 동적 인터럽트 재개 시 :resume 옵션의 값이 ElGraph.Ctx.interrupt/2의 반환값으로 주입된다 (노드는 처음부터 재실행, 호출 순서로 매칭 — SPEC §3.6). 체크포인트가 없으면 {:error, :no_checkpoint}, 인터럽트되지 않았는데 :resume을 주면 {:error, :nothing_to_resume}.

state(graph, key, opts \\ [])

@spec state(ElGraph.Graph.t(), atom(), keyword()) :: ElGraph.Graph.t()

상태 키(채널)를 선언한다.

옵션

  • :default — 초기값 (기본 nil)
  • :reducer — 쓰기 병합 함수 (현재값, 새값) -> 병합값. MFA 또는 원격 캡처만 허용(SPEC §3.1). 미지정 시 overwrite.

stream(graph, input, opts \\ [])

@spec stream(ElGraph.Graph.t(), map() | keyword(), keyword()) :: Enumerable.t()

그래프를 실행하며 이벤트를 lazy 스트림으로 반환한다 (SPEC §3.7).

스트림 원소는 %{thread_id:, step:, node:, event:} 맵이다: 생명주기 이벤트(:node_start/:node_end), ElGraph.Ctx.emit/2의 사용자 이벤트, 마지막으로 {:done, result}. 실행 프로세스는 호출자에 link되며, 스트림을 조기 중단하면 정리(kill)된다.