Skip to content

Commit

Permalink
♻️ Stop handling EOF explicitly for bidi (#11)
Browse files Browse the repository at this point in the history
Stop handling EOF explicitly for bidi streaming as it adds a lot of noise
to the .jsonnet files.

We will worry about EOF handling and possibly exposing it via config when
we need it.

Pull-Request: #11
  • Loading branch information
juliaogris authored Dec 19, 2021
1 parent 4a660a8 commit 01f718b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
49 changes: 27 additions & 22 deletions serve/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ func (m method) fullMethod() string {
}

func (m method) call(ss grpc.ServerStream) error {
if m.desc.IsStreamingClient() {
switch {
case m.desc.IsStreamingClient() && m.desc.IsStreamingServer():
return m.streamingBidiCall(ss)
case m.desc.IsStreamingClient():
return m.streamingClientCall(ss)
default: // handle both unary and streaming-server
return m.unaryClientCall(ss)
}
return m.unaryClientCall(ss)
}

func (m method) unaryClientCall(ss grpc.ServerStream) error {
Expand Down Expand Up @@ -69,11 +73,26 @@ func (m method) streamingClientCall(ss grpc.ServerStream) error {
}
break
}
if !m.desc.IsStreamingServer() {
// For client-streaming, we just collect all the messages on
// the input stream to pass once to jsonnet.
stream = append(stream, msg)
continue
stream = append(stream, msg)
}

input, err := makeStreamingInputJSON(stream, md)
if err != nil {
return err
}

return m.evalJsonnet(input, ss)
}

func (m method) streamingBidiCall(ss grpc.ServerStream) error {
md, _ := metadata.FromIncomingContext(ss.Context())
for {
msg := dynamicpb.NewMessage(m.desc.Input())
if err := ss.RecvMsg(msg); err != nil {
if !errors.Is(err, io.EOF) {
return err
}
break
}

// For bidirectional streaming, we call jsonnet once for each message
Expand All @@ -86,21 +105,7 @@ func (m method) streamingClientCall(ss grpc.ServerStream) error {
return err
}
}

var input string
var err error
if m.desc.IsStreamingServer() {
// For bidirectional streaming, call jsonnet one last time with a null
// request so it knows end-of-stream has been reached.
input, err = makeInputJSON(nil, md)
} else {
input, err = makeStreamingInputJSON(stream, md)
}
if err != nil {
return err
}

return m.evalJsonnet(input, ss)
return nil
}

func (m method) evalJsonnet(input string, ss grpc.ServerStream) error {
Expand Down
15 changes: 5 additions & 10 deletions testdata/greet.Greeter.HelloBidiStream.jsonnet
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
function(input)
if input.request != null && input.request.firstName == 'Bart' then
if input.request.firstName != 'Bart' then
{
stream: [{ greeting: '💃 jig [bidi]: Hello ' + input.request.firstName }],
}
else // input.request.firstName == 'Bart' then
{
status: {
code: 3, // InvalidArgument
Expand All @@ -11,12 +15,3 @@ function(input)
eat: ['his', 'shorts'],
},
}
else
{
local response =
if input.request == null then
[]
else
[{ greeting: '💃 jig [bidi]: Hello ' + input.request.firstName }],
stream: response,
}

0 comments on commit 01f718b

Please sign in to comment.