GenLoop behaviour (GenLoop v2.1.0)

Copy Markdown View Source

Features

  • OTP Compliant: Handles system messages (sys module), parent supervision, and debugging.
  • Selective Receive: Use receive blocks to pick messages when you want them, enabling complex state machines or protocols to be implemented more naturally.
  • Convenient Macros: rcall/2 and rcast/1 macros to easily match on GenLoop.call/2 and GenLoop.cast/2 messages.
  • Familiar API: Uses start_link/3, call/3, cast/2 similar to GenServer.

Usage

To use GenLoop, use GenLoop in your module. You need to define an entry point function (default is enter_loop/1 or specified via enter: :function_name).

Basic Example

Here is a simple Stack implementation:

defmodule Stack do
  use GenLoop

  # Client API

  def start_link(initial_stack) do
    GenLoop.start_link(__MODULE__, initial_stack, name: __MODULE__)
  end

  def push(item) do
    GenLoop.cast(__MODULE__, {:push, item})
  end

  def pop do
    GenLoop.call(__MODULE__, :pop)
  end

  # Server Callbacks

  # Optional: init/1 can be used to validate args or set up initial state.
  # It should return {:ok, state}, {:stop, reason}, or :ignore.
  def init(initial_stack) do
    {:ok, initial_stack}
  end

  # The main loop.
  # The `receive/2` macro (provided by GenLoop) is used instead of `receive/1`.
  # It takes the current state as the first argument to handle system messages automatically.
  def enter_loop(stack) do
    receive stack do
      # Match a synchronous call
      rcall(from, :pop) ->
        case stack do
          [head | tail] ->
            reply(from, head) # Helper to send reply
            enter_loop(tail)  # Loop with new state
          [] ->
            reply(from, nil)
            enter_loop([])
        end

      # Match an asynchronous cast
      rcast({:push, item}) ->
        enter_loop([item | stack])

      # Match standard messages
      other ->
        IO.inspect(other, label: "Unexpected message")
        enter_loop(stack)
    end
  end
end

Using the Process

{:ok, _pid} = Stack.start_link([1, 2])

Stack.pop()
#=> 1

Stack.push(3)
#=> :ok

Stack.pop()
#=> 3

Why GenLoop?

vs GenServer

GenServer is the standard abstraction for client-server relations. However, it forces you to handle every message in a callback (handle_call, handle_info, etc.). This is great for most cases, but can be cumbersome for complex state machines where the set of expected messages changes depending on the state.

GenLoop allows you to write a "plain" recursive loop with receive, so you can wait for specific messages at specific times (selective receive).

vs :gen_statem

:gen_statem is the OTP standard for state machines. It is very powerful but can be verbose. GenLoop offers a middle ground: it's simpler and feels more like writing a raw Elixir process, but with the safety net of OTP compliance.

Advanced Usage

Custom Entry Point

You can specify a different entry function name:

use GenLoop, enter: :my_loop

def my_loop(state) do
  # ...
end

Handling System Messages

The receive state do ... end macro is the magic that makes your loop OTP-compliant. It expands to a receive block that also includes clauses for handling system messages (like sys.get_state, sys.suspend, etc.) and parent exit signals.

If you want to handle everything manually (not recommended unless you know what you are doing), you can use the standard receive do ... end, but your process will not respond to standard OTP system calls.

Acknowledgements

This library is built on top of :plain_fsm by Ulf Wiger. It also draws inspiration from plain_fsm_ex by ashneyderman.

Summary

Types

Debug options supported by the start* functions

Tuple describing the client of a call request. pid is the PID of the caller and tag is a unique term used to identify the call.

The GenLoop name

Return values of start* functions

Option values used by the start* functions

Options used by the start* functions

The server reference

Functions

Sends an asynchronous broadcast to the named server on all connected nodes and the local node.

Sends an asynchronous broadcast to the named server on the given nodes.

Sends a synchronous request to the server and waits for its reply.

Sends an asynchronous request to the server.

Returns the pid of the client from a from/0 reference bound by rcall/2.

Hibernates the process and resumes in a loop function when a message arrives.

Sends a synchronous request to the named server on several nodes and collects their replies.

Matches a message sent by call/3 inside a receive/2 block.

Matches a message sent by cast/2 inside a receive/2 block.

Waits for messages while keeping the process OTP-compliant.

Replies to a call/3 request.

Behaves just like Kernel.send but accepts atoms or registry tuples on top of pids to identify a process.

Starts a GenLoop process without linking it to the caller.

Starts a GenLoop process linked to the caller.

Stops the server with reason :normal.

Stops the server with the given exit reason.

Stops the server with the given exit reason, waiting at most timeout milliseconds.

Returns the pid of the process registered under name, or nil when no such process exists.

Types

debug()

@type debug() :: [:trace | :log | :statistics | {:log_to_file, Path.t()}]

Debug options supported by the start* functions

from()

@type from() :: {pid(), tag :: term()}

Tuple describing the client of a call request. pid is the PID of the caller and tag is a unique term used to identify the call.

name()

@type name() :: atom() | {:global, term()} | {:via, module(), term()}

The GenLoop name

on_start()

@type on_start() ::
  {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}

Return values of start* functions

option()

@type option() ::
  {:debug, debug()}
  | {:name, name()}
  | {:timeout, timeout()}
  | {:spawn_opt, Process.spawn_opt()}

Option values used by the start* functions

options()

@type options() :: [option()]

Options used by the start* functions

server()

@type server() :: pid() | name() | {atom(), node()}

The server reference

Callbacks

data_vsn()

@callback data_vsn() :: term()

