Add AsyncSchedule tests
4eUeP committed Sep 3, 2023
15 changes: 14 additions & 1 deletion hs-grpc-server/include/hs_grpc_server.h
Expand Up @@ -17,7 +17,20 @@ using ChannelOut = asio::experimental::concurrent_channel<void(

// FIXME: use a lightweight structure instead (a real coroutine lock)
// Using bool for convenience, this can be any type actually.
// While using asio::steady_timer/grpc::Alarm appears to be a promising approach,
// there are some notable concerns:
// 1. We must pass an additional Haskell function to C++ in order to halt the
// Haskell handler when the timer expires. This is necessary because the
// memory pass to Haskell handler resides on the stack, and this additional
// step can introduce some performance overhead.
// 2. The cancellation function for the timer operates synchronously, which may
// not be as fast as I expected. Which means may not suitable for unsafe ffi.
// 3. Testing in real applications has revealed instances of DEADLINE_EXCEEDED
// (client) and segfaults(server).
// 4. Timer is also not lightweight enough.
// bool for convenience, this can be any type actually.
using CoroLock =
asio::experimental::concurrent_channel<void(asio::error_code, bool)>;

all: gen-cpp gen-python

cd tests/protos && mkdir -p ../gen-cpp && \
protoc --cpp_out ../gen-cpp --grpc_out ../gen-cpp \
--plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin \

mkdir -p tests/gen-py && \
python3 -m grpc_tools.protoc -I ./tests/protos \
--python_out=tests/gen-py \
--grpc_python_out=tests/gen-py \

rm -rf ./tests/gen-*
import Data.ProtoLens.Setup

main = defaultMainGeneratingProtos "protos"
import os
import asyncio
import grpc
import sys
import timeit

DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, "..")))
sys.path.insert(0, os.path.join(DIR, "gen-py"))

import AsyncSchedule_pb2 as P
import AsyncSchedule_pb2_grpc as G

async def par_main_n(coro, host, port, n):
channel = grpc.aio.insecure_channel(f"{host}:{port}")
stub = G.ServiceStub(channel)

background_tasks = set()

for x in range(n):
task = asyncio.create_task(coro(stub))
# task.add_done_callback(background_tasks.discard)

for t in background_tasks:
await t

par = 0

async def slow_unary(stub):
req = P.Request(msg="x")
r = await stub.SlowUnary(req)
global par
par += 1
print("-> ", par, r.msg)

async def dep_unary(stub):
req = P.Request(msg="x")
r = await stub.DepUnary(req)
print("-> ", r.msg)

async def bidi(stub):
async def reqs():
for _ in range(2):
req = P.Request(msg="hi")
yield req
_call = stub.BidiStream(reqs())
count = 0
async for r in _call:
count += 1
print("=> ", count)

def test_concurrent_slow_unary(host, port):
server_delay = 1 # 1s
repeat = 10
t = timeit.timeit(
lambda:, host, port, 32)),
# t ~= (server_delay * repeat)
assert (server_delay * repeat - 1) <= t <= (server_delay * repeat + 1)

def test_interdependence_unary_stream(host, port):
# NOTE: the server handler is not thread-safe
async def tests(stub):
for _ in range(1000):
task1 = asyncio.create_task(bidi(stub))
task2 = asyncio.create_task(dep_unary(stub))
await task1
await task2

async def run_tests():
channel = grpc.aio.insecure_channel(f"{host}:{port}")
stub = G.ServiceStub(channel)
await asyncio.wait_for(tests(stub), timeout=5) # 5s seems enough

if __name__ == "__main__":
host = ""
port = 50051

test_concurrent_slow_unary(host, port)
test_interdependence_unary_stream(host, port)
module Main (main) where

import Control.Concurrent
import Control.Monad
import Data.Either
import Data.ProtoLens (defMessage)
import Lens.Micro
import System.IO.Unsafe

import HsGrpc.Common.Log
import HsGrpc.Server
import Proto.AsyncSchedule as P
import Proto.AsyncSchedule_Fields as P

main :: IO ()
main = do
let opts = defaultServerOpts
{ serverHost = ""
, serverPort = 50051
, serverParallelism = 1 -- Use 1 is to test concurrency. (For modern CPUs, it seems enough)
, serverOnStarted = Just onStarted
--gprSetLogVerbosity GprLogSeverityDebug
runServer opts $ handlers

onStarted :: IO ()
onStarted = putStrLn "Server listening on"

handlers :: [ServiceHandler]
handlers =
-- With using 'shortUnary', the test case should not pass.
[ unary (GRPC :: GRPC P.Service "slowUnary") handleSlowUnary
, unary (GRPC :: GRPC P.Service "depUnary") handleDepUnary
, bidiStream (GRPC :: GRPC P.Service "bidiStream") handleBidiStream

handleSlowUnary :: UnaryHandler P.Request P.Reply
handleSlowUnary _ctx _req = do
putStrLn "-> Put"
threadDelay 1000000
putStrLn "-> Put done"
pure $ defMessage & P.msg .~ "hi"

-- NOTE: not thread-safe
notifyMVar :: MVar ()
notifyMVar = unsafePerformIO $ newEmptyMVar
{-# NOINLINE notifyMVar #-}

exitedMVar :: MVar ()
exitedMVar = unsafePerformIO $ newEmptyMVar
{-# NOINLINE exitedMVar #-}

handleDepUnary :: UnaryHandler P.Request P.Reply
handleDepUnary _ctx req = do
putStrLn "-> Notify stream exit"
putMVar notifyMVar ()
putStrLn "-> Wait stream exit"
void $ takeMVar exitedMVar
pure $ defMessage & P.msg .~ "done"

handleBidiStream :: BidiStreamHandler P.Request P.Reply ()
handleBidiStream _ctx stream = do
m_req <- streamRead stream
case m_req of
Just req -> do
putStrLn $ "-> Wait exit notification"
_ <- takeMVar notifyMVar
putStrLn $ "-> Put exited notification"
putMVar exitedMVar ()
let reply = defMessage & P.msg .~ ("hi, " <> req ^. P.msg)
r <- streamWrite stream (Just reply)
putStrLn $ "-> Write response " <> show r
Nothing -> putStrLn "Client closed"
cabal-version: 2.4
name: hs-grpc-tests
synopsis: Tests for hs-grpc
Please see the README on Github at <>

build-type: Custom
extra-source-files: protos/*.proto

, base >=4.5 && <5
, Cabal >=2.4 && <4
, proto-lens-setup ^>=0.4

hs-source-dirs: .
, base >=4.13 && <5
, proto-lens-runtime



default-language: GHC2021

common common-exe
hs-source-dirs: cases
, base >=4.13 && <5
, bytestring
, hs-grpc-server
, hs-grpc-tests
, microlens
, proto-lens
, proto-lens-runtime
, text

default-language: GHC2021

-Wall -Wcompat -Widentities -Wincomplete-record-updates
-Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints
-threaded -rtsopts -with-rtsopts=-N

executable AsyncSchedule001_server
import: common-exe
main-is: AsyncSchedule001_server.hs
syntax = "proto3";

package test;

service Service {
rpc SlowUnary(Request) returns (Reply) {}
rpc DepUnary(Request) returns (Reply) {}
rpc BidiStream(stream Request) returns (stream Reply) {}

message Request {
string msg = 1;

message Reply {
string msg = 1;
