{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

{- |
Module: Temporal.Contrib.OpenTelemetry
Description: OpenTelemetry instrumentation for Temporal workflows and activities.

OpenTelemetry instrumentation for Temporal workflows and activities.

This module provides an interceptor for Temporal workflows and activities that
automatically instruments them with OpenTelemetry tracing. It also provides
functions for extracting and injecting OpenTelemetry context from and to
Temporal headers.

To use, initialize the OpenTelemetry 'globalTracerProvider'. Then, add
the interceptor to your Temporal client and worker configuration.
-}
module Temporal.Contrib.OpenTelemetry where

import Control.Monad.IO.Class
import qualified Data.HashMap.Strict as HashMap
import Data.Int
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Version (showVersion)
import Data.Word (Word32)
import OpenTelemetry.Attributes (emptyAttributes)
import qualified OpenTelemetry.Context as Ctxt
import OpenTelemetry.Context.ThreadLocal (attachContext, getContext)
import OpenTelemetry.Propagator
import OpenTelemetry.Propagator.W3CTraceContext
import OpenTelemetry.Trace.Core
import Paths_temporal_sdk
import Temporal.Activity.Types
-- TODO rework WorkflowExitVariant to not expose internals

import Temporal.Client.Types (
  StartWorkflowOptions (..),
 )
import Temporal.Common
import Temporal.Duration
import Temporal.Interceptor
import Temporal.Payload (Payload (..))
import Temporal.Workflow ()
import Temporal.Workflow.Types
import Prelude hiding (span)


-- | "_tracer-data"
defaultHeaderKey :: T.Text
defaultHeaderKey :: Text
defaultHeaderKey = Text
"_tracer-data"


data OpenTelemetryInterceptorOptions = OpenTelemetryInterceptorOptions
  { OpenTelemetryInterceptorOptions -> Maybe TracerProvider
tracerProvider :: Maybe TracerProvider
  , OpenTelemetryInterceptorOptions -> Text
headerKey :: T.Text
  }


defaultOpenTelemetryInterceptorOptions :: OpenTelemetryInterceptorOptions
defaultOpenTelemetryInterceptorOptions :: OpenTelemetryInterceptorOptions
defaultOpenTelemetryInterceptorOptions =
  OpenTelemetryInterceptorOptions
    { tracerProvider :: Maybe TracerProvider
tracerProvider = Maybe TracerProvider
forall a. Maybe a
Nothing
    , headerKey :: Text
headerKey = Text
defaultHeaderKey
    }


headersPropagator :: Propagator Ctxt.Context (Map.Map T.Text Payload) (Map.Map T.Text Payload)
headersPropagator :: Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator =
  Propagator
    { propagatorNames :: [Text]
propagatorNames = [Text
"tracecontext"]
    , extractor :: Map Text Payload -> Context -> IO Context
extractor = \Map Text Payload
hs Context
c -> do
        let traceParentHeader :: Maybe ByteString
traceParentHeader = Payload -> ByteString
payloadData (Payload -> ByteString) -> Maybe Payload -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Map Text Payload -> Maybe Payload
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Text
"traceparent" Map Text Payload
hs
            traceStateHeader :: Maybe ByteString
traceStateHeader = Payload -> ByteString
payloadData (Payload -> ByteString) -> Maybe Payload -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Map Text Payload -> Maybe Payload
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Text
"tracestate" Map Text Payload
hs
            mspanContext :: Maybe SpanContext
mspanContext = Maybe ByteString -> Maybe ByteString -> Maybe SpanContext
decodeSpanContext Maybe ByteString
traceParentHeader Maybe ByteString
traceStateHeader
        Context -> IO Context
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Context -> IO Context) -> Context -> IO Context
forall a b. (a -> b) -> a -> b
$! case Maybe SpanContext
mspanContext of
          Maybe SpanContext
Nothing -> Context
c
          Just SpanContext
s -> Span -> Context -> Context
Ctxt.insertSpan (SpanContext -> Span
wrapSpanContext (SpanContext
s {isRemote = True})) Context
c
    , injector :: Context -> Map Text Payload -> IO (Map Text Payload)
injector = \Context
c Map Text Payload
hs -> case Context -> Maybe Span
Ctxt.lookupSpan Context
c of
        Maybe Span
Nothing -> Map Text Payload -> IO (Map Text Payload)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map Text Payload
hs
        Just Span
s -> do
          (traceParentHeader, traceStateHeader) <- Span -> IO (ByteString, ByteString)
encodeSpanContext Span
s
          pure $
            Map.insert "traceparent" (Payload traceParentHeader mempty) $
              Map.insert "tracestate" (Payload traceStateHeader mempty) hs
    }


