From bb2bb2b97704de9e6f398ff2c5f6b81e96058a50 Mon Sep 17 00:00:00 2001 From: 43193589+WeiXinChan Date: Mon, 20 Nov 2023 21:47:15 +0800 Subject: [PATCH 01/19] insertUp return insert or update operation --- client/batch_result_test.go | 2 +- client/client.go | 5 +++ client/obbatch.go | 2 +- client/obclient.go | 37 +++++++++++++++++----- client/single_result.go | 25 +++++++++++++-- client/single_result_test.go | 2 +- protocol/operation_response.go | 17 +++++++++- test/single/double_table_test.go | 43 +++++++++++++++++++++++++ test/single/int32_table_test.go | 49 +++++++++++++++++++++++++++++ test/single/varbinary_table_test.go | 45 +++++++++++++++++++++++++- test/single/varchar_table_test.go | 45 +++++++++++++++++++++++++- 11 files changed, 257 insertions(+), 15 deletions(-) diff --git a/client/batch_result_test.go b/client/batch_result_test.go index e2ca190..1007c6b 100644 --- a/client/batch_result_test.go +++ b/client/batch_result_test.go @@ -24,7 +24,7 @@ import ( ) func TestObBatchOperationResult_Size(t *testing.T) { - result := newObSingleResult(1, nil) + result := newObSingleResult(1, nil, 0) emptyResult := make([]SingleResult, 0) batchResult := newObBatchOperationResult(emptyResult) assert.EqualValues(t, 0, batchResult.Size()) diff --git a/client/client.go b/client/client.go index bf6bf69..db2a1bb 100644 --- a/client/client.go +++ b/client/client.go @@ -19,6 +19,7 @@ package client import ( "context" + "github.com/pkg/errors" "github.com/oceanbase/obkv-table-client-go/client/option" @@ -174,6 +175,10 @@ type Client interface { // InsertOrUpdate insert or update a record by rowKey. // insert if the primary key does not exist, update if it does. InsertOrUpdate(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) + // InsertOrUpdateWithResult insert or update a record by rowKey. + // insert if the primary key does not exist, update if it does. + // IsInsertOrUpdateDoInsert() in SingleResult tells whether the insert operation has been performed. + InsertOrUpdateWithResult(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error) // Replace a record by rowKey. Replace(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error) // Increment a record by rowKey. diff --git a/client/obbatch.go b/client/obbatch.go index 3995d0a..f23c290 100644 --- a/client/obbatch.go +++ b/client/obbatch.go @@ -413,7 +413,7 @@ func (b *obBatchExecutor) Execute(ctx context.Context) (BatchOperationResult, er // operationResponse2SingleResult convert operation response to single result. func operationResponse2SingleResult(res *protocol.ObTableOperationResponse) SingleResult { - return newObSingleResult(res.AffectedRows(), res.Entity()) + return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()) } type obPartOp struct { diff --git a/client/obclient.go b/client/obclient.go index ef9da19..2d8be8b 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -20,12 +20,14 @@ package client import ( "context" "fmt" - "github.com/oceanbase/obkv-table-client-go/log" - "golang.org/x/sys/unix" "runtime" "strings" "time" + "golang.org/x/sys/unix" + + "github.com/oceanbase/obkv-table-client-go/log" + "github.com/pkg/errors" "github.com/oceanbase/obkv-table-client-go/client/option" @@ -360,6 +362,27 @@ func (c *obClient) InsertOrUpdate( return res.AffectedRows(), nil } +func (c *obClient) InsertOrUpdateWithResult( + ctx context.Context, + tableName string, + rowKey []*table.Column, + mutateColumns []*table.Column, + opts ...option.ObOperationOption) (SingleResult, error) { + operationOptions := c.getOperationOptions(opts...) + res, err := c.executeWithRetry( + ctx, + tableName, + protocol.ObTableOperationInsertOrUpdate, + rowKey, + mutateColumns, + operationOptions) + if err != nil { + return nil, errors.WithMessagef(err, "execute insert or update, tableName:%s, rowKey:%s, mutateColumns:%s", + tableName, table.ColumnsToString(rowKey), table.ColumnsToString(mutateColumns)) + } + return newObSingleResult(res.AffectedRows(), nil, res.Flags()), nil +} + func (c *obClient) Replace( ctx context.Context, tableName string, @@ -400,7 +423,7 @@ func (c *obClient) Increment( if err != nil { return nil, err } - return newObSingleResult(res.AffectedRows(), res.Entity()), nil + return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil } else { res, err := c.executeWithFilterAndRetry( ctx, @@ -412,7 +435,7 @@ func (c *obClient) Increment( if err != nil { return nil, err } - return newObSingleResult(res.AffectedRows(), nil), nil + return newObSingleResult(res.AffectedRows(), nil, 0), nil } } @@ -435,7 +458,7 @@ func (c *obClient) Append( if err != nil { return nil, err } - return newObSingleResult(res.AffectedRows(), res.Entity()), nil + return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil } else { res, err := c.executeWithFilterAndRetry( ctx, @@ -447,7 +470,7 @@ func (c *obClient) Append( if err != nil { return nil, err } - return newObSingleResult(res.AffectedRows(), nil), nil + return newObSingleResult(res.AffectedRows(), nil, 0), nil } } @@ -507,7 +530,7 @@ func (c *obClient) Get( if err != nil { return nil, err } - return newObSingleResult(res.AffectedRows(), res.Entity()), nil + return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil } func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) { diff --git a/client/single_result.go b/client/single_result.go index a089cf0..33a4b9e 100644 --- a/client/single_result.go +++ b/client/single_result.go @@ -34,15 +34,24 @@ type SingleResult interface { Values() map[string]interface{} // RowKey get affected rowkey, only work in Increment and Append operation. RowKey() []interface{} + // IsInsertOrUpdateDoInsert InsertOrUpdate impl maybe do insert or do update, + // IsInsertOrUpdateDoInsert() to tell you which operation has done. + IsInsertOrUpdateDoInsert() bool + // IsInsertOrUpdateDoUpdate InsertOrUpdate impl maybe do insert or do update, + // IsInsertOrUpdateDoUpdate() to tell you which operation has done. + IsInsertOrUpdateDoUpdate() bool + // IsInsertOrUpdateDoPut InsertOrUpdate impl maybe do put when all columns are filled. + IsInsertOrUpdateDoPut() bool } -func newObSingleResult(affectedRows int64, affectedEntity *protocol.ObTableEntity) *obSingleResult { - return &obSingleResult{affectedRows, affectedEntity} +func newObSingleResult(affectedRows int64, affectedEntity *protocol.ObTableEntity, flags uint64) *obSingleResult { + return &obSingleResult{affectedRows, affectedEntity, flags} } type obSingleResult struct { affectedRows int64 affectedEntity *protocol.ObTableEntity + flags uint64 } func (r *obSingleResult) IsEmptySet() bool { @@ -105,3 +114,15 @@ func (r *obSingleResult) RowKey() []interface{} { } return res } + +func (r *obSingleResult) IsInsertOrUpdateDoInsert() bool { + return r.flags&protocol.IsInsertUpDoInsertMask != 0 +} + +func (r *obSingleResult) IsInsertOrUpdateDoUpdate() bool { + return !r.IsInsertOrUpdateDoInsert() && !r.IsInsertOrUpdateDoPut() +} + +func (r *obSingleResult) IsInsertOrUpdateDoPut() bool { + return r.flags&protocol.IsInsertUpDoPutMask != 0 +} diff --git a/client/single_result_test.go b/client/single_result_test.go index 30968fd..a54711b 100644 --- a/client/single_result_test.go +++ b/client/single_result_test.go @@ -26,7 +26,7 @@ import ( ) func TestSingleResult(t *testing.T) { - res := newObSingleResult(1, nil) + res := newObSingleResult(1, nil, 0) assert.EqualValues(t, 1, res.AffectedRows()) assert.Equal(t, nil, res.Value("c1")) diff --git a/protocol/operation_response.go b/protocol/operation_response.go index 356b57c..b13d4f5 100644 --- a/protocol/operation_response.go +++ b/protocol/operation_response.go @@ -24,6 +24,11 @@ import ( "github.com/oceanbase/obkv-table-client-go/util" ) +const ( + IsInsertUpDoInsertMask = 1 << 0 + IsInsertUpDoPutMask = 1 << 1 +) + type ObTableOperationResponse struct { ObUniVersionHeader ObPayloadBase @@ -31,6 +36,7 @@ type ObTableOperationResponse struct { operationType ObTableOperationType entity *ObTableEntity affectedRows int64 + flags uint64 } func NewObTableOperationResponse() *ObTableOperationResponse { @@ -51,6 +57,7 @@ func NewObTableOperationResponse() *ObTableOperationResponse { operationType: ObTableOperationGet, entity: NewObTableEntity(), affectedRows: 0, + flags: 0, } } @@ -86,6 +93,10 @@ func (r *ObTableOperationResponse) SetAffectedRows(affectedRows int64) { r.affectedRows = affectedRows } +func (r *ObTableOperationResponse) Flags() uint64 { + return r.flags +} + func (r *ObTableOperationResponse) PCode() ObTablePacketCode { return ObTableApiExecute } @@ -98,7 +109,8 @@ func (r *ObTableOperationResponse) PayloadContentLen() int { totalLen := r.header.PayloadLen() + 1 + r.entity.PayloadLen() + - util.EncodedLengthByVi64(r.affectedRows) + util.EncodedLengthByVi64(r.affectedRows) + + util.EncodedLengthByVi64(int64(r.flags)) r.ObUniVersionHeader.SetContentLength(totalLen) return r.ObUniVersionHeader.ContentLength() @@ -122,6 +134,7 @@ func (r *ObTableOperationResponse) Encode(buffer *bytes.Buffer) { r.entity.Encode(buffer) util.EncodeVi64(buffer, r.affectedRows) + util.EncodeVi64(buffer, int64(r.flags)) } func (r *ObTableOperationResponse) Decode(buffer *bytes.Buffer) { @@ -134,4 +147,6 @@ func (r *ObTableOperationResponse) Decode(buffer *bytes.Buffer) { r.entity.Decode(buffer) r.affectedRows = util.DecodeVi64(buffer) + + r.flags = uint64(util.DecodeVi64(buffer)) } diff --git a/test/single/double_table_test.go b/test/single/double_table_test.go index 92ed572..cc455ae 100644 --- a/test/single/double_table_test.go +++ b/test/single/double_table_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/oceanbase/obkv-table-client-go/client/option" "github.com/oceanbase/obkv-table-client-go/table" "github.com/oceanbase/obkv-table-client-go/test" ) @@ -234,3 +235,45 @@ func TestGetDouble(t *testing.T) { assert.EqualValues(t, nil, result.Value("c1")) assert.EqualValues(t, nil, result.Value("c2")) } + +func TestIncrementDouble(t *testing.T) { + tableName := testDoubleTableName + defer test.DeleteTable(tableName) + + rowKey := []*table.Column{table.NewColumn("c1", 1.1)} + mutateColumns := []*table.Column{table.NewColumn("c2", 1.1)} + affectRows, err := cli.Insert( + context.TODO(), + tableName, + rowKey, + mutateColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, affectRows) + + // increment + IncrementColumns := []*table.Column{table.NewColumn("c2", 1.1)} + res, err := cli.Increment( + context.TODO(), + tableName, + rowKey, + IncrementColumns, + option.WithReturnRowKey(true), + option.WithReturnAffectedEntity(true), // return affected entity + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, res.AffectedRows()) + assert.EqualValues(t, 2.2, res.Value("c2")) + assert.EqualValues(t, 1.1, res.RowKey()[0]) + + selectColumns := []string{"c1", "c2"} + result, err := cli.Get( + context.TODO(), + tableName, + rowKey, + selectColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1.1, result.Value("c1")) + assert.EqualValues(t, 2.2, result.Value("c2")) +} diff --git a/test/single/int32_table_test.go b/test/single/int32_table_test.go index 13206af..56d47e2 100644 --- a/test/single/int32_table_test.go +++ b/test/single/int32_table_test.go @@ -145,6 +145,55 @@ func TestInsertOrUpdateInt32(t *testing.T) { assert.EqualValues(t, 1, result.Value("c2")) } +func TestInsertOrUpdateWithResultInt32(t *testing.T) { + tableName := testInt32TableName + defer test.DeleteTable(tableName) + + rowKey := []*table.Column{table.NewColumn("c1", int32(1))} + mutateColumns := []*table.Column{table.NewColumn("c2", int32(1))} + res, err := cli.InsertOrUpdateWithResult( + context.TODO(), + tableName, + rowKey, + mutateColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, res.AffectedRows()) + assert.EqualValues(t, true, res.IsInsertOrUpdateDoPut()) + + selectColumns := []string{"c1", "c2"} + result, err := cli.Get( + context.TODO(), + tableName, + rowKey, + selectColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, result.Value("c1")) + assert.EqualValues(t, 1, result.Value("c2")) + + res, err = cli.InsertOrUpdateWithResult( + context.TODO(), + tableName, + rowKey, + mutateColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, res.AffectedRows()) + assert.EqualValues(t, true, res.IsInsertOrUpdateDoPut()) + + selectColumns = []string{"c1", "c2"} + result, err = cli.Get( + context.TODO(), + tableName, + rowKey, + selectColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, result.Value("c1")) + assert.EqualValues(t, 1, result.Value("c2")) +} + func TestDeleteInt32(t *testing.T) { tableName := testInt32TableName defer test.DeleteTable(tableName) diff --git a/test/single/varbinary_table_test.go b/test/single/varbinary_table_test.go index dac6ff5..4816e69 100644 --- a/test/single/varbinary_table_test.go +++ b/test/single/varbinary_table_test.go @@ -19,9 +19,10 @@ package single import ( "context" - "github.com/oceanbase/obkv-table-client-go/client/option" "testing" + "github.com/oceanbase/obkv-table-client-go/client/option" + "github.com/stretchr/testify/assert" "github.com/oceanbase/obkv-table-client-go/table" @@ -236,6 +237,48 @@ func TestGetVarbinary(t *testing.T) { assert.EqualValues(t, nil, result.Value("c2")) } +func TestIncrementVarbinary(t *testing.T) { + tableName := testVarbinaryTableName + defer test.DeleteTable(tableName) + + rowKey := []*table.Column{table.NewColumn("c1", []byte("1"))} + mutateColumns := []*table.Column{table.NewColumn("c2", []byte("1"))} + affectRows, err := cli.Insert( + context.TODO(), + tableName, + rowKey, + mutateColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, affectRows) + + // increment + IncrementColumns := []*table.Column{table.NewColumn("c2", []byte("2"))} + res, err := cli.Increment( + context.TODO(), + tableName, + rowKey, + IncrementColumns, + option.WithReturnRowKey(true), + option.WithReturnAffectedEntity(true), // return affected entity + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, res.AffectedRows()) + assert.EqualValues(t, "3", res.Value("c2")) + assert.EqualValues(t, []byte("1"), res.RowKey()[0]) + + selectColumns := []string{"c1", "c2"} + result, err := cli.Get( + context.TODO(), + tableName, + rowKey, + selectColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, []byte("1"), result.Value("c1")) + assert.EqualValues(t, "3", result.Value("c2")) +} + func TestAppendVarbinary(t *testing.T) { tableName := testVarbinaryTableName defer test.DeleteTable(tableName) diff --git a/test/single/varchar_table_test.go b/test/single/varchar_table_test.go index 680a97e..ed2b928 100644 --- a/test/single/varchar_table_test.go +++ b/test/single/varchar_table_test.go @@ -19,9 +19,10 @@ package single import ( "context" - "github.com/oceanbase/obkv-table-client-go/client/option" "testing" + "github.com/oceanbase/obkv-table-client-go/client/option" + "github.com/stretchr/testify/assert" "github.com/oceanbase/obkv-table-client-go/table" @@ -236,6 +237,48 @@ func TestGetVarchar(t *testing.T) { assert.EqualValues(t, nil, result.Value("c2")) } +func TestIncrementVarchar(t *testing.T) { + tableName := testVarcharTableName + defer test.DeleteTable(tableName) + + rowKey := []*table.Column{table.NewColumn("c1", "1")} + mutateColumns := []*table.Column{table.NewColumn("c2", "1")} + affectRows, err := cli.Insert( + context.TODO(), + tableName, + rowKey, + mutateColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, affectRows) + + // Increment + incrementColumns := []*table.Column{table.NewColumn("c2", "2")} + res, err := cli.Increment( + context.TODO(), + tableName, + rowKey, + incrementColumns, + option.WithReturnRowKey(true), + option.WithReturnAffectedEntity(true), // return affected entity + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, 1, res.AffectedRows()) + assert.EqualValues(t, "3", res.Value("c2")) + assert.EqualValues(t, "1", res.RowKey()[0]) + + selectColumns := []string{"c1", "c2"} + result, err := cli.Get( + context.TODO(), + tableName, + rowKey, + selectColumns, + ) + assert.Equal(t, nil, err) + assert.EqualValues(t, "1", result.Value("c1")) + assert.EqualValues(t, "3", result.Value("c2")) +} + func TestAppendVarchar(t *testing.T) { tableName := testVarcharTableName defer test.DeleteTable(tableName) From 3eb5f29d7edb49f08181e7978d172043f69d6626 Mon Sep 17 00:00:00 2001 From: 43193589+WeiXinChan Date: Fri, 19 Apr 2024 10:47:36 +0800 Subject: [PATCH 02/19] fix confilct and adapt inserrOrUpdate new format --- client/obclient.go | 2 +- obkvrpc/rpc_server.go | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/client/obclient.go b/client/obclient.go index 2d8be8b..2c6d627 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -639,7 +639,7 @@ func (c *obClient) execute( tableParam, err := c.GetTableParam(ctx, tableName, rowKey) if err != nil { log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", - log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String())) + log.Int64("opType", int64(opType)), log.String("tableName", tableName)) return nil, errors.WithMessagef(err, "get table param, tableName:%s, opType:%d", tableName, opType), needRetry } diff --git a/obkvrpc/rpc_server.go b/obkvrpc/rpc_server.go index d4d7990..ec002b2 100644 --- a/obkvrpc/rpc_server.go +++ b/obkvrpc/rpc_server.go @@ -22,9 +22,10 @@ import ( "runtime/debug" "sync" - "github.com/oceanbase/obkv-table-client-go/log" "github.com/panjf2000/ants/v2" "go.uber.org/zap" + + "github.com/oceanbase/obkv-table-client-go/log" ) // CodecServer implement interfaces to read/decode request @@ -35,6 +36,7 @@ type CodecServer interface { Call(*Request, *Response) error Close() GetCloseChan() *chan struct{} + GetNormalErrMsg() []byte } // Server implement server frame @@ -152,6 +154,11 @@ func (s *Server) RunWorker(wg *sync.WaitGroup, reqChan <-chan *Request, cServer if err := recover(); err != nil { log.Error("RPCServer", nil, "RunWorker panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) } + // try to send err msg to client, ignore error + resp := Response{ID: "", RspContent: cServer.GetNormalErrMsg()} + _ = cServer.WriteResponse(&resp) + closeChan := *cServer.GetCloseChan() + close(closeChan) }() for req := range reqChan { resp := s.respObjPool.Get().(*Response) From 276616d71f084ed6287f4659ad7e4bdaabae0d27 Mon Sep 17 00:00:00 2001 From: 43193589+WeiXinChan Date: Mon, 22 Apr 2024 10:53:27 +0800 Subject: [PATCH 03/19] dev 2024 --- route/config_server_info.go | 5 +++++ route/location.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/route/config_server_info.go b/route/config_server_info.go index 3c01d01..07e43b8 100644 --- a/route/config_server_info.go +++ b/route/config_server_info.go @@ -70,11 +70,16 @@ func (i *ObConfigServerInfo) FetchRslist() (*ObRslist, error) { return nil, errors.Errorf("fail to split ip and port, server:%s", server.Address) } ip := res[0] + if ip == "172.16.46.180" { + ip = "115.29.212.38" + println(ip) + } svrPort, err := strconv.Atoi(res[1]) if err != nil { return nil, errors.Errorf("fail to convert server port to int, port:%s", res[1]) } serverAddr := NewObServerAddr(ip, server.SqlPort, svrPort) + println(serverAddr.String()) rslist.Append(serverAddr) } diff --git a/route/location.go b/route/location.go index 3eedfba..c82bce3 100644 --- a/route/location.go +++ b/route/location.go @@ -306,6 +306,9 @@ func getTableEntryFromResultSet(rows *Rows, tableName string) (*ObTableEntry, er if err != nil { return nil, errors.WithMessagef(err, "scan row") } + if svrIp == "172.16.46.180" { + svrIp = "115.29.212.38" + } if strings.EqualFold(status, "active") { replica := newReplicaLocation( NewObServerAddr(svrIp, sqlPort, svrPort), @@ -419,6 +422,10 @@ func getPartLocationEntryFromResultSet(rows *Rows, tableName string) (*ObPartLoc return nil, errors.WithMessagef(err, "scan row") } + if svrIp == "172.16.46.180" { + svrIp = "115.29.212.38" + } + // create ObPartLocationEntry if isFirstRow { isFirstRow = false From 0adfdbf8084ef7c4eeecd5f6ceebe5f8038bcaed Mon Sep 17 00:00:00 2001 From: Liyao Xiong Date: Tue, 21 May 2024 21:36:15 +0800 Subject: [PATCH 04/19] [feat] Add OB-Redis framework adaptation (#160) * feat: add ob redis and test * [feat] compat redis frame in observer --------- Co-authored-by: eemjwu <744706242@qq.com> --- client/client.go | 1 + client/obclient.go | 21 ++++++++++++++ obkvrpc/rpc_server.go | 15 +++++++--- protocol/operation.go | 1 + protocol/operation_request.go | 3 ++ protocol/operation_response.go | 3 ++ protocol/packet_code.go | 7 +++++ test/single/redis_test.go | 51 ++++++++++++++++++++++++++++++++++ 8 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 test/single/redis_test.go diff --git a/client/client.go b/client/client.go index db2a1bb..4bb1176 100644 --- a/client/client.go +++ b/client/client.go @@ -198,4 +198,5 @@ type Client interface { // Close closes the client. // close will disconnect all connections and exit all goroutines. Close() + Redis(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error) } diff --git a/client/obclient.go b/client/obclient.go index 2c6d627..f9de560 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -533,6 +533,27 @@ func (c *obClient) Get( return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil } +func (c *obClient) Redis( + ctx context.Context, + tableName string, + rowKey []*table.Column, + mutateColumns []*table.Column, + opts ...option.ObOperationOption) (SingleResult, error) { + log.InitTraceId(&ctx) + operationOptions := c.getOperationOptions(opts...) + res, err := c.executeWithRetry( + ctx, + tableName, + protocol.ObTableOperationRedis, + rowKey, + mutateColumns, + operationOptions) + if err != nil { + return nil, err + } + return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil +} + func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) { log.InitTraceId(&ctx) queryOpts := c.getObQueryOptions(opts...) diff --git a/obkvrpc/rpc_server.go b/obkvrpc/rpc_server.go index ec002b2..3ac3eca 100644 --- a/obkvrpc/rpc_server.go +++ b/obkvrpc/rpc_server.go @@ -49,9 +49,10 @@ type Server struct { // Request is generated by a decoder type Request struct { - Method string // use for mapping - Args [][]byte - ID string + Method string // use for mapping + Args [][]byte + ID string + PlainReq []byte } type Response struct { @@ -95,6 +96,7 @@ func (s *Server) PutRequest(req *Request) { req.Args[i] = req.Args[i][:0] } req.Args = req.Args[:0] + req.PlainReq = req.PlainReq[:0] s.reqObjPool.Put(req) } @@ -152,7 +154,12 @@ func (s *Server) RunWorker(wg *sync.WaitGroup, reqChan <-chan *Request, cServer defer func() { wg.Done() if err := recover(); err != nil { - log.Error("RPCServer", nil, "RunWorker panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) + log.Error("RPCServer", nil, "RunWorker panic, close connection", log.Any("error", err), log.String("stack", string(debug.Stack()))) + // try to send err msg to client, ignore error + resp := Response{ID: "", RspContent: cServer.GetNormalErrMsg()} + _ = cServer.WriteResponse(&resp) + closeChan := *cServer.GetCloseChan() + close(closeChan) } // try to send err msg to client, ignore error resp := Response{ID: "", RspContent: cServer.GetNormalErrMsg()} diff --git a/protocol/operation.go b/protocol/operation.go index 3a3a92f..f665ff2 100644 --- a/protocol/operation.go +++ b/protocol/operation.go @@ -95,6 +95,7 @@ const ( ObTableOperationReplace ObTableOperationIncrement ObTableOperationAppend + ObTableOperationRedis = 13 ) func (o *ObTableOperation) OpType() ObTableOperationType { diff --git a/protocol/operation_request.go b/protocol/operation_request.go index b59eaad..71561b1 100644 --- a/protocol/operation_request.go +++ b/protocol/operation_request.go @@ -185,6 +185,9 @@ func (r *ObTableOperationRequest) SetReturnAffectedRows(returnAffectedRows bool) } func (r *ObTableOperationRequest) PCode() ObTablePacketCode { + if r.tableOperation.opType == ObTableOperationRedis { + return ObTableApiRedis + } return ObTableApiExecute } diff --git a/protocol/operation_response.go b/protocol/operation_response.go index b13d4f5..75b065f 100644 --- a/protocol/operation_response.go +++ b/protocol/operation_response.go @@ -98,6 +98,9 @@ func (r *ObTableOperationResponse) Flags() uint64 { } func (r *ObTableOperationResponse) PCode() ObTablePacketCode { + if r.operationType == ObTableOperationRedis { + return ObTableApiRedis + } return ObTableApiExecute } diff --git a/protocol/packet_code.go b/protocol/packet_code.go index 89c718c..ca5561c 100644 --- a/protocol/packet_code.go +++ b/protocol/packet_code.go @@ -32,6 +32,7 @@ const ( ObTableApiExecuteAsyncQuery ObTableApiMove ObTableApiErrorPacket + ObTableApiRedis ObTableApiNoSuch = -1 ) @@ -45,6 +46,7 @@ const ( obTableApiExecuteAsyncQueryStr string = "ob table execute async query" obTableApiMoveStr string = "ob table route" obTableApiErrorPacketStr string = "ob table error" + obTableApiRedisStr string = "ob table redis" ) const ( @@ -55,6 +57,7 @@ const ( obTableApiPCodeQueryAndMute uint32 = 0x1105 obTableApiPCodeExecuteAsyncQuery uint32 = 0x1106 obTableApiPCodeMove uint32 = 0x1124 + obTableApiPCodeRedisExecute uint32 = 0x1126 obTableApiPCodeErrorPacket uint32 = 0x010 ) @@ -67,6 +70,7 @@ var obTablePacketCodeStrings = []string{ ObTableApiExecuteAsyncQuery: obTableApiExecuteAsyncQueryStr, ObTableApiMove: obTableApiMoveStr, ObTableApiErrorPacket: obTableApiErrorPacketStr, + ObTableApiRedis: obTableApiRedisStr, } var obTablePacketCodePCodes = []uint32{ @@ -78,6 +82,7 @@ var obTablePacketCodePCodes = []uint32{ ObTableApiExecuteAsyncQuery: obTableApiPCodeExecuteAsyncQuery, ObTableApiMove: obTableApiPCodeMove, ObTableApiErrorPacket: obTableApiPCodeErrorPacket, + ObTableApiRedis: obTableApiPCodeRedisExecute, } func (c ObTablePacketCode) Value() uint32 { @@ -102,6 +107,8 @@ func (c ObTablePacketCode) ValueOf(pCode uint32) (ObTablePacketCode, error) { // return ObTableApiMove, nil case obTableApiPCodeErrorPacket: return ObTableApiErrorPacket, nil + case obTableApiPCodeRedisExecute: + return ObTableApiRedis, nil } return ObTableApiNoSuch, errors.New("not match code") } diff --git a/test/single/redis_test.go b/test/single/redis_test.go new file mode 100644 index 0000000..0af3898 --- /dev/null +++ b/test/single/redis_test.go @@ -0,0 +1,51 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package single + +import ( + "context" + "github.com/oceanbase/obkv-table-client-go/client/option" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/oceanbase/obkv-table-client-go/table" + "github.com/oceanbase/obkv-table-client-go/test" +) + +const ( + testRedisTableName = "modis_list_table" +) + +func TestRedis(t *testing.T) { + tableName := testRedisTableName + defer test.DeleteTable(tableName) + + rowKey := []*table.Column{table.NewColumn("db", int64(1)), table.NewColumn("rkey", []byte("key1")), table.NewColumn("index", int64(-1))} + mutateColumns := []*table.Column{table.NewColumn("REDIS_CODE_STR", []byte("*2\r\n$5\r\nLpUSH\r\n$6\r\nfoobar\r\n"))} + result, err := cli.Redis( + context.TODO(), + tableName, + rowKey, + mutateColumns, + option.WithReturnAffectedEntity(true), + ) + assert.Equal(t, nil, err) + assert.Equal(t, "test hello ob_redis list operator!", result.Value("REDIS_CODE_STR")) + assert.EqualValues(t, 10086, result.AffectedRows()) +} From b093d85ebf719268af0a6cf6317e155b9f89aae5 Mon Sep 17 00:00:00 2001 From: lyxiong0 Date: Tue, 28 May 2024 11:50:22 +0800 Subject: [PATCH 05/19] [fix] fix rpc server channels closing off each other --- obkvrpc/rpc_server.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/obkvrpc/rpc_server.go b/obkvrpc/rpc_server.go index 3ac3eca..fc7b199 100644 --- a/obkvrpc/rpc_server.go +++ b/obkvrpc/rpc_server.go @@ -119,7 +119,6 @@ func (s *Server) ReadRequest(reqChan chan<- *Request, cServer CodecServer) { isStop := false select { case <-*closeChan: - cServer.Close() close(reqChan) return default: @@ -158,14 +157,7 @@ func (s *Server) RunWorker(wg *sync.WaitGroup, reqChan <-chan *Request, cServer // try to send err msg to client, ignore error resp := Response{ID: "", RspContent: cServer.GetNormalErrMsg()} _ = cServer.WriteResponse(&resp) - closeChan := *cServer.GetCloseChan() - close(closeChan) } - // try to send err msg to client, ignore error - resp := Response{ID: "", RspContent: cServer.GetNormalErrMsg()} - _ = cServer.WriteResponse(&resp) - closeChan := *cServer.GetCloseChan() - close(closeChan) }() for req := range reqChan { resp := s.respObjPool.Get().(*Response) From 163d79d482f8ea97f9842bc0036205776c6ff63f Mon Sep 17 00:00:00 2001 From: lyxiong0 Date: Fri, 31 May 2024 17:26:04 +0800 Subject: [PATCH 06/19] [feat] add ip:port info in execute error log --- client/obclient.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/obclient.go b/client/obclient.go index f9de560..6227c14 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -689,13 +689,15 @@ func (c *obClient) execute( err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result) if err != nil { trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) - log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace)) + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace), + log.String("remote addr", result.RemoteAddr().String())) return nil, err, needRetry } if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess { trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) - log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace)) + log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace), + log.String("remote addr", result.RemoteAddr().String())) return nil, protocol.NewProtocolError( result.RemoteAddr().String(), oberror.ObErrorCode(result.Header().ErrorNo()), From 1feedba613bd1031cf8335da3367547c092af2d6 Mon Sep 17 00:00:00 2001 From: lyxiong0 Date: Wed, 5 Jun 2024 20:38:09 +0800 Subject: [PATCH 07/19] [fear] add ObKvRedisParseError errcode --- error/error_code.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/error/error_code.go b/error/error_code.go index 5292159..d1b595f 100644 --- a/error/error_code.go +++ b/error/error_code.go @@ -2154,6 +2154,7 @@ const ( ObKvColumnTypeNotMatch ObErrorCode = -10511 ObKvCollationMismatch ObErrorCode = -10512 ObKvScanRangeMissing ObErrorCode = -10513 + ObKvRedisParseError ObErrorCode = -10515 ObErrValuesClauseNeedHaveColumn ObErrorCode = -11000 ObErrValuesClauseCannotUseDefaultValues ObErrorCode = -11001 ObWrongPartitionName ObErrorCode = -11002 @@ -4303,6 +4304,7 @@ var ObErrorNames = map[ObErrorCode]string{ ObKvColumnTypeNotMatch: "ObKvColumnTypeNotMatch", ObKvCollationMismatch: "ObKvCollationMismatch", ObKvScanRangeMissing: "ObKvScanRangeMissing", + ObKvRedisParseError: "ObKvRedisParseError", ObErrValuesClauseNeedHaveColumn: "ObErrValuesClauseNeedHaveColumn", ObErrValuesClauseCannotUseDefaultValues: "ObErrValuesClauseCannotUseDefaultValues", ObWrongPartitionName: "ObWrongPartitionName", From 359a1a814d863c949f2428ab355bff36e949cb29 Mon Sep 17 00:00:00 2001 From: lyxiong0 Date: Wed, 5 Jun 2024 21:53:22 +0800 Subject: [PATCH 08/19] [fix] fetchNextWithRetry overwrite error --- client/query_result_iterator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/query_result_iterator.go b/client/query_result_iterator.go index f34be72..26cb94f 100644 --- a/client/query_result_iterator.go +++ b/client/query_result_iterator.go @@ -174,7 +174,8 @@ func (q *ObQueryResultIterator) fetchNextWithRetry(hasPrev bool) error { return errors.WithMessage(err, "retry and timeout") default: // get and set route table again - targetParts, err := q.queryExecutor.getTableParams(q.ctx, q.queryExecutor.tableName, q.queryExecutor.keyRanges) + var targetParts []*route.ObTableParam + targetParts, err = q.queryExecutor.getTableParams(q.ctx, q.queryExecutor.tableName, q.queryExecutor.keyRanges) if err != nil { return err } From 6541a94b8b44043846887c1751288806f9a73ff6 Mon Sep 17 00:00:00 2001 From: lyxiong0 Date: Wed, 12 Jun 2024 11:58:53 +0800 Subject: [PATCH 09/19] fix error message print password --- route/authentic.go | 2 +- route/sql_driver.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/route/authentic.go b/route/authentic.go index 2c9a049..4bfcf53 100644 --- a/route/authentic.go +++ b/route/authentic.go @@ -39,6 +39,6 @@ func NewObUserAuth(userName string, password string) *ObUserAuth { func (a *ObUserAuth) String() string { return "ObUserAuth{" + "userName:" + a.userName + ", " + - "password:" + a.password + + "password:[invisible]" + "}" } diff --git a/route/sql_driver.go b/route/sql_driver.go index 254fafe..42d8045 100644 --- a/route/sql_driver.go +++ b/route/sql_driver.go @@ -41,11 +41,15 @@ func NewDB( "@tcp(", ip, ":", port, ")/", database, "?charset=utf8"}, "") db, err := sql.Open("mysql", dsn) if err != nil { - return nil, errors.WithMessagef(err, "open db, dsn:%s", dsn) + hidePass := strings.Join([]string{userName, ":[password]", + "@tcp(", ip, ":", port, ")/", database, "?charset=utf8"}, "") + return nil, errors.WithMessagef(err, "open db, dsn:%s", hidePass) } err = db.Ping() if err != nil { - return nil, errors.WithMessagef(err, "ping db, dsn:%s", dsn) + hidePass := strings.Join([]string{userName, ":[password]", + "@tcp(", ip, ":", port, ")/", database, "?charset=utf8"}, "") + return nil, errors.WithMessagef(err, "ping db, dsn:%s", hidePass) } return db, nil } From f165ebf382658038706acbcb89f2ec5528102f45 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Wed, 12 Jun 2024 17:43:56 +0800 Subject: [PATCH 10/19] fix: table entry do not refresh when create connection fail --- route/route_info.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/route/route_info.go b/route/route_info.go index 5b8eeb7..0589900 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -61,6 +61,9 @@ func (i *ObRouteInfo) getTableWithRetry(ctx context.Context, server *ObServerAdd for { select { case <-ctx.Done(): + err := i.refreshTableLocations(&server.tcpAddr) + log.Warn("[Runtime]", nil, "get table fail and try to refresh table location ", + log.String("server", server.String()), log.String("err", err.Error())) return nil, errors.Errorf("get table, server:%s", server.String()) default: t, ok = i.tableRoster.Get(server.tcpAddr) From 633c41e18820de7a02988de270032f11554694ed Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Wed, 12 Jun 2024 17:59:51 +0800 Subject: [PATCH 11/19] fix ut --- route/authentic_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/route/authentic_test.go b/route/authentic_test.go index a0e790b..14c6291 100644 --- a/route/authentic_test.go +++ b/route/authentic_test.go @@ -25,9 +25,9 @@ import ( func TestObUserAuth(t *testing.T) { au := &ObUserAuth{} - assert.Equal(t, "ObUserAuth{userName:, password:}", au.String()) + assert.Equal(t, "ObUserAuth{userName:, password:[invisible]}", au.String()) au = NewObUserAuth("sys", "pass") assert.EqualValues(t, "sys", au.UserName()) assert.EqualValues(t, "pass", au.Password()) - assert.Equal(t, "ObUserAuth{userName:sys, password:pass}", au.String()) + assert.Equal(t, "ObUserAuth{userName:sys, password:[invisible]}", au.String()) } From 3bf89433c05148b7af0e0f5a97a44a5cbed0457e Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Wed, 12 Jun 2024 18:06:51 +0800 Subject: [PATCH 12/19] fix ut --- client/obclient_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/obclient_test.go b/client/obclient_test.go index b497707..b85cede 100644 --- a/client/obclient_test.go +++ b/client/obclient_test.go @@ -52,7 +52,7 @@ func TestObClient_String(t *testing.T) { cfg := config.NewDefaultClientConfig() cli, err := newObClient(testConfigUrl, testFullUserName, testPassWord, testSysUserName, testSysPassWord, cfg) assert.Equal(t, nil, err) - assert.Equal(t, "obClient{config:ClientConfig{ConnPoolMaxConnSize:1, ConnConnectTimeOut:1s, ConnLoginTimeout:5s, OperationTimeOut:10s, LogLevel:1, TableEntryRefreshLockTimeout:4s, TableEntryRefreshTryTimes:3, TableEntryRefreshIntervalBase:100ms, TableEntryRefreshIntervalCeiling:1.6s, MetadataRefreshInterval:1m0s, MetadataRefreshLockTimeout:8s, RsListLocalFileLocation:, RsListHttpGetTimeout:1s, RsListHttpGetRetryTimes:3, RsListHttpGetRetryInterval:100ms, EnableRerouting:true, MaxConnectionAge:0s, EnableSLBLoadBalance:false}, configUrl:http://127.0.0.1:8080/services?User_ID=xxx&UID=xxx&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx, fullUserName:user@mysql#obkv_cluster, userName:user, tenantName:mysql, clusterName:obkv_cluster, database:xxx, sysUA:ObUserAuth{userName:sys, password:}}", cli.String()) + assert.Equal(t, "obClient{config:ClientConfig{ConnPoolMaxConnSize:1, ConnConnectTimeOut:1s, ConnLoginTimeout:5s, OperationTimeOut:10s, LogLevel:1, TableEntryRefreshLockTimeout:4s, TableEntryRefreshTryTimes:3, TableEntryRefreshIntervalBase:100ms, TableEntryRefreshIntervalCeiling:1.6s, MetadataRefreshInterval:1m0s, MetadataRefreshLockTimeout:8s, RsListLocalFileLocation:, RsListHttpGetTimeout:1s, RsListHttpGetRetryTimes:3, RsListHttpGetRetryInterval:100ms, EnableRerouting:true, MaxConnectionAge:0s, EnableSLBLoadBalance:false}, configUrl:http://127.0.0.1:8080/services?User_ID=xxx&UID=xxx&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx, fullUserName:user@mysql#obkv_cluster, userName:user, tenantName:mysql, clusterName:obkv_cluster, database:xxx, sysUA:ObUserAuth{userName:sys, password:[invisible]}}", cli.String()) } func TestObClient_parseFullUserName(t *testing.T) { From 03c450bf9298dbe433d22c0c4d5bd04cd9e4fe46 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Wed, 12 Jun 2024 21:00:46 +0800 Subject: [PATCH 13/19] remove log --- route/route_info.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/route/route_info.go b/route/route_info.go index 0589900..b7f29ec 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -61,9 +61,9 @@ func (i *ObRouteInfo) getTableWithRetry(ctx context.Context, server *ObServerAdd for { select { case <-ctx.Done(): - err := i.refreshTableLocations(&server.tcpAddr) - log.Warn("[Runtime]", nil, "get table fail and try to refresh table location ", - log.String("server", server.String()), log.String("err", err.Error())) + i.refreshTableLocations(&server.tcpAddr) + log.Warn("[Runtime]", nil, "get table fail and fail to refresh table location ", + log.String("server", server.String())) return nil, errors.Errorf("get table, server:%s", server.String()) default: t, ok = i.tableRoster.Get(server.tcpAddr) From 28d8e2e9c5f66f6c3b98431b9ec5064ecb6d9ac7 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Thu, 13 Jun 2024 16:02:44 +0800 Subject: [PATCH 14/19] fix panic cause by print log --- client/obclient.go | 8 +++++--- route/route_info.go | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/client/obclient.go b/client/obclient.go index 6227c14..f3cc88c 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -689,17 +689,19 @@ func (c *obClient) execute( err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result) if err != nil { trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) + addr := fmt.Sprintf("%s:%d", tableParam.Table().Ip(), tableParam.Table().Port()) log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace), - log.String("remote addr", result.RemoteAddr().String())) + log.String("remote addr", addr)) return nil, err, needRetry } if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess { trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence()) + addr := fmt.Sprintf("%s:%d", tableParam.Table().Ip(), tableParam.Table().Port()) log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace), - log.String("remote addr", result.RemoteAddr().String())) + log.String("remote addr", addr)) return nil, protocol.NewProtocolError( - result.RemoteAddr().String(), + addr, oberror.ObErrorCode(result.Header().ErrorNo()), result.Header().Msg(), result.Sequence(), diff --git a/route/route_info.go b/route/route_info.go index b7f29ec..cb21653 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -61,6 +61,7 @@ func (i *ObRouteInfo) getTableWithRetry(ctx context.Context, server *ObServerAdd for { select { case <-ctx.Done(): + log.Warn("[Runtime]", nil, "catch ctx Done") i.refreshTableLocations(&server.tcpAddr) log.Warn("[Runtime]", nil, "get table fail and fail to refresh table location ", log.String("server", server.String())) From 992187392445a6b27ac9a334e3ad1b31d5d7bfc5 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Thu, 13 Jun 2024 16:21:23 +0800 Subject: [PATCH 15/19] adapt log print --- route/route_info.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/route/route_info.go b/route/route_info.go index cb21653..7f8d732 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -61,10 +61,9 @@ func (i *ObRouteInfo) getTableWithRetry(ctx context.Context, server *ObServerAdd for { select { case <-ctx.Done(): - log.Warn("[Runtime]", nil, "catch ctx Done") - i.refreshTableLocations(&server.tcpAddr) - log.Warn("[Runtime]", nil, "get table fail and fail to refresh table location ", + log.Warn("[Runtime]", nil, "get table timeout and try to refresh table location ", log.String("server", server.String())) + go i.refreshTableLocations(&server.tcpAddr) return nil, errors.Errorf("get table, server:%s", server.String()) default: t, ok = i.tableRoster.Get(server.tcpAddr) From 491beeb5011dcd3581481650054dd13864cea078 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Thu, 13 Jun 2024 21:25:20 +0800 Subject: [PATCH 16/19] fix refresh table entry invalid --- route/route_info.go | 35 ++++++++++++++++++++++++++++------- route/table_entry.go | 2 +- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/route/route_info.go b/route/route_info.go index 7f8d732..9e806a2 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -440,13 +440,34 @@ func (i *ObRouteInfo) refreshTableLocations(addr *tcpAddr) error { i.tableLocations.Range(func(key, value interface{}) bool { tableName := key.(string) entry := value.(*ObTableEntry) - for _, replica := range entry.tableLocation.replicaLocations { - if replica.addr.tcpAddr.Equal(addr) { - // trigger refresh table - isRefreshing := &atomic.Bool{} - isRefreshing.Store(false) - i.taskInfo.tables.AddIfAbsent(tableName, isRefreshing) - i.taskInfo.TriggerRefreshTable() + if !entry.IsPartitionTable() { + for _, replica := range entry.tableLocation.replicaLocations { + if replica.addr.tcpAddr.Equal(addr) { + // trigger refresh table + isRefreshing := &atomic.Bool{} + isRefreshing.Store(false) + i.taskInfo.tables.AddIfAbsent(tableName, isRefreshing) + i.taskInfo.TriggerRefreshTable() + } + } + } else { + for partId, partLocation := range entry.partLocationEntry.partLocations { + if partLocation != nil { + replica := partLocation.getReplica(ConsistencyStrong) + if replica != nil && replica.addr.tcpAddr.Equal(addr) { + // trigger refresh table + isRefreshing := &atomic.Bool{} + isRefreshing.Store(false) + i.taskInfo.tables.AddIfAbsent(tableName, isRefreshing) + i.taskInfo.TriggerRefreshTable() + } else if replica == nil { + log.Warn("Routine", nil, "partLocation leader replica is nil", + log.String("tableName", tableName), log.Uint64("partId", partId)) + } + } else { + log.Warn("Routine", nil, "partLocation is nil", + log.String("tableName", tableName), log.Uint64("partId", partId)) + } } } return true diff --git a/route/table_entry.go b/route/table_entry.go index 5d73f5c..49037cc 100644 --- a/route/table_entry.go +++ b/route/table_entry.go @@ -36,7 +36,7 @@ type ObTableEntry struct { refreshTime time.Time // last refresh time tableEntryKey *ObTableEntryKey // clusterName/tenantName/databaseName/tableName partitionInfo *obPartitionInfo // partition key meta info - tableLocation *ObTableLocation // location of table, all replica information of table + tableLocation *ObTableLocation // all replica location for non-partition table partLocationEntry *ObPartLocationEntry // all partition location of table } From 639565a6c2bbb37da25416e2c01fea191979d2b8 Mon Sep 17 00:00:00 2001 From: groundwu <1175416256@qq.com> Date: Fri, 14 Jun 2024 09:51:22 +0800 Subject: [PATCH 17/19] fix bug --- route/route_info.go | 1 + 1 file changed, 1 insertion(+) diff --git a/route/route_info.go b/route/route_info.go index 9e806a2..44a2066 100644 --- a/route/route_info.go +++ b/route/route_info.go @@ -460,6 +460,7 @@ func (i *ObRouteInfo) refreshTableLocations(addr *tcpAddr) error { isRefreshing.Store(false) i.taskInfo.tables.AddIfAbsent(tableName, isRefreshing) i.taskInfo.TriggerRefreshTable() + return true } else if replica == nil { log.Warn("Routine", nil, "partLocation leader replica is nil", log.String("tableName", tableName), log.Uint64("partId", partId)) From 6838942166028b941219370d516c1018c43f7b62 Mon Sep 17 00:00:00 2001 From: Liyao Xiong Date: Mon, 24 Jun 2024 10:44:56 +0800 Subject: [PATCH 18/19] fix rpc server do not stop when ReadRequest panic (#164) --- obkvrpc/rpc_server.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/obkvrpc/rpc_server.go b/obkvrpc/rpc_server.go index fc7b199..d635f9b 100644 --- a/obkvrpc/rpc_server.go +++ b/obkvrpc/rpc_server.go @@ -114,13 +114,15 @@ func (s *Server) Close() { // ReadRequest read requests until an error occurs func (s *Server) ReadRequest(reqChan chan<- *Request, cServer CodecServer) { + defer func() { + close(reqChan) + }() closeChan := cServer.GetCloseChan() + isStop := false for { - isStop := false select { case <-*closeChan: - close(reqChan) - return + isStop = true default: // ReadRequest may include read and encode, depend on the cServer req := s.reqObjPool.Get().(*Request) @@ -132,7 +134,6 @@ func (s *Server) ReadRequest(reqChan chan<- *Request, cServer CodecServer) { log.Warn("RPCServer", req.ID, "fail to read command", zap.Error(err)) } s.PutRequest(req) - close(reqChan) isStop = true } else { reqChan <- req @@ -180,11 +181,12 @@ func (s *Server) RunWorker(wg *sync.WaitGroup, reqChan <-chan *Request, cServer func (s *Server) ServeCodec(cServer CodecServer, maxQueSize int) { wg := new(sync.WaitGroup) defer func() { - wg.Wait() - cServer.Close() if err := recover(); err != nil { log.Error("RPCServer", nil, "ServeCodec panic", log.Any("error", err), log.String("stack", string(debug.Stack()))) } + log.Debug("RPCServer", nil, "close CodecServer") + cServer.Close() + wg.Wait() }() reqChan := make(chan *Request, maxQueSize) var err error From c0f1b6d09db4f00324ee3745ca32586aed16fef1 Mon Sep 17 00:00:00 2001 From: eemjwu <744706242@qq.com> Date: Tue, 25 Jun 2024 19:28:49 +0800 Subject: [PATCH 19/19] add redis entity_type --- client/client.go | 3 +++ client/obbatch.go | 24 ++++++++++++++---------- client/obclient.go | 11 +++++++++++ client/query.go | 24 ++++++++++++++---------- protocol/constants.go | 1 + table/kv_mode.go | 1 + 6 files changed, 44 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 4bb1176..db2c18f 100644 --- a/client/client.go +++ b/client/client.go @@ -19,6 +19,7 @@ package client import ( "context" + "github.com/oceanbase/obkv-table-client-go/protocol" "github.com/pkg/errors" @@ -199,4 +200,6 @@ type Client interface { // close will disconnect all connections and exit all goroutines. Close() Redis(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error) + SetEntityType(entityType protocol.ObTableEntityType) + GetEntityType() protocol.ObTableEntityType } diff --git a/client/obbatch.go b/client/obbatch.go index f23c290..e40d7db 100644 --- a/client/obbatch.go +++ b/client/obbatch.go @@ -42,7 +42,7 @@ func newObBatchExecutor(tableName string, cli *obClient) *obBatchExecutor { cli: cli, rowKeyName: nil, samePropertiesNames: false, - entityType: protocol.ObTableEntityTypeDynamic, + entityType: cli.entityType, } } @@ -57,15 +57,19 @@ type obBatchExecutor struct { func (b *obBatchExecutor) setBatchOptions(batchOptions *option.ObBatchOptions) { b.samePropertiesNames = batchOptions.SamePropertiesNames - switch batchOptions.KeyValueMode { - case table.DynamicMode: - b.entityType = protocol.ObTableEntityTypeDynamic - case table.ObTableMode: - b.entityType = protocol.ObTableEntityTypeKV - case table.ObHBaseMode: - b.entityType = protocol.ObTableEntityTypeHKV - default: - b.entityType = protocol.ObTableEntityTypeDynamic + if b.entityType == protocol.ObTableEntityTypeDynamic { + switch batchOptions.KeyValueMode { + case table.DynamicMode: + b.entityType = protocol.ObTableEntityTypeDynamic + case table.ObTableMode: + b.entityType = protocol.ObTableEntityTypeKV + case table.ObHBaseMode: + b.entityType = protocol.ObTableEntityTypeHKV + case table.ObRedisMode: + b.entityType = protocol.ObTableEntityTypeRedis + default: + b.entityType = protocol.ObTableEntityTypeDynamic + } } } diff --git a/client/obclient.go b/client/obclient.go index f3cc88c..6f2da15 100644 --- a/client/obclient.go +++ b/client/obclient.go @@ -50,6 +50,7 @@ type obClient struct { password string database string sysUA *route.ObUserAuth + entityType protocol.ObTableEntityType // for odp client odpTable *route.ObTable @@ -111,6 +112,14 @@ func newOdpClient( return cli, nil } +func (c *obClient) SetEntityType(entityType protocol.ObTableEntityType) { + c.entityType = entityType +} + +func (c *obClient) GetEntityType() protocol.ObTableEntityType { + return c.entityType +} + func (c *obClient) String() string { var configStr = "nil" if c.config != nil { @@ -677,6 +686,7 @@ func (c *obClient) execute( c.config.OperationTimeOut, c.GetRpcFlag(), ) + request.SetEntityType(c.entityType) if err != nil { log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String())) @@ -769,6 +779,7 @@ func (c *obClient) executeWithFilter( c.config.OperationTimeOut, c.GetRpcFlag(), ) + request.SetEntityType(c.entityType) if err != nil { return nil, errors.WithMessagef(err, "new operation request, tableName:%s, tableParam:%s, opType:%d", tableName, tableParam.String(), opType), needRetry diff --git a/client/query.go b/client/query.go index 0fc8b2d..7aa737a 100644 --- a/client/query.go +++ b/client/query.go @@ -47,7 +47,7 @@ func newObQueryExecutorWithParams(tableName string, cli *obClient) *obQueryExecu cli: cli, keyRanges: nil, tableQuery: protocol.NewObTableQueryWithParams(-1), - entityType: protocol.ObTableEntityTypeDynamic, + entityType: cli.entityType, } } @@ -93,15 +93,19 @@ func (q *obQueryExecutor) setQueryOptions(queryOptions *option.ObQueryOptions) { q.entityType = protocol.ObTableEntityTypeHKV } else { q.tableQuery.SetIsHbaseQuery(queryOptions.IsHbaseQuery) - switch queryOptions.KeyValueMode { - case table.DynamicMode: - q.entityType = protocol.ObTableEntityTypeDynamic - case table.ObTableMode: - q.entityType = protocol.ObTableEntityTypeKV - case table.ObHBaseMode: - q.entityType = protocol.ObTableEntityTypeHKV - default: - q.entityType = protocol.ObTableEntityTypeDynamic + if q.entityType == protocol.ObTableEntityTypeDynamic { + switch queryOptions.KeyValueMode { + case table.DynamicMode: + q.entityType = protocol.ObTableEntityTypeDynamic + case table.ObTableMode: + q.entityType = protocol.ObTableEntityTypeKV + case table.ObHBaseMode: + q.entityType = protocol.ObTableEntityTypeHKV + case table.ObRedisMode: + q.entityType = protocol.ObTableEntityTypeRedis + default: + q.entityType = protocol.ObTableEntityTypeDynamic + } } } } diff --git a/protocol/constants.go b/protocol/constants.go index 56884c2..12ff59a 100644 --- a/protocol/constants.go +++ b/protocol/constants.go @@ -30,6 +30,7 @@ const ( ObTableEntityTypeDynamic ObTableEntityType = iota ObTableEntityTypeKV ObTableEntityTypeHKV + ObTableEntityTypeRedis ) type ObScanOrder uint8 diff --git a/table/kv_mode.go b/table/kv_mode.go index 9e4b1c6..dcf3724 100644 --- a/table/kv_mode.go +++ b/table/kv_mode.go @@ -23,4 +23,5 @@ const ( DynamicMode ObKeyValueMode = iota ObTableMode ObHBaseMode + ObRedisMode )