Skip to content

Commit

Permalink
save log into file in go client
Browse files Browse the repository at this point in the history
  • Loading branch information
foronedream committed Jan 17, 2024
1 parent 250df4a commit 2a0f71b
Show file tree
Hide file tree
Showing 24 changed files with 511 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.DS_Store
.idea
.vscode
./logs
33 changes: 32 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package client

import (
"context"

"github.com/pkg/errors"

"github.com/oceanbase/obkv-table-client-go/client/option"
"github.com/oceanbase/obkv-table-client-go/config"
"github.com/oceanbase/obkv-table-client-go/log"
"github.com/oceanbase/obkv-table-client-go/table"
)

Expand All @@ -40,6 +40,11 @@ func NewClient(
sysUserName string,
sysPassWord string,
cliConfig *config.ClientConfig) (Client, error) {
// init log
err := initLogProcess(cliConfig)
if err != nil {
return nil, err
}
// 1. Check args
if configUrl == "" {
return nil, errors.New("config url is empty")
Expand Down Expand Up @@ -81,6 +86,12 @@ func NewOdpClient(
odpRpcPort int,
database string,
cliConfig *config.ClientConfig) (Client, error) {
// init log
err := initLogProcess(cliConfig)
if err != nil {
return nil, err
}

// 1. Check args
if fullUserName == "" {
return nil, errors.New("full user name is null")
Expand Down Expand Up @@ -111,6 +122,11 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) {
if err != nil {
return nil, errors.WithMessagef(err, "get client config from toml, configFilePath:%s", configFilePath)
}
// init log
err = initLogProcess(clientConfig.GetClientConfig())
if err != nil {
return nil, err
}
switch clientConfig.Mode {
case "direct":
return NewClient(
Expand All @@ -135,6 +151,21 @@ func NewClientWithTomlConfig(configFilePath string) (Client, error) {
}
}

func initLogProcess(c *config.ClientConfig) error {
var logConfig log.LogConfig
if c != nil {
logConfig.LogFileName = c.LogFileName
logConfig.MaxAgeFileRem = c.MaxAgeFileRem
logConfig.MaxBackupFileSize = c.MaxBackupFileSize
logConfig.SingleFileMaxSize = c.SingleFileMaxSize
logConfig.Compress = c.Compress
logConfig.SlowQueryThreshold = c.SlowQueryThreshold
} else {
return errors.New("client config is null")
}
return log.InitLoggerWithFile(logConfig)
}

type Client interface {
// Insert a record by rowKey.
Insert(ctx context.Context, tableName string, rowKey []*table.Column, mutateColumns []*table.Column, opts ...option.ObOperationOption) (int64, error)
Expand Down
4 changes: 2 additions & 2 deletions client/obbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe
defer wg.Done()
err, partNeedRetry := b.partitionExecute(ctx, partOp, res)
if err != nil {
log.Warn("failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error()))
log.Warn("Runtime", ctx.Value(log.ObkvTraceId), "failed to execute partition operations", log.String("partOp", partOp.String()), log.String("err", err.Error()))
errArrLock.Lock()
errArr = append(errArr, err)
if needRetry == false && partNeedRetry {
Expand All @@ -321,7 +321,7 @@ func (b *obBatchExecutor) executeInternal(ctx context.Context) (BatchOperationRe
}
wg.Wait()
if len(errArr) != 0 {
log.Warn("error occur when execute partition operations")
log.Warn("Runtime", ctx.Value(log.ObkvTraceId), "error occur when execute partition operations")
return nil, errArr[0], needRetry
}
} else {
Expand Down
36 changes: 36 additions & 0 deletions client/obclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package client

import (
"context"
"fmt"
"github.com/oceanbase/obkv-table-client-go/log"
"golang.org/x/sys/unix"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -271,6 +275,7 @@ func (c *obClient) Insert(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -305,6 +310,7 @@ func (c *obClient) Update(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -339,6 +345,7 @@ func (c *obClient) InsertOrUpdate(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
Expand All @@ -359,6 +366,7 @@ func (c *obClient) Replace(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
res, err := c.executeWithRetry(
ctx,
Expand All @@ -379,6 +387,7 @@ func (c *obClient) Increment(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -413,6 +422,7 @@ func (c *obClient) Append(
rowKey []*table.Column,
mutateColumns []*table.Column,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -446,6 +456,7 @@ func (c *obClient) Delete(
tableName string,
rowKey []*table.Column,
opts ...option.ObOperationOption) (int64, error) {
log.InitTraceId(&ctx)
operationOptions := c.getOperationOptions(opts...)
if operationOptions.TableFilter == nil {
res, err := c.executeWithRetry(
Expand Down Expand Up @@ -480,6 +491,7 @@ func (c *obClient) Get(
rowKey []*table.Column,
getColumns []string,
opts ...option.ObOperationOption) (SingleResult, error) {
log.InitTraceId(&ctx)
var columns = make([]*table.Column, 0, len(getColumns))
for _, columnName := range getColumns {
columns = append(columns, table.NewColumn(columnName, nil))
Expand All @@ -499,6 +511,7 @@ func (c *obClient) Get(
}

func (c *obClient) Query(ctx context.Context, tableName string, rangePairs []*table.RangePair, opts ...option.ObQueryOption) (QueryResultIterator, error) {
log.InitTraceId(&ctx)
queryOpts := c.getObQueryOptions(opts...)
queryExecutor := newObQueryExecutorWithParams(tableName, c)
queryExecutor.addKeyRanges(rangePairs)
Expand Down Expand Up @@ -533,6 +546,7 @@ func (c *obClient) Close() {
if c.odpTable != nil {
c.odpTable.Close()
}
_ = log.Sync()
}

func (c *obClient) getOperationOptions(opts ...option.ObOperationOption) *option.ObOperationOptions {
Expand Down Expand Up @@ -601,6 +615,8 @@ func (c *obClient) execute(
// 1. Get table route
tableParam, err := c.GetTableParam(ctx, tableName, rowKey)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute",
log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String()))
return nil, errors.WithMessagef(err, "get table param, tableName:%s, opType:%d", tableName, opType), needRetry
}

Expand All @@ -618,6 +634,8 @@ func (c *obClient) execute(
c.GetRpcFlag(),
)
if err != nil {
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute",
log.Int64("opType", int64(opType)), log.String("tableName", tableName), log.String("tableParam", tableParam.String()))
return nil, errors.WithMessagef(err, "new operation request, tableName:%s, tableParam:%s, opType:%d",
tableName, tableParam.String(), opType), needRetry
}
Expand All @@ -626,10 +644,14 @@ func (c *obClient) execute(
result := protocol.NewObTableOperationResponse()
err, needRetry = c.executeInternal(ctx, tableName, tableParam.Table(), request, result)
if err != nil {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute", log.String("observerTraceId", trace))
return nil, err, needRetry
}

if oberror.ObErrorCode(result.Header().ErrorNo()) != oberror.ObSuccess {
trace := fmt.Sprintf("Y%X-%016X", result.UniqueId(), result.Sequence())
log.Error("Runtime", ctx.Value(log.ObkvTraceId), "error occur in execute", log.String("observerTraceId", trace))
return nil, protocol.NewProtocolError(
result.RemoteAddr().String(),
oberror.ObErrorCode(result.Header().ErrorNo()),
Expand Down Expand Up @@ -751,3 +773,17 @@ func (c *obClient) GetTableParam(

return c.routeInfo.GetTableParam(ctx, tableName, rowKey)
}

func MonitorSlowQuery(executeTime int64, slowQueryThreshold int64, tableName string, clientTraceId any) {
if executeTime > slowQueryThreshold {
pId := unix.Getpid()
buf := make([]byte, 64)
n := runtime.Stack(buf, false)
id := buf[:n]
var goroutineID uint64
fmt.Sscanf(string(id), "goroutine %d", &goroutineID)
log.Info("Monitor", clientTraceId, "SlowQuery", log.String("tableName", tableName),
log.Int64("executeTime", executeTime), log.Int64("slowQueryThreshold", slowQueryThreshold),
log.Int("pId", pId), log.Uint64("goroutineID", goroutineID))
}
}
6 changes: 5 additions & 1 deletion client/query_result_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client

import (
"context"
"github.com/oceanbase/obkv-table-client-go/log"
"sync"
"time"

Expand Down Expand Up @@ -88,7 +89,7 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) {
if err != nil {
return nil, err
}

var startTime int64 = time.Now().UnixMilli()
// lock
q.lock.Lock()
defer q.lock.Unlock()
Expand Down Expand Up @@ -117,6 +118,9 @@ func (q *ObQueryResultIterator) Next() (QueryResult, error) {

// get next row from next server
err = q.fetchNextWithRetry(false)
endTime := time.Now().UnixMilli()
var duration int64 = endTime - startTime
MonitorSlowQuery(duration, log.SlowQueryThreshold, q.queryExecutor.tableName, q.ctx.Value(log.ObkvTraceId))
if err != nil {
return nil, err
}
Expand Down
20 changes: 20 additions & 0 deletions config/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ type ClientConfig struct {
// connection rebalance in ODP mode
MaxConnectionAge time.Duration
EnableSLBLoadBalance bool

// log config
LogFileName string // log file dir
SingleFileMaxSize int // log file size(MB)
MaxBackupFileSize int // Maximum number of old files to keep
MaxAgeFileRem int // Maximum number of days to keep old files
Compress bool // Whether to compress/archive old files
SlowQueryThreshold int64 // Slow query threshold
}

func NewDefaultClientConfig() *ClientConfig {
Expand All @@ -71,6 +79,12 @@ func NewDefaultClientConfig() *ClientConfig {
EnableRerouting: true,
MaxConnectionAge: time.Duration(0) * time.Second, // valid iff > 0
EnableSLBLoadBalance: false,
LogFileName: ".",
SingleFileMaxSize: 256, // MB
MaxBackupFileSize: 10,
MaxAgeFileRem: 30,
Compress: false,
SlowQueryThreshold: 40, // ms
}
}

Expand All @@ -94,5 +108,11 @@ func (c *ClientConfig) String() string {
"EnableRerouting:" + strconv.FormatBool(c.EnableRerouting) + ", " +
"MaxConnectionAge:" + c.MaxConnectionAge.String() + ", " +
"EnableSLBLoadBalance:" + strconv.FormatBool(c.EnableSLBLoadBalance) +
"LogFileName:" + c.LogFileName + "," +
"SingleFileMaxSize:" + strconv.Itoa(c.SingleFileMaxSize) + "MB," +
"MaxBackupFileSize:" + strconv.Itoa(c.MaxBackupFileSize) + "," +
"MaxAgeFileRem:" + strconv.Itoa(c.MaxAgeFileRem) + "," +
"Compress:" + strconv.FormatBool(c.Compress) + "," +
"SlowQueryThreshold:" + strconv.Itoa(int(c.SlowQueryThreshold)) +
"}"
}
24 changes: 24 additions & 0 deletions config/toml_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ClientConfiguration struct {
RouteMetaDataConfig RouteMetaDataConfig
RsListConfig RsListConfig
ExtraConfig ExtraConfig
LogConfig LogConfig
Mode string
}

Expand Down Expand Up @@ -88,6 +89,15 @@ type ExtraConfig struct {
EnableSLBLoadBalance bool
}

type LogConfig struct {
LogFileName string // log file dir
SingleFileMaxSize int // log file size(MB)
MaxBackupFileSize int // Maximum number of old files to keep
MaxAgeFileRem int // Maximum number of days to keep old files
Compress bool // Whether to compress/archive old files
SlowQueryThreshold int64 // Slow query threshold
}

func (c *ClientConfiguration) checkClientConfiguration() error {
if c.Mode == "direct" {
if c.DirectClientConfig.ConfigUrl == "" {
Expand All @@ -109,6 +119,14 @@ func (c *ClientConfiguration) checkClientConfiguration() error {
} else if c.OdpClientConfig.FullUserName == "" {
return errors.New("full user name is empty")
}
} else if c.Mode == "log" {
if c.LogConfig.LogFileName == "." {
return errors.New("should set log file name in toml config")
} else if c.LogConfig.SingleFileMaxSize == 0 {
return errors.New("single file maxSize is invalid")
} else if c.LogConfig.SlowQueryThreshold == 0 {
return errors.New("slow query threshold is invalid")
}
} else {
return errors.New("mode is invalid")
}
Expand Down Expand Up @@ -151,6 +169,12 @@ func (c *ClientConfiguration) GetClientConfig() *ClientConfig {
EnableRerouting: c.ExtraConfig.EnableRerouting,
MaxConnectionAge: time.Duration(c.ExtraConfig.MaxConnectionAge) * time.Millisecond,
EnableSLBLoadBalance: c.ExtraConfig.EnableSLBLoadBalance,
LogFileName: c.LogConfig.LogFileName,
SingleFileMaxSize: c.LogConfig.SingleFileMaxSize, // MB
MaxBackupFileSize: c.LogConfig.MaxBackupFileSize,
MaxAgeFileRem: c.LogConfig.MaxAgeFileRem,
Compress: c.LogConfig.Compress,
SlowQueryThreshold: c.LogConfig.SlowQueryThreshold, // ms
}
}

Expand Down
11 changes: 10 additions & 1 deletion configurations/obkv-table-default.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Client Mode - `direct` or `proxy`
mode = "proxy"
# log is not use toml config
mode = "log"

## Direct Mode
[DirectClientConfig]
Expand Down Expand Up @@ -50,3 +51,11 @@ LogLevel = "info"
EnableRerouting = false
MaxConnectionAge = 0
EnableSLBLoadBalance = false

[LogConfig]
LogFileName = "."
SingleFileMaxSize = 256 ## MB
MaxBackupFileSize = 10 ## 0 is not delete
MaxAgeFileRem = 30 ## 30 day
Compress = false ## default not
SlowQueryThreshold = 40 ## ms
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/frankban/quicktest v1.14.6 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/uber/go-torch v0.0.0-20181107071353-86f327cc820e // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.15.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 2a0f71b

Please sign in to comment.