Skip to content

Commit

Permalink
node/control: add ObjectStatus control command
Browse files Browse the repository at this point in the history
Includes API definition extending; server side implementation; protoc version
update (my local version is higher, and it seems we do not have a strict policy
about the version).
The command requests server's storage engine (bypass any other logical check,
only storage) to answer its status according to its parts (shards
and their write-caches, blobstors, metabases),

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Aug 9, 2024
1 parent 73e8414 commit 9fabc49
Show file tree
Hide file tree
Showing 8 changed files with 1,130 additions and 232 deletions.
18 changes: 18 additions & 0 deletions pkg/services/control/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.FlushCacheResponse = r
return nil
}

type objectStatusResponseWrapper struct {
*ObjectStatusResponse
}

func (w *objectStatusResponseWrapper) ToGRPCMessage() grpc.Message {
return w.ObjectStatusResponse
}

func (w *objectStatusResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*ObjectStatusResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*ObjectStatusResponse)(nil))
}

w.ObjectStatusResponse = r
return nil
}
14 changes: 14 additions & 0 deletions pkg/services/control/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcObjectStatus = "ObjectStatus"
)

// HealthCheck executes ControlService.HealthCheck RPC.
Expand Down Expand Up @@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO

return wResp.FlushCacheResponse, nil
}

// ObjectStatus executes ControlService.ObjectStatus RPC.
func ObjectStatus(cli *client.Client, req *ObjectStatusRequest, opts ...client.CallOption) (*ObjectStatusResponse, error) {
wResp := &objectStatusResponseWrapper{new(ObjectStatusResponse)}
wReq := &requestWrapper{m: req}

err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcObjectStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}

return wResp.ObjectStatusResponse, nil
}
94 changes: 94 additions & 0 deletions pkg/services/control/server/object_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package control

import (
"context"
"fmt"
"strings"

"github.com/nspcc-dev/neofs-node/pkg/services/control"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (s *Server) ObjectStatus(_ context.Context, request *control.ObjectStatusRequest) (*control.ObjectStatusResponse, error) {
err := s.isValidRequest(request)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}

// check availability
err = s.ready()
if err != nil {
return nil, err
}

var addr oid.Address
err = addr.DecodeString(request.GetBody().GetObjectAddress())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "parsing object address: %s", err)
}

st, err := s.storage.ObjectStatus(addr)
if err != nil {
return nil, status.Errorf(codes.Internal, "storage engine error: %s", err.Error())
}

resp := &control.ObjectStatusResponse{
Body: &control.ObjectStatusResponse_Body{},
}

for _, sh := range st.Shards {
respSh := new(control.ObjectStatusResponse_Body_Shard)
respSh.ShardId = sh.ID

if len(sh.Shard.Metabase.State) == 0 {
// can be reconsidered since it is possible to get
// resynchronized state when meta knows nothing about
// stored objects in blob; however, it is a control
// service, not a debug util
continue
}

respSh.Storages = append(respSh.Storages, &control.ObjectStatusResponse_Body_Shard_Status{
Type: "metabase",
Status: strings.Join(sh.Shard.Metabase.State, ","),
})

for _, subStorage := range sh.Shard.Blob.Substorages {
respSh.Storages = append(respSh.Storages,
&control.ObjectStatusResponse_Body_Shard_Status{
Type: subStorage.Type,
Status: fmt.Sprintf("path: %q", subStorage.Path),
},
)
}

var wcStatus string
switch {
case sh.Shard.Writecache.PathDB != "":
wcStatus = fmt.Sprintf("database path: %q", sh.Shard.Writecache.PathDB)
case sh.Shard.Writecache.PathFSTree != "":
wcStatus = fmt.Sprintf("fsTree path: %q", sh.Shard.Writecache.PathFSTree)
}

// it can be turned off, it is OK
if wcStatus != "" {
respSh.Storages = append(respSh.Storages,
&control.ObjectStatusResponse_Body_Shard_Status{
Type: "write-cache",
Status: wcStatus,
},
)
}

resp.Body.Shards = append(resp.Body.Shards, respSh)
}

err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return resp, nil
}
Loading

0 comments on commit 9fabc49

Please sign in to comment.