{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}

module Temporal.Workflow.Eval where

import Control.Monad.Logger
import Control.Monad.Reader
import Data.Text (Text)
import qualified Data.Text as Text
import GHC.Stack
import RequireCallStack
import Temporal.Common
import Temporal.Coroutine
import Temporal.Workflow.Internal.Monad
import Temporal.Workflow.Types
import Text.Printf
import UnliftIO
import Unsafe.Coerce


withRunId :: Text -> InstanceM Text
withRunId :: Text -> InstanceM Text
withRunId Text
arg = do
  inst <- InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
  info <- readIORef inst.workflowInstanceInfo
  return ("[runId=" <> rawRunId info.runId <> "] " <> arg)


type SuspendableWorkflowExecution a = Coroutine (Await [ActivationResult]) InstanceM a


{- | Values that were blocking waiting for an activation, and have now
been unblocked. The worker feeds these into a suspended workflow
after converting workflow activation results.
This unblocks the relevant computations.
-}
data ActivationResult
  = forall a.
    ActivationResult
      (ResultVal a)
      !(IVar a)


-- How this works:
--
-- A workflow instance executes a `Workflow` computation on its own thread.
-- There is a cooperative interplay between the main instance thread
-- responsible for executing WorkflowActivations and the workflow thread.
--
-- The workflow thread evaluates computations as deeply as possible before
-- blocking and forcing the WorkfowActivation processing code to submit
-- a WorkflowActivationCompletion to the core worker.
--
-- This has a lot of similarities to ideas from Haxl's GenHaxl monad with
-- regards to suspending and resuming as we get results back, but is different
-- in that we want to treat execution of async tasks
-- (executeChildWorkflow, timers, etc.) as awaitable and cancellable.
-- That is, we return a handle similar to an Async value and
-- let the workflow writer choose if/when to suspend. It's also viable
-- to just start a workflow or activity for its side effects and ignore the result
--
-- Another difference is that workflow signals allow for these handles to be
-- altered out of band, so we can't just traverse the tree and trust the
-- computation flow to be fully within our control. I think this is fine,
-- but it does mean that we might need to update the scheduler code below
-- to allow injecting these signals into the run queue.
--
-- Regardless, once we signal a WorkflowActivationCompletion, we wait in a
-- suspended state.
--
-- The core worker will eventually activate the instance again, at which
-- point we use Sequence values in the variants of the different workflow
-- activation jobs to fill any corresponding handles with their
-- results and continue execution.
--                                                              ┌────────────────────┐
--                               Run a Workflow action that     │                    │
--                               fills an IVar.                 │                    │
--                                                              │      schedule      │
--                               Once we get the result of the  │                    │
--                               Workflow action, figure out if │                    │
--                               the full computation is        └────────────────────┘
--                               completed. If not, reschedule  ▲          │
--                               to execute more Workflow code. │          │
--                                                              │          ▼
--                                                              ┌────────────────────┐
--                                                              │                    │
--                                                              │                    │
--                                ┌────────────────────┐◀────── │     reschedule     │
--                                │                    │        │                    │
--                                │                    │        │                    │
--                  ┌─────────────│   emptyRunQueue    │───────▶└────────────────────┘
--                  │             │                    │
--                  │             │                    │
--                  │             └────────────────────┘◀────────────────────────────────┐
--                  │             ▲          │                                           │
--                  │             │          └───────┐                                   │
--                  │             │                  │                                   │
--                  ▼             │                  ▼                                   │
--     ┌────────────────────────┐ ┌────────────────────────────────────┐    ┌────────────────────────┐
--     │                        │ │                                    │    │                        │
--     │ checkActivationResults │ │  flushCommandsAndAwaitActivation   │───▶│ waitActivationResults  │
--     │                        │ │                                    │    │                        │
--     └────────────────────────┘ └────────────────────────────────────┘    └────────────────────────┘
-- Look at queued workflow         If we don't have any filled
-- computations. If we don't have  IVars after calling
-- any queued computations to      checkActivationResults, and we         The workflow activation logic
-- run, we need to see if there    also don't have any more               is fully in charge of
-- are any activation results      scheduled jobs, then we're             resolving pending IVars, so we
-- that we've received from        blocked. We have to flush what         just hang out until an
-- polling and used to fill IVars  we've got, then wait for an            activation is registered.
-- tracked in the sequence maps.   activation to come in to
--                                 unblock us.
-- If we fill an IVar, then we
-- add any blocked computations
-- to the scheduled jobs list so
-- we can resume working on them.
--
-- We hand this back to
-- emptyRunQueue.
runWorkflow :: forall a. HasCallStack => (RequireCallStackImpl => Workflow a) -> SuspendableWorkflowExecution a
runWorkflow :: forall a.
HasCallStack =>
(RequireCallStackImpl => Workflow a)
-> SuspendableWorkflowExecution a
runWorkflow RequireCallStackImpl => Workflow a
wf = (RequireCallStackImpl => SuspendableWorkflowExecution a)
-> SuspendableWorkflowExecution a
forall r. (RequireCallStackImpl => r) -> r
provideCallStack ((RequireCallStackImpl => SuspendableWorkflowExecution a)
 -> SuspendableWorkflowExecution a)
