{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
module Temporal.WorkflowInstance (
WorkflowInstance,
Info (..),
create,
activate,
addCommand,
nextActivitySequence,
nextChildWorkflowSequence,
nextExternalCancelSequence,
nextExternalSignalSequence,
nextTimerSequence,
nextConditionSequence,
addStackTraceHandler,
) where
import Control.Applicative
import qualified Control.Exception as E
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Foldable
import Data.Functor.Identity
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens
import Data.Proxy
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Data.Text as Text
import Data.Time.Clock.System (SystemTime (..))
import Data.Vector (Vector)
import qualified Data.Vector as V
import GHC.Stack (HasCallStack, emptyCallStack)
import Lens.Family2
import qualified Proto.Temporal.Api.Failure.V1.Message_Fields as F
import qualified Proto.Temporal.Api.Failure.V1.Message_Fields as Failure
import Proto.Temporal.Sdk.Core.ChildWorkflow.ChildWorkflow (
StartChildWorkflowExecutionFailedCause (..),
)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation (
CancelWorkflow,
FireTimer,
NotifyHasPatch,
QueryWorkflow,
ResolveActivity,
ResolveChildWorkflowExecution,
ResolveChildWorkflowExecutionStart,
ResolveChildWorkflowExecutionStart'Status (..),
ResolveRequestCancelExternalWorkflow,
ResolveSignalExternalWorkflow,
SignalWorkflow,
StartWorkflow,
UpdateRandomSeed,
WorkflowActivation,
WorkflowActivationJob,
WorkflowActivationJob'Variant (..),
)
import qualified Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation_Fields as Activation
import qualified Proto.Temporal.Sdk.Core.WorkflowCommands.WorkflowCommands as Command
import qualified Proto.Temporal.Sdk.Core.WorkflowCommands.WorkflowCommands_Fields as Command
import qualified Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion as Completion
import qualified Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion_Fields as Completion
import System.Random (mkStdGen)
import Temporal.Common
import qualified Temporal.Core.Worker as Core
import Temporal.Coroutine
import Temporal.Duration
import Temporal.Exception
import qualified Temporal.Exception as Err
import Temporal.Payload
import Temporal.SearchAttributes.Internal
import Temporal.Workflow.Eval (ActivationResult (..), SuspendableWorkflowExecution, injectWorkflowSignal, runWorkflow)
import Temporal.Workflow.Internal.Instance
import Temporal.Workflow.Internal.Monad
import Temporal.Workflow.Types
import UnliftIO
create
:: (HasCallStack, MonadLoggerIO m)
=> (Core.WorkflowActivationCompletion -> IO (Either Core.WorkerError ()))
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> Maybe Int
-> [ApplicationFailureHandler]
-> WorkflowInboundInterceptor
-> WorkflowOutboundInterceptor
-> PayloadProcessor
-> Info
-> StartWorkflow
-> m WorkflowInstance
create :: forall (m :: * -> *).
(?callStack::CallStack, MonadLoggerIO m) =>
(WorkflowActivationCompletion -> IO (Either WorkerError ()))
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> Maybe Int
-> [ApplicationFailureHandler]
-> WorkflowInboundInterceptor
-> WorkflowOutboundInterceptor
-> PayloadProcessor
-> Info
-> StartWorkflow
-> m WorkflowInstance
create WorkflowActivationCompletion -> IO (Either WorkerError ())
workflowCompleteActivation Vector Payload -> IO (Either String (Workflow Payload))
workflowFn Maybe Int
workflowDeadlockTimeout [ApplicationFailureHandler]
errorConverters WorkflowInboundInterceptor
inboundInterceptor WorkflowOutboundInterceptor
outboundInterceptor PayloadProcessor
payloadProcessor Info
info StartWorkflow
start = do
$Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall msg. ToLogStr msg => Loc -> Text -> LogLevel -> msg -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logDebug Text
"Instantiating workflow instance"
workflowInstanceLogger <- m (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
MonadLoggerIO m =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO
workflowRandomnessSeed <- WorkflowGenM <$> newIORef (mkStdGen 0)
workflowNotifiedPatches <- newIORef mempty
workflowMemoizedPatches <- newIORef mempty
workflowSequences <-
newIORef
Sequences
{ externalCancel = 1
, childWorkflow = 1
, externalSignal = 1
, timer = 1
, activity = 1
, condition = 1
, varId = 1
}
workflowTime <- newIORef $ MkSystemTime 0 0
workflowIsReplaying <- newIORef False
workflowSequenceMaps <- newTVarIO $ SequenceMaps mempty mempty mempty mempty mempty mempty
workflowCommands <- newTVarIO $ Reversed []
workflowSignalHandlers <- newIORef mempty
workflowCallStack <- newIORef emptyCallStack
workflowQueryHandlers <- newIORef mempty
workflowInstanceInfo <- newIORef info
workflowInstanceContinuationEnv <- ContinuationEnv <$> newIORef JobNil
workflowCancellationVar <- newIVar
activationChannel <- newTQueueIO
executionThread <- newIORef (error "Workflow thread not yet started")
let inst = WorkflowInstance {[ApplicationFailureHandler]
Maybe Int
TVar SequenceMaps
TVar (Reversed WorkflowCommand)
IORef Bool
IORef CallStack
IORef (HashMap (Maybe Text) (Vector Payload -> Workflow ()))
IORef
(HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)))
IORef (HashMap PatchId Bool)
IORef (Set PatchId)
IORef (Async ())
IORef SystemTime
IORef Info
IORef Sequences
TQueue WorkflowActivation
PayloadProcessor
WorkflowOutboundInterceptor
WorkflowInboundInterceptor
IVar ()
WorkflowGenM
ContinuationEnv
Loc -> Text -> LogLevel -> LogStr -> IO ()
WorkflowActivationCompletion -> IO (Either WorkerError ())
workflowCompleteActivation :: WorkflowActivationCompletion -> IO (Either WorkerError ())
workflowDeadlockTimeout :: Maybe Int
errorConverters :: [ApplicationFailureHandler]
inboundInterceptor :: WorkflowInboundInterceptor
outboundInterceptor :: WorkflowOutboundInterceptor
payloadProcessor :: PayloadProcessor
workflowInstanceLogger :: Loc -> Text -> LogLevel -> LogStr -> IO ()
workflowRandomnessSeed :: WorkflowGenM
workflowNotifiedPatches :: IORef (Set PatchId)
workflowMemoizedPatches :: IORef (HashMap PatchId Bool)
workflowSequences :: IORef Sequences
workflowTime :: IORef SystemTime
workflowIsReplaying :: IORef Bool
workflowSequenceMaps :: TVar SequenceMaps
workflowCommands :: TVar (Reversed WorkflowCommand)
workflowSignalHandlers :: IORef (HashMap (Maybe Text) (Vector Payload -> Workflow ()))
workflowCallStack :: IORef CallStack
workflowQueryHandlers :: IORef
(HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)))
workflowInstanceInfo :: IORef Info
workflowInstanceContinuationEnv :: ContinuationEnv
workflowCancellationVar :: IVar ()
activationChannel :: TQueue WorkflowActivation
executionThread :: IORef (Async ())
payloadProcessor :: PayloadProcessor
errorConverters :: [ApplicationFailureHandler]
outboundInterceptor :: WorkflowOutboundInterceptor
inboundInterceptor :: WorkflowInboundInterceptor
executionThread :: IORef (Async ())
activationChannel :: TQueue WorkflowActivation
workflowDeadlockTimeout :: Maybe Int
workflowCancellationVar :: IVar ()
workflowInstanceContinuationEnv :: ContinuationEnv
workflowCompleteActivation :: WorkflowActivationCompletion -> IO (Either WorkerError ())
workflowCallStack :: IORef CallStack
workflowQueryHandlers :: IORef
(HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)))
workflowSignalHandlers :: IORef (HashMap (Maybe Text) (Vector Payload -> Workflow ()))
workflowSequenceMaps :: TVar SequenceMaps
workflowCommands :: TVar (Reversed WorkflowCommand)
workflowIsReplaying :: IORef Bool
workflowTime :: IORef SystemTime
workflowSequences :: IORef Sequences
workflowMemoizedPatches :: IORef (HashMap PatchId Bool)
workflowNotifiedPatches :: IORef (Set PatchId)
workflowRandomnessSeed :: WorkflowGenM
workflowInstanceLogger :: Loc -> Text -> LogLevel -> LogStr -> IO ()
workflowInstanceInfo :: IORef Info
..}
workerThread <- liftIO $ async $ runInstanceM inst $ do
$logDebug "Start workflow execution thread"
exec <- setUpWorkflowExecution start
res <- liftIO $ inboundInterceptor.executeWorkflow exec $ \ExecuteWorkflowInput
exec' -> WorkflowInstance
-> InstanceM (WorkflowExitVariant Payload)
-> IO (WorkflowExitVariant Payload)
forall a. WorkflowInstance -> InstanceM a -> IO a
runInstanceM WorkflowInstance
inst (InstanceM (WorkflowExitVariant Payload)
-> IO (WorkflowExitVariant Payload))
-> InstanceM (WorkflowExitVariant Payload)
-> IO (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ InstanceM Payload -> InstanceM (WorkflowExitVariant Payload)
runTopLevel (InstanceM Payload -> InstanceM (WorkflowExitVariant Payload))
-> InstanceM Payload -> InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ do
$Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logDebug Text
"Executing workflow"
wf <- ExecuteWorkflowInput
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> InstanceM (SuspendableWorkflowExecution Payload)
applyStartWorkflow ExecuteWorkflowInput
exec' Vector Payload -> IO (Either String (Workflow Payload))
workflowFn
runWorkflowToCompletion wf
$logDebug "Workflow execution completed"
addCommand =<< convertExitVariantToCommand res
flushCommands
$logDebug "Handling leftover queries"
handleQueriesAfterCompletion
link workerThread
writeIORef executionThread workerThread
pure inst
runWorkflowToCompletion :: HasCallStack => SuspendableWorkflowExecution Payload -> InstanceM Payload
runWorkflowToCompletion :: (?callStack::CallStack) =>
SuspendableWorkflowExecution Payload -> InstanceM Payload
runWorkflowToCompletion SuspendableWorkflowExecution Payload
wf = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let completeStep :: Await [ActivationResult] (SuspendableWorkflowExecution Payload) -> InstanceM (SuspendableWorkflowExecution Payload)
completeStep Await [ActivationResult] (SuspendableWorkflowExecution Payload)
suspension = do
$Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logDebug Text
"Awaiting activation results from workflow"
activation <- InstanceM (InstanceM WorkflowActivation)
-> InstanceM WorkflowActivation
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (InstanceM (InstanceM WorkflowActivation)
-> InstanceM WorkflowActivation)
-> InstanceM (InstanceM WorkflowActivation)
-> InstanceM WorkflowActivation
forall a b. (a -> b) -> a -> b
$ STM (InstanceM WorkflowActivation)
-> InstanceM (InstanceM WorkflowActivation)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (InstanceM WorkflowActivation)
-> InstanceM (InstanceM WorkflowActivation))
-> STM (InstanceM WorkflowActivation)
-> InstanceM (InstanceM WorkflowActivation)
forall a b. (a -> b) -> a -> b
$ do
mActivition <- TQueue WorkflowActivation -> STM (Maybe WorkflowActivation)
forall a. TQueue a -> STM (Maybe a)
tryReadTQueue WorkflowInstance
inst.activationChannel
case mActivition of
Maybe WorkflowActivation
Nothing -> do
InstanceM WorkflowActivation -> STM (InstanceM WorkflowActivation)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InstanceM WorkflowActivation
-> STM (InstanceM WorkflowActivation))
-> InstanceM WorkflowActivation
-> STM (InstanceM WorkflowActivation)
forall a b. (a -> b) -> a -> b
$ do
InstanceM ()
(?callStack::CallStack) => InstanceM ()
flushCommands
STM WorkflowActivation -> InstanceM WorkflowActivation
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM WorkflowActivation -> InstanceM WorkflowActivation)
-> STM WorkflowActivation -> InstanceM WorkflowActivation
forall a b. (a -> b) -> a -> b
$ TQueue WorkflowActivation -> STM WorkflowActivation
forall a. TQueue a -> STM a
readTQueue WorkflowInstance
inst.activationChannel
Just WorkflowActivation
act -> InstanceM WorkflowActivation -> STM (InstanceM WorkflowActivation)
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (InstanceM WorkflowActivation
-> STM (InstanceM WorkflowActivation))
-> InstanceM WorkflowActivation
-> STM (InstanceM WorkflowActivation)
forall a b. (a -> b) -> a -> b
$ WorkflowActivation -> InstanceM WorkflowActivation
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure WorkflowActivation
act
fmap runIdentity $ activate activation $ Identity suspension
supplyM completeStep wf
handleQueriesAfterCompletion :: InstanceM ()
handleQueriesAfterCompletion :: InstanceM ()
handleQueriesAfterCompletion = InstanceM () -> InstanceM ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (InstanceM () -> InstanceM ()) -> InstanceM () -> InstanceM ()
forall a b. (a -> b) -> a -> b
$ do
w <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
activation <- atomically . readTQueue =<< asks activationChannel
completion <- UnliftIO.try $ activate activation Proxy
case completion of
Left SomeException
err -> do
$(logDebug) (Text
"Workflow failure: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Text.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
err))
let appFailure :: ApplicationFailure
appFailure = SomeException -> [ApplicationFailureHandler] -> ApplicationFailure
mkApplicationFailure SomeException
err WorkflowInstance
w.errorConverters
enrichedApplicationFailure :: Failure
enrichedApplicationFailure = ApplicationFailure -> Failure
applicationFailureToFailureProto ApplicationFailure
appFailure
failureProto :: Completion.Failure
failureProto :: Failure
failureProto = Failure
forall msg. Message msg => msg
defMessage Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
Completion.failure (forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
enrichedApplicationFailure
completionMessage :: WorkflowActivationCompletion
completionMessage =
WorkflowActivationCompletion
forall msg. Message msg => msg
defMessage
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Text
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Completion.runId (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text)
-> Text
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ (WorkflowActivation
activation WorkflowActivation
-> FoldLike Text WorkflowActivation WorkflowActivation Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text WorkflowActivation WorkflowActivation Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId)
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
Completion.failed (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure)
-> Failure
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
failureProto
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
liftIO (inst.workflowCompleteActivation completionMessage >>= either throwIO pure)
Right Proxy (SuspendableWorkflowExecution Payload)
Proxy -> do
flushCommands
addStackTraceHandler :: WorkflowInstance -> IO ()
addStackTraceHandler :: WorkflowInstance -> IO ()
addStackTraceHandler WorkflowInstance
inst = do
let specialHandler :: QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)
specialHandler QueryId
_ Vector Payload
_ Map Text Payload
_ = do
cs <- IORef CallStack -> IO CallStack
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef WorkflowInstance
inst.workflowCallStack
Right <$> Temporal.Payload.encode JSON (Text.pack $ Temporal.Exception.prettyCallStack cs)
IORef
(HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)))
-> (HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)))
-> IO ()
forall (m :: * -> *) a. MonadIO m => IORef a -> (a -> a) -> m ()
modifyIORef' WorkflowInstance
inst.workflowQueryHandlers (Maybe Text
-> (QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HashMap.insert (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"__stack_trace") QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)
specialHandler)
activate
:: Functor f
=> WorkflowActivation
-> f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM (f (SuspendableWorkflowExecution Payload))
activate :: forall (f :: * -> *).
Functor f =>
WorkflowActivation
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM (f (SuspendableWorkflowExecution Payload))
activate WorkflowActivation
act f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
suspension = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
info <- atomicModifyIORef' inst.workflowInstanceInfo $ \Info
info ->
let info' :: Info
info' =
Info
info
{ historyLength = act ^. Activation.historyLength
, continueAsNewSuggested = act ^. Activation.continueAsNewSuggested
}
in (Info
info', Info
info')
let completionBase = WorkflowActivationCompletion
forall msg. Message msg => msg
defMessage WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Text
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Completion.runId (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Text)
-> Text
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ RunId -> Text
rawRunId Info
info.runId
writeIORef inst.workflowTime (act ^. Activation.timestamp . to timespecFromTimestamp)
eResult <- case inst.workflowDeadlockTimeout of
Maybe Int
Nothing -> Vector WorkflowActivationJob
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall (f :: * -> *).
Functor f =>
Vector WorkflowActivationJob
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
applyJobs (WorkflowActivation
act WorkflowActivation
-> FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
-> Vector WorkflowActivationJob
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'jobs" a) =>
LensLike' f s a
Activation.vec'jobs) f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
suspension
Just Int
timeoutDuration -> do
res <- Int
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
-> InstanceM
(Maybe
(Either SomeException (f (SuspendableWorkflowExecution Payload))))
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
UnliftIO.timeout Int
timeoutDuration (InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
-> InstanceM
(Maybe
(Either SomeException (f (SuspendableWorkflowExecution Payload)))))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
-> InstanceM
(Maybe
(Either SomeException (f (SuspendableWorkflowExecution Payload))))
forall a b. (a -> b) -> a -> b
$ Vector WorkflowActivationJob
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall (f :: * -> *).
Functor f =>
Vector WorkflowActivationJob
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
applyJobs (WorkflowActivation
act WorkflowActivation
-> FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
-> Vector WorkflowActivationJob
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Vector WorkflowActivationJob)
WorkflowActivation
WorkflowActivation
(Vector WorkflowActivationJob)
(Vector WorkflowActivationJob)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'jobs" a) =>
LensLike' f s a
Activation.vec'jobs) f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
suspension
case res of
Maybe
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
Nothing -> do
$(logError) Text
"Deadlock detected"
Either SomeException (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SomeException (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload))))
-> Either SomeException (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall a b. (a -> b) -> a -> b
$ SomeException
-> Either SomeException (f (SuspendableWorkflowExecution Payload))
forall a b. a -> Either a b
Left (SomeException
-> Either SomeException (f (SuspendableWorkflowExecution Payload)))
-> SomeException
-> Either SomeException (f (SuspendableWorkflowExecution Payload))
forall a b. (a -> b) -> a -> b
$ LogicBug -> SomeException
forall e. Exception e => e -> SomeException
toException (LogicBug -> SomeException) -> LogicBug -> SomeException
forall a b. (a -> b) -> a -> b
$ LogicBugType -> LogicBug
LogicBug LogicBugType
WorkflowActivationDeadlock
Just Either SomeException (f (SuspendableWorkflowExecution Payload))
res' -> Either SomeException (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either SomeException (f (SuspendableWorkflowExecution Payload))
res'
case eResult of
Left SomeException
err -> do
$(logWarn) Text
"Failed activation on workflow"
let failure :: Failure
failure =
Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
Completion.failure (forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ (Failure
forall msg. Message msg => msg
defMessage Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
Failure.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ String -> Text
Text.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
err))
completion :: WorkflowActivationCompletion
completion =
WorkflowActivationCompletion
completionBase
WorkflowActivationCompletion
-> (WorkflowActivationCompletion -> WorkflowActivationCompletion)
-> WorkflowActivationCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowActivationCompletion Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
Completion.failed (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowActivationCompletion Failure)
-> Failure
-> WorkflowActivationCompletion
-> WorkflowActivationCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
failure
IO () -> InstanceM ()
forall a. IO a -> InstanceM a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (WorkflowInstance
inst.workflowCompleteActivation WorkflowActivationCompletion
completion IO (Either WorkerError ())
-> (Either WorkerError () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (WorkerError -> IO ())
-> (() -> IO ()) -> Either WorkerError () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either WorkerError -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
SomeException
-> InstanceM (f (SuspendableWorkflowExecution Payload))
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
err
Right f (SuspendableWorkflowExecution Payload)
f -> f (SuspendableWorkflowExecution Payload)
-> InstanceM (f (SuspendableWorkflowExecution Payload))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure f (SuspendableWorkflowExecution Payload)
f
setUpWorkflowExecution :: StartWorkflow -> InstanceM ExecuteWorkflowInput
setUpWorkflowExecution :: StartWorkflow -> InstanceM ExecuteWorkflowInput
setUpWorkflowExecution StartWorkflow
startWorkflow = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let (WorkflowGenM genRef) = inst.workflowRandomnessSeed
writeIORef genRef (mkStdGen $ fromIntegral $ startWorkflow ^. Activation.randomnessSeed)
writeIORef inst.workflowTime (startWorkflow ^. Activation.startTime . to timespecFromTimestamp)
info <- readIORef inst.workflowInstanceInfo
pure $
ExecuteWorkflowInput
{ executeWorkflowInputType = startWorkflow ^. Activation.workflowType
, executeWorkflowInputArgs = fmap convertFromProtoPayload (startWorkflow ^. Command.vec'arguments)
, executeWorkflowInputHeaders = fmap convertFromProtoPayload (startWorkflow ^. Activation.headers)
, executeWorkflowInputInfo = info
}
applyStartWorkflow :: ExecuteWorkflowInput -> (Vector Payload -> IO (Either String (Workflow Payload))) -> InstanceM (SuspendableWorkflowExecution Payload)
applyStartWorkflow :: ExecuteWorkflowInput
-> (Vector Payload -> IO (Either String (Workflow Payload)))
-> InstanceM (SuspendableWorkflowExecution Payload)
applyStartWorkflow ExecuteWorkflowInput
execInput Vector Payload -> IO (Either String (Workflow Payload))
workflowFn = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let executeWorkflowBase ExecuteWorkflowInput
input = WorkflowInstance
-> InstanceM (SuspendableWorkflowExecution Payload)
-> IO (SuspendableWorkflowExecution Payload)
forall a. WorkflowInstance -> InstanceM a -> IO a
runInstanceM WorkflowInstance
inst (InstanceM (SuspendableWorkflowExecution Payload)
-> IO (SuspendableWorkflowExecution Payload))
-> InstanceM (SuspendableWorkflowExecution Payload)
-> IO (SuspendableWorkflowExecution Payload)
forall a b. (a -> b) -> a -> b
$ do
eAct <- IO (Either String (Workflow Payload))
-> InstanceM (Either String (Workflow Payload))
forall a. IO a -> InstanceM a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Vector Payload -> IO (Either String (Workflow Payload))
workflowFn (Vector Payload -> IO (Either String (Workflow Payload)))
-> IO (Vector Payload) -> IO (Either String (Workflow Payload))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< PayloadProcessor -> Vector Payload -> IO (Vector Payload)
forall (m :: * -> *) (f :: * -> *).
(MonadIO m, Traversable f) =>
PayloadProcessor -> f Payload -> m (f Payload)
processorDecodePayloads WorkflowInstance
inst.payloadProcessor ExecuteWorkflowInput
input.executeWorkflowInputArgs)
case eAct of
Left String
msg -> do
$(logError) (Text -> InstanceM ()) -> Text -> InstanceM ()
forall a b. (a -> b) -> a -> b
$ String -> Text
Text.pack (String
"Failed to decode workflow arguments: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> String
msg)
ValueError -> InstanceM (SuspendableWorkflowExecution Payload)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (String -> ValueError
ValueError String
msg)
Right Workflow Payload
act -> do
$(logDebug) Text
"Calling runWorkflow"
SuspendableWorkflowExecution Payload
-> InstanceM (SuspendableWorkflowExecution Payload)
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((RequireCallStackImpl => Workflow Payload)
-> SuspendableWorkflowExecution Payload
forall a.
(?callStack::CallStack) =>
(RequireCallStackImpl => Workflow a)
-> SuspendableWorkflowExecution a
runWorkflow Workflow Payload
RequireCallStackImpl => Workflow Payload
act)
liftIO $ executeWorkflowBase execInput
applyUpdateRandomSeed :: UpdateRandomSeed -> InstanceM ()
applyUpdateRandomSeed :: UpdateRandomSeed -> InstanceM ()
applyUpdateRandomSeed UpdateRandomSeed
updateRandomSeed = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let (WorkflowGenM genRef) = inst.workflowRandomnessSeed
writeIORef genRef (mkStdGen $ fromIntegral $ updateRandomSeed ^. Activation.randomnessSeed)
applyQueryWorkflow :: HasCallStack => QueryWorkflow -> InstanceM ()
applyQueryWorkflow :: (?callStack::CallStack) => QueryWorkflow -> InstanceM ()
applyQueryWorkflow QueryWorkflow
queryWorkflow = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
handles <- readIORef inst.workflowQueryHandlers
$logDebug $ Text.pack ("Applying query: " <> show (queryWorkflow ^. Activation.queryType))
let processor = WorkflowInstance
inst.payloadProcessor
args <- processorDecodePayloads processor (fmap convertFromProtoPayload (queryWorkflow ^. Command.vec'arguments))
hdrs <- processorDecodePayloads processor (fmap convertFromProtoPayload (queryWorkflow ^. Activation.headers))
let baseInput =
HandleQueryInput
{ handleQueryId :: Text
handleQueryId = QueryWorkflow
queryWorkflow QueryWorkflow
-> FoldLike Text QueryWorkflow QueryWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text QueryWorkflow QueryWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "queryId" a) =>
LensLike' f s a
Activation.queryId
, handleQueryInputType :: Text
handleQueryInputType = QueryWorkflow
queryWorkflow QueryWorkflow
-> FoldLike Text QueryWorkflow QueryWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text QueryWorkflow QueryWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "queryType" a) =>
LensLike' f s a
Activation.queryType
, handleQueryInputArgs :: Vector Payload
handleQueryInputArgs = Vector Payload
args
, handleQueryInputHeaders :: Map Text Payload
handleQueryInputHeaders = Map Text Payload
hdrs
}
res <- liftIO $ inst.inboundInterceptor.handleQuery baseInput $ \HandleQueryInput
input -> do
let handlerOrDefault :: Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
handlerOrDefault =
Maybe Text
-> HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (Text -> Maybe Text
forall a. a -> Maybe a
Just HandleQueryInput
input.handleQueryInputType) HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
handles
Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
forall a. Maybe a -> Maybe a -> Maybe a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Maybe Text
-> HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
-> Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup Maybe Text
forall a. Maybe a
Nothing HashMap
(Maybe Text)
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
handles
case Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
handlerOrDefault of
Maybe
(QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload))
Nothing -> do
Either SomeException Payload -> IO (Either SomeException Payload)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either SomeException Payload -> IO (Either SomeException Payload))
-> Either SomeException Payload
-> IO (Either SomeException Payload)
forall a b. (a -> b) -> a -> b
$ SomeException -> Either SomeException Payload
forall a b. a -> Either a b
Left (SomeException -> Either SomeException Payload)
-> SomeException -> Either SomeException Payload
forall a b. (a -> b) -> a -> b
$ QueryNotFound -> SomeException
forall e. Exception e => e -> SomeException
toException (QueryNotFound -> SomeException) -> QueryNotFound -> SomeException
forall a b. (a -> b) -> a -> b
$ String -> QueryNotFound
QueryNotFound (String -> QueryNotFound) -> String -> QueryNotFound
forall a b. (a -> b) -> a -> b
$ Text -> String
Text.unpack HandleQueryInput
input.handleQueryInputType
Just QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)
h ->
IO (Either SomeException Payload)
-> IO (Either SomeException Payload)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either SomeException Payload)
-> IO (Either SomeException Payload))
-> IO (Either SomeException Payload)
-> IO (Either SomeException Payload)
forall a b. (a -> b) -> a -> b
$
QueryId
-> Vector Payload
-> Map Text Payload
-> IO (Either SomeException Payload)
h
(Text -> QueryId
QueryId HandleQueryInput
input.handleQueryId)
HandleQueryInput
input.handleQueryInputArgs
HandleQueryInput
input.handleQueryInputHeaders
cmd <- case res of
Left SomeException
err ->
QueryResult -> InstanceM QueryResult
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (QueryResult -> InstanceM QueryResult)
-> QueryResult -> InstanceM QueryResult
forall a b. (a -> b) -> a -> b
$
QueryResult
forall msg. Message msg => msg
defMessage
QueryResult -> (QueryResult -> QueryResult) -> QueryResult
forall s t. s -> (s -> t) -> t
& LensLike' f QueryResult Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f QueryResult Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
Command.failed
(forall {f :: * -> *}.
Identical f =>
LensLike' f QueryResult Failure)
-> Failure -> QueryResult -> QueryResult
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ String -> Text
Text.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
err)
)
Right Payload
ok -> do
res' <- IO Payload -> InstanceM Payload
forall a. IO a -> InstanceM a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Payload -> InstanceM Payload)
-> IO Payload -> InstanceM Payload
forall a b. (a -> b) -> a -> b
$ PayloadProcessor -> Payload -> IO Payload
payloadProcessorEncode PayloadProcessor
processor Payload
ok
pure $
defMessage
& Command.queryId .~ baseInput.handleQueryId
& Command.succeeded
.~ ( defMessage
& Command.response .~ convertToProtoPayload res'
)
addCommand $ defMessage & Command.respondToQuery .~ cmd
applySignalWorkflow :: SignalWorkflow -> Workflow ()
applySignalWorkflow :: SignalWorkflow -> Workflow ()
applySignalWorkflow SignalWorkflow
signalWorkflow = Workflow (Workflow ()) -> Workflow ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (Workflow (Workflow ()) -> Workflow ())
-> Workflow (Workflow ()) -> Workflow ()
forall a b. (a -> b) -> a -> b
$ do
(ContinuationEnv -> InstanceM (Result (Workflow ())))
-> Workflow (Workflow ())
forall a. (ContinuationEnv -> InstanceM (Result a)) -> Workflow a
Workflow ((ContinuationEnv -> InstanceM (Result (Workflow ())))
-> Workflow (Workflow ()))
-> (ContinuationEnv -> InstanceM (Result (Workflow ())))
-> Workflow (Workflow ())
forall a b. (a -> b) -> a -> b
$ \ContinuationEnv
_ -> do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
handlers <- readIORef inst.workflowSignalHandlers
let handlerOrDefault =
Maybe Text
-> HashMap (Maybe Text) (Vector Payload -> Workflow ())
-> Maybe (Vector Payload -> Workflow ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (Text -> Maybe Text
forall a. a -> Maybe a
Just (SignalWorkflow
signalWorkflow SignalWorkflow
-> FoldLike Text SignalWorkflow SignalWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text SignalWorkflow SignalWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "signalName" a) =>
LensLike' f s a
Activation.signalName)) HashMap (Maybe Text) (Vector Payload -> Workflow ())
handlers
Maybe (Vector Payload -> Workflow ())
-> Maybe (Vector Payload -> Workflow ())
-> Maybe (Vector Payload -> Workflow ())
forall a. Maybe a -> Maybe a -> Maybe a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Maybe Text
-> HashMap (Maybe Text) (Vector Payload -> Workflow ())
-> Maybe (Vector Payload -> Workflow ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup Maybe Text
forall a. Maybe a
Nothing HashMap (Maybe Text) (Vector Payload -> Workflow ())
handlers
case handlerOrDefault of
Maybe (Vector Payload -> Workflow ())
Nothing -> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Result (Workflow ()) -> InstanceM (Result (Workflow ())))
-> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a b. (a -> b) -> a -> b
$ Workflow () -> Result (Workflow ())
forall a. a -> Result a
Done (Workflow () -> Result (Workflow ()))
-> Workflow () -> Result (Workflow ())
forall a b. (a -> b) -> a -> b
$ do
$(logWarn) (Text -> Workflow ()) -> Text -> Workflow ()
forall a b. (a -> b) -> a -> b
$ String -> Text
Text.pack (String
"No signal handler found for signal: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show (SignalWorkflow
signalWorkflow SignalWorkflow
-> FoldLike Text SignalWorkflow SignalWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text SignalWorkflow SignalWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "signalName" a) =>
LensLike' f s a
Activation.signalName))
() -> Workflow ()
forall a. a -> Workflow a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just Vector Payload -> Workflow ()
handler -> do
eInputs <- InstanceM (Vector Payload)
-> InstanceM (Either SomeException (Vector Payload))
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
UnliftIO.try (InstanceM (Vector Payload)
-> InstanceM (Either SomeException (Vector Payload)))
-> InstanceM (Vector Payload)
-> InstanceM (Either SomeException (Vector Payload))
forall a b. (a -> b) -> a -> b
$ PayloadProcessor -> Vector Payload -> InstanceM (Vector Payload)
forall (m :: * -> *) (f :: * -> *).
(MonadIO m, Traversable f) =>
PayloadProcessor -> f Payload -> m (f Payload)
processorDecodePayloads WorkflowInstance
inst.payloadProcessor ((Payload -> Payload) -> Vector Payload -> Vector Payload
forall a b. (a -> b) -> Vector a -> Vector b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Payload -> Payload
convertFromProtoPayload (SignalWorkflow
signalWorkflow SignalWorkflow
-> FoldLike
(Vector Payload)
SignalWorkflow
SignalWorkflow
(Vector Payload)
(Vector Payload)
-> Vector Payload
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Vector Payload)
SignalWorkflow
SignalWorkflow
(Vector Payload)
(Vector Payload)
forall (f :: * -> *) s a.
(Functor f, HasField s "vec'input" a) =>
LensLike' f s a
Command.vec'input))
case eInputs of
Left SomeException
err -> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Result (Workflow ()) -> InstanceM (Result (Workflow ())))
-> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a b. (a -> b) -> a -> b
$ SomeException -> Result (Workflow ())
forall a. SomeException -> Result a
Throw SomeException
err
Right Vector Payload
args -> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Result (Workflow ()) -> InstanceM (Result (Workflow ())))
-> Result (Workflow ()) -> InstanceM (Result (Workflow ()))
forall a b. (a -> b) -> a -> b
$ Workflow () -> Result (Workflow ())
forall a. a -> Result a
Done (Workflow () -> Result (Workflow ()))
-> Workflow () -> Result (Workflow ())
forall a b. (a -> b) -> a -> b
$ do
$(logDebug) (Text -> Workflow ()) -> Text -> Workflow ()
forall a b. (a -> b) -> a -> b
$ String -> Text
Text.pack (String
"Applying signal handler for signal: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show (SignalWorkflow
signalWorkflow SignalWorkflow
-> FoldLike Text SignalWorkflow SignalWorkflow Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text SignalWorkflow SignalWorkflow Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "signalName" a) =>
LensLike' f s a
Activation.signalName))
Vector Payload -> Workflow ()
handler Vector Payload
args
applyNotifyHasPatch :: NotifyHasPatch -> InstanceM ()
applyNotifyHasPatch :: NotifyHasPatch -> InstanceM ()
applyNotifyHasPatch NotifyHasPatch
notifyHasPatch = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let patches :: IORef (Set PatchId)
patches = WorkflowInstance
inst.workflowNotifiedPatches
atomicModifyIORef' patches $ \Set PatchId
patchSet -> (PatchId -> Set PatchId -> Set PatchId
forall a. Ord a => a -> Set a -> Set a
Set.insert (NotifyHasPatch
notifyHasPatch NotifyHasPatch
-> FoldLike PatchId NotifyHasPatch NotifyHasPatch PatchId Any
-> PatchId
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant PatchId) NotifyHasPatch Text
forall (f :: * -> *) s a.
(Functor f, HasField s "patchId" a) =>
LensLike' f s a
Activation.patchId LensLike' (Constant PatchId) NotifyHasPatch Text
-> ((PatchId -> Constant PatchId Any)
-> Text -> Constant PatchId Text)
-> FoldLike PatchId NotifyHasPatch NotifyHasPatch PatchId Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> PatchId) -> Getter Text Text PatchId Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> PatchId
PatchId) Set PatchId
patchSet, ())
data PendingJob
= PendingJobResolveActivity ResolveActivity
| PendingJobResolveChildWorkflowExecutionStart ResolveChildWorkflowExecutionStart
| PendingJobResolveChildWorkflowExecution ResolveChildWorkflowExecution
| PendingJobResolveSignalExternalWorkflow ResolveSignalExternalWorkflow
| PendingJobResolveRequestCancelExternalWorkflow ResolveRequestCancelExternalWorkflow
| PendingJobFireTimer FireTimer
| PendingWorkflowCancellation CancelWorkflow
applyJobs
:: Functor f
=> Vector WorkflowActivationJob
-> f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM (Either SomeException (f (SuspendableWorkflowExecution Payload)))
applyJobs :: forall (f :: * -> *).
Functor f =>
Vector WorkflowActivationJob
-> f (Await
[ActivationResult] (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
applyJobs Vector WorkflowActivationJob
jobs f (Await [ActivationResult] (SuspendableWorkflowExecution Payload))
fAwait = InstanceM (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
UnliftIO.try (InstanceM (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload))))
-> InstanceM (f (SuspendableWorkflowExecution Payload))
-> InstanceM
(Either SomeException (f (SuspendableWorkflowExecution Payload)))
forall a b. (a -> b) -> a -> b
$ do
$Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logDebug (Text -> InstanceM ()) -> Text -> InstanceM ()
forall a b. (a -> b) -> a -> b
$ String -> Text
Text.pack (String
"Applying jobs: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Vector WorkflowActivationJob -> String
forall a. Show a => a -> String
show Vector WorkflowActivationJob
jobs)
let (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, [PendingJob]
resolutions, InstanceM ()
otherJobs) = (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
jobGroups
InstanceM ()
patchNotifications
InstanceM ()
queryWorkflows
InstanceM ()
otherJobs
activationResults <- [PendingJob] -> InstanceM [ActivationResult]
applyResolutions [PendingJob]
resolutions
let activations = [ActivationResult]
activationResults
pure $
( \(Await [ActivationResult] -> SuspendableWorkflowExecution Payload
wf) ->
case [ActivationResult]
activations of
[] -> case [Workflow ()]
signalWorkflows of
[] -> Await [ActivationResult] (SuspendableWorkflowExecution Payload)
-> SuspendableWorkflowExecution Payload
forall (m :: * -> *) (s :: * -> *) x.
Monad m =>
s (Coroutine s m x) -> Coroutine s m x
suspend (([ActivationResult] -> SuspendableWorkflowExecution Payload)
-> Await [ActivationResult] (SuspendableWorkflowExecution Payload)
forall x y. (x -> y) -> Await x y
Await [ActivationResult] -> SuspendableWorkflowExecution Payload
wf)
[Workflow ()]
sigs -> do
InstanceM () -> Coroutine (Await [ActivationResult]) InstanceM ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> Coroutine (Await [ActivationResult]) InstanceM ())
-> InstanceM ()
-> Coroutine (Await [ActivationResult]) InstanceM ()
forall a b. (a -> b) -> a -> b
$ $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
logDebug Text
"We get signal"
InstanceM () -> Coroutine (Await [ActivationResult]) InstanceM ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ((Workflow () -> InstanceM ()) -> [Workflow ()] -> InstanceM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Workflow () -> InstanceM ()
injectWorkflowSignal [Workflow ()]
sigs) Coroutine (Await [ActivationResult]) InstanceM ()
-> SuspendableWorkflowExecution Payload
-> SuspendableWorkflowExecution Payload
forall a b.
Coroutine (Await [ActivationResult]) InstanceM a
-> Coroutine (Await [ActivationResult]) InstanceM b
-> Coroutine (Await [ActivationResult]) InstanceM b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> [ActivationResult] -> SuspendableWorkflowExecution Payload
wf []
[ActivationResult]
_ -> case [Workflow ()]
signalWorkflows of
[] -> [ActivationResult] -> SuspendableWorkflowExecution Payload
wf [ActivationResult]
activations
[Workflow ()]
sigs -> InstanceM () -> Coroutine (Await [ActivationResult]) InstanceM ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ((Workflow () -> InstanceM ()) -> [Workflow ()] -> InstanceM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Workflow () -> InstanceM ()
injectWorkflowSignal [Workflow ()]
sigs) Coroutine (Await [ActivationResult]) InstanceM ()
-> SuspendableWorkflowExecution Payload
-> SuspendableWorkflowExecution Payload
forall a b.
Coroutine (Await [ActivationResult]) InstanceM a
-> Coroutine (Await [ActivationResult]) InstanceM b
-> Coroutine (Await [ActivationResult]) InstanceM b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> [ActivationResult] -> SuspendableWorkflowExecution Payload
wf [ActivationResult]
activations
)
<$> fAwait
where
jobGroups :: (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
jobGroups =
(WorkflowActivationJob
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ()))
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
-> Vector WorkflowActivationJob
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
forall a b. (a -> b -> b) -> b -> Vector a -> b
V.foldr
( \WorkflowActivationJob
job tup :: (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
tup@(!InstanceM ()
patchNotifications, ![Workflow ()]
signalWorkflows, !InstanceM ()
queryWorkflows, ![PendingJob]
resolutions, !InstanceM ()
otherJobs) -> case WorkflowActivationJob
job WorkflowActivationJob
-> FoldLike
(Maybe WorkflowActivationJob'Variant)
WorkflowActivationJob
WorkflowActivationJob
(Maybe WorkflowActivationJob'Variant)
(Maybe WorkflowActivationJob'Variant)
-> Maybe WorkflowActivationJob'Variant
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe WorkflowActivationJob'Variant)
WorkflowActivationJob
WorkflowActivationJob
(Maybe WorkflowActivationJob'Variant)
(Maybe WorkflowActivationJob'Variant)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'variant" a) =>
LensLike' f s a
Activation.maybe'variant of
Just (WorkflowActivationJob'NotifyHasPatch NotifyHasPatch
n) -> (NotifyHasPatch -> InstanceM ()
applyNotifyHasPatch NotifyHasPatch
n InstanceM () -> InstanceM () -> InstanceM ()
forall a b. InstanceM a -> InstanceM b -> InstanceM b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'SignalWorkflow SignalWorkflow
sig) -> (InstanceM ()
patchNotifications, SignalWorkflow -> Workflow ()
applySignalWorkflow SignalWorkflow
sig Workflow () -> [Workflow ()] -> [Workflow ()]
forall a. a -> [a] -> [a]
: [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'QueryWorkflow QueryWorkflow
q) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, (?callStack::CallStack) => QueryWorkflow -> InstanceM ()
QueryWorkflow -> InstanceM ()
applyQueryWorkflow QueryWorkflow
q InstanceM () -> InstanceM () -> InstanceM ()
forall a b. InstanceM a -> InstanceM b -> InstanceM b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> InstanceM ()
queryWorkflows, [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'FireTimer FireTimer
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, FireTimer -> PendingJob
PendingJobFireTimer FireTimer
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'ResolveActivity ResolveActivity
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, ResolveActivity -> PendingJob
PendingJobResolveActivity ResolveActivity
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'ResolveChildWorkflowExecutionStart ResolveChildWorkflowExecutionStart
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, ResolveChildWorkflowExecutionStart -> PendingJob
PendingJobResolveChildWorkflowExecutionStart ResolveChildWorkflowExecutionStart
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'ResolveChildWorkflowExecution ResolveChildWorkflowExecution
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, ResolveChildWorkflowExecution -> PendingJob
PendingJobResolveChildWorkflowExecution ResolveChildWorkflowExecution
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'ResolveSignalExternalWorkflow ResolveSignalExternalWorkflow
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, ResolveSignalExternalWorkflow -> PendingJob
PendingJobResolveSignalExternalWorkflow ResolveSignalExternalWorkflow
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'ResolveRequestCancelExternalWorkflow ResolveRequestCancelExternalWorkflow
r) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, ResolveRequestCancelExternalWorkflow -> PendingJob
PendingJobResolveRequestCancelExternalWorkflow ResolveRequestCancelExternalWorkflow
r PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'UpdateRandomSeed UpdateRandomSeed
updateRandomSeed) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, [PendingJob]
resolutions, InstanceM ()
otherJobs InstanceM () -> InstanceM () -> InstanceM ()
forall a b. InstanceM a -> InstanceM b -> InstanceM b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> UpdateRandomSeed -> InstanceM ()
applyUpdateRandomSeed UpdateRandomSeed
updateRandomSeed)
Just (WorkflowActivationJob'CancelWorkflow CancelWorkflow
cancelWorkflow) -> (InstanceM ()
patchNotifications, [Workflow ()]
signalWorkflows, InstanceM ()
queryWorkflows, CancelWorkflow -> PendingJob
PendingWorkflowCancellation CancelWorkflow
cancelWorkflow PendingJob -> [PendingJob] -> [PendingJob]
forall a. a -> [a] -> [a]
: [PendingJob]
resolutions, InstanceM ()
otherJobs)
Just (WorkflowActivationJob'StartWorkflow StartWorkflow
_startWorkflow) -> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
tup
Just (WorkflowActivationJob'RemoveFromCache RemoveFromCache
_removeFromCache) -> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
tup
Just (WorkflowActivationJob'DoUpdate DoUpdate
_) -> String
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
forall a. (?callStack::CallStack) => String -> a
error String
"DoUpdate not yet implemented"
Maybe WorkflowActivationJob'Variant
Nothing -> RuntimeError
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ()))
-> RuntimeError
-> (InstanceM (), [Workflow ()], InstanceM (), [PendingJob],
InstanceM ())
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Uncrecognized workflow activation job variant"
)
(() -> InstanceM ()
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (), [], () -> InstanceM ()
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (), [], () -> InstanceM ()
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
Vector WorkflowActivationJob
jobs
applyResolutions :: [PendingJob] -> InstanceM [ActivationResult]
applyResolutions :: [PendingJob] -> InstanceM [ActivationResult]
applyResolutions [] = [ActivationResult] -> InstanceM [ActivationResult]
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
applyResolutions [PendingJob]
rs = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
atomically $ do
sequenceMaps <- readTVar inst.workflowSequenceMaps
let makeCompletion :: ([ActivationResult], SequenceMaps) -> PendingJob -> ([ActivationResult], SequenceMaps)
makeCompletion (![ActivationResult]
completions, !SequenceMaps
sequenceMaps') PendingJob
pj = case PendingJob
pj of
PendingJobResolveActivity ResolveActivity
msg -> do
let existingIVar :: Maybe (IVar ResolveActivity)
existingIVar = Sequence
-> HashMap Sequence (IVar ResolveActivity)
-> Maybe (IVar ResolveActivity)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (ResolveActivity
msg ResolveActivity
-> FoldLike Sequence ResolveActivity ResolveActivity Sequence Any
-> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant Sequence) ResolveActivity Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike' (Constant Sequence) ResolveActivity Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike Sequence ResolveActivity ResolveActivity Sequence Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.activities
case Maybe (IVar ResolveActivity)
existingIVar of
Maybe (IVar ResolveActivity)
Nothing ->
RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Activity handle not found"
Just IVar ResolveActivity
existing ->
( ResultVal ResolveActivity
-> IVar ResolveActivity -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (ResolveActivity -> ResultVal ResolveActivity
forall a. a -> ResultVal a
Ok ResolveActivity
msg) IVar ResolveActivity
existing ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ activities = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.activities
}
)
PendingJobResolveChildWorkflowExecutionStart ResolveChildWorkflowExecutionStart
msg -> do
let existingHandle :: Maybe SomeChildWorkflowHandle
existingHandle = Sequence
-> HashMap Sequence SomeChildWorkflowHandle
-> Maybe SomeChildWorkflowHandle
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (ResolveChildWorkflowExecutionStart
msg ResolveChildWorkflowExecutionStart
-> FoldLike
Sequence
ResolveChildWorkflowExecutionStart
ResolveChildWorkflowExecutionStart
Sequence
Any
-> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike'
(Constant Sequence) ResolveChildWorkflowExecutionStart Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike'
(Constant Sequence) ResolveChildWorkflowExecutionStart Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike
Sequence
ResolveChildWorkflowExecutionStart
ResolveChildWorkflowExecutionStart
Sequence
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.childWorkflows
case Maybe SomeChildWorkflowHandle
existingHandle of
Maybe SomeChildWorkflowHandle
Nothing ->
RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Child workflow not found"
Just (SomeChildWorkflowHandle ChildWorkflowHandle result
existing) -> case ResolveChildWorkflowExecutionStart
msg ResolveChildWorkflowExecutionStart
-> FoldLike
(Maybe ResolveChildWorkflowExecutionStart'Status)
ResolveChildWorkflowExecutionStart
ResolveChildWorkflowExecutionStart
(Maybe ResolveChildWorkflowExecutionStart'Status)
(Maybe ResolveChildWorkflowExecutionStart'Status)
-> Maybe ResolveChildWorkflowExecutionStart'Status
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
(Maybe ResolveChildWorkflowExecutionStart'Status)
ResolveChildWorkflowExecutionStart
ResolveChildWorkflowExecutionStart
(Maybe ResolveChildWorkflowExecutionStart'Status)
(Maybe ResolveChildWorkflowExecutionStart'Status)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'status" a) =>
LensLike' f s a
Activation.maybe'status of
Maybe ResolveChildWorkflowExecutionStart'Status
Nothing ->
RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Child workflow start did not have a known status"
Just ResolveChildWorkflowExecutionStart'Status
status -> case ResolveChildWorkflowExecutionStart'Status
status of
ResolveChildWorkflowExecutionStart'Succeeded ResolveChildWorkflowExecutionStartSuccess
succeeded ->
( ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (() -> ResultVal ()
forall a. a -> ResultVal a
Ok ()) ChildWorkflowHandle result
existing.startHandle
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: ResultVal RunId -> IVar RunId -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (RunId -> ResultVal RunId
forall a. a -> ResultVal a
Ok (ResolveChildWorkflowExecutionStartSuccess
succeeded ResolveChildWorkflowExecutionStartSuccess
-> FoldLike
RunId
ResolveChildWorkflowExecutionStartSuccess
ResolveChildWorkflowExecutionStartSuccess
RunId
Any
-> RunId
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike'
(Constant RunId) ResolveChildWorkflowExecutionStartSuccess Text
forall (f :: * -> *) s a.
(Functor f, HasField s "runId" a) =>
LensLike' f s a
Activation.runId LensLike'
(Constant RunId) ResolveChildWorkflowExecutionStartSuccess Text
-> ((RunId -> Constant RunId Any) -> Text -> Constant RunId Text)
-> FoldLike
RunId
ResolveChildWorkflowExecutionStartSuccess
ResolveChildWorkflowExecutionStartSuccess
RunId
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Text -> RunId) -> Getter Text Text RunId Any
forall s a t b. (s -> a) -> Getter s t a b
to Text -> RunId
RunId)) ChildWorkflowHandle result
existing.firstExecutionRunId
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
)
ResolveChildWorkflowExecutionStart'Failed ResolveChildWorkflowExecutionStartFailure
failed ->
let updatedMaps :: SequenceMaps
updatedMaps =
SequenceMaps
sequenceMaps'
{ childWorkflows = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.childWorkflows
}
in case ResolveChildWorkflowExecutionStartFailure
failed ResolveChildWorkflowExecutionStartFailure
-> FoldLike
StartChildWorkflowExecutionFailedCause
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
StartChildWorkflowExecutionFailedCause
StartChildWorkflowExecutionFailedCause
-> StartChildWorkflowExecutionFailedCause
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
StartChildWorkflowExecutionFailedCause
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
StartChildWorkflowExecutionFailedCause
StartChildWorkflowExecutionFailedCause
forall (f :: * -> *) s a.
(Functor f, HasField s "cause" a) =>
LensLike' f s a
Activation.cause of
StartChildWorkflowExecutionFailedCause
START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS ->
let failure :: forall v. ResultVal v
failure :: forall v. ResultVal v
failure =
SomeException -> ResultVal v
forall a. SomeException -> ResultVal a
ThrowWorkflow (SomeException -> ResultVal v) -> SomeException -> ResultVal v
forall a b. (a -> b) -> a -> b
$
WorkflowAlreadyStarted -> SomeException
forall e. Exception e => e -> SomeException
toException (WorkflowAlreadyStarted -> SomeException)
-> WorkflowAlreadyStarted -> SomeException
forall a b. (a -> b) -> a -> b
$
WorkflowAlreadyStarted
{ workflowAlreadyStartedWorkflowId :: WorkflowId
workflowAlreadyStartedWorkflowId = Text -> WorkflowId
WorkflowId (ResolveChildWorkflowExecutionStartFailure
failed ResolveChildWorkflowExecutionStartFailure
-> FoldLike
Text
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
Text
Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
Text
Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowId" a) =>
LensLike' f s a
Activation.workflowId)
, workflowAlreadyStartedWorkflowType :: WorkflowType
workflowAlreadyStartedWorkflowType = Text -> WorkflowType
WorkflowType (ResolveChildWorkflowExecutionStartFailure
failed ResolveChildWorkflowExecutionStartFailure
-> FoldLike
Text
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
Text
Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
Text
Text
forall (f :: * -> *) s a.
(Functor f, HasField s "workflowType" a) =>
LensLike' f s a
Activation.workflowType)
}
in ( ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult ResultVal ()
forall v. ResultVal v
failure ChildWorkflowHandle result
existing.startHandle
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: ResultVal RunId -> IVar RunId -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult ResultVal RunId
forall v. ResultVal v
failure ChildWorkflowHandle result
existing.firstExecutionRunId
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
updatedMaps
)
StartChildWorkflowExecutionFailedCause
_ ->
( ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult
(SomeException -> ResultVal ()
forall a. SomeException -> ResultVal a
ThrowInternal (SomeException -> ResultVal ()) -> SomeException -> ResultVal ()
forall a b. (a -> b) -> a -> b
$ RuntimeError -> SomeException
forall e. Exception e => e -> SomeException
toException (RuntimeError -> SomeException) -> RuntimeError -> SomeException
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError (String
"Unknown child workflow start failure: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> StartChildWorkflowExecutionFailedCause -> String
forall a. Show a => a -> String
show (ResolveChildWorkflowExecutionStartFailure
failed ResolveChildWorkflowExecutionStartFailure
-> FoldLike
StartChildWorkflowExecutionFailedCause
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
StartChildWorkflowExecutionFailedCause
StartChildWorkflowExecutionFailedCause
-> StartChildWorkflowExecutionFailedCause
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
StartChildWorkflowExecutionFailedCause
ResolveChildWorkflowExecutionStartFailure
ResolveChildWorkflowExecutionStartFailure
StartChildWorkflowExecutionFailedCause
StartChildWorkflowExecutionFailedCause
forall (f :: * -> *) s a.
(Functor f, HasField s "cause" a) =>
LensLike' f s a
Activation.cause)))
ChildWorkflowHandle result
existing.startHandle
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
updatedMaps
)
ResolveChildWorkflowExecutionStart'Cancelled ResolveChildWorkflowExecutionStartCancelled
_cancelled ->
( ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (() -> ResultVal ()
forall a. a -> ResultVal a
Ok ()) ChildWorkflowHandle result
existing.startHandle
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: ResultVal RunId -> IVar RunId -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (SomeException -> ResultVal RunId
forall a. SomeException -> ResultVal a
ThrowWorkflow (SomeException -> ResultVal RunId)
-> SomeException -> ResultVal RunId
forall a b. (a -> b) -> a -> b
$ ChildWorkflowCancelled -> SomeException
forall e. Exception e => e -> SomeException
toException ChildWorkflowCancelled
ChildWorkflowCancelled) ChildWorkflowHandle result
existing.firstExecutionRunId
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: ResultVal ResolveChildWorkflowExecution
-> IVar ResolveChildWorkflowExecution -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (SomeException -> ResultVal ResolveChildWorkflowExecution
forall a. SomeException -> ResultVal a
ThrowWorkflow (SomeException -> ResultVal ResolveChildWorkflowExecution)
-> SomeException -> ResultVal ResolveChildWorkflowExecution
forall a b. (a -> b) -> a -> b
$ ChildWorkflowCancelled -> SomeException
forall e. Exception e => e -> SomeException
toException ChildWorkflowCancelled
ChildWorkflowCancelled) ChildWorkflowHandle result
existing.resultHandle
ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ childWorkflows = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.childWorkflows
}
)
PendingWorkflowCancellation CancelWorkflow
_ -> (ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (() -> ResultVal ()
forall a. a -> ResultVal a
Ok ()) WorkflowInstance
inst.workflowCancellationVar ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions, SequenceMaps
sequenceMaps')
PendingJobResolveChildWorkflowExecution ResolveChildWorkflowExecution
msg -> do
let existingHandle :: Maybe SomeChildWorkflowHandle
existingHandle = Sequence
-> HashMap Sequence SomeChildWorkflowHandle
-> Maybe SomeChildWorkflowHandle
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (ResolveChildWorkflowExecution
msg ResolveChildWorkflowExecution
-> FoldLike
Sequence
ResolveChildWorkflowExecution
ResolveChildWorkflowExecution
Sequence
Any
-> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant Sequence) ResolveChildWorkflowExecution Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike' (Constant Sequence) ResolveChildWorkflowExecution Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike
Sequence
ResolveChildWorkflowExecution
ResolveChildWorkflowExecution
Sequence
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.childWorkflows
case Maybe SomeChildWorkflowHandle
existingHandle of
Maybe SomeChildWorkflowHandle
Nothing -> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Child Workflow Execution not found"
Just (SomeChildWorkflowHandle ChildWorkflowHandle result
h) ->
( ResultVal ResolveChildWorkflowExecution
-> IVar ResolveChildWorkflowExecution -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (ResolveChildWorkflowExecution
-> ResultVal ResolveChildWorkflowExecution
forall a. a -> ResultVal a
Ok ResolveChildWorkflowExecution
msg) ChildWorkflowHandle result
h.resultHandle ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ childWorkflows = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.childWorkflows
}
)
PendingJobResolveSignalExternalWorkflow ResolveSignalExternalWorkflow
msg -> do
let mresVar :: Maybe (IVar ResolveSignalExternalWorkflow)
mresVar = Sequence
-> HashMap Sequence (IVar ResolveSignalExternalWorkflow)
-> Maybe (IVar ResolveSignalExternalWorkflow)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (ResolveSignalExternalWorkflow
msg ResolveSignalExternalWorkflow
-> FoldLike
Sequence
ResolveSignalExternalWorkflow
ResolveSignalExternalWorkflow
Sequence
Any
-> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant Sequence) ResolveSignalExternalWorkflow Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike' (Constant Sequence) ResolveSignalExternalWorkflow Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike
Sequence
ResolveSignalExternalWorkflow
ResolveSignalExternalWorkflow
Sequence
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.externalSignals
case Maybe (IVar ResolveSignalExternalWorkflow)
mresVar of
Maybe (IVar ResolveSignalExternalWorkflow)
Nothing -> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"External Signal IVar for sequence not found"
Just IVar ResolveSignalExternalWorkflow
resVar ->
( ResultVal ResolveSignalExternalWorkflow
-> IVar ResolveSignalExternalWorkflow -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (ResolveSignalExternalWorkflow
-> ResultVal ResolveSignalExternalWorkflow
forall a. a -> ResultVal a
Ok ResolveSignalExternalWorkflow
msg) IVar ResolveSignalExternalWorkflow
resVar ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ externalSignals = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.externalSignals
}
)
PendingJobResolveRequestCancelExternalWorkflow ResolveRequestCancelExternalWorkflow
msg -> do
let mresVar :: Maybe (IVar ResolveRequestCancelExternalWorkflow)
mresVar = Sequence
-> HashMap Sequence (IVar ResolveRequestCancelExternalWorkflow)
-> Maybe (IVar ResolveRequestCancelExternalWorkflow)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (ResolveRequestCancelExternalWorkflow
msg ResolveRequestCancelExternalWorkflow
-> FoldLike
Sequence
ResolveRequestCancelExternalWorkflow
ResolveRequestCancelExternalWorkflow
Sequence
Any
-> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike'
(Constant Sequence) ResolveRequestCancelExternalWorkflow Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike'
(Constant Sequence) ResolveRequestCancelExternalWorkflow Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike
Sequence
ResolveRequestCancelExternalWorkflow
ResolveRequestCancelExternalWorkflow
Sequence
Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.externalCancels
case Maybe (IVar ResolveRequestCancelExternalWorkflow)
mresVar of
Maybe (IVar ResolveRequestCancelExternalWorkflow)
Nothing -> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"External Cancel IVar for sequence not found"
Just IVar ResolveRequestCancelExternalWorkflow
resVar ->
( ResultVal ResolveRequestCancelExternalWorkflow
-> IVar ResolveRequestCancelExternalWorkflow -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (ResolveRequestCancelExternalWorkflow
-> ResultVal ResolveRequestCancelExternalWorkflow
forall a. a -> ResultVal a
Ok ResolveRequestCancelExternalWorkflow
msg) IVar ResolveRequestCancelExternalWorkflow
resVar ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ externalCancels = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.externalCancels
}
)
PendingJobFireTimer FireTimer
msg -> do
let existingIVar :: Maybe (IVar ())
existingIVar = Sequence -> HashMap Sequence (IVar ()) -> Maybe (IVar ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup (FireTimer
msg FireTimer
-> FoldLike Sequence FireTimer FireTimer Sequence Any -> Sequence
forall s a t b. s -> FoldLike a s t a b -> a
^. LensLike' (Constant Sequence) FireTimer Word32
forall (f :: * -> *) s a.
(Functor f, HasField s "seq" a) =>
LensLike' f s a
Activation.seq LensLike' (Constant Sequence) FireTimer Word32
-> ((Sequence -> Constant Sequence Any)
-> Word32 -> Constant Sequence Word32)
-> FoldLike Sequence FireTimer FireTimer Sequence Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Word32 -> Sequence) -> Getter Word32 Word32 Sequence Any
forall s a t b. (s -> a) -> Getter s t a b
to Word32 -> Sequence
Sequence) SequenceMaps
sequenceMaps'.timers
case Maybe (IVar ())
existingIVar of
Maybe (IVar ())
Nothing -> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a e. (?callStack::CallStack, Exception e) => e -> a
E.throw (RuntimeError -> ([ActivationResult], SequenceMaps))
-> RuntimeError -> ([ActivationResult], SequenceMaps)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError String
"Timer not found"
Just IVar ()
existing ->
( ResultVal () -> IVar () -> ActivationResult
forall a. ResultVal a -> IVar a -> ActivationResult
ActivationResult (() -> ResultVal ()
forall a. a -> ResultVal a
Ok ()) IVar ()
existing ActivationResult -> [ActivationResult] -> [ActivationResult]
forall a. a -> [a] -> [a]
: [ActivationResult]
completions
, SequenceMaps
sequenceMaps'
{ timers = HashMap.delete (msg ^. Activation.seq . to Sequence) sequenceMaps'.timers
}
)
let (newCompletions, updatedSequenceMaps) = foldl' makeCompletion ([], sequenceMaps) rs
writeTVar inst.workflowSequenceMaps updatedSequenceMaps
pure newCompletions
convertExitVariantToCommand :: WorkflowExitVariant Payload -> InstanceM Command.WorkflowCommand
convertExitVariantToCommand :: WorkflowExitVariant Payload -> InstanceM WorkflowCommand
convertExitVariantToCommand WorkflowExitVariant Payload
variant = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let processor = WorkflowInstance
inst.payloadProcessor
case variant of
WorkflowExitSuccess Payload
result -> do
result' <- IO Payload -> InstanceM Payload
forall a. IO a -> InstanceM a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Payload -> InstanceM Payload)
-> IO Payload -> InstanceM Payload
forall a b. (a -> b) -> a -> b
$ PayloadProcessor -> Payload -> IO Payload
payloadProcessorEncode PayloadProcessor
processor Payload
result
pure $
defMessage
& Command.completeWorkflowExecution .~ (defMessage & Command.result .~ convertToProtoPayload result')
WorkflowExitContinuedAsNew (ContinueAsNewException {Vector Payload
WorkflowType
ContinueAsNewOptions
continueAsNewWorkflowType :: WorkflowType
continueAsNewArguments :: Vector Payload
continueAsNewOptions :: ContinueAsNewOptions
continueAsNewOptions :: ContinueAsNewException -> ContinueAsNewOptions
continueAsNewArguments :: ContinueAsNewException -> Vector Payload
continueAsNewWorkflowType :: ContinueAsNewException -> WorkflowType
..}) -> do
i <- IORef Info -> InstanceM Info
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef WorkflowInstance
inst.workflowInstanceInfo
searchAttrs <-
liftIO $
searchAttributesToProto
( if continueAsNewOptions.searchAttributes == mempty
then i.searchAttributes
else continueAsNewOptions.searchAttributes
)
args <- processorEncodePayloads processor continueAsNewArguments
hdrs <- processorEncodePayloads processor continueAsNewOptions.headers
memo <- processorEncodePayloads processor continueAsNewOptions.memo
pure $
defMessage
& Command.continueAsNewWorkflowExecution
.~ ( defMessage
& Command.workflowType .~ rawWorkflowType continueAsNewWorkflowType
& Command.taskQueue .~ maybe "" rawTaskQueue continueAsNewOptions.taskQueue
& Command.vec'arguments .~ fmap convertToProtoPayload args
& Command.maybe'retryPolicy .~ (retryPolicyToProto <$> continueAsNewOptions.retryPolicy)
& Command.searchAttributes .~ searchAttrs
& Command.headers .~ fmap convertToProtoPayload hdrs
& Command.memo .~ fmap convertToProtoPayload memo
& Command.maybe'workflowTaskTimeout .~ (durationToProto <$> continueAsNewOptions.taskTimeout)
& Command.maybe'workflowRunTimeout .~ (durationToProto <$> continueAsNewOptions.runTimeout)
)
WorkflowExitCancelled WorkflowCancelRequested
WorkflowCancelRequested -> do
WorkflowCommand -> InstanceM WorkflowCommand
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WorkflowCommand -> InstanceM WorkflowCommand)
-> WorkflowCommand -> InstanceM WorkflowCommand
forall a b. (a -> b) -> a -> b
$ WorkflowCommand
forall msg. Message msg => msg
defMessage WorkflowCommand
-> (WorkflowCommand -> WorkflowCommand) -> WorkflowCommand
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowCommand CancelWorkflowExecution
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowCommand CancelWorkflowExecution
forall (f :: * -> *) s a.
(Functor f, HasField s "cancelWorkflowExecution" a) =>
LensLike' f s a
Command.cancelWorkflowExecution (forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowCommand CancelWorkflowExecution)
-> CancelWorkflowExecution -> WorkflowCommand -> WorkflowCommand
forall s t a b. Setter s t a b -> b -> s -> t
.~ CancelWorkflowExecution
forall msg. Message msg => msg
defMessage
WorkflowExitFailed SomeException
e | Just (ActivityFailure
actFailure :: ActivityFailure) <- SomeException -> Maybe ActivityFailure
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e -> do
let appFailure :: ApplicationFailure
appFailure = ActivityFailure
actFailure.cause
enrichedApplicationFailure :: Failure
enrichedApplicationFailure =
Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ActivityFailure
actFailure.message
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "source" a) =>
LensLike' f s a
F.source (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"hs-temporal-sdk"
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure ActivityFailureInfo
forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ActivityFailureInfo
forall (f :: * -> *) s a.
(Functor f, HasField s "activityFailureInfo" a) =>
LensLike' f s a
F.activityFailureInfo (forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ActivityFailureInfo)
-> ActivityFailureInfo -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ActivityFailure
actFailure.original
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stackTrace" a) =>
LensLike' f s a
F.stackTrace (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ActivityFailure
actFailure.stack
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "cause" a) =>
LensLike' f s a
F.cause
(forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( Failure
forall msg. Message msg => msg
defMessage
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure
appFailure.message
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "source" a) =>
LensLike' f s a
F.source (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"hs-temporal-sdk"
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stackTrace" a) =>
LensLike' f s a
F.stackTrace (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure
appFailure.stack
Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure ApplicationFailureInfo
forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo
forall (f :: * -> *) s a.
(Functor f, HasField s "applicationFailureInfo" a) =>
LensLike' f s a
F.applicationFailureInfo
(forall {f :: * -> *}.
Identical f =>
LensLike' f Failure ApplicationFailureInfo)
-> ApplicationFailureInfo -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( ApplicationFailureInfo
forall msg. Message msg => msg
defMessage
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Text
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
F.type' (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Text)
-> Text -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure -> Text
Err.type' ApplicationFailure
appFailure
ApplicationFailureInfo
-> (ApplicationFailureInfo -> ApplicationFailureInfo)
-> ApplicationFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f ApplicationFailureInfo Bool
forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "nonRetryable" a) =>
LensLike' f s a
F.nonRetryable (forall {f :: * -> *}.
Identical f =>
LensLike' f ApplicationFailureInfo Bool)
-> Bool -> ApplicationFailureInfo -> ApplicationFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ ApplicationFailure -> Bool
Err.nonRetryable ApplicationFailure
appFailure
)
)
WorkflowCommand -> InstanceM WorkflowCommand
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WorkflowCommand -> InstanceM WorkflowCommand)
-> WorkflowCommand -> InstanceM WorkflowCommand
forall a b. (a -> b) -> a -> b
$
WorkflowCommand
forall msg. Message msg => msg
defMessage
WorkflowCommand
-> (WorkflowCommand -> WorkflowCommand) -> WorkflowCommand
forall s t. s -> (s -> t) -> t
& LensLike' f WorkflowCommand FailWorkflowExecution
forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowCommand FailWorkflowExecution
forall (f :: * -> *) s a.
(Functor f, HasField s "failWorkflowExecution" a) =>
LensLike' f s a
Command.failWorkflowExecution
(forall {f :: * -> *}.
Identical f =>
LensLike' f WorkflowCommand FailWorkflowExecution)
-> FailWorkflowExecution -> WorkflowCommand -> WorkflowCommand
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( FailWorkflowExecution
forall msg. Message msg => msg
defMessage
FailWorkflowExecution
-> (FailWorkflowExecution -> FailWorkflowExecution)
-> FailWorkflowExecution
forall s t. s -> (s -> t) -> t
& LensLike' f FailWorkflowExecution Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f FailWorkflowExecution Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
Command.failure (forall {f :: * -> *}.
Identical f =>
LensLike' f FailWorkflowExecution Failure)
-> Failure -> FailWorkflowExecution -> FailWorkflowExecution
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
enrichedApplicationFailure
)
WorkflowExitFailed SomeException
e -> do
w <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
let appFailure = SomeException -> [ApplicationFailureHandler] -> ApplicationFailure
mkApplicationFailure SomeException
e WorkflowInstance
w.errorConverters
enrichedApplicationFailure = ApplicationFailure -> Failure
applicationFailureToFailureProto ApplicationFailure
appFailure
pure $
defMessage
& Command.failWorkflowExecution
.~ ( defMessage
& Command.failure .~ enrichedApplicationFailure
)
runTopLevel :: InstanceM Payload -> InstanceM (WorkflowExitVariant Payload)
runTopLevel :: InstanceM Payload -> InstanceM (WorkflowExitVariant Payload)
runTopLevel InstanceM Payload
m = do
(Payload -> WorkflowExitVariant Payload
forall a. a -> WorkflowExitVariant a
WorkflowExitSuccess (Payload -> WorkflowExitVariant Payload)
-> InstanceM Payload -> InstanceM (WorkflowExitVariant Payload)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> InstanceM Payload
m)
InstanceM (WorkflowExitVariant Payload)
-> [Handler InstanceM (WorkflowExitVariant Payload)]
-> InstanceM (WorkflowExitVariant Payload)
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> [Handler m a] -> m a
`catches` [ (ContinueAsNewException -> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((ContinueAsNewException
-> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload))
-> (ContinueAsNewException
-> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ \e :: ContinueAsNewException
e@(ContinueAsNewException {}) -> WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload))
-> WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ ContinueAsNewException -> WorkflowExitVariant Payload
forall a. ContinueAsNewException -> WorkflowExitVariant a
WorkflowExitContinuedAsNew ContinueAsNewException
e
, (WorkflowCancelRequested
-> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((WorkflowCancelRequested
-> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload))
-> (WorkflowCancelRequested
-> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ \WorkflowCancelRequested
WorkflowCancelRequested -> do
WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload))
-> WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ WorkflowCancelRequested -> WorkflowExitVariant Payload
forall a. WorkflowCancelRequested -> WorkflowExitVariant a
WorkflowExitCancelled WorkflowCancelRequested
WorkflowCancelRequested
, (SomeException -> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall (m :: * -> *) a e. Exception e => (e -> m a) -> Handler m a
Handler ((SomeException -> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload))
-> (SomeException -> InstanceM (WorkflowExitVariant Payload))
-> Handler InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ \(SomeException
e :: SomeException) -> do
WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a. a -> InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload))
-> WorkflowExitVariant Payload
-> InstanceM (WorkflowExitVariant Payload)
forall a b. (a -> b) -> a -> b
$ SomeException -> WorkflowExitVariant Payload
forall a. SomeException -> WorkflowExitVariant a
WorkflowExitFailed SomeException
e
]