Skip to content

Commit

Permalink
Upgrade NeoFS SDK module to RC-10 (#2470)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Aug 8, 2023
2 parents d67d5d3 + 9674748 commit a7541ff
Show file tree
Hide file tree
Showing 81 changed files with 456 additions and 1,081 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Changelog for NeoFS Node
- BlobStor tries to store object in any sub-storage with free space (#2450)

### Updated
- `neofs-sdk-go` to `v1.0.0-rc.9`
- `neofs-sdk-go` to `v1.0.0-rc.10`

### Updating from v0.37.0
CLI command timeouts (flag `--timeout`) now limit the total command execution
Expand Down
96 changes: 39 additions & 57 deletions cmd/neofs-cli/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package internal
import (
"bytes"
"context"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -79,6 +78,7 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
// PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct {
commonPrm
signerRFC6979Prm

cnr containerSDK.Container
client.PrmContainerPut
Expand Down Expand Up @@ -108,7 +108,7 @@ func (x PutContainerRes) ID() cid.ID {
//
// Returns any error which prevented the operation from completing correctly in error return.
func PutContainer(ctx context.Context, prm PutContainerPrm) (res PutContainerRes, err error) {
cliRes, err := prm.cli.ContainerPut(ctx, prm.cnr, prm.PrmContainerPut)
cliRes, err := prm.cli.ContainerPut(ctx, prm.cnr, prm.signer, prm.PrmContainerPut)
if err == nil {
res.cnr = cliRes
}
Expand Down Expand Up @@ -166,6 +166,7 @@ func IsACLExtendable(ctx context.Context, c *client.Client, cnr cid.ID) (bool, e
// DeleteContainerPrm groups parameters of DeleteContainerPrm operation.
type DeleteContainerPrm struct {
commonPrm
signerRFC6979Prm

cid cid.ID
client.PrmContainerDelete
Expand All @@ -188,7 +189,7 @@ type DeleteContainerRes struct{}
//
// Returns any error which prevented the operation from completing correctly in error return.
func DeleteContainer(ctx context.Context, prm DeleteContainerPrm) (res DeleteContainerRes, err error) {
err = prm.cli.ContainerDelete(ctx, prm.cid, prm.PrmContainerDelete)
err = prm.cli.ContainerDelete(ctx, prm.cid, prm.signer, prm.PrmContainerDelete)

return
}
Expand Down Expand Up @@ -229,6 +230,7 @@ func EACL(ctx context.Context, prm EACLPrm) (res EACLRes, err error) {
// SetEACLPrm groups parameters of SetEACL operation.
type SetEACLPrm struct {
commonPrm
signerRFC6979Prm

table eacl.Table
client.PrmContainerSetEACL
Expand All @@ -251,7 +253,7 @@ type SetEACLRes struct{}
//
// Returns any error which prevented the operation from completing correctly in error return.
func SetEACL(ctx context.Context, prm SetEACLPrm) (res SetEACLRes, err error) {
err = prm.cli.ContainerSetEACL(ctx, prm.table, prm.PrmContainerSetEACL)
err = prm.cli.ContainerSetEACL(ctx, prm.table, prm.signer, prm.PrmContainerSetEACL)

return
}
Expand Down Expand Up @@ -337,6 +339,7 @@ func NetMapSnapshot(ctx context.Context, prm NetMapSnapshotPrm) (res NetMapSnaps
// CreateSessionPrm groups parameters of CreateSession operation.
type CreateSessionPrm struct {
commonPrm
signerPrm
client.PrmSessionCreate
}

Expand All @@ -359,7 +362,7 @@ func (x CreateSessionRes) SessionKey() []byte {
//
// Returns any error which prevented the operation from completing correctly in error return.
func CreateSession(ctx context.Context, prm CreateSessionPrm) (res CreateSessionRes, err error) {
res.cliRes, err = prm.cli.SessionCreate(ctx, prm.PrmSessionCreate)
res.cliRes, err = prm.cli.SessionCreate(ctx, prm.signer, prm.PrmSessionCreate)

return
}
Expand Down Expand Up @@ -421,64 +424,48 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {

putPrm.WithXHeaders(prm.xHeaders...)

wrt, err := prm.cli.ObjectPutInit(ctx, putPrm)
wrt, err := prm.cli.ObjectPutInit(ctx, *prm.hdr, prm.signer, putPrm)
if err != nil {
return nil, fmt.Errorf("init object writing: %w", err)
}

if wrt.WriteHeader(*prm.hdr) {
if prm.headerCallback != nil {
prm.headerCallback(prm.hdr)
}

sz := prm.hdr.PayloadSize()
if prm.headerCallback != nil {
prm.headerCallback(prm.hdr)
}

if data := prm.hdr.Payload(); len(data) > 0 {
if prm.rdr != nil {
prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr)
} else {
prm.rdr = bytes.NewReader(data)
sz = uint64(len(data))
}
}
sz := prm.hdr.PayloadSize()

if data := prm.hdr.Payload(); len(data) > 0 {
if prm.rdr != nil {
const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK.

if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}

buf := make([]byte, sz)

var n int
prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr)
} else {
prm.rdr = bytes.NewReader(data)
sz = uint64(len(data))
}
}

for {
n, err = prm.rdr.Read(buf)
if n > 0 {
if !wrt.WritePayloadChunk(buf[:n]) {
break
}
if prm.rdr != nil {
const defaultBufferSizePut = 3 << 20 // Maximum chunk size is 3 MiB in the SDK.

continue
}
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}

if errors.Is(err, io.EOF) {
break
}
buf := make([]byte, sz)

return nil, fmt.Errorf("read payload: %w", err)
}
_, err = io.CopyBuffer(wrt, prm.rdr, buf)
if err != nil {
return nil, fmt.Errorf("copy data into object stream: %w", err)
}
}

cliRes, err := wrt.Close()
if err != nil { // here err already carries both status and client errors
return nil, fmt.Errorf("client failure: %w", err)
err = wrt.Close()
if err != nil {
return nil, fmt.Errorf("finish object stream: %w", err)
}

return &PutObjectRes{
id: cliRes.StoredObjectID(),
id: wrt.GetResult().StoredObjectID(),
}, nil
}

Expand Down Expand Up @@ -514,7 +501,7 @@ func DeleteObject(ctx context.Context, prm DeleteObjectPrm) (*DeleteObjectRes, e

delPrm.WithXHeaders(prm.xHeaders...)

cliRes, err := prm.cli.ObjectDelete(ctx, prm.objAddr.Container(), prm.objAddr.Object(), delPrm)
cliRes, err := prm.cli.ObjectDelete(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.signer, delPrm)
if err != nil {
return nil, fmt.Errorf("remove object via client: %w", err)
}
Expand Down Expand Up @@ -576,16 +563,11 @@ func GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) {

getPrm.WithXHeaders(prm.xHeaders...)

rdr, err := prm.cli.ObjectGetInit(ctx, prm.objAddr.Container(), prm.objAddr.Object(), getPrm)
hdr, rdr, err := prm.cli.ObjectGetInit(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.signer, getPrm)
if err != nil {
return nil, fmt.Errorf("init object reading on client: %w", err)
}

var hdr object.Object

if !rdr.ReadHeader(&hdr) {
return nil, fmt.Errorf("read object header: %w", rdr.Close())
}
if prm.headerCallback != nil {
prm.headerCallback(&hdr)
}
Expand Down Expand Up @@ -649,7 +631,7 @@ func HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error)

cliPrm.WithXHeaders(prm.xHeaders...)

res, err := prm.cli.ObjectHead(ctx, prm.objAddr.Container(), prm.objAddr.Object(), cliPrm)
res, err := prm.cli.ObjectHead(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.signer, cliPrm)
if err != nil {
return nil, fmt.Errorf("read object header via client: %w", err)
}
Expand Down Expand Up @@ -709,7 +691,7 @@ func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes

cliPrm.WithXHeaders(prm.xHeaders...)

rdr, err := prm.cli.ObjectSearchInit(ctx, prm.cnrID, cliPrm)
rdr, err := prm.cli.ObjectSearchInit(ctx, prm.cnrID, prm.signer, cliPrm)
if err != nil {
return nil, fmt.Errorf("init object search: %w", err)
}
Expand Down Expand Up @@ -812,7 +794,7 @@ func HashPayloadRanges(ctx context.Context, prm HashPayloadRangesPrm) (*HashPayl

cliPrm.WithXHeaders(prm.xHeaders...)

res, err := prm.cli.ObjectHash(ctx, prm.objAddr.Container(), prm.objAddr.Object(), cliPrm)
res, err := prm.cli.ObjectHash(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.signer, cliPrm)
if err != nil {
return nil, fmt.Errorf("read payload hashes via client: %w", err)
}
Expand Down Expand Up @@ -867,7 +849,7 @@ func PayloadRange(ctx context.Context, prm PayloadRangePrm) (*PayloadRangeRes, e

cliPrm.WithXHeaders(prm.xHeaders...)

rdr, err := prm.cli.ObjectRangeInit(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.rng.GetOffset(), prm.rng.GetLength(), cliPrm)
rdr, err := prm.cli.ObjectRangeInit(ctx, prm.objAddr.Container(), prm.objAddr.Object(), prm.rng.GetOffset(), prm.rng.GetLength(), prm.signer, cliPrm)
if err != nil {
return nil, fmt.Errorf("init payload reading: %w", err)
}
Expand Down
21 changes: 21 additions & 0 deletions cmd/neofs-cli/internal/client/prm.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package internal

import (
"crypto/ecdsa"
"io"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
)

// here are small structures with public setters to share between parameter structures
Expand Down Expand Up @@ -68,6 +70,7 @@ func (x *payloadWriterPrm) SetPayloadWriter(wrt io.Writer) {
type commonObjectPrm struct {
commonPrm
bearerTokenPrm
signerPrm

sessionToken *session.Object

Expand All @@ -90,3 +93,21 @@ func (x *commonObjectPrm) SetXHeaders(hs []string) {
func (x *commonObjectPrm) SetSessionToken(tok *session.Object) {
x.sessionToken = tok
}

type signerPrm struct {
signer user.Signer
}

// SetPrivateKey sets ecdsa.PrivateKey to be used for the operation.
func (x *signerPrm) SetPrivateKey(key ecdsa.PrivateKey) {
x.signer = user.NewAutoIDSigner(key)
}

type signerRFC6979Prm struct {
signer user.Signer
}

// SetPrivateKey sets ecdsa.PrivateKey to be used for the operation.
func (p *signerRFC6979Prm) SetPrivateKey(key ecdsa.PrivateKey) {
p.signer = user.NewAutoIDSignerRFC6979(key)
}
41 changes: 25 additions & 16 deletions cmd/neofs-cli/internal/client/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,48 @@ package internal

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"errors"
"fmt"
"time"

"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/common"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var errInvalidEndpoint = errors.New("provided RPC endpoint is incorrect")

// GetSDKClientByFlag returns default neofs-sdk-go client using the specified flag for the address.
// On error, outputs to stderr of cmd and exits with non-zero code.
func GetSDKClientByFlag(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) *client.Client {
cli, err := getSDKClientByFlag(ctx, cmd, key, endpointFlag)
func GetSDKClientByFlag(ctx context.Context, cmd *cobra.Command, endpointFlag string) *client.Client {
cli, err := getSDKClientByFlag(ctx, cmd, endpointFlag)
if err != nil {
common.ExitOnErr(cmd, "can't create API client: %w", err)
}
return cli
}

func getSDKClientByFlag(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
func getSDKClientByFlag(ctx context.Context, cmd *cobra.Command, endpointFlag string) (*client.Client, error) {
var addr network.Address

err := addr.FromString(viper.GetString(endpointFlag))
if err != nil {
return nil, fmt.Errorf("%v: %w", errInvalidEndpoint, err)
}
return GetSDKClient(ctx, cmd, key, addr)
return GetSDKClient(ctx, cmd, addr)
}

// GetSDKClient returns default neofs-sdk-go client.
func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
func GetSDKClient(ctx context.Context, cmd *cobra.Command, addr network.Address) (*client.Client, error) {
var (
prmInit client.PrmInit
prmDial client.PrmDial
)

prmInit.SetDefaultSigner(neofsecdsa.SignerRFC6979(*key))
prmDial.SetServerURI(addr.URIAddr())
prmDial.SetContext(ctx)

Expand All @@ -67,6 +64,23 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
}

if err := c.Dial(prmDial); err != nil { //nolint:contextcheck // SetContext is used above.
// Here is a hack helping IR healthcheck to work. Current API client revision
// calls NetmapService.EndpointInfo RPC which is a part of the NeoFS API
// protocol. Inner ring nodes don't serve NeoFS API services, so they respond
// with Unimplemented code. We ignore this error here:
// - if nodes responds, then dial was successful
// - even if we connect to storage node which MUST provide NeoFS API services,
// subsequent EndpointInfo method will return Unimplemented error anyway
// This behavior is going to be fixed on SDK side.
//
// Track https://github.com/nspcc-dev/neofs-node/issues/2477
wErr := err
for e := errors.Unwrap(wErr); e != nil; e = errors.Unwrap(wErr) {
wErr = e
}
if status.Code(wErr) == codes.Unimplemented {
return c, nil
}
return nil, fmt.Errorf("can't init SDK client: %w", err)
}

Expand All @@ -81,12 +95,7 @@ func GetCurrentEpoch(ctx context.Context, cmd *cobra.Command, endpoint string) (
return 0, fmt.Errorf("can't parse RPC endpoint: %w", err)
}

key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return 0, fmt.Errorf("can't generate key to sign query: %w", err)
}

c, err := GetSDKClient(ctx, cmd, key, addr)
c, err := GetSDKClient(ctx, cmd, addr)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit a7541ff

Please sign in to comment.