This Livebook focuses on the authoring surface: workflow DSL structure, normalized workflow specs, dependency joins, input mappings, execution, and graph inspection.

It uses in-memory ETS journal storage so you can run it without a host app.

Mix.install([
  {:squidie, "~> 0.1.2"}
])

Runtime Setup

The runtime options below keep all durable facts inside this Livebook session.

storage = {Jido.Storage.ETS, table: :squidie_workflow_authoring_livebook}

defmodule SquidieAuthoringLivebook.Repo do
end

Application.put_env(:squidie, :repo, SquidieAuthoringLivebook.Repo)
Application.put_env(:squidie, :queue, "workflow-authoring")

opts = [
  runtime: :journal,
  journal_storage: storage,
  queue: "workflow-authoring"
]

defmodule SquidieAuthoringLivebook.Output do
  def step(step), do: Map.take(step, [:name, :module, :opts])

  def attempt(attempt) do
    Map.take(attempt, [
      :step,
      :status,
      :attempt_number,
      :visible_at,
      :applied?,
      :wakeup_emitted?
    ])
  end

  def node(node), do: Map.take(node, [:id, :status, :current?])

  def edge(edge) do
    Map.take(edge, [:id, :from, :to, :type, :status, :selected?, :pending?])
  end
end

Define Step Modules

Prefer native Squidie.Step modules for application steps. The workflow DSL stays readable, while each step keeps its own input and output contract.

defmodule SquidieAuthoringLivebook.ScoutWestRoad do
  use Squidie.Step,
    name: :scout_west_road,
    description: "Scouts the western road",
    input_schema: [
      bearer: [type: :string, required: true],
      west_mark: [type: :string, required: true],
      mountain_mark: [type: :string, required: true]
    ],
    output_schema: [
      west_road: [type: :map, required: true]
    ]

  @impl Squidie.Step
  def run(
        %{bearer: bearer, west_mark: west_mark, mountain_mark: mountain_mark},
        %Squidie.Step.Context{}
      ) do
    {:ok,
     %{
       west_road: %{
         bearer: bearer,
         map_mark: west_mark,
         mountain_mark: mountain_mark,
         distance_leagues: 12,
         danger: "watched"
       }
     }}
  end
end

defmodule SquidieAuthoringLivebook.ScoutMountainPass do
  use Squidie.Step,
    name: :scout_mountain_pass,
    description: "Scouts the mountain pass",
    input_schema: [
      bearer: [type: :string, required: true],
      mark: [type: :string, required: true]
    ],
    output_schema: [
      mountain_pass: [type: :map, required: true]
    ]

  @impl Squidie.Step
  def run(%{bearer: bearer, mark: mark}, %Squidie.Step.Context{}) do
    {:ok,
     %{
       mountain_pass: %{
         bearer: bearer,
         map_mark: mark,
         snow_depth: 3,
         danger: "storms"
       }
     }}
  end
end

defmodule SquidieAuthoringLivebook.ChooseRoute do
  use Squidie.Step,
    name: :choose_route,
    description: "Chooses a route from joined scouting reports",
    input_schema: [
      bearer: [type: :string, required: true],
      west_danger: [type: :string, required: true],
      west_distance_leagues: [type: :integer, required: true],
      mountain_danger: [type: :string, required: true],
      mountain_snow_depth: [type: :integer, required: true]
    ],
    output_schema: [
      route_plan: [type: :map, required: true]
    ]

  @impl Squidie.Step
  def run(input, %Squidie.Step.Context{}) do
    {:ok,
     %{
       route_plan: %{
         bearer: input.bearer,
         route: "moria",
         reason:
           "the western road is #{input.west_danger} and the pass has #{input.mountain_snow_depth} feet of snow",
         compared: %{
           west_distance_leagues: input.west_distance_leagues,
           west_danger: input.west_danger,
           mountain_danger: input.mountain_danger,
           mountain_snow_depth: input.mountain_snow_depth
         }
       }
     }}
  end
end

Define A Dependency Workflow

This workflow has one root step, one dependent scout, and one join step:

  • :scout_west_road consumes the bearer and nested map marks from the payload
  • :scout_mountain_pass waits for the western scout and consumes data from its output
  • :choose_route waits for both scouts and maps nested context into the input shape it needs
defmodule SquidieAuthoringLivebook.RoutePlanningWorkflow do
  use Squidie.Workflow

  workflow do
    trigger :plan_errand do
      manual()

      payload do
        field :bearer, :string, default: "Frodo"
        field :map_marks, :map,
          default: %{west_road: "watched", mountain_pass: "snow"}
      end
    end

    step :scout_west_road, SquidieAuthoringLivebook.ScoutWestRoad,
      input: [
        bearer: [:bearer],
        west_mark: [:map_marks, :west_road],
        mountain_mark: [:map_marks, :mountain_pass]
      ]

    step :scout_mountain_pass, SquidieAuthoringLivebook.ScoutMountainPass,
      after: [:scout_west_road],
      input: [
        bearer: [:west_road, :bearer],
        mark: [:west_road, :mountain_mark]
      ]

    step :choose_route, SquidieAuthoringLivebook.ChooseRoute,
      after: [:scout_west_road, :scout_mountain_pass],
      input: [
        bearer: [:west_road, :bearer],
        west_danger: [:west_road, :danger],
        west_distance_leagues: [:west_road, :distance_leagues],
        mountain_danger: [:mountain_pass, :danger],
        mountain_snow_depth: [:mountain_pass, :snow_depth]
      ]
  end
