Selecto.Output.Transformers.Stream (Selecto v0.4.5)


Transforms query results to streaming format for memory-efficient processing of large datasets.

This module provides lazy evaluation of result transformations, allowing processing of large result sets without loading all data into memory at once.

Features

  • Lazy evaluation via Elixir Streams
  • Configurable batch sizes for optimization
  • Composition with other transformers (maps, JSON, CSV)
  • Memory-efficient processing for large datasets
  • Backpressure support for downstream consumers

Examples

# Stream rows as maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
Enum.each(stream, &process_row/1)

# Stream to JSON Lines format
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
Enum.into(stream, file_stream)

# Stream CSV rows
{:ok, stream} = transform(rows, columns, aliases, :csv)
Enum.into(stream, File.stream!("output.csv"))

Summary

Functions

transform(rows, columns, aliases, inner_format, options \\ [])
Transform rows to a stream with the specified inner format.

transform_chunked(rows, columns, aliases, inner_format, options \\ [])
Create a stream that yields results in chunks, useful for pagination or batch processing.

transform_single(rows, columns, aliases, inner_format, options \\ [])
Create a streaming transformation that processes rows one at a time.

transform_to_io(rows, columns, aliases, inner_format, io_device, options \\ [])
Transform and write results directly to an IO device or file stream.

Functions

transform(rows, columns, aliases, inner_format, options \\ [])

@spec transform(list() | Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Transform rows to a stream with the specified inner format.

Parameters

  • rows - List of row data or stream of rows
  • columns - List of column names
  • aliases - Map of column aliases
  • inner_format - The format to transform each row/batch to
  • options - Transformation options

Options

  • :batch_size - Number of rows to process in each batch. Default: 1000
  • :parallel - Whether to process batches in parallel. Default: false
  • Other options are passed to the inner transformer

Inner Format Support

  • :maps - Stream of maps
  • {:maps, opts} - Stream of maps with options
  • :json - Stream of JSON strings (JSON Lines format by default)
  • {:json, opts} - Stream of JSON with options
  • :csv - Stream of CSV lines
  • {:csv, opts} - Stream of CSV with options
  • :raw - Stream of raw row lists

Examples

# Basic streaming to maps
{:ok, stream} = transform(rows, columns, aliases, :maps)

# Streaming with custom batch size
{:ok, stream} = transform(rows, columns, aliases, :maps, batch_size: 500)

# Streaming to JSON Lines
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
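
The `:batch_size` and `:parallel` options can be combined. A sketch of the call shape and lazy consumption (the `rows`, `columns`, and `aliases` data below is hypothetical; in practice they come from a Selecto query result):

```elixir
# Hypothetical input data standing in for a query result.
rows = Enum.map(1..10_000, fn i -> [i, "name_#{i}"] end)
columns = ["id", "name"]
aliases = %{}

# Process 500 rows per batch; allow batches to be transformed in
# parallel. Whether :parallel helps depends on the inner transformer.
{:ok, stream} =
  Selecto.Output.Transformers.Stream.transform(
    rows, columns, aliases, :maps,
    batch_size: 500, parallel: true
  )

# Nothing is computed until the stream is consumed; taking 3 elements
# only materializes the batches needed to produce them.
first_three =
  stream
  |> Stream.take(3)
  |> Enum.to_list()
```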

transform_chunked(rows, columns, aliases, inner_format, options \\ [])

@spec transform_chunked(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Create a stream that yields results in chunks, useful for pagination or batch processing.

Examples

{:ok, chunked_stream} = transform_chunked(rows, columns, aliases, :maps, chunk_size: 100)
Enum.each(chunked_stream, fn chunk ->
  # Process 100 maps at a time
  process_batch(chunk)
end)

transform_single(rows, columns, aliases, inner_format, options \\ [])

@spec transform_single(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Create a streaming transformation that processes rows one at a time.

This is more memory efficient for simple transformations but may be slower for complex transformations that benefit from batching.
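
A hedged sketch, assuming `transform_single/5` takes the same arguments as `transform/5` and yields one transformed element per input row (the data below is illustrative):

```elixir
rows = [[1, "Ada"], [2, "Grace"]]
columns = ["id", "name"]
aliases = %{}

# Row-at-a-time transformation: each element of the stream is one map,
# so peak memory stays proportional to a single row.
{:ok, stream} =
  Selecto.Output.Transformers.Stream.transform_single(rows, columns, aliases, :maps)

Enum.each(stream, fn row_map ->
  # row_map is a map keyed by column name, e.g. %{"id" => 1, "name" => "Ada"}
  IO.inspect(row_map)
end)
```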

transform_to_io(rows, columns, aliases, inner_format, io_device, options \\ [])

@spec transform_to_io(
  Enumerable.t(),
  list(),
  map(),
  atom() | tuple(),
  IO.device(),
  keyword()
) ::
  :ok | {:error, term()}

Transform and write results directly to an IO device or file stream.

This is useful for writing large result sets directly to files without buffering all data in memory.

Examples

# Write to file
File.open!("output.csv", [:write], fn file ->
  transform_to_io(rows, columns, aliases, :csv, file)
end)
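
Since the destination is any `IO.device()`, results can also be written straight to standard output. A sketch, assuming `:stdio` is accepted like any other device (the `rows`, `columns`, and `aliases` bindings are hypothetical query results):

```elixir
rows = [[1, "Ada"], [2, "Grace"]]
columns = ["id", "name"]
aliases = %{}

# Stream JSON Lines directly to stdout without buffering the full
# result set in memory.
:ok =
  Selecto.Output.Transformers.Stream.transform_to_io(
    rows, columns, aliases, {:json, format: :lines}, :stdio
  )
```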