temporal_sdk_workflow behaviour (temporal_sdk v0.1.18)

View Source

Temporal workflow task module.

WIP Temporal commands:

  • start_activity WIP: session_execution, direct_execution, direct_result

TODO Temporal commands:

  • start_nexus/4

  • start_nexus/5

  • cancel_nexus

  • upsert_workflow_search_attributes

  • cancel_external_workflow

  • signal_external_workflow

Summary

Awaitables types

Awaitables functions

await(AwaitPattern)

-spec await(AwaitPattern :: await_pattern()) -> await_ret().

await(AwaitPattern, Opts)

-spec await(AwaitPattern :: await_pattern(), Opts :: await_opts()) -> await_ret().

await_all(AwaitPattern)

-spec await_all(AwaitPattern :: [await_pattern()]) -> await_ret_list().

await_all(AwaitPattern, Opts)

-spec await_all(AwaitPattern :: [await_pattern()], Opts :: await_opts()) -> await_ret_list().

await_info(InfoOrInfoId)

-spec await_info(InfoOrInfoId :: info() | term()) -> await_ret() | noinfo.

await_info(InfoOrInfoId, InfoTimeout, AwaitableTimeout)

-spec await_info(InfoOrInfoId :: info() | term(),
                 InfoTimeout :: temporal_sdk:time(),
                 AwaitableTimeout :: temporal_sdk:time()) ->
                    await_ret() | noinfo.

await_one(AwaitPattern)

-spec await_one(AwaitPattern :: [await_pattern()]) -> await_ret_list().

await_one(AwaitPattern, Opts)

-spec await_one(AwaitPattern :: [await_pattern()], Opts :: await_opts()) -> await_ret_list().

is_awaited(AwaitPattern)

-spec is_awaited(AwaitPattern :: await_pattern()) ->
                    {true, await_match()} | {false, await_match()} | no_return().

is_awaited_all(AwaitPattern)

-spec is_awaited_all(AwaitPattern :: [await_pattern()]) ->
                        {true, [await_match()]} | {false, [await_match()]} | no_return().

is_awaited_one(AwaitPattern)

-spec is_awaited_one(AwaitPattern :: [await_pattern()]) ->
                        {true, [await_match()]} | {false, [await_match()]} | no_return().

wait(AwaitPattern)

-spec wait(AwaitPattern :: await_pattern()) -> await_match() | no_return().

wait(AwaitPattern, Opts)

-spec wait(AwaitPattern :: await_pattern(), Opts :: await_opts()) -> await_match() | no_return().

wait_all(AwaitPattern)

-spec wait_all(AwaitPattern :: [await_pattern()]) -> [await_match()] | no_return().

wait_all(AwaitPattern, Opts)

-spec wait_all(AwaitPattern :: [await_pattern()], Opts :: await_opts()) -> [await_match()] | no_return().

wait_info(InfoOrInfoId)

-spec wait_info(InfoOrInfoId :: info() | term()) -> await_match() | no_return().

wait_info(InfoOrInfoId, InfoTimeout, AwaitableTimeout)

-spec wait_info(InfoOrInfoId :: info() | term(),
                InfoTimeout :: temporal_sdk:time(),
                AwaitableTimeout :: temporal_sdk:time()) ->
                   await_match() | no_return().

wait_one(AwaitPattern)

-spec wait_one(AwaitPattern :: [await_pattern()]) -> [await_match()] | no_return().

wait_one(AwaitPattern, Opts)

-spec wait_one(AwaitPattern :: [await_pattern()], Opts :: await_opts()) -> [await_match()] | no_return().

Awaitables functions types

await_match()

-type await_match() :: awaitable_match() | await_match_all() | await_match_one().

await_match_all()

-type await_match_all() :: {all, [awaitable_match() | await_match_one() | await_match_all()]}.

await_match_one()

-type await_match_one() :: {one, [awaitable_match() | await_match_one() | await_match_all()]}.

await_opts()

-type await_opts() ::
          Timeout :: temporal_sdk:time() | [{timeout, temporal_sdk:time()} | {evict, true} | evict].

await_pattern()

-type await_pattern() :: awaitable_pattern() | await_pattern_all() | await_pattern_one().

await_pattern_all()

-type await_pattern_all() :: {all, [awaitable_pattern() | await_pattern_one() | await_pattern_all()]}.

await_pattern_one()

-type await_pattern_one() :: {one, [awaitable_pattern() | await_pattern_one() | await_pattern_all()]}.

await_ret()

-type await_ret() :: {ok, await_match()} | {noevent, PartialMatch :: await_match()} | no_return().

await_ret_list()

-type await_ret_list() ::
          {ok, [await_match()]} | {noevent, [ProperOrPartialMatch :: await_match()]} | no_return().

awaitable_match()

-type awaitable_match() :: awaitable_data() | info_data() | noevent.

awaitable_pattern()

Awaitables types

activity()

-type activity() :: {activity_event(), ActivityId :: unicode:chardata()}.

activity_data()

-type activity_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            history => [map()]} |
          #{state := scheduled,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            history => [map()],
            replay_id => activity_index_key()} |
          #{state := started,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception() | temporal_sdk:failure_from_temporal(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            scheduled_event_id := event_id(),
            history => [map()],
            replay_id => activity_index_key()} |
          #{state := completed,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_from_payloads(),
            last_failure => temporal_sdk_telemetry:exception() | temporal_sdk:failure_from_temporal(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            scheduled_event_id := event_id(),
            started_event_id := event_id(),
            attempt := pos_integer(),
            history => [map()],
            replay_id => activity_index_key()} |
          #{state := canceled,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception() | temporal_sdk:failure_from_temporal(),
            heartbeat_timeout => pos_integer(),
            cancel_requested := true,
            scheduled_event_id := event_id(),
            started_event_id := event_id(),
            attempt := pos_integer(),
            details := temporal_sdk:term_from_payloads(),
            history => [map()],
            replay_id => activity_index_key()} |
          #{state := failed,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception() | temporal_sdk:failure_from_temporal(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            scheduled_event_id := event_id(),
            started_event_id := event_id(),
            failure => temporal_sdk:failure_from_temporal(),
            retry_state :=
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.RetryState'(),
            history => [map()],
            replay_id => activity_index_key()} |
          #{state := timedout,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            activity_type := unicode:chardata() | atom(),
            session_execution := boolean(),
            eager_execution := boolean(),
            direct_execution := boolean(),
            direct_result := boolean(),
            result => temporal_sdk:term_to_payloads(),
            last_failure => temporal_sdk_telemetry:exception() | temporal_sdk:failure_from_temporal(),
            heartbeat_timeout => pos_integer(),
            cancel_requested => true,
            scheduled_event_id := event_id(),
            started_event_id := event_id(),
            failure => temporal_sdk:failure_from_temporal(),
            retry_state :=
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.RetryState'(),
            history => [map()],
            replay_id => activity_index_key()}.

