{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}

{- |
Module: Temporal.Worker
Description: Define and manage Temporal workers for executing workflows and activities.

Temporal workers are responsible for executing workflows and activities by polling
the Temporal service for tasks in specific task queues. This module provides
types and functions for defining and configuring Temporal workers, including
both workflow and activity workers.

Types and functions for configuring workers, adding workflows and activities,
and setting worker options are included in this module.

Note: workers only poll from a single task queue. If you need to poll from
multiple task queues, you will need to start multiple workers. Multiple workers
may run in the same process.
-}
module Temporal.Worker (
  Temporal.Worker.Worker,
  startWorker,
  waitWorker,
  shutdown,
  linkWorker,
  pollWorkerSTM,
  waitWorkerSTM,
  replaceEnvironment,

  -- * Configuration
  WorkerConfig (..),
  Definitions (..),
  ToDefinitions (..),
  ConfigM,
  configure,

  -- * Replay

  --
  -- A replay worker is a worker that is used to replay a workflow execution.
  -- It is not used for real-world execution of workflows, but is used to
  -- replay a workflow execution from history. This is useful for test suites,
  -- as it allows you to ensure that changes to your workflow code do not
  -- break determinism.
  runReplayHistory,
  ReplayHistoryFailure (..),
  subscribeToEvictions,
  subscribeToEvictionsSTM,
  Workflow.EvictionWithRunID (..),
  evictionMessage,
  evictionWasFatal,
  -- startReplayWorker,
  -- Core.HistoryPusher,
  -- Core.pushHistory,
  -- Core.closeHistory,

  -- ** Worker options
  WorkflowDef (..),
  ActivityDef (..),
  addErrorConverter,
  setLogger,
  setTracerProvider,
  setNamespace,
  setTaskQueue,
  setBuildId,
  setIdentity,
  setMaxCachedWorkflows,
  setMaxOutstandingWorkflowTasks,
  setMaxOutstandingActivities,
  setMaxOutstandingLocalActivities,
  setMaxConcurrentWorkflowTaskPolls,
  setNonstickyToStickyPollRatio,
  setMaxConcurrentActivityTaskPolls,
  setNoRemoteActivities,
  setStickyQueueScheduleToStartTimeoutMillis,
  setMaxHeartbeatThrottleIntervalMillis,
  setDefaultHeartbeatThrottleIntervalMillis,
  setMaxActivitiesPerSecond,
  setMaxTaskQueueActivitiesPerSecond,
  setGracefulShutdownPeriodMillis,
  addInterceptors,
  setPayloadProcessor,
  WorkflowId (..),
) where

import Control.Concurrent
import Control.Exception (Exception (..))
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State
import Data.Either (lefts)
import Data.Foldable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens (encodeMessage)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.UUID (UUID)
import qualified Data.UUID as UUID
import Data.UUID.V4 (nextRandom)
import Data.Word
import Lens.Family2
import OpenTelemetry.Attributes (emptyAttributes)
import OpenTelemetry.Trace.Core hiding (inSpan)
import qualified OpenTelemetry.Trace.Core as OT
import OpenTelemetry.Trace.Monad
import Proto.Temporal.Api.History.V1.Message (History)
import Proto.Temporal.Api.History.V1.Message_Fields (maybe'workflowExecutionStartedEventAttributes, vec'events, workflowId)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation
import qualified Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation_Fields as Activation
import RequireCallStack
import System.IO.Unsafe
import Temporal.Activity.Definition
import qualified Temporal.Activity.Worker as Activity
import Temporal.Common
import Temporal.Common.Async
import Temporal.Core.Client
import Temporal.Core.Worker (InactiveForReplay, WorkerError)
import qualified Temporal.Core.Worker as Core
import Temporal.Exception
import Temporal.Interceptor
import Temporal.Payload (PayloadProcessor (..))
import Temporal.Runtime
import Temporal.Worker.Types
import Temporal.Workflow.Definition
import qualified Temporal.Workflow.Worker as Workflow
import UnliftIO


{- | Builder monad for assembling a 'WorkerConfig'.

A thin newtype over @'State' ('WorkerConfig' actEnv)@: every setter in this
module is a state modification, and 'configure' runs the accumulated
modifications against a default configuration.
-}
newtype ConfigM actEnv a = ConfigM {unConfigM :: State (WorkerConfig actEnv) a}
  deriving newtype (Functor, Applicative, Monad)


-- | Runs both configuration actions in order (left, then right) and combines
-- their results with the result type's own '<>'.
instance Semigroup a => Semigroup (ConfigM actEnv a) where
  (<>) = liftA2 (<>)


-- | The empty configuration action: leaves the 'WorkerConfig' untouched and
-- yields the result type's 'mempty'.
instance Monoid a => Monoid (ConfigM actEnv a) where
  mempty = pure mempty


------------------------------------------------------------------------------------
-- Configuration

-- TODO, it would be nice to expose an inert Tracer Provider in hs-opentelemetry-api