--    * Workflow is scheduled by a client
--    */
--   WORKFLOW_START = 'StartWorkflow',

--   /**
--    * Workflow is client calls signalWithStart
--    */
--   WORKFLOW_SIGNAL_WITH_START = 'SignalWithStartWorkflow',

--   /**
--    * Workflow run is executing
--    */
--   WORKFLOW_EXECUTE = 'RunWorkflow',
--   /**
--    * Child Workflow is started (by parent Workflow)
--    */
--   CHILD_WORKFLOW_START = 'StartChildWorkflow',
--   /**
--    * Activity is scheduled by a Workflow
--    */
--   ACTIVITY_START = 'StartActivity',
--   /**
--    * Activity is executing
--    */
--   ACTIVITY_EXECUTE = 'RunActivity',
--   /**
--    * Workflow is continuing as new
--    */
-- CONTINUE_AS_NEW = 'ContinueAsNew',

-- TODO, we will need to account for replays when we support them
makeOpenTelemetryInterceptor :: MonadIO m => m (Interceptors env)
makeOpenTelemetryInterceptor :: forall (m :: * -> *) env. MonadIO m => m (Interceptors env)
makeOpenTelemetryInterceptor = do
  tracerProvider <- m TracerProvider
forall (m :: * -> *). MonadIO m => m TracerProvider
getGlobalTracerProvider
  let tracer =
        TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer
          TracerProvider
