Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add redis entity_type #165

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/batch_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func TestObBatchOperationResult_Size(t *testing.T) {
result := newObSingleResult(1, nil)
result := newObSingleResult(1, nil, 0)
emptyResult := make([]SingleResult, 0)
batchResult := newObBatchOperationResult(emptyResult)
assert.EqualValues(t, 0, batchResult.Size())
Expand Down
9 changes: 9 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package client

import (
"context"
"github.com/oceanbase/obkv-table-client-go/protocol"

"github.com/pkg/errors"

"github.com/oceanbase/obkv-table-client-go/client/option"
Expand Down Expand Up @@ -174,6 +176,10 @@ type Client interface {
// InsertOrUpdate insert or update a record by rowKey.
// insert if the primary key does not exist, update if it does.
InsertOrUpdate(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error)
// InsertOrUpdateWithResult insert or update a record by rowKey.
// insert if the primary key does not exist, update if it does.
// IsInsertOrUpdateDoInsert() in SingleResult tells whether the insert operation has been performed.
InsertOrUpdateWithResult(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error)
// Replace a record by rowKey.
Replace(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error)
// Increment a record by rowKey.
Expand All @@ -193,4 +199,7 @@ type Client interface {
// Close closes the client.
// close will disconnect all connections and exit all goroutines.
Close()
Redis(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (SingleResult, error)
SetEntityType(entityType protocol.ObTableEntityType)
GetEntityType() protocol.ObTableEntityType
}
26 changes: 15 additions & 11 deletions client/obbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newObBatchExecutor(tableName string, cli *obClient) *obBatchExecutor {
cli: cli,
rowKeyName: nil,
samePropertiesNames: false,
entityType: protocol.ObTableEntityTypeDynamic,
entityType: cli.entityType,
}
}

Expand All @@ -57,15 +57,19 @@ type obBatchExecutor struct {

func (b *obBatchExecutor) setBatchOptions(batchOptions *option.ObBatchOptions) {
b.samePropertiesNames = batchOptions.SamePropertiesNames
switch batchOptions.KeyValueMode {
case table.DynamicMode:
b.entityType = protocol.ObTableEntityTypeDynamic
case table.ObTableMode:
b.entityType = protocol.ObTableEntityTypeKV
case table.ObHBaseMode:
b.entityType = protocol.ObTableEntityTypeHKV
default:
b.entityType = protocol.ObTableEntityTypeDynamic
if b.entityType == protocol.ObTableEntityTypeDynamic {
switch batchOptions.KeyValueMode {
case table.DynamicMode:
b.entityType = protocol.ObTableEntityTypeDynamic
case table.ObTableMode:
b.entityType = protocol.ObTableEntityTypeKV
case table.ObHBaseMode:
b.entityType = protocol.ObTableEntityTypeHKV
case table.ObRedisMode:
b.entityType = protocol.ObTableEntityTypeRedis
default:
b.entityType = protocol.ObTableEntityTypeDynamic
}
}
}

Expand Down Expand Up @@ -413,7 +417,7 @@ func (b *obBatchExecutor) Execute(ctx context.Context) (BatchOperationResult, er

// operationResponse2SingleResult convert operation response to single result.
func operationResponse2SingleResult(res *protocol.ObTableOperationResponse) SingleResult {
return newObSingleResult(res.AffectedRows(), res.Entity())
return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags())
}

type obPartOp struct {
Expand Down
81 changes: 70 additions & 11 deletions client/obclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package client
import (
"context"
"fmt"
"github.com/oceanbase/obkv-table-client-go/log"
"golang.org/x/sys/unix"
"runtime"
"strings"
"time"

"golang.org/x/sys/unix"

"github.com/oceanbase/obkv-table-client-go/log"

"github.com/pkg/errors"

"github.com/oceanbase/obkv-table-client-go/client/option"
Expand All @@ -48,6 +50,7 @@ type obClient struct {
password string
database string
sysUA *route.ObUserAuth
entityType protocol.ObTableEntityType

// for odp client
odpTable *route.ObTable
Expand Down Expand Up @@ -109,6 +112,14 @@ func newOdpClient(
return cli, nil
}

func (c *obClient) SetEntityType(entityType protocol.ObTableEntityType) {
c.entityType = entityType
}

func (c *obClient) GetEntityType() protocol.ObTableEntityType {
return c.entityType
}

func (c *obClient) String() string {
var configStr = "nil"
if c.config != nil {
Expand Down Expand Up @@ -360,6 +371,27 @@ func (c *obClient) InsertOrUpdate(
return res.AffectedRows(), nil
}

func (c *obClient) InsertOrUpdateWithResult(
ctx context.Context,
tableName string,
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
tableName,
protocol.ObTableOperationInsertOrUpdate,
rowKey,
mutateColumns,
operationOptions)
if err != nil {
return nil, errors.WithMessagef(err, "execute insert or update, tableName:%s, rowKey:%s, mutateColumns:%s",
tableName, table.ColumnsToString(rowKey), table.ColumnsToString(mutateColumns))
}
return newObSingleResult(res.AffectedRows(), nil, res.Flags()), nil
}

func (c *obClient) Replace(
ctx context.Context,
tableName string,
Expand Down Expand Up @@ -400,7 +432,7 @@ func (c *obClient) Increment(
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), res.Entity()), nil
return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil
} else {
res, err := c.executeWithFilterAndRetry(
ctx,
Expand All @@ -412,7 +444,7 @@ func (c *obClient) Increment(
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), nil), nil
return newObSingleResult(res.AffectedRows(), nil, 0), nil
}
}

Expand All @@ -435,7 +467,7 @@ func (c *obClient) Append(
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), res.Entity()), nil
return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil
} else {
res, err := c.executeWithFilterAndRetry(
ctx,
Expand All @@ -447,7 +479,7 @@ func (c *obClient) Append(
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), nil), nil
return newObSingleResult(res.AffectedRows(), nil, 0), nil
}
}

Expand Down Expand Up @@ -507,7 +539,28 @@ func (c *obClient) Get(
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), res.Entity()), nil
return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil
}

func (c *obClient) Redis(
ctx context.Context,
tableName string,
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
tableName,
protocol.ObTableOperationRedis,
rowKey,
mutateColumns,
operationOptions)
if err != nil {
return nil, err
}
return newObSingleResult(res.AffectedRows(), res.Entity(), res.Flags()), nil
}

func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) {
Expand Down Expand Up @@ -616,7 +669,7 @@ func (c *obClient) execute(
tableParam, err := c.GetTableParam(ctx, tableName, rowKey)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute",
log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String()))
log.Int64("opType", int64(opType)), log.String("tableName", tableName))
return nil, errors.WithMessagef(err, "get table param, tableName:%s, opType:%d", tableName, opType), needRetry
}

