Skip to content

Commit

Permalink
Add commands for task group to SQL (#637)
Browse files Browse the repository at this point in the history
* Add task group commands to gram

* Add lexer test

* Add DROP TASK GROUP processing to meta

* Add 'SHOW task_group' processing to meta

* Add json field names to TaskGroup & Task

* Add test for 'SHOW task_group'

* Add test for 'DROP TASK GROUP'

* FIx linter
  • Loading branch information
EinKrebs authored Apr 24, 2024
1 parent e1194bb commit 4a27609
Show file tree
Hide file tree
Showing 13 changed files with 936 additions and 611 deletions.
54 changes: 54 additions & 0 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clientinteractor
import (
"context"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/tasks"
"net"
"sort"
"strconv"
Expand Down Expand Up @@ -327,6 +328,59 @@ func (pi *PSQLInteractor) UnlockKeyRange(ctx context.Context, krid string) error
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) Tasks(_ context.Context, ts []*tasks.Task) error {
spqrlog.Zero.Debug().Msg("listing move tasks")

for _, msg := range []pgproto3.BackendMessage{
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("State"),
TextOidFD("Bound"),
TextOidFD("Source key range ID"),
TextOidFD("Destination key range ID"),
},
},
} {
if err := pi.cl.Send(msg); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

for _, task := range ts {

if err := pi.cl.Send(&pgproto3.DataRow{
Values: [][]byte{
[]byte(tasks.TaskStateToStr(task.State)),
task.Bound,
[]byte(task.KrIdFrom),
[]byte(task.KrIdTo),
},
}); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}
return pi.CompleteMsg(0)
}

