Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/vitessio/vitess into arthur…
Browse files Browse the repository at this point in the history
…/fix-conn-kill-hang

Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Mar 4, 2024
2 parents ee52f63 + 9ff255d commit 0f30e84
Show file tree
Hide file tree
Showing 236 changed files with 17,125 additions and 12,732 deletions.
15 changes: 15 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,31 @@
### Table of Contents

- **[Major Changes](#major-changes)**
- **[Breaking changes](#breaking-changes)**
- [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
- **[Query Compatibility](#query-compatibility)**
- [Vindex Hints](#vindex-hints)
- [Update with Limit Support](#update-limit)
- [Update with Multi Table Support](#multi-table-update)
- [Delete with Subquery Support](#delete-subquery)
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
- **[Minor Changes](#minor-changes)**
- **[New Stats](#new-stats)**
- [VTTablet Query Cache Hits and Misses](#vttablet-query-cache-hits-and-misses)

## <a id="major-changes"/>Major Changes

### <a id="breaking-changes"/>Breaking Changes

#### <a id="shutdown-grace-period-default"/>`shutdown_grace_period` Default Change

The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
This makes reparenting in Vitess resilient to client errors, and prevents PlannedReparentShard from timing out.

To preserve the old behaviour, users can set the flag back to `0 seconds`, which causes open transactions to never be shut down; in that case, however, they run the risk of PlannedReparentShard calls timing out.

### <a id="query-compatibility"/>Query Compatibility

#### <a id="vindex-hints"/> Vindex Hints
Expand Down Expand Up @@ -61,6 +73,9 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.

#### <a id="healthcheck-dial-concurrency-flag"/>New `--healthcheck-dial-concurrency` flag

The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can be opened concurrently. This limit avoids Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended.

## <a id="minor-changes"/>Minor Changes

Expand Down
3 changes: 0 additions & 3 deletions config/tablet/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,18 @@ oltpReadPool:
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
maxLifetimeSeconds: 0 # queryserver-config-pool-conn-max-lifetime
prefillParallelism: 0 # queryserver-config-pool-prefill-parallelism
maxWaiters: 50000 # queryserver-config-query-pool-waiter-cap

olapReadPool:
size: 200 # queryserver-config-stream-pool-size
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-stream-pool-prefill-parallelism
maxWaiters: 0

txPool:
size: 20 # queryserver-config-transaction-cap
timeoutSeconds: 1 # queryserver-config-txpool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-transaction-prefill-parallelism
maxWaiters: 50000 # queryserver-config-txpool-waiter-cap

oltp:
queryTimeoutSeconds: 30 # queryserver-config-query-timeout
Expand Down
3 changes: 0 additions & 3 deletions doc/design-docs/TabletServerParamsAsYAML.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,18 @@ oltpReadPool:
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-pool-prefill-parallelism
maxWaiters: 50000 # queryserver-config-query-pool-waiter-cap
olapReadPool:
size: 200 # queryserver-config-stream-pool-size
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-stream-pool-prefill-parallelism
maxWaiters: 0
txPool:
size: 20 # queryserver-config-transaction-cap
timeoutSeconds: 1 # queryserver-config-txpool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-transaction-prefill-parallelism
maxWaiters: 50000 # queryserver-config-txpool-waiter-cap
oltp:
queryTimeoutSeconds: 30 # queryserver-config-query-timeout
Expand Down
34 changes: 10 additions & 24 deletions go/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

type CacheValue struct {
Expand All @@ -27,24 +29,12 @@ type CacheValue struct {
// TestInitialState verifies the counters reported by a freshly constructed
// LRU cache with a maximum capacity of 5: it holds no entries, uses no
// capacity, and has recorded no evictions, hits, or misses.
func TestInitialState(t *testing.T) {
	cache := NewLRUCache[*CacheValue](5)
	l, sz, c, e, h, m := cache.Len(), cache.UsedCapacity(), cache.MaxCapacity(), cache.Evictions(), cache.Hits(), cache.Misses()

	// The assertions below are the single source of truth for this test. The
	// earlier hand-written if/t.Errorf checks duplicated them, and their
	// failure messages for evictions, hits and misses printed the capacity
	// value instead of the value under test.
	assert.Zero(t, l)
	assert.EqualValues(t, 0, sz)
	assert.EqualValues(t, 5, c)
	assert.EqualValues(t, 0, e)
	assert.EqualValues(t, 0, h)
	assert.EqualValues(t, 0, m)
}

func TestSetInsertsValue(t *testing.T) {
Expand Down Expand Up @@ -137,12 +127,8 @@ func TestCapacityIsObeyed(t *testing.T) {
// Insert one more; something should be evicted to make room.
cache.Set("key4", value)
sz, evictions := cache.UsedCapacity(), cache.Evictions()
if sz != size {
t.Errorf("post-evict cache.UsedCapacity() = %v, expected %v", sz, size)
}
if evictions != 1 {
t.Errorf("post-evict cache.Evictions() = %v, expected 1", evictions)
}
assert.Equal(t, size, sz)
assert.EqualValues(t, 1, evictions)

// Check various other stats
if l := cache.Len(); int64(l) != size {
Expand Down
14 changes: 8 additions & 6 deletions go/cache/theine/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func TestDo(t *testing.T) {
return "bar", nil
})

assert.Equal(t, "bar (string)", fmt.Sprintf("%v (%T)", v, v), "incorrect Do value")
assert.NoError(t, err, "got Do error")
assert.Equal(t, "bar (string)", fmt.Sprintf("%v (%T)", v, v))
assert.NoError(t, err)
}

func TestDoErr(t *testing.T) {
Expand Down Expand Up @@ -85,11 +85,11 @@ func TestDoDupSuppress(t *testing.T) {
defer wg2.Done()
wg1.Done()
v, err, _ := g.Do("key", fn)
if !assert.NoError(t, err, "unexpected Do error") {
if !assert.NoError(t, err) {
return
}

assert.Equal(t, "bar", v, "unexpected Do value")
assert.Equal(t, "bar", v)
}()
}
wg1.Wait()
Expand All @@ -98,7 +98,8 @@ func TestDoDupSuppress(t *testing.T) {
c <- "bar"
wg2.Wait()
got := atomic.LoadInt32(&calls)
assert.True(t, got > 0 && got < n, "number of calls not between 0 and %d", n)
assert.Greater(t, got, int32(0))
assert.Less(t, got, int32(n))
}

// Test singleflight behaves correctly after Do panic.
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestPanicDo(t *testing.T) {

select {
case <-done:
assert.Equal(t, int32(n), panicCount, "unexpected number of panics")
assert.EqualValues(t, n, panicCount)
case <-time.After(time.Second):
require.Fail(t, "Do hangs")
}
Expand All @@ -152,6 +153,7 @@ func TestGoexitDo(t *testing.T) {
var err error
defer func() {
assert.NoError(t, err)

if atomic.AddInt32(&waited, -1) == 0 {
close(done)
}
Expand Down
10 changes: 7 additions & 3 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -84,6 +85,8 @@ In particular, it contains:
tabletTypesToWait []topodatapb.TabletType

env *vtenv.Environment

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
Expand Down Expand Up @@ -131,6 +134,7 @@ func init() {
if err != nil {
log.Fatalf("unable to initialize env: %v", err)
}
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")
}

func startMysqld(uid uint32) (mysqld *mysqlctl.Mysqld, cnf *mysqlctl.Mycnf, err error) {
Expand Down Expand Up @@ -234,7 +238,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
// to be the "internal" protocol that InitTabletMap registers.
cmd.Flags().Set("tablet_manager_protocol", "internal")
cmd.Flags().Set("tablet_protocol", "internal")
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql)
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql, srvTopoCounts)
if err != nil {
// ensure we start mysql in the event we fail here
if startMysql {
Expand All @@ -260,7 +264,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr, srvTopoCounts)
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +301,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
Expand Down
39 changes: 39 additions & 0 deletions go/cmd/vtctldclient/command/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ that shard.`,
Args: cobra.ExactArgs(1),
RunE: commandGetShard,
}
// GetShardReplication makes a GetShardReplication gRPC request to a vtctld.
GetShardReplication = &cobra.Command{
Use: "GetShardReplication <keyspace/shard> [cell1 [cell2...]]",
Short: "Returns information about the replication relationships for a shard in the given cell(s).",
DisableFlagsInUseLine: true,
Args: cobra.MinimumNArgs(1),
RunE: commandGetShardReplication,
}
// RemoveShardCell makes a RemoveShardCell gRPC request to a vtctld.
RemoveShardCell = &cobra.Command{
Use: "RemoveShardCell [--force|-f] [--recursive|-r] <keyspace/shard> <cell>",
Expand Down Expand Up @@ -286,6 +294,36 @@ func commandGetShard(cmd *cobra.Command, args []string) error {
return nil
}

// commandGetShardReplication is the RunE handler for the GetShardReplication
// command. It parses the first positional argument as a keyspace/shard pair,
// passes any remaining positional arguments as the cells of the request, calls
// GetShardReplication on the client, and prints the marshalled response to
// standard output. Any parse, request, or marshalling error is returned as-is.
func commandGetShardReplication(cmd *cobra.Command, args []string) error {
	keyspace, shard, err := topoproto.ParseKeyspaceShard(cmd.Flags().Arg(0))
	if err != nil {
		return err
	}

	request := &vtctldatapb.GetShardReplicationRequest{
		Keyspace: keyspace,
		Shard:    shard,
		// Every positional argument after the keyspace/shard is a cell name.
		Cells: cmd.Flags().Args()[1:],
	}

	cli.FinishedParsing(cmd)

	resp, err := client.GetShardReplication(commandCtx, request)
	if err != nil {
		return err
	}

	output, err := cli.MarshalJSON(resp)
	if err != nil {
		return err
	}

	fmt.Printf("%s\n", output)
	return nil
}

var removeShardCellOptions = struct {
Force bool
Recursive bool
Expand Down Expand Up @@ -624,6 +662,7 @@ func init() {
Root.AddCommand(DeleteShards)

Root.AddCommand(GetShard)
Root.AddCommand(GetShardReplication)
Root.AddCommand(GenerateShardRanges)

RemoveShardCell.Flags().BoolVarP(&removeShardCellOptions.Force, "force", "f", false, "Proceed even if the cell's topology server cannot be reached. The assumption is that you turned down the entire cell, and just need to update the global topo data.")
Expand Down
12 changes: 5 additions & 7 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,

Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
TabletTypes: common.CreateOptions.TabletTypes,
TabletSelectionPreference: tsp,
Cells: common.CreateOptions.Cells,
OnDdl: common.CreateOptions.OnDDL,
DeferSecondaryKeys: common.CreateOptions.DeferSecondaryKeys,
AutoStart: common.CreateOptions.AutoStart,
StopAfterCopy: common.CreateOptions.StopAfterCopy,

SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtexplain/cli/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -186,7 +187,8 @@ func parseAndRun() error {
}
ctx := context.Background()
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts)
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts, srvTopoCounts)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion go/cmd/vtgate/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -73,8 +74,14 @@ var (
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

// init creates the single-label counters named "ResilientSrvTopoServer"
// (labelled by "type") and stores them in the package-level srvTopoCounts.
func init() {
	srvTopoCounts = stats.NewCountersWithSingleLabel(
		"ResilientSrvTopoServer",
		"Resilient srvtopo server operations",
		"type",
	)
}

// CheckCellFlags will check validation of cell and cells_to_watch flag
// it will help to avoid strange behaviors when vtgate runs but actually does not work
func CheckCellFlags(ctx context.Context, serv srvtopo.Server, cell string, cellsToWatch string) error {
Expand Down Expand Up @@ -139,7 +146,7 @@ func run(cmd *cobra.Command, args []string) error {
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
for _, tt := range tabletTypesToWait {
Expand Down
Loading

0 comments on commit 0f30e84

Please sign in to comment.