Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
liuqi committed Jan 29, 2015
1 parent 13198f3 commit cdaa672
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 132 deletions.
154 changes: 24 additions & 130 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,162 +1,56 @@
package main

import (
"bytes"
"encoding/json"
"flag"
"io/ioutil"
"net/http"
"strconv"
"time"

"github.com/juju/errors"
log "github.com/ngaut/logging"
"github.com/wandoulabs/codis/pkg/models"
"strconv"
"time"
)

type fnHttpCall func(objPtr interface{}, api string, method string, arg interface{}) error
type aliveCheckerFactory func(addr string, defaultTimeout time.Duration) AliveChecker

var (
apiServer = flag.String("apiserver", "localhost:18087", "api server address")
productName = flag.String("productName", "test", "product name, can be found in codis-proxy's config")

tr = http.DefaultTransport
)

func getApiResult(result interface{}, api string, method string, arg interface{}) error {
client := &http.Client{Transport: tr}
url := "http://" + *apiServer + api
rw := &bytes.Buffer{}
if arg != nil {
buf, err := json.Marshal(arg)
if err != nil {
return errors.Trace(err)
}
rw.Write(buf)
}

req, err := http.NewRequest(method, url, rw)
if err != nil {
return errors.Trace(err)
}

resp, err := client.Do(req)
if err != nil {
return errors.Trace(err)
}
defer resp.Body.Close()

buf, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Trace(err)
}

if resp.StatusCode/100 != 2 {
return errors.Errorf("error: %d, message: %s", resp.StatusCode, string(buf))
}

if result != nil {
return errors.Trace(json.Unmarshal(buf, result))
}

return nil
}

func GetServerGroups() ([]models.ServerGroup, error) {
var groups []models.ServerGroup
err := getApiResult(&groups, "/api/server_groups", "GET", nil)
return groups, err
}

func DoPing(instance AliveChecker) error {
return instance.CheckAlive()
}

func PingServer(s models.Server, errServerCh chan<- *models.Server) {
rc := &redisChecker{
addr: s.Addr,
defaultTimeout: 5 * time.Second,
}
if err := DoPing(rc); err != nil {
errServerCh <- &s
} else {
errServerCh <- nil
}
}

func getSlave(master *models.Server) (*models.Server, error) {
var group models.ServerGroup
err := getApiResult(&group, "/api/server_group/"+strconv.Itoa(master.GroupId), "GET", nil)
if err != nil {
return nil, errors.Trace(err)
}

for _, s := range group.Servers {
if s.Type == models.SERVER_TYPE_SLAVE {
return &s, nil
}
}

return nil, errors.Errorf("can not find any slave in this group %v", group)
}

func handleCrashedServer(s *models.Server) {
switch s.Type {
case models.SERVER_TYPE_MASTER:
//get slave and do promote
slave, err := getSlave(s)
if err != nil {
log.Warning(errors.ErrorStack(err))
return
}

err = getApiResult(nil, "/api/server_group/"+strconv.Itoa(slave.GroupId)+"/promote", "POST", slave)
if err != nil {
log.Errorf("do promote %v failed %v", slave, errors.ErrorStack(err))
return
}
case models.SERVER_TYPE_SLAVE:
log.Errorf("server is down: %+v", s)
case models.SERVER_TYPE_OFFLINE:
//no need to handle it
default:
log.Errorf("unkonwn type %+v", s)
}
}

