{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Temporal.Workflow.Eval where
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Text (Text)
import qualified Data.Text as Text
import GHC.Stack
import RequireCallStack
import Temporal.Common
import Temporal.Coroutine
import Temporal.Workflow.Internal.Monad
import Temporal.Workflow.Types
import Text.Printf
import UnliftIO
import Unsafe.Coerce
-- | Prefix a log message with the run id of the current workflow instance.
--
-- The instance info is read from its 'IORef' on every call, and the message is
-- rendered as @[runId=\<run id\>] \<message\>@.
withRunId :: Text -> InstanceM Text
withRunId arg = do
  inst <- ask
  info <- readIORef inst.workflowInstanceInfo
  pure ("[runId=" <> rawRunId info.runId <> "] " <> arg)
-- | A workflow evaluation expressed as a 'Coroutine' over 'InstanceM' whose
-- suspension functor is @'Await' ['ActivationResult']@ and whose final value
-- has type @a@.
type SuspendableWorkflowExecution a = Coroutine (Await [ActivationResult]) InstanceM a
-- | A result paired with the 'IVar' it is destined for.
--
-- The result type @a@ is existentially quantified, so results of different
-- types can be collected in a single list.
data ActivationResult
  = forall a.
    ActivationResult
      (ResultVal a)
      -- ^ The value (or failure) to store.
      !(IVar a)
      -- ^ The variable that receives it (strict field).
-- | Evaluate a 'Workflow' as a suspendable coroutine.
--
-- The workflow is scheduled as the first job, with a freshly created 'IVar' as
-- the destination for its result. Jobs are then run one after another; when
-- neither the argument job list nor the environment's run queue holds a job,
-- pending activation results are drained, and if there are none the coroutine
-- suspends with 'await' until the caller supplies a list of
-- 'ActivationResult's.
--
-- Once scheduling returns, the final 'IVar' is inspected:
--
-- * @'IVarFull' ('Ok' a)@ yields @a@;
-- * @'IVarFull' ('ThrowWorkflow' e)@ and @'IVarFull' ('ThrowInternal' e)@
--   rethrow @e@ with 'throwIO';
-- * 'IVarEmpty' is treated as a broken invariant and calls 'error'.
runWorkflow :: forall a. HasCallStack => (RequireCallStackImpl => Workflow a) -> SuspendableWorkflowExecution a
runWorkflow wf = provideCallStack $ do
  inst <- lift ask
  -- Activation results that have been received but not yet written to their IVars.
  pendingActivations <- lift $ newTVarIO []
  -- Destination for the top-level workflow's result.
  finalResult@IVar {ivarRef = resultRef} <- newIVar
  let
    -- Run one job and store its outcome in the given IVar; @rq@ holds the
    -- jobs still to be run afterwards.
    schedule :: ContinuationEnv -> JobList -> Workflow b -> IVar b -> SuspendableWorkflowExecution ()
    schedule env rq wf' ivar@IVar {ivarRef = !ref} = do
      lift $ do
        cs <- readIORef inst.workflowCallStack
        logMsg <- withRunId (Text.pack $ printf "schedule: %d\n%s" (1 + lengthJobList rq) $ prettyCallStack cs)
        $logDebug logMsg
      let {-# INLINE result #-}
          -- Record the outcome of this job and decide what to run next.
          result r = do
            e <- lift $ readIORef ref
            case e of
              IVarFull _ ->
                -- The IVar already holds a result: leave it untouched and
                -- carry on with the remaining jobs.
                reschedule env rq
              IVarEmpty workflowActions -> do
                lift $ writeIORef ref (IVarFull r)
                -- Is this the IORef backing the final result? The two refs
                -- differ only in their element type (@b@ vs @a@), so the
                -- coercion exists solely to let the 'Eq' comparison on
                -- 'IORef's type-check; no value is read through the coerced
                -- reference.
                if ref == unsafeCoerce resultRef
                  then pure ()
                  else
                    -- Jobs carried by the empty IVar (presumably those that
                    -- were waiting on it) run before the rest of the queue.
                    reschedule env $ appendJobList workflowActions rq
      r <-
        lift $
          UnliftIO.try $ do
            let (Workflow run) = wf'
            run env
      case r of
        Left e -> do
          -- Asynchronous exceptions are propagated, never stored as a result.
          rethrowAsyncExceptions e
          result $ ThrowInternal e
        Right (Done a) -> result $ Ok a
        Right (Throw ex) -> result $ ThrowWorkflow ex
        Right (Blocked i fn) -> do
          lift ($logDebug =<< withRunId "scheduled job blocked")
          -- Register the continuation against the IVar it is blocked on,
          -- then move on to the remaining jobs.
          lift $ addJob env (toWf fn) ivar i
          reschedule env rq

    -- Pick the next job: first from the supplied list, otherwise from the
    -- environment's run queue (which is emptied when taken from).
    reschedule :: ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
    reschedule env@ContinuationEnv {..} jobs = do
      lift ($logDebug =<< withRunId "reschedule")
      case jobs of
        JobNil -> do
          rq <- lift $ readIORef runQueueRef
          case rq of
            JobNil -> emptyRunQueue env
            JobCons env' a b c -> do
              lift $ writeIORef runQueueRef JobNil
              schedule env' c a b
        JobCons env' a b c ->
          schedule env' c a b

    -- Nothing left to run: look for activation results that unblock jobs,
    -- and wait for an activation if there are none.
    emptyRunQueue :: ContinuationEnv -> SuspendableWorkflowExecution ()
    emptyRunQueue env = do
      lift $ do
        logMsg <- withRunId "emptyRunQueue"
        $logDebug logMsg
      workflowActions <- checkActivationResults env
      case workflowActions of
        JobNil -> awaitActivation env
        _ -> reschedule env workflowActions

    -- Log, then wait for activation results.
    awaitActivation :: ContinuationEnv -> SuspendableWorkflowExecution ()
    awaitActivation env = do
      lift ($logDebug =<< withRunId "flushCommandsAndAwaitActivation")
      waitActivationResults env

    -- Atomically take all pending activation results, write each one into
    -- its IVar, and return the jobs that those IVars were carrying.
    checkActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution JobList
    checkActivationResults _env = lift $ do
      $logDebug =<< withRunId "checkActivationResults"
      comps <- atomically $ do
        c <- readTVar pendingActivations
        writeTVar pendingActivations []
        pure c
      case comps of
        [] -> do
          $logDebug =<< withRunId "No new activation results"
          return JobNil
        _ -> do
          $logDebug =<< withRunId (Text.pack $ printf "%d complete" (length comps))
          let
            getComplete :: ActivationResult -> InstanceM JobList
            getComplete (ActivationResult a IVar {ivarRef = cr}) = do
              r <- readIORef cr
              case r of
                IVarFull _ -> do
                  -- Already filled: the stored value is kept and the new
                  -- result is dropped.
                  $logDebug =<< withRunId "existing result"
                  return JobNil
                IVarEmpty cv -> do
                  writeIORef cr (IVarFull a)
                  return cv
          jobs <- mapM getComplete comps
          return (foldr appendJobList JobNil jobs)

    -- Obtain activation results, suspending the coroutine with 'await' only
    -- if none are already pending, store them as pending, and resume
    -- scheduling.
    waitActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution ()
    waitActivationResults env = do
      lift ($logDebug =<< withRunId "waitActivationResults")
      newActivations <- do
        activations <- lift $ readTVarIO pendingActivations
        if null activations
          then await
          else do
            lift $ atomically $ writeTVar pendingActivations []
            return activations
      atomically $ writeTVar pendingActivations newActivations
      jobs <- lift $ readIORef env.runQueueRef
      case jobs of
        JobNil -> do
          emptyRunQueue env
        _ -> do
          -- Take ownership of the queued jobs before running them.
          lift $ writeIORef env.runQueueRef JobNil
          reschedule env jobs

  let env = inst.workflowInstanceContinuationEnv
  schedule env JobNil wf finalResult
  r <- readIORef resultRef
  case r of
    IVarEmpty _ -> error "runWorkflow: missing result"
    IVarFull (Ok a) -> pure a
    IVarFull (ThrowWorkflow e) -> throwIO e
    IVarFull (ThrowInternal e) -> throwIO e
-- | Rethrow the exception if it is asynchronous; otherwise do nothing.
--
-- An exception counts as asynchronous when 'fromException' can view it as a
-- 'SomeAsyncException'. Every other exception is left for the caller to
-- handle.
rethrowAsyncExceptions :: MonadIO m => SomeException -> m ()
rethrowAsyncExceptions e
  | Just SomeAsyncException {} <- fromException e = UnliftIO.throwIO e
  | otherwise = return ()
-- | Enqueue a signal handler on the current workflow instance.
--
-- The handler is consed onto the front of the job list held by the instance's
-- 'ContinuationEnv', together with a fresh 'IVar' for its result. That 'IVar'
-- is not returned, so the handler's result is not observable through this
-- function. The job is only queued here, not run.
injectWorkflowSignal :: Workflow () -> InstanceM ()
injectWorkflowSignal signal = do
  result <- newIVar
  inst <- ask
  let env@(ContinuationEnv jobList) = inst.workflowInstanceContinuationEnv
  -- Strict modify, so no chain of thunks builds up inside the IORef.
  modifyIORef' jobList $ \j -> JobCons env signal result j