diff --git a/package.json b/package.json index 4c4af73..8149ba1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@memelabs/protobuf", - "version": "0.3.3", + "version": "0.3.4", "description": "", "license": "MIT", "repository": { diff --git a/pkg/rpc/http_server.go b/pkg/rpc/http_server.go index f64fb71..ffc4de0 100644 --- a/pkg/rpc/http_server.go +++ b/pkg/rpc/http_server.go @@ -2,11 +2,13 @@ package rpc import ( "context" - "io/ioutil" + "encoding/binary" + "io" "net/http" "strconv" pb "github.com/MemeLabs/protobuf/pkg/apis/rpc" + "github.com/MemeLabs/protobuf/pkg/bytereader" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -24,24 +26,32 @@ type HTTPServer struct { } func (s *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - b, err := ioutil.ReadAll(r.Body) + l, err := binary.ReadUvarint(bytereader.New(r.Body)) if err != nil { httpServeError(http.StatusBadRequest, err, w) return } + b := make([]byte, l) + if _, err := io.ReadAtLeast(r.Body, b, int(l)); err != nil { + httpServeError(http.StatusBadRequest, err, w) + return + } + req := &pb.Call{} if err := proto.Unmarshal(b, req); err != nil { httpServeError(http.StatusBadRequest, err, w) return } + ctx, cancel := context.WithCancel(r.Context()) send := func(_ context.Context, res *pb.Call) error { return httpServeProto(res, w) } - call := NewCallIn(r.Context(), req, noopParentCallAccessor{}, send) + call := NewCallIn(ctx, req, noopParentCallAccessor{}, send) - s.ServiceDispatcher.Dispatch(call, func() {}) + s.ServiceDispatcher.Dispatch(call, cancel) + <-ctx.Done() } func httpServeError(statusCode int, err error, w http.ResponseWriter) error { diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index c7d8404..6f9e4b2 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -96,6 +96,7 @@ func (h *ServiceDispatcher) call(call *CallIn, done func()) { method, ok := h.methods[call.Method()] if !ok { call.returnError(fmt.Errorf("method not found: %s", call.Method())) + done() return }