{-# LANGUAGE TemplateHaskell #-}
module Temporal.Workflow.Worker where
import qualified Control.Exception.Annotated as Ann
import Control.Monad
import Control.Monad.Catch (MonadCatch)
import Control.Monad.Logger
import Control.Monad.Reader
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.Maybe
import Data.ProtoLens
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V
import Lens.Family2
import OpenTelemetry.Context.ThreadLocal
import OpenTelemetry.Trace.Core hiding (inSpan, inSpan')
import OpenTelemetry.Trace.Monad
import qualified Proto.Temporal.Api.Common.V1.Message_Fields as Message
import qualified Proto.Temporal.Api.Failure.V1.Message_Fields as F
import qualified Proto.Temporal.Sdk.Core.Common.Common_Fields as CommonProto
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation
import qualified Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation_Fields as Activation
import qualified Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion as Completion
import qualified Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion_Fields as Completion
import RequireCallStack
import Temporal.Common
import Temporal.Common.Async
import qualified Temporal.Core.Client as C
import Temporal.Core.Worker (InactiveForReplay)
import qualified Temporal.Core.Worker as Core
import Temporal.Duration (durationFromProto)
import qualified Temporal.Exception as Err
import Temporal.Payload
import Temporal.SearchAttributes.Internal
import Temporal.Workflow.Definition
import Temporal.Workflow.Internal.Monad hiding (try)
import Temporal.WorkflowInstance
import UnliftIO
-- | A 'RemoveFromCache' message tagged with the 'RunId' it applies to.
--
-- This is the element type carried by 'workerEvictionEmitter'.
data EvictionWithRunID = EvictionWithRunID
  { runId :: RunId
  -- ^ Run the eviction refers to.
  , eviction :: RemoveFromCache
  -- ^ The eviction message itself.
  }
  deriving stock (Show)
-- | State shared by the workflow polling loop and the activation handlers.
--
-- The worker type @ty@ is existentially quantified, so 'workerClient' and
-- 'workerCore' can only be obtained by pattern matching on the constructor
-- (they have no usable record selectors).
data WorkflowWorker = forall ty.
  Core.KnownWorkerType ty =>
  WorkflowWorker
  { workerWorkflowFunctions :: {-# UNPACK #-} !(HashMap Text WorkflowDefinition)
  -- ^ Workflow definitions keyed by 'Text'
  -- (presumably the workflow type name; confirm against the registration code).
  , runningWorkflows :: {-# UNPACK #-} !(TVar (HashMap RunId WorkflowInstance))
  -- ^ Instances currently tracked by this worker, keyed by run id.
  -- Read and updated by 'upsertWorkflowInstance'; read by 'execute' on shutdown.
  , workerClient :: InactiveForReplay ty C.Client
  -- ^ Client handle, wrapped in 'InactiveForReplay' for the worker type @ty@.
  , workerCore :: Core.Worker ty
  -- ^ Core worker handle used for polling and completing activations.
  , workerInboundInterceptors :: {-# UNPACK #-} !WorkflowInboundInterceptor
  -- ^ Inbound interceptor applied to workflows run by this worker.
  , workerOutboundInterceptors :: {-# UNPACK #-} !WorkflowOutboundInterceptor
  -- ^ Outbound interceptor applied to workflows run by this worker.
  , workerDeadlockTimeout :: Maybe Int
  -- ^ Optional deadlock timeout. NOTE(review): the unit is not visible in this
  -- module; confirm where the value is consumed.
  , workerTaskQueue :: TaskQueue
  -- ^ Task queue associated with this worker.
  , workerErrorConverters :: [Err.ApplicationFailureHandler]
  -- ^ Handlers passed to 'Err.mkApplicationFailure' when an exception has to be
  -- reported as an application failure.
  , processor :: {-# UNPACK #-} !PayloadProcessor
  -- ^ Payload processor used to decode incoming headers and memo payloads.
  , workerEvictionEmitter :: TChan EvictionWithRunID
  -- ^ Channel of eviction notifications, each tagged with its run id.
  }
-- | Poll the core worker held in the environment for the next workflow
-- activation.
--
-- Returns whatever 'Core.pollWorkflowActivation' returns: either a
-- 'Core.WorkerError' or the next 'Core.WorkflowActivation'.
pollWorkflowActivation :: (MonadLoggerIO m) => ReaderT WorkflowWorker m (Either Core.WorkerError Core.WorkflowActivation)
pollWorkflowActivation = do
  -- Pattern match rather than use a selector: the field's type mentions the
  -- existentially quantified worker type.
  WorkflowWorker {workerCore} <- ask
  liftIO $ Core.pollWorkflowActivation workerCore
-- | Register a workflow instance under a run id unless one is already present.
--
-- The lookup and the insert happen in a single STM transaction.
--
-- * If no instance is registered for the run id, the supplied instance is
--   stored in 'runningWorkflows' and returned.
-- * If an instance is already registered, the map is left untouched and the
--   existing instance is returned; the supplied instance is discarded.
upsertWorkflowInstance :: (MonadLoggerIO m) => RunId -> WorkflowInstance -> ReaderT WorkflowWorker m WorkflowInstance
upsertWorkflowInstance r inst = do
  worker <- ask
  liftIO $ atomically $ do
    workflows <- readTVar worker.runningWorkflows
    case HashMap.lookup r workflows of
      Nothing -> do
        -- Force the updated map before storing it so the TVar never holds an
        -- unevaluated insert.
        writeTVar worker.runningWorkflows $! HashMap.insert r inst workflows
        pure inst
      -- Nothing to write here: storing the unchanged map back would be a no-op.
      Just existingInstance -> pure existingInstance
-- | Run a monadic condition repeatedly until it yields 'False'.
--
-- The action is always run at least once; the loop stops after the first
-- evaluation that returns 'False'. All work happens inside the condition
-- itself, there is no separate loop body.
whileM_ :: (Monad m) => m Bool -> m ()
whileM_ p = go
  where
    go = do
      continue <- p
      when continue go
-- | Run the workflow worker's polling loop until the core worker reports
-- shutdown.
--
-- Each iteration runs inside a @"Workflow activation step"@ span and polls for
-- one activation:
--
-- * On a 'Core.PollShutdown' error, the execution thread of every instance in
--   'runningWorkflows' is cancelled and the loop ends.
-- * On any other 'Core.WorkerError', the error is logged, recorded on the span,
--   and polling continues.
-- * On a successful poll, the activation is handed to 'handleActivation' on a
--   separate labelled thread that is linked to the polling thread, and polling
--   continues without waiting for it.
execute :: (MonadLoggerIO m, MonadUnliftIO m, MonadCatch m, MonadTracer m, RequireCallStack) => WorkflowWorker -> m ()
execute worker@WorkflowWorker {workerCore} = flip runReaderT worker $ do
  $(logDebug) "Starting workflow worker"
  whileM_ go
  where
    c = Core.getWorkerConfig workerCore

    -- Label for activation threads: ".../activate/<namespace>/<task queue>".
    activatorLabel =
      Text.unpack $
        Text.concat ["temporal/worker/workflow/activate/", Core.namespace c, "/", Core.taskQueue c]

    -- One polling step; the result says whether the loop should keep going.
    go = inSpan' "Workflow activation step" defaultSpanArguments $ \s -> do
      eActivation <- pollWorkflowActivation
      case eActivation of
        Left (Core.WorkerError Core.PollShutdown _) -> do
          $(logDebug) "Poller shutting down"
          runningWorkflows <- readTVarIO worker.runningWorkflows
          mapM_ (cancel <=< readIORef . executionThread) runningWorkflows
          pure False
        Left err -> do
          $(logError) $ Text.pack $ show err
          recordException s mempty Nothing err
          pure True
        Right activation -> do
          $(logDebug) $ Text.pack ("Got activation " <> show activation)
          -- Capture the tracing context here so the activation thread can
          -- attach it before doing any work.
          activationCtxt <- getContext
          activator <- asyncLabelled activatorLabel $ do
            _ <- attachContext activationCtxt
            handleActivation activation
          link activator
          pure True
handleActivation :: forall m. (MonadUnliftIO m, MonadLoggerIO m, MonadCatch m, MonadTracer m) => Core.WorkflowActivation -> ReaderT WorkflowWorker m ()
handleActivation :: forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m, MonadCatch m, MonadTracer m) =>
WorkflowActivation -> ReaderT WorkflowWorker m ()
handleActivation WorkflowActivation
activation = Text
-> SpanArguments
-> (Span -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> (Span -> m a) -> m a
inSpan' Text
"handleActivation" (SpanArguments
defaultSpanArguments {attributes = HashMap.fromList [("temporal.activation.run_id", toAttribute $ activation ^. Activation.runId)]}) ((Span -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ())
-> (Span -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ()
forall a b. (a -> b) -> a -> b
$ \Span
_s -> do
$(logDebug) (Text
"Handling activation: RunId " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Text.pack (Text -> String
forall a. Show a => a -> String
show (WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId)))
[WorkflowActivationJob]
-> (WorkflowActivationJob -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (WorkflowActivation
activation WorkflowActivation
-> FoldLike
[WorkflowActivationJob]
WorkflowActivation
WorkflowActivation
[WorkflowActivationJob]
[WorkflowActivationJob]
-> [WorkflowActivationJob]
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
[WorkflowActivationJob]
WorkflowActivation
WorkflowActivation
[WorkflowActivationJob]
[WorkflowActivationJob]
forall (f :: * -> *) s a.
(Functor f, HasField s "jobs" a) =>
LensLike' f s a
Activation.jobs) ((WorkflowActivationJob -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ())
-> (WorkflowActivationJob -> ReaderT WorkflowWorker m ())
-> ReaderT WorkflowWorker m ()
forall a b. (a -> b) -> a -> b
$ \WorkflowActivationJob
job -> do
$(logDebug) (Text
"Job: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Text.pack (WorkflowActivationJob -> String
forall a. Show a => a -> String
show WorkflowActivationJob
job))
WorkflowWorker {workerCore} <- ReaderT WorkflowWorker m WorkflowWorker
forall r (m :: * -> *). MonadReader r m => m r
ask
if shouldRun
then do
mInst <- createOrFetchWorkflowInstance
forM_ mInst $ \WorkflowInstance
inst -> do
let withoutStart :: [WorkflowActivationJob]
withoutStart = (WorkflowActivationJob -> Bool)
-> [WorkflowActivationJob] -> [WorkflowActivationJob]
forall a. (a -> Bool) -> [a] -> [a]
filter (\WorkflowActivationJob
job -> Maybe StartWorkflow -> Bool
forall a. Maybe a -> Bool
isNothing (WorkflowActivationJob
job WorkflowActivationJob
-> FoldLike
(Maybe StartWorkflow)
WorkflowActivationJob
WorkflowActivationJob
(Maybe StartWorkflow)
(Maybe StartWorkflow)
-> Maybe StartWorkflow
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe StartWorkflow)
WorkflowActivationJob
WorkflowActivationJob
(Maybe StartWorkflow)
(Maybe StartWorkflow)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'startWorkflow" a) =>
LensLike' f s a
Activation.maybe'startWorkflow)) (WorkflowActivation
activation WorkflowActivation
-> FoldLike
[WorkflowActivationJob]
WorkflowActivation
WorkflowActivation
[WorkflowActivationJob]
[WorkflowActivationJob]
-> [WorkflowActivationJob]
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
[WorkflowActivationJob]
WorkflowActivation
WorkflowActivation
[WorkflowActivationJob]
[WorkflowActivationJob]
forall (f :: * -> *) s a.
(Functor f, HasField s "jobs" a) =>
LensLike' f s a
Activation.jobs)
case [WorkflowActivationJob]
withoutStart of
[] -> () -> ReaderT WorkflowWorker m ()
forall a. a -> ReaderT WorkflowWorker m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[WorkflowActivationJob]
otherJobs -> STM () -> ReaderT WorkflowWorker m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> ReaderT WorkflowWorker m ())
-> STM () -> ReaderT WorkflowWorker m ()
forall a b. (a -> b) -> a -> b
$ TQueue WorkflowActivation -> WorkflowActivation -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue WorkflowInstance
inst.activationChannel (WorkflowActivation
activation WorkflowActivation
-> (WorkflowActivation -> WorkflowActivation) -> WorkflowActivation
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivation [WorkflowActivationJob]
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivation [WorkflowActivationJob]
forall (f :: * -> *) s a.
(Functor f, HasField s "jobs" a) =>
LensLike' f s a
Activation.jobs (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivation [WorkflowActivationJob])
-> [WorkflowActivationJob]
-> WorkflowActivation
-> WorkflowActivation
forall s t a b. Setter s t a b -> b -> s -> t
.~ [WorkflowActivationJob]
otherJobs)
else do
$(logDebug) "Workflow does not need to run."
let completionMessage =
WorkflowActivationCompletion
forall msg. Message msg => msg
defMessage
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Text
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Completion.runId (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text)
-> Text
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Success
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Success
forall (f :: * -> *) s a.
(Functor f, HasField s "successful" a) =>
LensLike' f s a
Completion.successful (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Success)
-> Success
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ Success
forall msg. Message msg => msg
defMessage
liftIO (Core.completeWorkflowActivation workerCore completionMessage >>= either throwIO pure)
removeEvictedWorkflowInstances
where
shouldRun :: Bool
shouldRun :: Bool
shouldRun = Bool
moreThanOneJob Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
removeFromCacheJob
where
jobs :: Vector WorkflowActivationJob
jobs = WorkflowActivation
activation WorkflowActivation
-> FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
-> Vector WorkflowActivationJob
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'jobs" a) =>
LensLike' f s a
Activation.vec'jobs
removeFromCacheJob :: Bool
removeFromCacheJob = (WorkflowActivationJob -> Bool)
-> Vector WorkflowActivationJob -> Bool
forall a. (a -> Bool) -> Vector a -> Bool
V.any (\WorkflowActivationJob
j -> Maybe RemoveFromCache -> Bool
forall a. Maybe a -> Bool
isJust (WorkflowActivationJob
j WorkflowActivationJob
-> FoldLike
(Maybe RemoveFromCache)
WorkflowActivationJob
WorkflowActivationJob
(Maybe RemoveFromCache)
(Maybe RemoveFromCache)
-> Maybe RemoveFromCache
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe RemoveFromCache)
WorkflowActivationJob
WorkflowActivationJob
(Maybe RemoveFromCache)
(Maybe RemoveFromCache)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'removeFromCache" a) =>
LensLike' f s a
Activation.maybe'removeFromCache)) Vector WorkflowActivationJob
jobs
moreThanOneJob :: Bool
moreThanOneJob = Vector WorkflowActivationJob -> Int
forall a. Vector a -> Int
V.length Vector WorkflowActivationJob
jobs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1
activationStartWorkflowJobs :: V.Vector (WorkflowActivationJob, StartWorkflow)
activationStartWorkflowJobs :: Vector (WorkflowActivationJob, StartWorkflow)
activationStartWorkflowJobs =
(WorkflowActivationJob
-> Maybe (WorkflowActivationJob, StartWorkflow))
-> Vector WorkflowActivationJob
-> Vector (WorkflowActivationJob, StartWorkflow)
forall a b. (a -> Maybe b) -> Vector a -> Vector b
V.mapMaybe
( \WorkflowActivationJob
rawJob ->
case WorkflowActivationJob
rawJob WorkflowActivationJob
-> FoldLike
(Maybe WorkflowActivationJob'Variant)
WorkflowActivationJob
WorkflowActivationJob
(Maybe WorkflowActivationJob'Variant)
(Maybe WorkflowActivationJob'Variant)
-> Maybe WorkflowActivationJob'Variant
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe WorkflowActivationJob'Variant)
WorkflowActivationJob
WorkflowActivationJob
(Maybe WorkflowActivationJob'Variant)
(Maybe WorkflowActivationJob'Variant)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'variant" a) =>
LensLike' f s a
Activation.maybe'variant of
Just (WorkflowActivationJob'StartWorkflow StartWorkflow
startWorkflow) -> (WorkflowActivationJob, StartWorkflow)
-> Maybe (WorkflowActivationJob, StartWorkflow)
forall a. a -> Maybe a
Just (WorkflowActivationJob
rawJob, StartWorkflow
startWorkflow)
Maybe WorkflowActivationJob'Variant
_ -> Maybe (WorkflowActivationJob, StartWorkflow)
forall a. Maybe a
Nothing
)
(WorkflowActivation
activation WorkflowActivation
-> FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
-> Vector WorkflowActivationJob
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'jobs" a) =>
LensLike' f s a
Activation.vec'jobs)
createOrFetchWorkflowInstance :: ReaderT WorkflowWorker m (Maybe WorkflowInstance)
createOrFetchWorkflowInstance :: ReaderT WorkflowWorker m (Maybe WorkflowInstance)
createOrFetchWorkflowInstance = Text
-> SpanArguments
-> (Span -> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadTracer m, HasCallStack) =>
Text -> SpanArguments -> (Span -> m a) -> m a
inSpan' Text
"createOrFetchWorkflowInstance" (SpanArguments
defaultSpanArguments {attributes = HashMap.fromList [("temporal.activation.run_id", toAttribute $ activation ^. Activation.runId)]}) ((Span -> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> (Span -> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall a b. (a -> b) -> a -> b
$ \Span
s -> do
worker@WorkflowWorker {workerCore} <- ReaderT WorkflowWorker m WorkflowWorker
forall r (m :: * -> *). MonadReader r m => m r
ask
runningWorkflows_ <- atomically $ readTVar worker.runningWorkflows
case HashMap.lookup (RunId $ activation ^. Activation.runId) runningWorkflows_ of
Just WorkflowInstance
inst -> do
Span -> Text -> Text -> ReaderT WorkflowWorker m ()
forall (m :: * -> *) a.
(MonadIO m, ToAttribute a) =>
Span -> Text -> a -> m ()
addAttribute Span
s Text
"temporal.workflow.worker.instance_state" (Text
"existing" :: Text)
Maybe WorkflowInstance
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall a. a -> ReaderT WorkflowWorker m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe WorkflowInstance
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> Maybe WorkflowInstance
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall a b. (a -> b) -> a -> b
$ WorkflowInstance -> Maybe WorkflowInstance
forall a. a -> Maybe a
Just WorkflowInstance
inst
Maybe WorkflowInstance
Nothing -> do
Span -> Text -> Text -> ReaderT WorkflowWorker m ()
forall (m :: * -> *) a.
(MonadIO m, ToAttribute a) =>
Span -> Text -> a -> m ()
addAttribute Span
s Text
"temporal.workflow.worker.instance_state" (Text
"new" :: Text)
vExistingInstance <- Vector (WorkflowActivationJob, StartWorkflow)
-> ((WorkflowActivationJob, StartWorkflow)
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Vector (Maybe WorkflowInstance))
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM Vector (WorkflowActivationJob, StartWorkflow)
activationStartWorkflowJobs (((WorkflowActivationJob, StartWorkflow)
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Vector (Maybe WorkflowInstance)))
-> ((WorkflowActivationJob, StartWorkflow)
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance))
-> ReaderT WorkflowWorker m (Vector (Maybe WorkflowInstance))
forall a b. (a -> b) -> a -> b
$ \(WorkflowActivationJob
_job, StartWorkflow
startWorkflow) -> do
Span -> Text -> Text -> ReaderT WorkflowWorker m ()
forall (m :: * -> *) a.
(MonadIO m, ToAttribute a) =>
Span -> Text -> a -> m ()
addAttribute Span
s Text
"temporal.workflow.type" (StartWorkflow
startWorkflow StartWorkflow
-> FoldLike Text StartWorkflow StartWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text StartWorkflow StartWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowType" a) =>
LensLike' f s a
Activation.workflowType)
ePayloads <- ReaderT
WorkflowWorker
m
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload)
-> ReaderT
WorkflowWorker
m
(Either
SomeException
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload))
forall e (m :: * -> *) a.
(Exception e, MonadCatch m) =>
m a -> m (Either e a)
Ann.try (ReaderT
WorkflowWorker
m
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload)
-> ReaderT
WorkflowWorker
m
(Either
SomeException
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload)))
-> ReaderT
WorkflowWorker
m
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload)
-> ReaderT
WorkflowWorker
m
(Either
SomeException
(Map SearchAttributeKey SearchAttributeType, Map Text Payload,
Map Text Payload))
forall a b. (a -> b) -> a -> b
$ do
searchAttrs <- IO (Map SearchAttributeKey SearchAttributeType)
-> ReaderT
WorkflowWorker m (Map SearchAttributeKey SearchAttributeType)
forall a. IO a -> ReaderT WorkflowWorker m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Map SearchAttributeKey SearchAttributeType)
-> ReaderT
WorkflowWorker m (Map SearchAttributeKey SearchAttributeType))
-> IO (Map SearchAttributeKey SearchAttributeType)
-> ReaderT
WorkflowWorker m (Map SearchAttributeKey SearchAttributeType)
forall a b. (a -> b) -> a -> b
$ do
decodedAttrs <- StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
StartWorkflow
StartWorkflow
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any
-> IO (Either String (Map SearchAttributeKey SearchAttributeType))
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike'
(Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType))))
StartWorkflow
SearchAttributes
forall (f :: * -> *) s a.
(Functor f, HasField s "searchAttributes" a) =>
LensLike' f s a
Activation.searchAttributes LensLike'
(Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType))))
StartWorkflow
SearchAttributes
-> ((IO
(Either String (Map SearchAttributeKey SearchAttributeType))
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any)
-> SearchAttributes
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
SearchAttributes)
-> FoldLike
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
StartWorkflow
StartWorkflow
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LensLike'
(Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType))))
SearchAttributes
(Map Text Payload)
forall (f :: * -> *) s a.
(Functor f, HasField s "indexedFields" a) =>
LensLike' f s a
Message.indexedFields LensLike'
(Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType))))
SearchAttributes
(Map Text Payload)
-> ((IO
(Either String (Map SearchAttributeKey SearchAttributeType))
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any)
-> Map Text Payload
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
(Map Text Payload))
-> (IO (Either String (Map SearchAttributeKey SearchAttributeType))
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any)
-> SearchAttributes
-> Constant
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
SearchAttributes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Map Text Payload
-> IO (Either String (Map SearchAttributeKey SearchAttributeType)))
-> Getter
(Map Text Payload)
(Map Text Payload)
(IO (Either String (Map SearchAttributeKey SearchAttributeType)))
Any
forall s a t b. (s -> a) -> Getter s t a b
to Map Text Payload
-> IO (Either String (Map SearchAttributeKey SearchAttributeType))
searchAttributesFromProto
either (throwIO . ValueError) pure decodedAttrs
hdrs <- processorDecodePayloads worker.processor (startWorkflow ^. Activation.headers . to (fmap convertFromProtoPayload))
memo <- processorDecodePayloads worker.processor (startWorkflow ^. Activation.memo . Message.fields . to (fmap convertFromProtoPayload))
pure (searchAttrs, hdrs, memo)
case ePayloads of
Left SomeException
err -> do
let appFailure :: ApplicationFailure
appFailure = SomeException -> [ApplicationFailureHandler] -> ApplicationFailure
Err.mkApplicationFailure SomeException
err WorkflowWorker
worker.workerErrorConverters
enrichedApplicationFailure :: Failure
enrichedApplicationFailure =
Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure
appFailure.message
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "source" a) =>
LensLike' f s a
F.source (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"hs-temporal-sdk"
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stackTrace" a) =>
LensLike' f s a
F.stackTrace (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure
appFailure.stack
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure ApplicationFailureInfo
forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo
forall (f :: * -> *) s a.
(Functor f, HasField s "applicationFailureInfo" a) =>
LensLike' f s a
F.applicationFailureInfo
(forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo)
-> ApplicationFailureInfo -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( ApplicationFailureInfo
forall msg. Message msg => msg
defMessage
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Text
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
F.type' (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text)
-> Text -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure -> Text
Err.type' ApplicationFailure
appFailure
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Bool
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "nonRetryable" a) =>
LensLike' f s a
F.nonRetryable (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool)
-> Bool -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure -> Bool
Err.nonRetryable ApplicationFailure
appFailure
)
failureProto :: Completion.Failure
failureProto :: Failure
failureProto = Failure
forall msg. Message msg => msg
defMessage Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
Completion.failure (forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
enrichedApplicationFailure
completionMessage :: WorkflowActivationCompletion
completionMessage =
WorkflowActivationCompletion
forall msg. Message msg => msg
defMessage
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Text
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Completion.runId (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text)
-> Text
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ (WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId)
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
Completion.failed (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure)
-> Failure
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
failureProto
IO (Either WorkerError ())
-> ReaderT WorkflowWorker m (Either WorkerError ())
forall a. IO a -> ReaderT WorkflowWorker m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either WorkerError ())
-> ReaderT WorkflowWorker m (Either WorkerError ()))
-> IO (Either WorkerError ())
-> ReaderT WorkflowWorker m (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
Core.completeWorkflowActivation Worker ty
workerCore WorkflowActivationCompletion
completionMessage
Maybe WorkflowInstance
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall a. a -> ReaderT WorkflowWorker m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe WorkflowInstance
forall a. Maybe a
Nothing
Right (Map SearchAttributeKey SearchAttributeType
searchAttrs, Map Text Payload
hdrs, Map Text Payload
memo) -> do
let runId_ :: RunId
runId_ = Text -> RunId
RunId (Text -> RunId) -> Text -> RunId
forall a b. (a -> b) -> a -> b
$ WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
CommonProto.runId
parentProto :: Maybe NamespacedWorkflowExecution
parentProto = StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe NamespacedWorkflowExecution)
StartWorkflow
StartWorkflow
(Maybe NamespacedWorkflowExecution)
(Maybe NamespacedWorkflowExecution)
-> Maybe NamespacedWorkflowExecution
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe NamespacedWorkflowExecution)
StartWorkflow
StartWorkflow
(Maybe NamespacedWorkflowExecution)
(Maybe NamespacedWorkflowExecution)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'parentWorkflowInfo" a) =>
LensLike' f s a
Activation.maybe'parentWorkflowInfo
parentInfo :: Maybe ParentInfo
parentInfo = case Maybe NamespacedWorkflowExecution
parentProto of
Maybe NamespacedWorkflowExecution
Nothing -> Maybe ParentInfo
forall a. Maybe a
Nothing
Just NamespacedWorkflowExecution
parent ->
ParentInfo -> Maybe ParentInfo
forall a. a -> Maybe a
Just
ParentInfo
{ parentNamespace :: Namespace
parentNamespace = Text -> Namespace
Namespace (Text -> Namespace) -> Text -> Namespace
forall a b. (a -> b) -> a -> b
$ NamespacedWorkflowExecution
parent NamespacedWorkflowExecution
-> FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
CommonProto.namespace
, parentRunId :: RunId
parentRunId = Text -> RunId
RunId (Text -> RunId) -> Text -> RunId
forall a b. (a -> b) -> a -> b
$ NamespacedWorkflowExecution
parent NamespacedWorkflowExecution
-> FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
CommonProto.runId
, parentWorkflowId :: WorkflowId
parentWorkflowId = Text -> WorkflowId
WorkflowId (Text -> WorkflowId) -> Text -> WorkflowId
forall a b. (a -> b) -> a -> b
$ NamespacedWorkflowExecution
parent NamespacedWorkflowExecution
-> FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text
NamespacedWorkflowExecution
NamespacedWorkflowExecution
Text
Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowId" a) =>
LensLike' f s a
CommonProto.workflowId
}
workflowInfo :: Info
workflowInfo =
Temporal.WorkflowInstance.Info
{ historyLength :: Word32
historyLength = WorkflowActivation
activation WorkflowActivation
-> FoldLike
Word32 WorkflowActivation WorkflowActivation Word32 Word32
-> Word32
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Word32 WorkflowActivation WorkflowActivation Word32 Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "historyLength" a) =>
LensLike' f s a
Activation.historyLength
, attempt :: Int
attempt = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> Int) -> Int32 -> Int
forall a b. (a -> b) -> a -> b
$ StartWorkflow
startWorkflow StartWorkflow
-> FoldLike Int32 StartWorkflow StartWorkflow Int32 Int32 -> Int32
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Int32 StartWorkflow StartWorkflow Int32 Int32
forall (f :: * -> *) s a.
(Functor f, HasField s "attempt" a) =>
LensLike' f s a
Activation.attempt
, taskQueue :: TaskQueue
taskQueue = WorkflowWorker
worker.workerTaskQueue
, workflowId :: WorkflowId
workflowId = Text -> WorkflowId
WorkflowId (Text -> WorkflowId) -> Text -> WorkflowId
forall a b. (a -> b) -> a -> b
$ StartWorkflow
startWorkflow StartWorkflow
-> FoldLike Text StartWorkflow StartWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text StartWorkflow StartWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowId" a) =>
LensLike' f s a
Activation.workflowId
, workflowType :: WorkflowType
workflowType = StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
WorkflowType StartWorkflow StartWorkflow WorkflowType Any
-> WorkflowType
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant WorkflowType) StartWorkflow Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowType" a) =>
LensLike' f s a
Activation.workflowType LensLike' (Constant WorkflowType) StartWorkflow Text
-> ((WorkflowType -> Constant WorkflowType Any)
-> Text -> Constant WorkflowType Text)
-> FoldLike
WorkflowType StartWorkflow StartWorkflow WorkflowType Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> WorkflowType) -> Getter Text Text WorkflowType Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> WorkflowType
WorkflowType
, continuedRunId :: Maybe RunId
continuedRunId = (Text -> RunId) -> Maybe Text -> Maybe RunId
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Text -> RunId
RunId (Maybe Text -> Maybe RunId) -> Maybe Text -> Maybe RunId
forall a b. (a -> b) -> a -> b
$ StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe Text) StartWorkflow StartWorkflow (Maybe Text) Any
-> Maybe Text
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant (Maybe Text)) StartWorkflow Text
forall (f :: * -> *) s a.
(Functor f, HasField s "continuedFromExecutionRunId" a) =>
LensLike' f s a
Activation.continuedFromExecutionRunId LensLike' (Constant (Maybe Text)) StartWorkflow Text
-> ((Maybe Text -> Constant (Maybe Text) Any)
-> Text -> Constant (Maybe Text) Text)
-> FoldLike
(Maybe Text) StartWorkflow StartWorkflow (Maybe Text) Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> Maybe Text) -> Getter Text Text (Maybe Text) Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> Maybe Text
nonEmptyString
, cronSchedule :: Maybe Text
cronSchedule = StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe Text) StartWorkflow StartWorkflow (Maybe Text) Any
-> Maybe Text
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant (Maybe Text)) StartWorkflow Text
forall (f :: * -> *) s a.
(Functor f, HasField s "cronSchedule" a) =>
LensLike' f s a
Activation.cronSchedule LensLike' (Constant (Maybe Text)) StartWorkflow Text
-> ((Maybe Text -> Constant (Maybe Text) Any)
-> Text -> Constant (Maybe Text) Text)
-> FoldLike
(Maybe Text) StartWorkflow StartWorkflow (Maybe Text) Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> Maybe Text) -> Getter Text Text (Maybe Text) Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> Maybe Text
nonEmptyString
, taskTimeout :: Duration
taskTimeout = StartWorkflow
startWorkflow StartWorkflow
-> FoldLike Duration StartWorkflow StartWorkflow Duration Any
-> Duration
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant Duration) StartWorkflow Duration
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowTaskTimeout" a) =>
LensLike' f s a
Activation.workflowTaskTimeout LensLike' (Constant Duration) StartWorkflow Duration
-> ((Duration -> Constant Duration Any)
-> Duration -> Constant Duration Duration)
-> FoldLike Duration StartWorkflow StartWorkflow Duration Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Duration -> Duration) -> Getter Duration Duration Duration Any
forall s a t b. (s -> a) -> Getter s t a b
to Duration -> Duration
durationFromProto
, executionTimeout :: Maybe Duration
executionTimeout = (Duration -> Duration) -> Maybe Duration -> Maybe Duration
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Duration -> Duration
durationFromProto (Maybe Duration -> Maybe Duration)
-> Maybe Duration -> Maybe Duration
forall a b. (a -> b) -> a -> b
$ StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe Duration)
StartWorkflow
StartWorkflow
(Maybe Duration)
(Maybe Duration)
-> Maybe Duration
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe Duration)
StartWorkflow
StartWorkflow
(Maybe Duration)
(Maybe Duration)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'workflowExecutionTimeout" a) =>
LensLike' f s a
Activation.maybe'workflowExecutionTimeout
, namespace :: Namespace
namespace = Text -> Namespace
Namespace (Text -> Namespace) -> Text -> Namespace
forall a b. (a -> b) -> a -> b
$ WorkerConfig -> Text
Core.namespace (WorkerConfig -> Text) -> WorkerConfig -> Text
forall a b. (a -> b) -> a -> b
$ Worker ty -> WorkerConfig
forall (ty :: WorkerType). Worker ty -> WorkerConfig
Core.getWorkerConfig Worker ty
workerCore
, parent :: Maybe ParentInfo
parent = Maybe ParentInfo
parentInfo
, headers :: Map Text Payload
headers = Map Text Payload
hdrs
, rawMemo :: Map Text Payload
rawMemo = Map Text Payload
memo
, searchAttributes :: Map SearchAttributeKey SearchAttributeType
searchAttributes = Map SearchAttributeKey SearchAttributeType
searchAttrs
, retryPolicy :: Maybe RetryPolicy
retryPolicy = RetryPolicy -> RetryPolicy
retryPolicyFromProto (RetryPolicy -> RetryPolicy)
-> Maybe RetryPolicy -> Maybe RetryPolicy
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe RetryPolicy)
StartWorkflow
StartWorkflow
(Maybe RetryPolicy)
(Maybe RetryPolicy)
-> Maybe RetryPolicy
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe RetryPolicy)
StartWorkflow
StartWorkflow
(Maybe RetryPolicy)
(Maybe RetryPolicy)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'retryPolicy" a) =>
LensLike' f s a
Activation.maybe'retryPolicy
, runId :: RunId
runId = Text -> RunId
RunId (Text -> RunId) -> Text -> RunId
forall a b. (a -> b) -> a -> b
$ WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
CommonProto.runId
, runTimeout :: Maybe Duration
runTimeout = (Duration -> Duration) -> Maybe Duration -> Maybe Duration
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Duration -> Duration
durationFromProto (Maybe Duration -> Maybe Duration)
-> Maybe Duration -> Maybe Duration
forall a b. (a -> b) -> a -> b
$ StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe Duration)
StartWorkflow
StartWorkflow
(Maybe Duration)
(Maybe Duration)
-> Maybe Duration
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe Duration)
StartWorkflow
StartWorkflow
(Maybe Duration)
(Maybe Duration)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'workflowRunTimeout" a) =>
LensLike' f s a
Activation.maybe'workflowRunTimeout
, startTime :: SystemTime
startTime =
Timestamp -> SystemTime
timespecFromTimestamp (Timestamp -> SystemTime) -> Timestamp -> SystemTime
forall a b. (a -> b) -> a -> b
$
Timestamp -> Maybe Timestamp -> Timestamp
forall a. a -> Maybe a -> a
fromMaybe
(WorkflowActivation
activation WorkflowActivation
-> FoldLike
Timestamp WorkflowActivation WorkflowActivation Timestamp Timestamp
-> Timestamp
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Timestamp WorkflowActivation WorkflowActivation Timestamp Timestamp
forall (f :: * -> *) s a.
(Functor f, HasField s "timestamp" a) =>
LensLike' f s a
Activation.timestamp)
(StartWorkflow
startWorkflow StartWorkflow
-> FoldLike
(Maybe Timestamp)
StartWorkflow
StartWorkflow
(Maybe Timestamp)
(Maybe Timestamp)
-> Maybe Timestamp
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe Timestamp)
StartWorkflow
StartWorkflow
(Maybe Timestamp)
(Maybe Timestamp)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'startTime" a) =>
LensLike' f s a
Activation.maybe'startTime)
, continueAsNewSuggested :: Bool
continueAsNewSuggested = Bool
False
}
case Text -> HashMap Text WorkflowDefinition -> Maybe WorkflowDefinition
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (StartWorkflow
startWorkflow StartWorkflow
-> FoldLike Text StartWorkflow StartWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text StartWorkflow StartWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowType" a) =>
LensLike' f s a
Activation.workflowType) WorkflowWorker
worker.workerWorkflowFunctions of
Maybe WorkflowDefinition
Nothing -> do
Span -> SpanStatus -> ReaderT WorkflowWorker m ()
forall (m :: * -> *). MonadIO m => Span -> SpanStatus -> m ()
setStatus Span
s (Text -> SpanStatus
Error Text
"No workflow definition found")
$Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> ReaderT WorkflowWorker m ()
(Text -> ReaderT WorkflowWorker m ())
-> (Text -> Text) -> Text -> ReaderT WorkflowWorker m ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> ReaderT WorkflowWorker m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logInfo Text
"No workflow definition found"
let failureProto :: Failure
failureProto =
Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
Completion.failure
(forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"No workflow definition found"
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure ApplicationFailureInfo
forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo
forall (f :: * -> *) s a.
(Functor f, HasField s "applicationFailureInfo" a) =>
LensLike' f s a
F.applicationFailureInfo
(forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo)
-> ApplicationFailureInfo -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( ApplicationFailureInfo
forall msg. Message msg => msg
defMessage
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Text
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
F.type' (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text)
-> Text -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"NotFound"
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Bool
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "nonRetryable" a) =>
LensLike' f s a
F.nonRetryable (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool)
-> Bool -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
False
)
)
completionMessage :: WorkflowActivationCompletion
completionMessage =
WorkflowActivationCompletion
forall msg. Message msg => msg
defMessage
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Text
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Completion.runId (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text)
-> Text
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ (WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId)
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
Completion.failed (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure)
-> Failure
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
failureProto
IO () -> ReaderT WorkflowWorker m ()
forall a. IO a -> ReaderT WorkflowWorker m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
Core.completeWorkflowActivation Worker ty
workerCore WorkflowActivationCompletion
completionMessage IO (Either WorkerError ())
-> (Either WorkerError () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (WorkerError -> IO ())
-> (() -> IO ()) -> Either WorkerError () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
Maybe WorkflowInstance
-> ReaderT WorkflowWorker m (Maybe WorkflowInstance)
forall a. a -> ReaderT WorkflowWorker m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe WorkflowInstance
forall a. Maybe a
Nothing
Just (WorkflowDefinition Text
_ Vector Payload -> IO (Either String (Workflow Payload))
f) -> do
inst <-
(WorkflowActivationCompletion -> IO (Either WorkerError ()))
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> Maybe Int
-> [ApplicationFailureHandler]
-> WorkflowInboundInterceptor
-> WorkflowOutboundInterceptor
-> PayloadProcessor
-> Info
-> StartWorkflow
-> ReaderT WorkflowWorker m WorkflowInstance
forall (m :: * -> *).
(HasCallStack, MonadLoggerIO m) =>
(WorkflowActivationCompletion -> IO (Either WorkerError ()))
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> Maybe Int
-> [ApplicationFailureHandler]
-> WorkflowInboundInterceptor
-> WorkflowOutboundInterceptor
-> PayloadProcessor
-> Info
-> StartWorkflow
-> m WorkflowInstance
create
( \WorkflowActivationCompletion
wf -> do
Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
Core.completeWorkflowActivation Worker ty
workerCore WorkflowActivationCompletion
wf
)
Vector Payload -> IO (Either String (Workflow Payload))
f
WorkflowWorker
worker.workerDeadlockTimeout
WorkflowWorker
worker.workerErrorConverters
WorkflowWorker
worker.workerInboundInterceptors
WorkflowWorker
worker.workerOutboundInterceptors
WorkflowWorker
worker.processor
Info
workflowInfo
StartWorkflow
startWorkflow
liftIO $ addStackTraceHandler inst
Just <$> upsertWorkflowInstance runId_ inst
pure $ join (vExistingInstance V.!? 0)
-- Walk the jobs of the current activation and act on every
-- @RemoveFromCache@ job; every other job variant is ignored.
--
-- Each eviction is handled inside a "removeEvictedWorkflowInstance" span:
--
--   1. In a single STM transaction the run ID is deleted from
--      @runningWorkflows@ and an 'EvictionWithRunID' is written to
--      @workerEvictionEmitter@. The eviction is emitted whether or not an
--      instance was registered for the run.
--   2. The transaction returns a follow-up action that is executed once the
--      transaction has committed. If no instance was registered, the span
--      status is set to 'Error' and the message is logged at debug level.
--      Otherwise the instance's execution thread is cancelled and the
--      eviction is logged at debug level.
removeEvictedWorkflowInstances :: ReaderT WorkflowWorker m ()
removeEvictedWorkflowInstances =
  forM_ (activation ^. Activation.vec'jobs) $ \activationJob ->
    case activationJob ^. Activation.maybe'variant of
      Just (WorkflowActivationJob'RemoveFromCache removeFromCache) -> do
        let evictedRunId = RunId $ activation ^. CommonProto.runId
            -- Both log lines share the same tail; only the leading text differs.
            describeEviction prefix =
              Text.pack
                ( prefix
                    ++ show evictedRunId
                    ++ ", message: "
                    ++ show (removeFromCache ^. Activation.message)
                )
            spanAttrs =
              HashMap.fromList
                [ ("temporal.workflow.worker.instance_state", toAttribute ("evicted" :: Text))
                , ("temporal.activation.run_id", toAttribute $ activation ^. CommonProto.runId)
                ]
        inSpan' "removeEvictedWorkflowInstance" (defaultSpanArguments {attributes = spanAttrs}) $ \evictionSpan -> do
          w <- ask
          -- The transaction only touches shared state; anything that needs
          -- the outer monad (span status, logging, cancellation) is returned
          -- as an action and run after the commit.
          followUp <- atomically $ do
            known <- readTVar w.runningWorkflows
            writeTVar w.runningWorkflows $ HashMap.delete evictedRunId known
            writeTChan w.workerEvictionEmitter EvictionWithRunID {runId = evictedRunId, eviction = removeFromCache}
            pure $ case HashMap.lookup evictedRunId known of
              Nothing -> do
                let msg = describeEviction "Eviction request on an unknown workflow with run ID "
                setStatus evictionSpan $ Error msg
                $(logDebug) msg
              Just wf -> do
                cancel =<< readIORef wf.executionThread
                $(logDebug) $ describeEviction "Evicting workflow instance with run ID "
          followUp
      _ -> pure ()