-- | A shared 'TracerProvider' created with no processors and
-- 'emptyTracerProviderOptions'. 'configure' uses it as the default
-- 'tracerProvider' until 'setTracerProvider' replaces it.
--
-- Safety of 'unsafePerformIO': this is the top-level-global idiom. The
-- @NOINLINE@ pragma below stops GHC from inlining (and thereby duplicating)
-- the call, so the provider is created at most once, on first use.
-- NOTE(review): this assumes 'createTracerProvider' with an empty processor
-- list has no externally observable side effects -- confirm against
-- hs-opentelemetry-api.
inertTracerProvider :: TracerProvider
inertTracerProvider = unsafePerformIO $ createTracerProvider [] emptyTracerProviderOptions
{-# NOINLINE inertTracerProvider #-}


{- | Turn a configuration block into the final configuration.

The supplied 'ConfigM' action is run (via 'execState') against a default
configuration built from the arguments:

* the workflow and activity definitions come from @'toDefinitions' defs@;
* @actEnv@ is stored as the activity environment;
* 'coreConfig' starts as 'Core.defaultWorkerConfig';
* 'deadlockTimeout' starts as @Just 1000000@
  (NOTE(review): presumably microseconds, i.e. one second -- confirm against
  the consumer of this field);
* no interceptors, and 'standardApplicationFailureHandlers' as error converters;
* a logger that discards every message;
* 'inertTracerProvider' as the tracer provider;
* a pass-through 'PayloadProcessor' whose two functions return the payload
  unchanged (the second wrapped in 'Right').
-}
configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
configure actEnv defs = flip execState defaultConfig . unConfigM
  where
    defaultConfig =
      let (Definitions wfDefs actDefs) = toDefinitions defs
      in WorkerConfig
          { coreConfig = Core.defaultWorkerConfig
          , deadlockTimeout = Just 1000000
          , interceptorConfig = mempty
          , applicationErrorConverters = standardApplicationFailureHandlers
          , logger = \_ _ _ _ -> pure ()
          , tracerProvider = inertTracerProvider
          , payloadProcessor = PayloadProcessor pure (pure . Right)
          , -- The wildcard fills the remaining fields from the identically
            -- named bindings in scope: actEnv, wfDefs and actDefs.
            ..
          }


{- | The set of workflows and activities a worker can execute.

Values are normally produced through 'toDefinitions' and combined with '<>'.
-}
data Definitions env = Definitions
  { workflowDefinitions :: {-# UNPACK #-} !(HashMap Text WorkflowDefinition)
  -- ^ Workflow definitions; the 'ToDefinitions' instance for
  -- 'WorkflowDefinition' keys each entry by its 'workflowName'.
  , activityDefinitions :: {-# UNPACK #-} !(HashMap Text (ActivityDefinition env))
  -- ^ Activity definitions; the 'ToDefinitions' instance for
  -- 'ActivityDefinition' keys each entry by its 'activityName'.
  }


-- | Combines the workflow maps and the activity maps pointwise using the
-- 'HashMap' '<>'.
--
-- NOTE(review): 'HashMap''s '<>' is a left-biased union, so when two
-- definitions share a name the left operand's entry is presumably kept and
-- the right one silently dropped -- confirm this is the intended behaviour.
instance Semigroup (Definitions env) where
  (Definitions w1 a1) <> (Definitions w2 a2) = Definitions (w1 <> w2) (a1 <> a2)


-- | The empty set of definitions: no workflows and no activities.
instance Monoid (Definitions env) where
  mempty = Definitions mempty mempty


-- | A 'Definitions' value is already in the target form and is returned unchanged.
instance ToDefinitions env (Definitions env) where
  toDefinitions = id


{- | Things that can be turned into worker 'Definitions'.

@env@ is the activity environment type carried by the resulting
'Definitions'; @f@ is the source type. Instances in this module cover
single workflow and activity definitions, their @Provided*@ wrappers,
'Definitions' itself, and tuples of two to six convertible values.
-}
class ToDefinitions env f where
  -- | Convert a value into the 'Definitions' it contributes.
  toDefinitions :: f -> Definitions env


-- | A single workflow becomes a one-entry workflow map keyed by its
-- 'workflowName', with no activities.
instance ToDefinitions env WorkflowDefinition where
  toDefinitions wf = Definitions (HashMap.singleton (workflowName wf) wf) mempty


-- | Delegates to the 'WorkflowDefinition' instance using the wrapper's
-- @definition@ field.
instance ToDefinitions env (ProvidedWorkflow f) where
  toDefinitions wf = toDefinitions wf.definition


-- | A single activity becomes a one-entry activity map keyed by its
-- 'activityName', with no workflows.
instance ToDefinitions env (ActivityDefinition env) where
  toDefinitions act = Definitions mempty (HashMap.singleton (activityName act) act)


-- | Delegates to the 'ActivityDefinition' instance using the wrapper's
-- @definition@ field.
instance ToDefinitions env (ProvidedActivity env f) where
  toDefinitions act = toDefinitions act.definition


-- | Converts each component and merges the results left to right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2) => ToDefinitions env (_1, _2) where
  toDefinitions (a, b) = toDefinitions a <> toDefinitions b


-- | Converts each component and merges the results left to right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3) => ToDefinitions env (_1, _2, _3) where
  toDefinitions (a, b, c) = toDefinitions a <> toDefinitions b <> toDefinitions c


-- | Converts each component and merges the results left to right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4) => ToDefinitions env (_1, _2, _3, _4) where
  toDefinitions (a, b, c, d) = toDefinitions a <> toDefinitions b <> toDefinitions c <> toDefinitions d


-- | Converts each component and merges the results left to right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5) => ToDefinitions env (_1, _2, _3, _4, _5) where
  toDefinitions (a, b, c, d, e) = toDefinitions a <> toDefinitions b <> toDefinitions c <> toDefinitions d <> toDefinitions e


-- | Converts each component and merges the results left to right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5, ToDefinitions env _6) => ToDefinitions env (_1, _2, _3, _4, _5, _6) where
  toDefinitions (a, b, c, d, e, f) = toDefinitions a <> toDefinitions b <> toDefinitions c <> toDefinitions d <> toDefinitions e <> toDefinitions f


-- | Add interceptors to the worker configuration.
--
-- The new interceptors are combined on the /left/ of whatever is already
-- configured (@i <> existing@).
-- NOTE(review): how left/right position maps to invocation order depends on
-- the 'Semigroup' instance of 'Interceptors' -- confirm in "Temporal.Interceptor".
addInterceptors :: Interceptors actEnv -> ConfigM actEnv ()
addInterceptors i = ConfigM $ modify' $ \conf ->
  conf
    { interceptorConfig = i <> interceptorConfig conf
    }


