{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Temporal.Workflow.Eval where
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Text (Text)
import qualified Data.Text as Text
import GHC.Stack
import RequireCallStack
import Temporal.Common
import Temporal.Coroutine
import Temporal.Workflow.Internal.Monad
import Temporal.Workflow.Types
import Text.Printf
import UnliftIO
import Unsafe.Coerce
withRunId :: Text -> InstanceM Text
withRunId :: Text -> InstanceM Text
withRunId Text
arg = do
inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
info <- readIORef inst.workflowInstanceInfo
return ("[runId=" <> rawRunId info.runId <> "] " <> arg)
type SuspendableWorkflowExecution a = Coroutine (Await [ActivationResult]) InstanceM a
data ActivationResult
= forall a.
(ResultVal a)
!(IVar a)
runWorkflow :: forall a. HasCallStack => (RequireCallStackImpl => Workflow a) -> SuspendableWorkflowExecution a
runWorkflow :: forall a.
HasCallStack =>
(RequireCallStackImpl => Workflow a)
-> SuspendableWorkflowExecution a
runWorkflow RequireCallStackImpl => Workflow a
wf = (RequireCallStackImpl => SuspendableWorkflowExecution a)
-> SuspendableWorkflowExecution a
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => SuspendableWorkflowExecution a)
-> SuspendableWorkflowExecution a)
-> (RequireCallStackImpl => SuspendableWorkflowExecution a)
-> SuspendableWorkflowExecution a
forall a b. (a -> b) -> a -> b
$ do
inst <- InstanceM WorkflowInstance
-> Coroutine (Await [ActivationResult]) InstanceM WorkflowInstance
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
pendingActivations <- lift $ newTVarIO []
finalResult@IVar {ivarRef = resultRef} <- newIVar
schedule :: ContinuationEnv -> JobList -> Workflow b -> IVar b -> SuspendableWorkflowExecution ()
schedule ContinuationEnv
env JobList
rq Workflow b
wf' ivar :: IVar b
ivar@IVar {ivarRef :: forall a. IVar a -> IORef (IVarContents a)
ivarRef = !IORef (IVarContents b)
ref} = do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ do
cs <- IORef CallStack -> InstanceM CallStack
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef WorkflowInstance
logMsg <- withRunId (Text.pack $ printf "schedule: %d\n%s" (1 + lengthJobList rq) $ prettyCallStack cs)
$logDebug logMsg
let {-# INLINE result #-}
result :: ResultVal b -> SuspendableWorkflowExecution ()
result ResultVal b
r = do
e <- InstanceM (IVarContents b)
-> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b)
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM (IVarContents b)
-> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b))
-> InstanceM (IVarContents b)
-> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b)
forall a b. (a -> b) -> a -> b
$ IORef (IVarContents b) -> InstanceM (IVarContents b)
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef (IVarContents b)
case e of
IVarFull ResultVal b
_ ->
ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
IVarEmpty JobList
workflowActions -> do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef (IVarContents b) -> IVarContents b -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (IVarContents b)
ref (ResultVal b -> IVarContents b
forall a. ResultVal a -> IVarContents a
IVarFull ResultVal b
if IORef (IVarContents b)
ref IORef (IVarContents b) -> IORef (IVarContents b) -> Bool
forall a. Eq a => a -> a -> Bool
== IORef (IVarContents a) -> IORef (IVarContents b)
forall a b. a -> b
unsafeCoerce IORef (IVarContents a)
() -> SuspendableWorkflowExecution ()
forall a. a -> Coroutine (Await [ActivationResult]) InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env (JobList -> SuspendableWorkflowExecution ())
-> JobList -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ JobList -> JobList -> JobList
appendJobList JobList
workflowActions JobList
r <- InstanceM (Either SomeException (Result b))
-> Coroutine
(Await [ActivationResult])
(Either SomeException (Result b))
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM (Either SomeException (Result b))
-> Coroutine
(Await [ActivationResult])
(Either SomeException (Result b)))
-> InstanceM (Either SomeException (Result b))
-> Coroutine
(Await [ActivationResult])
(Either SomeException (Result b))
forall a b. (a -> b) -> a -> b
$ InstanceM (Result b) -> InstanceM (Either SomeException (Result b))
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
UnliftIO.try (InstanceM (Result b)
-> InstanceM (Either SomeException (Result b)))
-> InstanceM (Result b)
-> InstanceM (Either SomeException (Result b))
forall a b. (a -> b) -> a -> b
$ do
let (Workflow ContinuationEnv -> InstanceM (Result b)
run) = Workflow b
ContinuationEnv -> InstanceM (Result b)
run ContinuationEnv
case r of
Left SomeException
e -> do
SomeException -> SuspendableWorkflowExecution ()
forall (m :: * -> *). MonadIO m => SomeException -> m ()
rethrowAsyncExceptions SomeException
ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResultVal b
forall a. SomeException -> ResultVal a
ThrowInternal SomeException
Right (Done b
a) -> ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ b -> ResultVal b
forall a. a -> ResultVal a
Ok b
Right (Throw SomeException
ex) -> ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResultVal b
forall a. SomeException -> ResultVal a
ThrowWorkflow SomeException
Right (Blocked IVar b
i Cont b
fn) -> do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"scheduled job blocked")
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ ContinuationEnv -> Workflow b -> IVar b -> IVar b -> InstanceM ()
forall b a.
ContinuationEnv -> Workflow b -> IVar b -> IVar a -> InstanceM ()
addJob ContinuationEnv
env (Cont b -> Workflow b
forall a. Cont a -> Workflow a
toWf Cont b
fn) IVar b
ivar IVar b
ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
reschedule :: ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule env :: ContinuationEnv
env@ContinuationEnv {IORef JobList
runQueueRef :: IORef JobList
runQueueRef :: ContinuationEnv -> IORef JobList
..} JobList
jobs = do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
case JobList
jobs of
JobNil -> do
rq <- InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList)
-> InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall a b. (a -> b) -> a -> b
$ IORef JobList -> InstanceM JobList
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef JobList
case rq of
JobNil -> ContinuationEnv -> SuspendableWorkflowExecution ()
emptyRunQueue ContinuationEnv
JobCons ContinuationEnv
env' Workflow a
a IVar a
b JobList
c -> do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef JobList -> JobList -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef JobList
runQueueRef JobList
-> JobList
-> Workflow a
-> IVar a
-> SuspendableWorkflowExecution ()
forall b.
-> JobList
-> Workflow b
-> IVar b
-> SuspendableWorkflowExecution ()
schedule ContinuationEnv
env' JobList
c Workflow a
a IVar a
JobCons ContinuationEnv
env' Workflow a
a IVar a
b JobList
c ->
-> JobList
-> Workflow a
-> IVar a
-> SuspendableWorkflowExecution ()
forall b.
-> JobList
-> Workflow b
-> IVar b
-> SuspendableWorkflowExecution ()
schedule ContinuationEnv
env' JobList
c Workflow a
a IVar a
emptyRunQueue :: ContinuationEnv -> SuspendableWorkflowExecution ()
emptyRunQueue ContinuationEnv
env = do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ do
logMsg <- Text -> InstanceM Text
withRunId Text
$logDebug logMsg
workflowActions <- ContinuationEnv
-> Coroutine (Await [ActivationResult]) InstanceM JobList
checkActivationResults ContinuationEnv
case workflowActions of
JobNil -> ContinuationEnv -> SuspendableWorkflowExecution ()
awaitActivation ContinuationEnv
_ -> ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
awaitActivation :: ContinuationEnv -> SuspendableWorkflowExecution ()
awaitActivation ContinuationEnv
env = do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
ContinuationEnv -> SuspendableWorkflowExecution ()
waitActivationResults ContinuationEnv
checkActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution JobList
checkActivationResults ContinuationEnv
_env = InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList)
-> InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall a b. (a -> b) -> a -> b
$ do
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
comps <- STM [ActivationResult] -> InstanceM [ActivationResult]
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM [ActivationResult] -> InstanceM [ActivationResult])
-> STM [ActivationResult] -> InstanceM [ActivationResult]
forall a b. (a -> b) -> a -> b
$ do
c <- TVar [ActivationResult] -> STM [ActivationResult]
forall a. TVar a -> STM a
readTVar TVar [ActivationResult]
writeTVar pendingActivations []
pure c
case comps of
[] -> do
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"No new activation results"
JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
_ -> do
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ String -> Int -> String
forall r. PrintfType r => String -> r
printf String
"%d complete" ([ActivationResult] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ActivationResult]
getComplete :: ActivationResult -> InstanceM JobList
getComplete (ActivationResult ResultVal a
a IVar {ivarRef :: forall a. IVar a -> IORef (IVarContents a)
ivarRef = IORef (IVarContents a)
cr}) = do
r <- IORef (IVarContents a) -> InstanceM (IVarContents a)
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef (IVarContents a)
case r of
IVarFull ResultVal a
_ -> do
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"existing result"
JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
IVarEmpty JobList
cv -> do
IORef (IVarContents a) -> IVarContents a -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (IVarContents a)
cr (ResultVal a -> IVarContents a
forall a. ResultVal a -> IVarContents a
IVarFull ResultVal a
JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
jobs <- (ActivationResult -> InstanceM JobList)
-> [ActivationResult] -> InstanceM [JobList]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM ActivationResult -> InstanceM JobList
getComplete [ActivationResult]
return (foldr appendJobList JobNil jobs)
waitActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution ()
waitActivationResults ContinuationEnv
env = do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
newActivations <- do
activations <- InstanceM [ActivationResult]
-> Coroutine
(Await [ActivationResult]) InstanceM [ActivationResult]
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM [ActivationResult]
-> Coroutine
(Await [ActivationResult]) InstanceM [ActivationResult])
-> InstanceM [ActivationResult]
-> Coroutine
(Await [ActivationResult]) InstanceM [ActivationResult]
forall a b. (a -> b) -> a -> b
$ TVar [ActivationResult] -> InstanceM [ActivationResult]
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar [ActivationResult]
if null activations
then await
else do
lift $ atomically $ writeTVar pendingActivations []
return activations
atomically $ writeTVar pendingActivations newActivations
jobs <- lift $ readIORef env.runQueueRef
case jobs of
JobNil -> do
ContinuationEnv -> SuspendableWorkflowExecution ()
emptyRunQueue ContinuationEnv
_ -> do
InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef JobList -> JobList -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef ContinuationEnv
env.runQueueRef JobList
ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
let env = WorkflowInstance
schedule env JobNil wf finalResult
r <- readIORef resultRef
case r of
IVarEmpty JobList
_ -> String -> SuspendableWorkflowExecution a
forall a. HasCallStack => String -> a
error String
"runWorkflow: missing result"
IVarFull (Ok a
a) -> a -> SuspendableWorkflowExecution a
forall a. a -> Coroutine (Await [ActivationResult]) InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
IVarFull (ThrowWorkflow SomeException
e) -> SomeException -> SuspendableWorkflowExecution a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
IVarFull (ThrowInternal SomeException
e) -> SomeException -> SuspendableWorkflowExecution a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
rethrowAsyncExceptions :: MonadIO m => SomeException -> m ()
rethrowAsyncExceptions :: forall (m :: * -> *). MonadIO m => SomeException -> m ()
rethrowAsyncExceptions SomeException
| Just SomeAsyncException {} <- SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e = SomeException -> m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
UnliftIO.throwIO SomeException
| Bool
otherwise = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
injectWorkflowSignal :: Workflow () -> InstanceM ()
injectWorkflowSignal :: Workflow () -> InstanceM ()
injectWorkflowSignal Workflow ()
signal = do
result <- InstanceM (IVar ())
forall (m :: * -> *) a. MonadIO m => m (IVar a)
inst <- ask
let env@(ContinuationEnv jobList) = inst.workflowInstanceContinuationEnv
modifyIORef' jobList $ \JobList
j -> ContinuationEnv -> Workflow () -> IVar () -> JobList -> JobList
forall a.
ContinuationEnv -> Workflow a -> IVar a -> JobList -> JobList
JobCons ContinuationEnv
env Workflow ()
signal IVar ()
result JobList