Flows — async workflow orchestration

Copy Markdown View Source

Caravela.Flow is a small DSL + GenServer runtime for composable async workflows: debouncing, retries, sagas, parallel tasks. It is deliberately scoped to ephemeral orchestration — no event sourcing, no CQRS, no read-model projections. Teams needing append-only logs should reach for Commanded; flows deliberately keep out of that territory so the CRUD half of Caravela stays clean.

Flows pair naturally with Caravela.Live.* + LiveSvelte: flow state changes go out via a :notify pid, a LiveView re-assigns socket state, LiveSvelte pushes the prop diff to the Svelte component, and the UI reacts. No custom WebSocket code.

The DSL

defmodule MyApp.Flows.BookSyncFlow do
  use Caravela.Flow

  flow :sync_book, initial_state: %{dirty: false, book_id: nil} do
    repeat do
      wait_until fn state -> state.dirty end
      debounce 500

      sequence do
        set_state fn state -> %{state | dirty: :processing} end

        run fn state ->
          case MyApp.ExternalAPI.sync_book(state.book_id) do
            :ok                   -> {:ok, %{state | dirty: false}}
            {:error, :timeout}    -> {:retry, state}
            {:error, reason}      -> {:error, reason}
          end
        end, retries: 3, backoff: :exponential, base_delay: 200
      end
    end
  end

  flow :import_books, initial_state: %{urls: [], results: []} do
    sequence do
      parallel fn state ->
        Enum.map(state.urls, fn url ->
          fn -> MyApp.ExternalAPI.fetch_book(url) end
        end)
      end, collect_as: :fetched

      each :fetched, fn book_data, state ->
        case MyApp.Library.V1.create_book(book_data) do
          {:ok, book} -> {:ok, %{state | results: [book | state.results]}}
          {:error, reason} -> {:skip, reason}
        end
      end
    end
  end
end

Each flow/3 block compiles into a step tree rooted at a Sequence. Nested repeat/sequence blocks produce nested structs; every step's fun is captured as-is and invoked at runtime.

Step primitives

Every primitive maps onto a struct in Caravela.Flow.Steps. The Ballerina equivalents are listed in the module doc and below.

DSLStructBallerinaDescription
sequence do ... endSequenceCo.SeqRun inner steps in order
repeat do ... endRepeatCo.RepeatLoop the inner step forever
wait msWaitCo.WaitPause for a fixed duration
wait_until funWaitUntilCo.WhileBlock until fun.(state) is truthy; unblocks on signal/2
debounce msDebounceWait for ms of state-stability (resets on signal)
set_state funSetStateCo.SetStateSynchronously replace state
run fun, retries:, backoff:, base_delay:RunCo.AwaitInvoke async work with optional retry + backoff
parallel fun, collect_as:ParallelCo.AllRun a list of zero-arity funs concurrently, collect results
race tasks, collect_as:RaceCo.AnyRun tasks concurrently, keep the first result
each :key, funEachCo.ForIterate a collection already in state

run/2 understands the following return shapes:

  • :ok — advance, state unchanged
  • {:ok, new_state} — advance with new state
  • {:retry, new_state} — retry (decrements retries)
  • {:error, reason} — retry; if retries == 0, fail the flow

each/2 understands {:ok, state}, {:skip, reason}, and {:error, reason}. :skip keeps iterating; :error aborts the whole flow.

Starting, signalling, reading state

{:ok, pid} =
  Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
    initial_state: %{dirty: false, book_id: "abc"},
    notify: self())

# Flip `dirty` to kick the sync from outside
Caravela.Flow.signal(pid, fn state -> %{state | dirty: true} end)

# Read current state at any time
Caravela.Flow.get_state(pid) #=> %{dirty: :processing, book_id: "abc"}

# Stop when you're done
Caravela.Flow.stop(pid)

The :notify pid receives three message shapes:

  • {:flow_state, new_state} — whenever state changes
  • {:flow_done, final_state} — when the flow completes normally
  • {:flow_error, reason} — on {:error, _} from run/each

Supervising flows

In production, start Caravela.Flow.Supervisor as part of your application tree:

children = [
  # ... Ecto Repo, Endpoint, PubSub, etc. ...
  Caravela.Flow.Supervisor
]

Supervisor.start_link(children, strategy: :one_for_one)

With the supervisor running, every Caravela.Flow.start/3 call attaches the new runner as a supervised child. Without it, start/3 falls back to an unsupervised start_link — handy in tests and tooling.

The real-time loop — Flow → LiveView → LiveSvelte → Svelte

Flow (GenServer)
   {:flow_state, %{dirty: :processing}}
   via `:notify`
  
LiveView (handle_info)
   assign(socket, sync_status: state.dirty)
   LiveSvelte auto-pushes the prop change
  
Svelte Component
   export let sync_status: string;
   $: statusLabel = derive(sync_status);
  
  Reactive DOM update  no manual WebSocket code.

LiveView side

defmodule MyAppWeb.Library.BookSyncLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    {:ok, pid} =
      Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
        initial_state: %{dirty: false, book_id: socket.assigns.book.id},
        notify: self())

    {:ok, assign(socket, flow_pid: pid, sync_status: :idle)}
  end

  def handle_info({:flow_state, state}, socket) do
    {:noreply, assign(socket, sync_status: state.dirty)}
  end

  def handle_info({:flow_done, _}, socket), do: {:noreply, socket}

  def render(assigns) do
    ~H"""
    <LiveSvelte.render name="library/BookSync" props={%{sync_status: @sync_status}} />
    """
  end
end

Svelte side

<script lang="ts">
  export let sync_status: string | boolean = "idle";

  $: statusLabel =
    sync_status === "idle"       ? "Ready" :
    sync_status === :processing  ? "Syncing…" :
    sync_status === false        ? "Synced ✓" : "Unknown";
</script>

<div class="sync-indicator" class:syncing={sync_status === :processing}>
  {statusLabel}
</div>

The Svelte component declares a prop and reacts — no socket.on, no subscriptions, no ad-hoc WebSocket plumbing. LiveView + LiveSvelte already own the transport; flows just feed it.

Scope boundary (no event sourcing)

Flows are ephemeral async orchestration — live only while the runner process is alive, state is in-memory only, and there is no persistent log. If a runner crashes, the flow starts over from initial state (or from the state you persisted externally). That's the tradeoff for a tiny, predictable runtime.

If you need durable event streams, projections, or CQRS, use Commanded — Caravela intentionally does not compete with it.