-- | Register a converter from an exception type @e@ to an 'ApplicationFailure'.
--
-- The converter is wrapped in 'ApplicationFailureHandler' and prepended to
-- the existing 'applicationErrorConverters' list.
-- NOTE(review): prepending presumably means later registrations are tried
-- before earlier ones and before the standard handlers -- confirm against
-- the code that consumes this list.
addErrorConverter :: (Exception e) => (e -> ApplicationFailure) -> ConfigM actEnv ()
addErrorConverter f = ConfigM $ modify' $ \conf ->
  conf
    { applicationErrorConverters = ApplicationFailureHandler f : applicationErrorConverters conf
    }


-- | Internal helper: apply a function to the embedded 'Core.WorkerConfig'
-- ('coreConfig') of the worker configuration. All @set*@ options that target
-- the core worker are defined in terms of this.
modifyCore :: (Core.WorkerConfig -> Core.WorkerConfig) -> ConfigM actEnv ()
modifyCore f = ConfigM $ modify' $ \conf ->
  conf
    { coreConfig = f (coreConfig conf)
    }


{- | The namespace that all workflows and activities will be
registered in for this worker.

If you need to register workflows and activities in multiple namespaces,
you will need to start multiple workers. Multiple workers may run in the
same process.

Sets 'Core.namespace' on the core worker configuration.
-}
setNamespace :: Namespace -> ConfigM actEnv ()
setNamespace (Namespace ns) = modifyCore $ \conf ->
  conf
    { Core.namespace = ns
    }


{- | A Temporal worker, whether polling for workflows or activities, does so by polling a specific "Task Queue".

Any time there is a new activity task or workflow task that needs to be performed by a worker, it'll be queued in a specific task queue.

Sets 'Core.taskQueue' on the core worker configuration.
-}
setTaskQueue :: TaskQueue -> ConfigM actEnv ()
setTaskQueue (TaskQueue tq) = modifyCore $ \conf ->
  conf
    { Core.taskQueue = tq
    }


{- | A string that should be unique to the exact worker code/binary being executed.

This is used to uniquely identify the worker's code for a handful of purposes,
including the worker versioning feature if you have opted into that with useVersioning.
It will also populate the binaryChecksum field on older servers.

N.B. this is not the same as the worker's identity, which is a string that identifies
the worker instance. The identity is used to identify the worker instance in logs and
in the Temporal UI. The buildId is used to identify the exact version of the code and
its dependencies. In e.g. Nix, the executable path in the Nix store would be a useful
buildId.

Sets 'Core.buildId' on the core worker configuration.
-}
setBuildId :: Text -> ConfigM actEnv ()
setBuildId bid = modifyCore $ \conf ->
  conf
    { Core.buildId = bid
    }


-- | Override the worker's identity string by setting 'Core.identityOverride'
-- to @Just ident@ on the core worker configuration.
setIdentity :: Text -> ConfigM actEnv ()
setIdentity ident = modifyCore $ \conf ->
  conf
    { Core.identityOverride = Just ident
    }


-- | Set 'Core.maxCachedWorkflows' on the core worker configuration.
--
-- NOTE(review): presumably the upper bound on workflow executions kept in
-- the worker's cache -- confirm against "Temporal.Core.Worker".
setMaxCachedWorkflows :: Word64 -> ConfigM actEnv ()
setMaxCachedWorkflows n = modifyCore $ \conf ->
  conf
    { Core.maxCachedWorkflows = n
    }


-- | Set 'Core.maxOutstandingWorkflowTasks' on the core worker configuration.
--
-- NOTE(review): presumably the maximum number of workflow tasks the worker
-- may have in flight at once -- confirm against "Temporal.Core.Worker".
setMaxOutstandingWorkflowTasks :: Word64 -> ConfigM actEnv ()
setMaxOutstandingWorkflowTasks n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingWorkflowTasks = n
    }


-- | Set 'Core.maxOutstandingActivities' on the core worker configuration.
--
-- NOTE(review): presumably the maximum number of activity tasks the worker
-- may have in flight at once -- confirm against "Temporal.Core.Worker".
setMaxOutstandingActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingActivities n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingActivities = n
    }


-- | Set 'Core.maxOutstandingLocalActivities' on the core worker configuration.
--
-- NOTE(review): presumably the maximum number of local activities the worker
-- may have in flight at once -- confirm against "Temporal.Core.Worker".
setMaxOutstandingLocalActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingLocalActivities n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingLocalActivities = n
    }


-- | Set 'Core.maxConcurrentWorkflowTaskPolls' on the core worker configuration.
--
-- NOTE(review): presumably the maximum number of workflow task polls issued
-- concurrently -- confirm against "Temporal.Core.Worker".
setMaxConcurrentWorkflowTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentWorkflowTaskPolls n = modifyCore $ \conf ->
  conf
    { Core.maxConcurrentWorkflowTaskPolls = n
    }


-- | Set 'Core.nonstickyToStickyPollRatio' on the core worker configuration.
--
-- NOTE(review): presumably the ratio of polls against the non-sticky queue
-- to polls against the sticky queue; the valid range is not visible here --
-- confirm against "Temporal.Core.Worker".
setNonstickyToStickyPollRatio :: Float -> ConfigM actEnv ()
setNonstickyToStickyPollRatio n = modifyCore $ \conf ->
  conf
    { Core.nonstickyToStickyPollRatio = n
    }


{- | Maximum number of Activity tasks to poll concurrently.

Increase this setting if your Worker is failing to fill in all
of its maxConcurrentActivityTaskExecutions slots despite a
backlog of Activity Tasks in the Task Queue (i.e. due to network latency).
Can't be higher than maxConcurrentActivityTaskExecutions.

Sets 'Core.maxConcurrentActivityTaskPolls' on the core worker configuration.
-}
setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentActivityTaskPolls n = modifyCore $ \conf ->
  conf
    { Core.maxConcurrentActivityTaskPolls = n
    }


