{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
module Temporal.Worker (
WorkerConfig (..),
Definitions (..),
ToDefinitions (..),
ReplayHistoryFailure (..),
Workflow.EvictionWithRunID (..),
WorkflowDef (..),
ActivityDef (..),
WorkflowId (..),
) where
import Control.Concurrent
import Control.Exception (Exception (..))
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State
import Data.Either (lefts)
import Data.Foldable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens (encodeMessage)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.UUID (UUID)
import qualified Data.UUID as UUID
import Data.UUID.V4 (nextRandom)
import Data.Word
import Lens.Family2
import OpenTelemetry.Attributes (emptyAttributes)
import OpenTelemetry.Trace.Core hiding (inSpan)
import qualified OpenTelemetry.Trace.Core as OT
import OpenTelemetry.Trace.Monad
import Proto.Temporal.Api.History.V1.Message (History)
import Proto.Temporal.Api.History.V1.Message_Fields (maybe'workflowExecutionStartedEventAttributes, vec'events, workflowId)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation
import qualified Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation_Fields as Activation
import RequireCallStack
import System.IO.Unsafe
import Temporal.Activity.Definition
import qualified Temporal.Activity.Worker as Activity
import Temporal.Common
import Temporal.Common.Async
import Temporal.Core.Client
import Temporal.Core.Worker (InactiveForReplay, WorkerError)
import qualified Temporal.Core.Worker as Core
import Temporal.Exception
import Temporal.Interceptor
import Temporal.Payload (PayloadProcessor (..))
import Temporal.Runtime
import Temporal.Worker.Types
import Temporal.Workflow.Definition
import qualified Temporal.Workflow.Worker as Workflow
import UnliftIO
-- | Builder monad used to describe worker configuration.
--
-- A thin newtype over @'State' ('WorkerConfig' actEnv)@: every action in this
-- monad is a state transformation of the 'WorkerConfig' being assembled.
-- 'unConfigM' exposes the underlying 'State' computation.
--
-- The 'Functor', 'Applicative' and 'Monad' instances are taken directly from
-- the wrapped 'State' monad via newtype deriving.
newtype ConfigM actEnv a = ConfigM {unConfigM :: State (WorkerConfig actEnv) a}
  deriving newtype (Functor, Applicative, Monad)
-- | Two configuration actions are combined by running them in sequence
-- (left first) and combining their results with the result type's '<>'.
instance Semigroup a => Semigroup (ConfigM actEnv a) where
  (<>) = liftA2 (<>)
-- | The neutral configuration action: leaves the configuration untouched and
-- yields the result type's 'mempty'.
instance Monoid a => Monoid (ConfigM actEnv a) where
  mempty = pure mempty
-- | A process-wide 'TracerProvider' created with an empty list of processors.
--
-- It is built once through 'unsafePerformIO' around 'createTracerProvider';
-- the @NOINLINE@ pragma below keeps GHC from duplicating that call.
--
-- NOTE(review): the second argument to 'createTracerProvider' is visible here
-- only by its type ('TracerProviderOptions'); confirm the concrete options
-- value against the upstream source.
inertTracerProvider :: TracerProvider
inertTracerProvider :: TracerProvider
inertTracerProvider = IO TracerProvider -> TracerProvider
forall a. IO a -> a
unsafePerformIO (IO TracerProvider -> TracerProvider)
-> IO TracerProvider -> TracerProvider
forall a b. (a -> b) -> a -> b
$ [Processor] -> TracerProviderOptions -> IO TracerProvider
forall (m :: * -> *).
MonadIO m =>
[Processor] -> TracerProviderOptions -> m TracerProvider
createTracerProvider [] TracerProviderOptions
{-# NOINLINE inertTracerProvider #-}
-- | Build a 'WorkerConfig' from an activity environment, a set of definitions
-- and a 'ConfigM' action.
--
-- A default configuration is constructed first; the 'ConfigM' action is then
-- run over it with 'execState' and the resulting state is returned.
--
-- The default configuration:
--
-- * splits the result of 'toDefinitions' on @defs@ into workflow and activity
--   definition maps (@wfDefs@, @actDefs@) and stores them together with
--   @actEnv@;
-- * uses a logger that ignores its arguments and returns @()@;
-- * uses a 'PayloadProcessor' whose first function is 'pure' and whose second
--   is 'pure' composed with a function of type @b -> Either a b@.
--
-- NOTE(review): this text carries interleaved type-annotation residue. The
-- defaults for @coreConfig@, @deadlockTimeout@, @interceptorConfig@,
-- @applicationErrorConverters@ and @tracerProvider@ are visible only as types,
-- so their concrete values cannot be confirmed from here.
configure :: ToDefinitions actEnv defs => actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
configure :: forall actEnv defs.
ToDefinitions actEnv defs =>
actEnv -> defs -> ConfigM actEnv () -> WorkerConfig actEnv
configure actEnv
actEnv defs
defs = (State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv -> WorkerConfig actEnv)
-> WorkerConfig actEnv
-> State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv
forall a b c. (a -> b -> c) -> b -> a -> c
flip State (WorkerConfig actEnv) ()
-> WorkerConfig actEnv -> WorkerConfig actEnv
forall s a. State s a -> s -> s
execState WorkerConfig actEnv
defaultConfig (State (WorkerConfig actEnv) () -> WorkerConfig actEnv)
-> (ConfigM actEnv () -> State (WorkerConfig actEnv) ())
-> ConfigM actEnv ()
-> WorkerConfig actEnv
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConfigM actEnv () -> State (WorkerConfig actEnv) ()
forall actEnv a. ConfigM actEnv a -> State (WorkerConfig actEnv) a
defaultConfig :: WorkerConfig actEnv
defaultConfig =
let (Definitions HashMap Text WorkflowDefinition
wfDefs HashMap Text (ActivityDefinition actEnv)
actDefs) = defs -> Definitions actEnv
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions defs
in WorkerConfig
{ coreConfig :: WorkerConfig
coreConfig = WorkerConfig
, deadlockTimeout :: Maybe Int
deadlockTimeout = Int -> Maybe Int
forall a. a -> Maybe a
Just Int
, interceptorConfig :: Interceptors actEnv
interceptorConfig = Interceptors actEnv
forall a. Monoid a => a
, applicationErrorConverters :: [ApplicationFailureHandler]
applicationErrorConverters = [ApplicationFailureHandler]
, logger :: Loc -> Text -> LogLevel -> LogStr -> IO ()
logger = \Loc
_ Text
_ LogLevel
_ LogStr
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, tracerProvider :: TracerProvider
tracerProvider = TracerProvider
, payloadProcessor :: PayloadProcessor
payloadProcessor = (Payload -> IO Payload)
-> (Payload -> IO (Either String Payload)) -> PayloadProcessor
PayloadProcessor Payload -> IO Payload
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String Payload -> IO (Either String Payload)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either String Payload -> IO (Either String Payload))
-> (Payload -> Either String Payload)
-> Payload
-> IO (Either String Payload)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Payload -> Either String Payload
forall a b. b -> Either a b
, actEnv
HashMap Text WorkflowDefinition
HashMap Text (ActivityDefinition actEnv)
actEnv :: actEnv
wfDefs :: HashMap Text WorkflowDefinition
actDefs :: HashMap Text (ActivityDefinition actEnv)
actDefs :: HashMap Text (ActivityDefinition actEnv)
actEnv :: actEnv
wfDefs :: HashMap Text WorkflowDefinition
-- | The workflows and activities a worker knows about, each keyed by 'Text'.
--
-- The @env@ parameter is the environment type of the contained
-- 'ActivityDefinition's. Both fields are strict.
data Definitions env = Definitions
  { workflowDefinitions :: {-# UNPACK #-} !(HashMap Text WorkflowDefinition)
  -- ^ Workflow definitions by key.
  , activityDefinitions :: {-# UNPACK #-} !(HashMap Text (ActivityDefinition env))
  -- ^ Activity definitions by key.
  }
-- | Field-wise combination: the workflow maps and the activity maps are each
-- merged with the 'HashMap' '<>'.
instance Semigroup (Definitions env) where
  Definitions w1 a1 <> Definitions w2 a2 =
    Definitions (w1 <> w2) (a1 <> a2)
-- | The empty collection: no workflows and no activities.
instance Monoid (Definitions env) where
  mempty = Definitions mempty mempty
-- | A 'Definitions' value is already in the target form, so it is returned
-- unchanged.
instance ToDefinitions env (Definitions env) where
  toDefinitions = id
-- | Things that can be turned into a 'Definitions' collection whose
-- activities run in environment @env@.
class ToDefinitions env f where
  -- | Convert a value into the 'Definitions' it contributes.
  toDefinitions :: f -> Definitions env
-- | A single workflow definition becomes a one-entry workflow map keyed by
-- its 'workflowName', with no activities.
instance ToDefinitions env WorkflowDefinition where
  toDefinitions wf =
    Definitions (HashMap.singleton (workflowName wf) wf) mempty
-- | A 'ProvidedWorkflow' contributes definitions by delegating to another
-- 'toDefinitions' call, which is typed here at
-- @WorkflowDefinition -> Definitions env@.
--
-- NOTE(review): the expression that extracts the 'WorkflowDefinition' from
-- @wf@ is not visible in this text; confirm the accessor against upstream.
instance ToDefinitions env (ProvidedWorkflow f) where
toDefinitions :: ProvidedWorkflow f -> Definitions env
toDefinitions ProvidedWorkflow f
wf = WorkflowDefinition -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions ProvidedWorkflow f
-- | A single activity definition becomes a one-entry activity map keyed by
-- its 'activityName', with no workflows.
instance ToDefinitions env (ActivityDefinition env) where
  toDefinitions act =
    Definitions mempty (HashMap.singleton (activityName act) act)
-- | A 'ProvidedActivity' contributes definitions by delegating to another
-- 'toDefinitions' call, which is typed here at
-- @ActivityDefinition env -> Definitions env@.
--
-- NOTE(review): the expression that extracts the 'ActivityDefinition' from
-- @act@ is not visible in this text; confirm the accessor against upstream.
instance ToDefinitions env (ProvidedActivity env f) where
toDefinitions :: ProvidedActivity env f -> Definitions env
toDefinitions ProvidedActivity env f
act = ActivityDefinition env -> Definitions env
forall env f. ToDefinitions env f => f -> Definitions env
toDefinitions ProvidedActivity env f
-- | A pair contributes the definitions of both components, combined
-- left-to-right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2) => ToDefinitions env (_1, _2) where
  toDefinitions (a, b) =
    toDefinitions a <> toDefinitions b
-- | A triple contributes the definitions of all three components, combined
-- left-to-right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3) => ToDefinitions env (_1, _2, _3) where
  toDefinitions (a, b, c) =
    toDefinitions a
      <> toDefinitions b
      <> toDefinitions c
-- | A 4-tuple contributes the definitions of all four components, combined
-- left-to-right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4) => ToDefinitions env (_1, _2, _3, _4) where
  toDefinitions (a, b, c, d) =
    toDefinitions a
      <> toDefinitions b
      <> toDefinitions c
      <> toDefinitions d
-- | A 5-tuple contributes the definitions of all five components, combined
-- left-to-right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5) => ToDefinitions env (_1, _2, _3, _4, _5) where
  toDefinitions (a, b, c, d, e) =
    toDefinitions a
      <> toDefinitions b
      <> toDefinitions c
      <> toDefinitions d
      <> toDefinitions e
