Temporalex Workflow Programming Model

Copy Markdown View Source

Overview

A Temporalex workflow is a single function that reads top-to-bottom as sequential code. Concurrency is introduced through two explicit constructs — receive and parallel — which act as structured concurrency scopes. All async work is bound to the scope that spawned it and must complete before the scope returns.

The design principles:

  1. Workflows are functions. A workflow is a module with a run/1 function. It calls activities, sleeps, waits for signals, and returns a result. There is no implicit event loop or background message processing.

  2. Concurrency is scoped and explicit. The only way to introduce concurrent execution is by entering a receive or parallel block. The keyword {:async, fn, state} must be explicitly returned to spawn concurrent work. Nothing is concurrent by default.

  3. State is what you make it. There is no framework-managed "workflow state" that handlers implicitly share. receive has reducer state (an accumulator). Queries see only what you explicitly publish. These are separate concerns.

  4. Structure determines validity. Which updates and signals a workflow accepts is determined by which receive block it's currently in. An update that arrives when the workflow isn't in a receive expecting it is rejected. The code structure declares what's valid when.


The Primitive Set

Sequential Primitives

These are blocking calls available anywhere in workflow code (in run/1, inside handlers, inside parallel branches):

Activities.Module.function(args) — Execute an activity. Blocks until the activity completes or fails. This is the primary way workflows interact with the outside world.

API.sleep(duration_ms) — Durable timer. Blocks for the specified duration. Survives process restarts. The executor schedules a StartTimer command and the runner resumes when the timer fires.

API.wait_for_signal(name) — Blocks until a signal with the given name arrives. Consumes one signal from the buffer. If a matching signal is already buffered, returns immediately. Signals are a queue — multiple signals with the same name accumulate and are consumed one at a time.

API.side_effect(fn) — Executes a non-deterministic function once and records its return value in history. On replay, the function is not re-executed — the recorded value is returned. Use for things like reading a config value, generating an external ID, or capturing a timestamp for business logic. The function must not fail and must not have side effects beyond computing a value. If you need side effects (network calls, mutations), use an activity instead.

order_number = API.side_effect(fn -> MyApp.Sequences.next("orders") end)
correlation_id = API.side_effect(fn -> UUID.uuid4() end)

API.publish_state(state) — Publishes a state snapshot that queries can read. Non-blocking. Can be called from anywhere. This is the only way to make state visible to queries. Calling it replaces the previously published state entirely.

Versioning

API.patched?(patch_id) — Workflow versioning. Returns true on new executions (emits a SetPatchMarker command). On replay, returns true only if the patch was recorded in history. Use to branch between old and new code paths when changing workflow logic while executions are in-flight.

API.deprecate_patch(patch_id) — Marks a patch as deprecated. Call after all pre-patch executions have completed. Emits a marker but doesn't cause replay failure if missing.

if API.patched?("use-new-pricing") do
  Activities.Pricing.v2(item)
else
  Activities.Pricing.v1(item)
end

Structured Concurrency Hosts

These are blocking calls that can host concurrent work within their scope:

API.receive(state, handlers) — Enters a message-processing loop. Blocks the caller. Dispatches incoming updates and signals to the provided handlers. Returns when a handler signals completion via {:stop, ...} or the timeout expires. All async handlers spawned within this scope must complete before receive returns.

API.parallel(fns) — Executes a list of functions concurrently. Each function runs in its own process and can call activities, sleep, or use other sequential primitives. Blocks until all functions complete. Returns a list of results in the same order as the input functions.

Async-Only Primitives

These are only available inside async handler processes (spawned by {:async, fn, state} within a receive):

API.update_state(fn) — Atomically transforms the enclosing receive block's reducer state. The function receives the current state and returns {result, new_state}. The transformation runs inside the executor and is serialized — concurrent async handlers calling update_state are never interleaved. This is the only way for async handlers to interact with the receive state.


