Build real-time media processing pipelines using the MediaPipeline GenServer
behaviour. Designed for livestreaming, camera effects, and other low-latency
use cases.
MediaPipeline Behaviour
defmodule MyPipeline do
use ExCubecl.MediaPipeline
# Skeleton callback: invoked with each incoming frame and the current state.
# Returns `{:ok, new_state}`. This skeleton does no processing, so the new
# state is simply the state it was given.
@impl true
def handle_frame(_frame, state) do
  # Process the frame here and compute the updated state.
  new_state = state
  {:ok, new_state}
end
end
The handle_frame/2 callback receives each incoming frame and the current
state. Return {:ok, new_state} to continue or {:error, reason} to stop.
Basic Example
defmodule SimpleFilter do
use ExCubecl.MediaPipeline
# Applies a gaussian blur (radius 2) to each incoming frame and pipes the
# result into `Transcode.write_frame/2` together with `state.encoder`.
# The state is returned unchanged.
#
# NOTE(review): the Multi-Stage example in this document matches
# `{:ok, frame}` from `Filter.apply/3` and calls `write_frame(encoder, frame)`;
# confirm the return shape and argument order this pipe relies on.
@impl true
def handle_frame(frame, state) do
  frame
  |> ExCubecl.Filter.apply(:gaussian_blur, radius: 2)
  |> ExCubecl.Transcode.write_frame(state.encoder)

  {:ok, state}
end
end
# Start the encoder first, then start the pipeline with the encoder in its state
{:ok, enc} = ExCubecl.Transcode.start("output.mp4",
video: [codec: :h264, width: 1280, height: 720]
)
{:ok, pid} = ExCubecl.MediaPipeline.start_link(SimpleFilter, %{encoder: enc})
# Push frames
ExCubecl.MediaPipeline.push_frame(pid, frame)
Livestream with Overlay
defmodule Livestream do
use ExCubecl.MediaPipeline
# Denoises each frame, overlays `state.logo` at x: 10, y: 10 with alpha 0.9,
# and pipes the result into `Transcode.write_frame/2` together with
# `state.encoder`. The state is returned unchanged.
#
# NOTE(review): the Multi-Stage example in this document matches
# `{:ok, frame}` from `Filter.apply/3` and calls `write_frame(encoder, frame)`;
# confirm the return shape and argument order this pipe relies on.
@impl true
def handle_frame(frame, state) do
  frame
  |> ExCubecl.Filter.apply(:denoise, strength: 0.3)
  |> ExCubecl.Video.overlay(state.logo, x: 10, y: 10, alpha: 0.9)
  |> ExCubecl.Transcode.write_frame(state.encoder)

  {:ok, state}
end
end
Camera Effects Pipeline
defmodule CameraEffects do
use ExCubecl.MediaPipeline
# Applies a brightness/contrast adjustment and the "portrait.cube" LUT to each
# frame, pipes the result into `Transcode.write_frame/2` together with
# `state.encoder`, and increments `state.frame_count` by one.
#
# NOTE(review): the Multi-Stage example in this document matches
# `{:ok, frame}` from `Filter.apply/3` and calls `write_frame(encoder, frame)`;
# confirm the return shape and argument order this pipe relies on.
@impl true
def handle_frame(frame, state) do
  frame
  |> ExCubecl.Filter.apply(:brightness_contrast, brightness: 0.05, contrast: 1.1)
  |> ExCubecl.Filter.apply(:lut, file: "portrait.cube")
  |> ExCubecl.Transcode.write_frame(state.encoder)

  # `%{state | ...}` raises if :frame_count is missing from the initial state.
  {:ok, %{state | frame_count: state.frame_count + 1}}
end
end
{:ok, pid} = ExCubecl.MediaPipeline.start_link(CameraEffects, %{
encoder: enc,
frame_count: 0
})
Multi-Stage Processing
defmodule MultiStage do
use ExCubecl.MediaPipeline
# Runs each frame through four stages: denoise, LUT colour grade, overlay of
# `state.lower_third`, and encode. Every stage result is matched assertively,
# so a return value of any other shape raises a MatchError rather than being
# ignored. The state is returned unchanged.
@impl true
def handle_frame(frame, state) do
  # Stage 1: Denoise
  {:ok, clean} = ExCubecl.Filter.apply(frame, :denoise, strength: 0.5)

  # Stage 2: Color grade
  {:ok, graded} = ExCubecl.Filter.apply(clean, :lut, file: "cinematic.cube")

  # Stage 3: Composite
  {:ok, composited} =
    ExCubecl.Video.overlay(graded, state.lower_third, x: 50, y: state.height - 100)

  # Stage 4: Encode
  :ok = ExCubecl.Transcode.write_frame(state.encoder, composited)

  {:ok, state}