//ping codis-server find crashed codis-server
func PingCrashedNodes(groups []models.ServerGroup) ([]models.ServerGroup, error) {
errServerCh := make(chan *models.Server, 100)
var serverCnt int
for _, group := range groups { //each group
for _, s := range group.Servers { //each server
serverCnt++
go PingServer(s, errServerCh)
callHttp fnHttpCall = httpCall
acf aliveCheckerFactory = func(addr string, timeout time.Duration) AliveChecker {
return &redisChecker{
addr: addr,
defaultTimeout: timeout,
}
}
)

//get result
for i := 0; i < serverCnt; i++ {
s := <-errServerCh
if s == nil { //alive
continue
func genUrl(args ...interface{}) string {
url := "http://"
for _, v := range args {
switch v.(type) {
case string:
url += v.(string)
case int:
url += strconv.Itoa(v.(int))
default:
log.Errorf("unsupported type %T", v)
}

log.Warningf("server maybe crashed %+v", s)
handleCrashedServer(s)
}

return nil, nil
return url
}

func main() {
flag.Parse()

for {
groups, err := GetServerGroups()
if err != nil {
log.Error(errors.ErrorStack(err))
return
}

PingCrashedNodes(groups)
CheckAliveAndPromote(groups)
time.Sleep(3 * time.Second)
}
}
3 changes: 1 addition & 2 deletions redischecker.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package main

import (
"time"

"github.com/garyburd/redigo/redis"
"time"
)

var (
Expand Down
29 changes: 29 additions & 0 deletions redischecker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"github.com/alicebob/miniredis"
"testing"
"time"
)

func TestRedisChecker(t *testing.T) {
r, _ := miniredis.Run()
defer r.Close()
addr := r.Addr()
rc := &redisChecker{
addr: addr,
defaultTimeout: 5 * time.Second,
}

err := rc.CheckAlive()
if err != nil {
t.Error(err)
}

//test bad address
rc.addr = "xxx"
err = rc.CheckAlive()
if err == nil {
t.Error("should be error")
}
}
41 changes: 41 additions & 0 deletions rest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"bytes"
"net/http"

"encoding/json"
"github.com/juju/errors"
"io/ioutil"
)

//call http url and get json, then decode to objptr
func httpCall(objPtr interface{}, url string, method string, arg interface{}) error {
client := &http.Client{Transport: http.DefaultTransport}
rw := &bytes.Buffer{}
if arg != nil {
buf, err := json.Marshal(arg)
if err != nil {
return errors.Trace(err)
}
rw.Write(buf)
}

req, err := http.NewRequest(method, url, rw)
if err != nil {
return errors.Trace(err)
}

resp, err := client.Do(req)
if err != nil {
return errors.Trace(err)
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
msg, _ := ioutil.ReadAll(resp.Body)
return errors.Errorf("error: %d, message: %s", resp.StatusCode, string(msg))
}

return json.NewDecoder(resp.Body).Decode(objPtr)
}
101 changes: 101 additions & 0 deletions servergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"github.com/juju/errors"
log "github.com/ngaut/logging"
"github.com/wandoulabs/codis/pkg/models"
"time"
)

func GetServerGroups() ([]models.ServerGroup, error) {
var groups []models.ServerGroup
err := callHttp(&groups, genUrl(*apiServer, "/api/server_groups"), "GET", nil)
return groups, err
}

func PingServer(checker AliveChecker, errCtx interface{}, errCh chan<- interface{}) {
err := checker.CheckAlive()
log.Debugf("check %+v, result:%v, errCtx:%+v", checker, err, errCtx)
if err != nil {
errCh <- errCtx
return
}

errCh <- nil
}

func getSlave(master *models.Server) (*models.Server, error) {
var group models.ServerGroup
err := callHttp(&group, genUrl(*apiServer, "/api/server_group/", master.GroupId), "GET", nil)
if err != nil {
return nil, errors.Trace(err)
}

for _, s := range group.Servers {
if s.Type == models.SERVER_TYPE_SLAVE {
return &s, nil
}
}

return nil, errors.Errorf("can not find any slave in this group: %v", group)
}

func handleCrashedServer(s *models.Server) error {
switch s.Type {
case models.SERVER_TYPE_MASTER:
//get slave and do promote
slave, err := getSlave(s)
if err != nil {
log.Warning(errors.ErrorStack(err))
return err
}

log.Infof("try promote %+v", slave)
err = callHttp(nil, genUrl(*apiServer, "/api/server_group/", slave.GroupId, "/promote"), "POST", slave)
if err != nil {
log.Errorf("do promote %v failed %v", slave, errors.ErrorStack(err))
return err
}
case models.SERVER_TYPE_SLAVE:
log.Errorf("slave is down: %+v", s)
case models.SERVER_TYPE_OFFLINE:
//no need to handle it
default:
log.Fatalf("unkonwn type %+v", s)
}

return nil
}

//ping codis-server find crashed codis-server
func CheckAliveAndPromote(groups []models.ServerGroup) ([]models.Server, error) {
errCh := make(chan interface{}, 100)
var serverCnt int
for _, group := range groups { //each group
for _, s := range group.Servers { //each server
serverCnt++
rc := acf(s.Addr, 5*time.Second)
news := s
go PingServer(rc, &news, errCh)
}
}

//get result
crashedServer := make([]models.Server, 0)
for i := 0; i < serverCnt; i++ {
s := <-errCh
if s == nil { //alive
continue
}

log.Warningf("server maybe crashed %+v", s)
crashedServer = append(crashedServer, *s.(*models.Server))

err := handleCrashedServer(s.(*models.Server))
if err != nil {
return crashedServer, err
}
}

return crashedServer, nil
}
Loading

0 comments on commit cdaa672

Please sign in to comment.