tracerProvider
          (Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"temporal-sdk" (String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Version -> String
showVersion Version
Paths_temporal_sdk.version))
          (Maybe Text -> TracerOptions
TracerOptions Maybe Text
forall a. Maybe a
Nothing)
  return $
    Interceptors
      { workflowInboundInterceptors =
          WorkflowInboundInterceptor
            { executeWorkflow = \ExecuteWorkflowInput
input ExecuteWorkflowInput -> IO (WorkflowExitVariant Payload)
next -> do
                ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator ExecuteWorkflowInput
input.executeWorkflowInputHeaders Context
Ctxt.empty
                _ <- attachContext ctxt
                let spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Server
                        , attributes =
                            HashMap.fromList $
                              concat
                                [
                                  [ ("temporal.workflow_id", toAttribute $ rawWorkflowId input.executeWorkflowInputInfo.workflowId)
                                  , ("temporal.run_id", toAttribute $ rawRunId input.executeWorkflowInputInfo.runId)
                                  , ("temporal.workflow_type", toAttribute input.executeWorkflowInputType)
                                  , ("temporal.attempt", toAttribute input.executeWorkflowInputInfo.attempt)
                                  , ("temporal.namespace", toAttribute $ rawNamespace input.executeWorkflowInputInfo.namespace)
                                  , ("temporal.task_queue", toAttribute $ rawTaskQueue input.executeWorkflowInputInfo.taskQueue)
                                  ]
                                , maybe
                                    []
                                    (\Duration
executionTimeout -> [(Text
"temporal.execution_timeout_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds Duration
executionTimeout)])
                                    input.executeWorkflowInputInfo.executionTimeout
                                , maybe
                                    []
                                    ( \RunId
continuedRunId ->
                                        [ (Text
"temporal.continued_run_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ RunId -> Text
rawRunId RunId
continuedRunId)
                                        ]
                                    )
                                    input.executeWorkflowInputInfo.continuedRunId
                                , maybe
                                    []
                                    ( \Text
cronSchedule ->
                                        [(Text
"temporal.cron_schedule", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute Text
cronSchedule)]
                                    )
                                    input.executeWorkflowInputInfo.cronSchedule
                                , maybe
                                    []
                                    ( \ParentInfo
parentInfo ->
                                        [ (Text
"temporal.parent.namespace", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ Namespace -> Text
rawNamespace ParentInfo
parentInfo.parentNamespace)
                                        , (Text
"temporal.parent.run_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ RunId -> Text
rawRunId ParentInfo
parentInfo.parentRunId)
                                        , (Text
"temporal.parent.workflow_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ WorkflowId -> Text
rawWorkflowId ParentInfo
parentInfo.parentWorkflowId)
                                        ]
                                    )
                                    input.executeWorkflowInputInfo.parent
                                , maybe
                                    []
                                    ( \RetryPolicy
retryPolicy ->
                                        ([(Text, Attribute)] -> [(Text, Attribute)])
-> (Duration -> [(Text, Attribute)] -> [(Text, Attribute)])
-> Maybe Duration
-> [(Text, Attribute)]
-> [(Text, Attribute)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
                                          [(Text, Attribute)] -> [(Text, Attribute)]
forall a. a -> a
id
                                          (\Duration
maxInterval -> ((Text
"temporal.retry_policy.maximum_interval_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds Duration
maxInterval) (Text, Attribute) -> [(Text, Attribute)] -> [(Text, Attribute)]
forall a. a -> [a] -> [a]
:))
                                          RetryPolicy
retryPolicy.maximumInterval
                                          [ (Text
"temporal.retry_policy.initial_interval_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds RetryPolicy
retryPolicy.initialInterval)
                                          , (Text
"temporal.retry_policy.backoff_coefficient", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute RetryPolicy
retryPolicy.backoffCoefficient)
                                          , (Text
"temporal.retry_policy.maximum_attempts", Int -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Int -> Attribute) -> Int -> Attribute
forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral @Int32 @Int (Int32 -> Int) -> Int32 -> Int
forall a b. (a -> b) -> a -> b
$ RetryPolicy
retryPolicy.maximumAttempts)
                                          ]
                                    )
                                    input.executeWorkflowInputInfo.retryPolicy
                                ]
                        }
                inSpan'' tracer ("RunWorkflow:" <> input.executeWorkflowInputType) spanArgs $ \Span
span -> do
                  execution <- ExecuteWorkflowInput -> IO (WorkflowExitVariant Payload)
next ExecuteWorkflowInput
input
                  case execution of
                    WorkflowExitFailed SomeException
e -> do
                      -- TODO use our enrichment handlers here
                      Span -> SpanStatus -> IO ()
forall (m :: * -> *). MonadIO m => Span -> SpanStatus -> m ()
setStatus Span
span (Text -> SpanStatus
Error (Text -> SpanStatus) -> Text -> SpanStatus
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
e)
                      Span
-> HashMap Text Attribute
-> Maybe Timestamp
-> SomeException
-> IO ()
forall (m :: * -> *) e.
(MonadIO m, Exception e) =>
Span -> HashMap Text Attribute -> Maybe Timestamp -> e -> m ()
recordException Span
span HashMap Text Attribute
forall a. Monoid a => a
mempty Maybe Timestamp
forall a. Maybe a
Nothing SomeException
e
                    WorkflowExitVariant Payload
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
                  pure execution
            , handleQuery = \HandleQueryInput
input HandleQueryInput -> IO (Either SomeException Payload)
next -> do
                -- Only trace this if there is a header, and make that span the parent.
                -- We do not put anything that happens in a query handler on the workflow
                -- span.
                --
                -- However, we do _link_ the query span to the workflow span if we have the
                -- context for it.
                let spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Server
                        , attributes = mempty
                        -- HashMap.fromList
                        --   [ ("temporal.workflow_id", toAttribute $ rawWorkflowId $ input.handleQueryInputInfo.workflowId)
                        --   , ("temporal.run_id", toAttribute $ rawRunId $ input.handleQueryInputInfo.runId)
                        --   , ("temporal.workflow_type", toAttribute $ input.handleQueryInputType)
                        --   ]
                        }
                ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator HandleQueryInput
input.handleQueryInputHeaders Context
Ctxt.empty
                _ <- attachContext ctxt
                case Ctxt.lookupSpan ctxt of
                  Maybe Span
Nothing -> HandleQueryInput -> IO (Either SomeException Payload)
next HandleQueryInput
input
                  Just Span
_ ->
                    Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Either SomeException Payload))
-> IO (Either SomeException Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"HandleQuery:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> HandleQueryInput
input.handleQueryInputType) SpanArguments
spanArgs ((Span -> IO (Either SomeException Payload))
 -> IO (Either SomeException Payload))
