temporal-sdk
Safe HaskellNone
LanguageHaskell2010

Temporal.Activity

Description

Temporal activities are units of work in a Temporal workflow. They encapsulate a specific task or operation that needs to be executed within a workflow instance. Activities can be synchronous or asynchronous and are designed to perform external actions, such as interacting with databases, sending emails, or invoking external services.

When writing activities to execute within a workflow, we recommend keeping the following principles in mind:

  1. Retryable: Activities can be automatically retried by the Temporal system in case of failures. You can configure the retry behavior, such as the maximum number of retries and retry intervals.
  2. Idempotent: Since Activities are designed to be retryable, executing an activity multiple times should not have unintended side effects. This is crucial for ensuring the reliability of workflow executions.
  3. Cancellable: Activities can be cancelled, which means that they can be stopped before they complete execution. This is useful for handling long-running activities that need to be aborted in certain situations. As a consequence, activities should be written to handle ActivityCancelReason exceptions that are delivered asynchronously.

Documentation

data Activity env a Source #

What is an Activity?

An activity is a unit of work that is executed by a worker. It is a specialized function call that can be executed one or more times, and can be cancelled while it is running.

Activity Definitions are executed as normal functions.

In the event of failure, the function begins at its initial state when retried (except when Activity Heartbeats are established).

Therefore, an Activity Definition has no restrictions on the code it contains.

Idempotency

Temporal recommends that Activities be idempotent.

Idempotent means that performing an operation multiple times has the same result as performing it once. In the context of Temporal, Activities should be designed to be safely executed multiple times without causing unexpected or undesired side effects.

An Activity is idempotent if multiple Activity Task Executions do not change the state of the system beyond the first Activity Task Execution.

We recommend using idempotency keys for critical side effects.

The lack of idempotency might affect the correctness of your application but does not affect the Temporal Platform. In other words, lack of idempotency doesn't lead to a platform error.

In some cases, whether something is idempotent doesn't affect the correctness of an application. For example, if you have a monotonically incrementing counter, you might not care that retries increment the counter because you don’t care about the actual value, only that the current value is greater than a previous value.
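The idempotency-key recommendation above can be sketched without the SDK. The following is a minimal, standalone model that uses only base: Ledger, newLedger, and chargeOnce are hypothetical names invented for illustration, and the in-memory IORef values stand in for an external system that remembers which keys it has already processed. It is not part of the temporal-sdk API.

```haskell
import Data.IORef

-- Hypothetical in-memory stand-in for an external system that records
-- which idempotency keys it has already applied.
data Ledger = Ledger
  { seenKeys :: IORef [String]
  , balance :: IORef Int
  }

newLedger :: IO Ledger
newLedger = Ledger <$> newIORef [] <*> newIORef 0

-- Apply a charge at most once per idempotency key. A retried call with the
-- same key is a no-op. (Not thread-safe; a real system would make the
-- check-and-apply step atomic.)
chargeOnce :: Ledger -> String -> Int -> IO ()
chargeOnce ledger key amount = do
  seen <- readIORef (seenKeys ledger)
  if key `elem` seen
    then pure ()
    else do
      modifyIORef' (balance ledger) (+ amount)
      modifyIORef' (seenKeys ledger) (key :)
```

Calling chargeOnce twice with the same key and an amount of 10 leaves the balance at 10, which is the behaviour a retried Activity Task Execution needs. In a real activity the key would typically be derived from stable inputs, so that every retry presents the same key.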

Instances

(FunctionSupportsCodec' (Activity env) codec original, (Def env @@ original) ~ ActivityDefinition env) => DefFromFunction' codec env (Activity env result) original Source # 
Instance details

Defined in Temporal.Bundle

Methods

defFromFunction :: Proxy (Activity env result) -> codec -> String -> original -> Def env @@ original Source #

MonadError IOException (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

throwError :: IOException -> Activity env a #

catchError :: Activity env a -> (IOException -> Activity env a) -> Activity env a #

MonadReader env (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

ask :: Activity env env #

local :: (env -> env) -> Activity env a -> Activity env a #

reader :: (env -> a) -> Activity env a #

(FunctionSupportsCodec' (Activity env) codec original, (Ref @@ original) ~ KnownActivity (ArgsOf original) (ResultOf (Activity env) original)) => RefFromFunction' codec (Activity env result) original Source # 
Instance details

Defined in Temporal.Bundle

Methods

refFromFunction :: Proxy (Activity env result) -> codec -> String -> Proxy original -> Ref @@ original Source #

MonadIO (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

liftIO :: IO a -> Activity env a #

MonadCatch (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

catch :: (HasCallStack, Exception e) => Activity env a -> (e -> Activity env a) -> Activity env a #

MonadMask (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

mask :: HasCallStack => ((forall a. Activity env a -> Activity env a) -> Activity env b) -> Activity env b #

uninterruptibleMask :: HasCallStack => ((forall a. Activity env a -> Activity env a) -> Activity env b) -> Activity env b #

generalBracket :: HasCallStack => Activity env a -> (a -> ExitCase b -> Activity env c) -> (a -> Activity env b) -> Activity env (b, c) #

MonadThrow (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

throwM :: (HasCallStack, Exception e) => e -> Activity env a #

Alternative (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

empty :: Activity env a #

(<|>) :: Activity env a -> Activity env a -> Activity env a #

some :: Activity env a -> Activity env [a] #

many :: Activity env a -> Activity env [a] #

Applicative (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

pure :: a -> Activity env a #

(<*>) :: Activity env (a -> b) -> Activity env a -> Activity env b #

liftA2 :: (a -> b -> c) -> Activity env a -> Activity env b -> Activity env c #

(*>) :: Activity env a -> Activity env b -> Activity env b #

(<*) :: Activity env a -> Activity env b -> Activity env a #

Functor (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

fmap :: (a -> b) -> Activity env a -> Activity env b #

(<$) :: a -> Activity env b -> Activity env a #

Monad (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

(>>=) :: Activity env a -> (a -> Activity env b) -> Activity env b #

(>>) :: Activity env a -> Activity env b -> Activity env b #

return :: a -> Activity env a #

MonadPlus (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

mzero :: Activity env a #

mplus :: Activity env a -> Activity env a -> Activity env a #

MonadFail (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

fail :: String -> Activity env a #

MonadFix (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

mfix :: (a -> Activity env a) -> Activity env a #

HasWorkflowClient (Activity env) Source # 
Instance details

Defined in Temporal.Activity

MonadUnliftIO (Activity env) Source # 
Instance details

Defined in Temporal.Activity.Definition

Methods

withRunInIO :: ((forall a. Activity env a -> IO a) -> IO b) -> Activity env b #

(TypeError DirectActivityReferenceMsg :: Constraint) => ActivityRef (Activity env a) Source # 
Instance details

Defined in Temporal.Activity.Definition

Associated Types

type ActivityArgs (Activity env a) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityArgs (Activity env a) = '[] :: [Type]
type ActivityResult (Activity env a) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (Activity env a) = a
FieldToStartOptionDefaults (Activity env result) Source # 
Instance details

Defined in Temporal.Bundle

type ActivityArgs (Activity env a) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityArgs (Activity env a) = '[] :: [Type]
type ActivityResult (Activity env a) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (Activity env a) = a

Built-in Activity primitives

heartbeat :: [Payload] -> Activity env () Source #

An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster—they may be throttled by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.

Heartbeats can contain payloads describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.
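The resume-from-last-heartbeat idea can be illustrated with a small standalone model. This sketch does not call heartbeat or encode any Payload values. Instead, an IORef named checkpoint plays the role of the details carried by the last Heartbeat, and attempt and runWithRetry are hypothetical helpers written only for this illustration.

```haskell
import Control.Exception (IOException, throwIO, try)
import Control.Monad (when)
import Data.IORef

-- One attempt at processing all items. It resumes at the stored checkpoint
-- and, if failAt is set, crashes just before processing that index.
attempt :: IORef Int -> IORef [Int] -> Maybe Int -> [Int] -> IO ()
attempt checkpoint processed failAt items = do
  start <- readIORef checkpoint
  mapM_ step (drop start (zip [0 ..] items))
  where
    step (i, item) = do
      when (Just i == failAt) $ throwIO (userError "simulated crash")
      modifyIORef' processed (item :)
      -- Stand-in for sending a Heartbeat whose details record progress.
      writeIORef checkpoint (i + 1)

-- Run once with a simulated crash at index 2, then retry from the checkpoint.
-- Returns the items in the order they were processed.
runWithRetry :: [Int] -> IO [Int]
runWithRetry items = do
  checkpoint <- newIORef 0
  processed <- newIORef []
  firstTry <- try (attempt checkpoint processed (Just 2) items)
  case firstTry :: Either IOException () of
    Left _ -> attempt checkpoint processed Nothing items
    Right () -> pure ()
  reverse <$> readIORef processed
```

With the input [10, 20, 30, 40], the first attempt processes 10 and 20 before crashing, and the retry continues with 30 and 40, so no item is processed twice. A real activity would read the previous Heartbeat details through the SDK rather than from a shared IORef.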

data ActivityCancelReason Source #

A type of exception thrown asynchronously to a running activity to cancel it, for example because the worker is shutting down. This differs from a normal activity cancellation, which uses the cancel function from the async package.

Constructors

NotFound

The activity no longer exists on the server (may already be completed or its workflow may be completed).

CancelRequested

The activity was explicitly cancelled.

Timeout

Activity timeout caused the activity to be marked cancelled.

WorkerShutdown

The worker the activity is running on is shutting down.

UnknownCancellationReason

We received a cancellation reason that we don't know how to handle.
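Because cancellation arrives as an asynchronous exception, cleanup belongs in finally or bracket rather than in straight-line code. The sketch below models this with base only. DemoCancel is a hypothetical local exception type standing in for ActivityCancelReason, and the forked thread plus throwTo stand in for the worker delivering the cancellation. None of these names come from temporal-sdk.

```haskell
import Control.Concurrent (forkIO, threadDelay, throwTo)
import Control.Concurrent.MVar
import Control.Exception
import Data.IORef

-- Hypothetical stand-in for ActivityCancelReason.
data DemoCancel = DemoWorkerShutdown
  deriving (Show)

instance Exception DemoCancel

-- Start a long-running body, cancel it from outside, and report whether the
-- cleanup ran and what the supervising code observed.
runCancelled :: IO (Bool, String)
runCancelled = do
  started <- newEmptyMVar
  outcome <- newEmptyMVar
  cleaned <- newIORef False
  let activityBody =
        -- finally runs the cleanup on normal exit and on async exceptions,
        -- then lets the exception continue to propagate.
        (putMVar started () >> threadDelay 10000000)
          `finally` writeIORef cleaned True
  tid <- forkIO $ do
    r <- try activityBody
    putMVar outcome $ case r of
      Left e -> "cancelled: " ++ show (e :: DemoCancel)
      Right () -> "finished"
  takeMVar started                  -- wait until the body is running
  throwTo tid DemoWorkerShutdown    -- deliver the cancellation asynchronously
  msg <- takeMVar outcome
  didClean <- readIORef cleaned
  pure (didClean, msg)
```

Here the body is interrupted in the middle of its delay, the cleanup still runs, and the exception is rethrown so the supervising code can see why the body stopped.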

activityWorkflowClient :: Activity env WorkflowClient Source #

It is very common for an Activity to do things that interact with another Workflow, like using query or signal. This function provides a WorkflowClient that can be used to accomplish that.

askActivityClient :: Activity env Client Source #

The Activity monad provides access to the underlying Temporal client since it is very common for an Activity to interact with other Workflows.

A common use-case is to use signal to signal another Workflow from an Activity.

Using the provided client ensures that a consistent set of interceptors are used for all relevant actions.

Defining Activities

provideActivity :: forall codec env f. (f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f)), FunctionSupportsCodec codec (ArgsOf f) (ResultOf (Activity env) f)) => codec -> Text -> f -> ProvidedActivity env f Source #

A utility function for constructing an ActivityDefinition from a function as well as a KnownActivity value. This is useful for keeping the argument, codec, and result types in sync with each other so that changes to the function are reflected at their use sites.

myActivity :: Activity env ()
myActivity = liftIO $ putStrLn "Hello world!"

myActivityDef :: ProvidedActivity env (Activity env ())
myActivityDef = provideActivity
  JSON -- codec for serializing arguments and results
  "myActivity" -- visible name of the activity
  myActivity -- the activity function

data ProvidedActivity env f Source #

Instances

ToDefinitions env (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Worker

ActivityDef (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Activity.Definition

Associated Types

type ActivityDefinitionEnv (ProvidedActivity env f) 
Instance details

Defined in Temporal.Activity.Definition

ActivityRef (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Activity.Definition

Associated Types

type ActivityArgs (ProvidedActivity env f) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (ProvidedActivity env f) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityArgs (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityDefinitionEnv (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Activity.Definition

data KnownActivity (args :: [Type]) result Source #

Constructors

FunctionSupportsCodec codec args result => KnownActivity 

Instances

VarArgs args => UseAsInWorkflowProxy ProxyAsync (KnownActivity args res) Source # 
Instance details

Defined in Temporal.Bundle

VarArgs args => UseAsInWorkflowProxy ProxySync (KnownActivity args res) Source # 
Instance details

Defined in Temporal.Bundle

VarArgs args => ActivityRef (KnownActivity args result) Source # 
Instance details

Defined in Temporal.Activity.Definition

Associated Types

type ActivityArgs (KnownActivity args result) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityArgs (KnownActivity args result) = args
type ActivityResult (KnownActivity args result) 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (KnownActivity args result) = result

Methods

activityRef :: KnownActivity args result -> KnownActivity (ActivityArgs (KnownActivity args result)) (ActivityResult (KnownActivity args result)) Source #

FieldToStartOptionDefaults (KnownActivity args res) Source # 
Instance details

Defined in Temporal.Bundle

type Eval (InWorkflowProxies ProxyAsync (KnownActivity args res) :: Type -> Type) Source # 
Instance details

Defined in Temporal.Bundle

type Eval (InWorkflowProxies ProxySync (KnownActivity args res) :: Type -> Type) Source # 
Instance details

Defined in Temporal.Bundle

type Eval (InWorkflowProxies ProxySync (KnownActivity args res) :: Type -> Type) = args :->: Workflow res
type ActivityArgs (KnownActivity args result) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityArgs (KnownActivity args result) = args
type ActivityResult (KnownActivity args result) Source # 
Instance details

Defined in Temporal.Activity.Definition

type ActivityResult (KnownActivity args result) = result

Asynchronous Completion

newtype TaskToken Source #

A Task Token is a unique identifier for a Task. It can be used with the AsyncActivity API to signal activity completion or failure.

Constructors

TaskToken 

Fields

  • rawTaskToken :: ByteString

    An opaque token that can be used to uniquely identify a Task. This token should not be modified by consumers, but is exposed here as a raw ByteString for transport purposes.

Instances

IsString TaskToken Source # 
Instance details

Defined in Temporal.Common

Show TaskToken Source # 
Instance details

Defined in Temporal.Common

Eq TaskToken Source # 
Instance details

Defined in Temporal.Common

Ord TaskToken Source # 
Instance details

Defined in Temporal.Common

Hashable TaskToken Source # 
Instance details

Defined in Temporal.Common

data CompleteAsync Source #

Asynchronous Activity Completion is a feature that enables an Activity Function to return without causing the Activity Execution to complete. The Temporal Client can then be used to both Heartbeat Activity Execution progress and eventually provide a result.

The intended use-case for this feature is when an external system holds the final result of a computation that was started by an Activity.

Consider using Asynchronous Activities instead of Signals if the external process is unreliable and might fail to send critical status updates through a Signal.

Consider using Signals as an alternative to Asynchronous Activities to return data back to a Workflow Execution if there is a human in the process loop. The reason is that a human in the loop means multiple steps in the process. The first is the Activity Function that stores state in an external system and at least one other step where a human would “complete” the activity. If the first step fails, you want to detect that quickly and retry instead of waiting for the entire process, which could be significantly longer when humans are involved.

Constructors

CompleteAsync 

completeAsync :: MonadIO m => m () Source #

Signal to the Temporal worker that the activity will be completed asynchronously (out of band).

In order to complete the activity once it has been moved to async, use complete, fail, or reportCancellation.

Note: Under the hood, this throws a CompleteAsync exception, which is caught and handled by the Temporal worker.

Make sure that your own code does not swallow or rewrap this exception, otherwise the activity will fail instead of signalling that it will be completed asynchronously.
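The warning about swallowing the exception can be demonstrated with a standalone model. DemoCompleteAsync is a hypothetical local exception standing in for CompleteAsync, and workerSees is a hypothetical stand-in for the worker code that catches it. The point of the sketch is the difference between a catch-all handler and a handler restricted to the errors you actually expect.

```haskell
import Control.Exception

-- Hypothetical stand-in for the CompleteAsync control exception.
data DemoCompleteAsync = DemoCompleteAsync
  deriving (Show)

instance Exception DemoCompleteAsync

-- Problematic: a handler for SomeException also catches the control
-- exception, so the surrounding worker never sees it.
swallowing :: IO () -> IO String
swallowing act =
  (act >> pure "returned normally")
    `catch` \e -> let _ = (e :: SomeException) in pure "swallowed"

-- Safer: handle only the exception type you expect (IOException here) and
-- let everything else, including the control exception, propagate.
selective :: IO () -> IO String
selective act =
  (act >> pure "returned normally")
    `catch` \e -> pure ("handled IO error: " ++ show (e :: IOException))

-- Stand-in for the worker, which intercepts the control exception.
workerSees :: IO String -> IO String
workerSees body = do
  r <- try body
  pure $ case r of
    Left DemoCompleteAsync -> "worker saw async-completion signal"
    Right msg -> msg
```

Running workerSees (swallowing (throwIO DemoCompleteAsync)) yields "swallowed", while workerSees (selective (throwIO DemoCompleteAsync)) yields "worker saw async-completion signal". If a broad handler is unavoidable, rethrowing anything you do not recognise keeps the signal intact.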

Commonly used

type family (args :: [Type]) :->: result where ... infixr 0 Source #

Construct a function type from a list of argument types and a result type.

Equations

('[] :: [Type]) :->: result = result 
(arg ': args) :->: result = arg -> args :->: result 
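To see how the two equations reduce, the following standalone sketch re-declares a local copy of the type family with the same equations and gives a function a signature written in terms of it. The local declaration and the check function are illustrative assumptions. Code that uses the SDK would import :->: from the library instead of redefining it.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)

-- Local copy of the type family, mirroring the equations listed above.
type family (args :: [Type]) :->: (result :: Type) :: Type where
  '[] :->: result = result
  (arg ': args) :->: result = arg -> (args :->: result)

infixr 0 :->:

-- '[Int, String] :->: IO Bool reduces step by step to
-- Int -> String -> IO Bool, so check can be defined as an ordinary
-- two-argument function.
check :: '[Int, String] :->: IO Bool
check n s = pure (length s == n)
```

This is the same reduction that lets provideActivity relate a list of argument types to a curried function ending in Activity env result.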

Orphan instances