ConduitMcp.Tasks.Store behaviour (ConduitMCP v0.9.5)

Copy Markdown View Source

Behaviour for pluggable task storage backends.

ConduitMCP uses tasks to model long-running MCP operations (the tasks/* JSON-RPC routes in the 2025-11-25 spec). The default implementation, ConduitMcp.Tasks.EtsStore, keeps tasks in-memory and is ideal for single-node, ephemeral workloads. To survive restarts, distribute across nodes, or back tasks with a job queue, implement this behaviour and configure it as the application's task store:

config :conduit_mcp, :tasks_store, MyApp.MyTasksStore

The standard tasks/get, tasks/cancel, tasks/result, and tasks/list handler routes dispatch through ConduitMcp.Tasks, which forwards every storage call to the configured store. No handler changes are required.

Implementing a Custom Store

A custom store must persist whatever map shape the worker writes (the framework adds "task_id", "status", and "created_at" on create/2 and never reads other keys itself, so any extra fields are preserved verbatim).

Example: Postgres-backed Store

defmodule MyApp.PostgresTaskStore do
  @behaviour ConduitMcp.Tasks.Store

  alias MyApp.{Repo, McpTask}

  @impl true
  def create(task_id, metadata) do
    task = Map.merge(metadata, %{"task_id" => task_id, "status" => "working",
                                 "created_at" => System.system_time(:millisecond)})
    %McpTask{}
    |> McpTask.changeset(task)
    |> Repo.insert()
    |> case do
      {:ok, row} -> {:ok, McpTask.to_map(row)}
      err        -> err
    end
  end

  @impl true
  def get(task_id) do
    case Repo.get(McpTask, task_id) do
      nil -> {:error, :not_found}
      row -> {:ok, McpTask.to_map(row)}
    end
  end

  # ...update/2, cancel/1, delete/1, list/1, cleanup/1
end

See examples/oban_tasks_server/ for an Oban + SQLite implementation and examples/oban_task_store.ex for a Postgres-flavored reference.

Configuration

# Default — in-memory ETS, zero config
# (equivalent to omitting the key)
config :conduit_mcp, :tasks_store, ConduitMcp.Tasks.EtsStore

# Custom store
config :conduit_mcp, :tasks_store, MyApp.MyTasksStore

Summary

Types

Task state. Worker code is responsible for keeping it consistent with the spec lifecycle.

Opaque task identifier (typically Base.url_encode64/1 of 16 random bytes).

Callbacks

Cancels a task.

Removes terminal-state tasks (completed, failed, cancelled) older than ttl_ms. Tasks still in working or input_required should never be evicted.

Creates a new task with the given id and initial metadata.

Deletes a task. Returns :ok whether or not the row existed.

Fetches a task by id.

Lists tasks, optionally filtered by :status.

Merges updates into the existing task and returns the new task. The store may also use this hook to emit side effects (e.g., publish a pub/sub event when a status flips to a terminal state).

Types

task()

@type task() :: map()

Task state. Worker code is responsible for keeping it consistent with the spec lifecycle.

task_id()

@type task_id() :: String.t()

Opaque task identifier (typically Base.url_encode64/1 of 16 random bytes).

Callbacks

cancel(task_id)

(optional)
@callback cancel(task_id()) :: {:ok, task()} | {:error, :not_found}

Cancels a task.

Defaults to update(task_id, %{"status" => "cancelled"}) via the facade when not implemented. Stores backed by a job queue should override this to also cancel the underlying job (e.g., Oban.cancel_job/1).

cleanup(ttl_ms)

(optional)
@callback cleanup(ttl_ms :: non_neg_integer()) :: non_neg_integer() | :ok

Removes terminal-state tasks (completed, failed, cancelled) older than ttl_ms. Tasks still in working or input_required should never be evicted.

Optional. Stores implementing this callback can be paired with ConduitMcp.Tasks.Janitor for periodic background eviction. Stores backed by systems with native TTL (e.g., Redis with EX, or an Oban pruner) can omit it.

Returns the number of tasks removed, or :ok.

create(task_id, metadata)

@callback create(task_id(), metadata :: map()) :: {:ok, task()} | {:error, term()}

Creates a new task with the given id and initial metadata.

The store is responsible for setting "task_id", "status" => "working", and "created_at" on the stored row (the default EtsStore does this; a custom store should mirror that behaviour so the rest of the framework can rely on those fields).

delete(task_id)

@callback delete(task_id()) :: :ok

Deletes a task. Returns :ok whether or not the row existed.

get(task_id)

@callback get(task_id()) :: {:ok, task()} | {:error, :not_found}

Fetches a task by id.

list(opts)

@callback list(opts :: keyword()) :: [task()]

Lists tasks, optionally filtered by :status.

update(task_id, updates)

@callback update(task_id(), updates :: map()) :: {:ok, task()} | {:error, :not_found}

Merges updates into the existing task and returns the new task. The store may also use this hook to emit side effects (e.g., publish a pub/sub event when a status flips to a terminal state).