Caravela.Flow is a small DSL + GenServer runtime for composable
async workflows: debouncing, retries, sagas, parallel tasks. It is
deliberately scoped to ephemeral orchestration — no event
sourcing, no CQRS, no read-model projections. Teams needing
append-only logs should reach for
Commanded; flows deliberately
keep out of that territory so the CRUD half of Caravela stays clean.
Flows pair naturally with Caravela.Live.* + LiveSvelte: flow state
changes go out via a :notify pid, a LiveView re-assigns socket
state, LiveSvelte pushes the prop diff to the Svelte component, and
the UI reacts. No custom WebSocket code.
The DSL
defmodule MyApp.Flows.BookSyncFlow do
use Caravela.Flow
flow :sync_book, initial_state: %{dirty: false, book_id: nil} do
repeat do
wait_until fn state -> state.dirty end
debounce 500
sequence do
set_state fn state -> %{state | dirty: :processing} end
run fn state ->
case MyApp.ExternalAPI.sync_book(state.book_id) do
:ok -> {:ok, %{state | dirty: false}}
{:error, :timeout} -> {:retry, state}
{:error, reason} -> {:error, reason}
end
end, retries: 3, backoff: :exponential, base_delay: 200
end
end
end
flow :import_books, initial_state: %{urls: [], results: []} do
sequence do
parallel fn state ->
Enum.map(state.urls, fn url ->
fn -> MyApp.ExternalAPI.fetch_book(url) end
end)
end, collect_as: :fetched
each :fetched, fn book_data, state ->
case MyApp.Library.V1.create_book(book_data) do
{:ok, book} -> {:ok, %{state | results: [book | state.results]}}
{:error, reason} -> {:skip, reason}
end
end
end
end
endEach flow/3 block compiles into a step tree rooted at a Sequence.
Nested repeat/sequence blocks produce nested structs; every
step's fun is captured as-is and invoked at runtime.
Step primitives
Every primitive maps onto a struct in Caravela.Flow.Steps. The
Ballerina equivalents are listed in the module doc and below.
| DSL | Struct | Ballerina | Description |
|---|---|---|---|
sequence do ... end | Sequence | Co.Seq | Run inner steps in order |
repeat do ... end | Repeat | Co.Repeat | Loop the inner step forever |
wait ms | Wait | Co.Wait | Pause for a fixed duration |
wait_until fun | WaitUntil | Co.While | Block until fun.(state) is truthy; unblocks on signal/2 |
debounce ms | Debounce | Wait for ms of state-stability (resets on signal) | |
set_state fun | SetState | Co.SetState | Synchronously replace state |
run fun, retries:, backoff:, base_delay: | Run | Co.Await | Invoke async work with optional retry + backoff |
parallel fun, collect_as: | Parallel | Co.All | Run a list of zero-arity funs concurrently, collect results |
race tasks, collect_as: | Race | Co.Any | Run tasks concurrently, keep the first result |
each :key, fun | Each | Co.For | Iterate a collection already in state |
run/2 understands the following return shapes:
:ok— advance, state unchanged{:ok, new_state}— advance with new state{:retry, new_state}— retry (decrementsretries){:error, reason}— retry; ifretries == 0, fail the flow
each/2 understands {:ok, state}, {:skip, reason}, and
{:error, reason}. :skip keeps iterating; :error aborts the
whole flow.
Starting, signalling, reading state
{:ok, pid} =
Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
initial_state: %{dirty: false, book_id: "abc"},
notify: self())
# Flip `dirty` to kick the sync from outside
Caravela.Flow.signal(pid, fn state -> %{state | dirty: true} end)
# Read current state at any time
Caravela.Flow.get_state(pid) #=> %{dirty: :processing, book_id: "abc"}
# Stop when you're done
Caravela.Flow.stop(pid)The :notify pid receives three message shapes:
{:flow_state, new_state}— whenever state changes{:flow_done, final_state}— when the flow completes normally{:flow_error, reason}— on{:error, _}fromrun/each
Supervising flows
In production, start Caravela.Flow.Supervisor as part of your
application tree:
children = [
# ... Ecto Repo, Endpoint, PubSub, etc. ...
Caravela.Flow.Supervisor
]
Supervisor.start_link(children, strategy: :one_for_one)With the supervisor running, every Caravela.Flow.start/3 call
attaches the new runner as a supervised child. Without it,
start/3 falls back to an unsupervised start_link — handy in tests
and tooling.
The real-time loop — Flow → LiveView → LiveSvelte → Svelte
Flow (GenServer)
│ {:flow_state, %{dirty: :processing}}
│ via `:notify`
▼
LiveView (handle_info)
│ assign(socket, sync_status: state.dirty)
│ LiveSvelte auto-pushes the prop change
▼
Svelte Component
│ export let sync_status: string;
│ $: statusLabel = derive(sync_status);
▼
Reactive DOM update — no manual WebSocket code.LiveView side
defmodule MyAppWeb.Library.BookSyncLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
{:ok, pid} =
Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
initial_state: %{dirty: false, book_id: socket.assigns.book.id},
notify: self())
{:ok, assign(socket, flow_pid: pid, sync_status: :idle)}
end
def handle_info({:flow_state, state}, socket) do
{:noreply, assign(socket, sync_status: state.dirty)}
end
def handle_info({:flow_done, _}, socket), do: {:noreply, socket}
def render(assigns) do
~H"""
<LiveSvelte.render name="library/BookSync" props={%{sync_status: @sync_status}} />
"""
end
endSvelte side
<script lang="ts">
export let sync_status: string | boolean = "idle";
$: statusLabel =
sync_status === "idle" ? "Ready" :
sync_status === :processing ? "Syncing…" :
sync_status === false ? "Synced ✓" : "Unknown";
</script>
<div class="sync-indicator" class:syncing={sync_status === :processing}>
{statusLabel}
</div>The Svelte component declares a prop and reacts — no socket.on,
no subscriptions, no ad-hoc WebSocket plumbing. LiveView + LiveSvelte
already own the transport; flows just feed it.
Scope boundary (no event sourcing)
Flows are ephemeral async orchestration — live only while the runner process is alive, state is in-memory only, and there is no persistent log. If a runner crashes, the flow starts over from initial state (or from the state you persisted externally). That's the tradeoff for a tiny, predictable runtime.
If you need durable event streams, projections, or CQRS, use Commanded — Caravela intentionally does not compete with it.