Safe Haskell | None |
---|---|
Language | Haskell2010 |
Temporal.Activity
Description
Temporal activities are units of work in a Temporal workflow. They encapsulate a specific task or operation that needs to be executed within a workflow instance. Activities can be synchronous or asynchronous and are designed to perform external actions, such as interacting with databases, sending emails, or invoking external services.
When writing activities to execute within a workflow, we recommend keeping the following principles in mind:
- Retryable: Activities can be automatically retried by the Temporal system in case of failures. You can configure the retry behavior, such as the maximum number of retries and retry intervals.
- Idempotent: Since Activities are designed to be retryable, executing an activity multiple times should not have unintended side effects. This is crucial for ensuring the reliability of workflow executions.
- Cancellable: Activities can be cancelled, which means that they can be stopped before they complete execution. This is useful for handling long-running activities that need to be aborted in certain situations. As a consequence, activities should be written to receive ActivityCancelReason exceptions asynchronously.
Synopsis
- data Activity env a
- heartbeat :: [Payload] -> Activity env ()
- data ActivityInfo = ActivityInfo {
- workflowNamespace :: Namespace
- workflowType :: WorkflowType
- workflowId :: WorkflowId
- runId :: RunId
- activityId :: ActivityId
- activityType :: Text
- headerFields :: Map Text Payload
- heartbeatDetails :: Vector Payload
- scheduledTime :: SystemTime
- currentAttemptScheduledTime :: SystemTime
- startedTime :: SystemTime
- attempt :: Word32
- scheduleToCloseTimeout :: Maybe Duration
- startToCloseTimeout :: Maybe Duration
- heartbeatTimeout :: Maybe Duration
- retryPolicy :: Maybe RetryPolicy
- isLocal :: Bool
- taskToken :: TaskToken
- askActivityInfo :: Activity env ActivityInfo
- data ActivityCancelReason
- activityWorkflowClient :: Activity env WorkflowClient
- askActivityClient :: Activity env Client
- provideActivity :: forall codec env f. (f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f)), FunctionSupportsCodec codec (ArgsOf f) (ResultOf (Activity env) f)) => codec -> Text -> f -> ProvidedActivity env f
- data ProvidedActivity env f = ProvidedActivity {
- definition :: ActivityDefinition env
- reference :: KnownActivity (ArgsOf f) (ResultOf (Activity env) f)
- data KnownActivity (args :: [Type]) result = FunctionSupportsCodec codec args result => KnownActivity {
- knownActivityCodec :: codec
- knownActivityName :: Text
- data ActivityDefinition env = ActivityDefinition {
- activityName :: Text
- activityRun :: ActivityEnv env -> ExecuteActivityInput -> IO (Either String Payload)
- newtype TaskToken = TaskToken {}
- data CompleteAsync = CompleteAsync
- completeAsync :: MonadIO m => m ()
- type family (args :: [Type]) :->: result where ...
Documentation
What is an Activity?
An activity is a unit of work that is executed by a worker. It is a specialized function call that can be executed one or more times, and can be cancelled while it is running.
Activity Definitions are executed as normal functions.
In the event of failure, the function begins at its initial state when retried (except when Activity Heartbeats are established).
Therefore, an Activity Definition has no restrictions on the code it contains.
Idempotency
Temporal recommends that Activities be idempotent.
Idempotent means that performing an operation multiple times has the same result as performing it once. In the context of Temporal, Activities should be designed to be safely executed multiple times without causing unexpected or undesired side effects.
An Activity is idempotent if multiple Activity Task Executions do not change the state of the system beyond the first Activity Task Execution.
We recommend using idempotency keys for critical side effects.
The lack of idempotency might affect the correctness of your application but does not affect the Temporal Platform. In other words, lack of idempotency doesn't lead to a platform error.
In some cases, whether something is idempotent doesn't affect the correctness of an application. For example, if you have a monotonically incrementing counter, you might not care that retries increment the counter because you don’t care about the actual value, only that the current value is greater than a previous value.
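The idempotency-key recommendation above can be sketched with nothing but base. Everything in this block is hypothetical: Ledger, chargeOnce, and the in-memory IORef stand in for an external system that remembers which keys it has already applied. In a real Activity, one plausible key is a combination of the workflowId and activityId fields of ActivityInfo, but that choice is an assumption and not something this module prescribes.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for an external system that remembers which
-- idempotency keys it has already applied.
data Ledger = Ledger
  { appliedKeys :: [String] -- keys of charges already applied
  , balance     :: Int      -- running total of applied charges
  }

newLedger :: IO (IORef Ledger)
newLedger = newIORef (Ledger [] 0)

-- | Apply a charge at most once per key. Returns True if this call applied
-- the charge and False if the key had been seen before.
chargeOnce :: IORef Ledger -> String -> Int -> IO Bool
chargeOnce ref key amount = atomicModifyIORef' ref step
  where
    step ledger
      | key `elem` appliedKeys ledger = (ledger, False)
      | otherwise =
          ( Ledger (key : appliedKeys ledger) (balance ledger + amount)
          , True
          )

currentBalance :: IORef Ledger -> IO Int
currentBalance ref = balance <$> readIORef ref
```

Calling chargeOnce twice with the same key changes the balance only once, which is the property a retried Activity Task Execution relies on.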
Instances
Built-in Activity primitives
heartbeat :: [Payload] -> Activity env () Source #
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Cluster—they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain payloads describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.
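The resume-from-last-heartbeat idea can be illustrated with a small base-only model. This is a sketch under stated assumptions, not the SDK's mechanism: the Checkpoint IORef is a hypothetical stand-in for the details carried by the last recorded heartbeat, and processFrom and demoResume are illustrative names. A real Activity would call heartbeat with encoded payloads and read heartbeatDetails from ActivityInfo on retry.

```haskell
import Control.Exception (ErrorCall (..), throwIO, try)
import Control.Monad (when)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)

-- | Hypothetical stand-in for the last recorded heartbeat details:
-- the number of items that have been fully processed.
type Checkpoint = IORef Int

-- | Process items in order, recording progress after each one. The
-- 'failAt' argument simulates a crash just before the item at that index.
processFrom :: Checkpoint -> IORef [Int] -> Maybe Int -> [Int] -> IO ()
processFrom checkpoint processed failAt items = do
  done <- readIORef checkpoint -- resume point left by the previous attempt
  mapM_ step (drop done (zip [0 ..] items))
  where
    step (ix, item) = do
      when (failAt == Just ix) $
        throwIO (ErrorCall "simulated worker failure")
      modifyIORef' processed (item :) -- the actual side effect
      writeIORef checkpoint (ix + 1)  -- record progress, as a heartbeat would

-- | One failing attempt followed by a retry. Returns the items in the
-- order in which they were processed.
demoResume :: IO [Int]
demoResume = do
  checkpoint <- newIORef 0
  processed <- newIORef []
  firstAttempt <- try (processFrom checkpoint processed (Just 2) items)
  case firstAttempt of
    Left (ErrorCall _) -> processFrom checkpoint processed Nothing items
    Right () -> pure ()
  reverse <$> readIORef processed
  where
    items = [10, 20, 30, 40]
```

Because heartbeats may be throttled, the checkpoint seen by a retry can lag behind the work actually done, so the per-item side effect should still be idempotent.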
data ActivityInfo Source #
ActivityInfo provides information about the currently executing activity. It can be used for logging, introspection, etc.
Constructors
ActivityInfo | |
Fields
|
Instances
Show ActivityInfo Source # | |
Defined in Temporal.Activity.Types Methods showsPrec :: Int -> ActivityInfo -> ShowS # show :: ActivityInfo -> String # showList :: [ActivityInfo] -> ShowS # | |
Eq ActivityInfo Source # | |
Defined in Temporal.Activity.Types |
askActivityInfo :: Activity env ActivityInfo Source #
data ActivityCancelReason Source #
A type of exception thrown to a running activity to cancel it due to things happening with the worker, such as a shutdown. This differs from a normal activity cancellation, which uses the cancel function from the async package.
Constructors
NotFound | The activity no longer exists on the server (may already be completed or its workflow may be completed). |
CancelRequested | The activity was explicitly cancelled. |
Timeout | Activity timeout caused the activity to be marked cancelled. |
WorkerShutdown | The worker the activity is running on is shutting down. |
UnknownCancellationReason | We received a cancellation reason that we don't know how to handle. |
Instances
Exception ActivityCancelReason Source # | |
Defined in Temporal.Exception | |
Show ActivityCancelReason Source # | |
Defined in Temporal.Exception Methods showsPrec :: Int -> ActivityCancelReason -> ShowS # show :: ActivityCancelReason -> String # showList :: [ActivityCancelReason] -> ShowS # |
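Since cancellation reasons arrive as asynchronous exceptions, resource handling inside an activity should be exception-safe. The following is a minimal base-only sketch of that pattern. CancelReason is a hypothetical local mirror of two of the constructors listed above, not the library type, and guardedWork and demoCancel are illustrative names.

```haskell
import Control.Concurrent (forkIO, threadDelay, throwTo)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (Exception, bracket_, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical local mirror of two cancellation reasons.
data CancelReason = CancelRequested | WorkerShutdown
  deriving (Show, Eq)

instance Exception CancelReason

-- | Long-running work that holds a resource. 'bracket_' guarantees that
-- the release action runs even if an asynchronous exception interrupts
-- the body.
guardedWork :: IORef [String] -> MVar () -> IO ()
guardedWork logRef started =
  bracket_
    (record "acquire")
    (record "release")
    (putMVar started () >> threadDelay 10000000) -- stand-in for real work
  where
    record msg = modifyIORef' logRef (msg :)

-- | Start the work on another thread, cancel it once it is running, and
-- report how it ended together with the recorded events.
demoCancel :: IO (Either CancelReason (), [String])
demoCancel = do
  logRef <- newIORef []
  started <- newEmptyMVar
  finished <- newEmptyMVar
  tid <- forkIO (try (guardedWork logRef started) >>= putMVar finished)
  takeMVar started           -- wait until the resource has been acquired
  throwTo tid WorkerShutdown -- deliver the asynchronous cancellation
  outcome <- takeMVar finished
  events <- reverse <$> readIORef logRef
  pure (outcome, events)
```

The same shape applies to any resource an activity acquires: prefer bracket-style combinators over manual cleanup so that a cancellation cannot leave the resource dangling.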
activityWorkflowClient :: Activity env WorkflowClient Source #
It is very common for an Activity to do things that interact with another Workflow, like using query or signal. This function provides a WorkflowClient that can be used to accomplish that.
askActivityClient :: Activity env Client Source #
The Activity monad provides access to the underlying Temporal client since it is very common for an Activity to interact with other Workflows.
A common use-case is to use signal to signal another Workflow from an Activity.
Using the provided client ensures that a consistent set of interceptors are used for all relevant actions.
Defining Activities
provideActivity :: forall codec env f. (f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f)), FunctionSupportsCodec codec (ArgsOf f) (ResultOf (Activity env) f)) => codec -> Text -> f -> ProvidedActivity env f Source #
A utility function for constructing an ActivityDefinition from a function as well as a KnownActivity value. This is useful for keeping the argument, codec, and result types in sync with each other so that changes to the function are reflected at their use sites.
myActivity :: Activity env ()
myActivity = liftIO $ putStrLn "Hello world!"

myActivityDef :: ProvidedActivity env (Activity env ())
myActivityDef = provideActivity
  JSON -- codec for serializing arguments and results
  "myActivity" -- visible name of the activity
  myActivity -- the activity function
data ProvidedActivity env f Source #
Constructors
ProvidedActivity | |
Fields
|
Instances
ToDefinitions env (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Worker Methods toDefinitions :: ProvidedActivity env f -> Definitions env Source # | |||||||||
ActivityDef (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Activity.Definition Associated Types
Methods activityDefinition :: ProvidedActivity env f -> ActivityDefinition (ActivityDefinitionEnv (ProvidedActivity env f)) Source # | |||||||||
ActivityRef (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Activity.Definition Associated Types
Methods activityRef :: ProvidedActivity env f -> KnownActivity (ActivityArgs (ProvidedActivity env f)) (ActivityResult (ProvidedActivity env f)) Source # | |||||||||
type ActivityArgs (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Activity.Definition | |||||||||
type ActivityDefinitionEnv (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Activity.Definition | |||||||||
type ActivityResult (ProvidedActivity env f) Source # | |||||||||
Defined in Temporal.Activity.Definition |
data KnownActivity (args :: [Type]) result Source #
Constructors
FunctionSupportsCodec codec args result => KnownActivity | |
Fields
|
Instances
VarArgs args => UseAsInWorkflowProxy ProxyAsync (KnownActivity args res) Source # | |||||||||
Defined in Temporal.Bundle Methods useAsInWorkflowProxy :: ProxyAsync -> KnownActivity args res -> (RefStartOptions @@ KnownActivity args res) -> InWorkflowProxies ProxyAsync @@ KnownActivity args res Source # | |||||||||
VarArgs args => UseAsInWorkflowProxy ProxySync (KnownActivity args res) Source # | |||||||||
Defined in Temporal.Bundle Methods useAsInWorkflowProxy :: ProxySync -> KnownActivity args res -> (RefStartOptions @@ KnownActivity args res) -> InWorkflowProxies ProxySync @@ KnownActivity args res Source # | |||||||||
VarArgs args => ActivityRef (KnownActivity args result) Source # | |||||||||
Defined in Temporal.Activity.Definition Associated Types
Methods activityRef :: KnownActivity args result -> KnownActivity (ActivityArgs (KnownActivity args result)) (ActivityResult (KnownActivity args result)) Source # | |||||||||
FieldToStartOptionDefaults (KnownActivity args res) Source # | |||||||||
Defined in Temporal.Bundle Methods refStartOptionsDefaults :: Proxy (KnownActivity args res) -> StartActivityOptions -> StartChildWorkflowOptions -> RefStartOptionsType (KnownActivity args res) Source # | |||||||||
type Eval (InWorkflowProxies ProxyAsync (KnownActivity args res) :: Type -> Type) Source # | |||||||||
Defined in Temporal.Bundle type Eval (InWorkflowProxies ProxyAsync (KnownActivity args res) :: Type -> Type) = args :->: Workflow (Task res) | |||||||||
type Eval (InWorkflowProxies ProxySync (KnownActivity args res) :: Type -> Type) Source # | |||||||||
Defined in Temporal.Bundle type Eval (InWorkflowProxies ProxySync (KnownActivity args res) :: Type -> Type) = args :->: Workflow res | |||||||||
type ActivityArgs (KnownActivity args result) Source # | |||||||||
Defined in Temporal.Activity.Definition | |||||||||
type ActivityResult (KnownActivity args result) Source # | |||||||||
Defined in Temporal.Activity.Definition |
data ActivityDefinition env Source #
Constructors
ActivityDefinition | |
Fields
|
Instances
ToDefinitions env (ActivityDefinition env) Source # | |||||
Defined in Temporal.Worker Methods toDefinitions :: ActivityDefinition env -> Definitions env Source # | |||||
ActivityDef (ActivityDefinition env) Source # | |||||
Defined in Temporal.Activity.Definition Associated Types
Methods activityDefinition :: ActivityDefinition env -> ActivityDefinition (ActivityDefinitionEnv (ActivityDefinition env)) Source # | |||||
type ActivityDefinitionEnv (ActivityDefinition env) Source # | |||||
Defined in Temporal.Activity.Definition |
Asynchronous Completion
newtype TaskToken Source #
A Task Token is a unique identifier for a Task. It can be used with the AsyncActivity API to signal activity completion or failure.
Constructors
TaskToken | |
Fields
|
Instances
IsString TaskToken Source # | |
Defined in Temporal.Common Methods fromString :: String -> TaskToken # | |
Show TaskToken Source # | |
Eq TaskToken Source # | |
Ord TaskToken Source # | |
Hashable TaskToken Source # | |
Defined in Temporal.Common |
data CompleteAsync Source #
Asynchronous Activity Completion is a feature that enables an Activity Function to return without causing the Activity Execution to complete. The Temporal Client can then be used to both Heartbeat Activity Execution progress and eventually provide a result.
The intended use-case for this feature is when an external system has the final result of a computation, started by an Activity.
Consider using Asynchronous Activities instead of Signals if the external process is unreliable and might fail to send critical status updates through a Signal.
Consider using Signals as an alternative to Asynchronous Activities to return data back to a Workflow Execution if there is a human in the process loop. The reason is that a human in the loop means multiple steps in the process. The first is the Activity Function that stores state in an external system and at least one other step where a human would “complete” the activity. If the first step fails, you want to detect that quickly and retry instead of waiting for the entire process, which could be significantly longer when humans are involved.
Constructors
CompleteAsync |
Instances
Exception CompleteAsync Source # | |
Defined in Temporal.Exception Methods toException :: CompleteAsync -> SomeException # fromException :: SomeException -> Maybe CompleteAsync # displayException :: CompleteAsync -> String # backtraceDesired :: CompleteAsync -> Bool # | |
Show CompleteAsync Source # | |
Defined in Temporal.Exception Methods showsPrec :: Int -> CompleteAsync -> ShowS # show :: CompleteAsync -> String # showList :: [CompleteAsync] -> ShowS # |
completeAsync :: MonadIO m => m () Source #
Signal to the Temporal worker that the activity will be completed asynchronously (out of band).
In order to complete the activity once it has been moved to async, use complete, fail, or reportCancellation.
Note: Under the hood, this throws a CompleteAsync exception, which is caught and handled by the Temporal worker.
Make sure that your own code does not swallow or rewrap this exception; otherwise the activity will fail instead of signalling that it will be completed asynchronously.
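The warning about swallowing the exception can be made concrete with a base-only sketch. AsyncCompletionSignal is a hypothetical local stand-in for CompleteAsync, and recoverExceptSignal and demoSignal are illustrative names. The block shows one way a broad handler can recover ordinary failures while rethrowing the control-flow exception unchanged.

```haskell
import Control.Exception
  ( Exception
  , SomeException
  , catch
  , fromException
  , throwIO
  , try
  )

-- | Hypothetical local stand-in for the CompleteAsync exception.
data AsyncCompletionSignal = AsyncCompletionSignal
  deriving (Show, Eq)

instance Exception AsyncCompletionSignal

-- | Run an action, turning ordinary failures into a 'Left' while letting
-- the control-flow signal propagate untouched.
recoverExceptSignal :: IO a -> IO (Either String a)
recoverExceptSignal action = (Right <$> action) `catch` handler
  where
    handler :: SomeException -> IO (Either String a)
    handler err = case fromException err of
      Just AsyncCompletionSignal -> throwIO err -- rethrow unchanged
      Nothing -> pure (Left (show err))

-- | Observe both behaviours: the ordinary error is recovered, while the
-- signal escapes the wrapper and is only seen by the outer 'try'.
demoSignal
  :: IO (Either String (), Either AsyncCompletionSignal (Either String ()))
demoSignal = do
  ordinary <- recoverExceptSignal (ioError (userError "boom"))
  escaped <- try (recoverExceptSignal (throwIO AsyncCompletionSignal))
  pure (ordinary, escaped)
```

Where possible, catching only the specific exception types you expect avoids the problem entirely. A handler for SomeException, as used here for illustration, also intercepts asynchronous exceptions such as cancellations.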
Commonly used
type family (args :: [Type]) :->: result where ... infixr 0 Source #
Construct a function type from a list of argument types and a result type.
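To see how such a type family turns an argument list into a function type, here is a local sketch with the documented shape and fixity. It is defined from scratch in this block and is only assumed to behave like the library's family. The names describe and constant are illustrative.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)

-- | Local sketch: fold a type-level list of arguments into a curried
-- function type ending in the result type.
type family (args :: [Type]) :->: (result :: Type) :: Type where
  '[] :->: result = result
  (arg ': rest) :->: result = arg -> (rest :->: result)

infixr 0 :->:

-- | '[Int, Bool] :->: String reduces to Int -> Bool -> String.
describe :: '[Int, Bool] :->: String
describe n flag = show n ++ (if flag then "!" else "?")

-- | An empty argument list reduces to the bare result type.
constant :: '[] :->: Int
constant = 7
```

Under this reading, the constraint in provideActivity that f is equal to ArgsOf f :->: Activity env (ResultOf (Activity env) f) says that f is a curried function whose final result lives in the Activity monad.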
Orphan instances
HasWorkflowClient (Activity env) Source # | |
Methods |