{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Temporal.Core.Worker (
Worker,
getWorkerClient,
WorkerConfig (..),
getWorkerConfig,
defaultWorkerConfig,
WorkerError (..),
WorkerErrorCode (..),
WorkerType (..),
SWorkerType (..),
SomeWorkerType (..),
newWorker,
validateWorker,
newReplayWorker,
closeWorker,
InactiveForReplay,
WorkflowActivation,
pollWorkflowActivation,
ActivityTask,
pollActivityTask,
WorkflowActivationCompletion,
completeWorkflowActivation,
ActivityTaskCompletion,
completeActivityTask,
ActivityHeartbeat,
recordActivityHeartbeat,
requestWorkflowEviction,
initiateShutdown,
finalizeShutdown,
HistoryPusher,
History,
pushHistory,
closeHistory,
KnownWorkerType (..),
) where
import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Aeson.TH
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import Data.ProtoLens.Encoding (decodeMessageOrDie, encodeMessage)
import Data.Text (Text)
import Data.Void (Void)
import Data.Word
import Foreign.C.String
import Foreign.ForeignPtr
import Foreign.Marshal
import Foreign.Ptr
import Foreign.Storable
import Proto.Temporal.Api.History.V1.Message (History)
import Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask (ActivityTask)
import Proto.Temporal.Sdk.Core.CoreInterface (ActivityHeartbeat, ActivityTaskCompletion)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation (WorkflowActivation)
import Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion (WorkflowActivationCompletion)
import Temporal.Core.CTypes
import Temporal.Core.Client
import Temporal.Internal.FFI
import Temporal.Runtime
data WorkerType = Real | Replay
data SWorkerType (ty :: WorkerType) where
SReal :: SWorkerType 'Real
SReplay :: SWorkerType 'Replay
data SomeWorkerType where
SomeWorkerType :: SWorkerType ty -> SomeWorkerType
type family InactiveForReplay (ty :: WorkerType) a where
InactiveForReplay 'Real a = a
InactiveForReplay 'Replay _ = ()
class KnownWorkerType (ty :: WorkerType) where
knownWorkerType :: SWorkerType ty
instance KnownWorkerType 'Real where
knownWorkerType :: SWorkerType 'Real
knownWorkerType = SWorkerType 'Real
SReal
instance KnownWorkerType 'Replay where
knownWorkerType :: SWorkerType 'Replay
knownWorkerType = SWorkerType 'Replay
SReplay
singFor :: KnownWorkerType ty => proxy ty -> SWorkerType ty
singFor :: forall (ty :: WorkerType) (proxy :: WorkerType -> *).
KnownWorkerType ty =>
proxy ty -> SWorkerType ty
singFor proxy ty
_ = SWorkerType ty
forall (ty :: WorkerType). KnownWorkerType ty => SWorkerType ty
knownWorkerType
data Worker (ty :: WorkerType) = Worker
{ forall (ty :: WorkerType).
Worker ty -> IORef (ForeignPtr (Worker ty))
workerPtr :: {-# UNPACK #-} !(IORef (ForeignPtr (Worker ty)))
, forall (ty :: WorkerType). Worker ty -> WorkerConfig
workerConfig :: !WorkerConfig
, forall (ty :: WorkerType). Worker ty -> InactiveForReplay ty Client
workerClient :: !(InactiveForReplay ty Client)
, forall (ty :: WorkerType). Worker ty -> Runtime
workerRuntime :: {-# UNPACK #-} !Runtime
}
withWorker :: forall ty a. KnownWorkerType ty => Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker :: forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker w :: Worker ty
w@(Worker IORef (ForeignPtr (Worker ty))
ptrRef WorkerConfig
_ InactiveForReplay ty Client
c Runtime
r) Ptr (Worker ty) -> IO a
f = Runtime -> (Ptr Runtime -> IO a) -> IO a
forall a. Runtime -> (Ptr Runtime -> IO a) -> IO a
withRuntime Runtime
r ((Ptr Runtime -> IO a) -> IO a) -> (Ptr Runtime -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Ptr Runtime
_ ->
let keepClientAlive :: IO a -> IO a
keepClientAlive :: forall a. IO a -> IO a
keepClientAlive = case Worker ty -> SWorkerType ty
forall (ty :: WorkerType) (proxy :: WorkerType -> *).
KnownWorkerType ty =>
proxy ty -> SWorkerType ty
singFor Worker ty
w of
SWorkerType ty
SReal -> \IO a
m -> do
a <- IO a
m
touchClient c
pure a
SWorkerType ty
SReplay -> IO a -> IO a
forall a. a -> a
id
in IO a -> IO a
forall a. IO a -> IO a
keepClientAlive (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ do
ptr <- IORef (ForeignPtr (Worker ty)) -> IO (ForeignPtr (Worker ty))
forall a. IORef a -> IO a
readIORef IORef (ForeignPtr (Worker ty))
ptrRef
withForeignPtr ptr f
getWorkerClient :: Worker 'Real -> Client
getWorkerClient :: Worker 'Real -> Client
getWorkerClient = Worker 'Real -> Client
Worker 'Real -> InactiveForReplay 'Real Client
forall (ty :: WorkerType). Worker ty -> InactiveForReplay ty Client
workerClient
getWorkerConfig :: Worker ty -> WorkerConfig
getWorkerConfig :: forall (ty :: WorkerType). Worker ty -> WorkerConfig
getWorkerConfig = Worker ty -> WorkerConfig
forall (ty :: WorkerType). Worker ty -> WorkerConfig
workerConfig
newtype HistoryPusher = HistoryPusher {HistoryPusher -> Ptr HistoryPusher
historyPusher :: Ptr HistoryPusher}
type Proto = ByteString
type RunId = ByteString
type WorkflowId = ByteString
data WorkerConfig = WorkerConfig
{ WorkerConfig -> Text
namespace :: Text
, WorkerConfig -> Text
taskQueue :: Text
, WorkerConfig -> Text
buildId :: Text
, WorkerConfig -> Maybe Text
identityOverride :: Maybe Text
, WorkerConfig -> Word64
maxCachedWorkflows :: Word64
, WorkerConfig -> Word64
maxOutstandingWorkflowTasks :: Word64
, WorkerConfig -> Word64
maxOutstandingActivities :: Word64
, WorkerConfig -> Word64
maxOutstandingLocalActivities :: Word64
, WorkerConfig -> Word64
maxConcurrentWorkflowTaskPolls :: Word64
, WorkerConfig -> Float
nonstickyToStickyPollRatio :: Float
, WorkerConfig -> Word64
maxConcurrentActivityTaskPolls :: Word64
, WorkerConfig -> Bool
noRemoteActivities :: Bool
, WorkerConfig -> Word64
stickyQueueScheduleToStartTimeoutMillis :: Word64
, WorkerConfig -> Word64
maxHeartbeatThrottleIntervalMillis :: Word64
, WorkerConfig -> Word64
defaultHeartbeatThrottleIntervalMillis :: Word64
, WorkerConfig -> Maybe Double
maxActivitiesPerSecond :: Maybe Double
, WorkerConfig -> Maybe Double
maxTaskQueueActivitiesPerSecond :: Maybe Double
, WorkerConfig -> Word64
gracefulShutdownPeriodMillis :: Word64
, WorkerConfig -> Bool
nondeterminismAsWorkflowFail :: Bool
, WorkerConfig -> [Text]
nondeterminismAsWorkflowFailForTypes :: [Text]
}
deriveJSON (defaultOptions {fieldLabelModifier = camelTo2 '_'}) ''WorkerConfig
defaultWorkerConfig :: WorkerConfig
defaultWorkerConfig :: WorkerConfig
defaultWorkerConfig =
WorkerConfig
{ namespace :: Text
namespace = Text
"default"
, taskQueue :: Text
taskQueue = Text
"default"
, buildId :: Text
buildId = Text
""
, identityOverride :: Maybe Text
identityOverride = Maybe Text
forall a. Maybe a
Nothing
, maxCachedWorkflows :: Word64
maxCachedWorkflows = Word64
100000
, maxOutstandingWorkflowTasks :: Word64
maxOutstandingWorkflowTasks = Word64
1000
, maxOutstandingActivities :: Word64
maxOutstandingActivities = Word64
1000
, maxOutstandingLocalActivities :: Word64
maxOutstandingLocalActivities = Word64
1000
, maxConcurrentWorkflowTaskPolls :: Word64
maxConcurrentWorkflowTaskPolls = Word64
5
, nonstickyToStickyPollRatio :: Float
nonstickyToStickyPollRatio = Float
0.85
, maxConcurrentActivityTaskPolls :: Word64
maxConcurrentActivityTaskPolls = Word64
5
, noRemoteActivities :: Bool
noRemoteActivities = Bool
False
, stickyQueueScheduleToStartTimeoutMillis :: Word64
stickyQueueScheduleToStartTimeoutMillis = Word64
60000
, maxHeartbeatThrottleIntervalMillis :: Word64
maxHeartbeatThrottleIntervalMillis = Word64
300000
, defaultHeartbeatThrottleIntervalMillis :: Word64
defaultHeartbeatThrottleIntervalMillis = Word64
300000
, maxActivitiesPerSecond :: Maybe Double
maxActivitiesPerSecond = Maybe Double
forall a. Maybe a
Nothing
, maxTaskQueueActivitiesPerSecond :: Maybe Double
maxTaskQueueActivitiesPerSecond = Maybe Double
forall a. Maybe a
Nothing
, gracefulShutdownPeriodMillis :: Word64
gracefulShutdownPeriodMillis = Word64
0
, nondeterminismAsWorkflowFail :: Bool
nondeterminismAsWorkflowFail = Bool
False
, nondeterminismAsWorkflowFailForTypes :: [Text]
nondeterminismAsWorkflowFailForTypes = []
}
foreign import ccall "hs_temporal_new_worker" raw_newWorker :: Ptr CoreClient -> Ptr (CArray Word8) -> Ptr (Ptr (Worker 'Real)) -> Ptr (Ptr CWorkerError) -> IO ()
foreign import ccall "hs_temporal_validate_worker" raw_validateWorker :: Ptr (Worker ty) -> TokioCall CWorkerValidationError CUnit
validateWorker :: Worker 'Real -> IO (Either WorkerValidationError ())
validateWorker :: Worker 'Real -> IO (Either WorkerValidationError ())
validateWorker Worker 'Real
w = Worker 'Real
-> (Ptr (Worker 'Real) -> IO (Either WorkerValidationError ()))
-> IO (Either WorkerValidationError ())
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker 'Real
w ((Ptr (Worker 'Real) -> IO (Either WorkerValidationError ()))
-> IO (Either WorkerValidationError ()))
-> (Ptr (Worker 'Real) -> IO (Either WorkerValidationError ()))
-> IO (Either WorkerValidationError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker 'Real)
wp -> do
res <- TokioCall CWorkerValidationError CUnit
-> Maybe (FinalizerPtr CWorkerValidationError)
-> Maybe (FinalizerPtr CUnit)
-> IO
(Either (ForeignPtr CWorkerValidationError) (ForeignPtr CUnit))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall (Ptr (Worker 'Real) -> TokioCall CWorkerValidationError CUnit
forall (ty :: WorkerType).
Ptr (Worker ty) -> TokioCall CWorkerValidationError CUnit
raw_validateWorker Ptr (Worker 'Real)
wp) (FinalizerPtr CWorkerValidationError
-> Maybe (FinalizerPtr CWorkerValidationError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerValidationError
rust_dropWorkerValidationError) (FinalizerPtr CUnit -> Maybe (FinalizerPtr CUnit)
forall a. a -> Maybe a
Just FinalizerPtr CUnit
rust_dropUnit)
case res of
Left ForeignPtr CWorkerValidationError
err -> WorkerValidationError -> Either WorkerValidationError ()
forall a b. a -> Either a b
Left (WorkerValidationError -> Either WorkerValidationError ())
-> IO WorkerValidationError -> IO (Either WorkerValidationError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerValidationError
-> (Ptr CWorkerValidationError -> IO WorkerValidationError)
-> IO WorkerValidationError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerValidationError
err (Ptr CWorkerValidationError -> IO CWorkerValidationError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerValidationError -> IO CWorkerValidationError)
-> (CWorkerValidationError -> IO WorkerValidationError)
-> Ptr CWorkerValidationError
-> IO WorkerValidationError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerValidationError -> IO WorkerValidationError
peekWorkerValidationError)
Right ForeignPtr CUnit
_ -> Either WorkerValidationError ()
-> IO (Either WorkerValidationError ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerValidationError ()
-> IO (Either WorkerValidationError ()))
-> Either WorkerValidationError ()
-> IO (Either WorkerValidationError ())
forall a b. (a -> b) -> a -> b
$ () -> Either WorkerValidationError ()
forall a b. b -> Either a b
Right ()
foreign import ccall "&hs_temporal_drop_worker" raw_closeWorker :: FinalizerPtr (Worker ty)
getWorkerError :: Ptr CWorkerError -> IO WorkerError
getWorkerError :: Ptr CWorkerError -> IO WorkerError
getWorkerError Ptr CWorkerError
errPtr = do
fp <- FinalizerPtr CWorkerError
-> Ptr CWorkerError -> IO (ForeignPtr CWorkerError)
forall a. FinalizerPtr a -> Ptr a -> IO (ForeignPtr a)
newForeignPtr FinalizerPtr CWorkerError
rust_dropWorkerError Ptr CWorkerError
errPtr
withForeignPtr fp (peek >=> peekWorkerError)
newWorker :: Client -> WorkerConfig -> IO (Either WorkerError (Worker 'Real))
newWorker :: Client -> WorkerConfig -> IO (Either WorkerError (Worker 'Real))
newWorker Client
c WorkerConfig
wc = Client
-> (Ptr CoreClient -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall a. Client -> (Ptr CoreClient -> IO a) -> IO a
withClient Client
c ((Ptr CoreClient -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real)))
-> (Ptr CoreClient -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall a b. (a -> b) -> a -> b
$ \Ptr CoreClient
cPtr -> do
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS (LazyByteString -> ByteString
BL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ WorkerConfig -> LazyByteString
forall a. ToJSON a => a -> LazyByteString
encode WorkerConfig
wc) ((Ptr (CArray Word8) -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real)))
-> (Ptr (CArray Word8) -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
wcPtr -> do
wPtrPtrFP <- IO (ForeignPtr (Ptr (Worker 'Real)))
forall a. Storable a => IO (ForeignPtr a)
mallocForeignPtr
errPtrPtrFP <- mallocForeignPtr
withForeignPtr wPtrPtrFP $ \Ptr (Ptr (Worker 'Real))
wPtrPtr -> do
ForeignPtr (Ptr CWorkerError)
-> (Ptr (Ptr CWorkerError)
-> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr (Ptr CWorkerError)
errPtrPtrFP ((Ptr (Ptr CWorkerError) -> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real)))
-> (Ptr (Ptr CWorkerError)
-> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr CWorkerError)
errPtrPtr -> do
Ptr (Ptr (Worker 'Real)) -> Ptr (Worker 'Real) -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr (Worker 'Real))
wPtrPtr Ptr (Worker 'Real)
forall a. Ptr a
nullPtr
Ptr (Ptr CWorkerError) -> Ptr CWorkerError -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr CWorkerError)
errPtrPtr Ptr CWorkerError
forall a. Ptr a
nullPtr
IO (Either WorkerError (Worker 'Real))
-> IO (Either WorkerError (Worker 'Real))
forall a. IO a -> IO a
mask_ (IO (Either WorkerError (Worker 'Real))
-> IO (Either WorkerError (Worker 'Real)))
-> IO (Either WorkerError (Worker 'Real))
-> IO (Either WorkerError (Worker 'Real))
forall a b. (a -> b) -> a -> b
$ do
Ptr CoreClient
-> Ptr (CArray Word8)
-> Ptr (Ptr (Worker 'Real))
-> Ptr (Ptr CWorkerError)
-> IO ()
raw_newWorker Ptr CoreClient
cPtr Ptr (CArray Word8)
wcPtr Ptr (Ptr (Worker 'Real))
wPtrPtr Ptr (Ptr CWorkerError)
errPtrPtr
errPtr <- Ptr (Ptr CWorkerError) -> IO (Ptr CWorkerError)
forall a. Storable a => Ptr a -> IO a
peek Ptr (Ptr CWorkerError)
errPtrPtr
if errPtr == nullPtr
then do
wPtr <- peek wPtrPtr
wPtrFP <- newForeignPtr raw_closeWorker wPtr
wPtrRef <- newIORef wPtrFP
pure $ Right $ Worker wPtrRef wc c (clientRuntime c)
else Left <$> getWorkerError errPtr
data WorkerAlreadyClosed = WorkerAlreadyClosed
deriving stock (Int -> WorkerAlreadyClosed -> ShowS
[WorkerAlreadyClosed] -> ShowS
WorkerAlreadyClosed -> String
(Int -> WorkerAlreadyClosed -> ShowS)
-> (WorkerAlreadyClosed -> String)
-> ([WorkerAlreadyClosed] -> ShowS)
-> Show WorkerAlreadyClosed
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> WorkerAlreadyClosed -> ShowS
showsPrec :: Int -> WorkerAlreadyClosed -> ShowS
$cshow :: WorkerAlreadyClosed -> String
show :: WorkerAlreadyClosed -> String
$cshowList :: [WorkerAlreadyClosed] -> ShowS
showList :: [WorkerAlreadyClosed] -> ShowS
Show)
instance Exception WorkerAlreadyClosed
closeWorker :: Worker ty -> IO ()
closeWorker :: forall (ty :: WorkerType). Worker ty -> IO ()
closeWorker (Worker IORef (ForeignPtr (Worker ty))
w WorkerConfig
_ InactiveForReplay ty Client
_ Runtime
_) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
wp <- IORef (ForeignPtr (Worker ty))
-> (ForeignPtr (Worker ty)
-> (ForeignPtr (Worker ty), ForeignPtr (Worker ty)))
-> IO (ForeignPtr (Worker ty))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (ForeignPtr (Worker ty))
w ((ForeignPtr (Worker ty)
-> (ForeignPtr (Worker ty), ForeignPtr (Worker ty)))
-> IO (ForeignPtr (Worker ty)))
-> (ForeignPtr (Worker ty)
-> (ForeignPtr (Worker ty), ForeignPtr (Worker ty)))
-> IO (ForeignPtr (Worker ty))
forall a b. (a -> b) -> a -> b
$ \ForeignPtr (Worker ty)
wp -> (WorkerAlreadyClosed -> ForeignPtr (Worker ty)
forall a e. (?callStack::CallStack, Exception e) => e -> a
throw WorkerAlreadyClosed
WorkerAlreadyClosed, ForeignPtr (Worker ty)
wp)
finalizeForeignPtr wp
foreign import ccall "hs_temporal_new_replay_worker" raw_newReplayWorker :: Ptr Runtime -> Ptr (CArray Word8) -> Ptr (Ptr (Worker 'Replay)) -> Ptr (Ptr HistoryPusher) -> Ptr (Ptr CWorkerError) -> IO ()
newReplayWorker :: Runtime -> WorkerConfig -> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
newReplayWorker :: Runtime
-> WorkerConfig
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
newReplayWorker Runtime
r WorkerConfig
conf = Runtime
-> (Ptr Runtime
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a. Runtime -> (Ptr Runtime -> IO a) -> IO a
withRuntime Runtime
r ((Ptr Runtime
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> (Ptr Runtime
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. (a -> b) -> a -> b
$ \Ptr Runtime
rPtr -> do
(Ptr (Ptr (Worker 'Replay))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr (Ptr (Worker 'Replay))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> (Ptr (Ptr (Worker 'Replay))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr (Worker 'Replay))
wPtrPtr -> do
(Ptr (Ptr HistoryPusher)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr (Ptr HistoryPusher)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> (Ptr (Ptr HistoryPusher)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr HistoryPusher)
hpPtrPtr -> do
ByteString
-> (Ptr (CArray Word8)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS (LazyByteString -> ByteString
BL.toStrict (LazyByteString -> ByteString) -> LazyByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ WorkerConfig -> LazyByteString
forall a. ToJSON a => a -> LazyByteString
encode WorkerConfig
conf) ((Ptr (CArray Word8)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> (Ptr (CArray Word8)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
confPtr -> do
(Ptr (Ptr CWorkerError)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr (Ptr CWorkerError)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> (Ptr (Ptr CWorkerError)
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher)))
-> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr CWorkerError)
errPtrPtr -> do
Ptr (Ptr (Worker 'Replay)) -> Ptr (Worker 'Replay) -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr (Worker 'Replay))
wPtrPtr Ptr (Worker 'Replay)
forall a. Ptr a
nullPtr
Ptr (Ptr HistoryPusher) -> Ptr HistoryPusher -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr HistoryPusher)
hpPtrPtr Ptr HistoryPusher
forall a. Ptr a
nullPtr
Ptr (Ptr CWorkerError) -> Ptr CWorkerError -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr CWorkerError)
errPtrPtr Ptr CWorkerError
forall a. Ptr a
nullPtr
Ptr Runtime
-> Ptr (CArray Word8)
-> Ptr (Ptr (Worker 'Replay))
-> Ptr (Ptr HistoryPusher)
-> Ptr (Ptr CWorkerError)
-> IO ()
raw_newReplayWorker Ptr Runtime
rPtr Ptr (CArray Word8)
confPtr Ptr (Ptr (Worker 'Replay))
wPtrPtr Ptr (Ptr HistoryPusher)
hpPtrPtr Ptr (Ptr CWorkerError)
errPtrPtr
errPtr <- Ptr (Ptr CWorkerError) -> IO (Ptr CWorkerError)
forall a. Storable a => Ptr a -> IO a
peek Ptr (Ptr CWorkerError)
errPtrPtr
if errPtr == nullPtr
then do
wPtr <- peek wPtrPtr
hpPtr <- peek hpPtrPtr
w <- newForeignPtr raw_closeWorker wPtr
wRef <- newIORef w
pure $ Right (Worker wRef conf () r, HistoryPusher hpPtr)
else Left <$> getWorkerError errPtr
foreign import ccall "hs_temporal_worker_poll_workflow_activation" raw_pollWorkflowActivation :: Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
pollWorkflowActivation :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError WorkflowActivation)
pollWorkflowActivation :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> IO (Either WorkerError WorkflowActivation)
pollWorkflowActivation Worker ty
w = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError WorkflowActivation))
-> IO (Either WorkerError WorkflowActivation)
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError WorkflowActivation))
-> IO (Either WorkerError WorkflowActivation))
-> (Ptr (Worker ty) -> IO (Either WorkerError WorkflowActivation))
-> IO (Either WorkerError WorkflowActivation)
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp -> do
res <-
TokioCall CWorkerError (CArray Word8)
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr (CArray Word8))
-> IO
(Either (ForeignPtr CWorkerError) (ForeignPtr (CArray Word8)))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
forall (ty :: WorkerType).
Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
raw_pollWorkflowActivation Ptr (Worker ty)
wp)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr (CArray Word8) -> Maybe (FinalizerPtr (CArray Word8))
forall a. a -> Maybe a
Just FinalizerPtr (CArray Word8)
rust_dropByteArray)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError WorkflowActivation
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError WorkflowActivation)
-> IO WorkerError -> IO (Either WorkerError WorkflowActivation)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr (CArray Word8)
ok -> WorkflowActivation -> Either WorkerError WorkflowActivation
forall a b. b -> Either a b
Right (WorkflowActivation -> Either WorkerError WorkflowActivation)
-> (ByteString -> WorkflowActivation)
-> ByteString
-> Either WorkerError WorkflowActivation
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> WorkflowActivation
forall msg. Message msg => ByteString -> msg
decodeMessageOrDie (ByteString -> Either WorkerError WorkflowActivation)
-> IO ByteString -> IO (Either WorkerError WorkflowActivation)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr (CArray Word8)
-> (Ptr (CArray Word8) -> IO ByteString) -> IO ByteString
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr (CArray Word8)
ok (Ptr (CArray Word8) -> IO (CArray Word8)
forall a. Storable a => Ptr a -> IO a
peek (Ptr (CArray Word8) -> IO (CArray Word8))
-> (CArray Word8 -> IO ByteString)
-> Ptr (CArray Word8)
-> IO ByteString
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CArray Word8 -> IO ByteString
cArrayToByteString)
foreign import ccall "hs_temporal_worker_poll_activity_task" raw_pollActivityTask :: Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
pollActivityTask :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError ActivityTask)
pollActivityTask :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> IO (Either WorkerError ActivityTask)
pollActivityTask Worker ty
w = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError ActivityTask))
-> IO (Either WorkerError ActivityTask)
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError ActivityTask))
-> IO (Either WorkerError ActivityTask))
-> (Ptr (Worker ty) -> IO (Either WorkerError ActivityTask))
-> IO (Either WorkerError ActivityTask)
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp -> do
res <-
TokioCall CWorkerError (CArray Word8)
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr (CArray Word8))
-> IO
(Either (ForeignPtr CWorkerError) (ForeignPtr (CArray Word8)))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
forall (ty :: WorkerType).
Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)
raw_pollActivityTask Ptr (Worker ty)
wp)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr (CArray Word8) -> Maybe (FinalizerPtr (CArray Word8))
forall a. a -> Maybe a
Just FinalizerPtr (CArray Word8)
rust_dropByteArray)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError ActivityTask
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError ActivityTask)
-> IO WorkerError -> IO (Either WorkerError ActivityTask)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr (CArray Word8)
res -> ActivityTask -> Either WorkerError ActivityTask
forall a b. b -> Either a b
Right (ActivityTask -> Either WorkerError ActivityTask)
-> (ByteString -> ActivityTask)
-> ByteString
-> Either WorkerError ActivityTask
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ActivityTask
forall msg. Message msg => ByteString -> msg
decodeMessageOrDie (ByteString -> Either WorkerError ActivityTask)
-> IO ByteString -> IO (Either WorkerError ActivityTask)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr (CArray Word8)
-> (Ptr (CArray Word8) -> IO ByteString) -> IO ByteString
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr (CArray Word8)
res (Ptr (CArray Word8) -> IO (CArray Word8)
forall a. Storable a => Ptr a -> IO a
peek (Ptr (CArray Word8) -> IO (CArray Word8))
-> (CArray Word8 -> IO ByteString)
-> Ptr (CArray Word8)
-> IO ByteString
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CArray Word8 -> IO ByteString
cArrayToByteString)
foreign import ccall "hs_temporal_worker_complete_workflow_activation" raw_completeWorkflowActivation :: Ptr (Worker ty) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
completeWorkflowActivation :: KnownWorkerType ty => Worker ty -> WorkflowActivationCompletion -> IO (Either WorkerError ())
completeWorkflowActivation :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty
-> WorkflowActivationCompletion -> IO (Either WorkerError ())
completeWorkflowActivation Worker ty
w WorkflowActivationCompletion
p = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp ->
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS (WorkflowActivationCompletion -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage WorkflowActivationCompletion
p) ((Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
pPtr -> do
res <-
TokioCall CWorkerError CUnit
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr CUnit)
-> IO (Either (ForeignPtr CWorkerError) (ForeignPtr CUnit))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr (Worker ty)
-> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
forall (ty :: WorkerType).
Ptr (Worker ty)
-> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
raw_completeWorkflowActivation Ptr (Worker ty)
wp Ptr (CArray Word8)
pPtr)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr CUnit -> Maybe (FinalizerPtr CUnit)
forall a. a -> Maybe a
Just FinalizerPtr CUnit
rust_dropUnit)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError ()
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError ())
-> IO WorkerError -> IO (Either WorkerError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr CUnit
_ -> Either WorkerError () -> IO (Either WorkerError ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError () -> IO (Either WorkerError ()))
-> Either WorkerError () -> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ () -> Either WorkerError ()
forall a b. b -> Either a b
Right ()
foreign import ccall "hs_temporal_worker_complete_activity_task" raw_completeActivityTask :: Ptr (Worker ty) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
completeActivityTask :: KnownWorkerType ty => Worker ty -> ActivityTaskCompletion -> IO (Either WorkerError ())
completeActivityTask :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> ActivityTaskCompletion -> IO (Either WorkerError ())
completeActivityTask Worker ty
w ActivityTaskCompletion
p = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp ->
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS (ActivityTaskCompletion -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage ActivityTaskCompletion
p) ((Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
pPtr -> do
res <-
TokioCall CWorkerError CUnit
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr CUnit)
-> IO (Either (ForeignPtr CWorkerError) (ForeignPtr CUnit))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr (Worker ty)
-> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
forall (ty :: WorkerType).
Ptr (Worker ty)
-> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
raw_completeActivityTask Ptr (Worker ty)
wp Ptr (CArray Word8)
pPtr)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr CUnit -> Maybe (FinalizerPtr CUnit)
forall a. a -> Maybe a
Just FinalizerPtr CUnit
rust_dropUnit)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError ()
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError ())
-> IO WorkerError -> IO (Either WorkerError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr CUnit
_ -> Either WorkerError () -> IO (Either WorkerError ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError () -> IO (Either WorkerError ()))
-> Either WorkerError () -> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ () -> Either WorkerError ()
forall a b. b -> Either a b
Right ()
foreign import ccall "hs_temporal_worker_record_activity_heartbeat" raw_recordActivityHeartbeat :: Ptr (Worker ty) -> Ptr (CArray Word8) -> Ptr (Ptr CWorkerError) -> Ptr (Ptr CUnit) -> IO ()
recordActivityHeartbeat :: KnownWorkerType ty => Worker ty -> ActivityHeartbeat -> IO (Either WorkerError ())
recordActivityHeartbeat :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> ActivityHeartbeat -> IO (Either WorkerError ())
recordActivityHeartbeat Worker ty
w ActivityHeartbeat
p = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp ->
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS (ActivityHeartbeat -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage ActivityHeartbeat
p) ((Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
pPtr -> do
(Ptr (Ptr CWorkerError) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr (Ptr CWorkerError) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Ptr CWorkerError) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr CWorkerError)
errPtrPtr -> do
(Ptr (Ptr CUnit) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca ((Ptr (Ptr CUnit) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Ptr CUnit) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Ptr CUnit)
resPtrPtr -> IO (Either WorkerError ()) -> IO (Either WorkerError ())
forall a. IO a -> IO a
mask_ (IO (Either WorkerError ()) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()) -> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ do
Ptr (Ptr CWorkerError) -> Ptr CWorkerError -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr CWorkerError)
errPtrPtr Ptr CWorkerError
forall a. Ptr a
nullPtr
Ptr (Ptr CUnit) -> Ptr CUnit -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr (Ptr CUnit)
resPtrPtr Ptr CUnit
forall a. Ptr a
nullPtr
Ptr (Worker ty)
-> Ptr (CArray Word8)
-> Ptr (Ptr CWorkerError)
-> Ptr (Ptr CUnit)
-> IO ()
forall (ty :: WorkerType).
Ptr (Worker ty)
-> Ptr (CArray Word8)
-> Ptr (Ptr CWorkerError)
-> Ptr (Ptr CUnit)
-> IO ()
raw_recordActivityHeartbeat Ptr (Worker ty)
wp Ptr (CArray Word8)
pPtr Ptr (Ptr CWorkerError)
errPtrPtr Ptr (Ptr CUnit)
resPtrPtr
errPtr <- Ptr (Ptr CWorkerError) -> IO (Ptr CWorkerError)
forall a. Storable a => Ptr a -> IO a
peek Ptr (Ptr CWorkerError)
errPtrPtr
if errPtr == nullPtr
then do
rust_dropUnitNow =<< peek resPtrPtr
pure $ Right ()
else Left <$> getWorkerError errPtr
foreign import ccall "hs_temporal_worker_request_workflow_eviction" raw_requestWorkflowEviction :: Ptr (Worker ty) -> Ptr (CArray Word8) -> IO ()
requestWorkflowEviction :: KnownWorkerType ty => Worker ty -> RunId -> IO ()
requestWorkflowEviction :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> ByteString -> IO ()
requestWorkflowEviction Worker ty
w ByteString
r = Worker ty -> (Ptr (Worker ty) -> IO ()) -> IO ()
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO ()) -> IO ())
-> (Ptr (Worker ty) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp ->
ByteString -> (Ptr (CArray Word8) -> IO ()) -> IO ()
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS ByteString
r ((Ptr (CArray Word8) -> IO ()) -> IO ())
-> (Ptr (CArray Word8) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
rPtr -> do
Ptr (Worker ty) -> Ptr (CArray Word8) -> IO ()
forall (ty :: WorkerType).
Ptr (Worker ty) -> Ptr (CArray Word8) -> IO ()
raw_requestWorkflowEviction Ptr (Worker ty)
wp Ptr (CArray Word8)
rPtr
foreign import ccall "hs_temporal_worker_initiate_shutdown" raw_initiateShutdown :: Ptr (Worker ty) -> IO ()
initiateShutdown :: KnownWorkerType ty => Worker ty -> IO ()
initiateShutdown :: forall (ty :: WorkerType). KnownWorkerType ty => Worker ty -> IO ()
initiateShutdown Worker ty
w = Worker ty -> (Ptr (Worker ty) -> IO ()) -> IO ()
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w Ptr (Worker ty) -> IO ()
forall (ty :: WorkerType). Ptr (Worker ty) -> IO ()
raw_initiateShutdown
foreign import ccall "hs_temporal_worker_finalize_shutdown" raw_finalizeShutdown :: Ptr (Worker ty) -> TokioCall CWorkerError CUnit
finalizeShutdown :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError ())
finalizeShutdown :: forall (ty :: WorkerType).
KnownWorkerType ty =>
Worker ty -> IO (Either WorkerError ())
finalizeShutdown Worker ty
w = Worker ty
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall (ty :: WorkerType) a.
KnownWorkerType ty =>
Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker Worker ty
w ((Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (Worker ty) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (Worker ty)
wp -> do
res <-
TokioCall CWorkerError CUnit
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr CUnit)
-> IO (Either (ForeignPtr CWorkerError) (ForeignPtr CUnit))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr (Worker ty) -> TokioCall CWorkerError CUnit
forall (ty :: WorkerType).
Ptr (Worker ty) -> TokioCall CWorkerError CUnit
raw_finalizeShutdown Ptr (Worker ty)
wp)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr CUnit -> Maybe (FinalizerPtr CUnit)
forall a. a -> Maybe a
Just FinalizerPtr CUnit
rust_dropUnit)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError ()
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError ())
-> IO WorkerError -> IO (Either WorkerError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr CUnit
_ -> Either WorkerError () -> IO (Either WorkerError ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError () -> IO (Either WorkerError ()))
-> Either WorkerError () -> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ () -> Either WorkerError ()
forall a b. b -> Either a b
Right ()
foreign import ccall "hs_temporal_history_pusher_push_history" raw_pushHistory :: Ptr HistoryPusher -> Ptr (CArray Word8) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit
pushHistory :: HistoryPusher -> WorkflowId -> Either ByteString History -> IO (Either WorkerError ())
pushHistory :: HistoryPusher
-> ByteString
-> Either ByteString History
-> IO (Either WorkerError ())
pushHistory (HistoryPusher Ptr HistoryPusher
hp) ByteString
wf Either ByteString History
p =
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS ByteString
wf ((Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
wfPtr -> do
ByteString
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall b. ByteString -> (Ptr (CArray Word8) -> IO b) -> IO b
withCArrayBS ((ByteString -> ByteString)
-> (History -> ByteString)
-> Either ByteString History
-> ByteString
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ByteString -> ByteString
forall a. a -> a
id History -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage Either ByteString History
p) ((Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ()))
-> (Ptr (CArray Word8) -> IO (Either WorkerError ()))
-> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ \Ptr (CArray Word8)
pPtr -> do
res <-
TokioCall CWorkerError CUnit
-> Maybe (FinalizerPtr CWorkerError)
-> Maybe (FinalizerPtr CUnit)
-> IO (Either (ForeignPtr CWorkerError) (ForeignPtr CUnit))
forall err res.
TokioCall err res
-> Maybe (FinalizerPtr err)
-> Maybe (FinalizerPtr res)
-> IO (Either (ForeignPtr err) (ForeignPtr res))
makeTokioAsyncCall
(Ptr HistoryPusher
-> Ptr (CArray Word8)
-> Ptr (CArray Word8)
-> TokioCall CWorkerError CUnit
raw_pushHistory Ptr HistoryPusher
hp Ptr (CArray Word8)
wfPtr Ptr (CArray Word8)
pPtr)
(FinalizerPtr CWorkerError -> Maybe (FinalizerPtr CWorkerError)
forall a. a -> Maybe a
Just FinalizerPtr CWorkerError
rust_dropWorkerError)
(FinalizerPtr CUnit -> Maybe (FinalizerPtr CUnit)
forall a. a -> Maybe a
Just FinalizerPtr CUnit
rust_dropUnit)
case res of
Left ForeignPtr CWorkerError
err -> WorkerError -> Either WorkerError ()
forall a b. a -> Either a b
Left (WorkerError -> Either WorkerError ())
-> IO WorkerError -> IO (Either WorkerError ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr CWorkerError
-> (Ptr CWorkerError -> IO WorkerError) -> IO WorkerError
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr CWorkerError
err (Ptr CWorkerError -> IO CWorkerError
forall a. Storable a => Ptr a -> IO a
peek (Ptr CWorkerError -> IO CWorkerError)
-> (CWorkerError -> IO WorkerError)
-> Ptr CWorkerError
-> IO WorkerError
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> CWorkerError -> IO WorkerError
peekWorkerError)
Right ForeignPtr CUnit
_ -> Either WorkerError () -> IO (Either WorkerError ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either WorkerError () -> IO (Either WorkerError ()))
-> Either WorkerError () -> IO (Either WorkerError ())
forall a b. (a -> b) -> a -> b
$ () -> Either WorkerError ()
forall a b. b -> Either a b
Right ()
foreign import ccall "hs_temporal_history_pusher_close" raw_closeHistoryPusher :: Ptr HistoryPusher -> IO ()
closeHistory :: HistoryPusher -> IO ()
closeHistory :: HistoryPusher -> IO ()
closeHistory (HistoryPusher Ptr HistoryPusher
hp) =
Ptr HistoryPusher -> IO ()
raw_closeHistoryPusher Ptr HistoryPusher
hp