init(args)

@callback init(args :: term()) ::
  {:ok, state}
  | {:ok, state, timeout() | :hibernate}
  | :ignore
  | {:stop, reason :: any()}
when state: any()

Functions

abcast(server, term)

Sends an asynchronous broadcast to the named server on all connected nodes and the local node.

Delegates to GenServer.abcast/2. Always returns :abcast.

abcast(nodes, server, term)

@spec abcast([node()], name :: atom(), term()) :: :abcast

Sends an asynchronous broadcast to the named server on the given nodes.

Delegates to GenServer.abcast/3. Always returns :abcast.

call(server, request, timeout \\ 5000)

@spec call(server(), term(), timeout()) :: term()

Sends a synchronous request to the server and waits for its reply.

Works like GenServer.call/3, but the request is delivered to a GenLoop process where it is matched with the rcall/2 macro inside a receive/2 block, and answered with reply/2.

server can be a pid, a registered name, or any of the server/0 forms. The call raises if server resolves to the calling process itself, and exits if no process is found or if the callee does not reply within timeout milliseconds (defaults to 5000).

cast(server, term)

@spec cast(server(), term()) :: term()

Sends an asynchronous request to the server.

Delegates to GenServer.cast/2 and returns :ok immediately. The message is matched with the rcast/1 macro inside the target loop's receive/2 block.

from_pid(from)

(macro)

Returns the pid of the client from a from/0 reference bound by rcall/2.

Useful when the loop needs the caller's pid, for instance to monitor it, rather than only replying to it with reply/2.

hibernate(module, function, args)

(macro)

Hibernates the process and resumes in a loop function when a message arrives.

Wraps :erlang.hibernate/3 through :plain_fsm, so the process state is restored (running c:code_change/3 when the code version changed) before the loop continues. module and function name the loop function to wake up in, and args must be a one-element list holding the state to resume with.

def loop(state) do
  receive state do
    rcast(:idle) ->
      hibernate(__MODULE__, :loop, [state])
  end
end

Passing an args list with anything other than one element raises an ArgumentError.

multi_call(nodes \\ [node() | Node.list()], name, request, timeout \\ :infinity)

@spec multi_call([node()], name :: atom(), term(), timeout()) ::
  {replies :: [{node(), term()}], bad_nodes :: [node()]}

Sends a synchronous request to the named server on several nodes and collects their replies.

nodes defaults to the local node plus every connected node. name must be the locally registered name of the server on each node. Returns a tuple of the gathered {node, reply} pairs and the list of nodes that did not reply within timeout (defaults to :infinity).

rcall(from, msg)

(macro)

Matches a message sent by call/3 inside a receive/2 block.

Binds the client reference to from, which is later passed to reply/2, and matches the request against msg.

receive state do
  rcall(from, :pop) ->
    reply(from, hd(state))
    loop(tl(state))
end

rcast(msg)

(macro)

Matches a message sent by cast/2 inside a receive/2 block.

Matches the cast payload against msg.

receive state do
  rcast({:push, item}) ->
    loop([item | state])
end

receive(state_var, blocks)

(macro)

Waits for messages while keeping the process OTP-compliant.

Use this macro in place of the standard Kernel.SpecialForms.receive/1 inside a loop function of arity 1. state_var is the current process state, and blocks holds the usual do/after clauses.

On top of the clauses you write, the macro injects clauses that handle system messages (the :sys protocol used by call/3, debugging, and code change) and parent exit signals, passing state_var along so those handlers can resume the loop with the current state. Match call/3 and cast/2 messages with the rcall/2 and rcast/1 macros.

The enclosing function must take exactly one argument, the state, otherwise a compile-time ArgumentError is raised.

def loop(state) do
  receive state do
    rcast({:push, item}) ->
      loop([item | state])
  after
    5000 ->
      loop(state)
  end
end

reply(from, term)

@spec reply(from(), term()) :: :ok

Replies to a call/3 request.

from is the client reference bound by the rcall/2 macro on the server side. Delegates to GenServer.reply/2 and returns :ok.

send(server, message)

@spec send(server(), term()) :: term()

Behaves just like Kernel.send but accepts atoms or registry tuples on top of pids to identify a process.

start(module, args, options \\ [])

@spec start(module(), any(), options()) :: on_start()

Starts a GenLoop process without linking it to the caller.

Takes the callback module, the args term passed to its init/1 callback, and the start options/0 (such as :name, :timeout, or :debug). Returns a on_start/0 value.

Use start_link/3 instead when the process should be supervised.

start_link(module, args, options \\ [])

@spec start_link(module(), any(), options()) :: on_start()

Starts a GenLoop process linked to the caller.

Takes the callback module, the args term passed to its init/1 callback, and the start options/0 (such as :name, :timeout, or :debug). Returns a on_start/0 value.

This is the function to call from a supervisor child spec.

stop(server)

Stops the server with reason :normal.

Delegates to GenServer.stop/1 and returns :ok once the process has terminated.

stop(server, reason)

Stops the server with the given exit reason.

Delegates to GenServer.stop/2 and returns :ok once the process has terminated.

stop(server, reason, timeout)

@spec stop(server(), reason :: term(), timeout()) :: :ok

Stops the server with the given exit reason, waiting at most timeout milliseconds.

Delegates to GenServer.stop/3. Exits with :timeout if the process does not terminate in time.

whereis(name)

@spec whereis(server()) :: pid() | {atom(), node()} | nil

Returns the pid of the process registered under name, or nil when no such process exists.

Delegates to GenServer.whereis/1 and accepts the same name forms.