-> (Span -> IO (Either SomeException Payload))
-> IO (Either SomeException Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                      HandleQueryInput -> IO (Either SomeException Payload)
next HandleQueryInput
input
            }
      , workflowOutboundInterceptors =
          WorkflowOutboundInterceptor
            { scheduleActivity = \ActivityInput
input ActivityInput -> IO (Task Payload)
next -> do
                let StartActivityOptions {Bool
Maybe Duration
Maybe RetryPolicy
Maybe ActivityId
Maybe TaskQueue
Map Text Payload
ActivityCancellationType
ActivityTimeoutPolicy
activityId :: Maybe ActivityId
taskQueue :: Maybe TaskQueue
timeout :: ActivityTimeoutPolicy
scheduleToStartTimeout :: Maybe Duration
heartbeatTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
cancellationType :: ActivityCancellationType
headers :: Map Text Payload
disableEagerExecution :: Bool
disableEagerExecution :: StartActivityOptions -> Bool
headers :: StartActivityOptions -> Map Text Payload
cancellationType :: StartActivityOptions -> ActivityCancellationType
retryPolicy :: StartActivityOptions -> Maybe RetryPolicy
heartbeatTimeout :: StartActivityOptions -> Maybe Duration
scheduleToStartTimeout :: StartActivityOptions -> Maybe Duration
timeout :: StartActivityOptions -> ActivityTimeoutPolicy
taskQueue :: StartActivityOptions -> Maybe TaskQueue
activityId :: StartActivityOptions -> Maybe ActivityId
..} = ActivityInput
input.options
                    spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Client
                        }
                Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Task Payload))
-> IO (Task Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartActivity:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ActivityInput
input.activityType) SpanArguments
spanArgs ((Span -> IO (Task Payload)) -> IO (Task Payload))
-> (Span -> IO (Task Payload)) -> IO (Task Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                  ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                  hdrs <- inject headersPropagator ctxt headers
                  next $ input {options = StartActivityOptions {headers = hdrs, ..}}
            , continueAsNew = \Text
n ContinueAsNewOptions {Maybe Duration
Maybe RetryPolicy
Maybe TaskQueue
Map Text Payload
Map SearchAttributeKey SearchAttributeType
taskQueue :: Maybe TaskQueue
runTimeout :: Maybe Duration
taskTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
memo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
headers :: ContinueAsNewOptions -> Map Text Payload
searchAttributes :: ContinueAsNewOptions -> Map SearchAttributeKey SearchAttributeType
memo :: ContinueAsNewOptions -> Map Text Payload
retryPolicy :: ContinueAsNewOptions -> Maybe RetryPolicy
taskTimeout :: ContinueAsNewOptions -> Maybe Duration
runTimeout :: ContinueAsNewOptions -> Maybe Duration
taskQueue :: ContinueAsNewOptions -> Maybe TaskQueue
..} Text -> ContinueAsNewOptions -> IO a
next -> do
                ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                hdrs <- inject headersPropagator ctxt headers
                next n (ContinueAsNewOptions {headers = hdrs, ..})
            , startChildWorkflowExecution = \Text
wfName StartChildWorkflowOptions
input Text
-> StartChildWorkflowOptions -> IO (ChildWorkflowHandle Payload)
next -> do
                let StartChildWorkflowOptions {Maybe Text
Maybe RetryPolicy
Maybe TaskQueue
Maybe WorkflowId
Map Text Payload
Map SearchAttributeKey SearchAttributeType
TimeoutOptions
WorkflowIdReusePolicy
ParentClosePolicy
ChildWorkflowCancellationType
cancellationType :: ChildWorkflowCancellationType
parentClosePolicy :: ParentClosePolicy
timeoutOptions :: TimeoutOptions
retryPolicy :: Maybe RetryPolicy
cronSchedule :: Maybe Text
initialMemo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
workflowIdReusePolicy :: WorkflowIdReusePolicy
workflowId :: Maybe WorkflowId
taskQueue :: Maybe TaskQueue
taskQueue :: StartChildWorkflowOptions -> Maybe TaskQueue
workflowId :: StartChildWorkflowOptions -> Maybe WorkflowId
workflowIdReusePolicy :: StartChildWorkflowOptions -> WorkflowIdReusePolicy
headers :: StartChildWorkflowOptions -> Map Text Payload
searchAttributes :: StartChildWorkflowOptions
-> Map SearchAttributeKey SearchAttributeType
initialMemo :: StartChildWorkflowOptions -> Map Text Payload
cronSchedule :: StartChildWorkflowOptions -> Maybe Text
retryPolicy :: StartChildWorkflowOptions -> Maybe RetryPolicy
timeoutOptions :: StartChildWorkflowOptions -> TimeoutOptions
parentClosePolicy :: StartChildWorkflowOptions -> ParentClosePolicy
cancellationType :: StartChildWorkflowOptions -> ChildWorkflowCancellationType
..} = StartChildWorkflowOptions
input
                    spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Client
                        }
                Tracer