-> (RequireCallStackImpl => SuspendableWorkflowExecution a)
-> SuspendableWorkflowExecution a
forall a b. (a -> b) -> a -> b
$ do
  inst <- InstanceM WorkflowInstance
-> Coroutine (Await [ActivationResult]) InstanceM WorkflowInstance
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift InstanceM WorkflowInstance
forall r (m :: * -> *). MonadReader r m => m r
ask
  pendingActivations <- lift $ newTVarIO []
  finalResult@IVar {ivarRef = resultRef} <- newIVar -- where to put the final result
  let
    -- Run a job, and put its result in the given IVar
    schedule :: ContinuationEnv -> JobList -> Workflow b -> IVar b -> SuspendableWorkflowExecution ()
    schedule ContinuationEnv
env JobList
rq Workflow b
wf' ivar :: IVar b
ivar@IVar {ivarRef :: forall a. IVar a -> IORef (IVarContents a)
ivarRef = !IORef (IVarContents b)
ref} = do
      InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ do
        cs <- IORef CallStack -> InstanceM CallStack
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef WorkflowInstance
inst.workflowCallStack
        logMsg <- withRunId (Text.pack $ printf "schedule: %d\n%s" (1 + lengthJobList rq) $ prettyCallStack cs)
        $logDebug logMsg
      let {-# INLINE result #-}
          result :: ResultVal b -> SuspendableWorkflowExecution ()
result ResultVal b
r = do
            e <- InstanceM (IVarContents b)
-> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b)
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM (IVarContents b)
 -> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b))
-> InstanceM (IVarContents b)
-> Coroutine (Await [ActivationResult]) InstanceM (IVarContents b)
forall a b. (a -> b) -> a -> b
$ IORef (IVarContents b) -> InstanceM (IVarContents b)
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef (IVarContents b)
ref
            case e of
              IVarFull ResultVal b
_ ->
                -- An IVar is typically only meant to be written to once
                -- so it's tempting to think we should throw an error here. But there
                -- are legitimate use-cases for writing several times– namely
                -- when using race, biselect, etc.
                ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
rq
              IVarEmpty JobList
workflowActions -> do
                InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef (IVarContents b) -> IVarContents b -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (IVarContents b)
ref (ResultVal b -> IVarContents b
forall a. ResultVal a -> IVarContents a
IVarFull ResultVal b
r)
                -- Have we got the final result now?
                if IORef (IVarContents b)
ref IORef (IVarContents b) -> IORef (IVarContents b) -> Bool
forall a. Eq a => a -> a -> Bool
== IORef (IVarContents a) -> IORef (IVarContents b)
forall a b. a -> b
unsafeCoerce IORef (IVarContents a)
resultRef
                  then -- comparing IORefs of different types is safe, it's
                  -- pointer-equality on the MutVar#.

                  -- TODO, I don't know if there are any cases where we need
                  -- to worry about discarding unfinished computations, but
                  -- I think exiting at the conclusion of a workflow is the
                  -- right thing to do.
                    () -> SuspendableWorkflowExecution ()
