temporal-sdk
Safe HaskellNone
LanguageHaskell2010

Temporal.Worker

Description

Temporal workers are responsible for executing workflows and activities by polling the Temporal service for tasks in specific task queues. This module provides types and functions for defining and configuring Temporal workers, including both workflow and activity workers.

Types and functions for configuring workers, adding workflows and activities, and setting worker options are included in this module.

Note: workers only poll from a single task queue. If you need to poll from multiple task queues, you will need to start multiple workers. Multiple workers may run in the same process.

Synopsis

Documentation

data Worker env Source #

A Worker is responsible for polling a Task Queue, dequeueing a Task, executing your code in response to a Task, and responding to the Temporal Cluster with the results.

Worker Processes are external to a Temporal Cluster. Temporal Application developers are responsible for developing Worker Programs and operating Worker Processes. Said another way, the Temporal Cluster (including the Temporal Cloud) doesn't execute any of your code (Workflow and Activity Definitions) on Temporal Cluster machines. The Cluster is solely responsible for orchestrating State Transitions and providing Tasks to the next available Worker Entity.

A Worker Process can be both a Workflow Worker Process and an Activity Worker Process. Haskell the ability to have multiple Worker Entities in a single Worker Process.

A single Worker Entity can listen to only a single Task Queue. But if a Worker Process has multiple Worker Entities, the Worker Process could be listening to multiple Task Queues.

startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Worker actEnv) Source #

waitWorker :: MonadIO m => Worker actEnv -> m () Source #

Wait for a worker to exit. This waits for both the workflow and activity loops to complete.

Any exceptions thrown by the workflow or activity loops will be rethrown.

This function is generally not needed, as shutdown will wait for the worker to exit.

shutdown :: MonadUnliftIO m => Worker actEnv -> m () Source #

Shut down a worker. This will initiate a graceful shutdown of the worker, waiting for all in-flight tasks to complete before finalizing the shutdown.

linkWorker :: Worker actEnv -> IO () Source #

Link a worker to the current thread. This will cause uncaught worker exceptions to be rethrown in the current thread.

If the worker has both workflow and activity definitions registered, this will link both the workflow and activity loops. If one of the loops fails, the worker will initiate graceful shutdown (but not block) before rethrowing the exception.

waitWorkerSTM :: Worker actEnv -> STM () Source #

replaceEnvironment :: MonadIO m => Worker actEnv -> actEnv -> m () Source #

Replace the environment for activities run within a Worker. This is generally not needed, but can be useful for reusing a Worker under test.

Configuration

data WorkerConfig activityEnv Source #

Constructors

WorkerConfig 

Fields

data Definitions env Source #

Instances

Instances details
ToDefinitions env (Definitions env) Source # 
Instance details

Defined in Temporal.Worker

Monoid (Definitions env) Source # 
Instance details

Defined in Temporal.Worker

Methods

mempty :: Definitions env #

mappend :: Definitions env -> Definitions env -> Definitions env #

mconcat :: [Definitions env] -> Definitions env #

Semigroup (Definitions env) Source # 
Instance details

Defined in Temporal.Worker

Methods

(<>) :: Definitions env -> Definitions env -> Definitions env #

sconcat :: NonEmpty (Definitions env) -> Definitions env #

stimes :: Integral b => b -> Definitions env -> Definitions env #

class ToDefinitions env f where Source #

Methods

toDefinitions :: f -> Definitions env Source #

Instances

Instances details
ToDefinitions env WorkflowDefinition Source # 
Instance details

Defined in Temporal.Worker

ToDefinitions env (ActivityDefinition env) Source # 
Instance details

Defined in Temporal.Worker

ToDefinitions env (Definitions env) Source # 
Instance details

Defined in Temporal.Worker

ToDefinitions env (ProvidedWorkflow f) Source # 
Instance details

Defined in Temporal.Worker

ToDefinitions env (ProvidedActivity env f) Source # 
Instance details

Defined in Temporal.Worker

(ToDefinitions env _1, ToDefinitions env _2) => ToDefinitions env (_1, _2) Source # 
Instance details

Defined in Temporal.Worker

Methods

toDefinitions :: (_1, _2) -> Definitions env Source #

(ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3) => ToDefinitions env (_1, _2, _3) Source # 
Instance details

Defined in Temporal.Worker

Methods

toDefinitions :: (_1, _2, _3) -> Definitions env Source #

(ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4) => ToDefinitions env (_1, _2, _3, _4) Source # 
Instance details

Defined in Temporal.Worker

Methods

toDefinitions :: (_1, _2, _3, _4) -> Definitions env Source #

(ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5) => ToDefinitions env (_1, _2, _3, _4, _5) Source # 
Instance details

Defined in Temporal.Worker

Methods

toDefinitions :: (_1, _2, _3, _4, _5) -> Definitions env Source #

(ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5, ToDefinitions env _6) => ToDefinitions env (_1, _2, _3, _4, _5, _6) Source # 
Instance details

Defined in Temporal.Worker

Methods

toDefinitions :: (_1, _2, _3, _4, _5, _6) -> Definitions env Source #

data ConfigM actEnv a Source #

Instances

Instances details
Applicative (ConfigM actEnv) Source # 
Instance details

Defined in Temporal.Worker

Methods

pure :: a -> ConfigM actEnv a #

(<*>) :: ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b #

liftA2 :: (a -> b -> c) -> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c #

