Skip to content

Commit

Permalink
Add grpc channel arguments about keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Oct 24, 2023
1 parent 2b75655 commit 60199f2
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 18 deletions.
13 changes: 10 additions & 3 deletions hs-grpc-server/HsGrpc/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ runServer :: ServerOptions -> [ServiceHandler] -> IO ()
runServer ServerOptions{..} handlers = do
server <- newAsioServer
serverHost serverPort serverParallelism
serverSslOptions serverInterceptors
serverSslOptions
serverChannelArgs
serverInterceptors
runAsioGrpc server handlers serverOnStarted serverInternalChannelSize

-------------------------------------------------------------------------------
Expand All @@ -108,14 +110,19 @@ newAsioServer
-> Int -- ^ port
-> Int -- ^ parallelism
-> Maybe SslServerCredentialsOptions
-> [ChannelArg]
-> [ServerInterceptor]
-> IO AsioServer
newAsioServer host port parallelism m_sslOpts interceptors = do
newAsioServer host port parallelism m_sslOpts chanArgs interceptors = do
ptr <-
HF.withShortByteString host $ \host' host_len ->
HF.withMaybePtr m_sslOpts withSslServerCredentialsOptions $ \sslOpts' ->
withChannelArgs chanArgs $ \chanArgs' chanArgs_size ->
HF.withPrimList (map toCItcptFact interceptors) $ \intcept' intcept_size ->
new_asio_server host' host_len port parallelism sslOpts' intcept' intcept_size
new_asio_server host' host_len port parallelism
sslOpts'
chanArgs' chanArgs_size
intcept' intcept_size
if ptr == nullPtr then Ex.throwIO $ ServerException "newGrpcServer failed!"
else newForeignPtr delete_asio_server_fun ptr
where
Expand Down
2 changes: 2 additions & 0 deletions hs-grpc-server/HsGrpc/Server/FFI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ foreign import ccall unsafe "new_asio_server"
-- ^ parallelism
-> Ptr SslServerCredentialsOptions
-- ^ tls options
-> Ptr ChannelArg -> Int
-- ^ Grpc Channel arguments
-> Ptr (Ptr CServerInterceptorFactory) -> Int
-- ^ Interceptors
-> IO (Ptr CppAsioServer)
Expand Down
109 changes: 99 additions & 10 deletions hs-grpc-server/HsGrpc/Server/Types.hsc
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE PatternSynonyms #-}

module HsGrpc.Server.Types
Expand Down Expand Up @@ -54,6 +56,16 @@ module HsGrpc.Server.Types
, CServerInterceptorFactory
, ServerInterceptor (..)

-- * Channel arguments
, ChannelArg
, mk_GRPC_ARG_KEEPALIVE_TIME_MS
, mk_GRPC_ARG_KEEPALIVE_TIMEOUT_MS
, mk_GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS
, mk_GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
, mk_GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA
, mk_GRPC_ARG_HTTP2_MAX_PING_STRIKES
, withChannelArgs -- XXX: this function should be in a Internal module

-- * Internal Types
, Request (..)
, Response (..)
Expand All @@ -75,16 +87,18 @@ module HsGrpc.Server.Types
) where

import Control.Exception (Exception, bracket, throwIO)
import Control.Monad (forM_)
import Data.ByteString (ByteString)
import Data.ByteString.Short (ShortByteString)
import qualified Data.ByteString.Unsafe as BS
import Data.Maybe (isJust)
import Data.ProtoLens.Service.Types (StreamingType (..))
import Data.Text (Text)
import Data.Word (Word64, Word8)
import Foreign.Marshal.Alloc (allocaBytesAligned)
import Foreign.Ptr (FunPtr, Ptr, freeHaskellFunPtr,
nullPtr)
import Foreign.C.Types
import Foreign.Marshal.Alloc
import Foreign.Marshal.Array
import Foreign.Ptr
import Foreign.StablePtr (StablePtr)
import Foreign.Storable (Storable (..))
import GHC.Conc (PrimMVar)
Expand All @@ -93,6 +107,7 @@ import qualified HsForeign as HF
import HsGrpc.Common.Foreign.Channel
import HsGrpc.Server.Internal.Types

#include <grpc/grpc.h>
#include "hs_grpc_server.h"