-- | A 6-tuple contributes the definitions of all six components, combined
-- left-to-right with '<>'.
instance (ToDefinitions env _1, ToDefinitions env _2, ToDefinitions env _3, ToDefinitions env _4, ToDefinitions env _5, ToDefinitions env _6) => ToDefinitions env (_1, _2, _3, _4, _5, _6) where
  toDefinitions (a, b, c, d, e, f) =
    toDefinitions a
      <> toDefinitions b
      <> toDefinitions c
      <> toDefinitions d
      <> toDefinitions e
      <> toDefinitions f
-- | Register additional interceptors.
--
-- The supplied 'Interceptors' are combined with '<>' on the /left/ of the
-- interceptors already present in the configuration.
addInterceptors :: Interceptors actEnv -> ConfigM actEnv ()
addInterceptors i = ConfigM $ modify' $ \conf ->
  conf
    { interceptorConfig = i <> interceptorConfig conf
    }
-- | Register a converter from an exception type @e@ to 'ApplicationFailure'.
--
-- The function is wrapped in 'ApplicationFailureHandler' and prepended to the
-- configuration's existing list of converters.
addErrorConverter :: (Exception e) => (e -> ApplicationFailure) -> ConfigM actEnv ()
addErrorConverter f = ConfigM $ modify' $ \conf ->
  conf
    { applicationErrorConverters = ApplicationFailureHandler f : applicationErrorConverters conf
    }