activity_event()

-type activity_event() ::
          activity_cmd | activity_cancel_request | activity_result | activity_schedule |
          activity_start | activity.

activity_index_key()

-type activity_index_key() :: {activity, ActivityId :: unicode:chardata()}.

activity_index_key_pattern()

-type activity_index_key_pattern() :: {activity, ets_matchvar() | (ActivityId :: unicode:chardata())}.

activity_pattern()

-type activity_pattern() :: {activity_event(), '_' | (ActivityId :: unicode:chardata() | atom())}.

awaitable()

awaitable_data()

awaitable_state()

-type awaitable_state() ::
          cmd | scheduled | started | canceled | completed | failed | initiated | initiate_failed |
          fired | modified | recorded | continued | requested | signaled | responded | suggested |
          admitted.

cancel_request()

-type cancel_request() :: {cancel_request_event()}.

cancel_request_data()

-type cancel_request_data() ::
          #{state := requested,
            event_id := event_id(),
            cause => unicode:chardata(),
            external_initiated_event_id => pos_integer(),
            external_workflow_execution => temporal_sdk:workflow_execution(),
            identity => unicode:chardata()}.

cancel_request_event()

-type cancel_request_event() :: cancel_request.

cancel_request_index_key()

-type cancel_request_index_key() :: {cancel_request}.

cancel_request_index_key_pattern()

-type cancel_request_index_key_pattern() :: {cancel_request}.

cancel_request_pattern()

-type cancel_request_pattern() :: {cancel_request_event()}.

cancel_workflow_execution()

-type cancel_workflow_execution() :: {cancel_workflow_execution}.

cancel_workflow_execution_data()

-type cancel_workflow_execution_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id => event_id(),
            details := temporal_sdk:term_to_payloads()} |
          #{state := canceled,
            execution_id := execution_id(),
            event_id := event_id(),
            details := temporal_sdk:term_from_payloads()}.

cancel_workflow_execution_index_key()

-type cancel_workflow_execution_index_key() :: {cancel_workflow_execution}.

cancel_workflow_execution_index_key_pattern()

-type cancel_workflow_execution_index_key_pattern() :: {cancel_workflow_execution}.

child_workflow()

-type child_workflow() :: {child_workflow_event(), ChildWorkflowId :: unicode:chardata()}.

child_workflow_data()

-type child_workflow_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            history => [map()]} |
          #{state := initiated,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := initiate_failed,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            cause :=
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause'(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := started,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := completed,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            started_event_id := event_id(),
            result := temporal_sdk:term_from_payloads(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := failed,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            started_event_id := event_id(),
            failure => temporal_sdk:failure_from_temporal(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := canceled,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            started_event_id := event_id(),
            details := temporal_sdk:term_from_payloads(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := timedout,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            started_event_id := event_id(),
            retry_state :=
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.RetryState'(),
            history => [map()],
            replay_id => child_workflow_index_key()} |
          #{state := terminated,
            execution_id := execution_id(),
            event_id := event_id(),
            namespace := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata() | atom(),
            initiated_event_id := event_id(),
            run_id := unicode:chardata(),
            started_event_id := event_id(),
            history => [map()],
            replay_id => child_workflow_index_key()}.

child_workflow_event()

-type child_workflow_event() ::
          child_workflow_cmd | child_workflow_initiate | child_workflow_start | child_workflow.

child_workflow_index_key()

-type child_workflow_index_key() :: {child_workflow, ChildWorkflowId :: unicode:chardata()}.

child_workflow_index_key_pattern()

-type child_workflow_index_key_pattern() ::
          {child_workflow, ets_matchvar() | (ChildWorkflowId :: unicode:chardata())}.

child_workflow_pattern()

-type child_workflow_pattern() ::
          {child_workflow_event(), '_' | (ChildWorkflowId :: unicode:chardata())}.

complete_workflow_execution()

-type complete_workflow_execution() :: {complete_workflow_execution}.

complete_workflow_execution_data()

-type complete_workflow_execution_data() ::
          #{state := cmd,
            execution_id := execution_id() | undefined,
            event_id => event_id(),
            result := temporal_sdk:term_to_payloads()} |
          #{state := completed,
            execution_id := execution_id() | undefined,
            event_id := event_id(),
            result := temporal_sdk:term_from_payloads()}.

complete_workflow_execution_index_key()

-type complete_workflow_execution_index_key() :: {complete_workflow_execution}.

complete_workflow_execution_index_key_pattern()

-type complete_workflow_execution_index_key_pattern() :: {complete_workflow_execution}.

continue_as_new_workflow()

-type continue_as_new_workflow() :: {continue_as_new_workflow}.

continue_as_new_workflow_data()

-type continue_as_new_workflow_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id => event_id(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata()} |
          #{state := continued,
            execution_id := execution_id(),
            event_id := event_id(),
            task_queue := unicode:chardata(),
            workflow_type := unicode:chardata()}.

continue_as_new_workflow_index_key()

-type continue_as_new_workflow_index_key() :: {continue_as_new_workflow}.