Workflow Structure

A workflow is a module that uses Temporalex.Workflow and defines run/1 and optionally handle_query/3:

defmodule MyApp.Workflows.Checkout do
  use Temporalex.Workflow

  # Queries — always available, operate on last published state
  def handle_query("status", _args, state), do: {:reply, state.phase}
  def handle_query("items", _args, state), do: {:reply, state[:items]}

  def run(args) do
    # Sequential workflow code...
  end
end

run/1 receives the decoded workflow input and returns:

  • {:ok, result} — Workflow completes successfully.
  • {:error, reason} — Workflow fails.
  • {:continue_as_new, args} — Workflow restarts with fresh history and the provided arguments.

handle_query/3 receives the query name, arguments, and the last published state. It returns {:reply, value}. Query handlers are always available and are read-only — they cannot modify state or perform side effects.


Message Types

Signals

Signals are asynchronous, fire-and-forget messages. They are buffered by the executor and never lost. A signal has a name and a payload.

Inside receive: When a signal with a matching handler arrives, the handler is called. Multiple signals with the same name each invoke the handler separately.

Outside receive: Signals accumulate in the executor's buffer. They can be consumed with API.wait_for_signal(name), which pops one signal from the buffer and returns its payload. If no matching signal exists, it blocks until one arrives.

Signals arriving during linear execution (while the workflow is calling an activity, sleeping, etc.) are always buffered. They are never rejected or lost.

Updates

Updates are synchronous, tracked messages. The caller sends an update and waits for a response. An update has a name, arguments, and returns a result to the caller.

Inside receive: When an update with a matching handler arrives, the validator runs first (if defined). If the validator rejects, the caller gets an error and nothing is written to history. If accepted, the handler runs and its return value is sent back to the caller.

Outside receive: Updates are rejected. The caller receives an error indicating the workflow is not accepting that update at this time. This is intentional — the code structure declares when updates are valid.

Queries

Queries are synchronous, read-only requests. They operate on the last state published via API.publish_state and are handled by the module-level handle_query/3 callback. Queries are always available, regardless of whether the workflow is in a receive block.


API.receive

receive is the central construct for message-driven workflow phases. It blocks the workflow function, processes messages, and returns when a handler signals completion.

Signature

result = API.receive(initial_state, opts)
  • initial_state — The starting value for the reducer. Can be any term: a map, integer, list, etc.
  • opts — Keyword list with :update, :signal, and optionally :timeout keys.

Handler Definitions

Handlers run in their own process. They may perform blocking operations (activities, API.parallel, API.sleep) before returning. While a sync handler is running, the receive loop waits for it to complete before dispatching the next message — this guarantees sequential message processing by default. See Async Handlers for concurrent message processing.

Signal handlers receive the signal arguments and current state:

signal: %{
  "name" => fn args, state -> {:noreply, new_state} end,
  "done" => fn _args, state -> {:stop, state} end,
}

Return values:

  • {:noreply, new_state} — Update state, continue processing.
  • {:stop, state} — Exit the receive loop.
  • {:async, fn, state} — Spawn an async handler (see below). The function's return value is ignored (signals have no caller).

Update handlers receive the arguments and current state:

update: %{
  "name" => fn args, state -> {:reply, response, new_state} end,
  "name" => {&handler/2, validator: &validator/2},
}

Return values:

  • {:reply, response, new_state} — Reply to the caller, update state, continue processing.
  • {:stop, response, new_state} — Reply to the caller and exit the receive loop.
  • {:async, fn, state} — Accept the update, spawn an async handler (see below). The function's return value becomes the update reply.

Update validators receive the arguments and current state. They accept or reject:

validator: fn args, state ->
  :ok | {:error, reason}
end

Validators are always synchronous and always run inline in the executor process. They run before the update is accepted into history. If they return {:error, reason}, the update is rejected and no history event is written.

Timeout