(*>) :: ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b #

(<*) :: ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a #

Functor (ConfigM actEnv) Source # 
Instance details

Defined in Temporal.Worker

Methods

fmap :: (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b #

(<$) :: a -> ConfigM actEnv b -> ConfigM actEnv a #

Monad (ConfigM actEnv) Source # 
Instance details

Defined in Temporal.Worker

Methods

(>>=) :: ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b #

(>>) :: ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b #

return :: a -> ConfigM actEnv a #

Monoid a => Monoid (ConfigM actEnv a) Source # 
Instance details

Defined in Temporal.Worker

Methods

mempty :: ConfigM actEnv a #

mappend :: ConfigM actEnv a -> ConfigM actEnv a -> ConfigM actEnv a #

mconcat :: [ConfigM actEnv a] -> ConfigM actEnv a #

Semigroup a => Semigroup (ConfigM actEnv a) Source # 
Instance details

Defined in Temporal.Worker

Methods

(<>) :: ConfigM actEnv a -> ConfigM actEnv a -> ConfigM actEnv a #

sconcat :: NonEmpty (ConfigM actEnv a) -> ConfigM actEnv a #

stimes :: Integral b => b -> ConfigM actEnv a -> ConfigM actEnv a #

configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv Source #

Turn a configuration block into the final configuration.

Replay

runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ()) Source #

Run a worker in replay mode, feeding a history to the worker in order to ensure that the worker execution still works as expected.

subscribeToEvictions :: MonadIO m => Worker actEnv -> m (TChan EvictionWithRunID) Source #

Subscribe to evictions from the worker. This is not generally needed, but can be useful for debugging.

subscribeToEvictionsSTM :: Worker actEnv -> STM (TChan EvictionWithRunID) Source #

Subscribe to evictions from the worker. This is not generally needed, but can be useful for debugging.

Worker options

setLogger :: (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> ConfigM actEnv () Source #

Set the logger for the worker. This is used to log messages from the worker.

setTracerProvider :: TracerProvider -> ConfigM actEnv () Source #

Set the tracer provider for the worker. This is used to trace the worker's internal activity.

setNamespace :: Namespace -> ConfigM actEnv () Source #

The namespace that all workflows and activities will be registered in for this worker.

If you need to register workflows and activities in multiple namespaces, you will need to start multiple workers. Multiple workers may run in the same process.

setTaskQueue :: TaskQueue -> ConfigM actEnv () Source #

A temporal worker, whether polling for workflows or activities, does so by polling a specific "Task Queue".

Any time there is a new activity task or workflow task that needs to be performed by a worker, it'll be queued in a specific task queue.

setBuildId :: Text -> ConfigM actEnv () Source #

A string that should be unique to the exact worker code/binary being executed.

This is used to uniquely identify the worker's code for a handful of purposes, including the worker versioning feature if you have opted into that with useVersioning. It will also populate the binaryChecksum field on older servers.

N.B. this is not the same as the worker's identity, which is a string that identifies the worker instance. The identity is used to identify the worker instance in logs and in the Temporal UI. The buildId is used to identify the exact version of the code and its dependencies. In e.g. Nix, the executable path in the Nix store would be a useful buildId.

setIdentity :: Text -> ConfigM actEnv () Source #

setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv () Source #

Maximum number of Activity tasks to poll concurrently.

Increase this setting if your Worker is failing to fill in all of its maxConcurrentActivityTaskExecutions slots despite a backlog of Activity Tasks in the Task Queue (ie. due to network latency). Can't be higher than maxConcurrentActivityTaskExecutions.

setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv () Source #

How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.

setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv () Source #

Longest interval for throttling activity heartbeats

setMaxActivitiesPerSecond :: Double -> ConfigM actEnv () Source #

Limits the number of Activities per second that this Worker will process. (Does not limit the number of Local Activities.) The Worker will not poll for new Activities if by doing so it might receive and execute an Activity which would cause it to exceed this limit. Must be a positive number.

If unset, no rate limiting will be applied to Worker's Activities. (tctl task-queue describe will display the absence of a limit as 100,000.)

setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv () Source #

Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.

If unset, no rate limiting will be applied to the task queue.

setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv () Source #

Time to wait for pending tasks to drain after shutdown was requested.

In-flight activities will be cancelled after this period and their current attempt will be resolved as failed if they confirm cancellation (by throwing a CancelledFailure or AbortError).

newtype WorkflowId Source #

A Workflow Id is a customizable, application-level identifier for a Workflow Execution that is unique to an Open Workflow Execution within a Namespace.

Constructors

WorkflowId 

Fields

Instances

Instances details
FromJSON WorkflowId Source # 
Instance details

Defined in Temporal.Common

ToJSON WorkflowId Source # 
Instance details

Defined in Temporal.Common

IsString WorkflowId Source # 
Instance details

Defined in Temporal.Common

Show WorkflowId Source # 
Instance details

Defined in Temporal.Common

Eq WorkflowId Source # 
Instance details

Defined in Temporal.Common

Ord WorkflowId Source # 
Instance details

Defined in Temporal.Common

Hashable WorkflowId Source # 
Instance details

Defined in Temporal.Common

Lift WorkflowId Source # 
Instance details

Defined in Temporal.Common

Methods

lift :: Quote m => WorkflowId -> m Exp #

liftTyped :: forall (m :: Type -> Type). Quote m => WorkflowId -> Code m WorkflowId #