Durable (Durable v0.1.0-rc)

View Source

A durable, resumable workflow engine for Elixir.

Durable provides a clean DSL for defining workflows with built-in support for:

  • Resumability: Sleep, wait for events, wait for human input
  • Reliability: Automatic retries with configurable backoff strategies
  • Observability: Built-in log capture and graph visualization
  • Composability: Decision steps, loops, parallel execution, and more

Installation

Add Durable to your supervision tree:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      {Durable, repo: MyApp.Repo}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Create a migration for Durable tables:

defmodule MyApp.Repo.Migrations.AddDurable do
  use Ecto.Migration

  def up, do: Durable.Migration.up()
  def down, do: Durable.Migration.down()
end

Quick Start

Define a workflow using the DSL:

defmodule MyApp.OrderWorkflow do
  use Durable
  use Durable.Helpers

  workflow "process_order", timeout: hours(2) do
    step :validate, fn order ->
      {:ok, %{order_id: order["id"], items: order["items"]}}
    end

    step :charge, [retry: [max_attempts: 3, backoff: :exponential]], fn data ->
      receipt = PaymentService.charge(data.order_id)
      {:ok, assign(data, :receipt, receipt)}
    end
  end
end

Start a workflow:

{:ok, workflow_id} = Durable.start(MyApp.OrderWorkflow, %{order: order})

Query execution status:

{:ok, execution} = Durable.get_execution(workflow_id)

Configuration Options

  • :repo - The Ecto repo module (required)
  • :name - Instance name for multiple Durable instances (default: Durable)
  • :prefix - PostgreSQL schema name (default: "durable")
  • :queues - Queue configuration map
  • :queue_enabled - Enable/disable queue processing (default: true)

See Durable.Config for the complete list of options.

Summary

Functions

Injects the Durable DSL into the calling module.

Cancels a running or pending workflow.

Returns a child specification for Durable.

Deletes a scheduled workflow.

Disables a scheduled workflow.

Enables a scheduled workflow.

Gets the execution details for a workflow.

Gets a scheduled workflow by name.

Lists child workflow executions for a parent workflow.

Lists workflow executions with optional filters.

Lists scheduled workflows.

Provides input for a waiting workflow (human-in-the-loop).

Creates a new scheduled workflow.

Sends an event to a waiting workflow.

Starts a new workflow execution.

Starts a Durable instance.

Triggers a scheduled workflow immediately.

Updates a scheduled workflow.

Functions

__using__(opts)

(macro)

Injects the Durable DSL into the calling module.

Usage

defmodule MyApp.OrderWorkflow do
  use Durable
  use Durable.Helpers

  workflow "process_order" do
    step :validate, fn data ->
      {:ok, %{order_id: data["id"]}}
    end
  end
end

cancel(workflow_id, reason \\ nil)

@spec cancel(String.t(), String.t() | nil) :: :ok | {:error, term()}

Cancels a running or pending workflow.

Examples

:ok = Durable.cancel(workflow_id)
:ok = Durable.cancel(workflow_id, "User requested cancellation")

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for Durable.

This allows Durable to be used in supervision trees with the {Durable, opts} syntax.

delete_schedule(name, opts \\ [])

@spec delete_schedule(
  String.t(),
  keyword()
) :: :ok | {:error, :not_found}

Deletes a scheduled workflow.

Examples

:ok = Durable.delete_schedule("daily_report")

disable_schedule(name, opts \\ [])

@spec disable_schedule(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, term()}

Disables a scheduled workflow.

Examples

{:ok, schedule} = Durable.disable_schedule("daily_report")

enable_schedule(name, opts \\ [])

@spec enable_schedule(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, term()}

Enables a scheduled workflow.

Examples

{:ok, schedule} = Durable.enable_schedule("daily_report")

get_execution(workflow_id, opts \\ [])