-- | Set 'Core.noRemoteActivities' on the core worker configuration.
--
-- NOTE(review): presumably 'True' restricts the worker to local activities
-- only (no polling for remote activity tasks) -- confirm against
-- "Temporal.Core.Worker".
setNoRemoteActivities :: Bool -> ConfigM actEnv ()
setNoRemoteActivities n = modifyCore $ \conf ->
  conf
    { Core.noRemoteActivities = n
    }


{- | How long a workflow task is allowed to sit on the sticky queue before it
is timed out and moved to the non-sticky queue, where it may be picked up by
any worker. The value is given in milliseconds.
-}
setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv ()
setStickyQueueScheduleToStartTimeoutMillis millis = modifyCore $ \conf ->
  conf
    { Core.stickyQueueScheduleToStartTimeoutMillis = millis
    }


-- | Longest interval for throttling activity heartbeats, in milliseconds.
setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setMaxHeartbeatThrottleIntervalMillis millis = modifyCore $ \conf ->
  conf
    { Core.maxHeartbeatThrottleIntervalMillis = millis
    }


{- | Default interval (in milliseconds) for throttling activity heartbeats in
case @ActivityOptions.heartbeat_timeout@ is unset.

When the timeout is set in the ActivityOptions, throttling is set to
@heartbeat_timeout * 0.8@.
-}
setDefaultHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setDefaultHeartbeatThrottleIntervalMillis millis = modifyCore $ \conf ->
  conf
    { Core.defaultHeartbeatThrottleIntervalMillis = millis
    }


{- | Limits the number of Activities per second that this Worker will process.
(Does not limit the number of Local Activities.) The Worker will not poll for
new Activities if by doing so it might receive and execute an Activity which
would cause it to exceed this limit. Must be a positive number.

If unset, no rate limiting will be applied to the Worker's Activities.
(@tctl task-queue describe@ will display the absence of a limit as 100,000.)

Note that this setter always stores @Just n@; the value is not validated here.
-}
setMaxActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxActivitiesPerSecond n = modifyCore $ \conf ->
  conf
    { Core.maxActivitiesPerSecond = Just n
    }


{- | Sets the maximum number of activities per second the task queue will
dispatch, controlled server-side. Note that this only takes effect upon an
activity poll request. If multiple workers on the same queue have different
values set, they will thrash with the last poller winning.

If unset, no rate limiting will be applied to the task queue.

Note that this setter always stores @Just n@; the value is not validated here.
-}
setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxTaskQueueActivitiesPerSecond n = modifyCore $ \conf ->
  conf
    { Core.maxTaskQueueActivitiesPerSecond = Just n
    }


{- | Time to wait (in milliseconds) for pending tasks to drain after shutdown
was requested.

In-flight activities will be cancelled after this period and their current
attempt will be resolved as failed if they confirm cancellation
(by throwing a CancelledFailure or AbortError).
-}
setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv ()
setGracefulShutdownPeriodMillis millis = modifyCore $ \conf ->
  conf
    { Core.gracefulShutdownPeriodMillis = millis
    }