-> Text
-> SpanArguments
-> (Span -> IO (ChildWorkflowHandle Payload))
-> IO (ChildWorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartChildWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
wfName) SpanArguments
spanArgs ((Span -> IO (ChildWorkflowHandle Payload))
 -> IO (ChildWorkflowHandle Payload))
-> (Span -> IO (ChildWorkflowHandle Payload))
-> IO (ChildWorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                  ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                  hdrs <- inject headersPropagator ctxt headers
                  next wfName $ StartChildWorkflowOptions {headers = hdrs, ..}
            }
      , activityInboundInterceptors =
          ActivityInboundInterceptor
            { executeActivity = \env
env ExecuteActivityInput
input env -> ExecuteActivityInput -> IO (Either String Payload)
next -> do
                let ActivityInfo {Bool
Maybe Duration
Maybe RetryPolicy
Word32
Text
Vector Payload
Map Text Payload
SystemTime
TaskToken
ActivityId
RunId
Namespace
WorkflowId
WorkflowType
workflowNamespace :: Namespace
workflowType :: WorkflowType
workflowId :: WorkflowId
runId :: RunId
activityId :: ActivityId
activityType :: Text
headerFields :: Map Text Payload
heartbeatDetails :: Vector Payload
scheduledTime :: SystemTime
currentAttemptScheduledTime :: SystemTime
startedTime :: SystemTime
attempt :: Word32
scheduleToCloseTimeout :: Maybe Duration
startToCloseTimeout :: Maybe Duration
heartbeatTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
isLocal :: Bool
taskToken :: TaskToken
taskToken :: ActivityInfo -> TaskToken
isLocal :: ActivityInfo -> Bool
retryPolicy :: ActivityInfo -> Maybe RetryPolicy
heartbeatTimeout :: ActivityInfo -> Maybe Duration
startToCloseTimeout :: ActivityInfo -> Maybe Duration
scheduleToCloseTimeout :: ActivityInfo -> Maybe Duration
attempt :: ActivityInfo -> Word32
startedTime :: ActivityInfo -> SystemTime
currentAttemptScheduledTime :: ActivityInfo -> SystemTime
scheduledTime :: ActivityInfo -> SystemTime
heartbeatDetails :: ActivityInfo -> Vector Payload
headerFields :: ActivityInfo -> Map Text Payload
activityType :: ActivityInfo -> Text
activityId :: ActivityInfo -> ActivityId
runId :: ActivityInfo -> RunId
workflowId :: ActivityInfo -> WorkflowId
workflowType :: ActivityInfo -> WorkflowType
workflowNamespace :: ActivityInfo -> Namespace
..} = ExecuteActivityInput
input.activityInfo
                    spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Server
                        , attributes =
                            HashMap.fromList
                              [ ("temporal.workflow_id", toAttribute $ rawWorkflowId workflowId)
                              , ("temporal.workflow_type", toAttribute $ rawWorkflowType workflowType)
                              , ("temporal.run_id", toAttribute $ rawRunId runId)
                              , ("temporal.activity_id", toAttribute $ rawActivityId activityId)
                              , ("temporal.activity_type", toAttribute activityType)
                              , ("temporal.attempt", toAttribute $ fromIntegral @Word32 @Int attempt)
                              , -- , ("temporal.namespace", toAttribute $ rawNamespace $ input.activityInfo.namespace)
                                ("temporal.activity_is_local", toAttribute isLocal)
                              ]
                        }

                ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator ExecuteActivityInput
input.activityHeaders Context
Ctxt.empty
                _ <- attachContext ctxt
                inSpan'' tracer ("RunActivity:" <> input.activityInfo.activityType) spanArgs $ \Span
_span -> do
                  env -> ExecuteActivityInput -> IO (Either String Payload)
