TimelessLogs (timeless_logs v1.4.12)

Copy Markdown View Source

Embedded log compression and indexing for Elixir applications.

TimelessLogs plugs into Elixir's Logger as a handler, compresses log entries into compressed blocks, and indexes them in ETS for fast querying.

Setup

# config/config.exs
config :timeless_logs,
  data_dir: "priv/log_stream"

The handler is installed automatically when the application starts.

Querying

# Find error logs from the last hour
TimelessLogs.query(level: :error, since: DateTime.add(DateTime.utc_now(), -3600))

# Paginated results
TimelessLogs.query(level: :info, limit: 50, offset: 100, order: :asc)

# Search log messages with metadata
TimelessLogs.query(message: "timeout", metadata: %{service: "api"})

Summary

Functions

Create a consistent online backup of the log store.

Collect all field names and hit counts from matching entries.

Collect distinct values and hit counts for a given field.

Flush the buffer, writing any pending log entries to disk immediately.

Ingest multiple log entries in one call.

Merge multiple small compressed blocks into fewer, larger blocks.

Query stored logs. Returns a TimelessLogs.Result struct.

Return aggregate statistics about stored log data without reading any blocks.

Lazily stream matching log entries without loading all results into memory.

Subscribe the calling process to receive new log entries as they arrive.

Unsubscribe the calling process from log entry notifications.

Functions

backup(target_dir)

@spec backup(String.t()) :: {:ok, map()} | {:error, term()}

Create a consistent online backup of the log store.

Flushes all in-flight data, writes an ETS index snapshot, and copies block files to the target directory.

Parameters

  • target_dir - Directory to write backup files into (will be created)

Returns

{:ok, %{path: target_dir, files: [filenames], total_bytes: integer()}}

Examples

TimelessLogs.backup("/tmp/log_backup_2024")

field_names(filters \\ [])

@spec field_names(keyword()) :: {:ok, [map()]}

Collect all field names and hit counts from matching entries.

Always includes _msg, _time, and level. Metadata keys are also included.

Examples

TimelessLogs.field_names(since: DateTime.add(DateTime.utc_now(), -3600))
#=> {:ok, [%{"value" => "_msg", "hits" => 1094}, ...]}

field_values(field_name, filters \\ [])

@spec field_values(
  String.t(),
  keyword()
) :: {:ok, [map()]}

Collect distinct values and hit counts for a given field.

Handles built-in fields ("_msg", "_time", "level") and metadata fields.

Examples

TimelessLogs.field_values("level", since: DateTime.add(DateTime.utc_now(), -3600))
#=> {:ok, [%{"value" => "info", "hits" => 706}, %{"value" => "error", "hits" => 42}]}

flush()

@spec flush() :: :ok

Flush the buffer, writing any pending log entries to disk immediately.

ingest(entries)

@spec ingest([map()]) :: :ok

Ingest multiple log entries in one call.

This is the lower-overhead path for batch producers such as NDJSON imports.

merge_now()

@spec merge_now() :: :ok | :noop

Merge multiple small compressed blocks into fewer, larger blocks.

Returns :ok if blocks were merged, :noop if no merge was needed.

normalize_filters(filters)

@spec normalize_filters(keyword()) :: keyword()

query(filters \\ [])

@spec query(keyword()) :: {:ok, TimelessLogs.Result.t()} | {:error, term()}

Query stored logs. Returns a TimelessLogs.Result struct.

Filters

  • :level - Log level atom (:debug, :info, :warning, :error)
  • :message - Substring match on log message or metadata values
  • :since - DateTime or unix timestamp lower bound
  • :until - DateTime or unix timestamp upper bound
  • :metadata - Map of metadata key/value pairs to match

Pagination & Ordering

  • :limit - Max entries to return (default 100)
  • :offset - Number of entries to skip (default 0)
  • :order - :desc (newest first, default) or :asc (oldest first)

Examples

TimelessLogs.query(level: :error)
#=> {:ok, %TimelessLogs.Result{entries: [...], total: 42, limit: 100, offset: 0}}

TimelessLogs.query(level: :warning, limit: 10, offset: 20)
TimelessLogs.query(message: "connection refused", metadata: %{service: "api"})

stats()

@spec stats() :: {:ok, TimelessLogs.Stats.t()}

Return aggregate statistics about stored log data without reading any blocks.

Examples

{:ok, stats} = TimelessLogs.stats()
stats.total_blocks   #=> 42
stats.total_entries   #=> 50000
stats.disk_size       #=> 24_000_000

stream(filters \\ [])

@spec stream(keyword()) :: Enumerable.t()

Lazily stream matching log entries without loading all results into memory.

Returns an Elixir Stream that yields %TimelessLogs.Entry{} structs. Blocks are decompressed on demand as the stream is consumed.

Entries are returned in block order (oldest blocks first by default). For fully sorted results across blocks, use query/1 instead.

Accepts the same filter options as query/1 except :limit, :offset, and :order which are ignored (use Enum.take/2, Stream.drop/2, etc.).

Examples

TimelessLogs.stream(level: :error)
|> Enum.take(10)

TimelessLogs.stream(since: DateTime.add(DateTime.utc_now(), -3600))
|> Stream.filter(& &1.message =~ "timeout")
|> Enum.to_list()

subscribe(opts \\ [])

@spec subscribe(keyword()) :: {:ok, pid()}

Subscribe the calling process to receive new log entries as they arrive.

The subscriber receives messages of the form: {:timeless_logs, :entry, %TimelessLogs.Entry{}}.

Options

  • :level - Only receive entries at this level (e.g., :error)
  • :metadata - Map of metadata key/value pairs that must match

Examples

TimelessLogs.subscribe()
receive do
  {:timeless_logs, :entry, entry} -> IO.inspect(entry)
end

TimelessLogs.subscribe(level: :error)

unsubscribe()

@spec unsubscribe() :: :ok

Unsubscribe the calling process from log entry notifications.