Lockstep.Task (Lockstep v0.1.0)


Lockstep-controlled counterparts of Task.async, Task.await, and Task.async_stream.

Example

ctest "concurrent fan-out" do
  tasks =
    for x <- 1..4 do
      Lockstep.Task.async(fn -> x * 2 end)
    end

  results = Lockstep.Task.await_many(tasks)
  assert Enum.sort(results) == [2, 4, 6, 8]
end

Limitations

  • :timeout is not honoured: every blocking call waits indefinitely under the controller. The runner's per-iteration timeout still applies.
  • async_stream honours :max_concurrency but not :on_timeout.
  • Task.Supervisor.async and Task.Supervisor.start_child are routed to a bare Lockstep.spawn, so there are no supervisor restart semantics; the task lifecycle is otherwise the same.

Types

t()

@opaque t()

Functions

async(fun)

@spec async((-> any())) :: t()

Spawn a managed task running fun. Returns a task handle to be passed to await/1 or await_many/1.

async(module, fun, args)

@spec async(module(), atom(), [any()]) :: t()

Three-argument form of async/1, mirroring Task.async(M, F, A).

async_stream(enumerable, fun, opts \\ [])

Lockstep version of Task.async_stream/3. Honours :max_concurrency (default System.schedulers_online()); all other options, including :timeout and :on_timeout, are accepted but ignored. Returns a stream of {:ok, result} tuples in input order.

Lockstep.Task.async_stream(1..4, fn x -> x * 2 end)
|> Enum.to_list()
# => [{:ok, 2}, {:ok, 4}, {:ok, 6}, {:ok, 8}]

async_stream(enumerable, module, function, args, opts \\ [])

Module-function-arguments form: async_stream(enum, M, F, A_extra, opts) invokes apply(M, F, [item | A_extra]) for each element, so the element is always passed as the first argument.
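The argument order is easy to get wrong, so here is a minimal sketch of what each per-element call evaluates to. It uses plain apply/3 rather than Lockstep itself, and the Scale module is a hypothetical helper defined only for this illustration.

```elixir
# Hypothetical helper module, defined only for this illustration.
defmodule Scale do
  def times(item, factor, offset), do: item * factor + offset
end

# The "A_extra" list: arguments that follow the stream element.
extra_args = [10, 1]

# Each element is prepended to the extra arguments before the call,
# so this evaluates Scale.times(item, 10, 1) for every item.
results =
  Enum.map(1..3, fn item ->
    apply(Scale, :times, [item | extra_args])
  end)

IO.inspect(results)
# prints [11, 21, 31]
```

Going by the description above, Lockstep.Task.async_stream(1..3, Scale, :times, [10, 1]) should produce the same values, each wrapped in an {:ok, result} tuple.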

await(task, timeout \\ 5000)

@spec await(t(), timeout()) :: any()

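Block until the task replies with its result. The reply is picked out by a selective receive on the task's unique ref, so other messages in the caller's mailbox are left undisturbed. The timeout argument is accepted for compatibility with Task.await/2 but is not honoured (see Limitations).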
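The ref-matching behaviour can be sketched with plain processes. This is an illustration of the selective-receive technique only, not Lockstep's actual implementation; the message shape {ref, value} is an assumption made for the example.

```elixir
parent = self()
ref = make_ref()

# An unrelated message already sitting in the caller's mailbox.
send(parent, :unrelated)

# The "task" tags its reply with the unique ref (assumed message shape).
spawn(fn -> send(parent, {ref, 21 * 2}) end)

# Only the message carrying this ref is taken; :unrelated is skipped over.
result =
  receive do
    {^ref, value} -> value
  end

# The unrelated message is still available to a later receive.
leftover =
  receive do
    msg -> msg
  after
    0 -> :none
  end

IO.inspect({result, leftover})
# prints {42, :unrelated}
```

Pinning the ref with ^ref is what keeps the receive selective: a bare {ref, value} pattern would rebind ref and match any two-element tuple.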

await_many(tasks, timeout \\ 5000)

@spec await_many([t()], timeout()) :: [any()]

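Await several tasks and return their results in the order the handles were passed in, regardless of the order in which the tasks finish. As with await/2, the timeout argument is not honoured.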
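The ordering rule is the same one the standard library follows. The sketch below uses Elixir's own Task module as a stand-in, on the assumption that Lockstep.Task.await_many behaves as documented above; it is not Lockstep code.

```elixir
# Standard-library Task used as a stand-in for the Lockstep helpers.
slow =
  Task.async(fn ->
    # Finishes well after the second task.
    Process.sleep(50)
    :slow
  end)

fast = Task.async(fn -> :fast end)

# Results follow the order of the handles, not the order of completion.
results = Task.await_many([slow, fast])

IO.inspect(results)
# prints [:slow, :fast]
```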

start(fun)

@spec start((-> any())) :: {:ok, pid()}

Spawn a managed, unlinked task; returns {:ok, pid} to match the shape of Task.start/1.

start(module, fun, args)

@spec start(module(), atom(), [any()]) :: {:ok, pid()}

Three-argument variant of start/1, mirroring Task.start(M, F, A).

start_link(fun)

@spec start_link((-> any())) :: {:ok, pid()}

Spawn a managed, linked task; returns {:ok, pid} to match the shape of Task.start_link/1.

start_link(module, fun, args)

@spec start_link(module(), atom(), [any()]) :: {:ok, pid()}

Three-argument variant of start_link/1, mirroring Task.start_link(M, F, A).