From bf382740649e48bde57dec86f7dac6e0eab4b5bb Mon Sep 17 00:00:00 2001 From: zhenghaoz Date: Sat, 12 Oct 2024 12:58:58 +0000 Subject: [PATCH] cache: upgrade redis client version --- base/floats/floats_avx.go | 1 + base/floats/floats_avx512.go | 1 + base/floats/floats_neon.go | 1 + client/client_test.go | 2 +- go.mod | 3 +- go.sum | 14 +-- server/bench_test.go | 2 +- storage/cache/database.go | 1 + storage/cache/redis.go | 231 +++++++++++++---------------------- 9 files changed, 92 insertions(+), 164 deletions(-) diff --git a/base/floats/floats_avx.go b/base/floats/floats_avx.go index d1b8172f6..cb7988966 100644 --- a/base/floats/floats_avx.go +++ b/base/floats/floats_avx.go @@ -1,4 +1,5 @@ //go:build !noasm && amd64 + // AUTO-GENERATED BY GOAT -- DO NOT EDIT package floats diff --git a/base/floats/floats_avx512.go b/base/floats/floats_avx512.go index 468cf097c..1c65d3ef1 100644 --- a/base/floats/floats_avx512.go +++ b/base/floats/floats_avx512.go @@ -1,4 +1,5 @@ //go:build !noasm && amd64 + // AUTO-GENERATED BY GOAT -- DO NOT EDIT package floats diff --git a/base/floats/floats_neon.go b/base/floats/floats_neon.go index 01b763816..c70b9b71d 100644 --- a/base/floats/floats_neon.go +++ b/base/floats/floats_neon.go @@ -1,4 +1,5 @@ //go:build !noasm && arm64 + // AUTO-GENERATED BY GOAT -- DO NOT EDIT package floats diff --git a/client/client_test.go b/client/client_test.go index 452d875a1..f6fa0da3e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -19,7 +19,7 @@ package client import ( "context" "encoding/base64" - "github.com/go-redis/redis/v9" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/suite" "testing" "time" diff --git a/go.mod b/go.mod index 8e4ace0d1..1e17dc235 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/go-playground/locales v0.14.0 github.com/go-playground/universal-translator v0.18.0 github.com/go-playground/validator/v10 v10.11.0 - github.com/go-redis/redis/v9 v9.0.0-rc.1 github.com/go-resty/resty/v2 v2.7.0 github.com/go-sql-driver/mysql v1.6.0 github.com/golang/protobuf v1.5.2 @@ -38,7 +37,7 @@ require ( github.com/prometheus/client_golang v1.13.0 github.com/rakyll/statik v0.1.7 github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 - github.com/redis/go-redis/v9 v9.6.1 + github.com/redis/go-redis/v9 v9.7.0-beta.1 github.com/samber/lo v1.38.1 github.com/schollz/progressbar/v3 v3.9.0 github.com/sclevine/yj v0.0.0-20210612025309-737bdf40a5d1 diff --git a/go.sum b/go.sum index 34a0930f8..2fccce003 100644 --- a/go.sum +++ b/go.sum @@ -200,8 +200,6 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.11.0 h1:0W+xRM511GY47Yy3bZUbJVitCNg2BOGlCyvTqsp/xIw= github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= -github.com/go-redis/redis/v9 v9.0.0-rc.1 h1:/+bS+yeUnanqAbuD3QwlejzQZ+4eqgfUtFTG4b+QnXs= -github.com/go-redis/redis/v9 v9.0.0-rc.1/go.mod h1:8et+z03j0l8N+DvsVnclzjf3Dl/pFHgRk+2Ct1qw66A= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -480,12 +478,6 @@ github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/gomega v1.21.1 h1:OB/euWYIExnPBohllTicTHmGTrMaqJ67nIu80j0/uEM= -github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY= @@ -540,8 +532,8 @@ github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTH github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ= github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 h1:kuvuJL/+MZIEdvtb/kTBRiRgYaOmx1l+lYJyVdrRUOs= github.com/redis/go-redis/extra/redisotel/v9 v9.5.3/go.mod h1:7f/FMrf5RRRVHXgfk7CzSVzXHiWeuOQUu2bsVqWoa+g= -github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= -github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/redis/go-redis/v9 v9.7.0-beta.1 h1:x8FaPEIjBIjzkO3irDARTa/yemsNeHmni16UcVK/ttE= +github.com/redis/go-redis/v9 v9.7.0-beta.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -1156,8 +1148,6 @@ gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/server/bench_test.go b/server/bench_test.go index 9a1a299b0..6a107b6ca 100644 --- a/server/bench_test.go +++ b/server/bench_test.go @@ -30,8 +30,8 @@ import ( "time" "github.com/emicklei/go-restful/v3" - "github.com/go-redis/redis/v9" "github.com/go-resty/resty/v2" + "github.com/redis/go-redis/v9" "github.com/samber/lo" "github.com/stretchr/testify/require" "github.com/zhenghaoz/gorse/base/log" diff --git a/storage/cache/database.go b/storage/cache/database.go index b01798e1c..e8816fd6c 100644 --- a/storage/cache/database.go +++ b/storage/cache/database.go @@ -308,6 +308,7 @@ func Open(path, tablePrefix string) (Database, error) { if err != nil { return nil, err } + opt.Protocol = 2 database := new(Redis) database.client = redis.NewClient(opt) database.TablePrefix = storage.TablePrefix(tablePrefix) diff --git a/storage/cache/redis.go b/storage/cache/redis.go index b3468554f..5da2426ac 100644 --- a/storage/cache/redis.go +++ b/storage/cache/redis.go @@ -46,14 +46,11 @@ func (r *Redis) Ping() error { // Init nothing. func (r *Redis) Init() error { - // list index - result, err := r.client.Do(context.TODO(), "FT._LIST").Result() + // list indices + indices, err := r.client.FT_List(context.Background()).Result() if err != nil { return errors.Trace(err) } - indices := lo.Map(result.([]any), func(s any, _ int) string { - return s.(string) - }) // create index if !lo.Contains(indices, r.DocumentTable()) { _, err = r.client.Do(context.TODO(), "FT.CREATE", r.DocumentTable(), @@ -66,16 +63,33 @@ func (r *Redis) Init() error { "categories", "TAG", "SEPARATOR", ";", "timestamp", "NUMERIC", "SORTABLE"). Result() + // Blocked by https://github.com/redis/go-redis/issues/3150 + //_, err = r.client.FTCreate(context.TODO(), r.DocumentTable(), + // &redis.FTCreateOptions{ + // OnHash: true, + // Prefix: []any{r.DocumentTable() + ":"}, + // }, + // &redis.FieldSchema{FieldName: "collection", FieldType: redis.SearchFieldTypeTag}, + // &redis.FieldSchema{FieldName: "subset", FieldType: redis.SearchFieldTypeTag}, + // &redis.FieldSchema{FieldName: "id", FieldType: redis.SearchFieldTypeTag}, + // &redis.FieldSchema{FieldName: "score", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}, + // &redis.FieldSchema{FieldName: "is_hidden", FieldType: redis.SearchFieldTypeNumeric}, + // &redis.FieldSchema{FieldName: "categories", FieldType: redis.SearchFieldTypeTag, Seperator: ";"}, + // &redis.FieldSchema{FieldName: "timestamp", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}, + //).Result() if err != nil { return errors.Trace(err) } } if !lo.Contains(indices, r.PointsTable()) { - _, err = r.client.Do(context.TODO(), "FT.CREATE", r.PointsTable(), - "ON", "HASH", "PREFIX", "1", r.PointsTable()+":", "SCHEMA", - "name", "TAG", - "timestamp", "NUMERIC", "SORTABLE"). - Result() + _, err = r.client.FTCreate(context.TODO(), r.PointsTable(), + &redis.FTCreateOptions{ + OnHash: true, + Prefix: []any{r.PointsTable() + ":"}, + }, + &redis.FieldSchema{FieldName: "name", FieldType: redis.SearchFieldTypeTag}, + &redis.FieldSchema{FieldName: "timestamp", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}, + ).Result() if err != nil { return errors.Trace(err) } @@ -262,19 +276,44 @@ func (r *Redis) SearchDocuments(ctx context.Context, collection, subset string, for _, q := range query { builder.WriteString(fmt.Sprintf(" @categories:{ %s }", escape(encdodeCategory(q)))) } - args := []any{"FT.SEARCH", r.DocumentTable(), builder.String(), "SORTBY", "score", "DESC", "LIMIT", begin} + options := &redis.FTSearchOptions{ + SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, + LimitOffset: begin, + } if end == -1 { - args = append(args, 10000) + options.Limit = 10000 } else { - args = append(args, end-begin) + options.Limit = end - begin } - result, err := r.client.Do(ctx, args...).Result() + result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), options).Result() if err != nil { return nil, errors.Trace(err) } - _, _, documents, err := parseSearchDocumentsResult(result) - if err != nil { - return nil, errors.Trace(err) + documents := make([]Document, 0, len(result.Docs)) + for _, doc := range result.Docs { + var document Document + document.Id = doc.Fields["id"] + score, err := strconv.ParseFloat(doc.Fields["score"], 64) + if err != nil { + return nil, errors.Trace(err) + } + document.Score = score + isHidden, err := strconv.ParseInt(doc.Fields["is_hidden"], 10, 64) + if err != nil { + return nil, errors.Trace(err) + } + document.IsHidden = isHidden != 0 + categories, err := decodeCategories(doc.Fields["categories"]) + if err != nil { + return nil, errors.Trace(err) + } + document.Categories = categories + timestamp, err := strconv.ParseInt(doc.Fields["timestamp"], 10, 64) + if err != nil { + return nil, errors.Trace(err) + } + document.Timestamp = time.UnixMicro(timestamp).In(time.UTC) + documents = append(documents, document) } return documents, nil } @@ -291,16 +330,17 @@ func (r *Redis) UpdateDocuments(ctx context.Context, collections []string, id st builder.WriteString(fmt.Sprintf(" @id:{ %s }", escape(id))) for { // search documents - result, err := r.client.Do(ctx, "FT.SEARCH", r.DocumentTable(), builder.String(), "SORTBY", "score", "DESC", "LIMIT", 0, 10000).Result() - if err != nil { - return errors.Trace(err) - } - count, keys, _, err := parseSearchDocumentsResult(result) + result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), &redis.FTSearchOptions{ + SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, + LimitOffset: 0, + Limit: 10000, + }).Result() if err != nil { return errors.Trace(err) } // update documents - for _, key := range keys { + for _, doc := range result.Docs { + key := doc.ID values := make([]any, 0) if patch.Score != nil { values = append(values, "score", *patch.Score) @@ -323,7 +363,7 @@ func (r *Redis) UpdateDocuments(ctx context.Context, collections []string, id st } } // break if no more documents - if count <= int64(len(keys)) { + if result.Total <= len(result.Docs) { break } } @@ -347,89 +387,31 @@ func (r *Redis) DeleteDocuments(ctx context.Context, collections []string, condi } for { // search documents - result, err := r.client.Do(ctx, "FT.SEARCH", r.DocumentTable(), builder.String(), "SORTBY", "score", "DESC", "LIMIT", 0, 10000).Result() - if err != nil { - return errors.Trace(err) - } - count, keys, _, err := parseSearchDocumentsResult(result) + result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), &redis.FTSearchOptions{ + SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, + LimitOffset: 0, + Limit: 10000, + }).Result() if err != nil { return errors.Trace(err) } // delete documents p := r.client.Pipeline() - for _, key := range keys { - p.Del(ctx, key) + for _, doc := range result.Docs { + p.Del(ctx, doc.ID) } _, err = p.Exec(ctx) if err != nil { return errors.Trace(err) } // break if no more documents - if count == int64(len(keys)) { + if result.Total == len(result.Docs) { break } } return nil } -func parseSearchDocumentsResult(result any) (count int64, keys []string, documents []Document, err error) { - rows, ok := result.([]any) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", result) - } - count, ok = rows[0].(int64) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[0]) - } - for i := 1; i < len(rows); i += 2 { - key, ok := rows[i].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[i]) - } - keys = append(keys, key) - row, ok := rows[i+1].([]any) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[i+1]) - } - fields := make(map[string]any) - for j := 0; j < len(row); j += 2 { - fields[row[j].(string)] = row[j+1] - } - var document Document - document.Id, ok = fields["id"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["id"]) - } - score, ok := fields["score"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["score"]) - } - document.Score, err = strconv.ParseFloat(score, 64) - if err != nil { - return 0, nil, nil, errors.Trace(err) - } - categories, ok := fields["categories"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["categories"]) - } - document.Categories, err = decodeCategories(categories) - if err != nil { - return 0, nil, nil, errors.Trace(err) - } - timestamp, ok := fields["timestamp"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["timestamp"]) - } - timestampMicros, err := strconv.ParseInt(timestamp, 10, 64) - if err != nil { - return 0, nil, nil, errors.Trace(err) - } - document.Timestamp = time.UnixMicro(timestampMicros).In(time.UTC) - documents = append(documents, document) - } - return -} - func (r *Redis) pointKey(name string, timestamp time.Time) string { return fmt.Sprintf("%s:%s:%d", r.PointsTable(), name, timestamp.UnixMicro()) } @@ -447,75 +429,28 @@ func (r *Redis) AddTimeSeriesPoints(ctx context.Context, points []TimeSeriesPoin } func (r *Redis) GetTimeSeriesPoints(ctx context.Context, name string, begin, end time.Time) ([]TimeSeriesPoint, error) { - result, err := r.client.Do(ctx, "FT.SEARCH", r.PointsTable(), + result, err := r.client.FTSearchWithArgs(ctx, r.PointsTable(), fmt.Sprintf("@name:{ %s } @timestamp:[%d (%d]", escape(name), begin.UnixMicro(), end.UnixMicro()), - "SORTBY", "timestamp").Result() + &redis.FTSearchOptions{SortBy: []redis.FTSearchSortBy{{FieldName: "timestamp"}}}).Result() if err != nil { return nil, errors.Trace(err) } - _, _, points, err := parseGetTimeSeriesPointsResult(result) - if err != nil { - return nil, errors.Trace(err) - } - - // Results in the JSON serializer producing an empty array instead of a - // null value. More info at: - // https://github.com/golang/go/wiki/CodeReviewComments#declaring-empty-slices - if points == nil { - return []TimeSeriesPoint{}, nil - } - - return points, nil -} - -func parseGetTimeSeriesPointsResult(result any) (count int64, keys []string, points []TimeSeriesPoint, err error) { - rows, ok := result.([]any) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", result) - } - count, ok = rows[0].(int64) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[0]) - } - for i := 1; i < len(rows); i += 2 { - key, ok := rows[i].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[i]) - } - keys = append(keys, key) - row, ok := rows[i+1].([]any) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", rows[i+1]) - } - fields := make(map[string]any) - for j := 0; j < len(row); j += 2 { - fields[row[j].(string)] = row[j+1] - } + points := make([]TimeSeriesPoint, 0, len(result.Docs)) + for _, doc := range result.Docs { var point TimeSeriesPoint - point.Name, ok = fields["name"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["name"]) - } - value, ok := fields["value"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["value"]) - } - point.Value, err = strconv.ParseFloat(value, 64) + point.Name = doc.Fields["name"] + point.Value, err = strconv.ParseFloat(doc.Fields["value"], 64) if err != nil { - return 0, nil, nil, errors.Trace(err) - } - timestamp, ok := fields["timestamp"].(string) - if !ok { - return 0, nil, nil, errors.Errorf("invalid FT.SEARCH result: %#v", fields["timestamp"]) + return nil, errors.Trace(err) } - timestampMicros, err := strconv.ParseInt(timestamp, 10, 64) + timestamp, err := strconv.ParseInt(doc.Fields["timestamp"], 10, 64) if err != nil { - return 0, nil, nil, errors.Trace(err) + return nil, errors.Trace(err) } - point.Timestamp = time.UnixMicro(timestampMicros).In(time.UTC) + point.Timestamp = time.UnixMicro(timestamp).In(time.UTC) points = append(points, point) } - return + return points, nil } func encdodeCategory(category string) string {