From 7739ae8cca7115953cf66a6a282b783f537aa64d Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Sun, 3 Sep 2023 19:29:47 +0800 Subject: [PATCH] Add AsyncSchedule tests --- .gitignore | 2 +- cabal.project | 1 + hs-grpc-server/include/hs_grpc_server.h | 15 +++- tests/Makefile | 17 +++++ tests/Setup.hs | 3 + tests/cases/AsyncSchedule001_client.py | 91 +++++++++++++++++++++++++ tests/cases/AsyncSchedule001_server.hs | 73 ++++++++++++++++++++ tests/hs-grpc-tests.cabal | 57 ++++++++++++++++ tests/protos/AsyncSchedule.proto | 17 +++++ tests/requirements.txt | 4 ++ 10 files changed, 278 insertions(+), 2 deletions(-) create mode 100644 tests/Makefile create mode 100644 tests/Setup.hs create mode 100644 tests/cases/AsyncSchedule001_client.py create mode 100644 tests/cases/AsyncSchedule001_server.hs create mode 100644 tests/hs-grpc-tests.cabal create mode 100644 tests/protos/AsyncSchedule.proto create mode 100644 tests/requirements.txt diff --git a/.gitignore b/.gitignore index 31e5e86..9464d4b 100644 --- a/.gitignore +++ b/.gitignore @@ -26,4 +26,4 @@ Local* *~ *.swp /core -gen-cpp +gen-* diff --git a/cabal.project b/cabal.project index cb3eeda..000803f 100644 --- a/cabal.project +++ b/cabal.project @@ -1,3 +1,4 @@ packages: ./hs-grpc-server/hs-grpc-server.cabal ./example/hs-grpc-example.cabal + ./tests/hs-grpc-tests.cabal diff --git a/hs-grpc-server/include/hs_grpc_server.h b/hs-grpc-server/include/hs_grpc_server.h index cf64690..6b4e3a6 100644 --- a/hs-grpc-server/include/hs_grpc_server.h +++ b/hs-grpc-server/include/hs_grpc_server.h @@ -17,7 +17,20 @@ using ChannelOut = asio::experimental::concurrent_channel; diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..2d3987d --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,17 @@ +all: gen-cpp gen-python + +gen-cpp: + cd tests/protos && mkdir -p ../gen-cpp && \ + protoc --cpp_out ../gen-cpp --grpc_out ../gen-cpp \ + --plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin \ + *.proto + +gen-python: + mkdir -p tests/gen-py && \ + python3 -m grpc_tools.protoc -I ./tests/protos \ + --python_out=tests/gen-py \ + --grpc_python_out=tests/gen-py \ + ./tests/protos/*.proto + +clean: + rm -rf ./tests/gen-* diff --git a/tests/Setup.hs b/tests/Setup.hs new file mode 100644 index 0000000..f206957 --- /dev/null +++ b/tests/Setup.hs @@ -0,0 +1,3 @@ +import Data.ProtoLens.Setup + +main = defaultMainGeneratingProtos "protos" diff --git a/tests/cases/AsyncSchedule001_client.py b/tests/cases/AsyncSchedule001_client.py new file mode 100644 index 0000000..39eab54 --- /dev/null +++ b/tests/cases/AsyncSchedule001_client.py @@ -0,0 +1,91 @@ +import os +import asyncio +import grpc +import sys +import timeit + +DIR = os.path.dirname(os.path.abspath(os.path.join(__file__, ".."))) +sys.path.insert(0, os.path.join(DIR, "gen-py")) + +import AsyncSchedule_pb2 as P +import AsyncSchedule_pb2_grpc as G + + +async def par_main_n(coro, host, port, n): + channel = grpc.aio.insecure_channel(f"{host}:{port}") + stub = G.ServiceStub(channel) + + background_tasks = set() + + for x in range(n): + task = asyncio.create_task(coro(stub)) + background_tasks.add(task) + # task.add_done_callback(background_tasks.discard) + + for t in background_tasks: + await t + + +par = 0 + + +async def slow_unary(stub): + req = P.Request(msg="x") + r = await stub.SlowUnary(req) + global par + par += 1 + print("-> ", par, r.msg) + + +async def dep_unary(stub): + req = P.Request(msg="x") + r = await stub.DepUnary(req) + print("-> ", r.msg) + + +async def bidi(stub): + async def reqs(): + for _ in range(2): + req = P.Request(msg="hi") + yield req + _call = stub.BidiStream(reqs()) + count = 0 + async for r in _call: + count += 1 + print("=> ", count) + + +def test_concurrent_slow_unary(host, port): + server_delay = 1 # 1s + repeat = 10 + t = timeit.timeit( + lambda: asyncio.run(par_main_n(slow_unary, host, port, 32)), + number=repeat, + ) + # t ~= (server_delay * repeat) + assert (server_delay * repeat - 1) <= t <= (server_delay * repeat + 1) + + +def test_interdependence_unary_stream(host, port): + # NOTE: the server handler is not thread-safe + async def tests(stub): + for _ in range(1000): + task1 = asyncio.create_task(bidi(stub)) + task2 = asyncio.create_task(dep_unary(stub)) + await task1 + await task2 + + async def run_tests(): + channel = grpc.aio.insecure_channel(f"{host}:{port}") + stub = G.ServiceStub(channel) + await asyncio.wait_for(tests(stub), timeout=5) # 5s seems enough + + asyncio.run(run_tests()) + + +if __name__ == "__main__": + host = "127.0.0.1" + port = 50051 + + test_concurrent_slow_unary(host, port) + test_interdependence_unary_stream(host, port) diff --git a/tests/cases/AsyncSchedule001_server.hs b/tests/cases/AsyncSchedule001_server.hs new file mode 100644 index 0000000..2dd3944 --- /dev/null +++ b/tests/cases/AsyncSchedule001_server.hs @@ -0,0 +1,73 @@ +module Main (main) where + +import Control.Concurrent +import Control.Monad +import Data.Either +import Data.ProtoLens (defMessage) +import Lens.Micro +import System.IO.Unsafe + +import HsGrpc.Common.Log +import HsGrpc.Server +import Proto.AsyncSchedule as P +import Proto.AsyncSchedule_Fields as P + +main :: IO () +main = do + let opts = defaultServerOpts + { serverHost = "0.0.0.0" + , serverPort = 50051 + , serverParallelism = 1 -- Use 1 is to test concurrency. (For modern CPUs, it seems enough) + , serverOnStarted = Just onStarted + } + --gprSetLogVerbosity GprLogSeverityDebug + runServer opts $ handlers + +onStarted :: IO () +onStarted = putStrLn "Server listening on 0.0.0.0:50051" + +handlers :: [ServiceHandler] +handlers = + -- With using 'shortUnary', the test case should not pass. + [ unary (GRPC :: GRPC P.Service "slowUnary") handleSlowUnary + , unary (GRPC :: GRPC P.Service "depUnary") handleDepUnary + , bidiStream (GRPC :: GRPC P.Service "bidiStream") handleBidiStream + ] + +handleSlowUnary :: UnaryHandler P.Request P.Reply +handleSlowUnary _ctx _req = do + putStrLn "-> Put" + threadDelay 1000000 + putStrLn "-> Put done" + pure $ defMessage & P.msg .~ "hi" + +-- NOTE: not thread-safe +notifyMVar :: MVar () +notifyMVar = unsafePerformIO $ newEmptyMVar +{-# NOINLINE notifyMVar #-} + +exitedMVar :: MVar () +exitedMVar = unsafePerformIO $ newEmptyMVar +{-# NOINLINE exitedMVar #-} + +handleDepUnary :: UnaryHandler P.Request P.Reply +handleDepUnary _ctx req = do + putStrLn "-> Notify stream exit" + putMVar notifyMVar () + putStrLn "-> Wait stream exit" + void $ takeMVar exitedMVar + pure $ defMessage & P.msg .~ "done" + +handleBidiStream :: BidiStreamHandler P.Request P.Reply () +handleBidiStream _ctx stream = do + m_req <- streamRead stream + case m_req of + Just req -> do + putStrLn $ "-> Wait exit notification" + _ <- takeMVar notifyMVar + putStrLn $ "-> Put exited notification" + putMVar exitedMVar () + let reply = defMessage & P.msg .~ ("hi, " <> req ^. P.msg) + r <- streamWrite stream (Just reply) + putStrLn $ "-> Write response " <> show r + Nothing -> putStrLn "Client closed" diff --git a/tests/hs-grpc-tests.cabal b/tests/hs-grpc-tests.cabal new file mode 100644 index 0000000..c2d5fd9 --- /dev/null +++ b/tests/hs-grpc-tests.cabal @@ -0,0 +1,57 @@ +cabal-version: 2.4 +name: hs-grpc-tests +version: 0.1.0.0 +synopsis: Tests for hs-grpc +description: + Please see the README on Github at + +build-type: Custom +extra-source-files: protos/*.proto + +custom-setup + setup-depends: + , base >=4.5 && <5 + , Cabal >=2.4 && <4 + , proto-lens-setup ^>=0.4 + +library + hs-source-dirs: . + build-depends: + , base >=4.13 && <5 + , proto-lens-runtime + + exposed-modules: + Proto.AsyncSchedule + Proto.AsyncSchedule_Fields + + autogen-modules: + Proto.AsyncSchedule + Proto.AsyncSchedule_Fields + + default-language: GHC2021 + +common common-exe + hs-source-dirs: cases + build-depends: + , base >=4.13 && <5 + , bytestring + , hs-grpc-server + , hs-grpc-tests + , microlens + , proto-lens + , proto-lens-runtime + , text + + default-language: GHC2021 + default-extensions: + DataKinds + OverloadedStrings + + ghc-options: + -Wall -Wcompat -Widentities -Wincomplete-record-updates + -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints + -threaded -rtsopts -with-rtsopts=-N + +executable AsyncSchedule001_server + import: common-exe + main-is: AsyncSchedule001_server.hs diff --git a/tests/protos/AsyncSchedule.proto b/tests/protos/AsyncSchedule.proto new file mode 100644 index 0000000..d9d1f26 --- /dev/null +++ b/tests/protos/AsyncSchedule.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package test; + +service Service { + rpc SlowUnary(Request) returns (Reply) {} + rpc DepUnary(Request) returns (Reply) {} + rpc BidiStream(stream Request) returns (stream Reply) {} +} + +message Request { + string msg = 1; +} + +message Reply { + string msg = 1; +} diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..1c18fde --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,4 @@ +grpcio==1.57.0 +grpcio-tools==1.57.0 +protobuf==4.24.2 +pytest==7.4.1