Real-time Pipeline Guide

Copy Markdown View Source

Build real-time media processing pipelines using the MediaPipeline GenServer behaviour. Designed for livestreaming, camera effects, and other low-latency use cases.

MediaPipeline Behaviour

defmodule MyPipeline do
  @moduledoc """
  Minimal pipeline skeleton: accepts every frame and keeps the state unchanged.
  """
  use ExCubecl.MediaPipeline

  # Called once per incoming frame with the current pipeline state.
  # Return `{:ok, new_state}` to continue or `{:error, reason}` to stop.
  @impl true
  def handle_frame(_frame, state) do
    # Process the frame here, then return the (possibly updated) state.
    {:ok, state}
  end
end

The handle_frame/2 callback receives each incoming frame and the current state. Return {:ok, new_state} to continue or {:error, reason} to stop.

Basic Example

defmodule SimpleFilter do
  @moduledoc """
  Blurs each incoming frame and hands it to the encoder stored in the state.
  """
  use ExCubecl.MediaPipeline

  # Expects `state` to carry an `:encoder` entry; the state is returned unchanged.
  @impl true
  def handle_frame(frame, state) do
    # NOTE(review): this pipes raw frames and ends up calling
    # write_frame(frame, encoder), while the Multi-Stage and Error Handling
    # examples match {:ok, frame} and call write_frame(encoder, frame).
    # Confirm which shape the API actually uses.
    frame
    |> ExCubecl.Filter.apply(:gaussian_blur, radius: 2)
    |> ExCubecl.Transcode.write_frame(state.encoder)

    {:ok, state}
  end
end

# Start the encoder that the pipeline will write to
video_opts = [codec: :h264, width: 1280, height: 720]
{:ok, enc} = ExCubecl.Transcode.start("output.mp4", video: video_opts)

# Start the pipeline with the encoder as its initial state
initial_state = %{encoder: enc}
{:ok, pid} = ExCubecl.MediaPipeline.start_link(SimpleFilter, initial_state)

# Push frames
ExCubecl.MediaPipeline.push_frame(pid, frame)

Livestream with Overlay

defmodule Livestream do
  @moduledoc """
  Denoises each frame, overlays the logo from the state, and writes the result
  to the encoder from the state.
  """
  use ExCubecl.MediaPipeline

  # Expects `state` to carry `:logo` and `:encoder`; the state is returned unchanged.
  @impl true
  def handle_frame(frame, state) do
    # NOTE(review): this pipes raw frames and ends up calling
    # write_frame(frame, encoder), while the Multi-Stage and Error Handling
    # examples match {:ok, frame} and call write_frame(encoder, frame).
    # Confirm which shape the API actually uses.
    frame
    |> ExCubecl.Filter.apply(:denoise, strength: 0.3)
    |> ExCubecl.Video.overlay(state.logo, x: 10, y: 10, alpha: 0.9)
    |> ExCubecl.Transcode.write_frame(state.encoder)

    {:ok, state}
  end
end

Camera Effects Pipeline

defmodule CameraEffects do
  @moduledoc """
  Adjusts brightness/contrast, applies the "portrait.cube" LUT, writes the frame
  to the encoder from the state, and counts processed frames in `:frame_count`.
  """
  use ExCubecl.MediaPipeline

  # Expects `state` to carry `:encoder` and an integer `:frame_count`.
  @impl true
  def handle_frame(frame, state) do
    # NOTE(review): this pipes raw frames and ends up calling
    # write_frame(frame, encoder), while the Multi-Stage and Error Handling
    # examples match {:ok, frame} and call write_frame(encoder, frame).
    # Confirm which shape the API actually uses.
    frame
    |> ExCubecl.Filter.apply(:brightness_contrast, brightness: 0.05, contrast: 1.1)
    |> ExCubecl.Filter.apply(:lut, file: "portrait.cube")
    |> ExCubecl.Transcode.write_frame(state.encoder)

    # The update syntax raises if :frame_count is missing from the state.
    {:ok, %{state | frame_count: state.frame_count + 1}}
  end
end

# Seed the state with the encoder and a zeroed frame counter
initial_state = %{encoder: enc, frame_count: 0}
{:ok, pid} = ExCubecl.MediaPipeline.start_link(CameraEffects, initial_state)

Multi-Stage Processing

defmodule MultiStage do
  @moduledoc """
  Runs each frame through four explicit stages: denoise, color grade,
  composite a lower third, and encode.
  """
  use ExCubecl.MediaPipeline

  # Expects `state` to carry `:lower_third` and `:encoder`. Every stage is
  # matched assertively, so any non-matching result raises a MatchError.
  @impl true
  def handle_frame(frame, state) do
    # Stage 1: Denoise
    {:ok, clean} = ExCubecl.Filter.apply(frame, :denoise, strength: 0.5)

    # Stage 2: Color grade
    {:ok, graded} = ExCubecl.Filter.apply(clean, :lut, file: "cinematic.cube")

    # Stage 3: Composite
    {:ok, composited} = ExCubecl.Video.overlay(graded, state.lower_third,
      x: 50, y: 620, alpha: 1.0)

    # Stage 4: Encode
    :ok = ExCubecl.Transcode.write_frame(state.encoder, composited)

    {:ok, state}
  end
end

Error Handling

defmodule RobustPipeline do
  @moduledoc """
  Blurs and encodes each frame, printing any filter or encode error and
  continuing with the next frame instead of stopping the pipeline.
  """
  use ExCubecl.MediaPipeline

  # Always returns `{:ok, state}`: a failed frame is reported and dropped
  # rather than returned as `{:error, reason}`.
  @impl true
  def handle_frame(frame, state) do
    case ExCubecl.Filter.apply(frame, :gaussian_blur, radius: 2) do
      {:ok, processed} ->
        case ExCubecl.Transcode.write_frame(state.encoder, processed) do
          :ok ->
            {:ok, state}

          {:error, reason} ->
            IO.puts("Encode error: #{inspect(reason)}")
            {:ok, state}  # Continue with next frame
        end

      {:error, reason} ->
        IO.puts("Filter error: #{inspect(reason)}")
        {:ok, state}  # Continue with next frame
    end
  end
end

Named Pipelines

# Register the pipeline under a name at start-up
pipeline_opts = [name: :livestream]
{:ok, _pid} = ExCubecl.MediaPipeline.start_link(MyPipeline, state, pipeline_opts)

# Push frame by name
ExCubecl.MediaPipeline.push_frame(:livestream, frame)

Architecture

          
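+--------------+     +---------------+     +-------------+
| Media Source | --> | MediaPipeline | --> | Transcoder  |
| (FFmpeg)     |     | (GenServer)   |     | (FFmpeg)    |
+--------------+     +---------------+     +-------------+
                             |
                             v
                     +---------------+
                     |  GPU Kernels  |
                     |  (CubeCL)     |
                     +---------------+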
                    

Performance Tips

  1. Batch frames: Process multiple frames in handle_frame when possible.
  2. Minimize readbacks: Avoid Video.snapshot/2 in the hot path.
  3. Use filter chains: Filter.chain/2 is more efficient than individual Filter.apply/3 calls.
  4. Pipeline mode: For fixed processing graphs, use ExCubecl.pipeline() directly instead of the GenServer for lower overhead.

Full Example: RTMP Ingest → Process → HLS Output

defmodule RTMPProcessor do
  @moduledoc """
  Full example: opens an RTMP source, starts an HLS encoder, and runs every
  frame through denoise and logo overlay before writing it to the encoder.
  """
  use ExCubecl.MediaPipeline

  # Opens the source and encoder in the calling process, then starts the
  # pipeline registered under this module's name.
  #
  # `opts[:logo]` is the overlay image; it is `nil` when the option is absent.
  def start_link(opts) do
    {:ok, src} = ExCubecl.Media.open("rtmp://ingest/server")
    {:ok, enc} = ExCubecl.Transcode.start("output/hls/playlist.m3u8",
      video: [codec: :h264, width: 1280, height: 720, bitrate: "3M"],
      audio: [codec: :aac, bitrate: "128k"]
    )

    state = %{
      source: src,
      encoder: enc,
      logo: opts[:logo],
      frame_count: 0
    }

    ExCubecl.MediaPipeline.start_link(__MODULE__, state, name: __MODULE__)
  end

  # Processes one frame and increments `:frame_count` in the state.
  @impl true
  def handle_frame(frame, state) do
    # NOTE(review): this pipes raw frames and ends up calling
    # write_frame(frame, encoder), while the Multi-Stage and Error Handling
    # examples match {:ok, frame} and call write_frame(encoder, frame).
    # Confirm which shape the API actually uses.
    frame
    |> ExCubecl.Filter.apply(:denoise, strength: 0.3)
    |> ExCubecl.Video.overlay(state.logo, x: 10, y: 10)
    |> ExCubecl.Transcode.write_frame(state.encoder)

    {:ok, %{state | frame_count: state.frame_count + 1}}
  end
end