continue_as_new_workflow_index_key_pattern()

-type continue_as_new_workflow_index_key_pattern() :: {continue_as_new_workflow}.

event_data()

-type event_data() ::
          #{event_id := event_id(),
            type := temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.EventType'(),
            attributes := map(),
            data := map()}.

event_id()

-type event_id() :: pos_integer().

execution()

-type execution() :: {execution_event(), ExecutionId :: execution_id()}.

execution_data()

-type execution_data() ::
          #{state := cmd, mfa := {Module :: module(), Function :: atom(), Args :: term()}} |
          #{state := started, mfa := {Module :: module(), Function :: atom(), Args :: term()}} |
          #{state := completed,
            mfa := {Module :: module(), Function :: atom(), Args :: term()},
            result => execution_result()}.

execution_event()

-type execution_event() :: execution_cmd | execution_start | execution.

execution_id()

-type execution_id() :: term().

execution_index_key()

-type execution_index_key() :: {execution, ExecutionId :: execution_id()}.

execution_index_key_pattern()

-type execution_index_key_pattern() :: {execution, ets_matchvar() | (ExecutionId :: execution_id())}.

execution_pattern()

-type execution_pattern() :: {execution_event(), '_' | (ExecutionId :: execution_id())}.

execution_result()

-type execution_result() :: term().

fail_workflow_execution()

-type fail_workflow_execution() :: {fail_workflow_execution}.

fail_workflow_execution_data()

-type fail_workflow_execution_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id => event_id(),
            failure := temporal_sdk:application_failure() | temporal_sdk:user_application_failure()} |
          #{state := failed,
            execution_id := execution_id(),
            event_id := event_id(),
            failure := temporal_sdk:failure_from_temporal()}.

fail_workflow_execution_index_key()

-type fail_workflow_execution_index_key() :: {fail_workflow_execution}.

fail_workflow_execution_index_key_pattern()

-type fail_workflow_execution_index_key_pattern() :: {fail_workflow_execution}.

history_event()

-type history_event() ::
          {EventId :: event_id(),
           EventType :: temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.EventType'(),
           EventAttributes :: map(),
           EventData :: map()}.

history_event_event_pattern()

-type history_event_event_pattern() :: {event, history_event_pattern()}.

history_event_pattern()

-type history_event_pattern() ::
          {'_' | (EventId :: event_id()),
           '_' |
           (EventType ::
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.EventType'()),
           '_' | (EventAttributes :: map()),
           '_' | (EventData :: map())}.

history_event_table_pattern()

-type history_event_table_pattern() ::
          {ets_matchvar() | (EventId :: event_id()),
           ets_matchvar() |
           (EventType ::
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.EventType'()),
           ets_matchvar() | (EventAttributes :: map()),
           ets_matchvar() | (EventData :: map())}.

history_event_table_pattern_match_spec()

-type history_event_table_pattern_match_spec() ::
          [{EtsMatchHead :: history_event_table_pattern(), EtsMatchGuard :: [_], EtsMatchResult :: [_]}].

info()

-type info() :: {info, InfoId :: term()}.

info_data()

-type info_data() :: InfoData :: term() | awaitable().

info_index_key()

-type info_index_key() :: {info, InfoId :: term()}.

info_index_key_pattern()

-type info_index_key_pattern() :: {info, ets_matchvar() | (InfoId :: term())}.

info_pattern()

-type info_pattern() :: {info, '_' | (InfoId :: term())}.

marker()

-type marker() ::
          {marker_event(),
           MarkerType :: unicode:chardata() | atom(),
           MarkerName :: unicode:chardata() | atom()}.

marker_data()

-type marker_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            mutable => record_marker_mutable_opts(),
            mutations_count => non_neg_integer(),
            details := temporal_sdk:term_to_mapstring_payload(),
            value => temporal_sdk:term_to_payloads() | term(),
            history => [map()]} |
          #{state := recorded,
            execution_id := execution_id(),
            event_id := event_id(),
            mutable => record_marker_mutable_opts(),
            mutations_count => non_neg_integer(),
            details := temporal_sdk:term_from_mapstring_payload(),
            value := temporal_sdk:term_from_payloads() | term(),
            history => [map()],
            replay_id => marker_index_key()} |
          #{state := recorded,
            execution_id := undefined,
            event_id := event_id(),
            value := temporal_sdk:term_from_payloads(),
            history => [map()]}.

marker_event()

-type marker_event() :: marker_cmd | marker_value | marker.

marker_index_key()

-type marker_index_key() :: {marker, MarkerType :: unicode:chardata(), MarkerName :: unicode:chardata()}.

marker_index_key_pattern()

-type marker_index_key_pattern() ::
          {marker,
           ets_matchvar() | (MarkerType :: unicode:chardata() | atom()),
           ets_matchvar() | (MarkerName :: unicode:chardata() | atom())}.

marker_pattern()

-type marker_pattern() ::
          {marker_event(),
           '_' | (MarkerType :: unicode:chardata() | atom()),
           '_' | (MarkerName :: unicode:chardata() | atom())}.

nexus()

-type nexus() ::
          {nexus_event(),
           Endpoint :: unicode:chardata() | atom(),
           Service :: unicode:chardata() | atom(),
           Operation :: unicode:chardata() | atom()}.

nexus_data()

-type nexus_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            input := temporal_sdk:term_to_payload(),
            cancel_requested => true,
            history => [map()]} |
          #{state := scheduled,
            execution_id := execution_id(),
            event_id := event_id(),
            input := temporal_sdk:term_from_payload(),
            cancel_requested => true,
            history => [map()],
            replay_id => nexus_index_key()} |
          #{state := started,
            execution_id := execution_id(),
            event_id := event_id(),
            input := temporal_sdk:term_from_payload(),
            cancel_requested => true,
            history => [map()],
            replay_id => nexus_index_key()} |
          #{state := completed,
            execution_id := execution_id(),
            event_id := event_id(),
            input := temporal_sdk:term_from_payload(),
            result => temporal_sdk:term_from_payload(),
            cancel_requested => true,
            history => [map()],
            replay_id => nexus_index_key()} |
          #{state := canceled,
            execution_id := execution_id(),
            event_id := event_id(),
            input := temporal_sdk:term_from_payload(),
            failure => temporal_sdk_proto_service_workflow_binaries:'temporal.api.failure.v1.Failure'(),
            cancel_requested := true,
            history => [map()],
            replay_id => nexus_index_key()}.

