file_log_reader (util v1.3.5)
Periodically read an append-only log file and parse newly added data.
The user controls how often, in milliseconds, the file is checked for modifications. When new data is appended to the file, the reader invokes the user-defined parsing function that delimits messages in the data, and delivers each result to the consumer by calling the consumer callback function.
The log reader can be started as a gen_server or can be controlled
synchronously by using init/3, run/1, and close/1 methods.
Author: Serge Aleynikov saleyn@gmail.com Copyright: 2015 Serge Aleynikov
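The polling idea described above can be illustrated with a minimal sketch. It is not the module's implementation: the module name tail_sketch and the function read_new/2 are hypothetical, and the sketch only shows how data appended after a known byte offset could be read using the standard file and filelib modules.

```erlang
-module(tail_sketch).
-export([read_new/2]).

%% Hypothetical helper, not part of file_log_reader.
%% Read whatever was appended to File after byte offset Pos.
%% Returns {NewData, NewPos}; NewData is <<>> when nothing was appended.
-spec read_new(file:name_all(), non_neg_integer()) ->
          {binary(), non_neg_integer()}.
read_new(File, Pos) ->
    %% filelib:file_size/1 returns 0 when the file does not exist.
    Size = filelib:file_size(File),
    case Size > Pos of
        true ->
            {ok, Fd} = file:open(File, [read, raw, binary]),
            try
                %% Read only the bytes between the old position and the new size.
                {ok, Data} = file:pread(Fd, Pos, Size - Pos),
                {Data, Pos + byte_size(Data)}
            after
                file:close(Fd)
            end;
        false ->
            {<<>>, Pos}
    end.
```

A caller would invoke such a function on every timer tick, hand the returned binary to the parser, and remember the returned position for the next tick.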
Types
-type options() ::
    [ {pos, StartPos :: integer()}
    | {end_pos, ReadUntilPos :: integer() | eof}
    | {max_size, MaxReadSize :: integer() | eof}
    | {timeout, MSec :: integer()}
    | {retry_sec, Sec :: integer()}
    | {parser,
          fun((Data :: binary(), ParserState :: any()) ->
                  {ok, Msg :: any(), Tail :: binary(), NewParserState :: any()}
                | {incomplete, NewParserState :: any()}
                | {skip, Tail :: binary(), NewParserState :: any()})
        | {Mod :: atom(), Fun :: atom()}}
    | {pstate,
          fun((File :: string(), consumer(), Options :: list()) -> any())
        | any()}
    | {pstate_update,
          fun((Option :: atom(), Value :: any(), PState :: any()) ->
                  {ok, NewPState :: any()} | {error, any()})}
    ].
Details:
- pos: Start reading from this position (default: 0).
- end_pos: Read until this position and stop. If provided and the file position reaches end_pos, the consumer() callback given to the reader is called as Consumer({'$end_of_file', Filename::string(), Result}, Pos::integer(), State), where Result is ok or {error|exit|exception, Error::any(), StackTrace} if an error occurred.
- max_size: Maximum chunk size to read from the file in a single pass (default: 32M).
- timeout: Number of milliseconds between successive file scans (default: 1000).
- retry_sec: Number of seconds between successive retries upon failure to open the file passed to one of the start*/{3,4} functions (default: 15). A value of 0 means that the file must exist or else the process won't start.
- parser: The function to be called when the next chunk is read from the file. The function must return one of:
    - {ok, Msg, Tail, State}: invoke the Consumer callback, passing it the parsed message Msg, and continue parsing the Tail binary.
    - {incomplete, State}: the data contains no complete messages; wait until there is more.
    - {skip, Tail, State}: disregard input and continue parsing Tail without calling the Consumer callback.
- pstate: Initial value of the parser state, or a functor fun((File::string(), Consumer::consumer(), Options::options()) -> PState::any()).
- pstate_update: Update function for the parser state. Called when the user invokes update_pstate/3.
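As an illustration of the parser contract described above, here is a minimal sketch of a parser for newline-delimited text logs. The module name line_parser, the choice of a message counter as the parser state, and the rule that blank lines and lines starting with # are skipped are all assumptions made for this example, not something the library prescribes.

```erlang
-module(line_parser).
-export([parse/2]).

%% Hypothetical parser for newline-delimited logs.
%% The parser state is assumed to be a count of messages parsed so far.
-spec parse(binary(), non_neg_integer()) ->
          {ok, binary(), binary(), non_neg_integer()}
        | {incomplete, non_neg_integer()}
        | {skip, binary(), non_neg_integer()}.
parse(Data, Count) ->
    case binary:split(Data, <<"\n">>) of
        [_NoNewline] ->
            %% No terminating newline yet: wait for more data.
            {incomplete, Count};
        [<<>>, Tail] ->
            %% Blank line: skip it without calling the consumer.
            {skip, Tail, Count};
        [<<"#", _/binary>>, Tail] ->
            %% Comment line (an assumption of this sketch): skip it.
            {skip, Tail, Count};
        [Line, Tail] ->
            %% One complete message; the rest is parsed on the next call.
            {ok, Line, Tail, Count + 1}
    end.
```

Given the options() type, such a function could presumably be supplied either as {parser, fun line_parser:parse/2} or as {parser, {line_parser, parse}}, together with {pstate, 0} for the initial counter.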
Functions
Close file processor (use this method when not using gen_server)
-spec code_change(any(), State, any()) -> State
      when State :: #state{consumer :: consumer(),
                           tref :: reference(),
                           fd :: port(),
                           file :: string(),
                           pos :: integer() | eof,
                           end_pos :: integer() | eof | undefined,
                           max_size :: integer(),
                           timeout :: integer(),
                           parser :: {atom(), atom()}
                                   | fun((binary(), any()) -> {any(), binary(), any()}),
                           pstate :: any(),
                           pstate_update :: fun((any()) -> any()),
                           part_size :: integer(),
                           done :: false | ok
                                 | {error | exception | exit, Reason :: any(), Stacktrace :: any()},
                           incompl_count :: integer()}.
Convert process state when code is changed
-spec handle_call(any(), From :: tuple(), #state{}) ->
          {reply, Reply :: any(), #state{}}
        | {reply, Reply :: any(), #state{}, Timeout :: integer() | hibernate}
        | {noreply, #state{}}
        | {noreply, #state{}, Timeout :: integer() | hibernate}
        | {stop, Reason :: any(), Reply :: any(), #state{}}
        | {stop, Reason :: any(), #state{}}.
Handling call messages
-spec handle_cast(any(), #state{}) ->
          {noreply, #state{}}
        | {noreply, #state{}, Timeout :: integer() | hibernate}
        | {stop, Reason :: any(), #state{}}.
Handling cast messages
-spec handle_info(any(), #state{}) ->
          {noreply, #state{}}
        | {noreply, #state{}, Timeout :: integer() | hibernate}
        | {stop, Reason :: any(), #state{}}.
Handling all non call/cast messages
-spec init(list()) ->
          {ok, #state{}}
        | {ok, #state{}, Timeout :: integer() | hibernate}
        | ignore
        | {stop, any()}.
Initiates the server
-spec init(string(), consumer(), options()) -> {ok, #state{}}.
When using file processor without gen_server, use this function to initialize the state, and then call run/1.
Report last processed file position/size.
Return current parser state ({pstate, any()} initialization option).
-spec run(#state{}) -> #state{}.
Process file from given position Pos to EndPos (or eof).
Start the server outside of supervision tree.
Process File by calling Consumer callback on every delimited message.
Message delimiting is handled by the {parser, Parser} option. The Consumer
function gets called iteratively with the following arguments:
- (Msg, Pos::integer(), State): Msg is what the parser function returned. Pos is the current file position following the Msg. State is the current value of the parser state, which is initialized by the {pstate, PState} option given to the start_link/{3,4} function.
- ({'$end_of_file', Filename::string(), Result}, Pos::integer(), PState): This call happens when the end-of-file condition is reached (see the definition of the consumer() type).
Consumer can end processing normally without reaching the end of file by
throwing {eof, PState} exception.
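The calling convention above can be sketched with a small self-contained module. Everything in it is hypothetical: the module name consumer_sketch, the in-memory drive/2 loop that stands in for the reader, and the assumption that the value returned by the consumer becomes the next state (the consumer() type definition is not shown in this page). Unlike the real reader, which waits for more data on {incomplete, State}, this stand-in treats it as the end of input.

```erlang
-module(consumer_sketch).
-export([drive/2, consume/3, parse/2]).

%% Consumer: accumulates {Pos, Msg} pairs in the state.
%% Assumption: the return value is used as the next state.
consume({'$end_of_file', _File, _Result}, _Pos, Acc) ->
    lists:reverse(Acc);
consume(<<"STOP">>, _Pos, Acc) ->
    %% End processing early, as described above.
    throw({eof, lists:reverse(Acc)});
consume(Msg, Pos, Acc) ->
    [{Pos, Msg} | Acc].

%% Minimal newline parser that passes the state through unchanged.
parse(Data, PState) ->
    case binary:split(Data, <<"\n">>) of
        [Line, Tail] -> {ok, Line, Tail, PState};
        [_]          -> {incomplete, PState}
    end.

%% In-memory stand-in for the reader: feeds Data through parse/2 and
%% consume/3, starting with an empty accumulator as the parser state.
drive(File, Data) ->
    try loop(File, Data, 0, [])
    catch throw:{eof, Final} -> Final
    end.

loop(File, Data, Pos, PState0) ->
    case parse(Data, PState0) of
        {ok, Msg, Tail, PState1} ->
            %% Position following the message just parsed.
            NewPos = Pos + byte_size(Data) - byte_size(Tail),
            loop(File, Tail, NewPos, consume(Msg, NewPos, PState1));
        {incomplete, PState1} ->
            consume({'$end_of_file', File, ok}, Pos, PState1)
    end.
```

In this sketch both the normal end-of-file path and the early {eof, PState} exit yield the list of messages collected so far, which is one way a consumer might hand back its final result.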
To be called by the supervisor in order to start the server. If init/1 fails
with Reason, the function returns {error,Reason}. If init/1 returns
{stop,Reason} or ignore, the process is terminated and the function returns
{error,Reason} or ignore, respectively. See: start_link/3.
Stop the server.
-spec terminate(any(), #state{}) -> ok.
This function is called by a gen_server when it is about to terminate. It should be the opposite of Module:init/1 and do any necessary cleaning up. When it returns, the gen_server terminates with Reason. The return value is ignored.
Update parser state.
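For illustration, a function matching the pstate_update shape from the options() type, fun((Option, Value, PState) -> {ok, NewPState} | {error, any()}), might look like the sketch below. The module name pstate_update_sketch, the use of a map as the parser state, and the delim option are assumptions made for this example only.

```erlang
-module(pstate_update_sketch).
-export([update/3]).

%% Hypothetical update function for a parser state kept as a map,
%% for example #{delim => <<"\n">>, count => 0}.
-spec update(atom(), any(), map()) -> {ok, map()} | {error, any()}.
update(delim, Value, PState) when is_binary(Value), byte_size(Value) > 0 ->
    %% Accept a new non-empty delimiter and keep the rest of the state.
    {ok, PState#{delim => Value}};
update(delim, Value, _PState) ->
    {error, {bad_delim, Value}};
update(Option, _Value, _PState) ->
    %% Reject options this sketch does not know about.
    {error, {unknown_option, Option}}.
```

Such a function could be supplied as {pstate_update, fun pstate_update_sketch:update/3}. Returning {error, Reason} for unknown options keeps the previous parser state untouched.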