Safe Haskell | None |
---|---|
Language | Haskell2010 |
Temporal.Worker
Description
Temporal workers execute workflows and activities by polling the Temporal service for tasks on a specific task queue. This module provides the types and functions for defining and configuring Temporal workers, both workflow and activity workers: registering workflows and activities and setting worker options.
Note: a worker polls only a single task queue. To poll multiple task queues, start multiple workers. Multiple workers may run in the same process.
Synopsis
- data Worker env
- startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Worker actEnv)
- waitWorker :: MonadIO m => Worker actEnv -> m ()
- shutdown :: MonadUnliftIO m => Worker actEnv -> m ()
- linkWorker :: Worker actEnv -> IO ()
- pollWorkerSTM :: Worker actEnv -> STM (Maybe (Either SomeException ()))
- waitWorkerSTM :: Worker actEnv -> STM ()
- replaceEnvironment :: MonadIO m => Worker actEnv -> actEnv -> m ()
- data WorkerConfig activityEnv = WorkerConfig {
- deadlockTimeout :: Maybe Int
- wfDefs :: HashMap Text WorkflowDefinition
- actEnv :: activityEnv
- actDefs :: HashMap Text (ActivityDefinition activityEnv)
- coreConfig :: WorkerConfig
- interceptorConfig :: Interceptors activityEnv
- applicationErrorConverters :: [ApplicationFailureHandler]
- tracerProvider :: TracerProvider
- logger :: Loc -> LogSource -> LogLevel -> LogStr -> IO ()
- payloadProcessor :: PayloadProcessor
- data Definitions env = Definitions {
- workflowDefinitions :: !(HashMap Text WorkflowDefinition)
- activityDefinitions :: !(HashMap Text (ActivityDefinition env))
- class ToDefinitions env f where
- toDefinitions :: f -> Definitions env
- data ConfigM actEnv a
- configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
- runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ())
- data ReplayHistoryFailure = ReplayHistoryFailure {}
- subscribeToEvictions :: MonadIO m => Worker actEnv -> m (TChan EvictionWithRunID)
- subscribeToEvictionsSTM :: Worker actEnv -> STM (TChan EvictionWithRunID)
- data EvictionWithRunID = EvictionWithRunID {}
- evictionMessage :: EvictionWithRunID -> Text
- evictionWasFatal :: EvictionWithRunID -> Bool
- class WorkflowDef a where
- workflowDefinition :: a -> WorkflowDefinition
- class ActivityDef a where
- type ActivityDefinitionEnv a
- activityDefinition :: a -> ActivityDefinition (ActivityDefinitionEnv a)
- addErrorConverter :: Exception e => (e -> ApplicationFailure) -> ConfigM actEnv ()
- setLogger :: (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
- setTracerProvider :: TracerProvider -> ConfigM actEnv ()
- setNamespace :: Namespace -> ConfigM actEnv ()
- setTaskQueue :: TaskQueue -> ConfigM actEnv ()
- setBuildId :: Text -> ConfigM actEnv ()
- setIdentity :: Text -> ConfigM actEnv ()
- setMaxCachedWorkflows :: Word64 -> ConfigM actEnv ()
- setMaxOutstandingWorkflowTasks :: Word64 -> ConfigM actEnv ()
- setMaxOutstandingActivities :: Word64 -> ConfigM actEnv ()
- setMaxOutstandingLocalActivities :: Word64 -> ConfigM actEnv ()
- setMaxConcurrentWorkflowTaskPolls :: Word64 -> ConfigM actEnv ()
- setNonstickyToStickyPollRatio :: Float -> ConfigM actEnv ()
- setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv ()
- setNoRemoteActivities :: Bool -> ConfigM actEnv ()
- setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv ()
- setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
- setDefaultHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
- setMaxActivitiesPerSecond :: Double -> ConfigM actEnv ()
- setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv ()
- setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv ()
- addInterceptors :: Interceptors actEnv -> ConfigM actEnv ()
- setPayloadProcessor :: PayloadProcessor -> ConfigM actEnv ()
- newtype WorkflowId = WorkflowId {}
Documentation
A Worker is responsible for polling a Task Queue, dequeueing a Task, executing your code in response to a Task, and responding to the Temporal Cluster with the results.
Worker Processes are external to a Temporal Cluster. Temporal Application developers are responsible for developing Worker Programs and operating Worker Processes. Said another way, the Temporal Cluster (including the Temporal Cloud) doesn't execute any of your code (Workflow and Activity Definitions) on Temporal Cluster machines. The Cluster is solely responsible for orchestrating State Transitions and providing Tasks to the next available Worker Entity.
A Worker Process can be both a Workflow Worker Process and an Activity Worker Process. The Haskell SDK supports running multiple Worker Entities in a single Worker Process.
A single Worker Entity can listen to only a single Task Queue. But if a Worker Process has multiple Worker Entities, the Worker Process could be listening to multiple Task Queues.
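As a rough sketch of one Worker Process hosting two Worker Entities, the helper below starts two workers and shuts both down once the supplied action finishes. It relies only on startWorker and shutdown as declared on this page. The helper name withTwoWorkers is hypothetical, and importing Client from Temporal.Core.Client is an assumption, since this page does not show which module exports it. Each WorkerConfig is expected to name a different task queue.

```haskell
import Control.Exception (bracket)
import Temporal.Core.Client (Client) -- assumed module; not shown on this page
import Temporal.Worker (WorkerConfig, shutdown, startWorker)

-- | Run an action while two workers (for example, one per task queue) are
-- polling. Hypothetical helper, not part of Temporal.Worker.
withTwoWorkers
  :: Client
  -> WorkerConfig envA
  -> WorkerConfig envB
  -> IO a
  -> IO a
withTwoWorkers client configA configB action =
  -- bracket guarantees that shutdown runs even if the action throws.
  bracket (startWorker client configA) shutdown $ \_workerA ->
    bracket (startWorker client configB) shutdown $ \_workerB ->
      action
```

Nesting the brackets means the second worker is shut down before the first.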
startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Worker actEnv)
waitWorker :: MonadIO m => Worker actEnv -> m ()
Wait for a worker to exit. This waits for both the workflow and activity loops to complete.
Any exceptions thrown by the workflow or activity loops will be rethrown.
This function is generally not needed, as shutdown will wait for the worker to exit.
shutdown :: MonadUnliftIO m => Worker actEnv -> m ()
Shut down a worker. This will initiate a graceful shutdown of the worker, waiting for all in-flight tasks to complete before finalizing the shutdown.
linkWorker :: Worker actEnv -> IO ()
Link a worker to the current thread. This will cause uncaught worker exceptions to be rethrown in the current thread.
If the worker has both workflow and activity definitions registered, this will link both the workflow and activity loops. If one of the loops fails, the worker will initiate graceful shutdown (but not block) before rethrowing the exception.
pollWorkerSTM :: Worker actEnv -> STM (Maybe (Either SomeException ()))
waitWorkerSTM :: Worker actEnv -> STM ()
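This page does not describe the two STM functions. Assuming waitWorkerSTM retries until the worker has exited (that is, it behaves as the STM counterpart of waitWorker), it can be composed with other STM actions. The sketch below waits for a worker with a timeout. waitWorkerTimeout is a hypothetical helper, and registerDelay comes from the stm package.

```haskell
import Control.Concurrent.STM (atomically, check, orElse, readTVar, registerDelay)
import Temporal.Worker (Worker, waitWorkerSTM)

-- | Wait up to the given number of microseconds for a worker to exit.
-- Returns True if the worker exited in time and False on timeout.
-- Assumes waitWorkerSTM blocks (retries) until the worker has exited.
-- Note: registerDelay requires the threaded runtime (-threaded).
waitWorkerTimeout :: Int -> Worker env -> IO Bool
waitWorkerTimeout micros worker = do
  timedOut <- registerDelay micros
  atomically $
    (True <$ waitWorkerSTM worker)
      `orElse` (False <$ (readTVar timedOut >>= check))
```

By its type, pollWorkerSTM looks like a non-blocking variant in the style of pollSTM from the async package, but that reading is also an assumption.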
replaceEnvironment :: MonadIO m => Worker actEnv -> actEnv -> m ()
Replace the environment for activities run within a Worker. This is generally not needed, but can be useful for reusing a Worker under test.
Configuration
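data WorkerConfig activityEnv
Constructors
WorkerConfig
Fields
- deadlockTimeout :: Maybe Int
- wfDefs :: HashMap Text WorkflowDefinition
- actEnv :: activityEnv
- actDefs :: HashMap Text (ActivityDefinition activityEnv)
- coreConfig :: WorkerConfig
- interceptorConfig :: Interceptors activityEnv
- applicationErrorConverters :: [ApplicationFailureHandler]
- tracerProvider :: TracerProvider
- logger :: Loc -> LogSource -> LogLevel -> LogStr -> IO ()
- payloadProcessor :: PayloadProcessor
data Definitions env
Constructors
Definitions
Fields
- workflowDefinitions :: !(HashMap Text WorkflowDefinition)
- activityDefinitions :: !(HashMap Text (ActivityDefinition env))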
Instances
- ToDefinitions env (Definitions env), defined in Temporal.Worker
  - toDefinitions :: Definitions env -> Definitions env
- Monoid (Definitions env), defined in Temporal.Worker
  - mempty :: Definitions env
  - mappend :: Definitions env -> Definitions env -> Definitions env
  - mconcat :: [Definitions env] -> Definitions env
- Semigroup (Definitions env), defined in Temporal.Worker
  - (<>) :: Definitions env -> Definitions env -> Definitions env
  - sconcat :: NonEmpty (Definitions env) -> Definitions env
  - stimes :: Integral b => b -> Definitions env -> Definitions env
class ToDefinitions env f where
Methods
toDefinitions :: f -> Definitions env
Instances
data ConfigM actEnv a
Instances
- Applicative (ConfigM actEnv), defined in Temporal.Worker
  - pure :: a -> ConfigM actEnv a
  - (<*>) :: ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
  - liftA2 :: (a -> b -> c) -> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
  - (*>) :: ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
  - (<*) :: ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a
- Functor (ConfigM actEnv)
- Monad (ConfigM actEnv)
- Monoid a => Monoid (ConfigM actEnv a)
- Semigroup a => Semigroup (ConfigM actEnv a)
configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
Turn a configuration block into the final configuration.
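A minimal sketch of a configuration block, using only the signatures listed on this page. The helper name exampleConfig and the literal option values are illustrative, and importing Namespace and TaskQueue from Temporal.Common is an assumption, since this page does not show which module exports them. The two sets of definitions are combined with the Semigroup instance of Definitions.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Temporal.Common (Namespace, TaskQueue) -- assumed module; not shown on this page
import Temporal.Worker

-- | Hypothetical helper that builds a WorkerConfig from two sets of
-- definitions and a handful of options.
exampleConfig
  :: env
  -> Definitions env -- ^ workflow definitions
  -> Definitions env -- ^ activity definitions
  -> Namespace
  -> TaskQueue
  -> WorkerConfig env
exampleConfig env workflowDefs activityDefs ns tq =
  configure env (workflowDefs <> activityDefs) $ do
    setNamespace ns
    setTaskQueue tq
    setIdentity "example-worker-1" -- illustrative value
    setMaxCachedWorkflows 1000 -- illustrative value
    setGracefulShutdownPeriodMillis 30000 -- illustrative value: 30 seconds
```

The resulting WorkerConfig can then be passed to startWorker together with a Client.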
Replay
runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ())
Run a worker in replay mode, feeding it a recorded history to check that the current worker code still executes that history as expected.
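Because ReplayHistoryFailure has an Exception instance, a replay check in a test suite can turn a Left result into a thrown exception. The sketch below does this; replayOrThrow is a hypothetical helper. Its type signature is left for GHC to infer because the modules that export Runtime and History are not shown on this page.

```haskell
import Control.Exception (throwIO)
import Temporal.Worker (runReplayHistory)

-- Hypothetical helper. Inferred type:
--   replayOrThrow :: Runtime -> WorkerConfig actEnv -> History -> IO ()
-- Throws the ReplayHistoryFailure if the replay does not succeed.
replayOrThrow runtime config history =
  runReplayHistory runtime config history >>= either throwIO pure
```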
data ReplayHistoryFailure
Constructors
ReplayHistoryFailure
Instances
- Exception ReplayHistoryFailure, defined in Temporal.Worker
- Show ReplayHistoryFailure, defined in Temporal.Worker
  - showsPrec :: Int -> ReplayHistoryFailure -> ShowS
  - show :: ReplayHistoryFailure -> String
  - showList :: [ReplayHistoryFailure] -> ShowS
- Eq ReplayHistoryFailure, defined in Temporal.Worker
  - (==) :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
  - (/=) :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
subscribeToEvictions :: MonadIO m => Worker actEnv -> m (TChan EvictionWithRunID)
Subscribe to evictions from the worker. This is not generally needed, but can be useful for debugging.
subscribeToEvictionsSTM :: Worker actEnv -> STM (TChan EvictionWithRunID)
Subscribe to evictions from the worker. This is not generally needed, but can be useful for debugging.
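For debugging, a subscription can be drained in a loop that prints each eviction. The sketch below uses subscribeToEvictions together with evictionMessage and evictionWasFatal from the synopsis. logEvictions is a hypothetical helper, and the sketch assumes the returned TChan is a private subscription, so reading from it does not take events away from other subscribers.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Concurrent.STM (atomically, readTChan)
import Control.Monad (forever)
import qualified Data.Text.IO as Text
import Temporal.Worker (Worker, evictionMessage, evictionWasFatal, subscribeToEvictions)

-- | Print every eviction reported by the worker. This loops forever, so run
-- it on its own thread. Hypothetical helper, not part of Temporal.Worker.
logEvictions :: Worker env -> IO ()
logEvictions worker = do
  evictions <- subscribeToEvictions worker
  forever $ do
    eviction <- atomically (readTChan evictions)
    let prefix = if evictionWasFatal eviction then "fatal eviction: " else "eviction: "
    Text.putStrLn (prefix <> evictionMessage eviction)
```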
data EvictionWithRunID
Constructors
EvictionWithRunID
Instances
- Show EvictionWithRunID, defined in Temporal.Workflow.Worker
  - showsPrec :: Int -> EvictionWithRunID -> ShowS
  - show :: EvictionWithRunID -> String
  - showList :: [EvictionWithRunID] -> ShowS
Worker options
class WorkflowDef a where
Methods
workflowDefinition :: a -> WorkflowDefinition
Instances
- WorkflowDef WorkflowDefinition, defined in Temporal.Workflow.Definition
  - workflowDefinition :: WorkflowDefinition -> WorkflowDefinition
- (Fn f, WorkflowFn f) => WorkflowDef (WorkflowImpl f), defined in Temporal.TH.Classes
  - workflowDefinition :: WorkflowImpl f -> WorkflowDefinition
- WorkflowDef (ProvidedWorkflow f), defined in Temporal.Workflow.Definition
  - workflowDefinition :: ProvidedWorkflow f -> WorkflowDefinition
class ActivityDef a where
Associated Types
type ActivityDefinitionEnv a
Methods
activityDefinition :: a -> ActivityDefinition (ActivityDefinitionEnv a)
Instances
- ActivityDef (ActivityDefinition env), defined in Temporal.Activity.Definition
  - activityDefinition :: ActivityDefinition env -> ActivityDefinition (ActivityDefinitionEnv (ActivityDefinition env))
- (Fn f, ActivityFn f) => ActivityDef (ActivityImpl f), defined in Temporal.TH.Classes
  - activityDefinition :: ActivityImpl f -> ActivityDefinition (ActivityDefinitionEnv (ActivityImpl f))
- ActivityDef (ProvidedActivity env f), defined in Temporal.Activity.Definition
  - activityDefinition :: ProvidedActivity env f -> ActivityDefinition (ActivityDefinitionEnv (ProvidedActivity env f))
addErrorConverter :: Exception e => (e -> ApplicationFailure) -> ConfigM actEnv ()
setLogger :: (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
Set the logger for the worker. This is used to log messages from the worker.
setTracerProvider :: TracerProvider -> ConfigM actEnv ()
Set the tracer provider for the worker. This is used to trace the worker's internal activity.
setNamespace :: Namespace -> ConfigM actEnv ()
The namespace that all workflows and activities will be registered in for this worker.
If you need to register workflows and activities in multiple namespaces, you will need to start multiple workers. Multiple workers may run in the same process.
setTaskQueue :: TaskQueue -> ConfigM actEnv ()
A Temporal worker, whether it executes workflows or activities, receives work by polling a specific Task Queue.
Whenever a new activity task or workflow task needs to be performed by a worker, it is queued on a specific task queue.
setBuildId :: Text -> ConfigM actEnv ()
A string that should be unique to the exact worker code/binary being executed.
This is used to uniquely identify the worker's code for a handful of purposes, including the worker versioning feature if you have opted into that with useVersioning. It will also populate the binaryChecksum field on older servers.
N.B. this is not the same as the worker's identity, which is a string that identifies the worker instance. The identity is used to identify the worker instance in logs and in the Temporal UI. The buildId is used to identify the exact version of the code and its dependencies. In e.g. Nix, the executable path in the Nix store would be a useful buildId.
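The distinction can be sketched as a small reusable configuration fragment: the build ID is shared by every instance running the same binary, while the identity differs per running instance. workerLabels is a hypothetical helper name.

```haskell
import Data.Text (Text)
import Temporal.Worker (ConfigM, setBuildId, setIdentity)

-- | Hypothetical configuration fragment that sets both labels.
workerLabels
  :: Text -- ^ build ID: identical for every instance of this exact binary
  -> Text -- ^ identity: distinct for each running worker instance
  -> ConfigM env ()
workerLabels buildId identity = do
  setBuildId buildId
  setIdentity identity
```

Since ConfigM is a Monad, a fragment like this can be sequenced with other options inside the block passed to configure.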
setIdentity :: Text -> ConfigM actEnv ()
setMaxCachedWorkflows :: Word64 -> ConfigM actEnv ()
setMaxOutstandingWorkflowTasks :: Word64 -> ConfigM actEnv ()
setMaxOutstandingActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingLocalActivities :: Word64 -> ConfigM actEnv ()
setMaxConcurrentWorkflowTaskPolls :: Word64 -> ConfigM actEnv ()
setNonstickyToStickyPollRatio :: Float -> ConfigM actEnv ()
setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv ()
Maximum number of Activity tasks to poll concurrently.
Increase this setting if your Worker is failing to fill all of its maxConcurrentActivityTaskExecutions slots despite a backlog of Activity Tasks in the Task Queue (for example, due to network latency). It can't be higher than maxConcurrentActivityTaskExecutions.
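The constraint between the two limits can be enforced when the options are set. The sketch below assumes that setMaxOutstandingActivities is this module's counterpart of the maxConcurrentActivityTaskExecutions setting named above. That mapping is not confirmed on this page, and activityConcurrency is a hypothetical helper.

```haskell
import Data.Word (Word64)
import Temporal.Worker (ConfigM, setMaxConcurrentActivityTaskPolls, setMaxOutstandingActivities)

-- | Hypothetical configuration fragment that keeps the number of concurrent
-- activity polls at or below the number of activity execution slots.
activityConcurrency
  :: Word64 -- ^ activity execution slots (assumed to be the "outstanding" limit)
  -> Word64 -- ^ desired number of concurrent activity task polls
  -> ConfigM env ()
activityConcurrency maxExecutions desiredPolls = do
  setMaxOutstandingActivities maxExecutions
  -- Clamp the poll count so it never exceeds the execution slots.
  setMaxConcurrentActivityTaskPolls (min desiredPolls maxExecutions)
```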
setNoRemoteActivities :: Bool -> ConfigM actEnv ()
setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv ()
How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.
setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
Longest interval for throttling activity heartbeats.
setDefaultHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setMaxActivitiesPerSecond :: Double -> ConfigM actEnv ()
Limits the number of Activities per second that this Worker will process. (Does not limit the number of Local Activities.) The Worker will not poll for new Activities if by doing so it might receive and execute an Activity which would cause it to exceed this limit. Must be a positive number.
If unset, no rate limiting will be applied to Worker's Activities. (tctl task-queue describe will display the absence of a limit as 100,000.)
setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv ()
Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.
If unset, no rate limiting will be applied to the task queue.
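The two rate limits are independent: one applies to this worker alone and the other is applied by the server to the whole task queue. A small sketch that sets both follows. activityRateLimits is a hypothetical helper.

```haskell
import Temporal.Worker (ConfigM, setMaxActivitiesPerSecond, setMaxTaskQueueActivitiesPerSecond)

-- | Hypothetical configuration fragment that sets both activity rate limits.
activityRateLimits
  :: Double -- ^ per-worker limit, in activities per second (must be positive)
  -> Double -- ^ task-queue-wide limit, in activities per second
  -> ConfigM env ()
activityRateLimits perWorker perTaskQueue = do
  setMaxActivitiesPerSecond perWorker
  setMaxTaskQueueActivitiesPerSecond perTaskQueue
```

Passing the same perTaskQueue value to every worker on a task queue avoids the thrashing described above.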
setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv ()
Time to wait for pending tasks to drain after shutdown was requested.
In-flight activities will be cancelled after this period and their current attempt will be resolved as failed if they confirm cancellation (by throwing a CancelledFailure or AbortError).
addInterceptors :: Interceptors actEnv -> ConfigM actEnv ()
setPayloadProcessor :: PayloadProcessor -> ConfigM actEnv ()
newtype WorkflowId
A Workflow Id is a customizable, application-level identifier for a Workflow Execution that is unique to an Open Workflow Execution within a Namespace.
Constructors
WorkflowId