-- | Apply a function to the core worker configuration stored in the
-- @coreConfig@ field, leaving every other field of the 'WorkerConfig' as is.
modifyCore :: (Core.WorkerConfig -> Core.WorkerConfig) -> ConfigM actEnv ()
modifyCore f = ConfigM $ modify' $ \conf ->
  conf
    { coreConfig = f (coreConfig conf)
    }
-- | Set the core configuration's @namespace@ field to the 'Text' wrapped by
-- the given 'Namespace'.
setNamespace :: Namespace -> ConfigM actEnv ()
setNamespace (Namespace ns) = modifyCore $ \conf ->
  conf
    { Core.namespace = ns
    }
-- | Set the core configuration's @taskQueue@ field to the 'Text' wrapped by
-- the given 'TaskQueue'.
setTaskQueue :: TaskQueue -> ConfigM actEnv ()
setTaskQueue (TaskQueue tq) = modifyCore $ \conf ->
  conf
    { Core.taskQueue = tq
    }
-- | Set the core configuration's @buildId@ field.
setBuildId :: Text -> ConfigM actEnv ()
setBuildId bid = modifyCore $ \conf ->
  conf
    { Core.buildId = bid
    }
-- | Set the core configuration's @identityOverride@ field to @'Just' ident@.
setIdentity :: Text -> ConfigM actEnv ()
setIdentity ident = modifyCore $ \conf ->
  conf
    { Core.identityOverride = Just ident
    }
-- | Set the core configuration's @maxCachedWorkflows@ field.
setMaxCachedWorkflows :: Word64 -> ConfigM actEnv ()
setMaxCachedWorkflows n = modifyCore $ \conf ->
  conf
    { Core.maxCachedWorkflows = n
    }
-- | Set the core configuration's @maxOutstandingWorkflowTasks@ field.
setMaxOutstandingWorkflowTasks :: Word64 -> ConfigM actEnv ()
setMaxOutstandingWorkflowTasks n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingWorkflowTasks = n
    }
-- | Set the core configuration's @maxOutstandingActivities@ field.
setMaxOutstandingActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingActivities n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingActivities = n
    }
-- | Set the core configuration's @maxOutstandingLocalActivities@ field.
setMaxOutstandingLocalActivities :: Word64 -> ConfigM actEnv ()
setMaxOutstandingLocalActivities n = modifyCore $ \conf ->
  conf
    { Core.maxOutstandingLocalActivities = n
    }
-- | Set the core configuration's @maxConcurrentWorkflowTaskPolls@ field.
setMaxConcurrentWorkflowTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentWorkflowTaskPolls n = modifyCore $ \conf ->
  conf
    { Core.maxConcurrentWorkflowTaskPolls = n
    }
-- | Set the core configuration's @nonstickyToStickyPollRatio@ field.
setNonstickyToStickyPollRatio :: Float -> ConfigM actEnv ()
setNonstickyToStickyPollRatio n = modifyCore $ \conf ->
  conf
    { Core.nonstickyToStickyPollRatio = n
    }
-- | Set the core configuration's @maxConcurrentActivityTaskPolls@ field.
setMaxConcurrentActivityTaskPolls :: Word64 -> ConfigM actEnv ()
setMaxConcurrentActivityTaskPolls n = modifyCore $ \conf ->
  conf
    { Core.maxConcurrentActivityTaskPolls = n
    }
-- | Set the core configuration's @noRemoteActivities@ flag.
setNoRemoteActivities :: Bool -> ConfigM actEnv ()
setNoRemoteActivities n = modifyCore $ \conf ->
  conf
    { Core.noRemoteActivities = n
    }
-- | Set the core configuration's @stickyQueueScheduleToStartTimeoutMillis@
-- field.
setStickyQueueScheduleToStartTimeoutMillis :: Word64 -> ConfigM actEnv ()
setStickyQueueScheduleToStartTimeoutMillis n = modifyCore $ \conf ->
  conf
    { Core.stickyQueueScheduleToStartTimeoutMillis = n
    }
-- | Set the core configuration's @maxHeartbeatThrottleIntervalMillis@ field.
setMaxHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setMaxHeartbeatThrottleIntervalMillis n = modifyCore $ \conf ->
  conf
    { Core.maxHeartbeatThrottleIntervalMillis = n
    }
-- | Set the core configuration's @defaultHeartbeatThrottleIntervalMillis@
-- field.
setDefaultHeartbeatThrottleIntervalMillis :: Word64 -> ConfigM actEnv ()
setDefaultHeartbeatThrottleIntervalMillis n = modifyCore $ \conf ->
  conf
    { Core.defaultHeartbeatThrottleIntervalMillis = n
    }
-- | Set the core configuration's @maxActivitiesPerSecond@ field to
-- @'Just' n@.
setMaxActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxActivitiesPerSecond n = modifyCore $ \conf ->
  conf
    { Core.maxActivitiesPerSecond = Just n
    }
-- | Set the core configuration's @maxTaskQueueActivitiesPerSecond@ field to
-- @'Just' n@.
setMaxTaskQueueActivitiesPerSecond :: Double -> ConfigM actEnv ()
setMaxTaskQueueActivitiesPerSecond n = modifyCore $ \conf ->
  conf
    { Core.maxTaskQueueActivitiesPerSecond = Just n
    }
-- | Set the core configuration's @gracefulShutdownPeriodMillis@ field.
setGracefulShutdownPeriodMillis :: Word64 -> ConfigM actEnv ()
setGracefulShutdownPeriodMillis n = modifyCore $ \conf ->
  conf
    { Core.gracefulShutdownPeriodMillis = n
    }
-- | Replace the configuration's @logger@ callback with the given logging
-- function.
setLogger :: (Loc -> LogSource -> Control.Monad.Logger.LogLevel -> LogStr -> IO ()) -> ConfigM actEnv ()
setLogger f = ConfigM $ modify' $ \conf ->
  conf
    { logger = f
    }
