kura_stream (kura v2.0.6)

View Source

Server-side cursor streaming for processing large result sets in batches.

Q = kura_query:where(kura_query:from(my_user), {active, true}),
kura_stream:stream(my_repo, Q, fun(Batch) ->
    [process(Row) || Row <- Batch],
    ok
end, #{batch_size => 100}).

Summary

Functions

stream/3: Stream query results in batches of 500, calling Fun for each batch.

stream/4: Stream query results with options. Opts is a map of the form #{batch_size => pos_integer()}.

Functions

stream(RepoMod, Query, Fun)

-spec stream(module(),
             #kura_query{from :: atom() | module() | undefined,
                         select :: [atom() | term()] | {exprs, [term()]},
                         wheres :: [term()],
                         joins :: [term()],
                         order_bys :: [term()],
                         group_bys :: [atom()],
                         havings :: [term()],
                         limit :: non_neg_integer() | undefined,
                         offset :: non_neg_integer() | undefined,
                         distinct :: boolean() | [atom()],
                         lock :: binary() | undefined,
                         prefix :: binary() | undefined,
                         preloads :: [atom() | {atom(), list()}],
                         ctes :: [{binary(), #kura_query{}}],
                         combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                         include_deleted :: boolean()},
             fun(([map()]) -> ok)) ->
                ok | {error, term()}.

Stream query results in batches of 500, calling Fun for each batch.

stream(RepoMod, Query, Fun, Opts)

-spec stream(module(),
             #kura_query{from :: atom() | module() | undefined,
                         select :: [atom() | term()] | {exprs, [term()]},
                         wheres :: [term()],
                         joins :: [term()],
                         order_bys :: [term()],
                         group_bys :: [atom()],
                         havings :: [term()],
                         limit :: non_neg_integer() | undefined,
                         offset :: non_neg_integer() | undefined,
                         distinct :: boolean() | [atom()],
                         lock :: binary() | undefined,
                         prefix :: binary() | undefined,
                         preloads :: [atom() | {atom(), list()}],
                         ctes :: [{binary(), #kura_query{}}],
                         combinations :: [{union | union_all | intersect | except, #kura_query{}}],
                         include_deleted :: boolean()},
             fun(([map()]) -> ok),
             map()) ->
                ok | {error, term()}.

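Stream query results with options, calling Fun for each batch. Opts is a map of the form #{batch_size => pos_integer()}, where batch_size is the size of each batch passed to Fun.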