{- | Set the logger for the worker. This is used to log messages from the worker.

The supplied callback replaces the @logger@ field of the 'WorkerConfig'; it has
the shape expected by 'runLoggingT' (location, source, level, rendered message).
-}
setLogger :: (Loc -> LogSource -> Control.Monad.Logger.LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
setLogger f = ConfigM $ modify' $ \conf ->
  conf
    { logger = f
    }


{- | Set the tracer provider for the worker. This is used to trace the worker's
internal activity.

The provider is stored in the @tracerProvider@ field of the 'WorkerConfig'.
-}
setTracerProvider :: TracerProvider -> ConfigM actEnv ()
setTracerProvider tp = ConfigM $ modify' $ \conf ->
  conf
    { tracerProvider = tp
    }


{- | Set the 'PayloadProcessor' for the worker.

The processor is stored in the @payloadProcessor@ field of the 'WorkerConfig'
and is handed to the workflow worker when the worker is started.
-}
setPayloadProcessor :: PayloadProcessor -> ConfigM actEnv ()
setPayloadProcessor p = ConfigM $ modify' $ \conf ->
  conf
    { payloadProcessor = p
    }


------------------------------------------------------------------------------------

{- | A Worker is responsible for polling a Task Queue, dequeueing a Task, executing
your code in response to a Task, and responding to the Temporal Cluster with the results.

Worker Processes are external to a Temporal Cluster. Temporal Application developers are
responsible for developing Worker Programs and operating Worker Processes. Said another way,
the Temporal Cluster (including the Temporal Cloud) doesn't execute any of your code
(Workflow and Activity Definitions) on Temporal Cluster machines. The Cluster is solely
responsible for orchestrating State Transitions and providing Tasks to the next available
Worker Entity.

A Worker Process can be both a Workflow Worker Process and an Activity Worker Process.
The Haskell SDK supports running multiple Worker Entities in a single Worker Process.

A single Worker Entity can listen to only a single Task Queue. But if a Worker Process has
multiple Worker Entities, the Worker Process could be listening to multiple Task Queues.
-}
data Worker env = forall ty.
  Core.KnownWorkerType ty =>
  Worker
  { workerType :: !(Core.SWorkerType ty)
  -- ^ Singleton witness for the (existentially hidden) worker type.
  , workerWorkflowLoop :: !(Async ())
  -- ^ Handle to the thread running the workflow loop.
  , workerActivityLoop :: !(InactiveForReplay ty (Async ()))
  -- ^ Handle to the activity loop; inactive for replay workers.
  , workerActivityWorker :: !(InactiveForReplay ty (Activity.ActivityWorker env))
  -- ^ The activity worker; inactive for replay workers.
  , workerCore :: !(Core.Worker ty)
  -- ^ The underlying core worker.
  , workerTracer :: !Tracer
  -- ^ Tracer built from the configured tracer provider.
  , workerEvictionEmitter :: !(TChan Workflow.EvictionWithRunID)
  -- ^ Channel on which workflow evictions are published.
  }


{- | Start a worker in replay mode and return it together with the
'Core.HistoryPusher' used to feed it histories.

The supplied core configuration is used with @nondeterminismAsWorkflowFail@
forced to 'True'. Only the workflow loop is started: the activity worker,
activity loop and client slots are filled with @()@ placeholders.

If 'Core.newReplayWorker' returns an error, it is rethrown via 'throwIO'.
-}
startReplayWorker :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv, Core.HistoryPusher)
startReplayWorker rt conf = provideCallStack $ runWorkerContext conf $ do
  $(logDebug) "Starting worker"
  let coreConfig' = conf.coreConfig {Core.nondeterminismAsWorkflowFail = True}
  (workerCore, replay) <- either throwIO pure =<< liftIO (Core.newReplayWorker rt coreConfig')
  $(logDebug) "Instantiated core"
  workerEvictionEmitter <- newBroadcastTChanIO
  runningWorkflows <- newTVarIO mempty
  uuid <- liftIO nextRandom
  -- The bindings below are named after the record fields they populate so that
  -- the two @{..}@ wildcards can pick them up.
  let workerWorkflowFunctions = conf.wfDefs
      -- The workflow worker's task queue name is the configured one suffixed
      -- with a fresh random UUID.
      workerTaskQueue = TaskQueue (Core.taskQueue conf.coreConfig <> "-" <> UUID.toText uuid)
      workerInboundInterceptors = conf.interceptorConfig.workflowInboundInterceptors
      workerOutboundInterceptors = conf.interceptorConfig.workflowOutboundInterceptors
      workerDeadlockTimeout = conf.deadlockTimeout
      workerClient = ()
      workerErrorConverters = conf.applicationErrorConverters
      processor = conf.payloadProcessor
      workflowWorker = Workflow.WorkflowWorker {..}
      workerActivityWorker = ()
      workerActivityLoop = ()
      workerType = Core.SReplay
      workerTracer = makeTracer conf.tracerProvider (InstrumentationLibrary "hs-temporal-sdk" "0.0.1.0") tracerOptions
  workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
    $(logDebug) "Starting workflow worker loop"
    Workflow.execute workflowWorker
      `UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
  pure (Temporal.Worker.Worker {..}, replay)


{- | Failure reported by 'runReplayHistory' when replaying a history does not
succeed. Also usable as an exception.
-}
data ReplayHistoryFailure = ReplayHistoryFailure
  { message :: Text
  -- ^ Human-readable description of why the replay failed.
  }
  deriving stock (Show, Eq)
  deriving anyclass (Exception)


{- | Run a worker in replay mode, feeding a history to the worker in order to ensure that
the worker execution still works as expected.

A replay worker is started for the duration of the call; on exit (normal or
exceptional) the history pusher is closed and the worker is shut down.

Returns @Left@ when pushing the history fails, or when the first eviction
observed after pushing is fatal; returns @Right ()@ otherwise.

Throws a 'userError' if the history contains no workflow-execution-started
event from which a workflow ID can be read.
-}
runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ())
runReplayHistory rt conf history =
  runWorkerContext conf $
    UnliftIO.bracket
      (startReplayWorker rt conf)
      (\(worker, pusher) -> liftIO (Core.closeHistory pusher) *> shutdown worker)
      replay
  where
    replay (worker, pusher) = do
      -- Subscribe before pushing so the eviction for this run cannot be missed.
      evictions <- subscribeToEvictions worker
      -- The workflow ID is taken from the first started-event attributes found.
      let mWfId = history ^? vec'events . traverse . maybe'workflowExecutionStartedEventAttributes . traverse . workflowId . to T.encodeUtf8
      wfId <- maybe (throwIO $ userError "No workflow ID found in history") pure mWfId
      $(logDebug) $ "Pushing history for workflow ID " <> T.pack (show wfId)
      $(logDebug) $ T.pack $ show history
      pushResult <- liftIO $ Core.pushHistory pusher wfId (Left $ encodeMessage history)
      $(logDebug) $ "Pushed history for workflow ID " <> T.pack (show wfId)
      case pushResult of
        Left e -> pure $ Left $ ReplayHistoryFailure {message = e.message}
        Right () -> do
          -- Block until the workflow run is evicted; a fatal eviction is a failure.
          eviction <- atomically $ readTChan evictions
          if evictionWasFatal eviction
            then pure $ Left $ ReplayHistoryFailure {message = evictionMessage eviction}
            else pure $ Right ()


{- | Run a 'ReaderT' 'Tracer' action, supplying a tracer built from the
configuration's @tracerProvider@ under the @hs-temporal-sdk@ instrumentation
library.
-}
traced :: WorkerConfig env -> ReaderT Tracer m a -> m a
traced conf m =
  runReaderT m $
    makeTracer
      conf.tracerProvider
      (InstrumentationLibrary "hs-temporal-sdk" "0.0.1.0")
      tracerOptions


{- | Unwrap a 'WorkerContextM' action into the base monad, providing the tracer
(via 'traced') and the logger taken from the given 'WorkerConfig'.
-}
runWorkerContext :: WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext conf (WorkerContextM m) = runLoggingT (traced conf m) conf.logger


newtype WorkerContextM m a = WorkerContextM (ReaderT Tracer (LoggingT m) a)
  deriving newtype ((forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b)
-> (forall a b. a -> WorkerContextM m b -> WorkerContextM m a)
-> Functor (WorkerContextM m)
forall a b. a -> WorkerContextM m b -> WorkerContextM m a
forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b
forall (m :: * -> *) a b.
Functor m =>
a -> WorkerContextM m b -> WorkerContextM m a
forall (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerContextM m a -> WorkerContextM m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerContextM m a -> WorkerContextM m b
fmap :: forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b
$c<$ :: forall (m :: * -> *) a b.
Functor m =>
a -> WorkerContextM m b -> WorkerContextM m a
<$ :: forall a b. a -> WorkerContextM m b -> WorkerContextM m a
Functor, Functor (WorkerContextM m)
Functor (WorkerContextM m) =>
(forall a. a -> WorkerContextM m a)
-> (forall a b.
    WorkerContextM m (a -> b)
    -> WorkerContextM m a -> WorkerContextM m b)
-> (forall a b c.
    (a -> b -> c)
    -> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c)
-> (forall a b.
    WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b)
-> (forall a b.
    WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a)
-> Applicative (WorkerContextM m)
forall a. a -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall a b.
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
forall a b c.
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
forall (m :: * -> *). Applicative m => Functor (WorkerContextM m)
forall (m :: * -> *) a. Applicative m => a -> WorkerContextM m a
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
$cpure :: forall (m :: * -> *) a. Applicative m => a -> WorkerContextM m a
pure :: forall a. a -> WorkerContextM m a
$c<*> :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
<*> :: forall a b.
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
$cliftA2 :: forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
liftA2 :: forall a b c.
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
$c*> :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
*> :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
$c<* :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
<* :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
Applicative, Applicative (WorkerContextM m)
Applicative (WorkerContextM m) =>
(forall a b.
 WorkerContextM m a
 -> (a -> WorkerContextM m b) -> WorkerContextM m b)
-> (forall a b.
    WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b)
-> (forall a. a -> WorkerContextM m a)
-> Monad (WorkerContextM m)
forall a. a -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall a b.
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
forall (m :: * -> *). Monad m => Applicative (WorkerContextM m)
forall (m :: * -> *) a. Monad m => a -> WorkerContextM m a
forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
$c>>= :: forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
>>= :: forall a b.
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
$c>> :: forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
>> :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
$creturn :: forall (m :: * -> *) a. Monad m => a -> WorkerContextM m a
return :: forall a. a -> WorkerContextM m a
Monad, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall a. IO a -> WorkerContextM m a)
-> MonadIO (WorkerContextM m)
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
forall (m :: * -> *). MonadIO m => Monad (WorkerContextM m)
forall (m :: * -> *) a. MonadIO m => IO a -> WorkerContextM m a
$cliftIO :: forall (m :: * -> *) a. MonadIO m => IO a -> WorkerContextM m a
liftIO :: forall a. IO a -> WorkerContextM m a
MonadIO, MonadIO (WorkerContextM m)
MonadIO (WorkerContextM m) =>
(forall b.
 ((forall a. WorkerContextM m a -> IO a) -> IO b)
 -> WorkerContextM m b)
-> MonadUnliftIO (WorkerContextM m)
forall b.
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
forall (m :: * -> *).
MonadIO m =>
(forall b. ((forall a. m a -> IO a) -> IO b) -> m b)
-> MonadUnliftIO m
forall (m :: * -> *). MonadUnliftIO m => MonadIO (WorkerContextM m)
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
$cwithRunInIO :: forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
withRunInIO :: forall b.
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
MonadUnliftIO, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall msg.
 ToLogStr msg =>
 Loc -> Text -> LogLevel -> msg -> WorkerContextM m ())
-> MonadLogger (WorkerContextM m)
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
forall (m :: * -> *).
Monad m =>
(forall msg.
 ToLogStr msg =>
 Loc -> Text -> LogLevel -> msg -> m ())
-> MonadLogger m
forall (m :: * -> *). MonadIO m => Monad (WorkerContextM m)
forall (m :: * -> *) msg.
(MonadIO m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
$cmonadLoggerLog :: forall (m :: * -> *) msg.
(MonadIO m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
monadLoggerLog :: forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
MonadLogger, MonadIO (WorkerContextM m)
MonadLogger (WorkerContextM m)
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
(MonadLogger (WorkerContextM m), MonadIO (WorkerContextM m)) =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
-> MonadLoggerIO (WorkerContextM m)
forall (m :: * -> *). MonadIO m => MonadIO (WorkerContextM m)
forall (m :: * -> *). MonadIO m => MonadLogger (WorkerContextM m)
forall (m :: * -> *).
MonadIO m =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
(MonadLogger m, MonadIO m) =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> MonadLoggerIO m
$caskLoggerIO :: forall (m :: * -> *).
MonadIO m =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO :: WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
MonadLoggerIO, MonadThrow (WorkerContextM m)
MonadThrow (WorkerContextM m) =>
(forall e a.
 (HasCallStack, Exception e) =>
 WorkerContextM m a
 -> (e -> WorkerContextM m a) -> WorkerContextM m a)
-> MonadCatch (WorkerContextM m)
forall e a.
(HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
forall (m :: * -> *).
MonadThrow m =>
(forall e a.
 (HasCallStack, Exception e) =>
 m a -> (e -> m a) -> m a)
-> MonadCatch m
forall (m :: * -> *). MonadCatch m => MonadThrow (WorkerContextM m)
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
$ccatch :: forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
catch :: forall e a.
(HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
MonadCatch, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall e a.
 (HasCallStack, Exception e) =>
 e -> WorkerContextM m a)
-> MonadThrow (WorkerContextM m)
forall e a. (HasCallStack, Exception e) => e -> WorkerContextM m a
forall (m :: * -> *).
Monad m =>
(forall e a. (HasCallStack, Exception e) => e -> m a)
-> MonadThrow m
forall (m :: * -> *). MonadThrow m => Monad (WorkerContextM m)
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> WorkerContextM m a
$cthrowM :: forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> WorkerContextM m a
throwM :: forall e a. (HasCallStack, Exception e) => e -> WorkerContextM m a
MonadThrow)


-- | The tracer is whatever the underlying 'ReaderT' environment of
-- 'WorkerContextM' carries; 'getTracer' simply reads it with 'ask'.
instance Monad m => MonadTracer (WorkerContextM m) where
  getTracer = WorkerContextM ask


{- | Start a worker for the task queue described by the given 'WorkerConfig'.

The steps are, in order:

1. Create the core worker via @Core.newWorker@ (a creation error is thrown).
2. Validate it via @Core.validateWorker@ (a validation error is thrown).
3. Assemble the workflow and activity worker records from the configuration.
4. Launch the workflow and activity loops as labelled asyncs.

The returned 'Temporal.Worker.Worker' holds both loops; use 'shutdown' to stop it.
-}
startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv)
startWorker client conf = provideCallStack $ runWorkerContext conf $ inSpan "startWorker" defaultSpanArguments $ do
  workerCore <-
    inSpan
      "Core.newWorker"
      defaultSpanArguments
      (either throwIO pure =<< liftIO (Core.newWorker client conf.coreConfig))
  validationRes <- liftIO $ Core.validateWorker workerCore
  workerEvictionEmitter <- newBroadcastTChanIO
  case validationRes of
    Left err -> throwIO err
    Right () -> pure ()
  runningWorkflows <- newTVarIO mempty
  runningActivities <- newTVarIO mempty
  activityEnv <- newIORef conf.actEnv
  -- The bindings below are named after the record fields of
  -- 'Workflow.WorkflowWorker' and 'Activity.ActivityWorker' so that the
  -- wildcard constructions pick them up.
  let errorConverters = mkAnnotatedHandlers conf.applicationErrorConverters
      workerWorkflowFunctions = conf.wfDefs
      workerTaskQueue = TaskQueue $ Core.taskQueue conf.coreConfig
      workerInboundInterceptors = conf.interceptorConfig.workflowInboundInterceptors
      workerOutboundInterceptors = conf.interceptorConfig.workflowOutboundInterceptors
      workerDeadlockTimeout = conf.deadlockTimeout
      workerErrorConverters = errorConverters
      processor = conf.payloadProcessor
      workflowWorker = Workflow.WorkflowWorker {..}
      definitions = conf.actDefs
      activityInboundInterceptors = conf.interceptorConfig.activityInboundInterceptors
      activityOutboundInterceptors = conf.interceptorConfig.activityOutboundInterceptors
      clientInterceptors = conf.interceptorConfig.clientInterceptors
      activityErrorConverters = errorConverters
      payloadProcessor = conf.payloadProcessor
      workerActivityWorker = Activity.ActivityWorker {..}
      workerClient = client
      workerTracer = makeTracer conf.tracerProvider (InstrumentationLibrary "hs-temporal-sdk" "0.0.1.0") tracerOptions
  let workerType = Core.SReal
  -- logs <- liftIO $ fetchLogs globalRuntime
  -- forM_ logs $ \l -> case l.level of
  --   Trace -> $(logDebug) l.message
  --   Debug -> $(logDebug) l.message
  --   Info -> $(logInfo) l.message
  --   Warn -> $(logWarn) l.message
  --   Error -> $(logError) l.message
  workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
    $(logDebug) "Starting workflow worker loop"
    Workflow.execute workflowWorker
      `UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
  workerActivityLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/activity/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
    $(logDebug) "Starting activity worker loop"
    Activity.execute workerActivityWorker
      `UnliftIO.finally` $(logDebug) "Exiting activity worker loop"
  pure Temporal.Worker.Worker {..}


{- | Wait for a worker to exit.

For a real worker this waits for both the workflow and activity loops to complete.
If either loop ended with an exception, the first such exception is rethrown
(the workflow loop's failure is considered before the activity loop's).

For a replay worker only the workflow loop is awaited.

This function is generally not needed, as 'shutdown' will wait for the worker to exit.
-}
waitWorker :: (MonadIO m) => Temporal.Worker.Worker actEnv -> m ()
waitWorker (Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop}) = do
  case workerType of
    Core.SReal -> do
      -- Wait for *both* loops before rethrowing, so that a failure in one
      -- loop does not make us return while the other is still running.
      wfRes <- waitCatch workerWorkflowLoop
      actRes <- waitCatch workerActivityLoop
      for_ (lefts [wfRes, actRes]) throwIO
    Core.SReplay -> do
      _ <- wait workerWorkflowLoop
      pure ()


