Skip to content

Commit

Permalink
server: remove unparsed body from request and message
Browse files Browse the repository at this point in the history
Signed-off-by: Vasiliy Tolstov <[email protected]>
  • Loading branch information
vtolstov committed Mar 21, 2022
1 parent 74633f4 commit 4904cad
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 12 deletions.
6 changes: 1 addition & 5 deletions server/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ func (r *rpcMessage) Topic() string {
return r.topic
}

func (r *rpcMessage) Payload() interface{} {
func (r *rpcMessage) Body() interface{} {
return r.payload
}

func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}

func (r *rpcMessage) Body() []byte {
return r.body
}

func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}
4 changes: 1 addition & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,11 @@ type Message interface {
// Topic of the message
Topic() string
// The decoded payload value
Payload() interface{}
Body() interface{}
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() metadata.Metadata
// The raw body of the message
Body() []byte
// Codec used to decode the message
Codec() codec.Codec
}
Expand Down
7 changes: 3 additions & 4 deletions server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
return err
}
rb := reflect.New(req.Type().Elem())
if err = cf.ReadBody(bytes.NewReader(msg.Body()), rb.Interface()); err != nil {
if err = cf.ReadBody(bytes.NewReader(msg.(*rpcMessage).body), rb.Interface()); err != nil {
return err
}
msg.(*rpcMessage).codec = cf
Expand All @@ -269,7 +269,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
}
payloads := reflect.MakeSlice(reqType, 0, len(ms))
for _, m := range ms {
payloads = reflect.Append(payloads, reflect.ValueOf(m.Payload()))
payloads = reflect.Append(payloads, reflect.ValueOf(m.Body()))
}
vals = append(vals, payloads)

Expand Down Expand Up @@ -381,7 +381,7 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
vals = append(vals, reflect.ValueOf(ctx))
}

vals = append(vals, reflect.ValueOf(msg.Payload()))
vals = append(vals, reflect.ValueOf(msg.Body()))

returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
Expand All @@ -406,7 +406,6 @@ func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler
contentType: ct,
payload: req.Interface(),
header: msg.Header,
body: msg.Body,
})
results <- cerr
}()
Expand Down

0 comments on commit 4904cad

Please sign in to comment.