TemporalSdk.Workflow (temporal_sdk v0.2.20)

Temporal workflow task module.

WIP Temporal commands:

  • start_activity WIP: session_execution, direct_execution, direct_result

TODO Temporal commands:

  • start_nexus/4
  • start_nexus/5
  • cancel_nexus
  • upsert_workflow_search_attributes
  • cancel_external_workflow
  • signal_external_workflow

OpenTelemetry

OpenTelemetry tracing of workflow executions is handled internally by the workflow executor state machine process. Tracing can be configured when starting a new workflow execution with TemporalSdk.start_workflow/4.

The following spans are created for each workflow execution:

  • "StartWorkflow": span created with TemporalSdk.start_workflow/4 command,
  • "RunWorkflow": span created after workflow task is polled and execution processing starts,
  • "RunExecution": span created for each parallel execution.

The following spans are created for workflow commands:

  • "StartActivity": when activity is started,
  • "StartChildWorkflow": when child workflow is started,
  • "StartMarker" and "RunMarker": when a record marker command is executed. The "RunMarker" span precedes the "StartMarker" span, because the marker value must be evaluated before the completed workflow task is dispatched to the Temporal server.

By default, spans are enabled for all workflow commands above. Spans can be disabled per command by setting the :opentelemetry key in the command options to false.
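As a minimal sketch of the option described above, the helper below adds `opentelemetry: false` to a set of command options. It assumes command options are a keyword list, which this page does not confirm for every command. `SpanOptsSketch`, `:charge_card`, the input value and `:some_other_option` are hypothetical names used only for illustration.

```elixir
defmodule SpanOptsSketch do
  # Returns the given command options with the span for that command switched off.
  # Assumption: command options are a keyword list.
  def without_span(opts \\ []), do: Keyword.put(opts, :opentelemetry, false)
end

# Hypothetical use inside workflow code (activity type and input are placeholders):
#   TemporalSdk.Workflow.start_activity(:charge_card, ["order-1"], SpanOptsSketch.without_span())
opts = SpanOptsSketch.without_span(some_other_option: 1)
```

Any other options already present are kept unchanged, so the helper can wrap an existing options list.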

Workflow spans, command spans, and the otel_add_event/2 command use local worker node time for OpenTelemetry timestamps. The "RunWorkflow" span includes an OpenTelemetry event, "StartWorkflowTask", which marks the last known history event created by the Temporal server and uses the server event time as its timestamp.

OpenTelemetry traces are propagated using task headers, serialized with the W3C Trace Context standard via :otel_propagator_text_map. Spans and OpenTelemetry commands are created and exported only during live workflow execution. They are suppressed during workflow replay to prevent duplicate entries in the APM backend.
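For orientation, the W3C Trace Context standard carries the trace identity in a `traceparent` value of the form version-traceid-parentid-flags. The sketch below only illustrates that wire format with the example value from the W3C specification. How the SDK stores the value in task headers is not shown on this page, and `TraceparentSketch` is a hypothetical module that is not part of the SDK.

```elixir
defmodule TraceparentSketch do
  # Splits a W3C `traceparent` value into its four fields.
  # Only field count and field lengths are checked, not the hex alphabet.
  def parse(header) do
    with [version, trace_id, parent_id, flags] <- String.split(header, "-"),
         true <- byte_size(version) == 2 and byte_size(trace_id) == 32,
         true <- byte_size(parent_id) == 16 and byte_size(flags) == 2 do
      {:ok, %{version: version, trace_id: trace_id, parent_id: parent_id, flags: flags}}
    else
      _ -> :error
    end
  end
end

# Example value taken from the W3C Trace Context specification.
example = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
{:ok, parsed} = TraceparentSketch.parse(example)
```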

The SDK doesn't support creating custom workflow spans, as managing them across workflow executor failures and replays would be challenging. Each parallel execution starts its own span, so parallel executions should be used instead of regular spans.

The following SDK Samples provide screenshots of OpenTelemetry traces:

Awaitables functions

await(await_pattern)

await(await_pattern, opts)

await_all(await_pattern)

await_all(await_pattern, opts)

await_any(await_pattern)

await_any(await_pattern, opts)

await_info(info_or_info_id)

@spec await_info(info_or_info_id :: :temporal_sdk_workflow.info() | term()) ::
  :temporal_sdk_workflow.await_ret() | :noinfo

await_info(info_or_info_id, info_timeout, awaitable_timeout)

@spec await_info(
  info_or_info_id :: :temporal_sdk_workflow.info() | term(),
  info_timeout :: :temporal_sdk.time(),
  awaitable_timeout :: :temporal_sdk.time()
) :: :temporal_sdk_workflow.await_ret() | :noinfo

is_awaited(await_pattern)

@spec is_awaited(await_pattern :: :temporal_sdk_workflow.await_pattern()) ::
  {true, :temporal_sdk_workflow.await_match()}
  | {false, :temporal_sdk_workflow.await_match()}
  | :no_return

is_awaited_all(await_pattern)

@spec is_awaited_all(await_pattern :: [:temporal_sdk_workflow.await_pattern()]) ::
  {true, [:temporal_sdk_workflow.await_match()]}
  | {false, [:temporal_sdk_workflow.await_match()]}
  | :no_return

is_awaited_any(await_pattern)

@spec is_awaited_any(await_pattern :: [:temporal_sdk_workflow.await_pattern()]) ::
  {true, [:temporal_sdk_workflow.await_match()]}
  | {false, [:temporal_sdk_workflow.await_match()]}
  | :no_return

wait(await_pattern)

wait(await_pattern, opts)

wait_all(await_pattern)

wait_all(await_pattern, opts)

wait_any(await_pattern)

wait_any(await_pattern, opts)

wait_info(info_or_info_id)

@spec wait_info(info_or_info_id :: :temporal_sdk_workflow.info() | term()) ::
  :temporal_sdk_workflow.await_match() | no_return()

wait_info(info_or_info_id, info_timeout, awaitable_timeout)

@spec wait_info(
  info_or_info_id :: :temporal_sdk_workflow.info() | term(),
  info_timeout :: :temporal_sdk.time(),
  awaitable_timeout :: :temporal_sdk.time()
) :: :temporal_sdk_workflow.await_match() | no_return()

OpenTelemetry commands

otel_add_event(name, attributes)

@spec otel_add_event(
  name :: :opentelemetry.event_name(),
  attributes :: :opentelemetry.attributes_map()
) ::
  :ok

Adds an OpenTelemetry event to the given parallel execution span context.

Corresponds to the :otel_span.add_event/3 OpenTelemetry command.

otel_clear_baggage()

@spec otel_clear_baggage() :: :ok

Clears the OpenTelemetry parallel execution span context baggage, removing all the current key-value pairs.

Corresponds to the :otel_baggage.clear/0 OpenTelemetry command.

otel_set_attributes(attributes)

@spec otel_set_attributes(attributes :: :opentelemetry.attributes_map()) :: :ok

Sets OpenTelemetry attributes to the given parallel execution span context.

Corresponds to the :otel_span.set_attributes/2 OpenTelemetry command.

otel_set_baggage(key, value, metadata)

@spec otel_set_baggage(
  key :: :otel_baggage.key(),
  value :: :otel_baggage.value(),
  metadata :: [
    :unicode.unicode_binary()
    | {:unicode.unicode_binary(), :unicode.unicode_binary()}
  ]
) :: :ok

Sets the given key-value pair in the given parallel execution OpenTelemetry span context baggage (with the associated metadata).

Corresponds to the :otel_baggage.set/3 OpenTelemetry command.
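The four OpenTelemetry commands above could be combined as in the following sketch, based only on the specs listed in this section. `OtelCommandsSketch`, the attribute keys, the event name and the baggage values are hypothetical. The workflow module is passed as an argument so the sketch can be tried with a stub outside a workflow execution; in workflow code the functions would be called on TemporalSdk.Workflow directly.

```elixir
defmodule OtelCommandsSketch do
  # `wf` defaults to the SDK module; pass a stub to try the sketch outside a workflow.
  def annotate_order(order_id, wf \\ TemporalSdk.Workflow) do
    # Attach attributes to the current parallel execution span.
    :ok = wf.otel_set_attributes(%{"order.id" => order_id})
    # Record a point-in-time event on the same span.
    :ok = wf.otel_add_event("order_validated", %{"order.id" => order_id})
    # Set a baggage entry with empty metadata.
    :ok = wf.otel_set_baggage("tenant", "acme", [])
    # Drop all baggage entries once they are no longer needed.
    :ok = wf.otel_clear_baggage()
  end
end
```

As noted in the OpenTelemetry section, these commands are suppressed during workflow replay, so they should not be relied on for workflow logic.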

SDK commands

await_open_before_close(is_enabled)

@spec await_open_before_close(is_enabled :: boolean()) :: :ok

evict_workflow()

@spec evict_workflow() :: :ok | no_return()

Requests workflow eviction on the next workflow task completion request.

The Workflow Eviction section of SDK Architecture describes the workflow eviction mechanism in detail.

Duplicate eviction requests within the same workflow task cycle are ignored. Eviction requests are ignored during workflow replay.

The SDK Samples Eviction Parallel Handler sample demonstrates how the function is used.

get_workflow_result()

@spec get_workflow_result() :: :temporal_sdk.term_to_payloads() | no_return()

select_history(continuation)

@spec select_history(event_id :: pos_integer()) ::
  :temporal_sdk_workflow.history_event() | :noevent
@spec select_history(
  history_event_pattern :: :temporal_sdk_workflow.history_event_table_pattern()
) :: [
  :temporal_sdk_workflow.ets_match()
]
@spec select_history(
  history_pattern_spec ::
    :temporal_sdk_workflow.history_event_table_pattern_match_spec()
) :: [:temporal_sdk_workflow.ets_match()]
@spec select_history(continuation :: :temporal_sdk_workflow.ets_continuation()) ::
  {[:temporal_sdk_workflow.ets_match()],
   continuation :: :temporal_sdk_workflow.ets_continuation()}
  | :"$end_of_table"

select_history(history_pattern_spec, limit)

@spec select_history(
  history_pattern_spec ::
    :temporal_sdk_workflow.history_event_table_pattern_match_spec(),
  limit :: pos_integer()
) ::
  {[:temporal_sdk_workflow.ets_match()],
   continuation :: :temporal_sdk_workflow.ets_continuation()}
  | :"$end_of_table"
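The `{matches, continuation} | :"$end_of_table"` return shape in the specs above is the same shape returned by :ets.select/3 and :ets.select/1. The sketch below shows the usual paging loop over a plain ETS table. It assumes, without confirmation from this page, that select_history/2 followed by select_history/1 can be driven the same way. `SelectPagingSketch` and the table contents are hypothetical.

```elixir
defmodule SelectPagingSketch do
  # Follows continuations until :"$end_of_table" and returns all matches in order.
  def collect(first_page, next_fun), do: do_collect(first_page, next_fun, [])

  defp do_collect(:"$end_of_table", _next_fun, acc),
    do: acc |> Enum.reverse() |> Enum.concat()

  defp do_collect({matches, continuation}, next_fun, acc),
    do: do_collect(next_fun.(continuation), next_fun, [matches | acc])
end

# Stand-in table: an ordered set of {event_id, payload} tuples.
table = :ets.new(:history_sketch, [:ordered_set])
for id <- 1..5, do: :ets.insert(table, {id, :event})

# Match spec returning only the first tuple element, two results per page.
match_spec = [{{:"$1", :_}, [], [:"$1"]}]
first_page = :ets.select(table, match_spec, 2)
ids = SelectPagingSketch.collect(first_page, &:ets.select/1)
```

Under the stated assumption, workflow code would pass the result of select_history(history_pattern_spec, limit) as the first page and a function that calls select_history(continuation) as the second argument.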

select_index(continuation)

@spec select_index(
  awaitable_index_pattern :: :temporal_sdk_workflow.awaitable_index_pattern()
) :: [
  :temporal_sdk_workflow.ets_match()
]
@spec select_index(
  index_pattern_spec ::
    :temporal_sdk_workflow.awaitable_index_pattern_match_spec()
) ::
  [:temporal_sdk_workflow.ets_match()]
@spec select_index(continuation :: :temporal_sdk_workflow.ets_continuation()) ::
  {[:temporal_sdk_workflow.ets_match()],
   continuation :: :temporal_sdk_workflow.ets_continuation()}
  | :"$end_of_table"

select_index(index_pattern_spec, limit)

@spec select_index(
  index_pattern_spec ::
    :temporal_sdk_workflow.awaitable_index_pattern_match_spec(),
  limit :: pos_integer()
) ::
  {[:temporal_sdk_workflow.ets_match()],
   continuation :: :temporal_sdk_workflow.ets_continuation()}
  | :"$end_of_table"

set_info(info_value)

@spec set_info(info_value :: term()) :: :temporal_sdk_workflow.info() | no_return()

set_info(info_value, opts)

@spec set_info(
  info_value :: term(),
  opts :: [
    info_id: :temporal_sdk_workflow.execution_id(),
    awaitable_id: :temporal_sdk_workflow.awaitable_id()
  ]
) :: :temporal_sdk_workflow.info() | no_return()

set_workflow_result(workflow_result)

@spec set_workflow_result(workflow_result :: :temporal_sdk.term_to_payloads()) :: :ok

start_execution(function)

@spec start_execution(function :: atom()) ::
  :temporal_sdk_workflow.execution() | no_return()

start_execution(function, input)

@spec start_execution(function :: atom(), input :: term()) ::
  :temporal_sdk_workflow.execution() | no_return()

start_execution(function, input, opts)

start_execution(module, function, input, opts)

@spec start_execution(
  module :: module(),
  function :: atom(),
  input :: term(),
  opts :: :temporal_sdk_workflow.start_execution_opts()
) ::
  :temporal_sdk_workflow.execution()
  | :temporal_sdk_workflow.execution_data()
  | no_return()

terminate_executor()

@spec terminate_executor() :: :ok

terminate_executor(reason)

@spec terminate_executor(reason :: term()) :: :ok

workflow_info()

@spec workflow_info() :: :temporal_sdk_workflow.workflow_info() | no_return()

Temporal commands

cancel_activity(activity_or_activity_data)

@spec cancel_activity(
  activity_or_activity_data ::
    :temporal_sdk_workflow.activity() | :temporal_sdk_workflow.activity_data()
) :: :temporal_sdk_workflow.activity() | no_return()

cancel_activity(activity_or_activity_data, opts)

@spec cancel_activity(
  activity_or_activity_data ::
    :temporal_sdk_workflow.activity() | :temporal_sdk_workflow.activity_data(),
  opts :: [
    {:awaitable_event,
     :cmd | :cancel_request | :result | :schedule | :start | :close}
    | {:wait, boolean()}
    | :wait
  ]
) ::
  :temporal_sdk_workflow.activity()
  | :temporal_sdk_workflow.activity_data()
  | no_return()

cancel_timer(timer_or_timer_data_or_timer_id)

@spec cancel_timer(
  timer_or_timer_data_or_timer_id ::
    :temporal_sdk_workflow.timer()
    | :temporal_sdk_workflow.timer_data()
    | :unicode.chardata()
    | atom()
) :: :temporal_sdk_workflow.timer() | no_return()

cancel_timer(timer_or_timer_data_or_timer_id, opts)

@spec cancel_timer(
  timer_or_timer_data_or_timer_id ::
    :temporal_sdk_workflow.timer()
    | :temporal_sdk_workflow.timer_data()
    | :unicode.chardata()
    | atom(),
  opts :: [
    {:awaitable_event,
     :cmd | :cancel_request | :result | :schedule | :start | :close}
    | {:wait, boolean()}
    | :wait
  ]
) ::
  :temporal_sdk_workflow.timer()
  | :temporal_sdk_workflow.timer_data()
  | no_return()

cancel_workflow_execution(details)

@spec cancel_workflow_execution(details :: :temporal_sdk.term_to_payloads()) ::
  :temporal_sdk_workflow.cancel_workflow_execution()

complete_workflow_execution(result)

@spec complete_workflow_execution(result :: :temporal_sdk.term_to_payloads()) ::
  :temporal_sdk_workflow.complete_workflow_execution()

continue_as_new_workflow(task_queue, workflow_type)

@spec continue_as_new_workflow(
  task_queue :: :unicode.chardata(),
  workflow_type :: atom() | :unicode.chardata()
) :: :temporal_sdk_workflow.continue_as_new_workflow()

continue_as_new_workflow(task_queue, workflow_type, opts)

@spec continue_as_new_workflow(
  task_queue :: :unicode.chardata(),
  workflow_type :: atom() | :unicode.chardata(),
  opts :: :temporal_sdk_workflow.continue_as_new_workflow_opts()
) :: :temporal_sdk_workflow.continue_as_new_workflow()

fail_workflow_execution(application_failure)

modify_workflow_properties(upserted_memo_fields)

@spec modify_workflow_properties(
  upserted_memo_fields :: :temporal_sdk.term_to_mapstring_payload()
) ::
  :temporal_sdk_workflow.workflow_properties() | no_return()

modify_workflow_properties(upserted_memo_fields, opts)

@spec modify_workflow_properties(
  upserted_memo_fields :: :temporal_sdk.term_to_mapstring_payload(),
  opts :: [{:awaitable_event, :cmd | :close} | {:wait, boolean()} | :wait]
) ::
  :temporal_sdk_workflow.workflow_properties()
  | :temporal_sdk_workflow.workflow_properties_data()
  | no_return()

record_marker(marker_value_fun)

@spec record_marker(
  marker_value_fun :: :temporal_sdk_workflow.record_marker_value_fun()
) ::
  :temporal_sdk_workflow.marker() | no_return()

record_marker(marker_value_fun, opts)

start_activity(activity_type, input)

@spec start_activity(
  activity_type :: :unicode.chardata() | atom(),
  input :: :temporal_sdk.term_to_payloads()
) :: :temporal_sdk_workflow.activity() | no_return()

start_activity(activity_type, input, opts)

start_child_workflow(task_queue, workflow_type)

@spec start_child_workflow(
  task_queue :: :unicode.chardata(),
  workflow_type :: atom() | :unicode.chardata()
) ::
  :temporal_sdk_workflow.child_workflow()
  | :temporal_sdk_workflow.child_workflow_data()
  | no_return()

start_child_workflow(task_queue, workflow_type, opts)

start_nexus(endpoint, service, operation, input)

@spec start_nexus(
  endpoint :: atom() | :unicode.chardata(),
  service :: atom() | :unicode.chardata(),
  operation :: atom() | :unicode.chardata(),
  input :: :temporal_sdk.term_to_payload()
) :: :temporal_sdk_workflow.nexus()

start_nexus(endpoint, service, operation, input, opts)

@spec start_nexus(
  endpoint :: atom() | :unicode.chardata(),
  service :: atom() | :unicode.chardata(),
  operation :: atom() | :unicode.chardata(),
  input :: :temporal_sdk.term_to_payload(),
  opts :: :temporal_sdk_workflow.start_nexus_opts()
) :: :temporal_sdk_workflow.nexus()

start_timer(start_to_fire_timeout)

@spec start_timer(start_to_fire_timeout :: :temporal_sdk.time()) ::
  :temporal_sdk_workflow.timer() | no_return()

start_timer(start_to_fire_timeout, opts)

Temporal external commands

admit_signal(signal_or_signal_name)

@spec admit_signal(
  signal_or_signal_name :: :temporal_sdk_workflow.signal() | :unicode.chardata()
) ::
  :temporal_sdk_workflow.signal()

admit_signal(signal_or_signal_name, opts)

respond_query(query_or_query_type, opts)

Temporal marker commands

record_app_env(par)

@spec record_app_env(par :: atom()) :: :temporal_sdk_workflow.marker() | no_return()

record_app_env(par, opts)

Retrieves the application configuration parameter via :application.get_env/1 and records its value as a marker.

record_os_env(var_name)

@spec record_os_env(var_name :: :os.env_var_name()) ::
  :temporal_sdk_workflow.marker() | no_return()

record_os_env(var_name, opts)

Retrieves the OS environment variable via :os.getenv/1 and records its value as a marker.

record_rand_uniform()

@spec record_rand_uniform() :: :temporal_sdk_workflow.marker() | no_return()

record_rand_uniform(range_or_opts)

record_rand_uniform(range, opts)

record_system_time()

@spec record_system_time() :: :temporal_sdk_workflow.marker() | no_return()

record_system_time(unit_or_opts)

record_system_time(unit, opts)

record_uuid4()

@spec record_uuid4() :: :temporal_sdk_workflow.marker() | no_return()

record_uuid4(opts)