forall a. a -> Coroutine (Await [ActivationResult]) InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                  else -- We have a result, but don't discard unfinished
                  -- computations in the run queue.
                  --
                  -- In our case, unfinished computations can represent signals.
                  --
                  -- Nothing can depend on the final IVar, so workflowActions must
                  -- be empty.
                  -- case rq of
                  --   JobNil -> return ()
                  --   _ -> modifyIORef' env.runQueueRef (appendJobList rq)
                    ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env (JobList -> SuspendableWorkflowExecution ())
-> JobList -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ JobList -> JobList -> JobList
appendJobList JobList
workflowActions JobList
rq
      r <- InstanceM (Either SomeException (Result b))
-> Coroutine
     (Await [ActivationResult])
     InstanceM
     (Either SomeException (Result b))
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM (Either SomeException (Result b))
 -> Coroutine
      (Await [ActivationResult])
      InstanceM
      (Either SomeException (Result b)))
-> InstanceM (Either SomeException (Result b))
-> Coroutine
     (Await [ActivationResult])
     InstanceM
     (Either SomeException (Result b))
forall a b. (a -> b) -> a -> b
$ InstanceM (Result b) -> InstanceM (Either SomeException (Result b))
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> m (Either e a)
UnliftIO.try (InstanceM (Result b)
 -> InstanceM (Either SomeException (Result b)))
-> InstanceM (Result b)
-> InstanceM (Either SomeException (Result b))
forall a b. (a -> b) -> a -> b
$ do
        let (Workflow ContinuationEnv -> InstanceM (Result b)
run) = Workflow b
wf'
        ContinuationEnv -> InstanceM (Result b)
run ContinuationEnv
env
      case r of
        Left SomeException
e -> do
          SomeException -> SuspendableWorkflowExecution ()
forall (m :: * -> *). MonadIO m => SomeException -> m ()
rethrowAsyncExceptions SomeException
e
          ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResultVal b
forall a. SomeException -> ResultVal a
ThrowInternal SomeException
e
        Right (Done b
a) -> ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ b -> ResultVal b
forall a. a -> ResultVal a
Ok b
a
        Right (Throw SomeException
ex) -> ResultVal b -> SuspendableWorkflowExecution ()
result (ResultVal b -> SuspendableWorkflowExecution ())
-> ResultVal b -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ SomeException -> ResultVal b
forall a. SomeException -> ResultVal a
ThrowWorkflow SomeException
ex
        Right (Blocked IVar b
i Cont b
fn) -> do
          InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"scheduled job blocked")
          InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ ContinuationEnv -> Workflow b -> IVar b -> IVar b -> InstanceM ()
forall b a.
ContinuationEnv -> Workflow b -> IVar b -> IVar a -> InstanceM ()
addJob ContinuationEnv
env (Cont b -> Workflow b
forall a. Cont a -> Workflow a
toWf Cont b
fn) IVar b
ivar IVar b
i
          ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
rq

    reschedule :: ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
    reschedule env :: ContinuationEnv
env@ContinuationEnv {IORef JobList
runQueueRef :: IORef JobList
runQueueRef :: ContinuationEnv -> IORef JobList
..} JobList
jobs = do
      InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"reschedule")
      case JobList
jobs of
        JobList
JobNil -> do
          rq <- InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM JobList
 -> Coroutine (Await [ActivationResult]) InstanceM JobList)
-> InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall a b. (a -> b) -> a -> b
$ IORef JobList -> InstanceM JobList
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef JobList
runQueueRef
          case rq of
            JobList
JobNil -> ContinuationEnv -> SuspendableWorkflowExecution ()
emptyRunQueue ContinuationEnv
env
            JobCons ContinuationEnv
env' Workflow a
a IVar a
b JobList
c -> do
              InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef JobList -> JobList -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef JobList
runQueueRef JobList
JobNil
              ContinuationEnv
-> JobList
-> Workflow a
-> IVar a
-> SuspendableWorkflowExecution ()
forall b.
ContinuationEnv
-> JobList
-> Workflow b
-> IVar b
-> SuspendableWorkflowExecution ()
schedule ContinuationEnv
env' JobList
c Workflow a
a IVar a
b
        JobCons ContinuationEnv