case API.receive(state, signal: %{...}, timeout: :timer.hours(24)) do
  {:timeout, state} -> # timed out
  state -> # a handler returned {:stop, state}
end

If no handler returns {:stop, ...} within the timeout, the receive exits with {:timeout, state}, allowing the caller to distinguish between handler-driven completion and timeout. Useful for entity workflows that need periodic continue-as-new.

Completion Semantics

When a handler returns {:stop, ...}:

  1. The receive stops dispatching new messages to handlers.
  2. All in-flight async handlers are allowed to complete (structured concurrency).
  3. The final state (after all async handlers' update_state calls have applied) is returned to the caller.
  4. The workflow function resumes at the line after the receive call.

Async Handlers

By default, all handlers inside receive are synchronous. Sync handlers run in their own process and may call activities, API.parallel, or other blocking operations — but the receive loop waits for each sync handler to complete before dispatching the next message.

To process messages concurrently, return the {:async, fn, state} tuple explicitly. This spawns a background process and allows the receive loop to continue dispatching:

update: %{
  "add_item" => fn [item], state ->
    {:async, fn ->
      {:ok, price} = Activities.Pricing.lookup(item.sku)

      API.update_state(fn s ->
        new_items = [%{sku: item.sku, price: price} | s.items]
        {price, %{s | items: new_items}}
      end)
    end, state}
  end,
}

Semantics

When {:async, fn, state} is returned from an update handler:

  1. The update is immediately accepted (the UpdateAccepted event is written to history).
  2. A new process is spawned for the function. This process can call activities, use API.parallel, call API.update_state, and call API.publish_state.
  3. The return value of the function becomes the update reply sent to the caller. No explicit reply mechanism is needed.
  4. If the function raises, the update fails and the caller receives an error. The workflow continues.
  5. The spawned process is bound to the enclosing receive — it must complete before receive returns.

When {:async, fn, state} is returned from a signal handler:

  1. A new process is spawned for the function with the same capabilities as async update handlers.
  2. The function's return value is ignored (signals have no caller to reply to).
  3. If the function raises, the error is logged. The workflow continues.
  4. The spawned process is bound to the enclosing receive — it must complete before receive returns.

The key difference between sync and async handlers is concurrency, not capability. Both can call activities and do blocking work. Sync handlers serialize message processing (one at a time). Async handlers allow the receive loop to dispatch further messages while they run in the background.

API.update_state

Async handlers do not have direct access to the receive state (they run in a separate process). Instead, they use API.update_state to atomically read-modify-write the state:

result = API.update_state(fn state ->
  {return_value, new_state}
end)

The closure runs inside the executor, serialized with all other state operations. This means:

  • No stale reads. The closure always sees the current state.
  • No races. Concurrent async handlers' update_state calls are applied one at a time.
  • No locks needed. The executor's mailbox serializes everything.

If you need to read state without modifying it, return the state unchanged:

count = API.update_state(fn state -> {length(state.items), state} end)

Constraints

  • {:async, fn, state} can only be returned from handlers inside a receive.
  • Async handlers cannot spawn further async handlers (no {:async, ...} from within an async process).
  • Async handlers cannot enter their own receive loops.
  • Async handlers can call API.parallel for fan-out within the handler.

API.parallel

parallel executes multiple functions concurrently and waits for all of them to complete.

Signature

results = API.parallel([fn1, fn2, fn3])

Each function runs in its own process with access to the executor. Functions can call activities, sleep, use API.publish_state, and nest further API.parallel calls.

Returns a list of results in the same order as the input functions.

Error Semantics

If a branch raises an exception, the other branches continue running until they reach a terminal state (completion or failure). Every branch runs to completion. The result list contains each branch's return value on success, or {:error, reason} if the branch raised:

results = API.parallel([
  fn -> Activities.StepA.run(x) end,   # returns {:ok, "a"}
  fn -> Activities.StepB.run(y) end,   # raises RuntimeError
  fn -> Activities.StepC.run(z) end,   # returns {:ok, "c"}
])
# results == [{:ok, "a"}, {:error, %RuntimeError{...}}, {:ok, "c"}]

Example

def run(args) do
  [{:ok, user}, {:ok, config}] = API.parallel([
    fn -> Activities.Users.fetch(args.user_id) end,
    fn -> Activities.Config.load(args.tenant) end,
  ])

  # Both activities ran concurrently, both are done
  {:ok, %{user: user, config: config}}
end

Where It Works

API.parallel is a blocking call and works anywhere sequential primitives work:

  • In run/1
  • Inside sync and async handlers
  • Inside parallel branches (nested fan-out)
  • Anywhere you'd call an activity

State Model

There are three distinct kinds of "state" in a Temporalex workflow:

1. Local Variables

The function's local variables. Private to the run/1 execution. Not visible to handlers, queries, or anything else. This is the natural state of a sequential function:

def run(args) do
  charge_id = do_charge(args)  # local variable
  # ...
end

2. Receive State (Reducer Accumulator)

The state managed by a receive block. Passed to handlers, updated by handler return values and API.update_state calls. Scoped to the lifetime of one receive call. Returned to the caller when receive exits:

result = API.receive(%{items: [], count: 0}, ...)
# result is the final accumulator value

3. Published State (Query State)

The state visible to query handlers. Set explicitly via API.publish_state. Persists across receive blocks and linear phases. Replaced entirely on each publish:

API.publish_state(%{phase: :open, item_count: 0})
# ... later ...
API.publish_state(%{phase: :charging, item_count: 5})

These three are independent. The receive accumulator is not automatically published. Local variables are not visible to handlers. Published state is not the receive accumulator. The developer controls the boundaries between them.


Nesting Rules

run/1 (sequential, not a concurrency host)
 Activities              
 API.sleep               
 API.wait_for_signal     
 API.side_effect         
 API.publish_state       
 API.patched? / deprecate_patch 
 API.receive               (structured concurrency host)
    sync handlers         (default, runs in own process)
       Activities          
       API.sleep           
       API.parallel        
       API.side_effect     
       API.publish_state   
    {:async, fn, state}   (explicit, concurrent with receive loop)
        Activities          
        API.sleep           
        API.parallel        
        API.side_effect     
        API.update_state    
        API.publish_state   
        {:async, ...}         (cannot nest async)
        API.receive           (cannot nest receive)
 API.parallel              (structured concurrency host)
    Activities          
    API.sleep           
    API.parallel          (nested fan-out)
    API.side_effect     
    API.publish_state   
    {:async, ...}         (parallel branches are not receive)
    API.receive           (not in v1)
 {:async, ...}             (run/1 is not a host)

Return values from run/1:
  {:ok, result}              CompleteWorkflowExecution
  {:error, reason}           FailWorkflowExecution
  {:continue_as_new, args}   ContinueAsNewWorkflowExecution

Examples

Sequential Workflow

A simple three-step workflow with no message processing:

defmodule MyApp.Workflows.Onboarding do
  use Temporalex.Workflow

  def handle_query("status", _args, state), do: {:reply, state}

  def run(%{"user_id" => user_id}) do
    API.publish_state(%{step: :creating_account})
    {:ok, account} = Activities.Accounts.create(user_id)

    API.publish_state(%{step: :sending_welcome})
    {:ok, _} = Activities.Email.send_welcome(account)

    API.publish_state(%{step: :done})
    {:ok, %{account_id: account.id}}
  end
end

Entity Workflow

A long-lived entity that processes messages until told to stop:

defmodule MyApp.Workflows.Counter do
  use Temporalex.Workflow

  def handle_query("value", _args, state), do: {:reply, state}

  def run(_args) do
    API.publish_state(0)

    result = API.receive(0,
      signal: %{
        "increment" => fn _args, count -> {:noreply, count + 1} end,
        "decrement" => fn _args, count -> {:noreply, count - 1} end,
        "done"      => fn _args, count -> {:stop, count} end,
      }
    )

    API.publish_state(result)
    {:ok, result}
  end
end

Multi-Phase Workflow

A shopping cart that transitions between collection, checkout, and confirmation:

defmodule MyApp.Workflows.ShoppingCart do
  use Temporalex.Workflow

  def handle_query("status", _args, state), do: {:reply, state}

  def run(_args) do
    API.publish_state(%{phase: :open, item_count: 0})

    # Phase 1: Collect items
    cart = API.receive(%{items: []},
      update: %{
        "add_item"    => {&do_add_item/2, validator: &validate_sku/2},
        "remove_item" => &do_remove_item/2,
      },
      signal: %{
        "checkout" => fn _args, state -> {:stop, state} end,
      }
    )

    # Phase 2: Charge
    API.publish_state(%{phase: :charging, item_count: length(cart.items)})
    {:ok, total} = Activities.Payment.charge(cart.items)

    # Phase 3: Confirm or cancel
    API.publish_state(%{phase: :confirming, total: total})

    outcome = API.receive(%{confirmed: nil},
      update: %{
        "confirm" => fn _args, state -> {:stop, :ok, %{state | confirmed: true}} end,
        "cancel"  => fn _args, state -> {:stop, :ok, %{state | confirmed: false}} end,
      }
    )

    # Phase 4: Finalize
    if outcome.confirmed do
      API.publish_state(%{phase: :done, total: total})
      {:ok, _} = Activities.Email.send_receipt(total)
      {:ok, %{total: total}}
    else
      API.publish_state(%{phase: :refunded})
      {:ok, _} = Activities.Payment.refund(total)
      {:ok, :cancelled}
    end
  end

  defp do_add_item([item], state) do
    new_items = [item | state.items]
    API.publish_state(%{phase: :open, item_count: length(new_items)})
    {:reply, :ok, %{state | items: new_items}}
  end

  defp validate_sku([item], _state) do
    if valid_sku?(item.sku), do: :ok, else: {:error, "invalid SKU"}
  end

  defp do_remove_item([sku], state) do
    {:reply, :ok, %{state | items: Enum.reject(state.items, &(&1.sku == sku))}}
  end
end

Fan-Out with Parallel

defmodule MyApp.Workflows.BatchProcess do
  use Temporalex.Workflow

  def run(%{"items" => items}) do
    # Process all items concurrently
    results = API.parallel(Enum.map(items, fn item ->
      fn -> Activities.Processor.run(item) end
    end))

    failures = Enum.filter(results, &match?({:error, _}, &1))

    if failures == [] do
      {:ok, %{processed: length(results)}}
    else
      {:error, %{failures: length(failures)}}
    end
  end
end

Async Update Handlers

defmodule MyApp.Workflows.Inventory do
  use Temporalex.Workflow

  def handle_query("stock", _args, state), do: {:reply, state}

  def run(_args) do
    API.publish_state(%{})

    result = API.receive(%{stock: %{}},
      update: %{
        "restock" => fn [item], state ->
          # Async: needs to call an activity to get the price
          {:async, fn ->
            {:ok, price} = Activities.Pricing.current_price(item.sku)

            API.update_state(fn s ->
              new_qty = Map.get(s.stock, item.sku, 0) + item.quantity
              entry = %{quantity: new_qty, price: price}
              new_stock = Map.put(s.stock, item.sku, entry)
              API.publish_state(new_stock)
              {entry, %{s | stock: new_stock}}
            end)
          end, state}
        end,
      },
      signal: %{
        "close" => fn _args, state -> {:stop, state} end,
      }
    )

    {:ok, result.stock}
  end
end

Side Effects

defmodule MyApp.Workflows.OrderProcessing do
  use Temporalex.Workflow

  def run(%{"customer_id" => customer_id}) do
    # These run once and are recorded in history.
    # On replay, the recorded values are returned without re-executing.
    order_number = API.side_effect(fn -> MyApp.Sequences.next("orders") end)
    started_at = API.side_effect(fn -> DateTime.utc_now() end)

    API.publish_state(%{order_number: order_number, status: :processing})

    {:ok, _} = Activities.Orders.process(customer_id, order_number)
    {:ok, %{order_number: order_number, started_at: started_at}}
  end
end

Continue-As-New Entity

defmodule MyApp.Workflows.EventCollector do
  use Temporalex.Workflow

  def handle_query("count", _args, state), do: {:reply, state}

  def run(args) do
    state = args[:state] || %{events: [], generation: 0}
    API.publish_state(length(state.events))

    case API.receive(state,
      signal: %{
        "event" => fn [event], state ->
          {:noreply, %{state | events: [event | state.events]}}
        end,
        "flush" => fn _args, state -> {:stop, state} end,
      },
      timeout: :timer.hours(24)
    ) do
      {:timeout, state} -> state
      state -> state
    end
    |> then(fn state ->
      # Process accumulated events
      {:ok, _} = Activities.EventStore.batch_insert(state.events)

      # Continue with fresh state
      {:continue_as_new, %{state: %{events: [], generation: state.generation + 1}}}
    end)
  end
end

Determinism and Replay

All workflow code must be deterministic. The same inputs (activity results, signal payloads, timer fires) must produce the same sequence of commands. This is enforced by the executor during replay.

What Is Deterministic

  • Activity calls: the executor replays recorded results.
  • Timer fires: replayed from history.
  • Signal arrival order: replayed from history.
  • Update arrival order: replayed from history.
  • API.side_effect return values: recorded in history, returned without re-execution on replay.
  • API.update_state closures: re-executed with the same state (because activity results are the same).
  • API.parallel ordering: branches produce commands with unique sequence numbers, results are collected in deterministic order.

What Is Not Deterministic (Use Side Effects or Activities)

Don'tDo
DateTime.utc_now()API.side_effect(fn -> DateTime.utc_now() end)
:rand.uniform()API.side_effect(fn -> :rand.uniform() end)
UUID.uuid4()API.side_effect(fn -> UUID.uuid4() end)
System.get_env("FOO")API.side_effect(fn -> System.get_env("FOO") end)
HTTPClient.get(url)Use an activity
File.read(path)Use an activity

Mapping to Temporal Core SDK

The programming model maps to the Temporal Core SDK's activation/command protocol:

Temporalex ConstructCore SDK Commands
Activities.Foo.bar()ScheduleActivityResolveActivity
API.sleep(ms)StartTimerFireTimer
API.wait_for_signal(name)No command (executor buffers SignalWorkflow jobs)
API.side_effect(fn)Records result as a SideEffect marker event
API.receiveNo command (executor dispatches SignalWorkflow and DoUpdate jobs to handlers)
{:async, fn, state} (update)UpdateResponse{accepted} immediately, then handler's commands, then UpdateResponse{completed}
{:async, fn, state} (signal)No protocol-level tracking — handler's commands are regular commands
API.parallel(fns)Multiple commands in one activation (e.g. multiple ScheduleActivity)
API.publish_stateNo command (executor state for query serving)
API.update_stateNo command (executor-internal state transformation)
{:continue_as_new, args}ContinueAsNewWorkflowExecution
API.patched?(id)SetPatchMarker (or reads NotifyHasPatch from activation)
API.deprecate_patch(id)SetPatchMarker with deprecated flag

The executor allocates unique sequence numbers across all concurrent processes (runner, async handlers, parallel branches) and routes Resolve* jobs back to the correct process. The Core SDK sees a flat stream of commands and has no knowledge of the concurrency model.