{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TemplateHaskell #-}

module Temporal.Activity.Worker where

import Control.Exception.Annotated
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Bifunctor
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens
import Data.Text (Text)
import qualified Data.Text as T
import Lens.Family2
import qualified Proto.Temporal.Api.Common.V1.Message_Fields as P
import qualified Proto.Temporal.Api.Failure.V1.Message_Fields as F
import qualified Proto.Temporal.Sdk.Core.ActivityResult.ActivityResult_Fields as AR
import qualified Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask as AT
import qualified Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask_Fields as AT
import qualified Proto.Temporal.Sdk.Core.CoreInterface_Fields as C
import Temporal.Activity.Definition
import Temporal.Activity.Types
import Temporal.Common
import Temporal.Common.Async
import qualified Temporal.Core.Worker as Core
import Temporal.Duration (durationFromProto)
import Temporal.Exception
import Temporal.Interceptor
import Temporal.Payload
import UnliftIO
import UnliftIO.Concurrent (threadDelay)


-- | Runtime state shared by everything that polls for and runs activity tasks.
--
-- The @env@ parameter is the user-supplied environment handed to activities.
data ActivityWorker env = ActivityWorker
  { activityEnv :: {-# UNPACK #-} !(IORef env)
  -- ^ Mutable reference to the user environment; read each time an activity is started.
  , definitions :: {-# UNPACK #-} !(HashMap Text (ActivityDefinition env))
  -- ^ Registered activities, looked up by activity type name when a start task arrives.
  , runningActivities :: {-# UNPACK #-} !(TVar (HashMap TaskToken (Async ())))
  -- ^ In-flight activity threads keyed by task token. Consulted by
  -- 'requireActivityNotRunning' and cancelled in bulk by 'notifyShutdown'.
  , workerCore :: {-# UNPACK #-} !(Core.Worker 'Core.Real)
  -- ^ Handle to the underlying core worker used for polling and configuration.
  , activityInboundInterceptors :: {-# UNPACK #-} !(ActivityInboundInterceptor env)
  -- ^ Interceptor wrapped around each activity execution.
  , activityOutboundInterceptors :: {-# UNPACK #-} !(ActivityOutboundInterceptor env)
  -- ^ Outbound interceptor chain (its use is not visible in this part of the module).
  , clientInterceptors :: {-# UNPACK #-} !ClientInterceptors
  -- ^ Client interceptors passed into each activity's 'ActivityEnv'.
  , activityErrorConverters :: {-# UNPACK #-} ![ApplicationFailureHandler]
  -- ^ Handlers used to turn an exception thrown by an activity into an application failure.
  , payloadProcessor :: {-# UNPACK #-} !PayloadProcessor
  -- ^ Processor applied when decoding incoming payloads (inputs, headers, heartbeat details).
  }


-- | Give running activities the configured grace period, then cancel them.
--
-- The grace period is read from the core worker configuration in milliseconds.
-- When it is positive, this call blocks for that long before every activity
-- still present in 'runningActivities' is cancelled with 'WorkerShutdown'.
notifyShutdown :: MonadIO m => ActivityWorker env -> m ()
notifyShutdown worker = do
  let shutdownGracePeriod :: Int
      shutdownGracePeriod = fromIntegral (Core.gracefulShutdownPeriodMillis $ Core.getWorkerConfig worker.workerCore)
  -- TODO logging here
  when (shutdownGracePeriod > 0) $ do
    -- 'threadDelay' takes microseconds; the configured value is in milliseconds.
    threadDelay (shutdownGracePeriod * 1000)
  -- Snapshot taken after the delay, so only activities still registered are cancelled.
  running <- readTVarIO worker.runningActivities
  forM_ running $ \thread -> cancelWith thread WorkerShutdown


-- | Monad in which the activity worker loop runs: a reader over the
-- 'ActivityWorker' state layered on top of a base monad @m@.
--
-- All instances are derived through the underlying 'ReaderT'.
newtype ActivityWorkerM env m a = ActivityWorkerM {unActivityWorkerM :: ReaderT (ActivityWorker env) m a}
  deriving newtype
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadReader (ActivityWorker env)
    , MonadUnliftIO
    , MonadLogger
    , MonadLoggerIO
    )


-- | Run an 'ActivityWorkerM' action against the given worker state,
-- yielding the result in the base monad.
runActivityWorker :: (MonadUnliftIO m, MonadLogger m) => ActivityWorker env -> ActivityWorkerM env m a -> m a
runActivityWorker w m = runReaderT (unActivityWorkerM m) w


-- | Main activity polling loop.
--
-- Repeatedly polls the core worker for an activity task and hands each one to
-- 'applyActivityTask'. The loop ends normally when the poll reports
-- 'Core.PollShutdown'; any other poll error is logged at error level and rethrown.
execute :: (MonadUnliftIO m, MonadLogger m) => ActivityWorker actEnv -> m ()
execute worker = runActivityWorker worker go
  where
    go = do
      eTask <- liftIO $ Core.pollActivityTask worker.workerCore
      case eTask of
        -- Shutdown was requested: stop polling without raising.
        Left (Core.WorkerError Core.PollShutdown _) -> do
          pure ()
        Left e -> do
          $logError (T.pack (show e))
          throwIO e
        Right task -> do
          applyActivityTask task
          go


-- | Build an 'ActivityInfo' from the protobuf start message of an activity task.
--
-- Header fields and heartbeat details are decoded through the worker's
-- 'payloadProcessor'; the remaining fields are copied or converted directly
-- from the message. The supplied task token is stored unchanged.
activityInfoFromProto :: MonadIO m => TaskToken -> AT.Start -> ActivityWorkerM actEnv m ActivityInfo
activityInfoFromProto tt msg = do
  w <- ask
  hdrs <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.headerFields))
  heartbeats <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.vec'heartbeatDetails))
  pure $
    ActivityInfo
      { workflowNamespace = Namespace $ msg ^. AT.workflowNamespace
      , workflowType = WorkflowType $ msg ^. AT.workflowType
      , workflowId = WorkflowId $ msg ^. AT.workflowExecution . P.workflowId
      , runId = RunId $ msg ^. AT.workflowExecution . P.runId
      , activityId = ActivityId $ msg ^. AT.activityId
      , activityType = msg ^. AT.activityType
      , headerFields = hdrs
      , heartbeatDetails = heartbeats
      , scheduledTime = msg ^. AT.scheduledTime . to timespecFromTimestamp
      , currentAttemptScheduledTime = msg ^. AT.currentAttemptScheduledTime . to timespecFromTimestamp
      , startedTime = msg ^. AT.startedTime . to timespecFromTimestamp
      , attempt = msg ^. AT.attempt
      , scheduleToCloseTimeout = fmap durationFromProto (msg ^. AT.maybe'scheduleToCloseTimeout)
      , startToCloseTimeout = fmap durationFromProto (msg ^. AT.maybe'startToCloseTimeout)
      , heartbeatTimeout = fmap durationFromProto (msg ^. AT.maybe'heartbeatTimeout)
      , retryPolicy = fmap retryPolicyFromProto (msg ^. AT.maybe'retryPolicy)
      , isLocal = msg ^. AT.isLocal
      , taskToken = tt
      }


{- | Signal to the Temporal worker that the activity will be completed asynchronously (out of band).

In order to complete the activity once it has been moved to async, use 'Temporal.Client.AsyncActivity.complete', 'Temporal.Client.AsyncActivity.fail', or 'Temporal.Client.AsyncActivity.reportCancellation'.

Note: Under the hood, this throws a 'CompleteAsync' exception, which is caught and handled by the Temporal worker.

Make sure that your own code does not swallow or rewrap this exception, otherwise the activity will fail instead
of signalling that it will be completed asynchronously.
-}
completeAsync :: MonadIO m => m ()
completeAsync = throwIO CompleteAsync


-- | Dispatch an activity task according to its variant.
--
-- Start tasks go to 'applyActivityTaskStart' and cancel tasks to
-- 'applyActivityTaskCancel', both receiving the task token taken from the task.
-- A task with no variant set throws a 'RuntimeError'.
applyActivityTask :: (MonadUnliftIO m, MonadLogger m) => AT.ActivityTask -> ActivityWorkerM actEnv m ()
applyActivityTask task = case task ^. AT.maybe'variant of
  Nothing -> throwIO $ RuntimeError "Activity task has no variant or an unknown variant"
  Just (AT.ActivityTask'Start msg) -> applyActivityTaskStart task tt msg
  Just (AT.ActivityTask'Cancel msg) -> applyActivityTaskCancel tt msg
  where
    -- Raw token bytes from the task, wrapped in the 'TaskToken' newtype.
    tt = TaskToken $ task ^. AT.taskToken


-- | Run the given action only if no activity is currently registered under
-- the task token; otherwise throw a 'RuntimeError'.
--
-- The check reads a snapshot of 'runningActivities' and is not performed
-- atomically with the action that follows.
requireActivityNotRunning :: MonadUnliftIO m => TaskToken -> ActivityWorkerM actEnv m () -> ActivityWorkerM actEnv m ()
requireActivityNotRunning tt m = do
  w <- ask
  running <- readTVarIO w.runningActivities
  case HashMap.lookup tt running of
    Just _ -> throwIO $ RuntimeError "Activity task already running"
    Nothing -> m


-- TODO, where should async exception masking happen?
applyActivityTaskStart :: (MonadUnliftIO m, MonadLogger m) => AT.ActivityTask -> TaskToken -> AT.Start -> ActivityWorkerM actEnv m ()
applyActivityTaskStart :: forall (m :: * -> *) actEnv.
(MonadUnliftIO m, MonadLogger m) =>
ActivityTask -> TaskToken -> Start -> ActivityWorkerM actEnv m ()
applyActivityTaskStart ActivityTask
tsk TaskToken
tt Start
msg = do
  w <- ActivityWorkerM actEnv m (ActivityWorker actEnv)
forall r (m :: * -> *). MonadReader r m => m r
ask
  $logDebug $ "Starting activity: " <> T.pack (show tsk)
  requireActivityNotRunning tt $ do
    info <- activityInfoFromProto tt msg
    args <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.vec'input))
    env <- readIORef w.activityEnv
    let
      actEnv = Worker 'Real
-> ActivityInfo
-> ClientInterceptors
-> PayloadProcessor
-> actEnv
-> ActivityEnv actEnv
forall env.
Worker 'Real
-> ActivityInfo
-> ClientInterceptors
-> PayloadProcessor
-> env
-> ActivityEnv env
ActivityEnv ActivityWorker actEnv
w.workerCore ActivityInfo
info ActivityWorker actEnv
w.clientInterceptors ActivityWorker actEnv
w.payloadProcessor
      input =
        Vector Payload
-> Map Text Payload -> ActivityInfo -> ExecuteActivityInput
ExecuteActivityInput
          Vector Payload
args
          ActivityInfo
info.headerFields
          ActivityInfo
info
    -- We mask here to ensure that the activity is definitely registered
    -- before we start running it. This is important because we need to be able to cancel
    -- it later if the orchestrator requests it.
    mask_ $ do
      syncPoint <- newEmptyMVar
      let c = Worker 'Real -> WorkerConfig
forall (ty :: WorkerType). Worker ty -> WorkerConfig
Core.getWorkerConfig ActivityWorker actEnv
w.workerCore
      runningActivity <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/activity/start/", Core.namespace c, "/", Core.taskQueue c]) $ do
        (ef :: Either SomeException (Either String Payload)) <- liftIO $ UnliftIO.trySyncOrAsync $ do
          w.activityInboundInterceptors.executeActivity env input $ \actEnv
env' ExecuteActivityInput
input' -> do
            case Text
-> HashMap Text (ActivityDefinition actEnv)
-> Maybe (ActivityDefinition actEnv)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HashMap.lookup ActivityInfo
info.activityType ActivityWorker actEnv
w.definitions of
              Maybe (ActivityDefinition actEnv)
Nothing -> RuntimeError -> IO (Either String Payload)
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (RuntimeError -> IO (Either String Payload))
-> RuntimeError -> IO (Either String Payload)
forall a b. (a -> b) -> a -> b
$ String -> RuntimeError
RuntimeError (String
"Activity type not found: " String -> String -> String
forall a. Semigroup a => a -> a -> a
<> Text -> String
T.unpack ActivityInfo
info.activityType)
              Just ActivityDefinition {Text
ActivityEnv actEnv
-> ExecuteActivityInput -> IO (Either String Payload)
activityName :: Text
activityRun :: ActivityEnv actEnv
-> ExecuteActivityInput -> IO (Either String Payload)
activityRun :: forall env.
ActivityDefinition env
-> ActivityEnv env
-> ExecuteActivityInput
-> IO (Either String Payload)
activityName :: forall env. ActivityDefinition env -> Text
..} ->
                ActivityEnv actEnv
-> ExecuteActivityInput -> IO (Either String Payload)
activityRun (actEnv -> ActivityEnv actEnv
actEnv actEnv
env') ExecuteActivityInput
input'
                  IO (Either String Payload) -> IO () -> IO (Either String Payload)
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` (MVar () -> IO ()
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
takeMVar MVar ()
syncPoint IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TVar (HashMap TaskToken (Async ()))
-> (HashMap TaskToken (Async ()) -> HashMap TaskToken (Async ()))
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' ActivityWorker actEnv
w.runningActivities (TaskToken
-> HashMap TaskToken (Async ()) -> HashMap TaskToken (Async ())
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HashMap.delete TaskToken
tt)))
        completionMsg <- case ef >>= first (toException . ValueError) of
          Left err :: SomeException
err@(SomeException e
_wrappedErr) -> do
            $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> ActivityWorkerM actEnv m ()
(Text -> ActivityWorkerM actEnv m ())
-> (Text -> Text) -> Text -> ActivityWorkerM actEnv m ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> ActivityWorkerM actEnv m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
pack :: String -> Text
logDebug (String -> Text
T.pack (SomeException -> String
forall a. Show a => a -> String
show SomeException
err))
            let appFailure :: ApplicationFailure
appFailure = SomeException -> [ApplicationFailureHandler] -> ApplicationFailure
mkApplicationFailure SomeException
err ActivityWorker actEnv
w.activityErrorConverters
                enrichedApplicationFailure :: Failure
enrichedApplicationFailure = ApplicationFailure -> Failure
applicationFailureToFailureProto ApplicationFailure
appFailure
            ActivityTaskCompletion
-> ActivityWorkerM actEnv m ActivityTaskCompletion
forall a. a -> ActivityWorkerM actEnv m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ActivityTaskCompletion
 -> ActivityWorkerM actEnv m ActivityTaskCompletion)
-> ActivityTaskCompletion
-> ActivityWorkerM actEnv m ActivityTaskCompletion
forall a b. (a -> b) -> a -> b
$
              ActivityTaskCompletion
forall msg. Message msg => msg
defMessage
                ActivityTaskCompletion
-> (ActivityTaskCompletion -> ActivityTaskCompletion)
-> ActivityTaskCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityTaskCompletion ByteString
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityTaskCompletion ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "taskToken" a) =>
LensLike' f s a
C.taskToken (forall {f :: * -> *}.
 Identical f =>
 LensLike' f ActivityTaskCompletion ByteString)
-> ByteString -> ActivityTaskCompletion -> ActivityTaskCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ TaskToken -> ByteString
rawTaskToken TaskToken
tt
                ActivityTaskCompletion
-> (ActivityTaskCompletion -> ActivityTaskCompletion)
-> ActivityTaskCompletion
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityTaskCompletion ActivityExecutionResult
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityTaskCompletion ActivityExecutionResult
forall (f :: * -> *) s a.
(Functor f, HasField s "result" a) =>
LensLike' f s a
C.result (forall {f :: * -> *}.
 Identical f =>
 LensLike' f ActivityTaskCompletion ActivityExecutionResult)
-> ActivityExecutionResult
-> ActivityTaskCompletion
-> ActivityTaskCompletion
forall s t a b. Setter s t a b -> b -> s -> t
.~ case SomeException -> Maybe ActivityCancelReason
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
err of
                  Just (ActivityCancelReason
_cancelled :: ActivityCancelReason) ->
                    ActivityExecutionResult
forall msg. Message msg => msg
defMessage
                      ActivityExecutionResult
-> (ActivityExecutionResult -> ActivityExecutionResult)
-> ActivityExecutionResult
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityExecutionResult Cancellation
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityExecutionResult Cancellation
forall (f :: * -> *) s a.
(Functor f, HasField s "cancelled" a) =>
LensLike' f s a
AR.cancelled
                        (forall {f :: * -> *}.
 Identical f =>
 LensLike' f ActivityExecutionResult Cancellation)
-> Cancellation
-> ActivityExecutionResult
-> ActivityExecutionResult
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( Cancellation
forall msg. Message msg => msg
defMessage
                              Cancellation -> (Cancellation -> Cancellation) -> Cancellation
forall s t. s -> (s -> t) -> t
& LensLike' f Cancellation Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f Cancellation Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
AR.failure
                                (forall {f :: * -> *}.
 Identical f =>
 LensLike' f Cancellation Failure)
-> Failure -> Cancellation -> Cancellation
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( Failure
forall msg. Message msg => msg
defMessage
                                      Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Text
forall {f :: * -> *}. Identical f => LensLike' f Failure Text
forall (f :: * -> *) s a.
(Functor f, HasField s "message" a) =>
LensLike' f s a
F.message (forall {f :: * -> *}. Identical f => LensLike' f Failure Text)
-> Text -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
"Activity cancelled"
                                      Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure CanceledFailureInfo
forall {f :: * -> *}.
Identical f =>
LensLike' f Failure CanceledFailureInfo
forall (f :: * -> *) s a.
(Functor f, HasField s "canceledFailureInfo" a) =>
LensLike' f s a
F.canceledFailureInfo
                                        (forall {f :: * -> *}.
 Identical f =>
 LensLike' f Failure CanceledFailureInfo)
-> CanceledFailureInfo -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( CanceledFailureInfo
forall msg. Message msg => msg
defMessage
                                              -- FIXME: provide some details if we have them
                                              CanceledFailureInfo
-> (CanceledFailureInfo -> CanceledFailureInfo)
-> CanceledFailureInfo
forall s t. s -> (s -> t) -> t
& LensLike' f CanceledFailureInfo Payloads
forall {f :: * -> *}.
Identical f =>
LensLike' f CanceledFailureInfo Payloads
forall (f :: * -> *) s a.
(Functor f, HasField s "details" a) =>
LensLike' f s a
F.details (forall {f :: * -> *}.
 Identical f =>
 LensLike' f CanceledFailureInfo Payloads)
-> Payloads -> CanceledFailureInfo -> CanceledFailureInfo
forall s t a b. Setter s t a b -> b -> s -> t
.~ Payloads
forall msg. Message msg => msg
defMessage
                                           )
                                   )
                           )
                  Maybe ActivityCancelReason
Nothing ->
                    ActivityExecutionResult
forall msg. Message msg => msg
defMessage
                      ActivityExecutionResult
-> (ActivityExecutionResult -> ActivityExecutionResult)
-> ActivityExecutionResult
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityExecutionResult Failure
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityExecutionResult Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failed" a) =>
LensLike' f s a
AR.failed
                        (forall {f :: * -> *}.
 Identical f =>
 LensLike' f ActivityExecutionResult Failure)
-> Failure -> ActivityExecutionResult -> ActivityExecutionResult
forall s t a b. Setter s t a b -> b -> s -> t
.~ (Failure
forall msg. Message msg => msg
defMessage Failure -> (Failure -> Failure) -> Failure
forall s t. s -> (s -> t) -> t
& LensLike' f Failure Failure
forall {f :: * -> *}. Identical f => LensLike' f Failure Failure
forall (f :: * -> *) s a.
(Functor f, HasField s "failure" a) =>
LensLike' f s a
AR.failure (forall {f :: * -> *}. Identical f => LensLike' f Failure Failure)
-> Failure -> Failure -> Failure
forall s t a b. Setter s t a b -> b -> s -> t
.~ Failure
enrichedApplicationFailure)
          Right Payload
ok -> do
            $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> ActivityWorkerM actEnv m ()
(Text -> ActivityWorkerM actEnv m ())
-> (Text -> Text) -> Text -> ActivityWorkerM actEnv m ()
forall a. a -> a
forall msg.
ToLogStr msg =>
Loc -> Text -> LogLevel -> msg -> ActivityWorkerM actEnv m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
pack :: String -> Text
logDebug Text
"Got activity result"
            ok' <- IO Payload -> ActivityWorkerM actEnv m Payload
forall a. IO a -> ActivityWorkerM actEnv m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Payload -> ActivityWorkerM actEnv m Payload)
-> IO Payload -> ActivityWorkerM actEnv m Payload
forall a b. (a -> b) -> a -> b
$ PayloadProcessor -> Payload -> IO Payload
payloadProcessorEncode ActivityWorker actEnv
w.payloadProcessor Payload
ok
            pure $
              defMessage
                & C.taskToken .~ rawTaskToken tt
                & C.result
                  .~ ( defMessage
                        & AR.completed
                          .~ (defMessage & AR.result .~ convertToProtoPayload ok')
                     )
        $logDebug ("Activity completion message: " <> T.pack (show completionMsg))
        completionResult <- liftIO $ Core.completeActivityTask w.workerCore completionMsg
        case completionResult of
          Left WorkerError
err -> WorkerError -> ActivityWorkerM actEnv m ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO WorkerError
err
          Right ()
_ -> () -> ActivityWorkerM actEnv m ()
forall a. a -> ActivityWorkerM actEnv m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

      atomically $ modifyTVar' w.runningActivities (HashMap.insert tt runningActivity)
      -- We should only be throwing this exception if the activity has a logical error
      -- that is an internal error to the Temporal worker. If the activity throws an
      -- exception, that should be caught and fed to completeActivityTask.
      --
      -- We use link here to kill the worker thread if the activity throws an exception.
      link runningActivity
      putMVar syncPoint ()


-- | Handle a cancellation request from Core for the activity identified by
-- the given 'TaskToken'.
--
-- The protobuf cancel reason carried by the 'AT.Cancel' message is translated
-- into the worker's own 'ActivityCancelReason' and delivered to the running
-- activity's 'Async' via 'cancelWith'. If no activity is registered under the
-- token, nothing is cancelled and the running-activities map is left untouched.
applyActivityTaskCancel :: (MonadUnliftIO m, MonadLogger m) => TaskToken -> AT.Cancel -> ActivityWorkerM actEnv m ()
applyActivityTaskCancel tt msg = do
  w <- ask
  $logDebug $ "Cancelling activity: " <> T.pack (show tt)
  -- Snapshot of the currently tracked activities; the lookup below is made
  -- against this snapshot.
  running <- readTVarIO w.runningActivities
  let
    -- Translate the wire-level reason into the exception value handed to the
    -- activity thread. Unrecognized enum values are mapped to
    -- 'UnknownCancellationReason' rather than being rejected.
    cancelReason = case msg ^. AT.reason of
      AT.NOT_FOUND -> NotFound
      AT.CANCELLED -> CancelRequested
      AT.TIMED_OUT -> Timeout
      AT.WORKER_SHUTDOWN -> WorkerShutdown
      AT.ActivityCancelReason'Unrecognized _ -> UnknownCancellationReason
  forM_ (HashMap.lookup tt running) $ \a ->
    -- 'finally' ensures the token is dropped from the map even when
    -- 'cancelWith' itself throws.
    cancelWith a cancelReason
      `finally` atomically (modifyTVar' w.runningActivities (HashMap.delete tt))