env' Workflow a
a IVar a
b JobList
c ->
          ContinuationEnv
-> JobList
-> Workflow a
-> IVar a
-> SuspendableWorkflowExecution ()
forall b.
ContinuationEnv
-> JobList
-> Workflow b
-> IVar b
-> SuspendableWorkflowExecution ()
schedule ContinuationEnv
env' JobList
c Workflow a
a IVar a
b

    emptyRunQueue :: ContinuationEnv -> SuspendableWorkflowExecution ()
    emptyRunQueue ContinuationEnv
env = do
      InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ do
        logMsg <- Text -> InstanceM Text
withRunId Text
"emptyRunQueue"
        $logDebug logMsg
      workflowActions <- ContinuationEnv
-> Coroutine (Await [ActivationResult]) InstanceM JobList
checkActivationResults ContinuationEnv
env
      case workflowActions of
        JobList
JobNil -> ContinuationEnv -> SuspendableWorkflowExecution ()
awaitActivation ContinuationEnv
env
        JobList
_ -> ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
workflowActions

    awaitActivation :: ContinuationEnv -> SuspendableWorkflowExecution ()
    awaitActivation ContinuationEnv
env = do
      InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"flushCommandsAndAwaitActivation")
      ContinuationEnv -> SuspendableWorkflowExecution ()
waitActivationResults ContinuationEnv
env

    checkActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution JobList
    checkActivationResults ContinuationEnv
_env = InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM JobList
 -> Coroutine (Await [ActivationResult]) InstanceM JobList)
-> InstanceM JobList
-> Coroutine (Await [ActivationResult]) InstanceM JobList
forall a b. (a -> b) -> a -> b
$ do
      $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"checkActivationResults"
      comps <- STM [ActivationResult] -> InstanceM [ActivationResult]
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM [ActivationResult] -> InstanceM [ActivationResult])
-> STM [ActivationResult] -> InstanceM [ActivationResult]
forall a b. (a -> b) -> a -> b
$ do
        c <- TVar [ActivationResult] -> STM [ActivationResult]
forall a. TVar a -> STM a
readTVar TVar [ActivationResult]
pendingActivations
        writeTVar pendingActivations []
        pure c
      case comps of
        [] -> do
          $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"No new activation results"
          JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
JobNil
        [ActivationResult]
_ -> do
          $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId (String -> Text
Text.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ String -> Int -> String
forall r. PrintfType r => String -> r
printf String
"%d complete" ([ActivationResult] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ActivationResult]
comps))
          let
            getComplete :: ActivationResult -> InstanceM JobList
getComplete (ActivationResult ResultVal a
a IVar {ivarRef :: forall a. IVar a -> IORef (IVarContents a)
ivarRef = IORef (IVarContents a)
cr}) = do
              r <- IORef (IVarContents a) -> InstanceM (IVarContents a)
forall (m :: * -> *) a. MonadIO m => IORef a -> m a
readIORef IORef (IVarContents a)
cr
              case r of
                IVarFull ResultVal a
_ -> do
                  $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"existing result"
                  JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
JobNil
                -- this happens if a data source reports a result,
                -- and then throws an exception.  We call putResult
                -- a second time for the exception, which comes
                -- ahead of the original request (because it is
                -- pushed on the front of the completions list) and
                -- therefore overrides it.
                IVarEmpty JobList
cv -> do
                  IORef (IVarContents a) -> IVarContents a -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef IORef (IVarContents a)
cr (ResultVal a -> IVarContents a
forall a. ResultVal a -> IVarContents a
IVarFull ResultVal a
a)
                  JobList -> InstanceM JobList
forall a. a -> InstanceM a
forall (m :: * -> *) a. Monad m => a -> m a
return JobList
cv
          jobs <- (ActivationResult -> InstanceM JobList)
-> [ActivationResult] -> InstanceM [JobList]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM ActivationResult -> InstanceM JobList
getComplete [ActivationResult]
comps
          return (foldr appendJobList JobNil jobs)

    waitActivationResults :: ContinuationEnv -> SuspendableWorkflowExecution ()
    waitActivationResults ContinuationEnv