end

Inspect The Normalized Spec

The DSL compiles into a normalized workflow spec. Tooling can inspect this shape without parsing source code.

{:ok, spec} =
  Squidie.Workflow.to_spec(SquidieAuthoringLivebook.RoutePlanningWorkflow)

%{
  workflow: spec.workflow,
  triggers: Enum.map(spec.triggers, &Map.take(&1, [:name, :type, :payload])),
  payload: spec.payload,
  entry_steps: spec.entry_steps,
  steps: Enum.map(spec.steps, &SquidieAuthoringLivebook.Output.step/1),
  transitions: spec.transitions
}

Notice that dependency workflows do not need success transitions. entry_steps shows the root step, and later work is declared with after: [...].

Visual-editor clients can use the JSON-safe editor projection when they need to inspect or preview a draft workflow without starting it:

editor_map =
  spec
  |> Squidie.Workflow.EditorSpec.to_map()
  |> Jason.encode!()
  |> Jason.decode!()

:ok = Squidie.Workflow.EditorSpec.validate_map(editor_map)
{:ok, draft_graph} = Squidie.Workflow.EditorSpec.preview_graph(editor_map)
{:ok, draft_diff} = Squidie.Workflow.EditorSpec.diff(spec, editor_map)

Map.take(draft_graph, ["source", "status", "workflow"])

If the editor map uses runtime-authored top-level action keys, pass the host registry to validate_map/2 and preview_graph/2 before accepting the draft graph. Use diff/2 or diff/3 when a service needs to inspect what changed between a source spec and an edited draft.

Start A Run

Manual triggers can start through Squidie.start/3 when the workflow has one trigger.

{:ok, started} =
  Squidie.start(
    SquidieAuthoringLivebook.RoutePlanningWorkflow,
    %{
      bearer: "Frodo",
      map_marks: %{west_road: "watched", mountain_pass: "snow"}
    },
    opts
  )

%{
  run_id: started.run_id,
  status: started.status,
  reason: started.reason,
  visible_attempts: Enum.map(started.visible_attempts, &SquidieAuthoringLivebook.Output.attempt/1),
  planned_runnable_keys: started.planned_runnable_keys
}

The first snapshot has visible work for the root step. Later dependency steps become visible only after their prerequisites complete.

Drain Visible Work

Each call to Squidie.execute_next/1 claims one visible attempt, executes the step, records the result, and returns the updated snapshot.

worker_opts = Keyword.put(opts, :owner_id, "authoring-livebook-worker")

{:ok, first_scout} = Squidie.execute_next(worker_opts)
{:ok, second_scout} = Squidie.execute_next(worker_opts)
{:ok, completed} = Squidie.execute_next(worker_opts)
{:ok, :none} = Squidie.execute_next(worker_opts)

%{
  first_step: List.last(first_scout.applied_runnable_keys),
  second_step: List.last(second_scout.applied_runnable_keys),
  final_status: completed.status,
  final_reason: completed.reason,
  context: completed.context,
  attempts: Enum.map(completed.attempts, &SquidieAuthoringLivebook.Output.attempt/1)
}

The join step receives only the mapped values declared in the workflow. The full run context remains durable and inspectable.

Inspect And Explain

Inspection answers what durable state exists. Explanation answers why the run is in its current state and which action would make progress.

{:ok, inspected} = Squidie.inspect_run(started.run_id, opts)
{:ok, explanation} = Squidie.explain_run(started.run_id, opts)

%{
  inspected_status: inspected.status,
  inspected_context: inspected.context,
  explanation_reason: explanation.reason,
  explanation_summary: explanation.summary,
  explanation_evidence: explanation.evidence
}

Inspect The Graph

Graph inspection turns the same durable state into node and edge output for host UIs.

{:ok, graph} = Squidie.inspect_run_graph(started.run_id, opts)
payload = Squidie.Runs.GraphInspection.to_map(graph)

%{
  status: payload.status,
  current_node_ids: payload.current_node_ids,
  nodes: Enum.map(payload.nodes, &SquidieAuthoringLivebook.Output.node/1),
  edges: Enum.map(payload.edges, &SquidieAuthoringLivebook.Output.edge/1),
  child_links: payload.child_links
}

For the complete node, edge, and child-link contract, see Graph inspection contract.

Try Changing The Workflow

Useful edits to try:

  • add a retry policy to :scout_mountain_pass
  • add a second join step that depends on :choose_route
  • remove one input mapping path and observe the structured validation failure
  • switch to a transition-based workflow when the shape is a straight line

Read next: