temporal_sdk_activity behaviour (temporal_sdk v0.2.20)
View SourceTemporal activity task module.
OpenTelemetry
Activity execution inherits its OpenTelemetry trace from the parent workflow. Traces propagate across
Temporal tasks via task headers, using the W3C Trace Context standard through otel_propagator_text_map.
Once activity execution starts, a new OpenTelemetry "RunActivity" span is created using the parent
workflow's context. OpenTelemetry context is attached to the activity execution process, enabling
standard OpenTelemetry commands, such as adding user-defined spans, attributes, events, etc.
"RunActivity" span is created after activity task is polled and execution processing starts using
worker node local time.
"RunActivity" span includes an OpenTelemetry event "StartActivityTask" created at the activity task
(server) started_time.
SDK Samples Otel Sample demonstrates how to extract baggage from the inherited trace context, add span attributes, and start a new span.
Summary
Types
-type cancel_action() :: {cancel, CanceledDetails :: temporal_sdk:term_to_payloads()}.
-type complete_action() :: {complete, Result :: temporal_sdk:term_to_payloads()}.
-type context() :: #{cluster := temporal_sdk_cluster:cluster_name(), executor_pid := pid(), task := task(), worker_opts := temporal_sdk_worker:opts(), started_at := SystemTime :: integer(), task_timeout := erlang:timeout(), header => temporal_sdk:term_from_mapstring_payload()}.
-type data() :: term().
-type data_action() :: {data, NewData :: data()}.
-type fail_action() :: {fail, {Source :: temporal_sdk:serializable(), Message :: temporal_sdk:serializable(), Stacktrace :: temporal_sdk:serializable()}}.
-type handler_context() :: #{data := data(), cancel_requested := boolean(), activity_paused := boolean(), last_heartbeat := heartbeat(), elapsed_time := non_neg_integer(), remaining_time := erlang:timeout()}.
-type heartbeat() :: temporal_sdk:term_to_payloads().
-type heartbeat_action() :: heartbeat | {heartbeat, Heartbeat :: heartbeat()}.
-type terminate_action() :: cancel_action() | complete_action() | fail_action().
Callbacks
-callback execute(Context :: context(), Input :: temporal_sdk:term_from_payloads()) -> Result :: temporal_sdk:term_to_payloads().
-callback handle_cancel(HandlerContext :: handler_context()) -> terminate_action() | ignore.
-callback handle_failure(HandlerContext :: handler_context(), Class :: error | exit | throw | temporal_sdk:serializable(), Reason :: term() | temporal_sdk:serializable(), Stacktrace :: erlang:raise_stacktrace() | temporal_sdk:serializable()) -> ApplicationFailure :: temporal_sdk:application_failure() | temporal_sdk:application_failure_as_list().
-callback handle_heartbeat(HandlerContext :: handler_context()) -> terminate_action() | heartbeat_action().
-callback handle_message(HandlerContext :: handler_context(), Message :: term()) -> terminate_action() | data_action() | ignore.
-callback terminate(HandlerContext :: handler_context()) -> term().
Functions
-spec await_data(EtsPattern :: term(), Timeout :: erlang:timeout()) -> {ok, data()} | timeout | invalid_pattern | no_return().
-spec cancel(CanceledDetails :: temporal_sdk:term_to_payloads()) -> no_return().
-spec complete(Result :: temporal_sdk:term_to_payloads()) -> no_return().
-spec elapsed_time() -> NativeTime :: non_neg_integer() | no_return().
-spec elapsed_time(Unit :: erlang:time_unit()) -> non_neg_integer() | no_return().
-spec fail(ApplicationFailure :: temporal_sdk:application_failure() | temporal_sdk:application_failure_as_list()) -> no_return().
-spec fail(Class :: error | exit | throw | temporal_sdk:serializable(), Reason :: term() | temporal_sdk:serializable(), Stacktrace :: erlang:raise_stacktrace() | temporal_sdk:serializable()) -> no_return().
-spec heartbeat() -> ok.
-spec heartbeat(Heartbeat :: heartbeat()) -> ok.
-spec remaining_time() -> NativeTime :: non_neg_integer() | infinity | no_return().
-spec remaining_time(Unit :: erlang:time_unit()) -> non_neg_integer() | infinity | no_return().
-spec set_data(TaskData :: term()) -> ok.