next env
env ExecuteActivityInput
input
            }
      , activityOutboundInterceptors = ActivityOutboundInterceptor
      , clientInterceptors =
          ClientInterceptors
            { start = \WorkflowType
ty WorkflowId
wfId StartWorkflowOptions {Bool
Maybe Text
Maybe Duration
Maybe WorkflowIdReusePolicy
Maybe RetryPolicy
Map Text Payload
Map SearchAttributeKey SearchAttributeType
TimeoutOptions
TaskQueue
taskQueue :: TaskQueue
followRuns :: Bool
workflowIdReusePolicy :: Maybe WorkflowIdReusePolicy
retryPolicy :: Maybe RetryPolicy
cronSchedule :: Maybe Text
memo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
timeouts :: TimeoutOptions
requestEagerExecution :: Bool
workflowStartDelay :: Maybe Duration
workflowStartDelay :: StartWorkflowOptions -> Maybe Duration
requestEagerExecution :: StartWorkflowOptions -> Bool
timeouts :: StartWorkflowOptions -> TimeoutOptions
headers :: StartWorkflowOptions -> Map Text Payload
searchAttributes :: StartWorkflowOptions -> Map SearchAttributeKey SearchAttributeType
memo :: StartWorkflowOptions -> Map Text Payload
cronSchedule :: StartWorkflowOptions -> Maybe Text
retryPolicy :: StartWorkflowOptions -> Maybe RetryPolicy
workflowIdReusePolicy :: StartWorkflowOptions -> Maybe WorkflowIdReusePolicy
followRuns :: StartWorkflowOptions -> Bool
taskQueue :: StartWorkflowOptions -> TaskQueue
..} Vector Payload
ps WorkflowType
-> WorkflowId
-> StartWorkflowOptions
-> Vector Payload
-> IO (WorkflowHandle Payload)
next -> do
                let spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Client
                        , attributes =
                            HashMap.fromList
                              [ ("temporal.workflow_id", toAttribute $ rawWorkflowId wfId)
                              , ("temporal.workflow_type", toAttribute $ rawWorkflowType ty)
                              ]
                        }
                Tracer
-> Text
-> SpanArguments
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> WorkflowType -> Text
rawWorkflowType WorkflowType
ty) SpanArguments
spanArgs ((Span -> IO (WorkflowHandle Payload))
 -> IO (WorkflowHandle Payload))
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                  ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                  hdrs <- inject headersPropagator ctxt headers
                  next ty wfId (StartWorkflowOptions {headers = hdrs, ..}) ps
            , queryWorkflow = \QueryWorkflowInput
input QueryWorkflowInput -> IO (Either QueryRejected Payload)
next -> do
                let spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Client
                        }
                Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Either QueryRejected Payload))
-> IO (Either QueryRejected Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"QueryWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> QueryWorkflowInput -> Text
queryWorkflowType QueryWorkflowInput
input) SpanArguments
spanArgs ((Span -> IO (Either QueryRejected Payload))
 -> IO (Either QueryRejected Payload))
-> (Span -> IO (Either QueryRejected Payload))
-> IO (Either QueryRejected Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                  ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                  hdrs <- inject headersPropagator ctxt input.queryWorkflowHeaders
                  next (input {queryWorkflowHeaders = hdrs})
            , signalWithStart = \SignalWithStartWorkflowInput
input SignalWithStartWorkflowInput -> IO (WorkflowHandle Payload)
next -> do
                let spanArgs :: SpanArguments
spanArgs =
                      SpanArguments
defaultSpanArguments
                        { kind = Client
                        }
                Tracer
-> Text
-> SpanArguments
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"SignalWithStartWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> WorkflowType -> Text
rawWorkflowType (SignalWithStartWorkflowInput -> WorkflowType
signalWithStartWorkflowType SignalWithStartWorkflowInput
input)) SpanArguments
spanArgs ((Span -> IO (WorkflowHandle Payload))
 -> IO (WorkflowHandle Payload))
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
                  ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
                  hdrs <- inject headersPropagator ctxt input.signalWithStartOptions.headers
                  next (input {signalWithStartOptions = (signalWithStartOptions input) {Temporal.Client.Types.headers = hdrs}})
            }
      , -- Not really anything to do here since new cron jobs should be in their own context
        scheduleClientInterceptors = mempty
      }