-- | Replace the configuration's @tracerProvider@ with the given
-- 'TracerProvider'.
setTracerProvider :: TracerProvider -> ConfigM actEnv ()
setTracerProvider tp = ConfigM $ modify' $ \conf ->
  conf
    { tracerProvider = tp
    }
-- | Replace the configuration's @payloadProcessor@ with the given
-- 'PayloadProcessor'.
setPayloadProcessor :: PayloadProcessor -> ConfigM actEnv ()
setPayloadProcessor p = ConfigM $ modify' $ \conf ->
  conf
    { payloadProcessor = p
    }
-- | A running worker, existentially quantified over its worker type @ty@
-- (constrained by 'Core.KnownWorkerType').
--
-- Fields, all strict:
--
-- * @workerType@ — the 'Core.SWorkerType' value for @ty@.
-- * @workerWorkflowLoop@ — an 'Async' handle for the workflow loop.
-- * @workerActivityLoop@ — an 'Async' handle for the activity loop, wrapped
--   in 'InactiveForReplay'.
-- * @workerActivityWorker@ — the 'Activity.ActivityWorker', wrapped in
--   'InactiveForReplay'.
-- * @workerCore@ — the underlying 'Core.Worker'.
-- * @workerTracer@ — the 'Tracer' associated with this worker.
-- * @workerEvictionEmitter@ — a 'TChan' of 'Workflow.EvictionWithRunID'
--   values.
--
-- NOTE(review): the data constructor name and the closing brace of the record
-- are not visible in this text, and the @()@ / @forall env. ...@ lines look
-- like annotation residue rather than source; confirm against upstream.
data Worker env = forall ty.
Core.KnownWorkerType ty =>
{ ()
workerType :: !(Core.SWorkerType ty)
, forall env. Worker env -> Async ()
workerWorkflowLoop :: !(Async ())
, ()
workerActivityLoop :: !(InactiveForReplay ty (Async ()))
, ()
workerActivityWorker :: !(InactiveForReplay ty (Activity.ActivityWorker env))
, ()
workerCore :: !(Core.Worker ty)
, forall env. Worker env -> Tracer
workerTracer :: !Tracer
, forall env. Worker env -> TChan EvictionWithRunID
workerEvictionEmitter :: !(TChan Workflow.EvictionWithRunID)
-- | Start a worker in replay mode and return it together with the
-- 'Core.HistoryPusher' produced by 'Core.newReplayWorker'.
--
-- Steps visible below:
--
-- * the core configuration is copied with
--   'Core.nondeterminismAsWorkflowFail' forced to 'True';
-- * the core replay worker is created via 'Core.newReplayWorker'; a 'Left'
--   result is rethrown with 'throwIO';
-- * the task queue name is the configured one followed by @"-"@ and a
--   'UUID'-derived 'Text' (presumably the freshly generated @uuid@ — confirm);
-- * only the workflow loop is spawned (as a labelled async that logs at debug
--   level on start and on exit); @workerClient@, @workerActivityWorker@ and
--   @workerActivityLoop@ are all bound to @()@.
--
-- NOTE(review): this listing interleaves rendered type annotations with the
-- code, and several expressions are cut off (for example the right-hand
-- sides of the bindings read from @conf@ and the final argument to
-- 'Core.newReplayWorker'). It does not compile as shown; restore it from the
-- pristine source before changing behaviour.
startReplayWorker :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv, Core.HistoryPusher)
startReplayWorker :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Runtime -> WorkerConfig actEnv -> m (Worker actEnv, HistoryPusher)
startReplayWorker Runtime
rt WorkerConfig actEnv
conf = (RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher)
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher))
-> (RequireCallStackImpl => m (Worker actEnv, HistoryPusher))
-> m (Worker actEnv, HistoryPusher)
forall a b. (a -> b) -> a -> b
$ WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher)
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher))
-> WorkerContextM m (Worker actEnv, HistoryPusher)
-> m (Worker actEnv, HistoryPusher)
forall a b. (a -> b) -> a -> b
$ do
$(logDebug) Text
"Starting worker"
let coreConfig' :: WorkerConfig
coreConfig' = WorkerConfig actEnv
conf.coreConfig {Core.nondeterminismAsWorkflowFail = True}
(workerCore, replay) <- (WorkerError -> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> ((Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> Either WorkerError (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError (Worker 'Replay, HistoryPusher)
-> WorkerContextM m (Worker 'Replay, HistoryPusher))
-> WorkerContextM
m (Either WorkerError (Worker 'Replay, HistoryPusher))
-> WorkerContextM m (Worker 'Replay, HistoryPusher)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Either WorkerError (Worker 'Replay, HistoryPusher))
-> WorkerContextM
m (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Runtime
-> WorkerConfig
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
Core.newReplayWorker Runtime
rt WorkerConfig
$(logDebug) "Instantiated core"
workerEvictionEmitter <- newBroadcastTChanIO
runningWorkflows <- newTVarIO mempty
uuid <- liftIO nextRandom
let workerWorkflowFunctions = WorkerConfig actEnv
workerTaskQueue = Text -> TaskQueue
TaskQueue (WorkerConfig -> Text
Core.taskQueue WorkerConfig actEnv
conf.coreConfig Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"-" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> UUID -> Text
workerInboundInterceptors = WorkerConfig actEnv
workerOutboundInterceptors = WorkerConfig actEnv
workerDeadlockTimeout = WorkerConfig actEnv
workerClient = ()
workerErrorConverters = WorkerConfig actEnv
processor = WorkerConfig actEnv
workflowWorker = Workflow.WorkflowWorker {[ApplicationFailureHandler]
Maybe Int
HashMap Text WorkflowDefinition
TVar (HashMap RunId WorkflowInstance)
TChan EvictionWithRunID
Worker 'Replay
InactiveForReplay 'Replay Client
workerCore :: Worker 'Replay
workerEvictionEmitter :: TChan EvictionWithRunID
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerTaskQueue :: TaskQueue
workerInboundInterceptors :: WorkflowInboundInterceptor
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerDeadlockTimeout :: Maybe Int
workerClient :: ()
workerErrorConverters :: [ApplicationFailureHandler]
processor :: PayloadProcessor
workerEvictionEmitter :: TChan EvictionWithRunID
processor :: PayloadProcessor
workerErrorConverters :: [ApplicationFailureHandler]
workerTaskQueue :: TaskQueue
workerDeadlockTimeout :: Maybe Int
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerInboundInterceptors :: WorkflowInboundInterceptor
workerCore :: Worker 'Replay
workerClient :: InactiveForReplay 'Replay Client
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerActivityWorker = ()
workerActivityLoop = ()
workerType = SWorkerType 'Replay
workerTracer = TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer WorkerConfig actEnv
conf.tracerProvider (Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"hs-temporal-sdk" Text
"") TracerOptions
workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting workflow worker loop"
Workflow.execute workflowWorker
`UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
pure (Temporal.Worker.Worker {..}, replay)
-- | Reason a history replay did not succeed.
--
-- Returned in the 'Left' branch of 'runReplayHistory'. Because it also has an
-- 'Exception' instance (derived via the class defaults), callers may rethrow
-- it directly.
data ReplayHistoryFailure = ReplayHistoryFailure
  { message :: Text
  -- ^ Human-readable description of the failure.
  }
  deriving stock (Show, Eq)
  deriving anyclass (Exception)
-- | Replay one workflow 'History' on a temporary replay worker and report
-- whether the replay succeeded.
--
-- Flow visible below:
--
-- * a replay worker is acquired with 'startReplayWorker' inside
--   'UnliftIO.bracket'; the release action closes the history pusher
--   ('Core.closeHistory') and then calls @shutdown@ on the worker;
-- * the worker's eviction channel is subscribed to before anything is pushed;
-- * the workflow ID is looked up through the history events'
--   workflow-execution-started attributes; when none is found a 'userError'
--   is thrown with 'throwIO';
-- * the encoded history is pushed with 'Core.pushHistory'. A 'Left' from the
--   push is returned as 'Left' 'ReplayHistoryFailure'. Otherwise one eviction
--   is read from the channel, and the result is 'Left' (carrying
--   @evictionMessage@) when @evictionWasFatal@ holds and 'Right' @()@ when it
--   does not.
--
-- The whole history is rendered with 'show' into a debug log line.
--
-- NOTE(review): this listing interleaves rendered type annotations with the
-- code and some tokens are cut off (for example the argument of
-- @subscribeToEvictions@, the function passed to @to@, the channel handed to
-- @readTChan@, and the expression assigned to @message@ in the push-error
-- branch). It does not compile as shown; restore it from the pristine source
-- before changing behaviour.
runReplayHistory :: (MonadUnliftIO m, MonadCatch m) => Runtime -> WorkerConfig actEnv -> History -> m (Either ReplayHistoryFailure ())
runReplayHistory :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
-> WorkerConfig actEnv
-> History
-> m (Either ReplayHistoryFailure ())
runReplayHistory Runtime
rt WorkerConfig actEnv
conf History
history = WorkerConfig actEnv
-> WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ())
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
-> m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ WorkerContextM m (Worker actEnv, HistoryPusher)
-> ((Worker actEnv, HistoryPusher) -> WorkerContextM m ())
-> ((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
UnliftIO.bracket (Runtime
-> WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv, HistoryPusher)
forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Runtime -> WorkerConfig actEnv -> m (Worker actEnv, HistoryPusher)
startReplayWorker Runtime
rt WorkerConfig actEnv
conf) (\(Worker actEnv
worker, HistoryPusher
pusher) -> IO () -> WorkerContextM m ()
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HistoryPusher -> IO ()
Core.closeHistory HistoryPusher
pusher) WorkerContextM m () -> WorkerContextM m () -> WorkerContextM m ()
forall a b.
WorkerContextM m a -> WorkerContextM m b -> WorkerContextM m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Worker actEnv -> WorkerContextM m ()
forall (m :: * -> *) actEnv.
MonadUnliftIO m =>
Worker actEnv -> m ()
shutdown Worker actEnv
worker) (((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> ((Worker actEnv, HistoryPusher)
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ \(Worker actEnv
worker, HistoryPusher
pusher) -> do
evictions <- Worker actEnv -> WorkerContextM m (TChan EvictionWithRunID)
forall (m :: * -> *) actEnv.
MonadIO m =>
Worker actEnv -> m (TChan EvictionWithRunID)
subscribeToEvictions Worker actEnv
let mWfId = History
history History -> Fold History History ByteString Any -> Maybe ByteString
forall s t a b. s -> Fold s t a b -> Maybe a
^? LensLike' f History (Vector HistoryEvent)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'events" a) =>
LensLike' f s a
vec'events LensLike' f History (Vector HistoryEvent)
-> ((ByteString -> f Any)
-> Vector HistoryEvent -> f (Vector HistoryEvent))
-> (ByteString -> f Any)
-> History
-> f History
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (HistoryEvent -> f HistoryEvent)
-> Vector HistoryEvent -> f (Vector HistoryEvent)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Vector a -> f (Vector b)
traverse ((HistoryEvent -> f HistoryEvent)
-> Vector HistoryEvent -> f (Vector HistoryEvent))
-> ((ByteString -> f Any) -> HistoryEvent -> f HistoryEvent)
-> (ByteString -> f Any)
-> Vector HistoryEvent
-> f (Vector HistoryEvent)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LensLike'
f HistoryEvent (Maybe WorkflowExecutionStartedEventAttributes)
forall (f :: * -> *) s a.
(Functor f,
HasField s "maybe'workflowExecutionStartedEventAttributes" a) =>
LensLike' f s a
maybe'workflowExecutionStartedEventAttributes LensLike'
f HistoryEvent (Maybe WorkflowExecutionStartedEventAttributes)
-> ((ByteString -> f Any)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes))
-> (ByteString -> f Any)
-> HistoryEvent
-> f HistoryEvent
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse ((WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes))
-> ((ByteString -> f Any)
-> WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes)
-> (ByteString -> f Any)
-> Maybe WorkflowExecutionStartedEventAttributes
-> f (Maybe WorkflowExecutionStartedEventAttributes)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LensLike' f WorkflowExecutionStartedEventAttributes Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowId" a) =>
LensLike' f s a
workflowId LensLike' f WorkflowExecutionStartedEventAttributes Text
-> ((ByteString -> f Any) -> Text -> f Text)
-> (ByteString -> f Any)
-> WorkflowExecutionStartedEventAttributes
-> f WorkflowExecutionStartedEventAttributes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> ByteString) -> Getter Text Text ByteString Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> ByteString
wfId <- maybe (throwIO $ userError "No workflow ID found in history") pure mWfId
$(logDebug) $ "Pushing history for workflow ID " <> T.pack (show wfId)
$(logDebug) $ T.pack $ show history
res <- liftIO $ Core.pushHistory pusher wfId (Left $ encodeMessage history)
$(logDebug) $ "Pushed history for workflow ID " <> T.pack (show wfId)
case res of
Left WorkerError
e -> Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ()))
-> Either ReplayHistoryFailure ()
-> WorkerContextM m (Either ReplayHistoryFailure ())
forall a b. (a -> b) -> a -> b
$ ReplayHistoryFailure -> Either ReplayHistoryFailure ()
forall a b. a -> Either a b
Left (ReplayHistoryFailure -> Either ReplayHistoryFailure ())
-> ReplayHistoryFailure -> Either ReplayHistoryFailure ()
forall a b. (a -> b) -> a -> b
$ ReplayHistoryFailure {message :: Text
message = WorkerError
Right () -> do
res <- STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID)
-> STM EvictionWithRunID -> WorkerContextM m EvictionWithRunID
forall a b. (a -> b) -> a -> b
$ TChan EvictionWithRunID -> STM EvictionWithRunID
forall a. TChan a -> STM a
readTChan TChan EvictionWithRunID
if evictionWasFatal res
then pure $ Left $ ReplayHistoryFailure {message = evictionMessage res}
else pure $ Right ()
-- | Run a 'ReaderT' 'Tracer' computation with a tracer built from the
-- configuration's tracer provider, registered under the
-- @"hs-temporal-sdk"@ instrumentation library name.
--
-- NOTE(review): the second 'InstrumentationLibrary' argument and the trailing
-- tracer options were cut off in the listing this definition was restored
-- from; they are filled in to match the identical 'makeTracer' call in
-- 'startReplayWorker' (@""@ and, presumably, @tracerOptions@) — confirm.
traced :: WorkerConfig env -> ReaderT Tracer m a -> m a
traced conf m =
  runReaderT m $
    makeTracer
      conf.tracerProvider
      (InstrumentationLibrary "hs-temporal-sdk" "")
      tracerOptions
-- | Discharge a 'WorkerContextM' computation into the base monad.
--
-- The wrapped 'ReaderT' 'Tracer' layer is supplied by 'traced', and the
-- resulting 'LoggingT' layer is run with the logging callback stored in the
-- configuration (@conf.logger@).
runWorkerContext :: WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext conf (WorkerContextM m) =
  flip runLoggingT conf.logger $ traced conf m
-- | Monad used while starting and driving a worker: a 'Tracer' reader layered
-- over 'LoggingT' on top of the caller's monad @m@.
--
-- Every instance is borrowed from the underlying transformer stack via
-- newtype deriving. Use 'runWorkerContext' to run it.
newtype WorkerContextM m a = WorkerContextM (ReaderT Tracer (LoggingT m) a)
  deriving newtype
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadUnliftIO
    , MonadLogger
    , MonadLoggerIO
    , MonadCatch
    , MonadThrow
    )
-- | The tracer is simply the 'Tracer' held in the reader layer of
-- 'WorkerContextM'.
instance Monad m => MonadTracer (WorkerContextM m) where
  getTracer = WorkerContextM ask
-- | Start a worker that talks to the server through the given 'Client'.
--
-- The whole start-up runs inside a span named @"startWorker"@. Steps visible
-- below:
--
-- * the core worker is created with 'Core.newWorker' inside a nested span; a
--   'Left' result is rethrown with 'throwIO';
-- * the core worker is checked with 'Core.validateWorker'; a validation
--   error is thrown with 'throwIO';
-- * the activity environment from @conf.actEnv@ is placed in a fresh 'IORef',
--   and empty 'TVar's are created for running workflows and activities;
-- * two labelled asyncs are spawned, one running 'Workflow.execute' and one
--   running 'Activity.execute'; each logs at debug level when it starts and
--   when it exits.
--
-- Note that the eviction broadcast channel is created before the validation
-- result is inspected.
--
-- NOTE(review): this listing interleaves rendered type annotations with the
-- code and several expressions are cut off (for example the name of the
-- nested span, the final argument to 'Core.newWorker', and the right-hand
-- sides of the bindings read from @conf@). It does not compile as shown;
-- restore it from the pristine source before changing behaviour.
startWorker :: (MonadUnliftIO m, MonadCatch m) => Client -> WorkerConfig actEnv -> m (Temporal.Worker.Worker actEnv)
startWorker :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadCatch m) =>
Client -> WorkerConfig actEnv -> m (Worker actEnv)
startWorker Client
client WorkerConfig actEnv
conf = (RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv)
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv))
-> (RequireCallStackImpl => m (Worker actEnv)) -> m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ WorkerConfig actEnv
-> WorkerContextM m (Worker actEnv) -> m (Worker actEnv)
forall conf (m :: * -> *) a.
WorkerConfig conf -> WorkerContextM m a -> m a
runWorkerContext WorkerConfig actEnv
conf (WorkerContextM m (Worker actEnv) -> m (Worker actEnv))
-> WorkerContextM m (Worker actEnv) -> m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ Text
-> SpanArguments
-> WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
inSpan Text
"startWorker" SpanArguments
defaultSpanArguments (WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv))
-> WorkerContextM m (Worker actEnv)
-> WorkerContextM m (Worker actEnv)
forall a b. (a -> b) -> a -> b
$ do
workerCore <-
-> SpanArguments
-> WorkerContextM m (Worker 'Real)
-> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> m a -> m a
((WorkerError -> WorkerContextM m (Worker 'Real))
-> (Worker 'Real -> WorkerContextM m (Worker 'Real))
-> Either WorkerError (Worker 'Real)
-> WorkerContextM m (Worker 'Real)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO Worker 'Real -> WorkerContextM m (Worker 'Real)
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError (Worker 'Real)
-> WorkerContextM m (Worker 'Real))
-> WorkerContextM m (Either WorkerError (Worker 'Real))
-> WorkerContextM m (Worker 'Real)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Either WorkerError (Worker 'Real))
-> WorkerContextM m (Either WorkerError (Worker 'Real))
forall a. IO a -> WorkerContextM m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Client -> WorkerConfig -> IO (Either WorkerError (Worker 'Real))
Core.newWorker Client
client WorkerConfig actEnv
validationRes <- liftIO $ Core.validateWorker workerCore
workerEvictionEmitter <- newBroadcastTChanIO
case validationRes of
Left WorkerValidationError
err -> WorkerValidationError -> WorkerContextM m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO WorkerValidationError
Right () -> () -> WorkerContextM m ()
forall a. a -> WorkerContextM m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
runningWorkflows <- newTVarIO mempty
runningActivities <- newTVarIO mempty
activityEnv <- newIORef conf.actEnv
let errorConverters = [ApplicationFailureHandler] -> [ApplicationFailureHandler]
mkAnnotatedHandlers WorkerConfig actEnv
workerWorkflowFunctions = WorkerConfig actEnv
workerTaskQueue = Text -> TaskQueue
TaskQueue (Text -> TaskQueue) -> Text -> TaskQueue
forall a b. (a -> b) -> a -> b
$ WorkerConfig -> Text
Core.taskQueue WorkerConfig actEnv
workerInboundInterceptors = WorkerConfig actEnv
workerOutboundInterceptors = WorkerConfig actEnv
workerDeadlockTimeout = WorkerConfig actEnv
workerErrorConverters = [ApplicationFailureHandler]
processor = WorkerConfig actEnv
workflowWorker = Workflow.WorkflowWorker {[ApplicationFailureHandler]
Maybe Int
HashMap Text WorkflowDefinition
TVar (HashMap RunId WorkflowInstance)
TChan EvictionWithRunID
Worker 'Real
InactiveForReplay 'Real Client
workerEvictionEmitter :: TChan EvictionWithRunID
processor :: PayloadProcessor
workerErrorConverters :: [ApplicationFailureHandler]
workerTaskQueue :: TaskQueue
workerDeadlockTimeout :: Maybe Int
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerInboundInterceptors :: WorkflowInboundInterceptor
workerCore :: Worker 'Real
workerClient :: InactiveForReplay 'Real Client
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerCore :: Worker 'Real
workerEvictionEmitter :: TChan EvictionWithRunID
runningWorkflows :: TVar (HashMap RunId WorkflowInstance)
workerWorkflowFunctions :: HashMap Text WorkflowDefinition
workerTaskQueue :: TaskQueue
workerInboundInterceptors :: WorkflowInboundInterceptor
workerOutboundInterceptors :: WorkflowOutboundInterceptor
workerDeadlockTimeout :: Maybe Int
workerErrorConverters :: [ApplicationFailureHandler]
processor :: PayloadProcessor
workerClient :: Client
definitions = WorkerConfig actEnv
activityInboundInterceptors = WorkerConfig actEnv
activityOutboundInterceptors = WorkerConfig actEnv
clientInterceptors = WorkerConfig actEnv
activityErrorConverters = [ApplicationFailureHandler]
payloadProcessor = WorkerConfig actEnv
workerActivityWorker = Activity.ActivityWorker {[ApplicationFailureHandler]
HashMap Text (ActivityDefinition actEnv)
TVar (HashMap TaskToken (Async ()))
IORef actEnv
Worker 'Real
ActivityOutboundInterceptor actEnv
ActivityInboundInterceptor actEnv
workerCore :: Worker 'Real
runningActivities :: TVar (HashMap TaskToken (Async ()))
activityEnv :: IORef actEnv
definitions :: HashMap Text (ActivityDefinition actEnv)
activityInboundInterceptors :: ActivityInboundInterceptor actEnv
activityOutboundInterceptors :: ActivityOutboundInterceptor actEnv
clientInterceptors :: ClientInterceptors
activityErrorConverters :: [ApplicationFailureHandler]
payloadProcessor :: PayloadProcessor
payloadProcessor :: PayloadProcessor
activityErrorConverters :: [ApplicationFailureHandler]
clientInterceptors :: ClientInterceptors
activityOutboundInterceptors :: ActivityOutboundInterceptor actEnv
activityInboundInterceptors :: ActivityInboundInterceptor actEnv
workerCore :: Worker 'Real
runningActivities :: TVar (HashMap TaskToken (Async ()))
definitions :: HashMap Text (ActivityDefinition actEnv)
activityEnv :: IORef actEnv
workerClient = Client
workerTracer = TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer WorkerConfig actEnv
conf.tracerProvider (Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"hs-temporal-sdk" Text
"") TracerOptions
let workerType = SWorkerType 'Real
workerWorkflowLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/workflow/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting workflow worker loop"
Workflow.execute workflowWorker
`UnliftIO.finally` $(logDebug) "Exiting workflow worker loop"
workerActivityLoop <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/activity/", Core.namespace conf.coreConfig, "/", Core.taskQueue conf.coreConfig]) $ do
$(logDebug) "Starting activity worker loop"
Activity.execute workerActivityWorker
`UnliftIO.finally` $(logDebug) "Exiting activity worker loop"
pure Temporal.Worker.Worker {..}
-- | Block until the worker's polling loops have finished.
--
-- For a real worker both the workflow loop and the activity loop are awaited
-- (workflow first). Their outcomes are collected with 'waitCatch', so both
-- loops are always awaited before anything is rethrown; afterwards the first
-- captured exception (workflow loop before activity loop) is rethrown.
--
-- For a replay worker only the workflow loop is awaited, using 'wait'.
waitWorker :: (MonadIO m) => Temporal.Worker.Worker actEnv -> m ()
waitWorker (Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop}) =
  case workerType of
    Core.SReal -> do
      wfRes <- waitCatch workerWorkflowLoop
      actRes <- waitCatch workerActivityLoop
      -- 'for_' stops at the first 'throwIO', so at most one failure surfaces.
      for_ (lefts [wfRes, actRes]) throwIO
    Core.SReplay ->
      wait workerWorkflowLoop
-- | Link the calling thread to the worker's polling loops.
--
-- A labelled watcher thread is forked. When a watched loop finishes with an
-- exception, the watcher initiates shutdown of the core worker and then
-- rethrows that exception asynchronously in the thread that called
-- 'linkWorker'.
--
-- For a real worker the workflow and activity loops are raced; for a replay
-- worker only the workflow loop is watched.
linkWorker :: Temporal.Worker.Worker actEnv -> IO ()
linkWorker Temporal.Worker.Worker {workerCore, workerType, workerWorkflowLoop, workerActivityLoop} = do
  -- Captured in the caller's thread so the watcher below can target it.
  tid <- myThreadId
  void $ forkIOLabelled (T.unpack $ T.concat ["temporal/worker/link/", Core.namespace c, "/", Core.taskQueue c]) $ do
    let errorClause :: SomeException -> IO ()
        errorClause e = do
          Core.initiateShutdown workerCore
          Control.Concurrent.throwTo tid e
    case workerType of
      Core.SReal -> do
        -- NOTE(review): 'race' yields the result of whichever loop finishes
        -- first; if that loop ended cleanly, a later failure of the other
        -- loop is not observed by this watcher. Confirm this is intended.
        eeRes <- race (waitCatch workerWorkflowLoop) (waitCatch workerActivityLoop)
        case eeRes of
          Left (Left e) -> errorClause e
          Right (Left e) -> errorClause e
          _ -> pure ()
      Core.SReplay -> do
        eRes <- waitCatch workerWorkflowLoop
        case eRes of
          Left e -> errorClause e
          _ -> pure ()
  where
    -- Core configuration, used only to build the watcher thread's label.
    c = Core.getWorkerConfig workerCore
-- | Non-blocking check of the worker's polling loops, usable inside 'STM'.
--
-- For a real worker:
--
-- * @Just (Left e)@ as soon as either loop has failed (the workflow loop's
--   exception wins if both failed);
-- * @Just (Right ())@ once both loops have completed successfully;
-- * @Nothing@ otherwise, i.e. at least one loop is still running and none
--   has failed.
--
-- For a replay worker this is just the poll result of the workflow loop.
pollWorkerSTM :: Temporal.Worker.Worker actEnv -> STM (Maybe (Either SomeException ()))
pollWorkerSTM Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop} =
  case workerType of
    Core.SReal -> do
      wfRes <- pollSTM workerWorkflowLoop
      actRes <- pollSTM workerActivityLoop
      pure $ case (wfRes, actRes) of
        (Just (Left e), _) -> Just (Left e)
        (_, Just (Left e)) -> Just (Left e)
        (Just (Right ()), Just (Right ())) -> Just (Right ())
        _ -> Nothing
    Core.SReplay -> pollSTM workerWorkflowLoop
-- | 'STM' variant of waiting for the worker's polling loops.
--
-- For a real worker the transaction waits on the workflow loop and then on
-- the activity loop via 'waitSTM'; for a replay worker only the workflow
-- loop is awaited.
waitWorkerSTM :: Temporal.Worker.Worker actEnv -> STM ()
waitWorkerSTM Temporal.Worker.Worker {workerType, workerWorkflowLoop, workerActivityLoop} =
  case workerType of
    Core.SReal -> do
      waitSTM workerWorkflowLoop
      waitSTM workerActivityLoop
    Core.SReplay -> waitSTM workerWorkflowLoop
-- | Shut the worker down and wait for its loops to finish.
--
-- The whole sequence runs inside a @"shutdown"@ span and under
-- 'UnliftIO.mask'; only the wait on the loops is run with the masking state
-- restored. Steps, in order:
--
-- 1. initiate shutdown on the core worker;
-- 2. for a real worker, notify the activity worker of the shutdown;
-- 3. wait for the polling loops via 'waitWorker';
-- 4. finalize shutdown on the core worker.
--
-- Throws the 'WorkerError' returned by the finalize step, if any. An
-- exception escaping 'waitWorker' propagates before the finalize step runs.
shutdown :: (MonadUnliftIO m) => Temporal.Worker.Worker actEnv -> m ()
shutdown worker@Temporal.Worker.Worker {workerCore, workerTracer, workerType, workerActivityWorker} =
  OT.inSpan workerTracer "shutdown" defaultSpanArguments $ UnliftIO.mask $ \restore -> do
    OT.inSpan workerTracer "initiateShutdown" defaultSpanArguments $ liftIO $ Core.initiateShutdown workerCore
    -- The activity worker only exists for real workers.
    () <- case workerType of
      Core.SReal -> Activity.notifyShutdown workerActivityWorker
      Core.SReplay -> pure ()
    -- The wait is the only step run with the outer masking state restored.
    OT.inSpan workerTracer "waitWorker" defaultSpanArguments $ restore $ waitWorker worker
    err' <- OT.inSpan workerTracer "finalizeShutdown" defaultSpanArguments $ liftIO $ Core.finalizeShutdown workerCore
    case err' of
      Left err -> throwIO err
      Right () -> pure ()
-- | Obtain a fresh subscription to the worker's eviction notifications.
--
-- The returned channel is a 'dupTChan' duplicate of the worker's eviction
-- emitter, so each subscriber reads from its own channel.
subscribeToEvictions :: MonadIO m => Temporal.Worker.Worker actEnv -> m (TChan Workflow.EvictionWithRunID)
subscribeToEvictions worker =
  liftIO $ atomically $ dupTChan worker.workerEvictionEmitter
-- | 'STM' variant of 'subscribeToEvictions': duplicates the worker's eviction
-- emitter channel with 'dupTChan' inside the caller's transaction.
subscribeToEvictionsSTM :: Temporal.Worker.Worker actEnv -> STM (TChan Workflow.EvictionWithRunID)
subscribeToEvictionsSTM worker = dupTChan worker.workerEvictionEmitter
-- | The @message@ field of the eviction carried by an
-- 'Workflow.EvictionWithRunID'.
evictionMessage :: Workflow.EvictionWithRunID -> Text
evictionMessage Workflow.EvictionWithRunID {eviction} = eviction ^. Activation.message
-- | Whether the eviction's @reason@ field is 'RemoveFromCache'FATAL'.
-- Every other reason yields 'False'.
evictionWasFatal :: Workflow.EvictionWithRunID -> Bool
evictionWasFatal Workflow.EvictionWithRunID {eviction} =
  case eviction ^. Activation.reason of
    RemoveFromCache'FATAL -> True
    _ -> False
-- | Replace the environment value held by the worker's activity worker.
--
-- For a real worker the new value is written to the activity worker's
-- @activityEnv@ 'IORef'. A replay worker has no activity worker, so the call
-- is a no-op.
replaceEnvironment :: MonadIO m => Temporal.Worker.Worker actEnv -> actEnv -> m ()
replaceEnvironment Temporal.Worker.Worker {workerType, workerActivityWorker} env =
  case workerType of
    Core.SReal ->
      liftIO $ writeIORef workerActivityWorker.activityEnv env
    Core.SReplay -> pure ()