Skip to content

Commit

Permalink
Fix check if shard exists in qdb (#776)
Browse files Browse the repository at this point in the history
* add shards to qdb from config, check if shard exists in qdb

* fix regress error msg

* check if cfg is specified
  • Loading branch information
diPhantxm authored Sep 23, 2024
1 parent 2989f55 commit 1a2a55d
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 5 deletions.
6 changes: 5 additions & 1 deletion cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ var rootCmd = &cobra.Command{
return fmt.Errorf("init frontend TLS: %w", err)
}

coordinator := provider.NewCoordinator(frTLS, db)
coordinator, err := provider.NewCoordinator(frTLS, db)
if err != nil {
return err
}

app := app.NewApp(coordinator)
return app.Run(true)
},
Expand Down
6 changes: 5 additions & 1 deletion cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,11 @@ var runCmd = &cobra.Command{
return fmt.Errorf("init frontend TLS: %w", err)
}

coordinator := provider.NewCoordinator(frTLS, db)
coordinator, err := provider.NewCoordinator(frTLS, db)
if err != nil {
return err
}

app := coordApp.NewApp(coordinator)
return app.Run(false)
}(); err != nil {
Expand Down
19 changes: 17 additions & 2 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,26 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
}
}

func NewCoordinator(tlsconfig *tls.Config, db qdb.XQDB) *qdbCoordinator {
func NewCoordinator(tlsconfig *tls.Config, db qdb.XQDB) (*qdbCoordinator, error) {
if config.CoordinatorConfig().ShardDataCfg != "" {
shards, err := config.LoadShardDataCfg(config.CoordinatorConfig().ShardDataCfg)
if err != nil {
return nil, err
}

if shards != nil {
for id, cfg := range shards.ShardsData {
if err := db.AddShard(context.TODO(), qdb.NewShard(id, cfg.Hosts)); err != nil {
return nil, err
}
}
}
}

return &qdbCoordinator{
db: db,
tlsconfig: tlsconfig,
}
}, nil
}

// TODO : unit tests
Expand Down
4 changes: 4 additions & 0 deletions qdb/etcdqdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ func (q *EtcdQDB) GetShard(ctx context.Context, id string) (*Shard, error) {
return nil, err
}

if len(resp.Kvs) == 0 {
return nil, spqrerror.Newf(spqrerror.SPQR_NO_DATASHARD, "shard \"%s\" not found", id)
}

shardInfo := &Shard{
ID: id,
}
Expand Down
1 change: 1 addition & 0 deletions test/regress/conf/coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ host: regress_coordinator
coordinator_port: 7002
grpc_api_port: 7003
qdb_addr: '[regress_qdb_0_1]:2379'
shard_data: '/spqr/test/feature/conf/shard_data.yaml'
13 changes: 13 additions & 0 deletions test/regress/conf/shard_data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
shards:
sh1:
db: regress
usr: regress
pwd: 12345678
hosts:
- 'spqr_shard_1:6432'
sh2:
db: regress
usr: regress
pwd: 12345678
hosts:
- 'spqr_shard_2:6432'
52 changes: 52 additions & 0 deletions test/regress/tests/coordinator/expected/coordinator.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,55 @@ REGISTER ROUTER r1 ADDRESS regress_router:7000;
router -> r1-regress_router:7000
(1 row)

CREATE DISTRIBUTION ds1 COLUMN TYPES integer;
add distribution
------------------------
distribution id -> ds1
(1 row)

CREATE KEY RANGE krid1 FROM 1 ROUTE TO sh1 FOR DISTRIBUTION ds1;
add key range
---------------
bound -> 1
(1 row)

CREATE KEY RANGE krid2 FROM 11 ROUTE TO sh1 FOR DISTRIBUTION ds1;
add key range
---------------
bound -> 11
(1 row)

SHOW key_ranges;
Key range ID | Shard ID | Distribution ID | Lower bound
--------------+----------+-----------------+-------------
krid1 | sh1 | ds1 | 1
krid2 | sh1 | ds1 | 11
(2 rows)

DROP KEY RANGE krid1;
drop key range
-----------------------
key range id -> krid1
(1 row)

CREATE KEY RANGE krid2 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1;
ERROR: key range krid2 already present in qdb.
SHOW key_ranges;
Key range ID | Shard ID | Distribution ID | Lower bound
--------------+----------+-----------------+-------------
krid2 | sh1 | ds1 | 11
(1 row)

CREATE KEY RANGE krid2 FROM 33 ROUTE TO nonexistentshard FOR DISTRIBUTION ds1;
ERROR: shard "nonexistentshard" not found.
DROP DISTRIBUTION ALL CASCADE;
drop distribution
------------------------
distribution id -> ds1
(1 row)

DROP KEY RANGE ALL;
drop key range
----------------
(0 rows)

19 changes: 18 additions & 1 deletion test/regress/tests/coordinator/sql/coordinator.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
-- UNREGISTER ROUTER ALL
REGISTER ROUTER r1 ADDRESS regress_router:7000;
REGISTER ROUTER r1 ADDRESS regress_router:7000;

CREATE DISTRIBUTION ds1 COLUMN TYPES integer;
CREATE KEY RANGE krid1 FROM 1 ROUTE TO sh1 FOR DISTRIBUTION ds1;
CREATE KEY RANGE krid2 FROM 11 ROUTE TO sh1 FOR DISTRIBUTION ds1;

SHOW key_ranges;

DROP KEY RANGE krid1;

CREATE KEY RANGE krid2 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1;

SHOW key_ranges;

CREATE KEY RANGE krid2 FROM 33 ROUTE TO nonexistentshard FOR DISTRIBUTION ds1;

DROP DISTRIBUTION ALL CASCADE;
DROP KEY RANGE ALL;

0 comments on commit 1a2a55d

Please sign in to comment.