func (pi *PSQLInteractor) DropTaskGroup(_ context.Context) error {
if err := pi.WriteHeader("drop task group"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, msg := range []pgproto3.BackendMessage{
&pgproto3.DataRow{Values: [][]byte{[]byte("dropped all tasks")}},
} {
if err := pi.cl.Send(msg); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

return pi.CompleteMsg(0)
}

// TODO : unit tests
func (pi *PSQLInteractor) Shards(ctx context.Context, shards []*datashards.DataShard) error {
if err := pi.WriteHeader("listing data shards"); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func processDrop(ctx context.Context, dstmt spqrparser.Statement, isCascade bool
return err
}
return cli.DropShard(stmt.ID)
case *spqrparser.TaskGroupSelector:
if err := mngr.RemoveTaskGroup(ctx); err != nil {
return err
}
return cli.DropTaskGroup(ctx)
default:
return fmt.Errorf("unknown drop statement")
}
Expand Down Expand Up @@ -414,6 +419,12 @@ func ProcessShow(ctx context.Context, stmt *spqrparser.Show, mngr EntityMgr, ci
}

return cli.Relations(dsToRels, stmt.Where)
case spqrparser.TaskGroupStr:
group, err := mngr.GetTaskGroup(ctx)
if err != nil {
return err
}
return cli.Tasks(ctx, group.Tasks)
default:
return unknownCoordinatorCommand
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/models/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ func TaskStateToProto(state TaskState) protos.TaskStatus {
}
}

func TaskStateToStr(state TaskState) string {
switch state {
case TaskPlanned:
return "PLANNED"
case TaskSplit:
return "SPLIT"
case TaskMoved:
return "MOVED"
default:
panic("incorrect task state")
}
}

func JoinTypeToProto(t JoinType) protos.JoinType {
switch t {
case JoinNone:
Expand Down
18 changes: 9 additions & 9 deletions qdb/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ func NewDistribution(id string, coltypes []string) *Distribution {
}

type Task struct {
ShardFromId string
ShardToId string
KrIdFrom string
KrIdTo string
Bound []byte
KrIdTemp string
State int
ShardFromId string `json:"shard_from_id"`
ShardToId string `json:"shard_to_id"`
KrIdFrom string `json:"kr_id_from"`
KrIdTo string `json:"kr_id_to"`
Bound []byte `json:"bound"`
KrIdTemp string `json:"kr_id_temp"`
State int `json:"state"`
}

type TaskGroup struct {
Tasks []*Task
JoinType int
Tasks []*Task `json:"tasks"`
JoinType int `json:"join_type"`
}
69 changes: 66 additions & 3 deletions test/feature/features/coordinator.feature
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Feature: Coordinator test
Scenario: Add/Remove distribution works
When I run SQL on host "coordinator"
"""
CREATE DISTRIBUTION ds1_test COLUMN TYPES integer;
CREATE DISTRIBUTION ds1_test COLUMN TYPES integer;
CREATE KEY RANGE krid11 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1_test;
CREATE KEY RANGE krid22 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1_test;
ALTER DISTRIBUTION ds1_test ATTACH RELATION test1 DISTRIBUTION KEY id;
Expand Down Expand Up @@ -75,7 +75,7 @@ Feature: Coordinator test

When I run SQL on host "coordinator"
"""
CREATE DISTRIBUTION ds1_test COLUMN TYPES integer;
CREATE DISTRIBUTION ds1_test COLUMN TYPES integer;
CREATE KEY RANGE krid11 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1_test;
CREATE KEY RANGE krid22 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1_test;
ALTER DISTRIBUTION ds1_test ATTACH RELATION test1 DISTRIBUTION KEY id;
Expand Down Expand Up @@ -583,7 +583,7 @@ Feature: Coordinator test

When I run SQL on host "coordinator"
"""
CREATE DISTRIBUTION ds1 COLUMN TYPES integer;
CREATE DISTRIBUTION ds1 COLUMN TYPES integer;
CREATE KEY RANGE krid1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
CREATE KEY RANGE krid2 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1;
ALTER DISTRIBUTION ds1 ATTACH RELATION test DISTRIBUTION KEY id;
Expand Down Expand Up @@ -644,3 +644,66 @@ Feature: Coordinator test
}
]
"""

Scenario: Dropping move task group works
When I record in qdb move task group
"""
{
"tasks":
[
{
"shard_from_id": "sh_from",
"shard_to_id": "sh_to",
"kr_id_from": "kr_from",
"kr_id_to": "kr_to",
"bound": "MQ==",
"state": 1
},
{
"shard_from_id": "sh_from",
"shard_to_id": "sh_to",
"kr_id_from": "kr_from",
"kr_id_to": "kr_to",
"bound": "MTA=",
"state": 0
}
]
}
"""
Then command return code should be "0"
When I run SQL on host "coordinator"
"""
SHOW task_group
"""
Then command return code should be "0"
And SQL result should match json_exactly
"""
[
{
"State": "SPLIT",
"Bound": "1",
"Source key range ID": "kr_from",
"Destination key range ID": "kr_to"
},
{
"State": "PLANNED",
"Bound": "10",
"Source key range ID": "kr_from",
"Destination key range ID": "kr_to"
}
]
"""
When I run SQL on host "coordinator"
"""
DROP TASK GROUP
"""
Then command return code should be "0"
When I run SQL on host "coordinator"
"""
SHOW task_group
"""
Then command return code should be "0"
And SQL result should match json_exactly
"""
[]
"""
51 changes: 50 additions & 1 deletion test/feature/features/coordinator_show.feature
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,53 @@ Feature: Coordinator show clients, pools and backend_connections
"used connections":"0"
}
]
"""
"""

Scenario: Show task group
When I record in qdb move task group
"""
{
"tasks":
[
{
"shard_from_id": "sh_from",
"shard_to_id": "sh_to",
"kr_id_from": "kr_from",
"kr_id_to": "kr_to",
"bound": "MQ==",
"state": 1
},
{
"shard_from_id": "sh_from",
"shard_to_id": "sh_to",
"kr_id_from": "kr_from",
"kr_id_to": "kr_to",
"bound": "MTA=",
"state": 0
}
]
}
"""
Then command return code should be "0"
When I run SQL on host "coordinator"
"""
SHOW task_group
"""
Then command return code should be "0"
And SQL result should match json_exactly
"""
[
{
"State": "SPLIT",
"Bound": "1",
"Source key range ID": "kr_from",
"Destination key range ID": "kr_to"
},
{
"State": "PLANNED",
"Bound": "10",
"Source key range ID": "kr_from",
"Destination key range ID": "kr_to"
}
]
"""
12 changes: 12 additions & 0 deletions test/feature/spqr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,17 @@ func (tctx *testContext) stepRecordQDBKRMove(body *godog.DocString) error {
return tctx.qdb.RecordKeyRangeMove(context.TODO(), &m)
}

func (tctx *testContext) stepRecordQDBTaskGroup(body *godog.DocString) error {
query := strings.TrimSpace(body.Content)
var taskGroup qdb.TaskGroup
if err := json.Unmarshal([]byte(query), &taskGroup); err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to unmarshal request")
return err
}

return tctx.qdb.WriteTaskGroup(context.TODO(), &taskGroup)
}