-------------------------------------------------------------------------------
Expand All @@ -104,6 +119,7 @@ data ServerOptions = ServerOptions
, serverSslOptions :: !(Maybe SslServerCredentialsOptions)
, serverOnStarted :: !(Maybe (IO ()))
, serverInterceptors :: ![ServerInterceptor]
, serverChannelArgs :: ![ChannelArg]
-- The following options are considering as internal
, serverInternalChannelSize :: !Word
}
Expand All @@ -116,6 +132,7 @@ defaultServerOpts = ServerOptions
, serverSslOptions = Nothing
, serverOnStarted = Nothing
, serverInterceptors = []
, serverChannelArgs = []
, serverInternalChannelSize = 2
}

Expand All @@ -126,6 +143,7 @@ instance Show ServerOptions where
<> "port: " <> show serverPort <> ", "
<> "parallelism: " <> show serverParallelism <> ", "
<> "sslOptions: " <> show serverSslOptions <> ", "
<> "channelArgs: " <> show serverChannelArgs <> ", "
<> "onStartedEvent: " <> notifyFn serverOnStarted
<> "}"

Expand Down Expand Up @@ -240,16 +258,11 @@ instance Storable Response where
(#poke hsgrpc::server_response_t, data) ptr data_ptr
(#poke hsgrpc::server_response_t, data_size) ptr data_size
(#poke hsgrpc::server_response_t, status_code) ptr (unStatusCode responseStatusCode)
errmsg_ptr <- maybeNewStdString responseErrorMsg
errmsg_ptr <- HF.maybeNewStdString responseErrorMsg
(#poke hsgrpc::server_response_t, error_msg) ptr errmsg_ptr
errdetails_ptr <- maybeNewStdString responseErrorDetails
errdetails_ptr <- HF.maybeNewStdString responseErrorDetails
(#poke hsgrpc::server_response_t, error_details) ptr errdetails_ptr

-- TODO: upgrade foreign package to use HS.maybeNewStdString
maybeNewStdString :: Maybe ByteString -> IO (Ptr HF.StdString)
maybeNewStdString Nothing = pure nullPtr
maybeNewStdString (Just bs) = HF.withByteString bs $ HF.hs_new_std_string

-------------------------------------------------------------------------------

pattern C_StreamingType_NonStreaming :: Word8
Expand Down Expand Up @@ -468,6 +481,82 @@ newtype StatusCode = StatusCode { unStatusCode :: Int }
, StatusDoNotUse
#-}

-------------------------------------------------------------------------------
-- Grpc channel arguments
--
-- https://grpc.github.io/grpc/core/group__grpc__arg__keys.html

data ChanArgValue
= ChanArgValueInt CInt
| ChanArgValueString ShortByteString
deriving (Show, Eq)

newtype ChannelArg = ChannelArg
{ unChannelArg :: (ShortByteString, ChanArgValue) }
deriving (Show, Eq)

instance Storable ChannelArg where
sizeOf _ = (#size hsgrpc::hs_grpc_channel_arg_t)
alignment _ = (#alignment hsgrpc::hs_grpc_channel_arg_t)
peek _ptr = error "Unimplemented"
poke ptr (ChannelArg (key, val)) = do
key_ptr <- HF.newStdStringFromShort key -- should be deleted on cpp side
(#poke hsgrpc::hs_grpc_channel_arg_t, key) ptr key_ptr
case val of
ChanArgValueInt i -> do
(#poke hsgrpc::hs_grpc_channel_arg_t, type)
ptr
((#const static_cast<uint8_t>(hsgrpc::GrpcChannelArgValType::Int)) :: Word8)
(#poke hsgrpc::hs_grpc_channel_arg_t, value.int_value)
ptr i
ChanArgValueString s -> do
(#poke hsgrpc::hs_grpc_channel_arg_t, type)
ptr
((#const static_cast<uint8_t>(hsgrpc::GrpcChannelArgValType::String)) :: Word8)
value_ptr <- HF.newStdStringFromShort s -- should be deleted on cpp side
(#poke hsgrpc::hs_grpc_channel_arg_t, value.string_value) ptr value_ptr

withChannelArgs :: [ChannelArg] -> (Ptr ChannelArg -> Int -> IO a) -> IO a
withChannelArgs args f = do
let !len = length args
allocaArray @ChannelArg len $ \ptr -> do
forM_ (zip [0..len-1] args) $ \(i, arg) -> pokeElemOff ptr i arg
f ptr len

#define hsc_mk_chan_args(c, vt, vw) \
hsc_printf("pattern %s :: ShortByteString\n", #c, #c); \
hsc_printf("pattern %s = ", #c); hsc_const_str(c); \
hsc_printf("\n"); \
hsc_printf("mk_%s :: %s -> ChannelArg\n", #c, #vt); \
hsc_printf("mk_%s v = ChannelArg (%s, %s v)\n", #c, #c, #vw);

-- | After a duration of this time the client/server pings its peer to see if
-- the transport is still alive. Int valued, milliseconds.
#mk_chan_args GRPC_ARG_KEEPALIVE_TIME_MS, CInt, ChanArgValueInt

-- | After waiting for a duration of this time, if the keepalive ping sender
-- does not receive the ping ack, it will close the transport. Int valued,
-- milliseconds.
#mk_chan_args GRPC_ARG_KEEPALIVE_TIMEOUT_MS, CInt, ChanArgValueInt

-- | Is it permissible to send keepalive pings from the client without any
-- outstanding streams. Int valued, 0(false)/1(true).
#mk_chan_args GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, CInt, ChanArgValueInt

-- | Minimum allowed time between a server receiving successive ping frames
-- without sending any data/header frame. Int valued, milliseconds
#mk_chan_args GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, CInt, ChanArgValueInt

-- | How many pings can the client send before needing to send a
-- data/header frame? (0 indicates that an infinite number of
-- pings can be sent without sending a data frame or header frame)
#mk_chan_args GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, CInt, ChanArgValueInt

-- | How many misbehaving pings the server can bear before sending goaway and
-- closing the transport? (0 indicates that the server can bear an infinite
-- number of misbehaving pings)
#mk_chan_args GRPC_ARG_HTTP2_MAX_PING_STRIKES, CInt, ChanArgValueInt

-------------------------------------------------------------------------------
-- Interceptors

Expand Down
22 changes: 22 additions & 0 deletions hs-grpc-server/cbits/hs_grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ CppAsioServer* new_asio_server(
const char* host, HsInt host_len, HsInt port, HsInt parallelism,
// ssl options
hsgrpc::hs_ssl_server_credentials_options_t* ssl_server_opts,
// grpc channel args
hsgrpc::hs_grpc_channel_arg_t* grpc_chan_args, HsInt grpc_chan_args_size,
// interceptors
grpc::experimental::ServerInterceptorFactoryInterface** interceptor_facts,
HsInt interceptors_size) {
Expand Down Expand Up @@ -528,6 +530,26 @@ CppAsioServer* new_asio_server(

builder.RegisterAsyncGenericService(&server_data->service_);

// Set grpc channel args
for (auto i = 0; i < grpc_chan_args_size; ++i) {
auto& arg = grpc_chan_args[i];
switch (arg.type) {
case hsgrpc::GrpcChannelArgValType::Int:
gpr_log(GPR_DEBUG, "AddChannelArgument: (%s, %d)", arg.key->c_str(),
arg.value.int_value);
builder.AddChannelArgument(*arg.key, arg.value.int_value);
delete arg.key;
break;
case hsgrpc::GrpcChannelArgValType::String:
gpr_log(GPR_DEBUG, "AddChannelArgument: (%s, %s)", arg.key->c_str(),
arg.value.string_value->c_str());
builder.AddChannelArgument(*arg.key, *arg.value.string_value);
delete arg.key;
delete arg.value.string_value;
break;
}
}

if (interceptors_size > 0) {
std::vector<
std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
Expand Down
6 changes: 3 additions & 3 deletions hs-grpc-server/hs-grpc-server.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ library

build-depends:
, async ^>=2.2
, base >=4.14 && <5
, bytestring >=0.10 && <0.12
, foreign ^>=0.2
, base >=4.14 && <5
, bytestring >=0.10 && <0.12
, foreign ^>=0.2.1
, ghc-prim
, microlens
, primitive
Expand Down
15 changes: 13 additions & 2 deletions hs-grpc-server/include/hs_grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ using ChannelOut = asio::experimental::concurrent_channel<void(

// FIXME: use a lightweight structure instead (a real coroutine lock)
//
// While using asio::steady_timer/grpc::Alarm appears to be a promising approach,
// there are some notable concerns:
// While using asio::steady_timer/grpc::Alarm appears to be a promising
// approach, there are some notable concerns:
//
// 1. We must pass an additional Haskell function to C++ in order to halt the
// Haskell handler when the timer expires. This is necessary because the
Expand Down Expand Up @@ -65,6 +65,17 @@ struct hs_ssl_server_credentials_options_t {
grpc_ssl_client_certificate_request_type client_certificate_request;
};

enum class GrpcChannelArgValType : uint8_t { Int, String };

struct hs_grpc_channel_arg_t {
std::string* key;
GrpcChannelArgValType type;
union {
int int_value;
std::string* string_value;
} value;
};

struct server_request_t {
uint8_t* data;
size_t data_size;
Expand Down

0 comments on commit 60199f2

Please sign in to comment.