nexus_event()

-type nexus_event() :: nexus_cmd | nexus_cancel_request | nexus_schedule | nexus_start | nexus.

nexus_index_key()

-type nexus_index_key() ::
          {nexus,
           Endpoint :: unicode:chardata(),
           Service :: unicode:chardata(),
           Operation :: unicode:chardata()}.

nexus_index_key_pattern()

-type nexus_index_key_pattern() ::
          {nexus,
           ets_matchvar() | (Endpoint :: unicode:chardata()),
           ets_matchvar() | (Service :: unicode:chardata()),
           ets_matchvar() | (Operation :: unicode:chardata())}.

nexus_pattern()

-type nexus_pattern() ::
          {nexus_event(),
           '_' | (Endpoint :: unicode:chardata() | atom()),
           '_' | (Service :: unicode:chardata() | atom()),
           '_' | (Operation :: unicode:chardata() | atom())}.

query()

-type query() :: {query_event(), QueryType :: unicode:chardata()}.

query_data()

-type query_data() ::
          #{'_sdk_data' := term(),
            state := requested,
            query_args => temporal_sdk:term_from_payloads(),
            header => temporal_sdk:term_from_mapstring_payload(),
            history => [map()]} |
          #{state := responded,
            query_args => temporal_sdk:term_from_payloads(),
            header => temporal_sdk:term_from_mapstring_payload(),
            answer => temporal_sdk:term_to_payloads(),
            error_message => unicode:chardata(),
            failure => temporal_sdk:application_failure() | temporal_sdk:user_application_failure(),
            history => [map()]}.

query_event()

-type query_event() :: query_request | query_response | query.

query_index_key()

-type query_index_key() :: {query, QueryType :: unicode:chardata()}.

query_index_key_pattern()

-type query_index_key_pattern() :: {query, ets_matchvar() | (QueryType :: unicode:chardata())}.

query_pattern()

-type query_pattern() :: {query_event(), '_' | (QueryType :: unicode:chardata())}.

signal()

-type signal() :: {signal_event(), SignalName :: unicode:chardata()}.

signal_data()

-type signal_data() ::
          #{state := requested,
            event_id := event_id(),
            input := temporal_sdk:term_from_payloads(),
            identity := unicode:chardata(),
            header := temporal_sdk:term_from_mapstring_payload(),
            external_workflow_execution => temporal_sdk:workflow_execution(),
            history => [map()]} |
          #{state := admitted,
            event_id := event_id(),
            input := temporal_sdk:term_from_payloads(),
            identity := unicode:chardata(),
            header := temporal_sdk:term_from_mapstring_payload(),
            external_workflow_execution => temporal_sdk:workflow_execution(),
            details => term(),
            history => [map()]}.

signal_event()

-type signal_event() :: signal_request | signal_admit | signal.

signal_index_key()

-type signal_index_key() :: {signal, SignalName :: unicode:chardata()}.

signal_index_key_pattern()

-type signal_index_key_pattern() :: {signal, ets_matchvar() | (SignalName :: unicode:chardata())}.

signal_pattern()

-type signal_pattern() :: {signal_event(), '_' | (SignalName :: unicode:chardata())}.

suggest_continue_as_new()

-type suggest_continue_as_new() :: {suggest_continue_as_new}.

suggest_continue_as_new_data()

-type suggest_continue_as_new_data() :: #{state := suggested, event_id := event_id()}.

suggest_continue_as_new_index_key()

-type suggest_continue_as_new_index_key() :: {suggest_continue_as_new}.

suggest_continue_as_new_index_key_pattern()

-type suggest_continue_as_new_index_key_pattern() :: {suggest_continue_as_new}.

suggest_continue_as_new_pattern()

-type suggest_continue_as_new_pattern() :: {suggest_continue_as_new}.

timer()

-type timer() :: {timer_event(), TimerId :: unicode:chardata() | atom()}.

timer_data()

-type timer_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            cancel_requested => true,
            history => [map()]} |
          #{state := started,
            execution_id := execution_id(),
            event_id := event_id(),
            cancel_requested => true,
            history => [map()],
            replay_id => timer_index_key()} |
          #{state := fired,
            execution_id := execution_id(),
            event_id := event_id(),
            started_event_id := event_id(),
            cancel_requested => true,
            history => [map()],
            replay_id => timer_index_key()} |
          #{state := canceled,
            execution_id := execution_id(),
            event_id := event_id(),
            started_event_id := event_id(),
            cancel_requested := true,
            history => [map()],
            replay_id => timer_index_key()}.

timer_event()

-type timer_event() :: timer_cmd | timer_cancel_request | timer_start | timer.

timer_index_key()

-type timer_index_key() :: {timer, TimerId :: unicode:chardata()}.

timer_index_key_pattern()

-type timer_index_key_pattern() :: {timer, ets_matchvar() | (TimerId :: unicode:chardata())}.

timer_pattern()

-type timer_pattern() :: {timer_event(), '_' | (TimerId :: unicode:chardata() | atom())}.

workflow_properties()

-type workflow_properties() :: {workflow_properties_event()}.

workflow_properties_data()

-type workflow_properties_data() ::
          #{state := cmd,
            execution_id := execution_id(),
            event_id := event_id(),
            upserted_memo := temporal_sdk:term_to_mapstring_payload(),
            history => [map()]} |
          #{state := modified,
            execution_id := execution_id(),
            event_id := event_id(),
            upserted_memo := temporal_sdk:term_from_mapstring_payload(),
            history => [map()]}.

