Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong marshalling for call17 & eval responses + add support for unknown request types and response body keys #77

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (pp *BinaryPacket) Reset() {
pp.packet.SchemaID = 0
pp.packet.requestID = 0
pp.packet.Result = nil
pp.packet.opts = PacketOpts{}
pp.body = pp.body[:0]
}

Expand Down
17 changes: 15 additions & 2 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/tinylib/msgp/msgp"
)

type PacketOpts struct {
asQuery bool
}

type Packet struct {
Cmd uint
LSN uint64
Expand All @@ -16,6 +20,15 @@ type Packet struct {
Timestamp time.Time
Request Query
Result *Result

opts PacketOpts
}

// AsQuery forces packet to be unmarshaled as query even if it's not supported.
func (pack *Packet) AsQuery() *Packet {
pack.opts.asQuery = true

return pack
}

func (pack *Packet) String() string {
Expand Down Expand Up @@ -114,7 +127,7 @@ func (pack *Packet) UnmarshalBinaryBody(data []byte) (buf []byte, err error) {
return unpackr(pack.Cmd^ErrorFlag, data)
}

if q := NewQuery(pack.Cmd); q != nil {
if q := NewQuery(pack.Cmd); IsKnownQuery(q) || pack.opts.asQuery {
return unpackq(q, data)
}
return unpackr(OKCommand, data)
Expand All @@ -128,7 +141,7 @@ func (pack *Packet) UnmarshalBinary(data []byte) error {

// UnmarshalMsg implements msgp.Unmarshaler
func (pack *Packet) UnmarshalMsg(data []byte) (buf []byte, err error) {
*pack = Packet{}
*pack = Packet{opts: pack.opts}

buf = data

Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func NewQuery(cmd uint) Query {
case EvalCommand:
return &Eval{}
default:
return nil
return NewUnknownQuery(cmd)
}
}
51 changes: 51 additions & 0 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package tarantool

import (
"fmt"

"github.com/tinylib/msgp/msgp"
)

type Result struct {
RawBytes []byte

ErrorCode uint
Error error
Data [][]interface{}

marshaller msgp.Marshaler
}

func (r *Result) GetCommandID() uint {
Expand All @@ -20,6 +25,32 @@ func (r *Result) GetCommandID() uint {

// MarshalMsg implements msgp.Marshaler
func (r *Result) MarshalMsg(b []byte) (o []byte, err error) {
if r.marshaller == nil {
r.marshaller = defaultResultMarshaller{Result: r}
}
return r.marshaller.MarshalMsg(b)
}

// WithBytesMarshaller changes the marshaller for result serialization.
//
// Current implementation of unmarshaller may change structure of the result (e.g. in call17)
// if it's not array of tuples in which case it's forcefully wrapped. It also skips
// unknown keys. Therefore serialized sequence of bytes produced by the default marshaller
// is different from the incoming.
//
// Bytes marshaller on the other hand returns exactly the same array
// the result was successfully unmarshalled from (preserving all the keys of the body
// including unknown ones). But it won't reflect any manual changes of unmarshalled data.
func (r *Result) WithBytesMarshaller() *Result {
r.marshaller = bytesResultMarshaller{Result: r}
return r
}

type defaultResultMarshaller struct {
*Result
}

func (r defaultResultMarshaller) MarshalMsg(b []byte) (o []byte, err error) {
o = b
if r.Error != nil {
o = msgp.AppendMapHeader(o, 1)
Expand All @@ -40,6 +71,14 @@ func (r *Result) MarshalMsg(b []byte) (o []byte, err error) {
return o, nil
}

type bytesResultMarshaller struct {
*Result
}

func (r bytesResultMarshaller) MarshalMsg(b []byte) (o []byte, err error) {
return append(b, r.RawBytes...), nil
}

// UnmarshalMsg implements msgp.Unmarshaler
func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
var l uint32
Expand All @@ -53,6 +92,15 @@ func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
if len(buf) == 0 && r.ErrorCode == OKCommand {
return buf, nil
}

defer func() {
if err == nil {
rawPacketLength := len(data) - len(buf)
r.RawBytes = make([]byte, rawPacketLength)
copy(r.RawBytes, data[:rawPacketLength])
}
}()

l, buf, err = msgp.ReadMapHeaderBytes(buf)

if err != nil {
Expand Down Expand Up @@ -96,18 +144,21 @@ func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error) {
}
}
}

case KeyError:
errorMessage, buf, err = msgp.ReadStringBytes(buf)
if err != nil {
return
}
r.Error = NewQueryError(r.ErrorCode, errorMessage)

default:
if buf, err = msgp.Skip(buf); err != nil {
return
}
}
}

return
}

Expand Down
47 changes: 47 additions & 0 deletions result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tarantool

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestResultMarshaling(t *testing.T) {
var result Result

// The result of a call17 to:
// function a()
// return "a"
// end
tntBodyBytes := []byte{
0x81, // MP_MAP
0x30, // key IPROTO_DATA
0xdd, 0x0, 0x0, 0x0, 0x1, // MP_ARRAY
0xa1, 0x61, // string value "a"
}

expectedDefaultMarshalerBytes := []byte{
0x81, // MP_MAP
0x30, // key IPROTO_DATA
0x91, // MP_ARRAY
0x91, // MP_ARRAY
0xa1, 0x61, // string value "a"
}

buf, err := result.UnmarshalMsg(tntBodyBytes)
assert.NoError(t, err, "error unmarshaling result")
assert.Empty(t, buf, "unmarshaling residual buffer is not empty")

defaultMarshalerRes, err := result.MarshalMsg(nil)
assert.NoError(t, err, "error marshaling by default marshaller")
assert.Equal(
t,
expectedDefaultMarshalerBytes,
defaultMarshalerRes,
)

result.WithBytesMarshaller()
bytesMarshalerRes, err := result.MarshalMsg(nil)
assert.NoError(t, err, "error marshaling by bytes marshaller")
assert.Equal(t, tntBodyBytes, bytesMarshalerRes)
}
8 changes: 8 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ type IprotoServer struct {
schemaID uint64
wg sync.WaitGroup
getPingStatus func(*IprotoServer) uint

// asQueryServer forces incoming requests to be parsed as queries
asQueryServer bool
}

type IprotoServerOptions struct {
Perf PerfCount
GetPingStatus func(*IprotoServer) uint
AsQueryServer bool
}

func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer {
Expand All @@ -58,6 +62,7 @@ func (s *IprotoServer) WithOptions(opts *IprotoServerOptions) *IprotoServer {
opts = &IprotoServerOptions{}
}
s.perf = opts.Perf
s.asQueryServer = opts.AsQueryServer
s.getPingStatus = opts.GetPingStatus
if s.getPingStatus == nil {
s.getPingStatus = func(*IprotoServer) uint { return 0 }
Expand Down Expand Up @@ -216,6 +221,9 @@ READER_LOOP:
wg.Add(1)
go func(pp *BinaryPacket) {
packet := &pp.packet
if s.asQueryServer {
packet.AsQuery()
}
defer wg.Done()

err := packet.UnmarshalBinary(pp.body)
Expand Down
36 changes: 36 additions & 0 deletions unknown_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package tarantool

type UnknownQuery struct {
cmd uint
data []byte
}

var _ Query = (*UnknownQuery)(nil)

func NewUnknownQuery(cmd uint) *UnknownQuery {
return &UnknownQuery{cmd: cmd}
}

func (q *UnknownQuery) GetCommandID() uint {
return q.cmd
}

func (q *UnknownQuery) MarshalMsg(b []byte) ([]byte, error) {
return append(b, q.data...), nil
}

// UnmarshalMsg saves all of the data into the query.
// So make sure it doesn't contain part of another packet.
func (q *UnknownQuery) UnmarshalMsg(data []byte) (buf []byte, err error) {
q.data = make([]byte, len(data))
copy(q.data, data)

return nil, nil
}

// IsKnownQuery returns true if passed query is known and supported.
func IsKnownQuery(q Query) bool {
_, unknown := q.(*UnknownQuery)

return q != nil && !unknown
}