Selecto.Output.Transformers.Stream
(Selecto v0.4.5)
Transforms query results into a streaming format for memory-efficient processing of large datasets.
This module provides lazy evaluation of result transformations, allowing large result sets to be processed without loading all data into memory at once.
Features
- Lazy evaluation via Elixir Streams
- Configurable batch sizes for optimization
- Composition with other transformers (maps, JSON, CSV)
- Memory-efficient processing for large datasets
- Backpressure support for downstream consumers
Examples
# Stream rows as maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
Enum.each(stream, &process_row/1)
# Stream to JSON Lines format
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
Enum.into(stream, file_stream)
# Stream CSV rows
{:ok, stream} = transform(rows, columns, aliases, :csv)
Enum.into(stream, File.stream!("output.csv"))
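Because the result is a standard Elixir Stream, it composes with other Stream and Enum functions. A sketch, assuming rows are transformed to maps keyed by column name (the "active" column is hypothetical):
# Compose with other stream operations
{:ok, stream} = transform(rows, columns, aliases, :maps)
stream
|> Stream.filter(& &1["active"])
|> Stream.take(10_000)
|> Enum.to_list()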
Summary
Functions
Transform rows to a stream with the specified inner format.
Create a stream that yields results in chunks, useful for pagination or batch processing.
Create a streaming transformation that processes rows one at a time.
Transform and write results directly to an IO device or file stream.
Functions
@spec transform(list() | Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Transform rows to a stream with the specified inner format.
Parameters
- rows - List of row data or a stream of rows
- columns - List of column names
- aliases - Map of column aliases
- inner_format - The format to transform each row/batch to
- options - Transformation options
Options
- :batch_size - Number of rows to process in each batch. Default: 1000
- :parallel - Whether to process batches in parallel. Default: false
- Other options are passed to the inner transformer
Inner Format Support
- :maps - Stream of maps
- {:maps, opts} - Stream of maps with options
- :json - Stream of JSON strings (JSON Lines format by default)
- {:json, opts} - Stream of JSON with options
- :csv - Stream of CSV lines
- {:csv, opts} - Stream of CSV with options
- :raw - Stream of raw row lists
Examples
# Basic streaming to maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
# Streaming with custom batch size
{:ok, stream} = transform(rows, columns, aliases, :maps, batch_size: 500)
# Streaming to JSON Lines
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
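The :batch_size and :parallel options can be combined; a sketch, assuming the defaults documented above (the batch size shown is arbitrary):
# Parallel batch processing (tune batch_size to your workload)
{:ok, stream} = transform(rows, columns, aliases, :csv, batch_size: 2_000, parallel: true)
Enum.into(stream, File.stream!("output.csv"))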
@spec transform_chunked(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Create a stream that yields results in chunks, useful for pagination or batch processing.
Examples
{:ok, chunked_stream} = transform_chunked(rows, columns, aliases, :maps, chunk_size: 100)
Enum.each(chunked_stream, fn chunk ->
# Process 100 maps at a time
process_batch(chunk)
end)
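For pagination-style access, the chunked stream can be advanced with standard Stream functions; a sketch, assuming chunk_size doubles as the page size:
# Fetch the second "page" of 25 maps (hypothetical pagination semantics)
{:ok, pages} = transform_chunked(rows, columns, aliases, :maps, chunk_size: 25)
page_2 = pages |> Stream.drop(1) |> Enum.take(1) |> List.first()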
@spec transform_single(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Create a streaming transformation that processes rows one at a time.
This is more memory-efficient for simple transformations, but may be slower for complex transformations that benefit from batching.
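A minimal sketch, assuming the same return convention as transform/5:
# Per-row streaming to maps; rows are transformed and emitted one at a time
{:ok, stream} = transform_single(rows, columns, aliases, :maps)
stream
|> Stream.each(&process_row/1)
|> Stream.run()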
@spec transform_to_io(Enumerable.t(), list(), map(), atom() | tuple(), IO.device(), keyword()) :: :ok | {:error, term()}
Transform and write results directly to an IO device or file stream.
This is useful for writing large result sets directly to files without buffering all data in memory.
Examples
# Write to file
File.open!("output.csv", [:write], fn file ->
transform_to_io(rows, columns, aliases, :csv, file)
end)
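Since any IO.device() is accepted, results can also be written to standard output; a sketch:
# Stream JSON Lines to standard output
transform_to_io(rows, columns, aliases, {:json, format: :lines}, :stdio)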