Rindle.Workers.MuxIngestVariant (Rindle v0.1.5)

Push a Rindle-produced AV variant to Mux from server context.

The worker reads the source variant via a private signed storage URL (Rindle.Delivery.url(profile, key, expires_in: 1_800)), calls Rindle.Streaming.Provider.Mux.create_asset_with_retry_hint/3, persists the resulting provider_asset_id and playback_ids (a list, always plural) into a media_provider_assets row, and advances the FSM pending → uploading → processing.

Adopter wiring (Phase 36 owns the canonical guide)

config :my_app, Oban,
  queues: [rindle_provider: 4]

Job arguments

%{
  "asset_id" => binary_id,
  "profile" => "MyApp.Profiles.Web",            # module name as string
  "variant_name" => "hero",
  "expected_storage_key" => storage_key_at_enqueue,
  "expected_recipe_digest" => recipe_digest_at_enqueue
}

The two expected_* fields are the captured-at-enqueue values used by the atomic-promote race protection (mirrors process_variant.ex:244-275 verbatim — AV-03-10).

Note: variant_name lives ONLY in Oban job args (and in the Oban unique key for job-level idempotency). It is NOT a column on media_provider_assets. The row-level uniqueness is (asset_id, profile, provider_name) — different variants of the same asset+profile share one provider row, by design (Phase 33 schema).
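As a minimal sketch of how an adopter might build and enqueue these args: the MyApp.MuxIngest module and its function names are hypothetical, and the sketch assumes the worker exposes new/2 through `use Oban.Worker`, which this page implies but does not state.

```elixir
defmodule MyApp.MuxIngest do
  @moduledoc false
  # Hypothetical adopter-side helper; not part of Rindle.

  @doc "Builds the string-keyed args map in the shape documented above."
  def build_args(asset_id, profile, variant_name, storage_key, recipe_digest)
      when is_binary(asset_id) and is_atom(profile) and is_binary(variant_name) do
    %{
      "asset_id" => asset_id,
      # inspect/1 renders the module MyApp.Profiles.Web as "MyApp.Profiles.Web"
      "profile" => inspect(profile),
      "variant_name" => variant_name,
      # Captured at enqueue time for the atomic-promote race protection
      "expected_storage_key" => storage_key,
      "expected_recipe_digest" => recipe_digest
    }
  end

  @doc "Enqueues the job. Assumes new/2 is generated by `use Oban.Worker`."
  def enqueue(args) do
    worker = Rindle.Workers.MuxIngestVariant

    args
    |> worker.new(unique: worker.unique_job_opts())
    |> Oban.insert()
  end
end
```

Keeping build_args/5 pure makes it straightforward to check that variant_name travels only in the job args, as the note above requires.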

Telemetry contract (security invariant 14 enforced via MediaProviderAsset.redact_id/1 on every metadata asset_id)

[:rindle, :provider, :ingest, :start]
  measurements: %{system_time}
  metadata:     %{profile, provider, asset_id, variant_name}

[:rindle, :provider, :ingest, :stop]
  measurements: %{system_time, duration}
  metadata:     %{profile, provider, asset_id, variant_name}

[:rindle, :provider, :ingest, :exception]
  measurements: %{system_time, duration?}
  metadata:     %{profile, provider, asset_id, variant_name, kind}
                # kind: :error | :cancelled

Idempotency — two layers

  1. JOB LEVEL (Oban unique): keys on (asset_id, profile, variant_name) across the [:available, :scheduled, :executing, :retryable, :completed] states with period: 86_400 (a 24h cooldown). Pass unique: unique_job_opts() when enqueueing (matches the process_variant.ex:51 shape).

  2. PERFORM LEVEL: if the worker is re-invoked while the row is already in :uploading, :processing, or :ready, the worker logs and returns :ok immediately. It does NOT re-call the adapter and does NOT attempt the forbidden processing → uploading FSM edge (provider_asset_fsm.ex:9-16).
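The perform-level check can be pictured as a small state guard. This is only an illustration under assumed names (PerformGuardSketch and decide/1 are hypothetical), not the worker's actual code, and it covers only the states named above.

```elixir
defmodule PerformGuardSketch do
  # Hypothetical module; illustrates the perform-level short-circuit only.
  # States in which a re-invoked job must not call the adapter again.
  @already_ingesting [:uploading, :processing, :ready]

  @doc "Returns :skip for rows already past :pending, :proceed otherwise."
  def decide(state) when state in @already_ingesting, do: :skip
  def decide(:pending), do: :proceed
end
```

Returning early for :processing is what keeps the worker from ever attempting the forbidden processing → uploading edge.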

Telemetry — kind metadata

[:rindle, :provider, :ingest, :exception] events carry an additional metadata.kind key:

  • :cancelled — atomic-promote race aborted the job ({:cancel, ...})
  • :error — a genuine failure ({:error, _})

Adopters can route the two cases differently in their handlers.
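One way an adopter might do that routing is sketched below. The module name, handler id, and log messages are hypothetical; only the event name and the kind values come from this page. The asset_id in metadata is assumed to arrive already redacted, per the telemetry contract.

```elixir
defmodule MyApp.IngestTelemetry do
  # Hypothetical adopter-side handler; not part of Rindle.
  require Logger

  @exception_event [:rindle, :provider, :ingest, :exception]

  @doc "Attaches the handler; call once at application start."
  def attach do
    :telemetry.attach(
      "my-app-mux-ingest-exception",
      @exception_event,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  @doc "Telemetry callback: dispatches on metadata.kind."
  def handle_event(@exception_event, _measurements, %{kind: kind} = metadata, _config) do
    route(kind, metadata)
  end

  # A lost atomic-promote race is expected behaviour, so log quietly.
  def route(:cancelled, meta) do
    Logger.info("mux ingest cancelled for variant #{inspect(meta[:variant_name])}")
    :ignored
  end

  # A genuine failure deserves attention.
  def route(:error, meta) do
    Logger.error("mux ingest failed for asset #{inspect(meta[:asset_id])}")
    :alerted
  end
end
```

Passing the handler as a remote capture (&__MODULE__.handle_event/4) rather than an anonymous function follows the recommendation in the :telemetry documentation.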

Functions

unique_job_opts()

@spec unique_job_opts() :: keyword()

Oban unique opts for job-level idempotency. Wrap as unique: unique_job_opts() when enqueueing — matches the process_variant.ex:51 [unique: unique_job_opts()] shape.

Differs from process_variant.ex:408-415 by adding :profile to keys (since the same asset_id can ingest into multiple profiles) and using period: 86_400 instead of :infinity (re-ingest is possible after 24h).

Includes :available in states because Oban inserts newly-enqueued jobs in :available state by default — without it the unique constraint never fires for the most common dedup case (re-enqueue right after the first insert, before the worker picks up the job).
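Read together, the description above suggests a keyword list along the following lines. This is a reconstruction from the prose, not a copy of the Rindle source, and UniqueOptsSketch is a hypothetical module name; the actual return value may include further options.

```elixir
defmodule UniqueOptsSketch do
  # Reconstruction from the documentation text; the real function may differ.
  @spec unique_job_opts() :: keyword()
  def unique_job_opts do
    [
      # 24h window, after which a re-ingest can be enqueued again
      period: 86_400,
      # :profile is included because one asset_id can ingest into several profiles
      keys: [:asset_id, :profile, :variant_name],
      # :available covers a re-enqueue before the first job is picked up
      states: [:available, :scheduled, :executing, :retryable, :completed]
    ]
  end
end
```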