Skip to content

Commit

Permalink
Feat/control service object status (#2912)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov committed Aug 9, 2024
2 parents 73e8414 + c4fda9d commit 0cbd3d0
Show file tree
Hide file tree
Showing 11 changed files with 1,229 additions and 232 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Changelog for NeoFS Node
- Indexes inspection command to neofs-lens (#2882)
- Add objects sanity checker to neofs-lens (#2506)
- Support for 0.20.0+ neofs-contract archive format (#2872)
- `neofs-cli control object status` command (#2886)

### Fixed
- Control service's Drop call does not clean metabase (#2822)
Expand Down
92 changes: 92 additions & 0 deletions cmd/neofs-cli/modules/control/objects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package control

import (
"fmt"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/commonflags"
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/spf13/cobra"
)

const objectFlag = "object"

var objectCmd = &cobra.Command{
Use: "object",
Short: "Direct object operations with storage engine",
}

var objectStatusCmd = &cobra.Command{
Use: "status",
Short: "Check current object status",
Args: cobra.NoArgs,
SilenceUsage: true,
RunE: objectStatus,
}

func objectStatus(cmd *cobra.Command, _ []string) error {
ctx, cancel := commonflags.GetCommandContext(cmd)
defer cancel()

pk := key.Get(cmd)
addressRaw, err := cmd.Flags().GetString(objectFlag)
if err != nil {
return fmt.Errorf("reading %s flag: %w", objectFlag, err)
}

var sdkAddr oid.Address
err = sdkAddr.DecodeString(addressRaw)
if err != nil {
return fmt.Errorf("validating address (%s): %w", addressRaw, err)
}

var resp *control.ObjectStatusResponse
req := &control.ObjectStatusRequest{
Body: &control.ObjectStatusRequest_Body{
ObjectAddress: addressRaw,
},
}
signRequest(cmd, pk, req)

cli := getClient(ctx, cmd)

err = cli.ExecRaw(func(client *rawclient.Client) error {
resp, err = control.ObjectStatus(client, req)
return err
})
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}

verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

shards := resp.GetBody().GetShards()
if len(shards) == 0 {
cmd.Println("<empty response>")
return nil
}

for _, shard := range shards {
cmd.Printf("Shard ID: %s\n", shard.ShardId)
storages := shard.GetStorages()
if len(storages) == 0 {
cmd.Println("\t<empty response>")
continue
}

for _, storage := range storages {
cmd.Printf("\t%s: %s\n", storage.Type, storage.Status)
}
}

return nil
}

func initObjectStatusFlags() {
initControlFlags(objectStatusCmd)

flags := objectStatusCmd.Flags()
flags.String(objectFlag, "", "Object address")
}
6 changes: 6 additions & 0 deletions cmd/neofs-cli/modules/control/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@ const (
)

func init() {
objectCmd.AddCommand(
objectStatusCmd,
)

Cmd.AddCommand(
healthCheckCmd,
setNetmapStatusCmd,
dropObjectsCmd,
shardsCmd,
synchronizeTreeCmd,
objectCmd,
)

initControlHealthCheckCmd()
initControlSetNetmapStatusCmd()
initControlDropObjectsCmd()
initControlShardsCmd()
initControlSynchronizeTreeCmd()
initObjectStatusFlags()
}
18 changes: 18 additions & 0 deletions pkg/services/control/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.FlushCacheResponse = r
return nil
}

type objectStatusResponseWrapper struct {
*ObjectStatusResponse
}

func (w *objectStatusResponseWrapper) ToGRPCMessage() grpc.Message {
return w.ObjectStatusResponse
}

func (w *objectStatusResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*ObjectStatusResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*ObjectStatusResponse)(nil))
}

w.ObjectStatusResponse = r
return nil
}
14 changes: 14 additions & 0 deletions pkg/services/control/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcObjectStatus = "ObjectStatus"
)

// HealthCheck executes ControlService.HealthCheck RPC.
Expand Down Expand Up @@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO

return wResp.FlushCacheResponse, nil
}

// ObjectStatus executes ControlService.ObjectStatus RPC.
func ObjectStatus(cli *client.Client, req *ObjectStatusRequest, opts ...client.CallOption) (*ObjectStatusResponse, error) {
wResp := &objectStatusResponseWrapper{new(ObjectStatusResponse)}
wReq := &requestWrapper{m: req}

err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcObjectStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}

return wResp.ObjectStatusResponse, nil
}
94 changes: 94 additions & 0 deletions pkg/services/control/server/object_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package control

import (
"context"
"fmt"
"strings"

"github.com/nspcc-dev/neofs-node/pkg/services/control"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (s *Server) ObjectStatus(_ context.Context, request *control.ObjectStatusRequest) (*control.ObjectStatusResponse, error) {
err := s.isValidRequest(request)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var addr oid.Address
err = addr.DecodeString(request.GetBody().GetObjectAddress())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "parsing object address: %s", err)
}

st, err := s.storage.ObjectStatus(addr)
if err != nil {
return nil, status.Errorf(codes.Internal, "storage engine error: %s", err)
}

resp := &control.ObjectStatusResponse{
Body: &control.ObjectStatusResponse_Body{},
}

for _, sh := range st.Shards {
respSh := new(control.ObjectStatusResponse_Body_Shard)
respSh.ShardId = sh.ID

if len(sh.Shard.Metabase.State) == 0 {
// can be reconsidered since it is possible to get
// resynchronized state when meta knows nothing about
// stored objects in blob; however, it is a control
// service, not a debug util
continue
}

respSh.Storages = append(respSh.Storages, &control.ObjectStatusResponse_Body_Shard_Status{
Type: "metabase",
Status: strings.Join(sh.Shard.Metabase.State, ","),
})

for _, subStorage := range sh.Shard.Blob.Substorages {
respSh.Storages = append(respSh.Storages,
&control.ObjectStatusResponse_Body_Shard_Status{
Type: subStorage.Type,
Status: fmt.Sprintf("path: %q", subStorage.Path),
},
)
}

var wcStatus string
switch {
case sh.Shard.Writecache.PathDB != "":
wcStatus = fmt.Sprintf("database path: %q", sh.Shard.Writecache.PathDB)
case sh.Shard.Writecache.PathFSTree != "":
wcStatus = fmt.Sprintf("fsTree path: %q", sh.Shard.Writecache.PathFSTree)
}

// it can be turned off, it is OK
if wcStatus != "" {
respSh.Storages = append(respSh.Storages,
&control.ObjectStatusResponse_Body_Shard_Status{
Type: "write-cache",
Status: wcStatus,
},
)
}

resp.Body.Shards = append(resp.Body.Shards, respSh)
}

err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return resp, nil
}
Loading

0 comments on commit 0cbd3d0

Please sign in to comment.