{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}
module Temporal.Contrib.OpenTelemetry where
import Control.Monad.IO.Class
import qualified Data.HashMap.Strict as HashMap
import Data.Int
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import Data.Version (showVersion)
import Data.Word (Word32)
import OpenTelemetry.Attributes (emptyAttributes)
import qualified OpenTelemetry.Context as Ctxt
import OpenTelemetry.Context.ThreadLocal (attachContext, getContext)
import OpenTelemetry.Propagator
import OpenTelemetry.Propagator.W3CTraceContext
import OpenTelemetry.Trace.Core
import Paths_temporal_sdk
import Temporal.Activity.Types
import Temporal.Client.Types (
StartWorkflowOptions (..),
)
import Temporal.Common
import Temporal.Duration
import Temporal.Interceptor
import Temporal.Payload (Payload (..))
import Temporal.Workflow ()
import Temporal.Workflow.Types
import Prelude hiding (span)
defaultHeaderKey :: T.Text
= Text
"_tracer-data"
data OpenTelemetryInterceptorOptions = OpenTelemetryInterceptorOptions
{ OpenTelemetryInterceptorOptions -> Maybe TracerProvider
tracerProvider :: Maybe TracerProvider
, :: T.Text
}
defaultOpenTelemetryInterceptorOptions :: OpenTelemetryInterceptorOptions
defaultOpenTelemetryInterceptorOptions :: OpenTelemetryInterceptorOptions
defaultOpenTelemetryInterceptorOptions =
OpenTelemetryInterceptorOptions
{ tracerProvider :: Maybe TracerProvider
tracerProvider = Maybe TracerProvider
forall a. Maybe a
Nothing
, headerKey :: Text
headerKey = Text
defaultHeaderKey
}
headersPropagator :: Propagator Ctxt.Context (Map.Map T.Text Payload) (Map.Map T.Text Payload)
=
Propagator
{ propagatorNames :: [Text]
propagatorNames = [Text
"tracecontext"]
, extractor :: Map Text Payload -> Context -> IO Context
extractor = \Map Text Payload
hs Context
c -> do
let traceParentHeader :: Maybe ByteString
traceParentHeader = Payload -> ByteString
payloadData (Payload -> ByteString) -> Maybe Payload -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Map Text Payload -> Maybe Payload
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Text
"traceparent" Map Text Payload
hs
traceStateHeader :: Maybe ByteString
traceStateHeader = Payload -> ByteString
payloadData (Payload -> ByteString) -> Maybe Payload -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Map Text Payload -> Maybe Payload
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Text
"tracestate" Map Text Payload
hs
mspanContext :: Maybe SpanContext
mspanContext = Maybe ByteString -> Maybe ByteString -> Maybe SpanContext
decodeSpanContext Maybe ByteString
traceParentHeader Maybe ByteString
traceStateHeader
Context -> IO Context
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Context -> IO Context) -> Context -> IO Context
forall a b. (a -> b) -> a -> b
$! case Maybe SpanContext
mspanContext of
Maybe SpanContext
Nothing -> Context
c
Just SpanContext
s -> Span -> Context -> Context
Ctxt.insertSpan (SpanContext -> Span
wrapSpanContext (SpanContext
s {isRemote = True})) Context
c
, injector :: Context -> Map Text Payload -> IO (Map Text Payload)
injector = \Context
c Map Text Payload
hs -> case Context -> Maybe Span
Ctxt.lookupSpan Context
c of
Maybe Span
Nothing -> Map Text Payload -> IO (Map Text Payload)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map Text Payload
hs
Just Span
s -> do
(traceParentHeader, traceStateHeader) <- Span -> IO (ByteString, ByteString)
encodeSpanContext Span
s
pure $
Map.insert "traceparent" (Payload traceParentHeader mempty) $
Map.insert "tracestate" (Payload traceStateHeader mempty) hs
}
makeOpenTelemetryInterceptor :: MonadIO m => m (Interceptors env)
makeOpenTelemetryInterceptor :: forall (m :: * -> *) env. MonadIO m => m (Interceptors env)
makeOpenTelemetryInterceptor = do
tracerProvider <- m TracerProvider
forall (m :: * -> *). MonadIO m => m TracerProvider
getGlobalTracerProvider
let tracer =
TracerProvider -> InstrumentationLibrary -> TracerOptions -> Tracer
makeTracer
TracerProvider
tracerProvider
(Text -> Text -> InstrumentationLibrary
InstrumentationLibrary Text
"temporal-sdk" (String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Version -> String
showVersion Version
Paths_temporal_sdk.version))
(Maybe Text -> TracerOptions
TracerOptions Maybe Text
forall a. Maybe a
Nothing)
return $
Interceptors
{ workflowInboundInterceptors =
WorkflowInboundInterceptor
{ executeWorkflow = \ExecuteWorkflowInput
input ExecuteWorkflowInput -> IO (WorkflowExitVariant Payload)
next -> do
ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator ExecuteWorkflowInput
input.executeWorkflowInputHeaders Context
Ctxt.empty
_ <- attachContext ctxt
let spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Server
, attributes =
HashMap.fromList $
concat
[
[ ("temporal.workflow_id", toAttribute $ rawWorkflowId input.executeWorkflowInputInfo.workflowId)
, ("temporal.run_id", toAttribute $ rawRunId input.executeWorkflowInputInfo.runId)
, ("temporal.workflow_type", toAttribute input.executeWorkflowInputType)
, ("temporal.attempt", toAttribute input.executeWorkflowInputInfo.attempt)
, ("temporal.namespace", toAttribute $ rawNamespace input.executeWorkflowInputInfo.namespace)
, ("temporal.task_queue", toAttribute $ rawTaskQueue input.executeWorkflowInputInfo.taskQueue)
]
, maybe
[]
(\Duration
executionTimeout -> [(Text
"temporal.execution_timeout_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds Duration
executionTimeout)])
input.executeWorkflowInputInfo.executionTimeout
, maybe
[]
( \RunId
continuedRunId ->
[ (Text
"temporal.continued_run_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ RunId -> Text
rawRunId RunId
continuedRunId)
]
)
input.executeWorkflowInputInfo.continuedRunId
, maybe
[]
( \Text
cronSchedule ->
[(Text
"temporal.cron_schedule", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute Text
cronSchedule)]
)
input.executeWorkflowInputInfo.cronSchedule
, maybe
[]
( \ParentInfo
parentInfo ->
[ (Text
"temporal.parent.namespace", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ Namespace -> Text
rawNamespace ParentInfo
parentInfo.parentNamespace)
, (Text
"temporal.parent.run_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ RunId -> Text
rawRunId ParentInfo
parentInfo.parentRunId)
, (Text
"temporal.parent.workflow_id", Text -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Text -> Attribute) -> Text -> Attribute
forall a b. (a -> b) -> a -> b
$ WorkflowId -> Text
rawWorkflowId ParentInfo
parentInfo.parentWorkflowId)
]
)
input.executeWorkflowInputInfo.parent
, maybe
[]
( \RetryPolicy
retryPolicy ->
([(Text, Attribute)] -> [(Text, Attribute)])
-> (Duration -> [(Text, Attribute)] -> [(Text, Attribute)])
-> Maybe Duration
-> [(Text, Attribute)]
-> [(Text, Attribute)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
[(Text, Attribute)] -> [(Text, Attribute)]
forall a. a -> a
id
(\Duration
maxInterval -> ((Text
"temporal.retry_policy.maximum_interval_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds Duration
maxInterval) (Text, Attribute) -> [(Text, Attribute)] -> [(Text, Attribute)]
forall a. a -> [a] -> [a]
:))
RetryPolicy
retryPolicy.maximumInterval
[ (Text
"temporal.retry_policy.initial_interval_ms", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Double -> Attribute) -> Double -> Attribute
forall a b. (a -> b) -> a -> b
$ Duration -> Double
durationToMilliseconds RetryPolicy
retryPolicy.initialInterval)
, (Text
"temporal.retry_policy.backoff_coefficient", Double -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute RetryPolicy
retryPolicy.backoffCoefficient)
, (Text
"temporal.retry_policy.maximum_attempts", Int -> Attribute
forall a. ToAttribute a => a -> Attribute
toAttribute (Int -> Attribute) -> Int -> Attribute
forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral @Int32 @Int (Int32 -> Int) -> Int32 -> Int
forall a b. (a -> b) -> a -> b
$ RetryPolicy
retryPolicy.maximumAttempts)
]
)
input.executeWorkflowInputInfo.retryPolicy
]
}
inSpan'' tracer ("RunWorkflow:" <> input.executeWorkflowInputType) spanArgs $ \Span
span -> do
execution <- ExecuteWorkflowInput -> IO (WorkflowExitVariant Payload)
next ExecuteWorkflowInput
input
case execution of
WorkflowExitFailed SomeException
e -> do
Span -> SpanStatus -> IO ()
forall (m :: * -> *). MonadIO m => Span -> SpanStatus -> m ()
setStatus Span
span (Text -> SpanStatus
Error (Text -> SpanStatus) -> Text -> SpanStatus
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
e)
Span
-> HashMap Text Attribute
-> Maybe Timestamp
-> SomeException
-> IO ()
forall (m :: * -> *) e.
(MonadIO m, Exception e) =>
Span -> HashMap Text Attribute -> Maybe Timestamp -> e -> m ()
recordException Span
span HashMap Text Attribute
forall a. Monoid a => a
mempty Maybe Timestamp
forall a. Maybe a
Nothing SomeException
e
WorkflowExitVariant Payload
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
pure execution
, handleQuery = \HandleQueryInput
input HandleQueryInput -> IO (Either SomeException Payload)
next -> do
let spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Server
, attributes = mempty
}
ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator HandleQueryInput
input.handleQueryInputHeaders Context
Ctxt.empty
_ <- attachContext ctxt
case Ctxt.lookupSpan ctxt of
Maybe Span
Nothing -> HandleQueryInput -> IO (Either SomeException Payload)
next HandleQueryInput
input
Just Span
_ ->
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Either SomeException Payload))
-> IO (Either SomeException Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"HandleQuery:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> HandleQueryInput
input.handleQueryInputType) SpanArguments
spanArgs ((Span -> IO (Either SomeException Payload))
-> IO (Either SomeException Payload))
-> (Span -> IO (Either SomeException Payload))
-> IO (Either SomeException Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
HandleQueryInput -> IO (Either SomeException Payload)
next HandleQueryInput
input
}
, workflowOutboundInterceptors =
WorkflowOutboundInterceptor
{ scheduleActivity = \ActivityInput
input ActivityInput -> IO (Task Payload)
next -> do
let StartActivityOptions {Bool
Maybe Duration
Maybe RetryPolicy
Maybe ActivityId
Maybe TaskQueue
Map Text Payload
ActivityCancellationType
ActivityTimeoutPolicy
activityId :: Maybe ActivityId
taskQueue :: Maybe TaskQueue
timeout :: ActivityTimeoutPolicy
scheduleToStartTimeout :: Maybe Duration
heartbeatTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
cancellationType :: ActivityCancellationType
headers :: Map Text Payload
disableEagerExecution :: Bool
disableEagerExecution :: StartActivityOptions -> Bool
headers :: StartActivityOptions -> Map Text Payload
cancellationType :: StartActivityOptions -> ActivityCancellationType
retryPolicy :: StartActivityOptions -> Maybe RetryPolicy
heartbeatTimeout :: StartActivityOptions -> Maybe Duration
scheduleToStartTimeout :: StartActivityOptions -> Maybe Duration
timeout :: StartActivityOptions -> ActivityTimeoutPolicy
taskQueue :: StartActivityOptions -> Maybe TaskQueue
activityId :: StartActivityOptions -> Maybe ActivityId
..} = ActivityInput
input.options
spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Client
}
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Task Payload))
-> IO (Task Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartActivity:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> ActivityInput
input.activityType) SpanArguments
spanArgs ((Span -> IO (Task Payload)) -> IO (Task Payload))
-> (Span -> IO (Task Payload)) -> IO (Task Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt headers
next $ input {options = StartActivityOptions {headers = hdrs, ..}}
, continueAsNew = \Text
n ContinueAsNewOptions {Maybe Duration
Maybe RetryPolicy
Maybe TaskQueue
Map Text Payload
Map SearchAttributeKey SearchAttributeType
taskQueue :: Maybe TaskQueue
runTimeout :: Maybe Duration
taskTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
memo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
headers :: ContinueAsNewOptions -> Map Text Payload
searchAttributes :: ContinueAsNewOptions -> Map SearchAttributeKey SearchAttributeType
memo :: ContinueAsNewOptions -> Map Text Payload
retryPolicy :: ContinueAsNewOptions -> Maybe RetryPolicy
taskTimeout :: ContinueAsNewOptions -> Maybe Duration
runTimeout :: ContinueAsNewOptions -> Maybe Duration
taskQueue :: ContinueAsNewOptions -> Maybe TaskQueue
..} Text -> ContinueAsNewOptions -> IO a
next -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt headers
next n (ContinueAsNewOptions {headers = hdrs, ..})
, startChildWorkflowExecution = \Text
wfName StartChildWorkflowOptions
input Text
-> StartChildWorkflowOptions -> IO (ChildWorkflowHandle Payload)
next -> do
let StartChildWorkflowOptions {Maybe Text
Maybe RetryPolicy
Maybe TaskQueue
Maybe WorkflowId
Map Text Payload
Map SearchAttributeKey SearchAttributeType
TimeoutOptions
WorkflowIdReusePolicy
ParentClosePolicy
ChildWorkflowCancellationType
cancellationType :: ChildWorkflowCancellationType
parentClosePolicy :: ParentClosePolicy
timeoutOptions :: TimeoutOptions
retryPolicy :: Maybe RetryPolicy
cronSchedule :: Maybe Text
initialMemo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
workflowIdReusePolicy :: WorkflowIdReusePolicy
workflowId :: Maybe WorkflowId
taskQueue :: Maybe TaskQueue
taskQueue :: StartChildWorkflowOptions -> Maybe TaskQueue
workflowId :: StartChildWorkflowOptions -> Maybe WorkflowId
workflowIdReusePolicy :: StartChildWorkflowOptions -> WorkflowIdReusePolicy
headers :: StartChildWorkflowOptions -> Map Text Payload
searchAttributes :: StartChildWorkflowOptions
-> Map SearchAttributeKey SearchAttributeType
initialMemo :: StartChildWorkflowOptions -> Map Text Payload
cronSchedule :: StartChildWorkflowOptions -> Maybe Text
retryPolicy :: StartChildWorkflowOptions -> Maybe RetryPolicy
timeoutOptions :: StartChildWorkflowOptions -> TimeoutOptions
parentClosePolicy :: StartChildWorkflowOptions -> ParentClosePolicy
cancellationType :: StartChildWorkflowOptions -> ChildWorkflowCancellationType
..} = StartChildWorkflowOptions
input
spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Client
}
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (ChildWorkflowHandle Payload))
-> IO (ChildWorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartChildWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
wfName) SpanArguments
spanArgs ((Span -> IO (ChildWorkflowHandle Payload))
-> IO (ChildWorkflowHandle Payload))
-> (Span -> IO (ChildWorkflowHandle Payload))
-> IO (ChildWorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt headers
next wfName $ StartChildWorkflowOptions {headers = hdrs, ..}
}
, activityInboundInterceptors =
ActivityInboundInterceptor
{ executeActivity = \env
env ExecuteActivityInput
input env -> ExecuteActivityInput -> IO (Either String Payload)
next -> do
let ActivityInfo {Bool
Maybe Duration
Maybe RetryPolicy
Word32
Text
Vector Payload
Map Text Payload
SystemTime
TaskToken
ActivityId
RunId
Namespace
WorkflowId
WorkflowType
workflowNamespace :: Namespace
workflowType :: WorkflowType
workflowId :: WorkflowId
runId :: RunId
activityId :: ActivityId
activityType :: Text
headerFields :: Map Text Payload
heartbeatDetails :: Vector Payload
scheduledTime :: SystemTime
currentAttemptScheduledTime :: SystemTime
startedTime :: SystemTime
attempt :: Word32
scheduleToCloseTimeout :: Maybe Duration
startToCloseTimeout :: Maybe Duration
heartbeatTimeout :: Maybe Duration
retryPolicy :: Maybe RetryPolicy
isLocal :: Bool
taskToken :: TaskToken
taskToken :: ActivityInfo -> TaskToken
isLocal :: ActivityInfo -> Bool
retryPolicy :: ActivityInfo -> Maybe RetryPolicy
heartbeatTimeout :: ActivityInfo -> Maybe Duration
startToCloseTimeout :: ActivityInfo -> Maybe Duration
scheduleToCloseTimeout :: ActivityInfo -> Maybe Duration
attempt :: ActivityInfo -> Word32
startedTime :: ActivityInfo -> SystemTime
currentAttemptScheduledTime :: ActivityInfo -> SystemTime
scheduledTime :: ActivityInfo -> SystemTime
heartbeatDetails :: ActivityInfo -> Vector Payload
headerFields :: ActivityInfo -> Map Text Payload
activityType :: ActivityInfo -> Text
activityId :: ActivityInfo -> ActivityId
runId :: ActivityInfo -> RunId
workflowId :: ActivityInfo -> WorkflowId
workflowType :: ActivityInfo -> WorkflowType
workflowNamespace :: ActivityInfo -> Namespace
..} = ExecuteActivityInput
input.activityInfo
spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Server
, attributes =
HashMap.fromList
[ ("temporal.workflow_id", toAttribute $ rawWorkflowId workflowId)
, ("temporal.workflow_type", toAttribute $ rawWorkflowType workflowType)
, ("temporal.run_id", toAttribute $ rawRunId runId)
, ("temporal.activity_id", toAttribute $ rawActivityId activityId)
, ("temporal.activity_type", toAttribute activityType)
, ("temporal.attempt", toAttribute $ fromIntegral @Word32 @Int attempt)
,
("temporal.activity_is_local", toAttribute isLocal)
]
}
ctxt <- Propagator Context (Map Text Payload) (Map Text Payload)
-> Map Text Payload -> Context -> IO Context
forall (m :: * -> *) context i o.
MonadIO m =>
Propagator context i o -> i -> context -> m context
extract Propagator Context (Map Text Payload) (Map Text Payload)
headersPropagator ExecuteActivityInput
input.activityHeaders Context
Ctxt.empty
_ <- attachContext ctxt
inSpan'' tracer ("RunActivity:" <> input.activityInfo.activityType) spanArgs $ \Span
_span -> do
env -> ExecuteActivityInput -> IO (Either String Payload)
next env
env ExecuteActivityInput
input
}
, activityOutboundInterceptors = ActivityOutboundInterceptor
, clientInterceptors =
ClientInterceptors
{ start = \WorkflowType
ty WorkflowId
wfId StartWorkflowOptions {Bool
Maybe Text
Maybe Duration
Maybe WorkflowIdReusePolicy
Maybe RetryPolicy
Map Text Payload
Map SearchAttributeKey SearchAttributeType
TimeoutOptions
TaskQueue
taskQueue :: TaskQueue
followRuns :: Bool
workflowIdReusePolicy :: Maybe WorkflowIdReusePolicy
retryPolicy :: Maybe RetryPolicy
cronSchedule :: Maybe Text
memo :: Map Text Payload
searchAttributes :: Map SearchAttributeKey SearchAttributeType
headers :: Map Text Payload
timeouts :: TimeoutOptions
requestEagerExecution :: Bool
workflowStartDelay :: Maybe Duration
workflowStartDelay :: StartWorkflowOptions -> Maybe Duration
requestEagerExecution :: StartWorkflowOptions -> Bool
timeouts :: StartWorkflowOptions -> TimeoutOptions
headers :: StartWorkflowOptions -> Map Text Payload
searchAttributes :: StartWorkflowOptions -> Map SearchAttributeKey SearchAttributeType
memo :: StartWorkflowOptions -> Map Text Payload
cronSchedule :: StartWorkflowOptions -> Maybe Text
retryPolicy :: StartWorkflowOptions -> Maybe RetryPolicy
workflowIdReusePolicy :: StartWorkflowOptions -> Maybe WorkflowIdReusePolicy
followRuns :: StartWorkflowOptions -> Bool
taskQueue :: StartWorkflowOptions -> TaskQueue
..} Vector Payload
ps WorkflowType
-> WorkflowId
-> StartWorkflowOptions
-> Vector Payload
-> IO (WorkflowHandle Payload)
next -> do
let spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Client
, attributes =
HashMap.fromList
[ ("temporal.workflow_id", toAttribute $ rawWorkflowId wfId)
, ("temporal.workflow_type", toAttribute $ rawWorkflowType ty)
]
}
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"StartWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> WorkflowType -> Text
rawWorkflowType WorkflowType
ty) SpanArguments
spanArgs ((Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload))
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt headers
next ty wfId (StartWorkflowOptions {headers = hdrs, ..}) ps
, queryWorkflow = \QueryWorkflowInput
input QueryWorkflowInput -> IO (Either QueryRejected Payload)
next -> do
let spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Client
}
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (Either QueryRejected Payload))
-> IO (Either QueryRejected Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"QueryWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> QueryWorkflowInput -> Text
queryWorkflowType QueryWorkflowInput
input) SpanArguments
spanArgs ((Span -> IO (Either QueryRejected Payload))
-> IO (Either QueryRejected Payload))
-> (Span -> IO (Either QueryRejected Payload))
-> IO (Either QueryRejected Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt input.queryWorkflowHeaders
next (input {queryWorkflowHeaders = hdrs})
, signalWithStart = \SignalWithStartWorkflowInput
input SignalWithStartWorkflowInput -> IO (WorkflowHandle Payload)
next -> do
let spanArgs :: SpanArguments
spanArgs =
SpanArguments
defaultSpanArguments
{ kind = Client
}
Tracer
-> Text
-> SpanArguments
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall (m :: * -> *) a.
(MonadUnliftIO m, HasCallStack) =>
Tracer -> Text -> SpanArguments -> (Span -> m a) -> m a
inSpan'' Tracer
tracer (Text
"SignalWithStartWorkflow:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> WorkflowType -> Text
rawWorkflowType (SignalWithStartWorkflowInput -> WorkflowType
signalWithStartWorkflowType SignalWithStartWorkflowInput
input)) SpanArguments
spanArgs ((Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload))
-> (Span -> IO (WorkflowHandle Payload))
-> IO (WorkflowHandle Payload)
forall a b. (a -> b) -> a -> b
$ \Span
_ -> do
ctxt <- IO Context
forall (m :: * -> *). MonadIO m => m Context
getContext
hdrs <- inject headersPropagator ctxt input.signalWithStartOptions.headers
next (input {signalWithStartOptions = (signalWithStartOptions input) {Temporal.Client.Types.headers = hdrs}})
}
,
scheduleClientInterceptors = mempty
}