# Flows — async workflow orchestration

`Caravela.Flow` is a small DSL + GenServer runtime for composable
async workflows: debouncing, retries, sagas, parallel tasks. It is
**deliberately scoped** to ephemeral orchestration — no event
sourcing, no CQRS, no read-model projections. Teams needing
append-only logs should reach for
[Commanded](https://github.com/commanded/commanded); flows deliberately
keep out of that territory so the CRUD half of Caravela stays clean.

Flows pair naturally with `Caravela.Live.*` + LiveSvelte: flow state
changes go out via a `:notify` pid, a LiveView re-assigns socket
state, LiveSvelte pushes the prop diff to the Svelte component, and
the UI reacts. No custom WebSocket code.

## The DSL

```elixir
defmodule MyApp.Flows.BookSyncFlow do
  use Caravela.Flow

  flow :sync_book, initial_state: %{dirty: false, book_id: nil} do
    repeat do
      wait_until fn state -> state.dirty end
      debounce 500

      sequence do
        set_state fn state -> %{state | dirty: :processing} end

        run fn state ->
          case MyApp.ExternalAPI.sync_book(state.book_id) do
            :ok                   -> {:ok, %{state | dirty: false}}
            {:error, :timeout}    -> {:retry, state}
            {:error, reason}      -> {:error, reason}
          end
        end, retries: 3, backoff: :exponential, base_delay: 200
      end
    end
  end

  flow :import_books, initial_state: %{urls: [], results: []} do
    sequence do
      parallel fn state ->
        Enum.map(state.urls, fn url ->
          fn -> MyApp.ExternalAPI.fetch_book(url) end
        end)
      end, collect_as: :fetched

      each :fetched, fn book_data, state ->
        case MyApp.Library.V1.create_book(book_data) do
          {:ok, book} -> {:ok, %{state | results: [book | state.results]}}
          {:error, reason} -> {:skip, reason}
        end
      end
    end
  end
end
```

Each `flow/3` block compiles into a step tree rooted at a `Sequence`.
Nested `repeat`/`sequence` blocks produce nested structs; every
step's fun is captured as-is and invoked at runtime.

## Step primitives

Every primitive maps onto a struct in `Caravela.Flow.Steps`. The
Ballerina equivalents are listed in the module doc and below.

| DSL                              | Struct         | Ballerina   | Description                                                   |
| -------------------------------- | -------------- | ----------- | ------------------------------------------------------------- |
| `sequence do ... end`            | `Sequence`     | `Co.Seq`    | Run inner steps in order                                      |
| `repeat do ... end`              | `Repeat`       | `Co.Repeat` | Loop the inner step forever                                   |
| `wait ms`                        | `Wait`         | `Co.Wait`   | Pause for a fixed duration                                    |
| `wait_until fun`                 | `WaitUntil`    | `Co.While`  | Block until `fun.(state)` is truthy; unblocks on `signal/2`   |
| `debounce ms`                    | `Debounce`     |             | Wait for `ms` of state-stability (resets on signal)           |
| `set_state fun`                  | `SetState`     | `Co.SetState` | Synchronously replace state                                 |
| `run fun, retries:, backoff:, base_delay:` | `Run`  | `Co.Await`  | Invoke async work with optional retry + backoff               |
| `parallel fun, collect_as:`      | `Parallel`     | `Co.All`    | Run a list of zero-arity funs concurrently, collect results   |
| `race tasks, collect_as:`        | `Race`         | `Co.Any`    | Run tasks concurrently, keep the first result                 |
| `each :key, fun`                 | `Each`         | `Co.For`    | Iterate a collection already in state                         |

`run/2` understands the following return shapes:

- `:ok` — advance, state unchanged
- `{:ok, new_state}` — advance with new state
- `{:retry, new_state}` — retry (decrements `retries`)
- `{:error, reason}` — retry; if `retries == 0`, fail the flow

`each/2` understands `{:ok, state}`, `{:skip, reason}`, and
`{:error, reason}`. `:skip` keeps iterating; `:error` aborts the
whole flow.

## Starting, signalling, reading state

```elixir
{:ok, pid} =
  Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
    initial_state: %{dirty: false, book_id: "abc"},
    notify: self())

# Flip `dirty` to kick the sync from outside
Caravela.Flow.signal(pid, fn state -> %{state | dirty: true} end)

# Read current state at any time
Caravela.Flow.get_state(pid) #=> %{dirty: :processing, book_id: "abc"}

# Stop when you're done
Caravela.Flow.stop(pid)
```

The `:notify` pid receives three message shapes:

- `{:flow_state, new_state}` — whenever state changes
- `{:flow_done, final_state}` — when the flow completes normally
- `{:flow_error, reason}` — on `{:error, _}` from `run/each`

## Supervising flows

In production, start `Caravela.Flow.Supervisor` as part of your
application tree:

```elixir
children = [
  # ... Ecto Repo, Endpoint, PubSub, etc. ...
  Caravela.Flow.Supervisor
]

Supervisor.start_link(children, strategy: :one_for_one)
```

With the supervisor running, every `Caravela.Flow.start/3` call
attaches the new runner as a supervised child. Without it,
`start/3` falls back to an unsupervised `start_link` — handy in tests
and tooling.

## The real-time loop — Flow → LiveView → LiveSvelte → Svelte

```
Flow (GenServer)
  │ {:flow_state, %{dirty: :processing}}
  │ via `:notify`
  ▼
LiveView (handle_info)
  │ assign(socket, sync_status: state.dirty)
  │ LiveSvelte auto-pushes the prop change
  ▼
Svelte Component
  │ let { sync_status } = $props();
  │ const statusLabel = $derived(derive(sync_status));
  ▼
  Reactive DOM update — no manual WebSocket code.
```

### LiveView side

```elixir
defmodule MyAppWeb.Library.BookSyncLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    {:ok, pid} =
      Caravela.Flow.start(MyApp.Flows.BookSyncFlow, :sync_book,
        initial_state: %{dirty: false, book_id: socket.assigns.book.id},
        notify: self())

    {:ok, assign(socket, flow_pid: pid, sync_status: :idle)}
  end

  def handle_info({:flow_state, state}, socket) do
    {:noreply, assign(socket, sync_status: state.dirty)}
  end

  def handle_info({:flow_done, _}, socket), do: {:noreply, socket}

  def render(assigns) do
    ~H"""
    <CaravelaSvelte.svelte name="library/BookSync" props={%{sync_status: @sync_status}} socket={@socket} />
    """
  end
end
```

### Svelte side

```svelte
<script lang="ts">
  let { sync_status = "idle" }: { sync_status?: string | boolean } = $props();

  const statusLabel = $derived(
    sync_status === "idle"       ? "Ready" :
    sync_status === "processing" ? "Syncing…" :
    sync_status === false        ? "Synced ✓" : "Unknown"
  );
</script>

<div class="sync-indicator" class:syncing={sync_status === "processing"}>
  {statusLabel}
</div>
```

The Svelte component declares a prop and reacts — no `socket.on`,
no subscriptions, no ad-hoc WebSocket plumbing. LiveView + LiveSvelte
already own the transport; flows just feed it.

## Scope boundary (no event sourcing)

Flows are **ephemeral async orchestration** — live only while the
runner process is alive, state is in-memory only, and there is no
persistent log. If a runner crashes, the flow starts over from
initial state (or from the state you persisted externally). That's
the tradeoff for a tiny, predictable runtime.

If you need durable event streams, projections, or CQRS, use
[Commanded](https://github.com/commanded/commanded) — Caravela
intentionally does not compete with it.