-- logs <- liftIO $ fetchLogs globalRuntime
-- forM_ logs $ \l -> do
--   $(logInfo) $ Text.pack $ show l

{- | Link a worker to the current thread. This will cause uncaught worker exceptions to be rethrown in the current thread.

A monitoring thread is forked that watches the worker's loops. For a real worker both
the workflow and activity loops are watched; for a replay worker only the workflow loop.
If a watched loop fails, the worker initiates graceful shutdown (without blocking on it)
and the exception is then thrown to the thread that called 'linkWorker'.
-}
linkWorker :: Temporal.Worker.Worker actEnv -> IO ()
linkWorker Temporal.Worker.Worker {workerCore, workerType, workerWorkflowLoop, workerActivityLoop} = do
  tid <- myThreadId
  void $ forkIOLabelled (T.unpack $ T.concat ["temporal/worker/link/", Core.namespace c, "/", Core.taskQueue c]) $ do
    let errorClause :: SomeException -> IO ()
        errorClause e = do
          Core.initiateShutdown workerCore
          Control.Concurrent.throwTo tid e
    case workerType of
      Core.SReal -> do
        -- NOTE(review): 'race' returns as soon as *either* loop finishes. If
        -- that loop finished cleanly, monitoring stops here and a later
        -- failure of the other loop is not propagated. Confirm this is intended.
        eeRes <- race (waitCatch workerWorkflowLoop) (waitCatch workerActivityLoop)
        case eeRes of
          Left (Left e) -> errorClause e
          Right (Left e) -> errorClause e
          _ -> pure ()
      Core.SReplay -> do
        eRes <- waitCatch workerWorkflowLoop
        case eRes of
          Left e -> errorClause e
          _ -> pure ()
  where
    -- Core configuration, used only to build the monitoring thread's label.
    c = Core.getWorkerConfig workerCore


