Add 'REDISTRIBUTE KEY RANGE' command #789
base: master
…MoveTaskGroup is now used inside coordinator only. Its elements are now of type tasks.MoveTask
Great job!
- Please add a description to all methods in balancer.go and coordinator.go
- Please add TODOs to all new methods that you add
- Please add regression tests for the redistribute command (just to check syntax and output)
- Please add as much documentation as possible
- I found a lot of panics in the code. You should assume that a panic will be immediately fatal for the entire program, or at the very least for the current goroutine. Ask yourself: "when this happens, should the application immediately crash?" If yes, use a panic; otherwise, return an error.
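To illustrate the panic-versus-error point above, here is a minimal sketch of a lookup that returns an error and leaves the decision about fatality to the caller. The names `lookupShard` and `ErrUnknownShard` are hypothetical and are not taken from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnknownShard is a hypothetical sentinel error used only for illustration.
var ErrUnknownShard = errors.New("unknown shard")

// lookupShard returns an error instead of panicking when the shard is missing,
// so the caller decides whether the failure should be fatal.
func lookupShard(shards map[string]string, id string) (string, error) {
	addr, ok := shards[id]
	if !ok {
		return "", fmt.Errorf("lookup shard %q: %w", id, ErrUnknownShard)
	}
	return addr, nil
}

func main() {
	shards := map[string]string{"sh1": "host1:6432"}
	if _, err := lookupShard(shards, "sh2"); err != nil {
		// A missing shard is a recoverable condition here, not a crash.
		fmt.Println("recoverable:", err)
	}
}
```

A panic would still be reasonable for a broken invariant that should never occur at runtime; a bad identifier coming from a user command is usually not one of those.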
func (rel *DistributedRelation) GetDistributionKeyColumns() []string {
	res := make([]string, len(rel.DistributionKey))
	for i, col := range rel.DistributionKey {
		// TODO: add hash
Don't forget to resolve the TODO and add unit tests here.
for _, tt := range []tcase{
	{
		query: "REDISTRIBUTE KEY RANGE kr1 TO sh2 BATCH SIZE 500",
Let's add more corner cases, such as:
- batch size is not specified
- batch size is negative
- what is the maximum supported batch size value? The minimum?
That's not the purpose of this test: it only tests the parser, which will accept any integer constant.
@@ -846,6 +854,12 @@ move_key_range_stmt:
		$$ = &MoveKeyRange{KeyRangeID: $2.KeyRangeID, DestShardID: $4}
	}

redistribute_key_range_stmt:
	REDISTRIBUTE key_range_stmt TO any_id BATCH SIZE any_uint
It would be cool to allow REDISTRIBUTE without batch size, using some default.
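One way the default could be handled after parsing, as a rough sketch: a nil pointer models an omitted BATCH SIZE clause, and non-positive values are rejected outside the parser. The `resolveBatchSize` helper and the `defaultBatchSize` value of 500 are assumptions for illustration; the PR does not define either.

```go
package main

import "fmt"

// defaultBatchSize is a hypothetical default; the PR does not define one.
const defaultBatchSize = 500

// resolveBatchSize maps an optional, user-supplied batch size to the value to use.
// A nil pointer models "BATCH SIZE was omitted from the statement".
func resolveBatchSize(requested *int) (int, error) {
	if requested == nil {
		return defaultBatchSize, nil
	}
	if *requested <= 0 {
		return 0, fmt.Errorf("batch size must be positive, got %d", *requested)
	}
	return *requested, nil
}

func main() {
	n := 100
	size, _ := resolveBatchSize(&n)
	fmt.Println(size) // explicit value is used as-is
	size, _ = resolveBatchSize(nil)
	fmt.Println(size) // omitted value falls back to the default
}
```

Keeping this check out of the grammar means the parser test can keep accepting any integer constant, while range validation is tested separately.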
@@ -679,7 +679,7 @@ func (tctx *testContext) stepWaitPostgresqlToRespond(host string) error {
 	const trials = 10
 	const timeout = 20 * time.Second
 	for i := 0; i < trials; i++ {
-		_, err := tctx.queryPostgresql(host, "SELECT 1", struct{}{})
+		_, err := tctx.queryPostgresql(host, "SELECT 1", struct{}{}, postgresqlQueryTimeout)
postgresqlQueryTimeout -> timeout, as on line 394
Why do we need it here? Here the default timeout is sufficient
@@ -488,222 +455,94 @@ func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, kr
		join = tasks.JoinLeft
	}

	host := shardFrom.TargetReplica
What is happening here?
We are now using the coordinator's BatchMoveKeyRange method instead of creating move tasks in the balancer.
@@ -1023,6 +1030,26 @@ func (pi *PSQLInteractor) MoveKeyRange(_ context.Context, move *kr.MoveKeyRange)
	return nil
}

// RedistributeKeyRange returns info about 'REDISTRIBUTE KEY RANGE' command to psql client
It's an incorrect statement.
// Returns:
// - error: An error if any occurred.
// TODO : unit tests
func (pi *PSQLInteractor) RedistributeKeyRange(_ context.Context, stmt *spqrparser.RedistributeKeyRange) error {
How do you verify that a specific key range is contained in a specific distribution?
Why do we need to verify that if all key range ids must be unique?
	if err := pi.WriteHeader("redistribute key range"); err != nil {
		return err
	}
	if err := pi.WriteDataRow(fmt.Sprintf("redistribute key range '%s' to shard '%s' by batches of %d", stmt.KeyRangeID, stmt.DestShardID, stmt.BatchSize)); err != nil {
Let's refactor the RedistributeKeyRange method so that the REDISTRIBUTE output looks something like this:
REDISTRIBUTE KEY RANGE kr1 TO sh2 BATCH SIZE 100;
      redistribute key range
----------------------------------
 distribution id   -> ds2
 key range         -> kr1
 shard destination -> sh2
 batch size        -> 100
(4 rows)
No need for distribution id here
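A minimal sketch of how the per-field rows discussed above could be built, leaving out the distribution id as suggested in the reply. The `redistributeStmt` type and `redistributeRows` helper are hypothetical stand-ins; the real code would presumably write each row through the interactor's existing WriteDataRow method.

```go
package main

import "fmt"

// redistributeStmt is a hypothetical stand-in for the parsed statement.
type redistributeStmt struct {
	KeyRangeID  string
	DestShardID string
	BatchSize   int
}

// redistributeRows renders one "name -> value" row per statement field.
// Field names are left-aligned and padded to the longest one (17 characters).
func redistributeRows(stmt redistributeStmt) []string {
	return []string{
		fmt.Sprintf("%-17s -> %s", "key range", stmt.KeyRangeID),
		fmt.Sprintf("%-17s -> %s", "shard destination", stmt.DestShardID),
		fmt.Sprintf("%-17s -> %d", "batch size", stmt.BatchSize),
	}
}

func main() {
	rows := redistributeRows(redistributeStmt{KeyRangeID: "kr1", DestShardID: "sh2", BatchSize: 100})
	for _, row := range rows {
		fmt.Println(row)
	}
}
```

Returning the rows as a slice keeps the formatting testable without a client connection.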
@@ -97,3 +100,30 @@ func (s *ShardConnect) GetConnStrings() []string {
	}
	return res
}

func (s *ShardConnect) GetConnectionPreferReplica(ctx context.Context) (*pgx.Conn, error) {
	connStrings := s.GetConnStrings()
Let's at least leave a few tasks here, because ideally we need to make sure that this replica is functional enough.
For example, that its replication lag is below a value from the config, or that we have metrics from the host, or something else.
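The replication-lag check mentioned above could look roughly like the following sketch. It is deliberately detached from pgx: `hostStatus` and `pickPreferReplica` are hypothetical names, and how the lag is actually measured is left open.

```go
package main

import (
	"fmt"
	"time"
)

// hostStatus is a hypothetical snapshot of what a health probe might report.
type hostStatus struct {
	Addr           string
	IsReplica      bool
	ReplicationLag time.Duration
}

// pickPreferReplica returns the first replica whose lag is within maxLag,
// falling back to the first non-replica host; ok is false if neither exists.
func pickPreferReplica(hosts []hostStatus, maxLag time.Duration) (addr string, ok bool) {
	for _, h := range hosts {
		if h.IsReplica && h.ReplicationLag <= maxLag {
			return h.Addr, true
		}
	}
	for _, h := range hosts {
		if !h.IsReplica {
			return h.Addr, true
		}
	}
	return "", false
}

func main() {
	hosts := []hostStatus{
		{Addr: "replica1:6432", IsReplica: true, ReplicationLag: 30 * time.Second},
		{Addr: "primary:6432", IsReplica: false},
	}
	// The replica lags too far behind, so the primary is chosen.
	addr, ok := pickPreferReplica(hosts, 5*time.Second)
	fmt.Println(addr, ok)
}
```

The threshold would come from configuration; other signals, such as whether metrics are available for the host, could be added as further fields on the probe result.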