temporal_sdk_activity behaviour (temporal_sdk v0.2.20)

View Source

Temporal activity task module.

OpenTelemetry

Activity execution inherits its OpenTelemetry trace from the parent workflow. Traces propagate across Temporal tasks via task headers, using the W3C Trace Context standard through otel_propagator_text_map.

Once activity execution starts, a new OpenTelemetry "RunActivity" span is created using the parent workflow's context. OpenTelemetry context is attached to the activity execution process, enabling standard OpenTelemetry commands, such as adding user-defined spans, attributes, events, etc.

"RunActivity" span is created after activity task is polled and execution processing starts using worker node local time. "RunActivity" span includes an OpenTelemetry event "StartActivityTask" created at the activity task (server) started_time.

SDK Samples Otel Sample demonstrates how to extract baggage from the inherited trace context, add span attributes, and start a new span.

Summary

Types

cancel_action()

-type cancel_action() :: {cancel, CanceledDetails :: temporal_sdk:term_to_payloads()}.

complete_action()

-type complete_action() :: {complete, Result :: temporal_sdk:term_to_payloads()}.

context()

-type context() ::
          #{cluster := temporal_sdk_cluster:cluster_name(),
            executor_pid := pid(),
            task := task(),
            worker_opts := temporal_sdk_worker:opts(),
            started_at := SystemTime :: integer(),
            task_timeout := erlang:timeout(),
            header => temporal_sdk:term_from_mapstring_payload()}.

data()

-type data() :: term().

data_action()

-type data_action() :: {data, NewData :: data()}.

fail_action()

-type fail_action() ::
          {fail,
           {Source :: temporal_sdk:serializable(),
            Message :: temporal_sdk:serializable(),
            Stacktrace :: temporal_sdk:serializable()}}.

handler_context()

-type handler_context() ::
          #{data := data(),
            cancel_requested := boolean(),
            activity_paused := boolean(),
            last_heartbeat := heartbeat(),
            elapsed_time := non_neg_integer(),
            remaining_time := erlang:timeout()}.

heartbeat()

-type heartbeat() :: temporal_sdk:term_to_payloads().

heartbeat_action()

-type heartbeat_action() :: heartbeat | {heartbeat, Heartbeat :: heartbeat()}.

task()

terminate_action()

-type terminate_action() :: cancel_action() | complete_action() | fail_action().

Callbacks

execute(Context, Input)

-callback execute(Context :: context(), Input :: temporal_sdk:term_from_payloads()) ->
                     Result :: temporal_sdk:term_to_payloads().

handle_cancel(HandlerContext)

(optional)
-callback handle_cancel(HandlerContext :: handler_context()) -> terminate_action() | ignore.

handle_failure(HandlerContext, Class, Reason, Stacktrace)

(optional)
-callback handle_failure(HandlerContext :: handler_context(),
                         Class :: error | exit | throw | temporal_sdk:serializable(),
                         Reason :: term() | temporal_sdk:serializable(),
                         Stacktrace :: erlang:raise_stacktrace() | temporal_sdk:serializable()) ->
                            ApplicationFailure ::
                                temporal_sdk:application_failure() |
                                temporal_sdk:application_failure_as_list().

handle_heartbeat(HandlerContext)

(optional)
-callback handle_heartbeat(HandlerContext :: handler_context()) -> terminate_action() | heartbeat_action().

handle_message(HandlerContext, Message)

(optional)
-callback handle_message(HandlerContext :: handler_context(), Message :: term()) ->
                            terminate_action() | data_action() | ignore.

terminate(HandlerContext)

(optional)
-callback terminate(HandlerContext :: handler_context()) -> term().

Functions

activity_paused()

-spec activity_paused() -> boolean() | no_return().

await_data(EtsPattern)

-spec await_data(EtsPattern :: term()) -> {ok, data()} | timeout | invalid_pattern | no_return().

await_data(EtsPattern, Timeout)

-spec await_data(EtsPattern :: term(), Timeout :: erlang:timeout()) ->
                    {ok, data()} | timeout | invalid_pattern | no_return().

cancel(CanceledDetails)

-spec cancel(CanceledDetails :: temporal_sdk:term_to_payloads()) -> no_return().

cancel_requested()

-spec cancel_requested() -> boolean() | no_return().

complete(Result)

-spec complete(Result :: temporal_sdk:term_to_payloads()) -> no_return().

elapsed_time()

-spec elapsed_time() -> NativeTime :: non_neg_integer() | no_return().

elapsed_time(Unit)

-spec elapsed_time(Unit :: erlang:time_unit()) -> non_neg_integer() | no_return().

fail(ApplicationFailure)

fail(Class, Reason, Stacktrace)

-spec fail(Class :: error | exit | throw | temporal_sdk:serializable(),
           Reason :: term() | temporal_sdk:serializable(),
           Stacktrace :: erlang:raise_stacktrace() | temporal_sdk:serializable()) ->
              no_return().

get_data()

-spec get_data() -> Data :: data() | no_return().

heartbeat()

-spec heartbeat() -> ok.

heartbeat(Heartbeat)

-spec heartbeat(Heartbeat :: heartbeat()) -> ok.

last_heartbeat()

-spec last_heartbeat() -> LastHeartbeat :: heartbeat() | no_return().

remaining_time()

-spec remaining_time() -> NativeTime :: non_neg_integer() | infinity | no_return().

remaining_time(Unit)

-spec remaining_time(Unit :: erlang:time_unit()) -> non_neg_integer() | infinity | no_return().

set_data(TaskData)

-spec set_data(TaskData :: term()) -> ok.