{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Temporal.Core.Worker (
  Worker,
  getWorkerClient,
  WorkerConfig (..),
  getWorkerConfig,
  defaultWorkerConfig,
  WorkerError (..),
  WorkerErrorCode (..),
  WorkerType (..),
  SWorkerType (..),
  SomeWorkerType (..),
  newWorker,
  validateWorker,
  newReplayWorker,
  closeWorker,
  InactiveForReplay,
  WorkflowActivation,
  pollWorkflowActivation,
  ActivityTask,
  pollActivityTask,
  WorkflowActivationCompletion,
  completeWorkflowActivation,
  ActivityTaskCompletion,
  completeActivityTask,
  ActivityHeartbeat,
  recordActivityHeartbeat,
  requestWorkflowEviction,
  initiateShutdown,
  finalizeShutdown,
  HistoryPusher,
  History,
  pushHistory,
  closeHistory,
  KnownWorkerType (..),
) where

import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Aeson.TH
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import Data.ProtoLens.Encoding (decodeMessageOrDie, encodeMessage)
import Data.Text (Text)
import Data.Void (Void)
import Data.Word
import Foreign.C.String
import Foreign.ForeignPtr
import Foreign.Marshal
import Foreign.Ptr
import Foreign.Storable
import Proto.Temporal.Api.History.V1.Message (History)
import Proto.Temporal.Sdk.Core.ActivityTask.ActivityTask (ActivityTask)
import Proto.Temporal.Sdk.Core.CoreInterface (ActivityHeartbeat, ActivityTaskCompletion)
import Proto.Temporal.Sdk.Core.WorkflowActivation.WorkflowActivation (WorkflowActivation)
import Proto.Temporal.Sdk.Core.WorkflowCompletion.WorkflowCompletion (WorkflowActivationCompletion)
import Temporal.Core.CTypes
import Temporal.Core.Client
import Temporal.Internal.FFI
import Temporal.Runtime


-- | The two kinds of worker this module distinguishes. With @DataKinds@ the
-- constructors are also used at the type level, as the index of 'Worker',
-- 'SWorkerType' and 'InactiveForReplay'.
data WorkerType = Real | Replay


-- | A singleton type for 'WorkerType'.
--
-- Each constructor fixes the type-level index, so pattern matching on an
-- @'SWorkerType' ty@ tells the type checker which 'WorkerType' @ty@ is.
data SWorkerType (ty :: WorkerType) where
  SReal :: SWorkerType 'Real
  SReplay :: SWorkerType 'Replay


-- | An existential wrapper around 'SWorkerType' that hides the type-level
-- 'WorkerType' index; matching on the wrapped singleton recovers it.
data SomeWorkerType where
  SomeWorkerType :: SWorkerType ty -> SomeWorkerType


-- | Type-level switch on the worker kind: for a 'Real' worker the result is
-- the given type @a@, for a 'Replay' worker it is @()@.
--
-- It is used for the client field of 'Worker', so a replay worker stores @()@
-- in place of a 'Client'.
type family InactiveForReplay (ty :: WorkerType) a where
  InactiveForReplay 'Real a = a
  InactiveForReplay 'Replay _ = ()


-- | Worker kinds whose singleton can be produced from the type alone.
class KnownWorkerType (ty :: WorkerType) where
  -- | The singleton value corresponding to @ty@.
  knownWorkerType :: SWorkerType ty


-- | The singleton for a 'Real' worker is 'SReal'.
instance KnownWorkerType 'Real where
  knownWorkerType = SReal


-- | The singleton for a 'Replay' worker is 'SReplay'.
instance KnownWorkerType 'Replay where
  knownWorkerType = SReplay


-- | Recover the singleton for @ty@ from any value whose type is indexed by
-- @ty@ (for example a @'Worker' ty@).
--
-- The argument is never inspected; only its type is used to select the
-- 'KnownWorkerType' instance.
singFor :: KnownWorkerType ty => proxy ty -> SWorkerType ty
singFor _ = knownWorkerType


-- | A handle to a worker, indexed by whether it is a 'Real' or a 'Replay'
-- worker.
data Worker (ty :: WorkerType) = Worker
  { workerPtr :: {-# UNPACK #-} !(IORef (ForeignPtr (Worker ty)))
  -- ^ Mutable reference to the foreign worker pointer. It is mutable so that
  -- 'closeWorker' can replace its contents.
  , workerConfig :: !WorkerConfig
  -- ^ The configuration the worker was created with.
  , workerClient :: !(InactiveForReplay ty Client)
  -- ^ The 'Client' of a 'Real' worker; @()@ for a 'Replay' worker.
  , workerRuntime :: {-# UNPACK #-} !Runtime
  -- ^ The 'Runtime' associated with the worker.
  }


-- | Run an action with the raw pointer of a worker.
--
-- The action runs inside 'withRuntime' for the worker's 'Runtime' (the
-- runtime pointer itself is not used). The 'ForeignPtr' is read from the
-- worker's 'IORef' on every call and handed to 'withForeignPtr'. For a 'Real'
-- worker, 'touchClient' is called on the worker's client once the action has
-- returned; a 'Replay' worker has no client, so nothing extra happens.
withWorker :: forall ty a. KnownWorkerType ty => Worker ty -> (Ptr (Worker ty) -> IO a) -> IO a
withWorker w@(Worker ptrRef _ c r) f = withRuntime r $ \_ ->
  let keepClientAlive :: IO a -> IO a
      keepClientAlive = case singFor w of
        -- Matching on the singleton refines @ty@, which is what lets the
        -- client field be used at type 'Client' in this branch.
        SReal -> \m -> do
          a <- m
          touchClient c
          pure a
        SReplay -> id
  in keepClientAlive $ do
      ptr <- readIORef ptrRef
      withForeignPtr ptr f


-- | The 'Client' a 'Real' worker was created from.
--
-- Only available for 'Real' workers: a 'Replay' worker stores @()@ in the
-- client field (see 'InactiveForReplay').
getWorkerClient :: Worker 'Real -> Client
getWorkerClient = workerClient


-- | The 'WorkerConfig' stored in a worker handle.
getWorkerConfig :: Worker ty -> WorkerConfig
getWorkerConfig = workerConfig


-- | Wrapper around the raw history-pusher pointer that 'newReplayWorker'
-- receives through an out-parameter.
--
-- This is a plain 'Ptr'; no finalizer is attached to it in this part of the
-- module.
newtype HistoryPusher = HistoryPusher {historyPusher :: Ptr HistoryPusher}


-- | Alias for a strict 'ByteString'.
-- NOTE(review): presumably holds an encoded protobuf message; not used in the
-- visible part of this module, so confirm against its use sites.
type Proto = ByteString


-- | Alias for a strict 'ByteString'.
-- NOTE(review): presumably a workflow run identifier; confirm against its use
-- sites.
type RunId = ByteString


-- | Alias for a strict 'ByteString'.
-- NOTE(review): presumably a workflow identifier; confirm against its use
-- sites.
type WorkflowId = ByteString


-- | Settings for a worker.
--
-- The record is serialised to JSON and handed to the foreign worker
-- constructors ('newWorker', 'newReplayWorker'). JSON keys are derived from
-- the field names by the Template Haskell splice that follows this
-- declaration, so renaming a field changes the wire format.
--
-- Fields whose names end in @Millis@ are, going by the name, durations in
-- milliseconds.
data WorkerConfig = WorkerConfig
  { namespace :: Text
  , taskQueue :: Text
  , buildId :: Text
  , identityOverride :: Maybe Text
  , maxCachedWorkflows :: Word64
  , maxOutstandingWorkflowTasks :: Word64
  , maxOutstandingActivities :: Word64
  , maxOutstandingLocalActivities :: Word64
  , maxConcurrentWorkflowTaskPolls :: Word64
  , nonstickyToStickyPollRatio :: Float
  , maxConcurrentActivityTaskPolls :: Word64
  , noRemoteActivities :: Bool
  , stickyQueueScheduleToStartTimeoutMillis :: Word64
  , maxHeartbeatThrottleIntervalMillis :: Word64
  , defaultHeartbeatThrottleIntervalMillis :: Word64
  , maxActivitiesPerSecond :: Maybe Double
  , maxTaskQueueActivitiesPerSecond :: Maybe Double
  , gracefulShutdownPeriodMillis :: Word64
  , nondeterminismAsWorkflowFail :: Bool
  , nondeterminismAsWorkflowFailForTypes :: [Text]
  -- TODO:
  -- useWorkerVersioning
  -- tuner
  }


-- Generate the 'ToJSON' and 'FromJSON' instances for 'WorkerConfig'. Field
-- names are rewritten with @camelTo2 '_'@, i.e. from camelCase to snake_case
-- (for example @taskQueue@ becomes @task_queue@).
deriveJSON (defaultOptions {fieldLabelModifier = camelTo2 '_'}) ''WorkerConfig


-- | A baseline 'WorkerConfig' intended to be customised with record update
-- syntax, e.g. @defaultWorkerConfig { taskQueue = "my-queue" }@.
--
-- Both 'namespace' and 'taskQueue' are @"default"@, 'buildId' is empty, and
-- every optional setting is 'Nothing'.
defaultWorkerConfig :: WorkerConfig
defaultWorkerConfig =
  WorkerConfig
    { namespace = "default"
    , taskQueue = "default"
    , buildId = ""
    , identityOverride = Nothing
    , maxCachedWorkflows = 100000
    , maxOutstandingWorkflowTasks = 1000
    , maxOutstandingActivities = 1000
    , maxOutstandingLocalActivities = 1000
    , maxConcurrentWorkflowTaskPolls = 5
    , nonstickyToStickyPollRatio = 0.85
    , maxConcurrentActivityTaskPolls = 5
    , noRemoteActivities = False
    , stickyQueueScheduleToStartTimeoutMillis = 60000
    , maxHeartbeatThrottleIntervalMillis = 300000
    , defaultHeartbeatThrottleIntervalMillis = 300000
    , maxActivitiesPerSecond = Nothing
    , maxTaskQueueActivitiesPerSecond = Nothing
    , gracefulShutdownPeriodMillis = 0
    , nondeterminismAsWorkflowFail = False
    , nondeterminismAsWorkflowFailForTypes = []
    }


-- | Binding to the C symbol @hs_temporal_new_worker@.
--
-- As called from 'newWorker', the arguments are: the client pointer, the
-- JSON-encoded 'WorkerConfig' as a byte array, an out-parameter for the new
-- worker pointer, and an out-parameter for an error pointer. 'newWorker'
-- treats a non-null error pointer after the call as failure.
foreign import ccall "hs_temporal_new_worker" raw_newWorker :: Ptr CoreClient -> Ptr (CArray Word8) -> Ptr (Ptr (Worker 'Real)) -> Ptr (Ptr CWorkerError) -> IO ()


-- | Binding to the C symbol @hs_temporal_validate_worker@. It yields a
-- 'TokioCall' that 'validateWorker' runs through 'makeTokioAsyncCall'.
foreign import ccall "hs_temporal_validate_worker" raw_validateWorker :: Ptr (Worker ty) -> TokioCall CWorkerValidationError CUnit


-- | Ask the core library to validate a 'Real' worker.
--
-- Returns @'Right' ()@ on success. On failure the foreign error value is read
-- with 'peek' and converted by 'peekWorkerValidationError' into the 'Left'
-- result. Both possible foreign results are given their Rust drop functions
-- as finalizers.
validateWorker :: Worker 'Real -> IO (Either WorkerValidationError ())
validateWorker w = withWorker w $ \wp -> do
  res <-
    makeTokioAsyncCall
      (raw_validateWorker wp)
      (Just rust_dropWorkerValidationError)
      (Just rust_dropUnit)
  case res of
    Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerValidationError)
    Right _ -> pure $ Right ()


-- | Address of the C symbol @hs_temporal_drop_worker@. It is attached as the
-- 'ForeignPtr' finalizer of the worker pointers created in this module.
foreign import ccall "&hs_temporal_drop_worker" raw_closeWorker :: FinalizerPtr (Worker ty)


-- | Convert a foreign worker-error pointer into a 'WorkerError'.
--
-- The pointer is first wrapped in a 'ForeignPtr' with 'rust_dropWorkerError'
-- as its finalizer, so this function takes over responsibility for releasing
-- it. The pointer must therefore be non-null and not released elsewhere.
getWorkerError :: Ptr CWorkerError -> IO WorkerError
getWorkerError errPtr = do
  fp <- newForeignPtr rust_dropWorkerError errPtr
  withForeignPtr fp (peek >=> peekWorkerError)


-- note: removed the Runtime argument from the C function since the runtime can be accessed from the client. Might want to add it back later if
-- it is some sort of load-bearing memory management thing.

-- | Create a 'Real' worker for the given client and configuration.
--
-- The configuration is JSON-encoded and passed to 'raw_newWorker' together
-- with the client pointer. Both out-parameters are set to 'nullPtr' before
-- the call. A non-null error pointer afterwards means failure and is turned
-- into a 'WorkerError' by 'getWorkerError'. Otherwise the worker pointer is
-- wrapped in a 'ForeignPtr' whose finalizer is 'raw_closeWorker'.
--
-- The returned worker keeps the client and uses the client's runtime
-- ('clientRuntime').
newWorker :: Client -> WorkerConfig -> IO (Either WorkerError (Worker 'Real))
newWorker c wc = withClient c $ \cPtr ->
  withCArrayBS (BL.toStrict $ encode wc) $ \wcPtr -> do
    wPtrPtrFP <- mallocForeignPtr
    errPtrPtrFP <- mallocForeignPtr
    withForeignPtr wPtrPtrFP $ \wPtrPtr ->
      withForeignPtr errPtrPtrFP $ \errPtrPtr -> do
        poke wPtrPtr nullPtr
        poke errPtrPtr nullPtr

        -- Masked so that no asynchronous exception can arrive between
        -- receiving the raw pointers and attaching their finalizers.
        mask_ $ do
          raw_newWorker cPtr wcPtr wPtrPtr errPtrPtr
          errPtr <- peek errPtrPtr
          if errPtr == nullPtr
            then do
              wPtr <- peek wPtrPtr
              wPtrFP <- newForeignPtr raw_closeWorker wPtr
              wPtrRef <- newIORef wPtrFP
              pure $ Right $ Worker wPtrRef wc c (clientRuntime c)
            else Left <$> getWorkerError errPtr


-- | Exception associated with a worker that has been closed.
--
-- 'closeWorker' replaces the contents of the worker's pointer reference with
-- a value that throws this exception.
data WorkerAlreadyClosed = WorkerAlreadyClosed
  deriving stock (Show)


instance Exception WorkerAlreadyClosed


-- | Close a worker by running the finalizer of its foreign pointer
-- immediately.
--
-- The pointer held in the worker's 'IORef' is atomically swapped for a thunk
-- that throws 'WorkerAlreadyClosed'. Anything that later forces the stored
-- pointer, including a second call to 'closeWorker', raises that exception
-- instead of touching a released worker.
--
-- The whole operation runs under 'mask_', so the swap and the finalization
-- cannot be separated by an asynchronous exception.
closeWorker :: Worker ty -> IO ()
closeWorker (Worker ref _ _ _) = mask_ $ do
  -- This must be the lazy 'atomicModifyIORef'. The strict
  -- 'atomicModifyIORef'' forces the new contents to WHNF, which here would
  -- raise 'WorkerAlreadyClosed' on the very first close, before the
  -- finalizer below has run.
  live <- atomicModifyIORef ref $ \old -> (throw WorkerAlreadyClosed, old)
  finalizeForeignPtr live


-- | Binding to the C symbol @hs_temporal_new_replay_worker@.
--
-- As called from 'newReplayWorker', the arguments are: the runtime pointer,
-- the JSON-encoded 'WorkerConfig' as a byte array, and out-parameters for the
-- worker pointer, the history-pusher pointer and an error pointer.
foreign import ccall "hs_temporal_new_replay_worker" raw_newReplayWorker :: Ptr Runtime -> Ptr (CArray Word8) -> Ptr (Ptr (Worker 'Replay)) -> Ptr (Ptr HistoryPusher) -> Ptr (Ptr CWorkerError) -> IO ()


-- | Create a 'Replay' worker together with the 'HistoryPusher' paired with
-- it.
--
-- The configuration is JSON-encoded and passed to 'raw_newReplayWorker'. All
-- three out-parameters are set to 'nullPtr' before the call. A non-null error
-- pointer afterwards means failure and is turned into a 'WorkerError' by
-- 'getWorkerError'. Otherwise the worker pointer is wrapped in a 'ForeignPtr'
-- whose finalizer is 'raw_closeWorker'. The history-pusher pointer is
-- returned as a raw 'Ptr' with no finalizer attached here.
--
-- A replay worker has no client, so its client field is @()@.
newReplayWorker :: Runtime -> WorkerConfig -> IO (Either WorkerError (Worker 'Replay, HistoryPusher))
newReplayWorker r conf = withRuntime r $ \rPtr ->
  alloca $ \wPtrPtr ->
    alloca $ \hpPtrPtr ->
      withCArrayBS (BL.toStrict $ encode conf) $ \confPtr ->
        alloca $ \errPtrPtr -> do
          poke wPtrPtr nullPtr
          poke hpPtrPtr nullPtr
          poke errPtrPtr nullPtr

          -- Masked, as in 'newWorker': no asynchronous exception may arrive
          -- between receiving the raw pointers and attaching their
          -- finalizers, otherwise the foreign worker (or error) would leak.
          mask_ $ do
            raw_newReplayWorker rPtr confPtr wPtrPtr hpPtrPtr errPtrPtr
            errPtr <- peek errPtrPtr
            if errPtr == nullPtr
              then do
                wPtr <- peek wPtrPtr
                hpPtr <- peek hpPtrPtr
                w <- newForeignPtr raw_closeWorker wPtr
                wRef <- newIORef w
                pure $ Right (Worker wRef conf () r, HistoryPusher hpPtr)
              else Left <$> getWorkerError errPtr


-- | Binding to the C symbol @hs_temporal_worker_poll_workflow_activation@. It
-- yields a 'TokioCall' that 'pollWorkflowActivation' runs through
-- 'makeTokioAsyncCall'.
foreign import ccall "hs_temporal_worker_poll_workflow_activation" raw_pollWorkflowActivation :: Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)


-- | Poll the worker for the next 'WorkflowActivation'.
--
-- On failure the foreign error is converted by 'peekWorkerError' into the
-- 'Left' result. On success the returned byte array is copied into a
-- 'ByteString' and decoded as a protobuf message.
--
-- Decoding uses 'decodeMessageOrDie', which calls 'error' on malformed input.
-- Because the decoded value sits lazily inside 'Right', such a failure shows
-- up only when the activation is forced, not as a 'Left'.
pollWorkflowActivation :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError WorkflowActivation)
pollWorkflowActivation w = withWorker w $ \wp -> do
  res <-
    makeTokioAsyncCall
      (raw_pollWorkflowActivation wp)
      (Just rust_dropWorkerError)
      (Just rust_dropByteArray)
  case res of
    Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
    Right ok -> Right . decodeMessageOrDie <$> withForeignPtr ok (peek >=> cArrayToByteString)


-- | Binding to the C symbol @hs_temporal_worker_poll_activity_task@. It
-- yields a 'TokioCall' that 'pollActivityTask' runs through
-- 'makeTokioAsyncCall'.
foreign import ccall "hs_temporal_worker_poll_activity_task" raw_pollActivityTask :: Ptr (Worker ty) -> TokioCall CWorkerError (CArray Word8)


-- | Poll the worker for the next 'ActivityTask'.
--
-- On failure the foreign error is converted by 'peekWorkerError' into the
-- 'Left' result. On success the returned byte array is copied into a
-- 'ByteString' and decoded as a protobuf message.
--
-- Decoding uses 'decodeMessageOrDie', which calls 'error' on malformed input.
-- Because the decoded value sits lazily inside 'Right', such a failure shows
-- up only when the task is forced, not as a 'Left'.
pollActivityTask :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError ActivityTask)
pollActivityTask w = withWorker w $ \wp -> do
  res <-
    makeTokioAsyncCall
      (raw_pollActivityTask wp)
      (Just rust_dropWorkerError)
      (Just rust_dropByteArray)
  case res of
    Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
    -- Named 'ok' rather than 'res' so the outer binding is not shadowed.
    Right ok -> Right . decodeMessageOrDie <$> withForeignPtr ok (peek >=> cArrayToByteString)


-- | Binding to the C symbol
-- @hs_temporal_worker_complete_workflow_activation@. It takes the worker
-- pointer and an encoded completion as a byte array, and yields a 'TokioCall'
-- that 'completeWorkflowActivation' runs through 'makeTokioAsyncCall'.
foreign import ccall "hs_temporal_worker_complete_workflow_activation" raw_completeWorkflowActivation :: Ptr (Worker ty) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit


-- | Report the result of a workflow activation to the worker.
--
-- The completion is protobuf-encoded with 'encodeMessage' and passed to the
-- foreign call as a byte array. Returns @'Right' ()@ on success. On failure
-- the foreign error is converted by 'peekWorkerError' into the 'Left' result.
completeWorkflowActivation :: KnownWorkerType ty => Worker ty -> WorkflowActivationCompletion -> IO (Either WorkerError ())
completeWorkflowActivation w p = withWorker w $ \wp ->
  withCArrayBS (encodeMessage p) $ \pPtr -> do
    res <-
      makeTokioAsyncCall
        (raw_completeWorkflowActivation wp pPtr)
        (Just rust_dropWorkerError)
        (Just rust_dropUnit)
    case res of
      Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
      Right _ -> pure $ Right ()


-- Raw binding to the @hs_temporal_worker_complete_activity_task@ symbol.
-- Arguments: the worker pointer and a byte array holding the encoded completion.
foreign import ccall "hs_temporal_worker_complete_activity_task" raw_completeActivityTask :: Ptr (Worker ty) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit


{- |
Submit an 'ActivityTaskCompletion' to the worker.

The completion is serialised with 'encodeMessage', exposed to the foreign call
as a byte array for the duration of the call, and dispatched through
'makeTokioAsyncCall'.

Returns @Right ()@ when the call reports success, or @Left@ with the decoded
'WorkerError' when it reports a failure.
-}
completeActivityTask :: KnownWorkerType ty => Worker ty -> ActivityTaskCompletion -> IO (Either WorkerError ())
completeActivityTask w p = withWorker w $ \wp ->
  withCArrayBS (encodeMessage p) $ \pPtr -> do
    res <-
      makeTokioAsyncCall
        (raw_completeActivityTask wp pPtr)
        (Just rust_dropWorkerError)
        (Just rust_dropUnit)
    case res of
      -- Read and convert the foreign error while its ForeignPtr is held.
      Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
      Right _ -> pure $ Right ()


-- Raw binding to the @hs_temporal_worker_record_activity_heartbeat@ symbol.
-- Unlike the 'TokioCall'-based bindings in this module, it returns directly in
-- 'IO' and reports its outcome through the two out-pointers (error, result).
foreign import ccall "hs_temporal_worker_record_activity_heartbeat" raw_recordActivityHeartbeat :: Ptr (Worker ty) -> Ptr (CArray Word8) -> Ptr (Ptr CWorkerError) -> Ptr (Ptr CUnit) -> IO ()


{- |
Record an 'ActivityHeartbeat' with the worker.

The heartbeat is serialised with 'encodeMessage' and handed to the foreign
call together with two freshly allocated out-pointer slots. After the call,
a null error slot is treated as success; otherwise the error is converted
with 'getWorkerError' and returned as @Left@.
-}
recordActivityHeartbeat :: KnownWorkerType ty => Worker ty -> ActivityHeartbeat -> IO (Either WorkerError ())
recordActivityHeartbeat w p = withWorker w $ \wp ->
  withCArrayBS (encodeMessage p) $ \pPtr ->
    alloca $ \errPtrPtr ->
      -- Async exceptions are masked from slot initialisation through result
      -- handling. NOTE(review): presumably so a foreign-allocated result or
      -- error is never left unreleased; confirm.
      alloca $ \resPtrPtr -> mask_ $ do
        -- Null both slots first so that the check below can tell which one
        -- the foreign call filled in.
        poke errPtrPtr nullPtr
        poke resPtrPtr nullPtr
        raw_recordActivityHeartbeat wp pPtr errPtrPtr resPtrPtr
        errPtr <- peek errPtrPtr
        if errPtr == nullPtr
          then do
            rust_dropUnitNow =<< peek resPtrPtr
            pure $ Right ()
          else Left <$> getWorkerError errPtr


-- Raw binding to the @hs_temporal_worker_request_workflow_eviction@ symbol.
-- Arguments: the worker pointer and a byte array holding the run id.
foreign import ccall "hs_temporal_worker_request_workflow_eviction" raw_requestWorkflowEviction :: Ptr (Worker ty) -> Ptr (CArray Word8) -> IO ()


{- |
Ask the worker to evict the workflow run identified by the given 'RunId'.

The run id bytes are exposed to the foreign call as a byte array for the
duration of the call. The foreign call returns no result or error to Haskell.
-}
requestWorkflowEviction :: KnownWorkerType ty => Worker ty -> RunId -> IO ()
requestWorkflowEviction w r = withWorker w $ \wp ->
  withCArrayBS r $ \rPtr ->
    raw_requestWorkflowEviction wp rPtr


-- Raw binding to the @hs_temporal_worker_initiate_shutdown@ symbol.
foreign import ccall "hs_temporal_worker_initiate_shutdown" raw_initiateShutdown :: Ptr (Worker ty) -> IO ()


{- |
Initiate shutdown.

Invokes the foreign shutdown entry point on the worker's underlying pointer.
No result or error is reported back to Haskell.
-}
initiateShutdown :: KnownWorkerType ty => Worker ty -> IO ()
initiateShutdown w = withWorker w raw_initiateShutdown


-- Raw binding to the @hs_temporal_worker_finalize_shutdown@ symbol.
foreign import ccall "hs_temporal_worker_finalize_shutdown" raw_finalizeShutdown :: Ptr (Worker ty) -> TokioCall CWorkerError CUnit


{- |
Completes shutdown and frees all resources. You should avoid simply dropping workers, as
this does not allow async tasks to report any panics that may have occurred cleanly.

This should be called only after 'initiateShutdown' has resolved and/or both polling
functions have returned @ShutDown@ errors.

Returns @Right ()@ when the call reports success, or @Left@ with the decoded
'WorkerError' when it reports a failure.
-}
finalizeShutdown :: KnownWorkerType ty => Worker ty -> IO (Either WorkerError ())
finalizeShutdown w = withWorker w $ \wp -> do
  res <-
    makeTokioAsyncCall
      (raw_finalizeShutdown wp)
      (Just rust_dropWorkerError)
      (Just rust_dropUnit)
  case res of
    -- Read and convert the foreign error while its ForeignPtr is held.
    Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
    Right _ -> pure $ Right ()


-- Raw binding to the @hs_temporal_history_pusher_push_history@ symbol.
-- Arguments: the pusher pointer, the workflow id bytes, and the history bytes.
foreign import ccall "hs_temporal_history_pusher_push_history" raw_pushHistory :: Ptr HistoryPusher -> Ptr (CArray Word8) -> Ptr (CArray Word8) -> TokioCall CWorkerError CUnit


{- |
Push a workflow history through a 'HistoryPusher'.

The history may be supplied either as already-encoded bytes (@Left@), which
are forwarded unchanged, or as a 'History' message (@Right@), which is
serialised with 'encodeMessage' first.

Returns @Right ()@ when the call reports success, or @Left@ with the decoded
'WorkerError' when it reports a failure.
-}
pushHistory :: HistoryPusher -> WorkflowId -> Either ByteString History -> IO (Either WorkerError ())
pushHistory (HistoryPusher hp) wf p =
  withCArrayBS wf $ \wfPtr ->
    withCArrayBS (either id encodeMessage p) $ \pPtr -> do
      res <-
        makeTokioAsyncCall
          (raw_pushHistory hp wfPtr pPtr)
          (Just rust_dropWorkerError)
          (Just rust_dropUnit)
      case res of
        -- Read and convert the foreign error while its ForeignPtr is held.
        Left err -> Left <$> withForeignPtr err (peek >=> peekWorkerError)
        Right _ -> pure $ Right ()


-- Raw binding to the @hs_temporal_history_pusher_close@ symbol.
foreign import ccall "hs_temporal_history_pusher_close" raw_closeHistoryPusher :: Ptr HistoryPusher -> IO ()


{- |
Close a 'HistoryPusher' by invoking the foreign close entry point on its
underlying pointer.

NOTE(review): presumably the pusher must not be used after this call; confirm
against the foreign implementation.
-}
closeHistory :: HistoryPusher -> IO ()
closeHistory (HistoryPusher hp) = raw_closeHistoryPusher hp