Jido.Composer.FanOut.State (Jido Composer v0.6.0)


Typed sub-state for tracking FanOut branch execution.

Replaces the bare pending_fan_out map in the Workflow strategy. Encapsulates branch completion, suspension, queueing, and merge logic.

Summary

Functions

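branch_completed(state, branch_name, result)

Record a successful branch completion.

branch_error(state, branch_name, reason)

Record a branch error (for :collect_partial mode).

branch_suspended(state, branch_name, suspension, partial_result)

Record a branch suspension.

completion_status(state)

Returns the completion status of the fan-out.

do_merge(results, merge)

Merges branch results using the specified merge strategy.

drain_queue(state)

Drain queued branches into available slots. Returns {updated_state, to_dispatch} where to_dispatch is a list of {branch_name, directive} tuples.

find_suspended_branch(state, suspension_id)

Find a suspended branch by suspension_id.

has_suspended_branch?(state, suspension_id)

Check if a suspended branch matches the given suspension_id.

merge_results(state)

Merge completed results using the configured merge strategy.

new(id, node, dispatched_names, queued_branches, opts \\ [])

Creates a new FanOut.State from dispatched and queued branches.

resume_branch(state, branch_name)

Resume a suspended branch by removing it from suspended_branches.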

Types

t()

@type t() :: %Jido.Composer.FanOut.State{
  completed_results: %{required(atom()) => term()},
  id: String.t(),
  merge: atom(),
  node: struct(),
  on_error: :fail_fast | :collect_partial,
  pending_branches: MapSet.t(),
  queued_branches: [{atom(), term()}],
  suspended_branches: %{
    required(atom()) => %{
      suspension: Jido.Composer.Suspension.t(),
      partial_result: term()
    }
  },
  total_branches: non_neg_integer() | nil
}

Functions

branch_completed(state, branch_name, result)

@spec branch_completed(t(), atom(), term()) :: t()

Record a successful branch completion.

branch_error(state, branch_name, reason)

@spec branch_error(t(), atom(), term()) :: t()

Record a branch error (for :collect_partial mode).

branch_suspended(state, branch_name, suspension, partial_result)

@spec branch_suspended(t(), atom(), Jido.Composer.Suspension.t(), term()) :: t()

Record a branch suspension, storing the suspension and its partial_result under branch_name in suspended_branches.

completion_status(state)

@spec completion_status(t()) :: :all_done | :suspended | :in_progress

Returns the completion status of the fan-out.
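
The documentation does not spell out how the three statuses are chosen. The following is a minimal sketch of one plausible rule, not the library's implementation. It uses a plain map in place of the struct, and both the module name FanOutStatusSketch and the precedence (running or queued work first, then suspension, then completion) are assumptions.

```elixir
defmodule FanOutStatusSketch do
  # Hypothetical stand-in: `state` is a plain map carrying only the
  # fields this sketch needs, not the real %Jido.Composer.FanOut.State{}.

  # Assumed precedence: running or queued work wins over suspension,
  # and suspension wins over completion.
  def completion_status(state) do
    cond do
      MapSet.size(state.pending_branches) > 0 -> :in_progress
      state.queued_branches != [] -> :in_progress
      map_size(state.suspended_branches) > 0 -> :suspended
      true -> :all_done
    end
  end
end
```

Under this assumed rule, a fan-out reports :suspended only once no branch is still running or waiting in the queue.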

do_merge(results, merge)

@spec do_merge(
  %{required(atom()) => term()} | [{atom(), term()}],
  atom() | function()
) :: map()

Merges branch results using the specified merge strategy.

Accepts either a map (from strategy tracking) or a keyword list (from inline FanOutNode.run/3).
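
To illustrate how one function can accept both input shapes, here is a minimal sketch that normalises the input to {branch_name, result} pairs before merging. It is not the library's code: the module name FanOutMergeSketch and the strategy atoms :namespaced and :flat are hypothetical, and the assumption that a function strategy takes the list of pairs as its single argument is also ours.

```elixir
defmodule FanOutMergeSketch do
  # Normalise either input shape to a list of {branch_name, result} pairs.
  defp to_pairs(results) when is_map(results), do: Map.to_list(results)
  defp to_pairs(results) when is_list(results), do: results

  # Hypothetical :namespaced strategy: keep each result under its branch name.
  def do_merge(results, :namespaced), do: results |> to_pairs() |> Map.new()

  # Hypothetical :flat strategy: every result must itself be a map;
  # later pairs win when keys collide.
  def do_merge(results, :flat) do
    results
    |> to_pairs()
    |> Enum.reduce(%{}, fn {_name, result}, acc -> Map.merge(acc, result) end)
  end

  # Assumed shape for a custom strategy: a one-arity function that
  # receives the normalised pairs and returns the merged map.
  def do_merge(results, merge) when is_function(merge, 1) do
    merge.(to_pairs(results))
  end
end
```

Note that a keyword list preserves branch order while a map does not, so an order-sensitive strategy such as the :flat sketch can give different results for the two input shapes.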

drain_queue(state)

@spec drain_queue(t()) :: {t(), [{atom(), term()}]}

Drain queued branches into available slots. Returns {updated_state, to_dispatch} where to_dispatch is a list of {branch_name, directive} tuples.
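
The draining step can be sketched as follows. This is an illustration under stated assumptions rather than the library's implementation: the state is a plain map, the module name FanOutQueueSketch is hypothetical, and the concurrency limit is passed explicitly as max_concurrency because the documentation does not say where the real function reads it from.

```elixir
defmodule FanOutQueueSketch do
  # `state` is a plain map standing in for the struct;
  # `max_concurrency` is an assumed, explicit parameter.
  def drain_queue(state, max_concurrency) do
    # Slots left once the currently running branches are accounted for.
    free_slots = max(max_concurrency - MapSet.size(state.pending_branches), 0)

    # Take as many queued branches as there are free slots, in queue order.
    {to_dispatch, still_queued} = Enum.split(state.queued_branches, free_slots)

    # Dispatched branches move from the queue into the pending set.
    pending =
      Enum.reduce(to_dispatch, state.pending_branches, fn {name, _directive}, acc ->
        MapSet.put(acc, name)
      end)

    {%{state | pending_branches: pending, queued_branches: still_queued}, to_dispatch}
  end
end
```

In this sketch, a caller would emit each directive in to_dispatch and keep the updated state, calling the function again whenever a branch completes and frees a slot.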

find_suspended_branch(state, suspension_id)

@spec find_suspended_branch(t(), String.t()) :: {atom(), map()} | nil

Find a suspended branch by suspension_id.

has_suspended_branch?(state, suspension_id)

@spec has_suspended_branch?(t(), String.t()) :: boolean()

Check if a suspended branch matches the given suspension_id.
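
The two lookup functions above can be pictured with the following minimal sketch. It assumes that each Jido.Composer.Suspension carries an id field that the suspension_id is compared against; that field name, the plain-map state, and the module name FanOutSuspensionSketch are assumptions, not confirmed details of the library.

```elixir
defmodule FanOutSuspensionSketch do
  # Returns {branch_name, entry} for the first suspended branch whose
  # suspension id matches, or nil. Assumes `entry.suspension.id` exists.
  def find_suspended_branch(state, suspension_id) do
    Enum.find(state.suspended_branches, fn {_name, entry} ->
      entry.suspension.id == suspension_id
    end)
  end

  # Boolean form of the same lookup.
  def has_suspended_branch?(state, suspension_id) do
    find_suspended_branch(state, suspension_id) != nil
  end
end
```

The return shape mirrors the documented spec for find_suspended_branch/2: a {branch_name, entry} tuple on a match and nil otherwise.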

merge_results(state)

@spec merge_results(t()) :: map()

Merge completed results using the configured merge strategy.

new(id, node, dispatched_names, queued_branches, opts \\ [])

@spec new(String.t(), struct(), MapSet.t(), [{atom(), term()}], keyword()) :: t()

Creates a new FanOut.State from dispatched and queued branches.

resume_branch(state, branch_name)

@spec resume_branch(t(), atom()) :: t()

Resume a suspended branch by removing it from suspended_branches.