{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Temporal.Activity (
Activity,
heartbeat,
ActivityInfo (..),
askActivityInfo,
ActivityCancelReason (..),
activityWorkflowClient,
askActivityClient,
provideActivity,
ProvidedActivity (..),
KnownActivity (..),
ActivityDefinition (..),
TaskToken (..),
CompleteAsync (..),
completeAsync,
(:->:),
) where
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader.Class
import Data.ProtoLens
import Data.Proxy
import Data.Text (Text)
import Lens.Family2
import qualified Proto.Temporal.Sdk.Core.CoreInterface_Fields as Proto
import Temporal.Activity.Definition
import Temporal.Activity.Types
import Temporal.Activity.Worker
import Temporal.Client
import Temporal.Common
import Temporal.Core.Worker (getWorkerClient, recordActivityHeartbeat)
import Temporal.Exception
import Temporal.Payload
import Temporal.Workflow.Types
provideActivity
:: forall codec env f
. ( f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f))
, FunctionSupportsCodec codec (ArgsOf f) (ResultOf (Activity env) f)
)
=> codec
-> Text
-> f
-> ProvidedActivity env f
provideActivity :: forall codec env f.
(f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f)),
FunctionSupportsCodec
codec (ArgsOf f) (ResultOf (Activity env) f)) =>
codec -> Text -> f -> ProvidedActivity env f
provideActivity codec
codec Text
name f
f =
ProvidedActivity
{ definition :: ActivityDefinition env
definition =
ActivityDefinition
{ activityName :: Text
activityName = Text
name
, activityRun :: ActivityEnv env
-> ExecuteActivityInput -> IO (Either String Payload)
activityRun = \ActivityEnv env
actEnv ExecuteActivityInput
input -> do
eAct <-
codec
-> Proxy (ArgsOf f)
-> Proxy (Activity env (ResultOf (Activity env) f))
-> (ArgsOf f :->: Activity env (ResultOf (Activity env) f))
-> Vector Payload
-> IO (Either String (Activity env (ResultOf (Activity env) f)))
forall result.
codec
-> Proxy (ArgsOf f)
-> Proxy result
-> (ArgsOf f :->: result)
-> Vector Payload
-> IO (Either String result)
forall codec (args :: [*]) result.
ApplyPayloads codec args =>
codec
-> Proxy args
-> Proxy result
-> (args :->: result)
-> Vector Payload
-> IO (Either String result)
applyPayloads
codec
codec
(forall (t :: [*]). Proxy t
forall {k} (t :: k). Proxy t
Proxy @(ArgsOf f))
(forall t. Proxy t
forall {k} (t :: k). Proxy t
Proxy @(Activity env (ResultOf (Activity env) f)))
f
ArgsOf f :->: Activity env (ResultOf (Activity env) f)
f
ExecuteActivityInput
input.activityArgs
traverse (runActivity actEnv >=> encode codec) eAct
}
, reference :: KnownActivity (ArgsOf f) (ResultOf (Activity env) f)
reference =
KnownActivity
{ knownActivityCodec :: codec
knownActivityCodec = codec
codec
, knownActivityName :: Text
knownActivityName = Text
name
}
}
heartbeat :: [Payload] -> Activity env ()
heartbeat :: forall env. [Payload] -> Activity env ()
heartbeat [Payload]
baseDetails = do
(TaskToken token) <- ActivityInfo -> TaskToken
taskToken (ActivityInfo -> TaskToken)
-> Activity env ActivityInfo -> Activity env TaskToken
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Activity env ActivityInfo
forall env. Activity env ActivityInfo
askActivityInfo
worker <- askActivityWorker
let details =
ActivityHeartbeat
forall msg. Message msg => msg
defMessage
ActivityHeartbeat
-> (ActivityHeartbeat -> ActivityHeartbeat) -> ActivityHeartbeat
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityHeartbeat ByteString
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityHeartbeat ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "taskToken" a) =>
LensLike' f s a
Proto.taskToken (forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityHeartbeat ByteString)
-> ByteString -> ActivityHeartbeat -> ActivityHeartbeat
forall s t a b. Setter s t a b -> b -> s -> t
.~ ByteString
token
ActivityHeartbeat
-> (ActivityHeartbeat -> ActivityHeartbeat) -> ActivityHeartbeat
forall s t. s -> (s -> t) -> t
& LensLike' f ActivityHeartbeat [Payload]
forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityHeartbeat [Payload]
forall (f :: * -> *) s a.
(Functor f, HasField s "details" a) =>
LensLike' f s a
Proto.details (forall {f :: * -> *}.
Identical f =>
LensLike' f ActivityHeartbeat [Payload])
-> [Payload] -> ActivityHeartbeat -> ActivityHeartbeat
forall s t a b. Setter s t a b -> b -> s -> t
.~ (Payload -> Payload) -> [Payload] -> [Payload]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Payload -> Payload
convertToProtoPayload [Payload]
baseDetails
void $ liftIO $ recordActivityHeartbeat worker details
activityWorkflowClient :: Activity env WorkflowClient
activityWorkflowClient :: forall env. Activity env WorkflowClient
activityWorkflowClient = ReaderT (ActivityEnv env) IO WorkflowClient
-> Activity env WorkflowClient
forall env a. ReaderT (ActivityEnv env) IO a -> Activity env a
Activity (ReaderT (ActivityEnv env) IO WorkflowClient
-> Activity env WorkflowClient)
-> ReaderT (ActivityEnv env) IO WorkflowClient
-> Activity env WorkflowClient
forall a b. (a -> b) -> a -> b
$ do
e <- ReaderT (ActivityEnv env) IO (ActivityEnv env)
forall r (m :: * -> *). MonadReader r m => m r
ask
workflowClient (getWorkerClient e.activityWorker) $
WorkflowClientConfig
{ namespace = e.activityInfo.workflowNamespace
, interceptors = e.activityClientInterceptors
, payloadProcessor = e.activityPayloadProcessor
}
instance HasWorkflowClient (Activity env) where
askWorkflowClient :: Activity env WorkflowClient
askWorkflowClient = Activity env WorkflowClient
forall env. Activity env WorkflowClient
activityWorkflowClient