env = do
      InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift ($Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> InstanceM ()
(Text -> InstanceM ()) -> (Text -> Text) -> Text -> InstanceM ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> InstanceM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
logDebug (Text -> InstanceM ()) -> InstanceM Text -> InstanceM ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Text -> InstanceM Text
withRunId Text
"waitActivationResults")
      newActivations <- do
        activations <- InstanceM [ActivationResult]
-> Coroutine
     (Await [ActivationResult]) InstanceM [ActivationResult]
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM [ActivationResult]
 -> Coroutine
      (Await [ActivationResult]) InstanceM [ActivationResult])
-> InstanceM [ActivationResult]
-> Coroutine
     (Await [ActivationResult]) InstanceM [ActivationResult]
forall a b. (a -> b) -> a -> b
$ TVar [ActivationResult] -> InstanceM [ActivationResult]
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar [ActivationResult]
pendingActivations
        if null activations
          then await
          else do
            lift $ atomically $ writeTVar pendingActivations []
            return activations
      atomically $ writeTVar pendingActivations newActivations
      jobs <- lift $ readIORef env.runQueueRef
      case jobs of
        JobList
JobNil -> do
          ContinuationEnv -> SuspendableWorkflowExecution ()
emptyRunQueue ContinuationEnv
env
        JobList
_ -> do
          InstanceM () -> SuspendableWorkflowExecution ()
forall (m :: * -> *) a.
Monad m =>
m a -> Coroutine (Await [ActivationResult]) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (InstanceM () -> SuspendableWorkflowExecution ())
-> InstanceM () -> SuspendableWorkflowExecution ()
forall a b. (a -> b) -> a -> b
$ IORef JobList -> JobList -> InstanceM ()
forall (m :: * -> *) a. MonadIO m => IORef a -> a -> m ()
writeIORef ContinuationEnv
env.runQueueRef JobList
JobNil
          ContinuationEnv -> JobList -> SuspendableWorkflowExecution ()
reschedule ContinuationEnv
env JobList
jobs

  let env = WorkflowInstance
inst.workflowInstanceContinuationEnv
  schedule env JobNil wf finalResult
  r <- readIORef resultRef
  case r of
    IVarEmpty JobList
_ -> String -> SuspendableWorkflowExecution a
forall a. HasCallStack => String -> a
error String
"runWorkflow: missing result"
    IVarFull (Ok a
a) -> a -> SuspendableWorkflowExecution a
forall a. a -> Coroutine (Await [ActivationResult]) InstanceM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
a
    IVarFull (ThrowWorkflow SomeException
e) -> SomeException -> SuspendableWorkflowExecution a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
    IVarFull (ThrowInternal SomeException
e) -> SomeException -> SuspendableWorkflowExecution a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e


rethrowAsyncExceptions :: MonadIO m => SomeException -> m ()
rethrowAsyncExceptions :: forall (m :: * -> *). MonadIO m => SomeException -> m ()
rethrowAsyncExceptions SomeException
e
  | Just SomeAsyncException {} <- SomeException -> Maybe SomeAsyncException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e = SomeException -> m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
UnliftIO.throwIO SomeException
e
  | Bool
otherwise = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()


-- experimental. should help ensure that signals blocking and resuming interop
-- properly with the main workflow execution.
injectWorkflowSignal :: Workflow () -> InstanceM ()
injectWorkflowSignal :: Workflow () -> InstanceM ()
injectWorkflowSignal Workflow ()
signal = do
  result <- InstanceM (IVar ())
forall (m :: * -> *) a. MonadIO m => m (IVar a)
newIVar
  inst <- ask
  let env@(ContinuationEnv jobList) = inst.workflowInstanceContinuationEnv
  modifyIORef' jobList $ \JobList
j -> ContinuationEnv -> Workflow () -> IVar () -> JobList -> JobList
forall a.
ContinuationEnv -> Workflow a -> IVar a -> JobList -> JobList
JobCons ContinuationEnv
env Workflow ()
signal IVar ()
result JobList
j