{- | Check, without blocking, whether a worker has exited.

For a real worker:

* @Just (Left e)@ as soon as either loop has failed (a workflow loop failure
  takes precedence over an activity loop failure);
* @Just (Right ())@ once both loops have completed successfully;
* @Nothing@ otherwise, i.e. at least one loop is still running and none has failed.

For a replay worker this is the poll result of the workflow loop alone.
-}
pollWorkerSTM :: Temporal.Worker.Worker actEnv -> STM (Maybe (Either SomeException ()))
pollWorkerSTM Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop} = do
  case workerType of
    Core.SReal -> do
      wfRes <- pollSTM workerWorkflowLoop
      actRes <- pollSTM workerActivityLoop
      pure $ case (wfRes, actRes) of
        (Just (Left e), _) -> Just (Left e)
        (_, Just (Left e)) -> Just (Left e)
        (Just (Right ()), Just (Right ())) -> Just (Right ())
        _ -> Nothing
    Core.SReplay -> pollSTM workerWorkflowLoop


{- | 'STM' variant of 'waitWorker': the transaction retries until the worker's loops have completed.

For a real worker the workflow loop is awaited first, then the activity loop;
for a replay worker only the workflow loop is awaited. The loops are awaited
with 'waitSTM', so a loop that ended with an exception has that exception rethrown.
-}
waitWorkerSTM :: Temporal.Worker.Worker actEnv -> STM ()
waitWorkerSTM Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop} = do
  case workerType of
    Core.SReal -> do
      waitSTM workerWorkflowLoop
      waitSTM workerActivityLoop
    Core.SReplay -> waitSTM workerWorkflowLoop


