Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: support close admin-cli client #2162

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions admin-cli/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"os"
"strings"

"github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/util"
Expand All @@ -45,6 +46,7 @@ type Client struct {
}

// NewClient creates a client for accessing Pegasus cluster for use of admin-cli.
// This function will call os.Exit.
func NewClient(writer io.Writer, metaAddrs []string) *Client {
meta := client.NewRPCBasedMeta(metaAddrs)

Expand All @@ -67,3 +69,46 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client {
Perf: aggregate.NewPerfClient(metaAddrs),
}
}

// NewClientWithoutExit creates a client for accessing Pegasus cluster for use of admin-cli.
// This function will not call os.Exit.
func NewClientWithoutExit(writer io.Writer, metaAddrs []string) (*Client, error) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
meta := client.NewRPCBasedMeta(metaAddrs)

nodes, err := meta.ListNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err)
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("fatal: failed to list nodes [%s]", err)
}

var replicaAddrs []string
for _, node := range nodes {
replicaAddrs = append(replicaAddrs, node.Address.GetAddress())
}

return &Client{
Writer: writer,
Meta: meta,
Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs),
Perf: aggregate.NewPerfClient(metaAddrs),
}, nil
}

func CloseClient(writer io.Writer, client *Client) error {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
var errorStrings []string
err := client.Meta.Close()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
}

client.Perf.Close()

err = client.Nodes.CloseAllNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
}

return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
33 changes: 30 additions & 3 deletions admin-cli/executor/disk_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package executor
import (
"context"
"fmt"
"strings"
"time"

"github.com/apache/incubator-pegasus/admin-cli/tabular"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/radmin"
"github.com/apache/incubator-pegasus/go-client/session"
Expand All @@ -45,7 +47,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string,
}

func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) {
resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +62,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta
}
}

func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down Expand Up @@ -88,7 +90,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin
}
for _, nodeInfo := range nodeInfos {
address := nodeInfo.GetAddress().GetAddress()
resp, err := sendQueryDiskInfoRequest(client, address, tableName)
resp, err := SendQueryDiskInfoRequest(client, address, tableName)
if err != nil {
return respMap, err
}
Expand All @@ -97,6 +99,31 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin
return respMap, nil
}

func QueryAliveNodesDiskInfo(client *Client, tableName string) (map[string]*radmin.QueryDiskInfoResponse, error) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
respMap := make(map[string]*radmin.QueryDiskInfoResponse)
nodeInfos, err := client.Meta.ListNodes()
if err != nil {
return respMap, err
}
for _, nodeInfo := range nodeInfos {
if nodeInfo.Status != admin.NodeStatus_NS_ALIVE {
continue
}
address := nodeInfo.GetAddress().GetAddress()
resp, err := SendQueryDiskInfoRequest(client, address, tableName)
if err != nil {
// this replica server haven't the table partition.
if strings.Contains(err.Error(), "ERR_OBJECT_NOT_FOUND") {
continue
} else {
return respMap, err
}
}
respMap[address] = resp
}
return respMap, nil
}

type DiskCapacityStruct struct {
Disk string `json:"disk"`
Capacity int64 `json:"capacity"`
Expand Down
21 changes: 21 additions & 0 deletions admin-cli/util/pegasus_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress {
return base.NewRPCAddress(n.IP, n.Port)
}

func (n *PegasusNode) Close() error {
if n.session != nil {
return n.session.Close()
}
return nil
}

// NewNodeFromTCPAddr creates a node from tcp address.
// NOTE:
// - Will not initialize TCP connection unless needed.
Expand Down Expand Up @@ -211,3 +218,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType)

return aggregate.WrapPerf(addr, node.session)
}

func (m *PegasusNodeManager) CloseAllNodes() error {
var errorStrings []string
for _, n := range m.nodes {
err := n.Close()
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) != 0 {
return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
return nil
}
Loading