workflow_properties_event()

-type workflow_properties_event() :: workflow_properties_cmd | workflow_properties.

workflow_properties_index_key()

-type workflow_properties_index_key() :: {workflow_properties}.

workflow_properties_index_key_pattern()

-type workflow_properties_index_key_pattern() :: {workflow_properties}.

workflow_properties_pattern()

-type workflow_properties_pattern() :: {workflow_properties_event()}.

SDK functions

await_open_before_close(IsEnabled)

-spec await_open_before_close(IsEnabled :: boolean()) -> ok.

evict_workflow()

-spec evict_workflow() -> ok | no_return().

Requests workflow eviction on the next workflow task completion request.

The SDK Architecture - Workflow Eviction section provides details about the workflow eviction mechanism.

Duplicate eviction requests within the same workflow task cycle are ignored. Eviction requests are ignored during workflow replay.

The SDK Samples Eviction Parallel Handler sample demonstrates usage of this function.

get_workflow_result()

-spec get_workflow_result() -> temporal_sdk:term_to_payloads() | no_return().

select_history/1

-spec select_history(EventId :: pos_integer()) -> history_event() | noevent;
                    (HistoryEventPattern :: history_event_table_pattern()) -> [history_event()];
                    (HistoryPatternSpec :: history_event_table_pattern_match_spec()) ->
                        [history_event()];
                    (Continuation :: ets_continuation()) ->
                        {[history_event()], Continuation :: ets_continuation()} | '$end_of_table'.

select_history(HistoryPatternSpec, Limit)

-spec select_history(HistoryPatternSpec :: history_event_table_pattern_match_spec(),
                     Limit :: pos_integer()) ->
                        {[history_event()], Continuation :: ets_continuation()} | '$end_of_table'.

select_index/1

-spec select_index(AwaitableIndexPattern :: awaitable_index_pattern()) -> [awaitable_index()];
                  (IndexPatternSpec :: awaitable_index_pattern_match_spec()) -> [awaitable_index()];
                  (Continuation :: ets_continuation()) ->
                      {[awaitable_index()], Continuation :: ets_continuation()} | '$end_of_table'.

select_index(IndexPatternSpec, Limit)

-spec select_index(IndexPatternSpec :: awaitable_index_pattern_match_spec(), Limit :: pos_integer()) ->
                      {[awaitable_index()], Continuation :: ets_continuation()} | '$end_of_table'.

set_info(InfoValue)

-spec set_info(InfoValue :: term()) -> info() | no_return().

set_info(InfoValue, Opts)

-spec set_info(InfoValue :: term(),
               Opts :: [{info_id, execution_id()} | {awaitable_id, awaitable_id()}]) ->
                  info() | no_return().

set_workflow_result(WorkflowResult)

-spec set_workflow_result(WorkflowResult :: temporal_sdk:term_to_payloads()) -> ok.

start_execution(Function)

-spec start_execution(Function :: atom()) -> execution() | no_return().

start_execution(Function, Input)

-spec start_execution(Function :: atom(), Input :: term()) -> execution() | no_return().

start_execution(Function, Input, Opts)

-spec start_execution(Function :: atom(), Input :: term(), Opts :: start_execution_opts()) ->
                         execution() | no_return().

start_execution(Module, Function, Input, Opts)

-spec start_execution(Module :: module(),
                      Function :: atom(),
                      Input :: term(),
                      Opts :: start_execution_opts()) ->
                         execution() | execution_data() | no_return().

terminate_executor()

-spec terminate_executor() -> ok.

terminate_executor(Reason)

-spec terminate_executor(Reason :: term()) -> ok.

workflow_info()

-spec workflow_info() -> workflow_info() | no_return().

SDK functions types

awaitable_index()

awaitable_index_pattern()

awaitable_index_pattern_match_spec()

-type awaitable_index_pattern_match_spec() ::
          [{EtsMatchHead :: awaitable_index_pattern(), EtsMatchGuard :: [_], EtsMatchResult :: [_]}].

awaitable_temporal_index()

ets_continuation()

-type ets_continuation() ::
          '$end_of_table' |
          {ets:table(), integer(), integer(), ets:compiled_match_spec(), list(), integer()} |
          {ets:table(), _, _, integer(), ets:compiled_match_spec(), list(), integer(), integer()}.

ets_matchvar()

-type ets_matchvar() :: '_' | '$1' | '$2' | '$3' | '$4' | atom().

start_execution_opts()

-type start_execution_opts() ::
          [{execution_id, execution_id()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | close} |
           {wait, boolean()} |
           wait].

workflow_info()

-type workflow_info() ::
          #{event_id := pos_integer(),
            is_replaying := boolean(),
            open_executions_count := pos_integer(),
            total_executions_count := pos_integer(),
            open_tasks_count := non_neg_integer(),
            otp_messages_count :=
                #{received => non_neg_integer(),
                  recorded => non_neg_integer(),
                  ignored => non_neg_integer()},
            attempt := pos_integer(),
            suggest_continue_as_new := boolean(),
            history_size_bytes := non_neg_integer()}.

Temporal commands

cancel_activity(ActivityOrActivityData)

-spec cancel_activity(ActivityOrActivityData :: activity() | activity_data()) ->
                         activity() | no_return().

cancel_activity(ActivityOrActivityData, Opts)

-spec cancel_activity(ActivityOrActivityData :: activity() | activity_data(),
                      Opts ::
                          [{awaitable_event, cmd | cancel_request | result | schedule | start | close} |
                           {wait, boolean()} |
                           wait]) ->
                         activity() | activity_data() | no_return().

cancel_timer(TimerOrTimerDataOrTimerId)

-spec cancel_timer(TimerOrTimerDataOrTimerId :: timer() | timer_data() | unicode:chardata() | atom()) ->
                      timer() | no_return().

cancel_timer(TimerOrTimerDataOrTimerId, Opts)