{- | Shut down a worker. This will initiate a graceful shutdown of the worker, waiting for all
in-flight tasks to complete before finalizing the shutdown.

The whole sequence runs with asynchronous exceptions masked, except for the
wait on the worker loops, which is run under @restore@. The steps are:

1. initiate shutdown on the core worker;
2. for a real worker, notify running activities of the shutdown;
3. wait for the worker loops to exit (see 'waitWorker'; loop failures are rethrown);
4. finalize the shutdown on the core worker, throwing the error it reports, if any.
-}
shutdown :: (MonadUnliftIO m) => Temporal.Worker.Worker actEnv -> m ()
shutdown worker@Temporal.Worker.Worker {workerCore, workerTracer, workerType, workerActivityWorker} = OT.inSpan workerTracer "shutdown" defaultSpanArguments $ UnliftIO.mask $ \restore -> do
  OT.inSpan workerTracer "initiateShutdown" defaultSpanArguments $ liftIO $ Core.initiateShutdown workerCore

  -- Worker shutdown will wait on all activities to complete, so if a long-running activity does not respect cancellation,
  -- the shutdown may never complete. However, we do issue a shutdown notification to the activities in the form of an
  -- async exception, so they would have to be actively ignoring the shutdown notification to prevent the shutdown from completing.
  () <- case workerType of
    Core.SReal -> Activity.notifyShutdown workerActivityWorker
    Core.SReplay -> pure ()

  OT.inSpan workerTracer "waitWorker" defaultSpanArguments $ restore $ waitWorker worker

  err' <- OT.inSpan workerTracer "finalizeShutdown" defaultSpanArguments $ liftIO $ Core.finalizeShutdown workerCore
  case err' of
    Left err -> throwIO err
    Right () -> pure ()


-- logs <- liftIO $ fetchLogs globalRuntime
-- forM_ logs $ \l -> do
--   $(logInfo) $ Text.pack $ show l

-- | Subscribe to evictions from the worker. This is not generally needed, but can be useful for debugging.
--
-- The returned channel is a fresh duplicate ('dupTChan') of the worker's
-- eviction channel, so each subscriber reads from its own channel.
subscribeToEvictions :: MonadIO m => Temporal.Worker.Worker actEnv -> m (TChan Workflow.EvictionWithRunID)
subscribeToEvictions worker = liftIO $ atomically $ dupTChan worker.workerEvictionEmitter


-- | 'STM' variant of 'subscribeToEvictions': duplicates the worker's eviction
-- channel inside the caller's transaction. This is not generally needed, but
-- can be useful for debugging.
subscribeToEvictionsSTM :: Temporal.Worker.Worker actEnv -> STM (TChan Workflow.EvictionWithRunID)
subscribeToEvictionsSTM worker = dupTChan worker.workerEvictionEmitter


-- | The @message@ field of the eviction carried by an 'Workflow.EvictionWithRunID'.
evictionMessage :: Workflow.EvictionWithRunID -> Text
evictionMessage Workflow.EvictionWithRunID {eviction} = eviction ^. Activation.message


-- | 'True' exactly when the eviction's @reason@ is @RemoveFromCache'FATAL@;
-- every other reason yields 'False'.
evictionWasFatal :: Workflow.EvictionWithRunID -> Bool
evictionWasFatal Workflow.EvictionWithRunID {eviction} = case eviction ^. Activation.reason of
  RemoveFromCache'FATAL -> True
  _ -> False


{- | Replace the environment for activities run within a Worker. This is generally not needed, but can be useful for reusing
a Worker under test.

For a real worker the new environment is written to the activity worker's
environment reference. For a replay worker this is a no-op.
-}
replaceEnvironment :: MonadIO m => Temporal.Worker.Worker actEnv -> actEnv -> m ()
replaceEnvironment Temporal.Worker.Worker {workerType, workerActivityWorker} env = do
  case workerType of
    Core.SReal -> do
      liftIO $ writeIORef workerActivityWorker.activityEnv env
    Core.SReplay -> pure ()