PgFlow.Logger (PgFlow v0.1.0)

Copy Markdown View Source

Structured logging for PgFlow, aligned with the TypeScript reference implementation.

This module provides consistent, structured logging across all PgFlow components. It supports two output modes:

  • :fancy - Colorful, human-readable output for development
  • :simple - Structured key=value format for production/log aggregation

Configuration

Configure the log format in your config:

config :pgflow, :log_format, :fancy  # or :simple

The format defaults to :fancy in dev/test and :simple in prod. Empty polling logs are disabled by default. Enable them only when debugging worker polling behavior:

config :pgflow, :log_empty_polls, true

Usage

alias PgFlow.Logger, as: PgLogger

# Task lifecycle
PgLogger.task_started(ctx)
PgLogger.task_completed(ctx, duration_ms)
PgLogger.task_failed(ctx, error, retry_info)

# Worker lifecycle
PgLogger.startup_banner(startup_ctx)
PgLogger.polling(worker_name)
PgLogger.task_count(worker_name, count)
PgLogger.shutdown(worker_name, :waiting | :stopped)

Metadata

All log calls include structured metadata that can be filtered and queried by log aggregation systems. Use Logger's metadata filtering to control output.

Summary

Functions

Logs that the worker is polling for messages.

Logs when a flow run completes.

Logs when a flow run starts.

Logs worker shutdown phases.

Logs the worker startup banner with flow compilation status.

Logs when a task completes successfully.

Logs the number of tasks found after polling.

Logs when a task execution begins.

Types

retry_info()

@type retry_info() :: %{
  attempt: pos_integer(),
  max_attempts: pos_integer(),
  delay_seconds: number()
}

shutdown_phase()

@type shutdown_phase() :: :waiting | :stopped

startup_context()

@type startup_context() :: %{
  worker_name: String.t(),
  worker_id: String.t(),
  queue_name: String.t(),
  flows: [%{flow_slug: String.t() | atom(), status: atom()}]
}

task_context()

@type task_context() :: %{
  :worker_name => String.t(),
  :worker_id => String.t(),
  :flow_slug => String.t() | atom(),
  :step_slug => String.t() | atom(),
  :run_id => String.t(),
  optional(:msg_id) => integer(),
  optional(:task_index) => non_neg_integer()
}

Functions

polling(worker_name)

@spec polling(String.t()) :: :ok

Logs that the worker is polling for messages.

run_completed(flow_slug, run_id, duration_ms)

@spec run_completed(String.t() | atom(), String.t(), non_neg_integer()) :: :ok

Logs when a flow run completes.

run_failed(flow_slug, run_id, duration_ms, error)

@spec run_failed(String.t() | atom(), String.t(), non_neg_integer(), String.t()) ::
  :ok

Logs when a flow run fails.

run_started(flow_slug, run_id)

@spec run_started(String.t() | atom(), String.t()) :: :ok

Logs when a flow run starts.

shutdown(worker_name, phase)

@spec shutdown(String.t(), shutdown_phase()) :: :ok

Logs worker shutdown phases.

Phases:

  • :waiting - Waiting for in-flight tasks to complete
  • :stopped - Worker has stopped gracefully

startup_banner(ctx)

@spec startup_banner(startup_context()) :: :ok

Logs the worker startup banner with flow compilation status.

Example

PgFlow.Logger.startup_banner(%{
  worker_name: "pgflow-orders",
  worker_id: "abc-123",
  queue_name: "process_order",
  flows: [
    %{flow_slug: "process_order", status: :compiled}
  ]
})

task_completed(ctx, duration_ms)

@spec task_completed(task_context(), non_neg_integer()) :: :ok

Logs when a task completes successfully.

Example

PgFlow.Logger.task_completed(ctx, 150)  # completed in 150ms

task_count(worker_name, count)

@spec task_count(String.t(), non_neg_integer()) :: :ok

Logs the number of tasks found after polling.

task_failed(ctx, error, retry_info \\ nil)

@spec task_failed(task_context(), String.t(), retry_info() | nil) :: :ok

Logs when a task fails.

If retry_info is provided, includes retry attempt information.

Example

PgFlow.Logger.task_failed(ctx, "Connection timeout", %{
  attempt: 2,
  max_attempts: 3,
  delay_seconds: 10
})

task_started(ctx)

@spec task_started(task_context()) :: :ok

Logs when a task execution begins.

Example

PgFlow.Logger.task_started(%{
  worker_name: "pgflow-orders",
  worker_id: "abc-123",
  flow_slug: "process_order",
  step_slug: "validate",
  run_id: "run-456",
  msg_id: 789,
  task_index: 0
})