-spec cancel_timer(TimerOrTimerDataOrTimerId :: timer() | timer_data() | unicode:chardata() | atom(),
                   Opts ::
                       [{awaitable_event, cmd | cancel_request | result | schedule | start | close} |
                        {wait, boolean()} |
                        wait]) ->
                      timer() | timer_data() | no_return().

cancel_workflow_execution(Details)

-spec cancel_workflow_execution(Details :: temporal_sdk:term_to_payloads()) ->
                                   cancel_workflow_execution().

complete_workflow_execution(Result)

-spec complete_workflow_execution(Result :: temporal_sdk:term_to_payloads()) ->
                                     complete_workflow_execution().

continue_as_new_workflow(TaskQueue, WorkflowType)

-spec continue_as_new_workflow(TaskQueue :: unicode:chardata(),
                               WorkflowType :: atom() | unicode:chardata()) ->
                                  continue_as_new_workflow().

continue_as_new_workflow(TaskQueue, WorkflowType, Opts)

-spec continue_as_new_workflow(TaskQueue :: unicode:chardata(),
                               WorkflowType :: atom() | unicode:chardata(),
                               Opts :: continue_as_new_workflow_opts()) ->
                                  continue_as_new_workflow().

fail_workflow_execution(ApplicationFailure)

-spec fail_workflow_execution(ApplicationFailure ::
                                  temporal_sdk:application_failure() |
                                  temporal_sdk:user_application_failure()) ->
                                 fail_workflow_execution().

modify_workflow_properties(UpsertedMemoFields)

-spec modify_workflow_properties(UpsertedMemoFields :: temporal_sdk:term_to_mapstring_payload()) ->
                                    workflow_properties() | no_return().

modify_workflow_properties(UpsertedMemoFields, Opts)

-spec modify_workflow_properties(UpsertedMemoFields :: temporal_sdk:term_to_mapstring_payload(),
                                 Opts :: [{awaitable_event, cmd | close} | {wait, boolean()} | wait]) ->
                                    workflow_properties() | workflow_properties_data() | no_return().

record_marker(MarkerValueFun)

-spec record_marker(MarkerValueFun :: record_marker_value_fun()) -> marker() | no_return().

record_marker(MarkerValueFun, Opts)

-spec record_marker(MarkerValueFun :: record_marker_value_fun(), Opts :: record_marker_opts()) ->
                       marker() | marker_data() | no_return().

start_activity(ActivityType, Input)

-spec start_activity(ActivityType :: unicode:chardata() | atom(),
                     Input :: temporal_sdk:term_to_payloads()) ->
                        activity() | no_return().

start_activity(ActivityType, Input, Opts)

-spec start_activity(ActivityType :: unicode:chardata() | atom(),
                     Input :: temporal_sdk:term_to_payloads(),
                     Opts :: start_activity_opts()) ->
                        activity() | activity_data() | no_return().

start_child_workflow(TaskQueue, WorkflowType)

-spec start_child_workflow(TaskQueue :: unicode:chardata(), WorkflowType :: atom() | unicode:chardata()) ->
                              child_workflow() | child_workflow_data() | no_return().

start_child_workflow(TaskQueue, WorkflowType, Opts)

-spec start_child_workflow(TaskQueue :: unicode:chardata(),
                           WorkflowType :: atom() | unicode:chardata(),
                           Opts :: start_child_workflow_opts()) ->
                              child_workflow() | child_workflow_data() | no_return().

start_nexus(Endpoint, Service, Operation, Input)

-spec start_nexus(Endpoint :: atom() | unicode:chardata(),
                  Service :: atom() | unicode:chardata(),
                  Operation :: atom() | unicode:chardata(),
                  Input :: temporal_sdk:term_to_payload()) ->
                     nexus().

start_nexus(Endpoint, Service, Operation, Input, Opts)

-spec start_nexus(Endpoint :: atom() | unicode:chardata(),
                  Service :: atom() | unicode:chardata(),
                  Operation :: atom() | unicode:chardata(),
                  Input :: temporal_sdk:term_to_payload(),
                  Opts :: start_nexus_opts()) ->
                     nexus().

start_timer(StartToFireTimeout)

-spec start_timer(StartToFireTimeout :: temporal_sdk:time()) -> timer() | no_return().

start_timer(StartToFireTimeout, Opts)

-spec start_timer(StartToFireTimeout :: temporal_sdk:time(), Opts :: start_timer_opts()) ->
                     timer() | timer_data() | no_return().

Temporal commands opts

awaitable_id()

-type awaitable_id() ::
          #{id => unicode:chardata() | atom(),
            prefix => boolean() | unicode:chardata() | atom(),
            postfix => boolean() | unicode:chardata() | atom()} |
          unicode:chardata() |
          atom().

continue_as_new_workflow_opts()

-type continue_as_new_workflow_opts() ::
          [{input, temporal_sdk:term_to_payloads()} |
           {workflow_run_timeout, temporal_sdk:time()} |
           {workflow_task_timeout, temporal_sdk:time()} |
           {backoff_start_interval, temporal_sdk:time()} |
           {retry_policy, temporal_sdk:retry_policy()} |
           {header, temporal_sdk:term_to_mapstring_payload()} |
           {memo, temporal_sdk:term_to_mapstring_payload()} |
           {search_attributes, temporal_sdk:term_to_mapstring_payload()}].

marker_value_codec()

-type marker_value_codec() ::
          none | list | term |
          {Encoder ::
               none |
               fun((term()) -> temporal_sdk:term_to_payloads()) |
               {Module :: module(), Function :: atom()},
           Decoder ::
               none |
               fun((temporal_sdk:term_from_payloads()) -> term()) |
               {Module :: module(), Function :: atom()}}.

record_marker_mutable_opts()

-type record_marker_mutable_opts() ::
          #{mutations_limit => pos_integer(), fail_on_limit => boolean()} | boolean().

record_marker_opts()

