diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index b41c7189f0..edaee5504b 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/apache/incubator-pegasus/admin-cli/client" "github.com/apache/incubator-pegasus/admin-cli/util" @@ -45,6 +46,7 @@ type Client struct { } // NewClient creates a client for accessing Pegasus cluster for use of admin-cli. +// This function will call os.Exit. func NewClient(writer io.Writer, metaAddrs []string) *Client { meta := client.NewRPCBasedMeta(metaAddrs) @@ -67,3 +69,46 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client { Perf: aggregate.NewPerfClient(metaAddrs), } } + +// NewClientWithoutExit creates a client for accessing Pegasus cluster for use of admin-cli. +// This function will not call os.Exit. +func NewClientWithoutExit(writer io.Writer, metaAddrs []string) (*Client, error) { + meta := client.NewRPCBasedMeta(metaAddrs) + + nodes, err := meta.ListNodes() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err) + return nil, fmt.Errorf("fatal: failed to list nodes [%s]", err) + } + + var replicaAddrs []string + for _, node := range nodes { + replicaAddrs = append(replicaAddrs, node.Address.GetAddress()) + } + + return &Client{ + Writer: writer, + Meta: meta, + Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs), + Perf: aggregate.NewPerfClient(metaAddrs), + }, nil +} + +func CloseClient(writer io.Writer, client *Client) error { + var errorStrings []string + err := client.Meta.Close() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err) + errorStrings = append(errorStrings, err.Error()) + } + + client.Perf.Close() + + err = client.Nodes.CloseAllNodes() + if err != nil { + fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err) + errorStrings = append(errorStrings, err.Error()) + } + + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) +} diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go index a7793bb4d1..09c3e08eef 100644 --- a/admin-cli/executor/disk_info.go +++ b/admin-cli/executor/disk_info.go @@ -22,10 +22,12 @@ package executor import ( "context" "fmt" + "strings" "time" "github.com/apache/incubator-pegasus/admin-cli/tabular" "github.com/apache/incubator-pegasus/admin-cli/util" + "github.com/apache/incubator-pegasus/go-client/idl/admin" "github.com/apache/incubator-pegasus/go-client/idl/base" "github.com/apache/incubator-pegasus/go-client/idl/radmin" "github.com/apache/incubator-pegasus/go-client/session" @@ -45,7 +47,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, } func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) { - resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName) + resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName) if err != nil { return nil, err } @@ -60,7 +62,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta } } -func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { +func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -88,7 +90,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin } for _, nodeInfo := range nodeInfos { address := nodeInfo.GetAddress().GetAddress() - resp, err := sendQueryDiskInfoRequest(client, address, tableName) + resp, err := SendQueryDiskInfoRequest(client, address, tableName) if err != nil { return respMap, err } @@ -97,6 +99,31 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin return respMap, nil } +func QueryAliveNodesDiskInfo(client *Client, tableName string) (map[string]*radmin.QueryDiskInfoResponse, error) { + respMap := make(map[string]*radmin.QueryDiskInfoResponse) + nodeInfos, err := client.Meta.ListNodes() + if err != nil { + return respMap, err + } + for _, nodeInfo := range nodeInfos { + if nodeInfo.Status != admin.NodeStatus_NS_ALIVE { + continue + } + address := nodeInfo.GetAddress().GetAddress() + resp, err := SendQueryDiskInfoRequest(client, address, tableName) + if err != nil { + // this replica server haven't the table partition. + if strings.Contains(err.Error(), "ERR_OBJECT_NOT_FOUND") { + continue + } else { + return respMap, err + } + } + respMap[address] = resp + } + return respMap, nil +} + type DiskCapacityStruct struct { Disk string `json:"disk"` Capacity int64 `json:"capacity"` diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go index 5f7320820c..a8e1b02125 100644 --- a/admin-cli/util/pegasus_node.go +++ b/admin-cli/util/pegasus_node.go @@ -84,6 +84,10 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress { return base.NewRPCAddress(n.IP, n.Port) } +func (n *PegasusNode) Close() error { + return n.session.Close() +} + // NewNodeFromTCPAddr creates a node from tcp address. // NOTE: // - Will not initialize TCP connection unless needed. @@ -211,3 +215,14 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType) return aggregate.WrapPerf(addr, node.session) } + +func (m *PegasusNodeManager) CloseAllNodes() error { + var errorStrings []string + for _, n := range m.nodes { + err := n.Close() + if err != nil { + errorStrings = append(errorStrings, err.Error()) + } + } + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) +}