{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

{- |
Module: Temporal.Activity
Description: Define and work with Temporal activities.

Temporal activities are units of work in a Temporal workflow. They encapsulate
a specific task or operation that needs to be executed within a workflow
instance. Activities can be synchronous or asynchronous and are designed to
perform external actions, such as interacting with databases, sending emails,
or invoking external services.

When writing activities to execute within a workflow, we recommend writing
them while keeping the following principles in mind:

1. __Retryable__: Activities can be automatically retried by the Temporal
   system in case of failures. You can configure the retry behavior, such as
   the maximum number of retries and retry intervals.

2. __Idempotency__: Since Activities are designed to be retryable,
   executing an activity multiple times should not have unintended side
   effects. This is crucial for ensuring the reliability of workflow
   executions.

3. __Cancellable__: Activities can be cancelled, which means that they can
   be stopped before they complete execution. This is useful for handling
   long-running activities that need to be aborted in certain situations.
   As a consequence, activities should be written to receive 'ActivityCancelReason'
   exceptions asynchronously.
-}
module Temporal.Activity (
  Activity,

  -- * Built-in Activity primitives
  heartbeat,
  ActivityInfo (..),
  askActivityInfo,
  ActivityCancelReason (..),
  activityWorkflowClient,
  askActivityClient,

  -- * Defining Activities
  provideActivity,
  ProvidedActivity (..),
  KnownActivity (..),
  ActivityDefinition (..),

  -- * Asynchronous Completion
  TaskToken (..),
  CompleteAsync (..),
  completeAsync,

  -- * Commonly used
  (:->:),
) where

import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader.Class
import Data.ProtoLens
import Data.Proxy
import Data.Text (Text)
import Lens.Family2
import qualified Proto.Temporal.Sdk.Core.CoreInterface_Fields as Proto
import Temporal.Activity.Definition
import Temporal.Activity.Types
import Temporal.Activity.Worker
import Temporal.Client
import Temporal.Common
import Temporal.Core.Worker (getWorkerClient, recordActivityHeartbeat)
import Temporal.Exception
import Temporal.Payload
import Temporal.Workflow.Types


{- | A utility function for constructing an 'ActivityDefinition' from a function as well as
a 'KnownActivity' value. This is useful for keeping the argument, codec, and result types
in sync with each other so that changes to the function are reflected at their use sites.

The returned 'ProvidedActivity' carries both pieces, built from the same codec and name:

* @definition@: an 'ActivityDefinition' whose runner applies the incoming argument
  payloads to the function, runs the resulting 'Activity', and encodes its result
  with the codec. If applying the payloads fails, the error is returned as-is and the
  activity is not run.

* @reference@: a 'KnownActivity' holding the codec and the name.

> myActivity :: Activity env ()
> myActivity = liftIO $ putStrLn "Hello world!"
>
> myActivityDef :: ProvidedActivity env (Activity env ())
> myActivityDef = provideActivity
>   JSON -- codec for serializing arguments and results
>   "myActivity" -- visible name of the activity
>   myActivity -- the activity function
-}
provideActivity
  :: forall codec env f
   . ( f ~ (ArgsOf f :->: Activity env (ResultOf (Activity env) f))
     , FunctionSupportsCodec codec (ArgsOf f) (ResultOf (Activity env) f)
     )
  => codec
  -- ^ codec used to decode the arguments and encode the result
  -> Text
  -- ^ name recorded in both the definition and the reference
  -> f
  -- ^ the activity function itself
  -> ProvidedActivity env f
provideActivity codec name f =
  ProvidedActivity
    { definition =
        ActivityDefinition
          { activityName = name
          , activityRun = \actEnv input -> do
              -- Feed the raw argument payloads to @f@. The proxies pin the
              -- argument list and result type, which cannot be inferred from
              -- the type families alone.
              eAct <-
                applyPayloads
                  codec
                  (Proxy @(ArgsOf f))
                  (Proxy @(Activity env (ResultOf (Activity env) f)))
                  f
                  input.activityArgs
              -- On 'Left' the error passes through untouched; on 'Right' the
              -- activity is run and its result encoded.
              traverse (runActivity actEnv >=> encode codec) eAct
          }
    , reference =
        KnownActivity
          { knownActivityCodec = codec
          , knownActivityName = name
          }
    }


{- |
An Activity Heartbeat is a ping from the Worker Process that is executing the
Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster
that the Activity Execution is making progress and the Worker has not crashed.
If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time
period, the Activity will be considered failed and another Activity Task
Execution may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster—they may be throttled by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they
Heartbeat. Activities that don't Heartbeat can't receive a Cancellation.
Heartbeat throttling may lead to Cancellation getting delivered later than
expected.

Heartbeats can contain payloads describing the Activity's current progress. If
an Activity gets retried, the Activity can access the details from the last
Heartbeat that was sent to the Cluster.

The result of recording the heartbeat with the worker is currently discarded.
-}
heartbeat
  :: [Payload]
  -- ^ progress details to attach to the heartbeat; may be empty
  -> Activity env ()
heartbeat baseDetails = do
  -- The heartbeat message carries the task token of the running activity.
  TaskToken token <- taskToken <$> askActivityInfo
  worker <- askActivityWorker
  let details =
        defMessage
          & Proto.taskToken .~ token
          -- Each detail payload is converted with 'convertToProtoPayload'
          -- before being placed in the message.
          & Proto.details .~ fmap convertToProtoPayload baseDetails
  -- TODO throw exception if this fails?
  void $ liftIO $ recordActivityHeartbeat worker details


{- | It is very common for an Activity to do things that interact
with another Workflow, like using 'query' or 'signal'. This function
provides a 'WorkflowClient' that can be used to accomplish that.

The client is built on each call from the current activity environment: it
uses the worker's underlying client, the namespace recorded in the
activity's info, and the activity's client interceptors and payload processor.
-}
activityWorkflowClient :: Activity env WorkflowClient
activityWorkflowClient = Activity $ do
  e <- ask
  -- Reuse the client held by the worker running this activity.
  workflowClient (getWorkerClient e.activityWorker) $
    WorkflowClientConfig
      { namespace = e.activityInfo.workflowNamespace
      , interceptors = e.activityClientInterceptors
      , payloadProcessor = e.activityPayloadProcessor
      }


-- | Inside an 'Activity', the workflow client is the one produced by
-- 'activityWorkflowClient'.
instance HasWorkflowClient (Activity env) where
  askWorkflowClient = activityWorkflowClient