-type record_marker_opts() ::
          [{marker_name, atom() | unicode:chardata()} |
           {header, temporal_sdk:term_to_mapstring_payload()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | value | close} |
           {wait, boolean()} |
           wait |
           {type, temporal_sdk:convertable()} |
           {details, temporal_sdk:term_to_mapstring_payloads()} |
           {mutable, record_marker_mutable_opts()} |
           mutable |
           {value_codec, marker_value_codec()}].

record_marker_value_fun()

-type record_marker_value_fun() ::
          AllMarkers ::
              fun(() -> temporal_sdk:term_to_payloads() | term()) |
              (MutableMarker ::
                   fun((-1 | non_neg_integer()) -> temporal_sdk:term_to_payloads() | term()) |
                   {module(), atom()} |
                   {module(), atom(), term()}).

start_activity_opts()

-type start_activity_opts() ::
          [{activity_id, unicode:chardata() | atom()} |
           {task_queue, unicode:chardata()} |
           {header, temporal_sdk:term_to_mapstring_payload()} |
           {schedule_to_close_timeout, temporal_sdk:time()} |
           {schedule_to_start_timeout, temporal_sdk:time()} |
           {start_to_close_timeout, temporal_sdk:time()} |
           {heartbeat_timeout, temporal_sdk:time()} |
           {retry_policy, temporal_sdk:retry_policy()} |
           {eager_execution, boolean()} |
           eager_execution |
           {use_workflow_build_id, boolean()} |
           use_workflow_build_id |
           {priority, temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.Priority'()} |
           {raw_request,
            temporal_sdk_proto_service_workflow_binaries:'temporal.api.command.v1.ScheduleActivityTaskCommandAttributes'()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | cancel_request | result | schedule | start | close} |
           {wait, boolean()} |
           wait |
           {direct_execution, boolean()} |
           direct_execution |
           {direct_result, boolean()} |
           direct_result |
           {session_execution, boolean()} |
           session_execution |
           {node_execution_fun, function()}].

start_child_workflow_opts()

-type start_child_workflow_opts() ::
          [{namespace, unicode:chardata()} |
           {workflow_id, unicode:chardata()} |
           {input, temporal_sdk:term_to_payloads()} |
           {workflow_execution_timeout, temporal_sdk:time()} |
           {workflow_run_timeout, temporal_sdk:time()} |
           {workflow_task_timeout, temporal_sdk:time()} |
           {parent_close_policy,
            temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.ParentClosePolicy'()} |
           {workflow_id_reuse_policy,
            temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.WorkflowIdReusePolicy'()} |
           {retry_policy, temporal_sdk:retry_policy()} |
           {cron_schedule, unicode:chardata()} |
           {header, temporal_sdk:term_to_mapstring_payload()} |
           {memo, temporal_sdk:term_to_mapstring_payload()} |
           {search_attributes, temporal_sdk:term_to_mapstring_payload()} |
           {priority, temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.Priority'()} |
           {raw_request,
            temporal_sdk_proto_service_workflow_binaries:'temporal.api.command.v1.ScheduleActivityTaskCommandAttributes'()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | initiate | start | close} |
           {wait, boolean()} |
           wait].

start_nexus_opts()

-type start_nexus_opts() ::
          [{schedule_to_close_timeout, temporal_sdk:time()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | cancel_request | close} |
           {wait, boolean()} |
           wait].

start_timer_opts()

-type start_timer_opts() ::
          [{timer_id, atom() | unicode:chardata()} |
           {awaitable_id, awaitable_id()} |
           {awaitable_event, cmd | cancel_request | close} |
           {wait, boolean()} |
           wait].

Temporal external commands

admit_signal(SignalOrSignalName)

-spec admit_signal(SignalOrSignalName :: signal() | unicode:chardata()) -> signal().

admit_signal(SignalOrSignalName, Opts)

-spec admit_signal(SignalOrSignalName :: signal() | unicode:chardata(), Opts :: admit_signal_opts()) ->
                      signal() | signal_data().

respond_query(QueryOrQueryType, Opts)

-spec respond_query(QueryOrQueryType :: query() | unicode:chardata(), Opts :: respond_query_opts()) ->
                       query() | query_data().

Temporal external commands opts

admit_signal_opts()

-type admit_signal_opts() ::
          [{details, term()} | {awaitable_event, request | admit | close} | {wait, boolean()} | wait].

respond_query_opts()

-type respond_query_opts() ::
          [{result_type,
            temporal_sdk_proto_service_workflow_binaries:'temporal.api.enums.v1.QueryResultType'()} |
           {answer, temporal_sdk:term_to_payloads()} |
           {error_message, unicode:chardata()} |
           {failure, temporal_sdk:application_failure() | temporal_sdk:user_application_failure()} |
           {awaitable_event, request | response | close} |
           {wait, boolean()} |
           wait].

Temporal marker commands

record_app_env(Par)

-spec record_app_env(Par :: atom()) -> marker() | no_return().

record_app_env(Par, Opts)

-spec record_app_env(Par :: atom(), Opts :: record_marker_opts()) ->
                        marker() | marker_data() | no_return().

Retrieves the application configuration parameter via application:get_env/1 and records its value as a marker.

record_os_env(VarName)

-spec record_os_env(VarName :: os:env_var_name()) -> marker() | no_return().

record_os_env(VarName, Opts)

-spec record_os_env(VarName :: os:env_var_name(), Opts :: record_marker_opts()) ->
                       marker() | marker_data() | no_return().

Retrieves the OS environment variable via os:getenv/1 and records its value as a marker.

record_rand_uniform()

-spec record_rand_uniform() -> marker() | no_return().

record_rand_uniform(RangeOrOpts)

-spec record_rand_uniform(RangeOrOpts :: pos_integer() | record_marker_opts()) ->
                             marker() | marker_data() | no_return().

record_rand_uniform(Range, Opts)

-spec record_rand_uniform(Range :: pos_integer(), Opts :: record_marker_opts()) ->
                             marker() | marker_data() | no_return().

record_system_time()

-spec record_system_time() -> marker() | no_return().

record_system_time(UnitOrOpts)

-spec record_system_time(UnitOrOpts :: erlang:time_unit() | record_marker_opts()) ->
                            marker() | marker_data() | no_return().

record_system_time(Unit, Opts)

-spec record_system_time(Unit :: erlang:time_unit(), Opts :: record_marker_opts()) ->
                            marker() | marker_data() | no_return().

record_uuid4()

-spec record_uuid4() -> marker() | no_return().

record_uuid4(Opts)

-spec record_uuid4(Opts :: record_marker_opts()) -> marker() | marker_data() | no_return().

Workflow behaviour

context()

-type context() ::
          #{cluster := temporal_sdk_cluster:cluster_name(),
            executor_pid := pid(),
            otel_ctx := otel_ctx:t(),
            execution_id := execution_id(),
            worker_opts := temporal_sdk_worker:opts(),
            history_table := ets:table(),
            index_table := ets:table(),
            workflow_info := context_workflow_info(),
            task := task(),
            attempt := pos_integer(),
            is_replaying := boolean()}.

context_workflow_info()

-type context_workflow_info() ::
          #{workflow_execution :=
                temporal_sdk_proto_service_workflow_binaries:'temporal.api.common.v1.WorkflowExecution'(),
            workflow_execution_task_queue := unicode:chardata(),
            workflow_type := unicode:chardata(),
            task_queue := unicode:chardata(),
            workflow_execution_timeout_msec := erlang:timeout(),
            workflow_run_timeout_msec := erlang:timeout(),
            workflow_task_timeout_msec := erlang:timeout(),
            last_completion_result => temporal_sdk:term_from_payloads(),
            attempt := pos_integer(),
            memo => temporal_sdk:term_from_mapstring_payload(),
            search_attributes => temporal_sdk:term_from_mapstring_payload(),
            header => temporal_sdk:term_from_mapstring_payload()} |
          #{attempt := pos_integer()}.

execute(Context, Input)

-callback execute(Context :: context(), Input :: temporal_sdk:term_from_payloads()) ->
                     ExecutionResult :: execution_result().

handle_eviction(HandlerContext, PollIdleTime)

(optional)
-callback handle_eviction(HandlerContext :: handler_context(), PollIdleTime :: undefined | pos_integer()) ->
                             evict | ignore | default.

Handles eviction of the workflow executor process.

The SDK Architecture - Workflow Eviction section provides details about the workflow eviction mechanism.

The second function argument, PollIdleTime, represents the total time the workflow executor has already spent waiting for a new task when polling the task queue and sticky queue, plus the time spent executing this function.

The function is invoked at intervals determined by the worker options' workflow task sticky_execution type setting:

  • local: The long-poll timeout is determined by the grpc_opts_longpoll option (defined via temporal_sdk_client:opts/0) and the Temporal long-poll timeout, which defaults to 60 seconds.
  • disabled or pool: A fixed 60-second interval is used.

The function may return one of the following atoms:

  • ignore : Skips eviction.
  • evict : Evicts the workflow executor process.
  • default : Uses the built-in default function implementation to determine whether the workflow should be evicted.

Function implementations should be non-blocking and free of side effects. Function execution timeout is set to 50% of the workflow task timeout.

If the callback function is not defined, the default built-in implementation is used. The default implementation provides smart eviction that depends on the following variables:

  • Workflow history size in MB - HistorySizeMB,
  • Workflow history events count - EventsCount,
  • Executor idle time spent polling for new workflow tasks - PollIdleTime.

First, BaseTimeSec is calculated as a function of HistorySizeMB:

                             | if HistorySizeMB < 1: ignore
BaseTimeSec(HistorySizeMB) = | if HistorySizeMB > 40: 600
                             | else: -77 * HistorySizeMB + 3700

TimeMultiplier is calculated as a function of EventsCount:

                              | if EventsCount < 256: 1
TimeMultiplier(EventsCount) = | if EventsCount > 50_000: 3
                              | else: EventsCount / 25000 + 1

EvictionTimeSec is calculated using BaseTimeSec and TimeMultiplier:

EvictionTimeSec = round(BaseTimeSec * TimeMultiplier)

If the calculated EvictionTimeSec is greater than PollIdleTime, the function returns ignore and eviction is skipped; otherwise, the function returns evict and the workflow is evicted.

Example calculation for a workflow with a history size of 30MB and 1000 history events:

BaseTimeSec(30) = -77 * 30 + 3700 = 1390
TimeMultiplier(1000) = 1000 / 25000 + 1 = 1.04
EvictionTimeSec = round(1390 * 1.04) = 1446

Workflow will be evicted if PollIdleTime exceeds approximately 24 minutes.

The SDK Samples Workflow Eviction sample demonstrates usage of this callback.

handle_failure(HandlerContext, Class, Reason, Stacktrace)

(optional)
-callback handle_failure(HandlerContext :: handler_context(),
                         Class :: error | exit | throw,
                         Reason :: term(),
                         Stacktrace :: erlang:raise_stacktrace()) ->
                            default |
                            (ApplicationFailure ::
                                 temporal_sdk:application_failure() |
                                 temporal_sdk:user_application_failure()).

handle_message(HandlerContext, MessageName, MessageValue)

(optional)
-callback handle_message(HandlerContext :: handler_context(),
                         MessageName :: unicode:chardata(),
                         MessageValue :: term()) ->
                            {record, MarkerValue :: temporal_sdk:term_to_payloads()} |
                            {fail,
                             {Message :: temporal_sdk:serializable(),
                              Source :: temporal_sdk:serializable(),
                              Stacktrace :: temporal_sdk:serializable()}} |
                            ignore.

handle_query(HistoryEvents, WorkflowQuery)

(optional)

handler_context()

-type handler_context() ::
          #{cluster := temporal_sdk_cluster:cluster_name(),
            otel_ctx := otel_ctx:t(),
            history_table := ets:table(),
            index_table := ets:table(),
            workflow_info := workflow_info(),
            attempt := pos_integer()}.

task()