@spec get_execution(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Gets the execution details for a workflow.

Options

  • :include_steps - Include step execution details (default: false)
  • :include_logs - Include logs for each step (default: false)

Examples

{:ok, execution} = Durable.get_execution(workflow_id)

get_schedule(name, opts \\ [])

@spec get_schedule(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, :not_found}

Gets a scheduled workflow by name.

Examples

{:ok, schedule} = Durable.get_schedule("daily_report")

list_children(parent_workflow_id, opts \\ [])

@spec list_children(
  String.t(),
  keyword()
) :: [map()]

Lists child workflow executions for a parent workflow.

Options

  • :status - Filter by status

Examples

children = Durable.list_children(parent_workflow_id)
running_children = Durable.list_children(parent_workflow_id, status: :running)

list_executions(filters \\ [])

@spec list_executions(keyword()) :: [map()]

Lists workflow executions with optional filters.

Filters

  • :workflow - Filter by workflow module
  • :status - Filter by status
  • :queue - Filter by queue
  • :limit - Maximum number of results (default: 50)

Examples

executions = Durable.list_executions(status: :running, limit: 100)

list_schedules(filters \\ [])

@spec list_schedules(keyword()) :: [term()]

Lists scheduled workflows.

Filters

  • :enabled - Filter by enabled status
  • :workflow_module - Filter by module
  • :queue - Filter by queue
  • :limit - Maximum results (default: 100)
  • :durable - Durable instance name

Examples

schedules = Durable.list_schedules(enabled: true)

provide_input(workflow_id, input_name, data)

@spec provide_input(String.t(), String.t(), map()) :: :ok | {:error, term()}

Provides input for a waiting workflow (human-in-the-loop).

Examples

:ok = Durable.provide_input(workflow_id, "approval", %{approved: true})

schedule(module, cron_expression, opts \\ [])

@spec schedule(module(), String.t(), keyword()) :: {:ok, term()} | {:error, term()}

Creates a new scheduled workflow.

Arguments

  • module - The workflow module
  • cron_expression - Cron expression (e.g., "0 9 *" for 9am daily)
  • opts - Options

Options

  • :name - Schedule name (defaults to workflow name)
  • :workflow - Workflow name (defaults to first workflow in module)
  • :input - Input data for each execution
  • :timezone - Timezone for cron (default: "UTC")
  • :queue - Queue to run on (default: :default)
  • :enabled - Whether schedule is active (default: true)
  • :durable - Durable instance name (default: Durable)

Examples

{:ok, schedule} = Durable.schedule(MyApp.DailyReport, "0 9 * * *")

{:ok, schedule} = Durable.schedule(
  MyApp.Reports,
  "0 9 * * MON-FRI",
  name: "weekday_report",
  workflow: "generate_report",
  timezone: "America/New_York"
)

send_event(workflow_id, event_name, payload)

@spec send_event(String.t(), String.t(), map()) :: :ok | {:error, term()}

Sends an event to a waiting workflow.

Examples

:ok = Durable.send_event(workflow_id, "payment_confirmed", %{amount: 99.99})

start(module, input, opts \\ [])

@spec start(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}

Starts a new workflow execution.

Arguments

  • module - The workflow module
  • input - Initial input data for the workflow
  • opts - Options (optional)

Options

  • :workflow - The workflow name (defaults to the first workflow in the module)
  • :queue - The queue to run the workflow on (default: :default)
  • :priority - Priority level (higher = more important, default: 0)
  • :scheduled_at - Schedule execution for a future time

Examples

{:ok, workflow_id} = Durable.start(OrderWorkflow, %{order_id: 123})

{:ok, workflow_id} = Durable.start(
  OrderWorkflow,
  %{order_id: 123},
  workflow: "process_order",
  queue: :high_priority
)

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a Durable instance.

This function is used when adding Durable to your supervision tree.

Options

  • :repo - The Ecto repo module (required)
  • :name - Instance name (default: Durable)
  • :prefix - Database schema prefix (default: "durable")
  • :queues - Queue configuration

See Durable.Config for the complete list of options.

Examples

# In your application supervisor
children = [
  MyApp.Repo,
  {Durable, repo: MyApp.Repo}
]

# With custom queues
{Durable,
 repo: MyApp.Repo,
 queues: %{
   default: [concurrency: 10],
   high_priority: [concurrency: 20]
 }}

trigger_schedule(name, opts \\ [])

@spec trigger_schedule(
  String.t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

Triggers a scheduled workflow immediately.

This starts a new workflow execution without waiting for the next scheduled time.

Options

  • :input - Override the schedule's input
  • :durable - Durable instance name

Examples

{:ok, workflow_id} = Durable.trigger_schedule("daily_report")

update_schedule(name, changes)

@spec update_schedule(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, term()}

Updates a scheduled workflow.

Updatable Fields

  • :cron_expression - New cron expression
  • :timezone - New timezone
  • :input - New input data
  • :queue - New queue
  • :enabled - Enable/disable

Examples

{:ok, schedule} = Durable.update_schedule("daily_report", cron_expression: "0 10 * * *")