func (tctx *testContext) stepQDBShouldContainTx(key string) error {
tx, err := tctx.qdb.GetTransferTx(context.TODO(), key)
if err != nil {
Expand Down Expand Up @@ -997,6 +1008,7 @@ func InitializeScenario(s *godog.ScenarioContext, t *testing.T, debug bool) {
s.Step(`^I record in qdb data transfer transaction with name "([^"]*)"$`, tctx.stepRecordQDBTx)
s.Step(`^qdb should not contain relation "([^"]*)"$`, tctx.stepQDBShouldNotContainRelation)
s.Step(`^I record in qdb key range move$`, tctx.stepRecordQDBKRMove)
s.Step(`^I record in qdb move task group$`, tctx.stepRecordQDBTaskGroup)
s.Step(`^qdb should contain transaction "([^"]*)"$`, tctx.stepQDBShouldContainTx)
s.Step(`^qdb should not contain transaction "([^"]*)"$`, tctx.stepQDBShouldNotContainTx)
s.Step(`^qdb should not contain key range moves$`, tctx.stepQDBShouldNotContainKRMoves)
Expand Down
9 changes: 5 additions & 4 deletions yacc/console/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ type ShardSelector struct {
ID string
}

type DropRoutersAll struct{}

func (*DropRoutersAll) iStatement() {}
type TaskGroupSelector struct{}

func (*KeyRangeSelector) iDrop() {}
func (*ShardingRuleSelector) iDrop() {}
func (*DistributionSelector) iDrop() {}
func (*ShardSelector) iDrop() {}
func (*TaskGroupSelector) iDrop() {}

const (
EntityRouters = "ROUTERS"
Expand Down Expand Up @@ -231,7 +230,7 @@ func (*DetachRelation) iStatement() {}
func (*DetachRelation) iAlter() {}
func (*DetachRelation) iAlterDistribution() {}

// The frollowing constants represent SHOW statements.
// The following constants represent SHOW statements.
const (
DatabasesStr = "databases"
DistributionsStr = "distributions"
Expand All @@ -245,6 +244,7 @@ const (
StatusStr = "status"
VersionStr = "version"
RelationsStr = "relations"
TaskGroupStr = "task_group"
UnsupportedStr = "unsupported"
)

Expand All @@ -263,6 +263,7 @@ func (*KeyRangeSelector) iStatement() {}
func (*ShardingRuleSelector) iStatement() {}
func (*DistributionSelector) iStatement() {}
func (*ShardSelector) iStatement() {}
func (*TaskGroupSelector) iStatement() {}
func (*Lock) iStatement() {}
func (*Unlock) iStatement() {}
func (*Shutdown) iStatement() {}
Expand Down
Loading

0 comments on commit 4a27609

Please sign in to comment.