Structured logging for PgFlow, aligned with the TypeScript reference implementation.
This module provides consistent, structured logging across all PgFlow components. It supports two output modes:
:fancy - Colorful, human-readable output for development
:simple - Structured key=value format for production/log aggregation
Configuration
Configure the log format in your config:
config :pgflow, :log_format, :fancy # or :simple
The format defaults to :fancy in dev/test and :simple in prod.
Empty polling logs are disabled by default. Enable them only when debugging
worker polling behavior:
config :pgflow, :log_empty_polls, true
Usage
alias PgFlow.Logger, as: PgLogger
# Task lifecycle
PgLogger.task_started(ctx)
PgLogger.task_completed(ctx, duration_ms)
PgLogger.task_failed(ctx, error, retry_info)
# Worker lifecycle
PgLogger.startup_banner(startup_ctx)
PgLogger.polling(worker_name)
PgLogger.task_count(worker_name, count)
PgLogger.shutdown(worker_name, :waiting) # or :stopped
Metadata
All log calls include structured metadata that can be filtered and queried by log aggregation systems. Use Logger's metadata filtering to control output.
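As a rough sketch of metadata filtering, the configuration below limits which metadata keys the default Logger formatter prints. It assumes Elixir 1.15 or later, where `:default_formatter` is the configuration key, and it assumes the metadata keys mirror the task context fields (`worker_name`, `flow_slug`, `step_slug`, `run_id`); the keys PgFlow.Logger actually emits may differ.

```elixir
# config/prod.exs
import Config

# Use the key=value output mode described above.
config :pgflow, :log_format, :simple

# Print only selected metadata keys alongside each message.
# The key names are assumptions based on the task context fields.
config :logger, :default_formatter,
  format: "$time [$level] $message $metadata\n",
  metadata: [:worker_name, :flow_slug, :step_slug, :run_id]
```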
Types
@type retry_info() :: %{
  attempt: pos_integer(),
  max_attempts: pos_integer(),
  delay_seconds: number()
}
@type shutdown_phase() :: :waiting | :stopped
Functions
@spec polling(String.t()) :: :ok
Logs that the worker is polling for messages.
@spec run_completed(String.t() | atom(), String.t(), non_neg_integer()) :: :ok
Logs when a flow run completes.
@spec run_failed(String.t() | atom(), String.t(), non_neg_integer(), String.t()) :: :ok
Logs when a flow run fails.
Logs when a flow run starts.
@spec shutdown(String.t(), shutdown_phase()) :: :ok
Logs worker shutdown phases.
Phases:
:waiting - Waiting for in-flight tasks to complete
:stopped - Worker has stopped gracefully
@spec startup_banner(startup_context()) :: :ok
Logs the worker startup banner with flow compilation status.
Example
PgFlow.Logger.startup_banner(%{
  worker_name: "pgflow-orders",
  worker_id: "abc-123",
  queue_name: "process_order",
  flows: [
    %{flow_slug: "process_order", status: :compiled}
  ]
})
@spec task_completed(task_context(), non_neg_integer()) :: :ok
Logs when a task completes successfully.
Example
PgFlow.Logger.task_completed(ctx, 150) # completed in 150ms
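One way to obtain the duration argument is to time the task body with the monotonic clock. The sketch below is only an illustration: `TimingSketch` and `timed/1` are hypothetical names, not part of PgFlow, and how PgFlow workers measure durations internally is not stated here.

```elixir
defmodule TimingSketch do
  @moduledoc false

  # Hypothetical helper, not part of PgFlow.
  # Runs `fun` and returns {result, elapsed_ms}. The monotonic clock is
  # used so the measurement is unaffected by wall-clock adjustments.
  @spec timed((-> any())) :: {any(), non_neg_integer()}
  def timed(fun) when is_function(fun, 0) do
    started = System.monotonic_time(:millisecond)
    result = fun.()
    duration_ms = System.monotonic_time(:millisecond) - started
    {result, duration_ms}
  end
end

{result, duration_ms} = TimingSketch.timed(fn -> Enum.sum(1..1_000) end)
IO.inspect({result, duration_ms}, label: "result and duration_ms")
```

The resulting `duration_ms` is a non-negative integer, so it fits the `non_neg_integer()` argument in the spec above.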
@spec task_count(String.t(), non_neg_integer()) :: :ok
Logs the number of tasks found after polling.
@spec task_failed(task_context(), String.t(), retry_info() | nil) :: :ok
Logs when a task fails.
If retry_info is provided, includes retry attempt information.
Example
PgFlow.Logger.task_failed(ctx, "Connection timeout", %{
  attempt: 2,
  max_attempts: 3,
  delay_seconds: 10
})
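A caller needs a map matching `retry_info()` or `nil` for the third argument. The sketch below shows one possible way to build it. `RetryInfoSketch.build/3` is a hypothetical helper, and both the exponential backoff and the choice to return `nil` once no attempts remain are assumptions, not PgFlow's documented retry policy.

```elixir
defmodule RetryInfoSketch do
  @moduledoc false

  # Hypothetical helper, not part of PgFlow.
  # Returns a map shaped like retry_info() while attempts remain,
  # or nil when the failed attempt was the last one (an assumption).
  def build(attempt, max_attempts, base_delay_seconds)
      when is_integer(attempt) and attempt >= 1 and is_integer(max_attempts) do
    if attempt < max_attempts do
      %{
        attempt: attempt,
        max_attempts: max_attempts,
        # Assumed exponential backoff: base, 2x base, 4x base, ...
        delay_seconds: base_delay_seconds * Integer.pow(2, attempt - 1)
      }
    else
      nil
    end
  end
end

IO.inspect(RetryInfoSketch.build(2, 3, 5), label: "retry_info")
```

With a base delay of 5 seconds, the second of three attempts yields the same values as the example above (`attempt: 2`, `max_attempts: 3`, `delay_seconds: 10`).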
@spec task_started(task_context()) :: :ok
Logs when a task execution begins.
Example
PgFlow.Logger.task_started(%{
  worker_name: "pgflow-orders",
  worker_id: "abc-123",
  flow_slug: "process_order",
  step_slug: "validate",
  run_id: "run-456",
  msg_id: 789,
  task_index: 0
})