swirl_mapper (swirl v0.2.8)

View Source

Summary

Types

boolean_op/0

-type boolean_op() :: 'and' | 'or'.

comparison_op/0

-type comparison_op() :: '<' | '<=' | '=' | '>=' | '>' | '<>'.

event/0

-type event() :: [{atom(), value()}].

exp_tree/0

-type exp_tree() ::
          {boolean_op(), exp_tree(), exp_tree()} |
          {comparison_op(), variable(), value()} |
          {inclusion_op(), variable(), [value(), ...]} |
          {null_op(), variable()}.

flow/0

-type flow() ::
          #flow{id :: binary(),
                module :: module(),
                module_vsn :: undefined | module_vsn(),
                stream_filter :: undefined | string(),
                stream_names :: undefined | stream_names(),
                mapper_window :: undefined | pos_integer(),
                mapper_nodes :: undefined | [node()],
                mapper_opts :: mapper_opts(),
                reducer_window :: undefined | pos_integer(),
                reducer_node :: node(),
                reducer_opts :: reducer_opts(),
                reducer_skip :: undefined | boolean(),
                output_opts :: output_opts(),
                heartbeat :: undefined | pos_integer(),
                window_sync :: undefined | boolean(),
                started_at :: undefined | erlang:timestamp(),
                start_node :: node()}.

inclusion_op/0

-type inclusion_op() :: in | notin.

mapper_opts/0

-type mapper_opts() :: term().

module_vsn/0

-type module_vsn() :: pos_integer().

null_op/0

-type null_op() :: null | notnull.

output_opts/0

-type output_opts() :: term().

reducer_opts/0

-type reducer_opts() :: term().

stream/0

-type stream() ::
          #stream{flow_id :: binary(),
                  flow_mod :: module(),
                  flow_mod_vsn :: module_vsn(),
                  start_node :: node(),
                  exp_tree :: undefined | exp_tree(),
                  mapper_opts :: mapper_opts(),
                  table_id :: ets:tab()}.

stream_name/0

-type stream_name() :: atom().

stream_names/0

-type stream_names() :: [stream_name()].

value/0

-type value() :: integer() | float() | binary().

variable/0

-type variable() :: atom().

Functions

code_change(OldVsn, State, Extra)

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Msg, State)

init(Flow)

lookup(FlowId)

-spec lookup(binary() | flow()) -> undefined | pid.

map(StreamName, Event, Stream)

-spec map(atom(), event(), stream()) -> ok.

register(Flow)

-spec register(flow()) -> true.

start(Flow)

-spec start(flow()) -> {ok, pid()} | {error, mappers_max}.

terminate(Reason, State)

unregister(Flow)

-spec unregister(flow()) -> true.