Expand All @@ -633,6 +686,7 @@ func (c *obClient) execute(
c.config.OperationTimeOut,
c.GetRpcFlag(),
)
request.SetEntityType(c.entityType)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute",
log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String()))
Expand All @@ -645,15 +699,19 @@ func (c *obClient) execute(
err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result)
if err != nil {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace))
addr := fmt.Sprintf("%s:%d", tableParam.Table().Ip(), tableParam.Table().Port())
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace),
log.String("remote addr", addr))
return nil, err, needRetry
}

if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace))
addr := fmt.Sprintf("%s:%d", tableParam.Table().Ip(), tableParam.Table().Port())
log.Error("Runtime", ctx.Value(log.ObkvTraceIdName), "error occur in execute", log.String("observerTraceId", trace),
log.String("remote addr", addr))
return nil, protocol.NewProtocolError(
result.RemoteAddr().String(),
addr,
oberror.ObErrorCode(result.Header().ErrorNo()),
result.Header().Msg(),
result.Sequence(),
Expand Down Expand Up @@ -721,6 +779,7 @@ func (c *obClient) executeWithFilter(
c.config.OperationTimeOut,
c.GetRpcFlag(),
)
request.SetEntityType(c.entityType)
if err != nil {
return nil, errors.WithMessagef(err, "new operation request, tableName:%s, tableParam:%s, opType:%d",
tableName, tableParam.String(), opType), needRetry
Expand Down
2 changes: 1 addition & 1 deletion client/obclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestObClient_String(t *testing.T) {
cfg := config.NewDefaultClientConfig()
cli, err := newObClient(testConfigUrl, testFullUserName, testPassWord, testSysUserName, testSysPassWord, cfg)
assert.Equal(t, nil, err)
assert.Equal(t, "obClient{config:ClientConfig{ConnPoolMaxConnSize:1, ConnConnectTimeOut:1s, ConnLoginTimeout:5s, OperationTimeOut:10s, LogLevel:1, TableEntryRefreshLockTimeout:4s, TableEntryRefreshTryTimes:3, TableEntryRefreshIntervalBase:100ms, TableEntryRefreshIntervalCeiling:1.6s, MetadataRefreshInterval:1m0s, MetadataRefreshLockTimeout:8s, RsListLocalFileLocation:, RsListHttpGetTimeout:1s, RsListHttpGetRetryTimes:3, RsListHttpGetRetryInterval:100ms, EnableRerouting:true, MaxConnectionAge:0s, EnableSLBLoadBalance:false}, configUrl:http://127.0.0.1:8080/services?User_ID=xxx&UID=xxx&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx, fullUserName:user@mysql#obkv_cluster, userName:user, tenantName:mysql, clusterName:obkv_cluster, database:xxx, sysUA:ObUserAuth{userName:sys, password:}}", cli.String())
assert.Equal(t, "obClient{config:ClientConfig{ConnPoolMaxConnSize:1, ConnConnectTimeOut:1s, ConnLoginTimeout:5s, OperationTimeOut:10s, LogLevel:1, TableEntryRefreshLockTimeout:4s, TableEntryRefreshTryTimes:3, TableEntryRefreshIntervalBase:100ms, TableEntryRefreshIntervalCeiling:1.6s, MetadataRefreshInterval:1m0s, MetadataRefreshLockTimeout:8s, RsListLocalFileLocation:, RsListHttpGetTimeout:1s, RsListHttpGetRetryTimes:3, RsListHttpGetRetryInterval:100ms, EnableRerouting:true, MaxConnectionAge:0s, EnableSLBLoadBalance:false}, configUrl:http://127.0.0.1:8080/services?User_ID=xxx&UID=xxx&Action=ObRootServiceInfo&ObCluster=xxx&database=xxx, fullUserName:user@mysql#obkv_cluster, userName:user, tenantName:mysql, clusterName:obkv_cluster, database:xxx, sysUA:ObUserAuth{userName:sys, password:[invisible]}}", cli.String())
}

func TestObClient_parseFullUserName(t *testing.T) {
Expand Down
24 changes: 14 additions & 10 deletions client/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newObQueryExecutorWithParams(tableName string, cli *obClient) *obQueryExecu
cli: cli,
keyRanges: nil,
tableQuery: protocol.NewObTableQueryWithParams(-1),
entityType: protocol.ObTableEntityTypeDynamic,
entityType: cli.entityType,
}
}

Expand Down Expand Up @@ -93,15 +93,19 @@ func (q *obQueryExecutor) setQueryOptions(queryOptions *option.ObQueryOptions) {
q.entityType = protocol.ObTableEntityTypeHKV
} else {
q.tableQuery.SetIsHbaseQuery(queryOptions.IsHbaseQuery)
switch queryOptions.KeyValueMode {
case table.DynamicMode:
q.entityType = protocol.ObTableEntityTypeDynamic
case table.ObTableMode:
q.entityType = protocol.ObTableEntityTypeKV
case table.ObHBaseMode:
q.entityType = protocol.ObTableEntityTypeHKV
default:
q.entityType = protocol.ObTableEntityTypeDynamic
if q.entityType == protocol.ObTableEntityTypeDynamic {
switch queryOptions.KeyValueMode {
case table.DynamicMode:
q.entityType = protocol.ObTableEntityTypeDynamic
case table.ObTableMode:
q.entityType = protocol.ObTableEntityTypeKV
case table.ObHBaseMode:
q.entityType = protocol.ObTableEntityTypeHKV
case table.ObRedisMode:
q.entityType = protocol.ObTableEntityTypeRedis
default:
q.entityType = protocol.ObTableEntityTypeDynamic
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion client/query_result_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (q *ObQueryResultIterator) fetchNextWithRetry(hasPrev bool) error {
return errors.WithMessage(err, "retry and timeout")
default:
// get and set route table again
targetParts, err := q.queryExecutor.getTableParams(q.ctx, q.queryExecutor.tableName, q.queryExecutor.keyRanges)
var targetParts []*route.ObTableParam
targetParts, err = q.queryExecutor.getTableParams(q.ctx, q.queryExecutor.tableName, q.queryExecutor.keyRanges)
if err != nil {
return err
}
Expand Down
25 changes: 23 additions & 2 deletions client/single_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,24 @@ type SingleResult interface {
Values() map[string]interface{}
// RowKey get affected rowkey, only work in Increment and Append operation.
RowKey() []interface{}
// IsInsertOrUpdateDoInsert InsertOrUpdate impl maybe do insert or do update,
// IsInsertOrUpdateDoInsert() to tell you which operation has done.
IsInsertOrUpdateDoInsert() bool
// IsInsertOrUpdateDoUpdate InsertOrUpdate impl maybe do insert or do update,
// IsInsertOrUpdateDoUpdate() to tell you which operation has done.
IsInsertOrUpdateDoUpdate() bool
// IsInsertOrUpdateDoPut InsertOrUpdate impl maybe do put when all columns are filled.
IsInsertOrUpdateDoPut() bool
}

func newObSingleResult(affectedRows int64, affectedEntity *protocol.ObTableEntity) *obSingleResult {
return &obSingleResult{affectedRows, affectedEntity}
func newObSingleResult(affectedRows int64, affectedEntity *protocol.ObTableEntity, flags uint64) *obSingleResult {
return &obSingleResult{affectedRows, affectedEntity, flags}
}

type obSingleResult struct {
affectedRows int64
affectedEntity *protocol.ObTableEntity
flags uint64
}

func (r *obSingleResult) IsEmptySet() bool {
Expand Down Expand Up @@ -105,3 +114,15 @@ func (r *obSingleResult) RowKey() []interface{} {
}
return res
}

func (r *obSingleResult) IsInsertOrUpdateDoInsert() bool {
return r.flags&protocol.IsInsertUpDoInsertMask != 0
}

func (r *obSingleResult) IsInsertOrUpdateDoUpdate() bool {
return !r.IsInsertOrUpdateDoInsert() && !r.IsInsertOrUpdateDoPut()
}

func (r *obSingleResult) IsInsertOrUpdateDoPut() bool {
return r.flags&protocol.IsInsertUpDoPutMask != 0
}
2 changes: 1 addition & 1 deletion client/single_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestSingleResult(t *testing.T) {
res := newObSingleResult(1, nil)
res := newObSingleResult(1, nil, 0)
assert.EqualValues(t, 1, res.AffectedRows())
assert.Equal(t, nil, res.Value("c1"))

Expand Down
2 changes: 2 additions & 0 deletions error/error_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,7 @@ const (
ObKvColumnTypeNotMatch ObErrorCode = -10511
ObKvCollationMismatch ObErrorCode = -10512
ObKvScanRangeMissing ObErrorCode = -10513
ObKvRedisParseError ObErrorCode = -10515
ObErrValuesClauseNeedHaveColumn ObErrorCode = -11000
ObErrValuesClauseCannotUseDefaultValues ObErrorCode = -11001
ObWrongPartitionName ObErrorCode = -11002
Expand Down Expand Up @@ -4303,6 +4304,7 @@ var ObErrorNames = map[ObErrorCode]string{
ObKvColumnTypeNotMatch: "ObKvColumnTypeNotMatch",
ObKvCollationMismatch: "ObKvCollationMismatch",
ObKvScanRangeMissing: "ObKvScanRangeMissing",
ObKvRedisParseError: "ObKvRedisParseError",
ObErrValuesClauseNeedHaveColumn: "ObErrValuesClauseNeedHaveColumn",
ObErrValuesClauseCannotUseDefaultValues: "ObErrValuesClauseCannotUseDefaultValues",
ObWrongPartitionName: "ObWrongPartitionName",
Expand Down
Loading
Loading