{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TemplateHaskell #-}
module Temporal.Activity.Worker where
import Control.Exception.Annotated
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Bifunctor
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.ProtoLens
import Data.Text (Text)
import qualified Data.Text as T
import Lens.Family2
import qualified Proto.Temporal.Api.Common.V1.Message_Fields as P
import qualified Proto.Temporal.Api.Failure.V1.Message_Fields as F
import qualified Proto.Temporal.Sdk.Core.ActivityResult.ActivityResult_Fields as AR
import qualified Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask as AT
import qualified Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask_Fields as AT
import qualified Proto.Temporal.Sdk.Core.CoreInterface_Fields as C
import Temporal.Activity.Definition
import Temporal.Activity.Types
import Temporal.Common
import Temporal.Common.Async
import qualified Temporal.Core.Worker as Core
import Temporal.Duration (durationFromProto)
import Temporal.Exception
import Temporal.Interceptor
import Temporal.Payload
import UnliftIO
import UnliftIO.Concurrent (threadDelay)
-- | Shared state for the activity half of a worker. One value of this type is
-- carried by 'ActivityWorkerM' and consulted by every function in this module.
data ActivityWorker env = ActivityWorker
  { activityEnv :: {-# UNPACK #-} !(IORef env)
  -- ^ User-supplied environment. Read once for each started activity and
  -- passed to the inbound interceptor chain.
  , definitions :: !(HashMap Text (ActivityDefinition env))
  -- ^ Registered activities, looked up by the activity type named in a start
  -- task. (No @UNPACK@: 'HashMap' has several constructors, so GHC cannot
  -- unpack it and would only warn about the pragma.)
  , runningActivities :: {-# UNPACK #-} !(TVar (HashMap TaskToken (Async ())))
  -- ^ In-flight activity threads keyed by task token; used to reject duplicate
  -- starts, to deliver cancellations, and to cancel everything on shutdown.
  , workerCore :: {-# UNPACK #-} !(Core.Worker 'Core.Real)
  -- ^ Core worker handle used to poll for activity tasks and to report
  -- completions.
  , activityInboundInterceptors :: {-# UNPACK #-} !(ActivityInboundInterceptor env)
  -- ^ Interceptor whose @executeActivity@ wraps every activity invocation.
  , activityOutboundInterceptors :: {-# UNPACK #-} !(ActivityOutboundInterceptor env)
  -- ^ Outbound interceptor; not consulted by the functions visible in this
  -- module.
  , clientInterceptors :: {-# UNPACK #-} !ClientInterceptors
  -- ^ Forwarded into the 'ActivityEnv' built for each activity.
  , activityErrorConverters :: ![ApplicationFailureHandler]
  -- ^ Handlers given to 'mkApplicationFailure' when an activity ends with an
  -- exception. (No @UNPACK@: lists have two constructors.)
  , payloadProcessor :: {-# UNPACK #-} !PayloadProcessor
  -- ^ Decodes incoming inputs, headers and heartbeat details, and encodes the
  -- activity result before it is sent back.
  }
-- | Give running activities the configured grace period, then cancel whatever
-- is still registered.
--
-- The grace period is read from the core worker configuration
-- (@gracefulShutdownPeriodMillis@). When it is positive the calling thread
-- sleeps for that long; afterwards every thread currently present in
-- 'runningActivities' is cancelled with 'WorkerShutdown'.
notifyShutdown :: MonadIO m => ActivityWorker env -> m ()
notifyShutdown worker = do
  let shutdownGracePeriod :: Int
      shutdownGracePeriod =
        fromIntegral (Core.gracefulShutdownPeriodMillis $ Core.getWorkerConfig worker.workerCore)
  when (shutdownGracePeriod > 0) $ do
    -- The configured value is in milliseconds; 'threadDelay' counts microseconds.
    threadDelay (shutdownGracePeriod * 1000)
  running <- readTVarIO worker.runningActivities
  forM_ running $ \thread -> cancelWith thread WorkerShutdown
-- | Monad in which activity tasks are processed: a reader over the
-- 'ActivityWorker' state layered on a base monad @m@.
--
-- Every instance is obtained from the underlying 'ReaderT' through
-- @deriving newtype@, so IO lifting, unlifting and logging are delegated to @m@.
newtype ActivityWorkerM env m a = ActivityWorkerM {unActivityWorkerM :: ReaderT (ActivityWorker env) m a}
  deriving newtype
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadReader (ActivityWorker env)
    , MonadUnliftIO
    , MonadLogger
    , MonadLoggerIO
    )
-- | Run an 'ActivityWorkerM' action against the given worker state by
-- unwrapping the newtype and supplying the state to the underlying reader.
runActivityWorker :: (MonadUnliftIO m, MonadLogger m) => ActivityWorker env -> ActivityWorkerM env m a -> m a
runActivityWorker w m = runReaderT (unActivityWorkerM m) w
-- | Activity poll loop.
--
-- Repeatedly asks the core worker for the next activity task and dispatches it
-- through 'applyActivityTask'. The loop returns normally when polling reports
-- 'Core.PollShutdown'; any other polling error is logged at error level and
-- rethrown to the caller.
execute :: (MonadUnliftIO m, MonadLogger m) => ActivityWorker actEnv -> m ()
execute worker = runActivityWorker worker go
  where
    -- Intentionally left without a signature: it must reuse the @m@ and
    -- @actEnv@ of 'execute', which an unscoped local signature could not name.
    go = do
      eTask <- liftIO $ Core.pollActivityTask worker.workerCore
      case eTask of
        Left (Core.WorkerError Core.PollShutdown _) -> do
          pure ()
        Left e -> do
          $logError (T.pack (show e))
          throwIO e
        Right task -> do
          applyActivityTask task
          go
-- | Build the 'ActivityInfo' for a start task.
--
-- Header fields and heartbeat details are converted from their proto form and
-- decoded with the worker's payload processor; the remaining fields are copied
-- from the proto message, wrapping identifiers in their newtypes and
-- converting timestamps, durations and the retry policy. The supplied task
-- token is stored unchanged.
activityInfoFromProto :: MonadIO m => TaskToken -> AT.Start -> ActivityWorkerM actEnv m ActivityInfo
activityInfoFromProto tt msg = do
  w <- ask
  hdrs <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.headerFields))
  heartbeats <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.vec'heartbeatDetails))
  pure $
    ActivityInfo
      { workflowNamespace = Namespace $ msg ^. AT.workflowNamespace
      , workflowType = WorkflowType $ msg ^. AT.workflowType
      , workflowId = WorkflowId $ msg ^. AT.workflowExecution . P.workflowId
      , runId = RunId $ msg ^. AT.workflowExecution . P.runId
      , activityId = ActivityId $ msg ^. AT.activityId
      , activityType = msg ^. AT.activityType
      , headerFields = hdrs
      , heartbeatDetails = heartbeats
      , scheduledTime = msg ^. AT.scheduledTime . to timespecFromTimestamp
      , currentAttemptScheduledTime = msg ^. AT.currentAttemptScheduledTime . to timespecFromTimestamp
      , startedTime = msg ^. AT.startedTime . to timespecFromTimestamp
      , attempt = msg ^. AT.attempt
      , scheduleToCloseTimeout = fmap durationFromProto (msg ^. AT.maybe'scheduleToCloseTimeout)
      , startToCloseTimeout = fmap durationFromProto (msg ^. AT.maybe'startToCloseTimeout)
      , heartbeatTimeout = fmap durationFromProto (msg ^. AT.maybe'heartbeatTimeout)
      , retryPolicy = fmap retryPolicyFromProto (msg ^. AT.maybe'retryPolicy)
      , isLocal = msg ^. AT.isLocal
      , taskToken = tt
      }
-- | Abort the current computation by throwing the 'CompleteAsync' exception.
--
-- NOTE(review): presumably meant to be called from inside an activity body to
-- signal that the result will be delivered out of band; how the exception is
-- reported depends on the code that catches it.
completeAsync :: MonadIO m => m ()
completeAsync = throwIO CompleteAsync
-- | Dispatch one activity task according to its variant.
--
-- A @Start@ variant goes to 'applyActivityTaskStart', a @Cancel@ variant to
-- 'applyActivityTaskCancel'. A task without a variant raises a 'RuntimeError'.
applyActivityTask :: (MonadUnliftIO m, MonadLogger m) => AT.ActivityTask -> ActivityWorkerM actEnv m ()
applyActivityTask task = case task ^. AT.maybe'variant of
  Nothing -> throwIO $ RuntimeError "Activity task has no variant or an unknown variant"
  Just (AT.ActivityTask'Start msg) -> applyActivityTaskStart task tt msg
  Just (AT.ActivityTask'Cancel msg) -> applyActivityTaskCancel tt msg
  where
    -- The task token lives on the envelope rather than on the variant payload.
    tt :: TaskToken
    tt = TaskToken $ task ^. AT.taskToken
-- | Run the given action only when no activity is registered under the task
-- token; otherwise raise a 'RuntimeError'.
--
-- The check is a plain snapshot read of 'runningActivities'; it is not held
-- atomically across the execution of the action.
requireActivityNotRunning :: MonadUnliftIO m => TaskToken -> ActivityWorkerM actEnv m () -> ActivityWorkerM actEnv m ()
requireActivityNotRunning tt m = do
  w <- ask
  running <- readTVarIO w.runningActivities
  case HashMap.lookup tt running of
    Just _ -> throwIO $ RuntimeError "Activity task already running"
    Nothing -> m
-- | Handle a @Start@ task: run the requested activity on its own thread and
-- report the outcome to the core worker.
--
-- Steps, in order:
--
-- 1. Refuse the task (via 'requireActivityNotRunning') if its token is already
--    registered.
-- 2. Build the 'ActivityInfo', decode the input payloads and read the user
--    environment.
-- 3. With asynchronous exceptions masked, spawn the activity thread, register
--    it in 'runningActivities', link it to the calling thread, and only then
--    release it through @syncPoint@.
--
-- Inside the spawned thread the activity is run through the inbound
-- interceptor. Its outcome is turned into a completion message:
--
-- * an 'ActivityCancelReason' exception becomes a @cancelled@ result;
-- * a 'CompleteAsync' exception (see 'completeAsync') becomes a
--   @will_complete_async@ result instead of being reported as a failure;
-- * any other exception, or a @Left@ returned by the activity (wrapped in
--   'ValueError'), becomes a @failed@ result built with 'mkApplicationFailure';
-- * a @Right@ payload is encoded with the payload processor and becomes a
--   @completed@ result.
--
-- A failure to deliver the completion message is rethrown in the activity
-- thread.
applyActivityTaskStart :: (MonadUnliftIO m, MonadLogger m) => AT.ActivityTask -> TaskToken -> AT.Start -> ActivityWorkerM actEnv m ()
applyActivityTaskStart tsk tt msg = do
  w <- ask
  $logDebug $ "Starting activity: " <> T.pack (show tsk)
  requireActivityNotRunning tt $ do
    info <- activityInfoFromProto tt msg
    args <- processorDecodePayloads w.payloadProcessor (fmap convertFromProtoPayload (msg ^. AT.vec'input))
    env <- readIORef w.activityEnv
    let
      -- Partially applied: the interceptor chain supplies the final user
      -- environment argument.
      actEnv = ActivityEnv w.workerCore info w.clientInterceptors w.payloadProcessor
      input =
        ExecuteActivityInput
          args
          info.headerFields
          info
    -- Masked so that spawning the thread and registering it cannot be torn
    -- apart by an asynchronous exception.
    mask_ $ do
      syncPoint <- newEmptyMVar
      let c = Core.getWorkerConfig w.workerCore
      runningActivity <- asyncLabelled (T.unpack $ T.concat ["temporal/worker/activity/start/", Core.namespace c, "/", Core.taskQueue c]) $ do
        (ef :: Either SomeException (Either String Payload)) <-
          liftIO $
            UnliftIO.trySyncOrAsync $
              ( w.activityInboundInterceptors.executeActivity env input $ \env' input' ->
                  case HashMap.lookup info.activityType w.definitions of
                    Nothing -> throwIO $ RuntimeError ("Activity type not found: " <> T.unpack info.activityType)
                    Just ActivityDefinition {..} ->
                      activityRun (actEnv env') input'
              )
                -- Attached to the whole interceptor call so the cleanup runs
                -- even when an interceptor never invokes the continuation.
                -- Taking @syncPoint@ first guarantees the entry has been
                -- inserted by the spawning thread before it is deleted here.
                `finally` (takeMVar syncPoint *> atomically (modifyTVar' w.runningActivities (HashMap.delete tt)))
        completionMsg <- case ef >>= first (toException . ValueError) of
          Left err@(SomeException _wrappedErr) -> do
            $logDebug (T.pack (show err))
            let appFailure = mkApplicationFailure err w.activityErrorConverters
                enrichedApplicationFailure = applicationFailureToFailureProto appFailure
            pure $
              defMessage
                & C.taskToken .~ rawTaskToken tt
                & C.result
                  .~ case fromException err of
                    Just (_cancelled :: ActivityCancelReason) ->
                      defMessage
                        & AR.cancelled
                          .~ ( defMessage
                                & AR.failure
                                  .~ ( defMessage
                                        & F.message .~ "Activity cancelled"
                                        & F.canceledFailureInfo
                                          .~ ( defMessage
                                                & F.details .~ defMessage
                                             )
                                     )
                             )
                    Nothing -> case fromException err of
                      -- The activity asked to be completed out of band; tell
                      -- core so instead of recording a failure.
                      Just (_async :: CompleteAsync) ->
                        defMessage
                          & AR.willCompleteAsync .~ defMessage
                      Nothing ->
                        defMessage
                          & AR.failed
                            .~ (defMessage & AR.failure .~ enrichedApplicationFailure)
          Right ok -> do
            $logDebug "Got activity result"
            ok' <- liftIO $ payloadProcessorEncode w.payloadProcessor ok
            pure $
              defMessage
                & C.taskToken .~ rawTaskToken tt
                & C.result
                  .~ ( defMessage
                        & AR.completed
                          .~ (defMessage & AR.result .~ convertToProtoPayload ok')
                     )
        $logDebug ("Activity completion message: " <> T.pack (show completionMsg))
        completionResult <- liftIO $ Core.completeActivityTask w.workerCore completionMsg
        case completionResult of
          Left err -> throwIO err
          Right _ -> pure ()
      atomically $ modifyTVar' w.runningActivities (HashMap.insert tt runningActivity)
      -- Linked so an exception escaping the activity thread (for example a
      -- failed completion call) is propagated to the calling thread.
      link runningActivity
      putMVar syncPoint ()
-- | Handle a @Cancel@ task for the activity registered under the task token.
--
-- The proto cancellation reason is translated to the SDK's
-- 'ActivityCancelReason' (unrecognised values become
-- 'UnknownCancellationReason'). If a thread is registered for the token it is
-- cancelled with that reason, and its entry is removed from
-- 'runningActivities' whether or not the cancellation call itself succeeds.
-- When no thread is registered nothing happens beyond the debug log line.
applyActivityTaskCancel :: (MonadUnliftIO m, MonadLogger m) => TaskToken -> AT.Cancel -> ActivityWorkerM actEnv m ()
applyActivityTaskCancel tt msg = do
  w <- ask
  $logDebug $ "Cancelling activity: " <> T.pack (show tt)
  running <- readTVarIO w.runningActivities
  let cancelReason = case msg ^. AT.reason of
        AT.NOT_FOUND -> NotFound
        AT.CANCELLED -> CancelRequested
        AT.TIMED_OUT -> Timeout
        AT.WORKER_SHUTDOWN -> WorkerShutdown
        AT.ActivityCancelReason'Unrecognized _ -> UnknownCancellationReason
  forM_ (HashMap.lookup tt running) $ \a ->
    cancelWith a cancelReason
      `finally` atomically (modifyTVar' w.runningActivities (HashMap.delete tt))