diff --git a/replayer-agent/logic/match/match.go b/replayer-agent/logic/match/match.go index 59b1b08..c994ec3 100644 --- a/replayer-agent/logic/match/match.go +++ b/replayer-agent/logic/match/match.go @@ -13,7 +13,9 @@ import ( "github.com/didi/sharingan/replayer-agent/model/protocol" "github.com/didi/sharingan/replayer-agent/model/recording" "github.com/didi/sharingan/replayer-agent/model/replaying" + "github.com/didi/sharingan/replayer-agent/utils/fastcgi" "github.com/didi/sharingan/replayer-agent/utils/helper" + "github.com/didi/sharingan/replayer-agent/utils/protocol/pthrift" ) var expect100 = []byte("Expect: 100-continue") @@ -126,6 +128,10 @@ func (m *Matcher) DoMatchOutboundTalk( m.MaxMatchedIndex = maxScoreIndex } m.Visited[maxScoreIndex] = true + + if bytes.HasPrefix(session.CallFromInbound.Raw, fastcgi.FastCGIRequestHeader) { + pthrift.ReplaceSequenceId(request, session.CallOutbounds[maxScoreIndex]) + } return maxScoreIndex, mark, session.CallOutbounds[maxScoreIndex] } diff --git a/replayer-agent/logic/outbound/connection.go b/replayer-agent/logic/outbound/connection.go index d1c810f..845d51b 100644 --- a/replayer-agent/logic/outbound/connection.go +++ b/replayer-agent/logic/outbound/connection.go @@ -107,6 +107,11 @@ func (cs *ConnState) readRequest(ctx context.Context) ([]byte, error) { request := pool.GetBuf(81920, true) + // SetReadDeadline sets the deadline for future Read calls + // and any currently-blocked Read call. + // A zero value for t means Read will not time out. + cs.conn.SetReadDeadline(time.Time{}) + bytesRead, err := cs.conn.Read(buf) if err != nil { return nil, err @@ -153,9 +158,11 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error { cs.Handler = loadHandler(ctx, string(cs.traceID)) if cs.Handler == nil { tlog.Handler.Warnf(ctx, tlog.DebugTag, "errmsg=find Handler failed||request=%s||traceID=%s", quotedRequest, string(cs.traceID)) - return errors.New("find Handler failed") + return nil } + ctx = cs.Handler.Ctx + // 去掉COM_STMT_CLOSE if request = removeMysqlStmtClose(request); len(request) == 0 { return nil diff --git a/replayer-agent/utils/protocol/helper/helper.go b/replayer-agent/utils/protocol/helper/helper.go index e285bef..74d7197 100644 --- a/replayer-agent/utils/protocol/helper/helper.go +++ b/replayer-agent/utils/protocol/helper/helper.go @@ -1,6 +1,8 @@ package helper import ( + "bytes" + "encoding/binary" "encoding/json" "fmt" "strconv" @@ -43,3 +45,24 @@ func convertModelMap2GeneralMap(m model.Map) map[string]interface{} { } return result } + +func IntToBytes(n int,b byte) ([]byte,error) { + switch b { + case 1: + tmp := int8(n) + bytesBuffer := bytes.NewBuffer([]byte{}) + binary.Write(bytesBuffer, binary.BigEndian, &tmp) + return bytesBuffer.Bytes(),nil + case 2: + tmp := int16(n) + bytesBuffer := bytes.NewBuffer([]byte{}) + binary.Write(bytesBuffer, binary.BigEndian, &tmp) + return bytesBuffer.Bytes(),nil + case 3,4: + tmp := int32(n) + bytesBuffer := bytes.NewBuffer([]byte{}) + binary.Write(bytesBuffer, binary.BigEndian, &tmp) + return bytesBuffer.Bytes(),nil + } + return nil,fmt.Errorf("IntToBytes b param is invaild") +} diff --git a/replayer-agent/utils/protocol/pthrift/packet.go b/replayer-agent/utils/protocol/pthrift/packet.go index 9ea0fe0..6b8f831 100644 --- a/replayer-agent/utils/protocol/pthrift/packet.go +++ b/replayer-agent/utils/protocol/pthrift/packet.go @@ -4,6 +4,8 @@ import ( "bytes" "errors" + "github.com/didi/sharingan/replayer-agent/model/recording" + "github.com/didi/sharingan/replayer-agent/utils/protocol/helper" "github.com/modern-go/parse" "github.com/modern-go/parse/model" ) @@ -407,3 +409,31 @@ func decodeFieldValueCompact(src *parse.Source, vType CompactKind) (interface{}, return nil, errInvalidFeildType } } + +func ReplaceSequenceId(request []byte, callOutbound *recording.CallOutbound) { + if len(request) <= 4 { + return + } + thrift, DecErr := DecodeBinary(request[4:]) + if DecErr != nil { + return + } + + seqId := thrift["sequence_id"] + seqIdBytes ,err := helper.IntToBytes(seqId.(int), 4) + if err != nil { + return + } + + name := thrift["name"].(string) + nameBytes := []byte(name) + begin := bytes.LastIndex(callOutbound.Response, nameBytes) + if begin == -1 { + return + } + + begin += len(nameBytes) + for i, v := range seqIdBytes { + callOutbound.Response[begin+i] = v + } +} diff --git a/replayer-agent/utils/protocol/pthrift/packet_test.go b/replayer-agent/utils/protocol/pthrift/packet_test.go index 10cb9be..09d6d46 100644 --- a/replayer-agent/utils/protocol/pthrift/packet_test.go +++ b/replayer-agent/utils/protocol/pthrift/packet_test.go @@ -2,6 +2,7 @@ package pthrift import ( "bytes" + "reflect" "testing" "github.com/modern-go/parse" @@ -264,3 +265,35 @@ func TestDecodeCompact(t *testing.T) { should.Equal(tc.expect, actual, "case #%d fail", idx) } } + +func TestIntToBytes(t *testing.T) { + type args struct { + n int + b byte + } + tests := []struct { + name string + args args + want []byte + wantErr bool + }{ + { + name: "1", + args: args{n: 10, b: 4}, + want: []byte{0, 0, 0, 0x0a}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := IntToBytes(tt.args.n, tt.args.b) + if (err != nil) != tt.wantErr { + t.Errorf("IntToBytes() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("IntToBytes() got = %v, want %v", got, tt.want) + } + }) + } +}