{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
module Temporal.Worker (
Temporal.Worker.Worker,
startWorker,
waitWorker,
shutdown,
linkWorker,
pollWorkerSTM,
waitWorkerSTM,
replaceEnvironment,
WorkerConfig (..),
Definitions (..),
ToDefinitions (..),
ConfigM,
configure,
runReplayHistory,
ReplayHistoryFailure (..),
subscribeToEvictions,
subscribeToEvictionsSTM,
Workflow.EvictionWithRunID (..),
evictionMessage,
evictionWasFatal,
WorkflowDef (..),
ActivityDef (..),
addErrorConverter,
setLogger,
setTracerProvider,
setNamespace,
setTaskQueue,
setBuildId,
setIdentity,
setMaxCachedWorkflows,
setMaxOutstandingWorkflowTasks,
setMaxOutstandingActivities,
setMaxOutstandingLocalActivities,
setMaxConcurrentWorkflowTaskPolls,
setNonstickyToStickyPollRatio,
setMaxConcurrentActivityTaskPolls,
setNoRemoteActivities,
setStickyQueueScheduleToStartTimeoutMillis,
setMaxHeartbeatThrottleIntervalMillis,
setDefaultHeartbeatThrottleIntervalMillis,
setMaxActivitiesPerSecond,
setMaxTaskQueueActivitiesPerSecond,
setGracefulShutdownPeriodMillis,
addInterceptors,
setPayloadProcessor,
WorkflowId (..),
) where
import Control.Concurrent
import Control.Exception (Exception (..))
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State
import Data.Either (lefts)
import Data.Foldable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens (encodeMessage)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.UUID (UUID)
import qualified Data.UUID as UUID
import Data.UUID.V4 (nextRandom)
import Data.Word
import Lens.Family2
import OpenTelemetry.Attributes (emptyAttributes)
import OpenTelemetry.Trace.Core hiding (inSpan)
import qualified OpenTelemetry.Trace.Core as OT
import OpenTelemetry.Trace.Monad
import Proto.Temporal.Api.History.V1.Message (History)
import Proto.Temporal.Api.History.V1.Message_Fields (maybe'workflowExecutionStartedEventAttributes, vec'events, workflowId)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation
import qualified Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation_Fields as Activation
import RequireCallStack
import System.IO.Unsafe
import Temporal.Activity.Definition
import qualified Temporal.Activity.Worker as Activity
import Temporal.Common
import Temporal.Common.Async
import Temporal.Core.Client
import Temporal.Core.Worker (InactiveForReplay, WorkerError)
import qualified Temporal.Core.Worker as Core
import Temporal.Exception
import Temporal.Interceptor
import Temporal.Payload (PayloadProcessor (..))
import Temporal.Runtime
import Temporal.Worker.Types
import Temporal.Workflow.Definition
import qualified Temporal.Workflow.Worker as Workflow
import UnliftIO
newtype ConfigM actEnv a = ConfigM {forall actEnv a. ConfigM actEnv a -> State (WorkerConfig actEnv) a
unConfigM :: State (WorkerConfig actEnv) a}
deriving newtype ((forall a b. (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b)
-> (forall a b. a -> ConfigM actEnv b -> ConfigM actEnv a)
-> Functor (ConfigM actEnv)
forall a b. a -> ConfigM actEnv b -> ConfigM actEnv a
forall a b. (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
forall actEnv a b. a -> ConfigM actEnv b -> ConfigM actEnv a
forall actEnv a b. (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall actEnv a b. (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
fmap :: forall a b. (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
$c<$ :: forall actEnv a b. a -> ConfigM actEnv b -> ConfigM actEnv a
<$ :: forall a b. a -> ConfigM actEnv b -> ConfigM actEnv a
Functor, Functor (ConfigM actEnv)
Functor (ConfigM actEnv) =>
(forall a. a -> ConfigM actEnv a)
-> (forall a b.
ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b)
-> (forall a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c)
-> (forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b)
-> (forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a)
-> Applicative (ConfigM actEnv)
forall actEnv. Functor (ConfigM actEnv)
forall a. a -> ConfigM actEnv a
forall actEnv a. a -> ConfigM actEnv a
forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a
forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
forall a b.
ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a
forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
forall actEnv a b.
ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
forall a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
forall actEnv a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
$cpure :: forall actEnv a. a -> ConfigM actEnv a
pure :: forall a. a -> ConfigM actEnv a
$c<*> :: forall actEnv a b.
ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
<*> :: forall a b.
ConfigM actEnv (a -> b) -> ConfigM actEnv a -> ConfigM actEnv b
$cliftA2 :: forall actEnv a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
liftA2 :: forall a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
$c*> :: forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
*> :: forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
$c<* :: forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a
<* :: forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv a
Applicative, Applicative (ConfigM actEnv)
Applicative (ConfigM actEnv) =>
(forall a b.
ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b)
-> (forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b)
-> (forall a. a -> ConfigM actEnv a)
-> Monad (ConfigM actEnv)
forall actEnv. Applicative (ConfigM actEnv)
forall a. a -> ConfigM actEnv a
forall actEnv a. a -> ConfigM actEnv a
forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
forall a b.
ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b
forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
forall actEnv a b.
ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
$c>>= :: forall actEnv a b.
ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b
>>= :: forall a b.
ConfigM actEnv a -> (a -> ConfigM actEnv b) -> ConfigM actEnv b
$c>> :: forall actEnv a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
>> :: forall a b.
ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv b
$creturn :: forall actEnv a. a -> ConfigM actEnv a
return :: forall a. a -> ConfigM actEnv a
Monad)
instance Semigroup a => Semigroup (ConfigM actEnv a) where
<> :: ConfigM actEnv a -> ConfigM actEnv a -> ConfigM actEnv a
(<>) = (a -> a -> a)
-> ConfigM actEnv a -> ConfigM actEnv a -> ConfigM actEnv a
forall a b c.
(a -> b -> c)
-> ConfigM actEnv a -> ConfigM actEnv b -> ConfigM actEnv c
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 a -> a -> a
forall a. Semigroup a => a -> a -> a
(<>)
instance Monoid a => Monoid (ConfigM actEnv a) where
mempty :: ConfigM actEnv a
mempty = a -> ConfigM actEnv a
forall a. a -> ConfigM actEnv a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
forall a. Monoid a => a
mempty
inertTracerProvider :: TracerProvider
inertTracerProvider :: TracerProvider
inertTracerProvider = IO TracerProvider -> TracerProvider
forall a. IO a -> a
unsafePerformIO (IO TracerProvider -> TracerProvider)
-> IO TracerProvider -> TracerProvider
forall a b. (a -> b) -> a -> b
$ [Processor] -> TracerProviderOptions -> IO TracerProvider
forall (m :: * -> *).
MonadIO m =>
[Processor] -> TracerProviderOptions -> m TracerProvider
createTracerProvider [] TracerProviderOptions
emptyTracerProviderOptions
{-# NOINLINE inertTracerProvider #-}
configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
configure :: forall actEnv defs.
ToDefinitions actEnv defs =>
actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
configure actEnv
actEnv defs
defs = (State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv -> WorkerConfig actEnv)
-> WorkerConfig actEnv
-> State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv
forall a b c. (a -> b -> c) -> b -> a -> c
flip State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv -> WorkerConfig actEnv
forall s a. State s a -> s -> s
execState WorkerConfig actEnv
defaultConfig (State (WorkerConfig actEnv) () -> WorkerConfig actEnv)
-> (ConfigM actEnv () -> State (WorkerConfig actEnv) ())
-> ConfigM actEnv ()
-> WorkerConfig actEnv
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConfigM actEnv () -> State (WorkerConfig actEnv) ()
forall actEnv a. ConfigM actEnv a -> State (WorkerConfig actEnv) a
unConfigM
where
defaultConfig :: WorkerConfig actEnv
defaultConfig =
let (Definitions HashMap Text WorkflowDefinition
wfDefs HashMap Text (ActivityDefinition actEnv)
actDefs) = defs -> Definitions actEnv
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions defs
defs
in WorkerConfig
{ coreConfig :: WorkerConfig
coreConfig = WorkerConfig
Core.defaultWorkerConfig
, deadlockTimeout :: Maybe Int
deadlockTimeout = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1000000
, interceptorConfig :: Interceptors actEnv
interceptorConfig = Interceptors actEnv
forall a. Monoid a => a
mempty
, applicationErrorConverters :: [ApplicationFailureHandler]
applicationErrorConverters = [ApplicationFailureHandler]
standardApplicationFailureHandlers
, logger :: Loc -> Text -> LogLevel -> LogStr -> IO ()
logger = \Loc
_ Text
_ LogLevel
_ LogStr
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, tracerProvider :: TracerProvider
tracerProvider = TracerProvider
inertTracerProvider
, payloadProcessor :: PayloadProcessor
payloadProcessor = (Payload -> IO Payload)
-> (Payload -> IO (Either String Payload)) -> PayloadProcessor
PayloadProcessor Payload -> IO Payload
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String Payload -> IO (Either String Payload)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String Payload -> IO (Either String Payload))
-> (Payload -> Either String Payload)
-> Payload
-> IO (Either String Payload)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Payload -> Either String Payload
forall a b. b -> Either a b
Right)
, actEnv
HashMap Text WorkflowDefinition
HashMap Text (ActivityDefinition actEnv)
actEnv :: actEnv
wfDefs :: HashMap Text WorkflowDefinition
actDefs :: HashMap Text (ActivityDefinition actEnv)
actDefs :: HashMap Text (ActivityDefinition actEnv)
actEnv :: actEnv
wfDefs :: HashMap Text WorkflowDefinition
..
}
data Definitions env = Definitions
{ forall env. Definitions env -> HashMap Text WorkflowDefinition
workflowDefinitions :: {-# UNPACK #-} !(HashMap Text WorkflowDefinition)
, forall env.
Definitions env -> HashMap Text (ActivityDefinition env)
activityDefinitions :: {-# UNPACK #-} !(HashMap Text (ActivityDefinition env))
}
instance Semigroup (Definitions env) where
(Definitions HashMap Text WorkflowDefinition
w1 HashMap Text (ActivityDefinition env)
a1) <> :: Definitions env -> Definitions env -> Definitions env
<> (Definitions HashMap Text WorkflowDefinition
w2 HashMap Text (ActivityDefinition env)
a2) = HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
forall env.
HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
Definitions (HashMap Text WorkflowDefinition
w1 HashMap Text WorkflowDefinition
-> HashMap Text WorkflowDefinition
-> HashMap Text WorkflowDefinition
forall a. Semigroup a => a -> a -> a
<> HashMap Text WorkflowDefinition
w2) (HashMap Text (ActivityDefinition env)
a1 HashMap Text (ActivityDefinition env)
-> HashMap Text (ActivityDefinition env)
-> HashMap Text (ActivityDefinition env)
forall a. Semigroup a => a -> a -> a
<> HashMap Text (ActivityDefinition env)
a2)
instance Monoid (Definitions env) where
mempty :: Definitions env
mempty = HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
forall env.
HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
Definitions HashMap Text WorkflowDefinition
forall a. Monoid a => a
mempty HashMap Text (ActivityDefinition env)
forall a. Monoid a => a
mempty
instance ToDefinitions env (Definitions env) where
toDefinitions :: Definitions env -> Definitions env
toDefinitions = Definitions env -> Definitions env
forall a. a -> a
id
class ToDefinitions env f where
toDefinitions :: f -> Definitions env
instance ToDefinitions env WorkflowDefinition where
toDefinitions :: WorkflowDefinition -> Definitions env
toDefinitions WorkflowDefinition
wf = HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
forall env.
HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
Definitions (Text -> WorkflowDefinition -> HashMap Text WorkflowDefinition
forall k v. Hashable k => k -> v -> HashMap k v
HashMap.singleton (WorkflowDefinition -> Text
workflowName WorkflowDefinition
wf) WorkflowDefinition
wf) HashMap Text (ActivityDefinition env)
forall a. Monoid a => a
mempty
instance ToDefinitions env (ProvidedWorkflow f) where
toDefinitions :: ProvidedWorkflow f -> Definitions env
toDefinitions ProvidedWorkflow f
wf = WorkflowDefinition -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions ProvidedWorkflow f
wf.definition
instance ToDefinitions env (ActivityDefinition env) where
toDefinitions :: ActivityDefinition env -> Definitions env
toDefinitions ActivityDefinition env
act = HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
forall env.
HashMap Text WorkflowDefinition
-> HashMap Text (ActivityDefinition env) -> Definitions env
Definitions HashMap Text WorkflowDefinition
forall a. Monoid a => a
mempty (Text
-> ActivityDefinition env -> HashMap Text (ActivityDefinition env)
forall k v. Hashable k => k -> v -> HashMap k v
HashMap.singleton (ActivityDefinition env -> Text
forall env. ActivityDefinition env -> Text
activityName ActivityDefinition env
act) ActivityDefinition env
act)
instance ToDefinitions env (ProvidedActivity env f) where
toDefinitions :: ProvidedActivity env f -> Definitions env
toDefinitions ProvidedActivity env f
act = ActivityDefinition env -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions ProvidedActivity env f
act.definition
instance (ToDefinitions env _1, ToDefinitions env _2) => ToDefinitions env (_1, _2) where
toDefinitions :: (_1, _2) -> Definitions env
toDefinitions (_1
a, _2
b) = _1 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _1
a Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _2 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _2
b
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3) => ToDefinitions env (_1, _2, _3) where
toDefinitions :: (_1, _2, _3) -> Definitions env
toDefinitions (_1
a, _2
b, _3
c) = _1 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _1
a Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _2 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _2
b Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _3 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _3
c
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4) => ToDefinitions env (_1, _2, _3, _4) where
toDefinitions :: (_1, _2, _3, _4) -> Definitions env
toDefinitions (_1
a, _2
b, _3
c, _4
d) = _1 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _1
a Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _2 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _2
b Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _3 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _3
c Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _4 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _4
d
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5) => ToDefinitions env (_1, _2, _3, _4, _5) where
toDefinitions :: (_1, _2, _3, _4, _5) -> Definitions env
toDefinitions (_1
a, _2
b, _3
c, _4
d, _5
e) = _1 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _1
a Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _2 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _2
b Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _3 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _3
c Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _4 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _4
d Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _5 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _5
e
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5, ToDefinitions env _6) => ToDefinitions env (_1, _2, _3, _4, _5, _6) where
toDefinitions :: (_1, _2, _3, _4, _5, _6) -> Definitions env
toDefinitions (_1
a, _2
b, _3
c, _4
d, _5
e, _6
f) = _1 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _1
a Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _2 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _2
b Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _3 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _3
c Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _4 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _4
d Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _5 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _5
e Definitions env -> Definitions env -> Definitions env
forall a. Semigroup a => a -> a -> a
<> _6 -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions _6
f
addInterceptors :: Interceptors actEnv -> ConfigM actEnv ()
addInterceptors :: forall actEnv. Interceptors actEnv -> ConfigM actEnv ()
addInterceptors Interceptors actEnv
i = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ interceptorConfig = i <> interceptorConfig conf
}
addErrorConverter :: (Exception e) => (e -> ApplicationFailure) -> ConfigM actEnv ()
addErrorConverter :: forall e actEnv.
Exception e =>
(e -> ApplicationFailure) -> ConfigM actEnv ()
addErrorConverter e -> ApplicationFailure
f = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ applicationErrorConverters = ApplicationFailureHandler f : applicationErrorConverters conf
}
modifyCore :: (Core.WorkerConfig -> Core.WorkerConfig) -> ConfigM actEnv ()
modifyCore :: forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore WorkerConfig -> WorkerConfig
f = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ coreConfig = f (coreConfig conf)
}
setNamespace :: Namespace -> ConfigM actEnv ()
setNamespace :: forall actEnv. Namespace -> ConfigM actEnv ()
setNamespace (Namespace Text
ns) = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.namespace = ns
}
setTaskQueue :: TaskQueue -> ConfigM actEnv ()
setTaskQueue :: forall actEnv. TaskQueue -> ConfigM actEnv ()
setTaskQueue (TaskQueue Text
tq) = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.taskQueue = tq
}
setBuildId :: Text -> ConfigM actEnv ()
setBuildId :: forall actEnv. Text -> ConfigM actEnv ()
setBuildId Text
bid = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.buildId = bid
}
setIdentity :: Text -> ConfigM actEnv ()
setIdentity :: forall actEnv. Text -> ConfigM actEnv ()
setIdentity Text
ident = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.identityOverride = Just ident
}
setMaxCachedWorkflows :: Word64 -> ConfigM actEnv ()
setMaxCachedWorkflows :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxCachedWorkflows Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxCachedWorkflows = n
}
setMaxOutstandingWorkflowTasks :: Word64 -> ConfigM actEnv ()
setMaxOutstandingWorkflowTasks :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxOutstandingWorkflowTasks Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxOutstandingWorkflowTasks = n
}
setMaxOutstandingActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingActivities :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxOutstandingActivities Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxOutstandingActivities = n
}
setMaxOutstandingLocalActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingLocalActivities :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxOutstandingLocalActivities Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxOutstandingLocalActivities = n
}
setMaxConcurrentWorkflowTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentWorkflowTaskPolls :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxConcurrentWorkflowTaskPolls Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxConcurrentWorkflowTaskPolls = n
}
setNonstickyToStickyPollRatio :: Float -> ConfigM actEnv ()
setNonstickyToStickyPollRatio :: forall actEnv. Float -> ConfigM actEnv ()
setNonstickyToStickyPollRatio Float
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.nonstickyToStickyPollRatio = n
}
setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentActivityTaskPolls :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxConcurrentActivityTaskPolls Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxConcurrentActivityTaskPolls = n
}
setNoRemoteActivities :: Bool -> ConfigM actEnv ()
setNoRemoteActivities :: forall actEnv. Bool -> ConfigM actEnv ()
setNoRemoteActivities Bool
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.noRemoteActivities = n
}
setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv ()
setStickyQueueScheduleToStartTimeoutMillis :: forall actEnv. Word64 -> ConfigM actEnv ()
setStickyQueueScheduleToStartTimeoutMillis Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.stickyQueueScheduleToStartTimeoutMillis = n
}
setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setMaxHeartbeatThrottleIntervalMillis :: forall actEnv. Word64 -> ConfigM actEnv ()
setMaxHeartbeatThrottleIntervalMillis Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxHeartbeatThrottleIntervalMillis = n
}
setDefaultHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setDefaultHeartbeatThrottleIntervalMillis :: forall actEnv. Word64 -> ConfigM actEnv ()
setDefaultHeartbeatThrottleIntervalMillis Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.defaultHeartbeatThrottleIntervalMillis = n
}
setMaxActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxActivitiesPerSecond :: forall actEnv. Double -> ConfigM actEnv ()
setMaxActivitiesPerSecond Double
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxActivitiesPerSecond = Just n
}
setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxTaskQueueActivitiesPerSecond :: forall actEnv. Double -> ConfigM actEnv ()
setMaxTaskQueueActivitiesPerSecond Double
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.maxTaskQueueActivitiesPerSecond = Just n
}
setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv ()
setGracefulShutdownPeriodMillis :: forall actEnv. Word64 -> ConfigM actEnv ()
setGracefulShutdownPeriodMillis Word64
n = (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall actEnv. (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
modifyCore ((WorkerConfig -> WorkerConfig) -> ConfigM actEnv ())
-> (WorkerConfig -> WorkerConfig) -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig
conf ->
WorkerConfig
conf
{ Core.gracefulShutdownPeriodMillis = n
}
setLogger :: (Loc -> LogSource -> Control.Monad.Logger.LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
setLogger :: forall actEnv.
(Loc -> Text -> LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
setLogger Loc -> Text -> LogLevel -> LogStr -> IO ()
f = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ logger = f
}
setTracerProvider :: TracerProvider -> ConfigM actEnv ()
setTracerProvider :: forall actEnv. TracerProvider -> ConfigM actEnv ()
setTracerProvider TracerProvider
tp = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ tracerProvider = tp
}
setPayloadProcessor :: PayloadProcessor -> ConfigM actEnv ()
setPayloadProcessor :: forall actEnv. PayloadProcessor -> ConfigM actEnv ()
setPayloadProcessor PayloadProcessor
p = State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall actEnv a. State (WorkerConfig actEnv) a -> ConfigM actEnv a
ConfigM (State (WorkerConfig actEnv) () -> ConfigM actEnv ())
-> State (WorkerConfig actEnv) () -> ConfigM actEnv ()
forall a b. (a -> b) -> a -> b
$ (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ())
-> (WorkerConfig actEnv -> WorkerConfig actEnv)
-> State (WorkerConfig actEnv) ()
forall a b. (a -> b) -> a -> b
$ \WorkerConfig actEnv
conf ->
WorkerConfig actEnv
conf
{ payloadProcessor = p
}
data Worker env = forall ty.
Core.KnownWorkerType ty =>
Worker
{ ()
workerType :: !(Core.SWorkerType ty)
, forall env. Worker env -> Async ()
workerWorkflowLoop :: !(Async ())
, ()
workerActivityLoop :: !(InactiveForReplay ty (Async ()))
, ()
workerActivityWorker :: !(InactiveForReplay ty (Activity.ActivityWorker env))
, ()
workerCore :: !(Core.Worker ty)
, forall env. Worker env -> Tracer
workerTracer :: !Tracer
, forall env. Worker env -> TChan EvictionWithRunID
workerEvictionEmitter :: !(TChan Workflow.EvictionWithRunID)
}
startReplayWorker :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv, Core.HistoryPusher)
startReplayWorker :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Runtime -> WorkerConfig actEnv -> m (Worker actEnv, HistoryPusher)
startReplayWorker Runtime
rt WorkerConfig actEnv
conf = (RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher)
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher))
-> (RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher)
forall a b. (a -> b) -> a -> b
$ WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher)
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher))
-> WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher)
forall a b. (a -> b) -> a -> b
$ do
$(logDebug) Text
"Starting worker"
let coreConfig' :: WorkerConfig
coreConfig' = WorkerConfig actEnv
conf.coreConfig {Core.nondeterminismAsWorkflowFail = True}
(workerCore, replay) <- (WorkerError -> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> ((Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> Either WorkerError (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> WorkerContextM
m (Either WorkerError (Worker 'Replay, HistoryPusher))
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Either WorkerError (Worker 'Replay, HistoryPusher))
-> WorkerContextM
m (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Runtime
-> WorkerConfig
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
Core.newReplayWorker Runtime
rt WorkerConfig
coreConfig')
$(logDebug) "Instantiated core"
workerEvictionEmitter <- newBroadcastTChanIO
runningWorkflows <- newTVarIO mempty
uuid <- liftIO nextRandom
let workerWorkflowFunctions = WorkerConfig actEnv
conf.wfDefs
workerTaskQueue = Text -> TaskQueue
TaskQueue (WorkerConfig -> Text
Core.taskQueue WorkerConfig actEnv
conf.coreConfig Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
UUID.toText UUID
uuid)
workerInboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.workflowInboundInterceptors
workerOutboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.workflowOutboundInterceptors
workerDeadlockTimeout = WorkerConfig actEnv
conf.deadlockTimeout
workerClient = ()
workerErrorConverters = WorkerConfig actEnv
conf.applicationErrorConverters
processor = WorkerConfig actEnv
conf.payloadProcessor
workflowWorker = Workflow.WorkflowWorker {[ApplicationFailureHandler]
Maybe Int
()
HashMap Text WorkflowDefinition
TVar (HashMap RunId WorkflowInstance)
TChan EvictionWithRunID
Worker 'Replay
InactiveForReplay 'Replay Client
PayloadProcessor
TaskQueue
WorkflowOutboundInterceptor
WorkflowInboundInterceptor
workerCore :: Worker 'Replay
workerEvictionEmitter :: TChan EvictionWithRunID
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerTaskQueue :: TaskQueue
workerInboundInterceptors :: WorkflowInboundInterceptor
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerDeadlockTimeout :: Maybe Int
workerClient :: ()
workerErrorConverters :: [ApplicationFailureHandler]
processor :: PayloadProcessor
workerEvictionEmitter :: TChan EvictionWithRunID
processor :: PayloadProcessor
workerErrorConverters :: [ApplicationFailureHandler]
workerTaskQueue :: TaskQueue
workerDeadlockTimeout :: Maybe Int
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerInboundInterceptors :: WorkflowInboundInterceptor
workerCore :: Worker 'Replay
workerClient :: InactiveForReplay 'Replay Client
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
..}
workerActivityWorker = ()
workerActivityLoop = ()
workerType = SWorkerType 'Replay
Core.SReplay
workerTracer = TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer WorkerConfig actEnv
conf.tracerProvider (Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"hs-temporal-sdk" Text
"0.0.1.0") TracerOptions
tracerOptions
workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting workflow worker loop"
Workflow.execute workflowWorker
`UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
pure (Temporal.Worker.Worker {..}, replay)
data ReplayHistoryFailure = ReplayHistoryFailure
{ ReplayHistoryFailure -> Text
message :: Text
}
deriving stock (Int -> ReplayHistoryFailure -> ShowS
[ReplayHistoryFailure] -> ShowS
ReplayHistoryFailure -> String
(Int -> ReplayHistoryFailure -> ShowS)
-> (ReplayHistoryFailure -> String)
-> ([ReplayHistoryFailure] -> ShowS)
-> Show ReplayHistoryFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ReplayHistoryFailure -> ShowS
showsPrec :: Int -> ReplayHistoryFailure -> ShowS
$cshow :: ReplayHistoryFailure -> String
show :: ReplayHistoryFailure -> String
$cshowList :: [ReplayHistoryFailure] -> ShowS
showList :: [ReplayHistoryFailure] -> ShowS
Show, ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
(ReplayHistoryFailure -> ReplayHistoryFailure -> Bool)
-> (ReplayHistoryFailure -> ReplayHistoryFailure -> Bool)
-> Eq ReplayHistoryFailure
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
== :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
$c/= :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
/= :: ReplayHistoryFailure -> ReplayHistoryFailure -> Bool
Eq)
deriving anyclass (Show ReplayHistoryFailure
Typeable ReplayHistoryFailure
(Typeable ReplayHistoryFailure, Show ReplayHistoryFailure) =>
(ReplayHistoryFailure -> SomeException)
-> (SomeException -> Maybe ReplayHistoryFailure)
-> (ReplayHistoryFailure -> String)
-> (ReplayHistoryFailure -> Bool)
-> Exception ReplayHistoryFailure
SomeException -> Maybe ReplayHistoryFailure
ReplayHistoryFailure -> Bool
ReplayHistoryFailure -> String
ReplayHistoryFailure -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> (e -> Bool)
-> Exception e
$ctoException :: ReplayHistoryFailure -> SomeException
toException :: ReplayHistoryFailure -> SomeException
$cfromException :: SomeException -> Maybe ReplayHistoryFailure
fromException :: SomeException -> Maybe ReplayHistoryFailure
$cdisplayException :: ReplayHistoryFailure -> String
displayException :: ReplayHistoryFailure -> String
$cbacktraceDesired :: ReplayHistoryFailure -> Bool
backtraceDesired :: ReplayHistoryFailure -> Bool
Exception)
runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ())
runReplayHistory :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Runtime
-> WorkerConfig actEnv
-> History
-> m (Either ReplayHistoryFailure ())
runReplayHistory Runtime
rt WorkerConfig actEnv
conf History
history = WorkerConfig actEnv
-> WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ())
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ WorkerContextM m (Worker actEnv, HistoryPusher)
-> ((Worker actEnv, HistoryPusher) -> WorkerContextM m ())
-> ((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
UnliftIO.bracket (Runtime
-> WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv, HistoryPusher)
forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Runtime -> WorkerConfig actEnv -> m (Worker actEnv, HistoryPusher)
startReplayWorker Runtime
rt WorkerConfig actEnv
conf) (\(Worker actEnv
worker, HistoryPusher
pusher) -> IO () -> WorkerContextM m ()
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HistoryPusher -> IO ()
Core.closeHistory HistoryPusher
pusher) WorkerContextM m () -> WorkerContextM m () -> WorkerContextM m ()
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Worker actEnv -> WorkerContextM m ()
forall (m :: * -> *) actEnv.
MonadUnliftIO m =>
Worker actEnv -> m ()
shutdown Worker actEnv
worker) (((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> ((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ \(Worker actEnv
worker, HistoryPusher
pusher) -> do
evictions <- Worker actEnv -> WorkerContextM m (TChan EvictionWithRunID)
forall (m :: * -> *) actEnv.
MonadIO m =>
Worker actEnv -> m (TChan EvictionWithRunID)
subscribeToEvictions Worker actEnv
worker
let mWfId = History
history History -> Fold History History ByteString Any -> Maybe ByteString
forall s t a b. s -> Fold s t a b -> Maybe a
^? LensLike' f History (Vector HistoryEvent)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'events" a) =>
LensLike' f s a
vec'events LensLike' f History (Vector HistoryEvent)
-> ((ByteString -> f Any)
-> Vector HistoryEvent -> f (Vector HistoryEvent))
-> (ByteString -> f Any)
-> History
-> f History
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (HistoryEvent -> f HistoryEvent)
-> Vector HistoryEvent -> f (Vector HistoryEvent)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Vector a -> f (Vector b)
traverse ((HistoryEvent -> f HistoryEvent)
-> Vector HistoryEvent -> f (Vector HistoryEvent))
-> ((ByteString -> f Any) -> HistoryEvent -> f HistoryEvent)
-> (ByteString -> f Any)
-> Vector HistoryEvent
-> f (Vector HistoryEvent)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LensLike'
f HistoryEvent (Maybe WorkflowExecutionStartedEventAttributes)
forall (f :: * -> *) s a.
(Functor f,
HasField s "maybe'workflowExecutionStartedEventAttributes" a) =>
LensLike' f s a
maybe'workflowExecutionStartedEventAttributes LensLike'
f HistoryEvent (Maybe WorkflowExecutionStartedEventAttributes)
-> ((ByteString -> f Any)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes))
-> (ByteString -> f Any)
-> HistoryEvent
-> f HistoryEvent
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse ((WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes))
-> ((ByteString -> f Any)
-> WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> (ByteString -> f Any)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LensLike' f WorkflowExecutionStartedEventAttributes Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowId" a) =>
LensLike' f s a
workflowId LensLike' f WorkflowExecutionStartedEventAttributes Text
-> ((ByteString -> f Any) -> Text -> f Text)
-> (ByteString -> f Any)
-> WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> ByteString) -> Getter Text Text ByteString Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> ByteString
T.encodeUtf8
wfId <- maybe (throwIO $ userError "No workflow ID found in history") pure mWfId
$(logDebug) $ "Pushing history for workflow ID " <> T.pack (show wfId)
$(logDebug) $ T.pack $ show history
res <- liftIO $ Core.pushHistory pusher wfId (Left $ encodeMessage history)
$(logDebug) $ "Pushed history for workflow ID " <> T.pack (show wfId)
case res of
Left WorkerError
e -> Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ ReplayHistoryFailure -> Either ReplayHistoryFailure ()
forall a b. a -> Either a b
Left (ReplayHistoryFailure -> Either ReplayHistoryFailure ())
-> ReplayHistoryFailure -> Either ReplayHistoryFailure ()
forall a b. (a -> b) -> a -> b
$ ReplayHistoryFailure {message :: Text
message = WorkerError
e.message}
Right () -> do
res <- STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID)
-> STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID
forall a b. (a -> b) -> a -> b
$ TChan EvictionWithRunID -> STM EvictionWithRunID
forall a. TChan a -> STM a
readTChan TChan EvictionWithRunID
evictions
if evictionWasFatal res
then pure $ Left $ ReplayHistoryFailure {message = evictionMessage res}
else pure $ Right ()
traced :: WorkerConfig env -> ReaderT Tracer m a -> m a
traced :: forall env (m :: * -> *) a.
WorkerConfig env -> ReaderT Tracer m a -> m a
traced WorkerConfig env
conf ReaderT Tracer m a
m =
ReaderT Tracer m a -> Tracer -> m a
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ReaderT Tracer m a
m (Tracer -> m a) -> Tracer -> m a
forall a b. (a -> b) -> a -> b
$
TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer
WorkerConfig env
conf.tracerProvider
(Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"hs-temporal-sdk" Text
"0.0.1.0")
TracerOptions
tracerOptions
runWorkerContext :: WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext :: forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig conf
conf (WorkerContextM ReaderT Tracer (LoggingT m) a
m) = (LoggingT m a
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a)
-> (Loc -> Text -> LogLevel -> LogStr -> IO ())
-> LoggingT m a
-> m a
forall a b c. (a -> b -> c) -> b -> a -> c
flip LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
runLoggingT WorkerConfig conf
conf.logger (LoggingT m a -> m a) -> LoggingT m a -> m a
forall a b. (a -> b) -> a -> b
$ WorkerConfig conf -> ReaderT Tracer (LoggingT m) a -> LoggingT m a
forall env (m :: * -> *) a.
WorkerConfig env -> ReaderT Tracer m a -> m a
traced WorkerConfig conf
conf ReaderT Tracer (LoggingT m) a
m
newtype WorkerContextM m a = WorkerContextM (ReaderT Tracer (LoggingT m) a)
deriving newtype ((forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b)
-> (forall a b. a -> WorkerContextM m b -> WorkerContextM m a)
-> Functor (WorkerContextM m)
forall a b. a -> WorkerContextM m b -> WorkerContextM m a
forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b
forall (m :: * -> *) a b.
Functor m =>
a -> WorkerContextM m b -> WorkerContextM m a
forall (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerContextM m a -> WorkerContextM m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerContextM m a -> WorkerContextM m b
fmap :: forall a b. (a -> b) -> WorkerContextM m a -> WorkerContextM m b
$c<$ :: forall (m :: * -> *) a b.
Functor m =>
a -> WorkerContextM m b -> WorkerContextM m a
<$ :: forall a b. a -> WorkerContextM m b -> WorkerContextM m a
Functor, Functor (WorkerContextM m)
Functor (WorkerContextM m) =>
(forall a. a -> WorkerContextM m a)
-> (forall a b.
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b)
-> (forall a b c.
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c)
-> (forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b)
-> (forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a)
-> Applicative (WorkerContextM m)
forall a. a -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall a b.
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
forall a b c.
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
forall (m :: * -> *). Applicative m => Functor (WorkerContextM m)
forall (m :: * -> *) a. Applicative m => a -> WorkerContextM m a
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
$cpure :: forall (m :: * -> *) a. Applicative m => a -> WorkerContextM m a
pure :: forall a. a -> WorkerContextM m a
$c<*> :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
<*> :: forall a b.
WorkerContextM m (a -> b)
-> WorkerContextM m a -> WorkerContextM m b
$cliftA2 :: forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
liftA2 :: forall a b c.
(a -> b -> c)
-> WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m c
$c*> :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
*> :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
$c<* :: forall (m :: * -> *) a b.
Applicative m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
<* :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m a
Applicative, Applicative (WorkerContextM m)
Applicative (WorkerContextM m) =>
(forall a b.
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b)
-> (forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b)
-> (forall a. a -> WorkerContextM m a)
-> Monad (WorkerContextM m)
forall a. a -> WorkerContextM m a
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall a b.
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
forall (m :: * -> *). Monad m => Applicative (WorkerContextM m)
forall (m :: * -> *) a. Monad m => a -> WorkerContextM m a
forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
$c>>= :: forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
>>= :: forall a b.
WorkerContextM m a
-> (a -> WorkerContextM m b) -> WorkerContextM m b
$c>> :: forall (m :: * -> *) a b.
Monad m =>
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
>> :: forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
$creturn :: forall (m :: * -> *) a. Monad m => a -> WorkerContextM m a
return :: forall a. a -> WorkerContextM m a
Monad, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall a. IO a -> WorkerContextM m a)
-> MonadIO (WorkerContextM m)
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
forall (m :: * -> *). MonadIO m => Monad (WorkerContextM m)
forall (m :: * -> *) a. MonadIO m => IO a -> WorkerContextM m a
$cliftIO :: forall (m :: * -> *) a. MonadIO m => IO a -> WorkerContextM m a
liftIO :: forall a. IO a -> WorkerContextM m a
MonadIO, MonadIO (WorkerContextM m)
MonadIO (WorkerContextM m) =>
(forall b.
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b)
-> MonadUnliftIO (WorkerContextM m)
forall b.
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
forall (m :: * -> *).
MonadIO m =>
(forall b. ((forall a. m a -> IO a) -> IO b) -> m b)
-> MonadUnliftIO m
forall (m :: * -> *). MonadUnliftIO m => MonadIO (WorkerContextM m)
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
$cwithRunInIO :: forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
withRunInIO :: forall b.
((forall a. WorkerContextM m a -> IO a) -> IO b)
-> WorkerContextM m b
MonadUnliftIO, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ())
-> MonadLogger (WorkerContextM m)
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
forall (m :: * -> *).
Monad m =>
(forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> m ())
-> MonadLogger m
forall (m :: * -> *). MonadIO m => Monad (WorkerContextM m)
forall (m :: * -> *) msg.
(MonadIO m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
$cmonadLoggerLog :: forall (m :: * -> *) msg.
(MonadIO m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
monadLoggerLog :: forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> WorkerContextM m ()
MonadLogger, MonadIO (WorkerContextM m)
MonadLogger (WorkerContextM m)
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
(MonadLogger (WorkerContextM m), MonadIO (WorkerContextM m)) =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
-> MonadLoggerIO (WorkerContextM m)
forall (m :: * -> *). MonadIO m => MonadIO (WorkerContextM m)
forall (m :: * -> *). MonadIO m => MonadLogger (WorkerContextM m)
forall (m :: * -> *).
MonadIO m =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
(MonadLogger m, MonadIO m) =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> MonadLoggerIO m
$caskLoggerIO :: forall (m :: * -> *).
MonadIO m =>
WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO :: WorkerContextM m (Loc -> Text -> LogLevel -> LogStr -> IO ())
MonadLoggerIO, MonadThrow (WorkerContextM m)
MonadThrow (WorkerContextM m) =>
(forall e a.
(HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a)
-> MonadCatch (WorkerContextM m)
forall e a.
(HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
forall (m :: * -> *).
MonadThrow m =>
(forall e a.
(HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a)
-> MonadCatch m
forall (m :: * -> *). MonadCatch m => MonadThrow (WorkerContextM m)
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
$ccatch :: forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
catch :: forall e a.
(HasCallStack, Exception e) =>
WorkerContextM m a
-> (e -> WorkerContextM m a) -> WorkerContextM m a
MonadCatch, Monad (WorkerContextM m)
Monad (WorkerContextM m) =>
(forall e a.
(HasCallStack, Exception e) =>
e -> WorkerContextM m a)
-> MonadThrow (WorkerContextM m)
forall e a. (HasCallStack, Exception e) => e -> WorkerContextM m a
forall (m :: * -> *).
Monad m =>
(forall e a. (HasCallStack, Exception e) => e -> m a)
-> MonadThrow m
forall (m :: * -> *). MonadThrow m => Monad (WorkerContextM m)
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> WorkerContextM m a
$cthrowM :: forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> WorkerContextM m a
throwM :: forall e a. (HasCallStack, Exception e) => e -> WorkerContextM m a
MonadThrow)
instance Monad m => MonadTracer (WorkerContextM m) where
getTracer :: WorkerContextM m Tracer
getTracer = ReaderT Tracer (LoggingT m) Tracer -> WorkerContextM m Tracer
forall (m :: * -> *) a.
ReaderT Tracer (LoggingT m) a -> WorkerContextM m a
WorkerContextM ReaderT Tracer (LoggingT m) Tracer
forall r (m :: * -> *). MonadReader r m => m r
ask
startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv)
startWorker :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Client -> WorkerConfig actEnv -> m (Worker actEnv)
startWorker Client
client WorkerConfig actEnv
conf = (RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv)
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv))
-> (RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv) -> m (Worker actEnv)
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Worker actEnv) -> m (Worker actEnv))
-> WorkerContextM m (Worker actEnv) -> m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ Text
-> SpanArguments
-> WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan Text
"startWorker" SpanArguments
defaultSpanArguments (WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv))
-> WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ do
workerCore <-
Text
-> SpanArguments
-> WorkerContextM m (Worker 'Real)
-> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan
Text
"Core.newWorker"
SpanArguments
defaultSpanArguments
((WorkerError -> WorkerContextM m (Worker 'Real))
-> (Worker 'Real -> WorkerContextM m (Worker 'Real))
-> Either WorkerError (Worker 'Real)
-> WorkerContextM m (Worker 'Real)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO Worker 'Real -> WorkerContextM m (Worker 'Real)
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError (Worker 'Real)
-> WorkerContextM m (Worker 'Real))
-> WorkerContextM m (Either WorkerError (Worker 'Real))
-> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Either WorkerError (Worker 'Real))
-> WorkerContextM m (Either WorkerError (Worker 'Real))
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Client -> WorkerConfig -> IO (Either WorkerError (Worker 'Real))
Core.newWorker Client
client WorkerConfig actEnv
conf.coreConfig))
validationRes <- liftIO $ Core.validateWorker workerCore
workerEvictionEmitter <- newBroadcastTChanIO
case validationRes of
Left WorkerValidationError
err -> WorkerValidationError -> WorkerContextM m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO WorkerValidationError
err
Right () -> () -> WorkerContextM m ()
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
runningWorkflows <- newTVarIO mempty
runningActivities <- newTVarIO mempty
activityEnv <- newIORef conf.actEnv
let errorConverters = [ApplicationFailureHandler] -> [ApplicationFailureHandler]
mkAnnotatedHandlers WorkerConfig actEnv
conf.applicationErrorConverters
workerWorkflowFunctions = WorkerConfig actEnv
conf.wfDefs
workerTaskQueue = Text -> TaskQueue
TaskQueue (Text -> TaskQueue) -> Text -> TaskQueue
forall a b. (a -> b) -> a -> b
$ WorkerConfig -> Text
Core.taskQueue WorkerConfig actEnv
conf.coreConfig
workerInboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.workflowInboundInterceptors
workerOutboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.workflowOutboundInterceptors
workerDeadlockTimeout = WorkerConfig actEnv
conf.deadlockTimeout
workerErrorConverters = [ApplicationFailureHandler]
errorConverters
processor = WorkerConfig actEnv
conf.payloadProcessor
workflowWorker = Workflow.WorkflowWorker {[ApplicationFailureHandler]
Maybe Int
HashMap Text WorkflowDefinition
TVar (HashMap RunId WorkflowInstance)
TChan EvictionWithRunID
Client
Worker 'Real
InactiveForReplay 'Real Client
PayloadProcessor
TaskQueue
WorkflowOutboundInterceptor
WorkflowInboundInterceptor
workerEvictionEmitter :: TChan EvictionWithRunID
processor :: PayloadProcessor
workerErrorConverters :: [ApplicationFailureHandler]
workerTaskQueue :: TaskQueue
workerDeadlockTimeout :: Maybe Int
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerInboundInterceptors :: WorkflowInboundInterceptor
workerCore :: Worker 'Real
workerClient :: InactiveForReplay 'Real Client
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerCore :: Worker 'Real
workerEvictionEmitter :: TChan EvictionWithRunID
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerTaskQueue :: TaskQueue
workerInboundInterceptors :: WorkflowInboundInterceptor
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerDeadlockTimeout :: Maybe Int
workerErrorConverters :: [ApplicationFailureHandler]
processor :: PayloadProcessor
workerClient :: Client
..}
definitions = WorkerConfig actEnv
conf.actDefs
activityInboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.activityInboundInterceptors
activityOutboundInterceptors = WorkerConfig actEnv
conf.interceptorConfig.activityOutboundInterceptors
clientInterceptors = WorkerConfig actEnv
conf.interceptorConfig.clientInterceptors
activityErrorConverters = [ApplicationFailureHandler]
errorConverters
payloadProcessor = WorkerConfig actEnv
conf.payloadProcessor
workerActivityWorker = Activity.ActivityWorker {[ApplicationFailureHandler]
HashMap Text (ActivityDefinition actEnv)
TVar (HashMap TaskToken (Async ()))
IORef actEnv
Worker 'Real
PayloadProcessor
ClientInterceptors
ActivityOutboundInterceptor actEnv
ActivityInboundInterceptor actEnv
workerCore :: Worker 'Real
runningActivities :: TVar (HashMap TaskToken (Async ()))
activityEnv :: IORef actEnv
definitions :: HashMap Text (ActivityDefinition actEnv)
activityInboundInterceptors :: ActivityInboundInterceptor actEnv
activityOutboundInterceptors :: ActivityOutboundInterceptor actEnv
clientInterceptors :: ClientInterceptors
activityErrorConverters :: [ApplicationFailureHandler]
payloadProcessor :: PayloadProcessor
payloadProcessor :: PayloadProcessor
activityErrorConverters :: [ApplicationFailureHandler]
clientInterceptors :: ClientInterceptors
activityOutboundInterceptors :: ActivityOutboundInterceptor actEnv
activityInboundInterceptors :: ActivityInboundInterceptor actEnv
workerCore :: Worker 'Real
runningActivities :: TVar (HashMap TaskToken (Async ()))
definitions :: HashMap Text (ActivityDefinition actEnv)
activityEnv :: IORef actEnv
..}
workerClient = Client
client
workerTracer = TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer WorkerConfig actEnv
conf.tracerProvider (Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"hs-temporal-sdk" Text
"0.0.1.0") TracerOptions
tracerOptions
let workerType = SWorkerType 'Real
Core.SReal
workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting workflow worker loop"
Workflow.execute workflowWorker
`UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
workerActivityLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/activity/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting activity worker loop"
Activity.execute workerActivityWorker
`UnliftIO.finally` $(logDebug) "Exiting activity worker loop"
pure Temporal.Worker.Worker {..}
waitWorker :: (MonadIO m) => Temporal.Worker.Worker actEnv -> m ()
waitWorker :: forall (m :: * -> *) actEnv. MonadIO m => Worker actEnv -> m ()
waitWorker (Temporal.Worker.Worker {SWorkerType ty
workerType :: ()
workerType :: SWorkerType ty
workerType, Async ()
workerWorkflowLoop :: forall env. Worker env -> Async ()
workerWorkflowLoop :: Async ()
workerWorkflowLoop, InactiveForReplay ty (Async ())
workerActivityLoop :: ()
workerActivityLoop :: InactiveForReplay ty (Async ())
workerActivityLoop}) = do
case SWorkerType ty
workerType of
SWorkerType ty
Core.SReal -> do
wfRes <- Async () -> m (Either SomeException ())
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async ()
workerWorkflowLoop
actRes <- waitCatch workerActivityLoop
for_ (lefts [wfRes, actRes]) throwIO
SWorkerType ty
Core.SReplay -> do
_ <- Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m a
wait Async ()
workerWorkflowLoop
pure ()
linkWorker :: Temporal.Worker.Worker actEnv -> IO ()
linkWorker :: forall actEnv. Worker actEnv -> IO ()
linkWorker Temporal.Worker.Worker {Worker ty
workerCore :: ()
workerCore :: Worker ty
workerCore, SWorkerType ty
workerType :: ()
workerType :: SWorkerType ty
workerType, Async ()
workerWorkflowLoop :: forall env. Worker env -> Async ()
workerWorkflowLoop :: Async ()
workerWorkflowLoop, InactiveForReplay ty (Async ())
workerActivityLoop :: ()
workerActivityLoop :: InactiveForReplay ty (Async ())
workerActivityLoop} = do
tid <- IO ThreadId
myThreadId
void $ forkIOLabelled (T.unpack $ T.concat ["temporal/worker/link/", Core.namespace c, "/", Core.taskQueue c]) $ do
let errorClause :: SomeException -> IO ()
errorClause SomeException
e = do
Worker ty -> IO ()
forall (ty :: WorkerType). KnownWorkerType ty => Worker ty -> IO ()
Core.initiateShutdown Worker ty
workerCore
ThreadId -> SomeException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
Control.Concurrent.throwTo ThreadId
tid SomeException
e
case workerType of
SWorkerType ty
Core.SReal -> do
eeRes <- IO (Either SomeException ())
-> IO (Either SomeException ())
-> IO (Either (Either SomeException ()) (Either SomeException ()))
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> m b -> m (Either a b)
race (Async () -> IO (Either SomeException ())
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async ()
workerWorkflowLoop) (Async () -> IO (Either SomeException ())
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async ()
InactiveForReplay ty (Async ())
workerActivityLoop)
case eeRes of
Left (Left SomeException
e) -> SomeException -> IO ()
errorClause SomeException
e
Right (Left SomeException
e) -> SomeException -> IO ()
errorClause SomeException
e
Either (Either SomeException ()) (Either SomeException ())
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
SWorkerType ty
Core.SReplay -> do
eRes <- Async () -> IO (Either SomeException ())
forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch Async ()
workerWorkflowLoop
case eRes of
Left SomeException
e -> SomeException -> IO ()
errorClause SomeException
e
Either SomeException ()
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
where
c :: WorkerConfig
c = Worker ty -> WorkerConfig
forall (ty :: WorkerType). Worker ty -> WorkerConfig
Core.getWorkerConfig Worker ty
workerCore
pollWorkerSTM :: Temporal.Worker.Worker actEnv -> STM (Maybe (Either SomeException ()))
pollWorkerSTM :: forall actEnv.
Worker actEnv -> STM (Maybe (Either SomeException ()))
pollWorkerSTM Temporal.Worker.Worker {SWorkerType ty
workerType :: ()
workerType :: SWorkerType ty
workerType, Async ()
workerWorkflowLoop :: forall env. Worker env -> Async ()
workerWorkflowLoop :: Async ()
workerWorkflowLoop, InactiveForReplay ty (Async ())
workerActivityLoop :: ()
workerActivityLoop :: InactiveForReplay ty (Async ())
workerActivityLoop} = do
case SWorkerType ty
workerType of
SWorkerType ty
Core.SReal -> do
wfRes <- Async () -> STM (Maybe (Either SomeException ()))
forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM Async ()
workerWorkflowLoop
actRes <- pollSTM workerActivityLoop
pure $ case (wfRes, actRes) of
(Just (Left SomeException
e), Maybe (Either SomeException ())
_) -> Either SomeException () -> Maybe (Either SomeException ())
forall a. a -> Maybe a
Just (SomeException -> Either SomeException ()
forall a b. a -> Either a b
Left SomeException
e)
(Maybe (Either SomeException ())
_, Just (Left SomeException
e)) -> Either SomeException () -> Maybe (Either SomeException ())
forall a. a -> Maybe a
Just (SomeException -> Either SomeException ()
forall a b. a -> Either a b
Left SomeException
e)
(Just (Right ()), Just (Right ())) -> Either SomeException () -> Maybe (Either SomeException ())
forall a. a -> Maybe a
Just (() -> Either SomeException ()
forall a b. b -> Either a b
Right ())
(Maybe (Either SomeException ()), Maybe (Either SomeException ()))
_ -> Maybe (Either SomeException ())
forall a. Maybe a
Nothing
SWorkerType ty
Core.SReplay -> Async () -> STM (Maybe (Either SomeException ()))
forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM Async ()
workerWorkflowLoop
waitWorkerSTM :: Temporal.Worker.Worker actEnv -> STM ()
waitWorkerSTM :: forall actEnv. Worker actEnv -> STM ()
waitWorkerSTM Temporal.Worker.Worker {SWorkerType ty
workerType :: ()
workerType :: SWorkerType ty
workerType, Async ()
workerWorkflowLoop :: forall env. Worker env -> Async ()
workerWorkflowLoop :: Async ()
workerWorkflowLoop, InactiveForReplay ty (Async ())
workerActivityLoop :: ()
workerActivityLoop :: InactiveForReplay ty (Async ())
workerActivityLoop} = do
case SWorkerType ty
workerType of
SWorkerType ty
Core.SReal -> do
Async () -> STM ()
forall a. Async a -> STM a
waitSTM Async ()
workerWorkflowLoop
Async () -> STM ()
forall a. Async a -> STM a
waitSTM Async ()
InactiveForReplay ty (Async ())
workerActivityLoop
SWorkerType ty
Core.SReplay -> Async () -> STM ()
forall a. Async a -> STM a
waitSTM Async ()
workerWorkflowLoop
shutdown :: (MonadUnliftIO m) => Temporal.Worker.Worker actEnv -> m ()
shutdown :: forall (m :: * -> *) actEnv.
MonadUnliftIO m =>
Worker actEnv -> m ()
shutdown worker :: Worker actEnv
worker@Temporal.Worker.Worker {Worker ty
workerCore :: ()
workerCore :: Worker ty
workerCore, Tracer
workerTracer :: forall env. Worker env -> Tracer
workerTracer :: Tracer
workerTracer, SWorkerType ty
workerType :: ()
workerType :: SWorkerType ty
workerType, InactiveForReplay ty (ActivityWorker actEnv)
workerActivityWorker :: ()
workerActivityWorker :: InactiveForReplay ty (ActivityWorker actEnv)
workerActivityWorker} = Tracer -> Text -> SpanArguments -> m () -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> m a -> m a
OT.inSpan Tracer
workerTracer Text
"shutdown" SpanArguments
defaultSpanArguments (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ ((forall a. m a -> m a) -> m ()) -> m ()
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> m a) -> m b) -> m b
UnliftIO.mask (((forall a. m a -> m a) -> m ()) -> m ())
-> ((forall a. m a -> m a) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> m a
restore -> do
Tracer -> Text -> SpanArguments -> m () -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> m a -> m a
OT.inSpan Tracer
workerTracer Text
"initiateShutdown" SpanArguments
defaultSpanArguments (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Worker ty -> IO ()
forall (ty :: WorkerType). KnownWorkerType ty => Worker ty -> IO ()
Core.initiateShutdown Worker ty
workerCore
() <- case SWorkerType ty
workerType of
SWorkerType ty
Core.SReal -> ActivityWorker actEnv -> m ()
forall (m :: * -> *) env. MonadIO m => ActivityWorker env -> m ()
Activity.notifyShutdown InactiveForReplay ty (ActivityWorker actEnv)
ActivityWorker actEnv
workerActivityWorker
SWorkerType ty
Core.SReplay -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
OT.inSpan workerTracer "waitWorker" defaultSpanArguments $ restore $ waitWorker worker
err' <- OT.inSpan workerTracer "finalizeShutdown" defaultSpanArguments $ liftIO $ Core.finalizeShutdown workerCore
case err' of
Left WorkerError
err -> WorkerError -> m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO WorkerError
err
Right () -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
subscribeToEvictions :: MonadIO m => Temporal.Worker.Worker actEnv -> m (TChan Workflow.EvictionWithRunID)
subscribeToEvictions :: forall (m :: * -> *) actEnv.
MonadIO m =>
Worker actEnv -> m (TChan EvictionWithRunID)
subscribeToEvictions Worker actEnv
worker = IO (TChan EvictionWithRunID) -> m (TChan EvictionWithRunID)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TChan EvictionWithRunID) -> m (TChan EvictionWithRunID))
-> IO (TChan EvictionWithRunID) -> m (TChan EvictionWithRunID)
forall a b. (a -> b) -> a -> b
$ STM (TChan EvictionWithRunID) -> IO (TChan EvictionWithRunID)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (TChan EvictionWithRunID) -> IO (TChan EvictionWithRunID))
-> STM (TChan EvictionWithRunID) -> IO (TChan EvictionWithRunID)
forall a b. (a -> b) -> a -> b
$ TChan EvictionWithRunID -> STM (TChan EvictionWithRunID)
forall a. TChan a -> STM (TChan a)
dupTChan Worker actEnv
worker.workerEvictionEmitter
subscribeToEvictionsSTM :: Temporal.Worker.Worker actEnv -> STM (TChan Workflow.EvictionWithRunID)
subscribeToEvictionsSTM :: forall actEnv. Worker actEnv -> STM (TChan EvictionWithRunID)
subscribeToEvictionsSTM Worker actEnv
worker = TChan EvictionWithRunID -> STM (TChan EvictionWithRunID)
forall a. TChan a -> STM (TChan a)
dupTChan Worker actEnv
worker.workerEvictionEmitter
evictionMessage :: Workflow.EvictionWithRunID -> Text
evictionMessage :: EvictionWithRunID -> Text
evictionMessage Workflow.EvictionWithRunID {RemoveFromCache
eviction :: RemoveFromCache
eviction :: EvictionWithRunID -> RemoveFromCache
eviction} = RemoveFromCache
eviction RemoveFromCache
-> FoldLike Text RemoveFromCache RemoveFromCache Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text RemoveFromCache RemoveFromCache Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
Activation.message
evictionWasFatal :: Workflow.EvictionWithRunID -> Bool
evictionWasFatal :: EvictionWithRunID -> Bool
evictionWasFatal Workflow.EvictionWithRunID {RemoveFromCache
eviction :: EvictionWithRunID -> RemoveFromCache
eviction :: RemoveFromCache
eviction} = case RemoveFromCache
eviction RemoveFromCache
-> FoldLike
RemoveFromCache'EvictionReason
RemoveFromCache
RemoveFromCache
RemoveFromCache'EvictionReason
RemoveFromCache'EvictionReason
-> RemoveFromCache'EvictionReason
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
RemoveFromCache'EvictionReason
RemoveFromCache
RemoveFromCache
RemoveFromCache'EvictionReason
RemoveFromCache'EvictionReason
forall (f :: * -> *) s a.
(Functor f, HasField s "reason" a) =>
LensLike' f s a
Activation.reason of
RemoveFromCache'EvictionReason
RemoveFromCache'FATAL -> Bool
True
RemoveFromCache'EvictionReason
_ -> Bool
False
replaceEnvironment :: MonadIO m => Temporal.Worker.Worker actEnv -> actEnv -> m ()
replaceEnvironment :: forall (m :: * -> *) actEnv.
MonadIO m =>
Worker actEnv -> actEnv -> m ()
replaceEnvironment Temporal.Worker.Worker {Async ()
Tracer
TChan EvictionWithRunID
Worker ty
InactiveForReplay ty (Async ())
InactiveForReplay ty (ActivityWorker actEnv)
SWorkerType ty
workerType :: ()
workerWorkflowLoop :: forall env. Worker env -> Async ()
workerActivityLoop :: ()
workerActivityWorker :: ()
workerCore :: ()
workerTracer :: forall env. Worker env -> Tracer
workerEvictionEmitter :: forall env. Worker env -> TChan EvictionWithRunID
workerType :: SWorkerType ty
workerWorkflowLoop :: Async ()
workerActivityLoop :: InactiveForReplay ty (Async ())
workerActivityWorker :: InactiveForReplay ty (ActivityWorker actEnv)
workerCore :: Worker ty
workerTracer :: Tracer
workerEvictionEmitter :: TChan EvictionWithRunID
..} actEnv
env = do
case SWorkerType ty
workerType of
SWorkerType ty
Core.SReal -> do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef actEnv -> actEnv -> IO ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef InactiveForReplay ty (ActivityWorker actEnv)
ActivityWorker actEnv
workerActivityWorker.activityEnv actEnv
env
SWorkerType ty
Core.SReplay -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()