end
end
Error Handling
defmodule RobustPipeline do
use ExCubecl.MediaPipeline
# Blurs and encodes a frame without letting one bad frame stop the pipeline:
# any exception raised while processing (including the MatchError from a
# return value that does not match) is printed, and `{:ok, state}` is
# returned so the next frame is still handled.
@impl true
def handle_frame(frame, state) do
  {:ok, processed} = ExCubecl.Filter.apply(frame, :gaussian_blur, radius: 2)
  :ok = ExCubecl.Transcode.write_frame(state.encoder, processed)
  {:ok, state}
rescue
  # Function-level `rescue` covers the whole body; no explicit `try` needed.
  e ->
    IO.puts("Frame processing error: #{inspect(e)}")
    # Continue with next frame
    {:ok, state}
end
end
Named Pipelines
{:ok, _pid} = ExCubecl.MediaPipeline.start_link(MyPipeline, state, name: :livestream)
# Push frame by name
ExCubecl.MediaPipeline.push_frame(:livestream, frame)
Architecture
┌──────────────┐     ┌───────────────┐     ┌──────────────┐
│ Media Source │────▶│ MediaPipeline │────▶│  Transcoder  │
│   (FFmpeg)   │     │  (GenServer)  │     │   (FFmpeg)   │
└──────────────┘     └───────┬───────┘     └──────────────┘
                             │
                      ┌──────┴──────┐
                      │ GPU Kernels │
                      │  (CubeCL)   │
                      └─────────────┘
Performance Tips
- Batch frames: Process multiple frames in handle_frame when possible.
- Minimize readbacks: Avoid Video.snapshot/2 in the hot path.
- Use filter chains: Filter.chain/2 is more efficient than individual apply/2 calls.
- Pipeline mode: For fixed processing graphs, use ExCubecl.pipeline() directly instead of the GenServer for lower overhead.
Full Example: RTMP Ingest → Process → HLS Output
defmodule RTMPProcessor do
use ExCubecl.MediaPipeline
# Opens the RTMP source and the HLS encoder, then starts the pipeline
# registered under this module's name. The initial state holds the source,
# the encoder, the optional `opts[:logo]` (nil when not given) and a frame
# counter starting at 0. Either `{:ok, _}` match raises a MatchError if the
# corresponding call returns anything else.
def start_link(opts) do
  {:ok, src} = ExCubecl.Media.open("rtmp://ingest/server")

  encoder_opts = [
    video: [codec: :h264, width: 1280, height: 720, bitrate: "3M"],
    audio: [codec: :aac, bitrate: "128k"]
  ]

  {:ok, enc} = ExCubecl.Transcode.start("output/hls/playlist.m3u8", encoder_opts)

  initial_state = %{source: src, encoder: enc, logo: opts[:logo], frame_count: 0}

  ExCubecl.MediaPipeline.start_link(__MODULE__, initial_state, name: __MODULE__)
end
# Denoises each frame, overlays `state.logo` at x: 10, y: 10, pipes the
# result into `Transcode.write_frame/2` together with `state.encoder`, and
# increments `state.frame_count` by one.
#
# NOTE(review): the Multi-Stage example in this document matches
# `{:ok, frame}` from `Filter.apply/3` and calls `write_frame(encoder, frame)`;
# confirm the return shape and argument order this pipe relies on.
@impl true
def handle_frame(frame, state) do
  frame
  |> ExCubecl.Filter.apply(:denoise, strength: 0.3)
  |> ExCubecl.Video.overlay(state.logo, x: 10, y: 10)
  |> ExCubecl.Transcode.write_frame(state.encoder)

  {:ok, %{state | frame_count: state.frame_count + 1}}
end
end