diff --git a/client/container_statistic_test.go b/client/container_statistic_test.go index f6145f59..8c8c51bf 100644 --- a/client/container_statistic_test.go +++ b/client/container_statistic_test.go @@ -620,9 +620,10 @@ func TestClientStatistic_ObjectPut(t *testing.T) { writer, err := c.ObjectPutInit(ctx, hdr, signer, prm) require.NoError(t, err) - require.True(t, writer.WritePayloadChunk(randBytes(10))) + _, err = writer.Write(randBytes(10)) + require.NoError(t, err) - _, err = writer.Close() + err = writer.Close() require.NoError(t, err) require.Equal(t, 2, collector.methods[stat.MethodObjectPut].requests) diff --git a/client/object_put.go b/client/object_put.go index f9492aef..8773f284 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -11,11 +11,9 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-sdk-go/bearer" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/nspcc-dev/neofs-sdk-go/object/slicer" "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" ) @@ -58,10 +56,15 @@ func (x ResObjectPut) StoredObjectID() oid.ID { } // ObjectWriter is designed to write one object to NeoFS system. +type ObjectWriter interface { + io.WriteCloser + GetResult() ResObjectPut +} + +// DefaultObjectWriter implements [ObjectWriter]. // -// Must be initialized using Client.ObjectPutInit, any other -// usage is unsafe. -type ObjectWriter struct { +// Must be initialized using [Client.ObjectPutInit], any other usage is unsafe. +type DefaultObjectWriter struct { cancelCtxStream context.CancelFunc client *Client @@ -106,8 +109,8 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) { } // writeHeader writes header of the object. Result means success. 
-// Failure reason can be received via Close. -func (x *ObjectWriter) writeHeader(hdr object.Object) error { +// Failure reason can be received via [DefaultObjectWriter.Close]. +func (x *DefaultObjectWriter) writeHeader(hdr object.Object) error { if x.statisticCallback != nil { defer func() { x.statisticCallback(x.err) @@ -134,8 +137,8 @@ func (x *ObjectWriter) writeHeader(hdr object.Object) error { } // WritePayloadChunk writes chunk of the object payload. Result means success. -// Failure reason can be received via Close. -func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { +// Failure reason can be received via [DefaultObjectWriter.Close]. +func (x *DefaultObjectWriter) Write(chunk []byte) (n int, err error) { if x.statisticCallback != nil { defer func() { x.statisticCallback(x.err) @@ -147,6 +150,8 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { x.req.GetBody().SetObjectPart(&x.partChunk) } + var writtenBytes int + for ln := len(chunk); ln > 0; ln = len(chunk) { // maxChunkLen restricts maximum byte length of the chunk // transmitted in a single stream message. It depends on @@ -174,22 +179,23 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { x.err = signServiceMessage(x.signer, &x.req) if x.err != nil { x.err = fmt.Errorf("sign message: %w", x.err) - return false + return writtenBytes, x.err } x.err = x.stream.Write(&x.req) if x.err != nil { - return false + return writtenBytes, x.err } + writtenBytes += len(chunk[:ln]) chunk = chunk[ln:] } - return true + return writtenBytes, nil } // Close ends writing the object and returns the result of the operation -// along with the final results. Must be called after using the ObjectWriter. +// along with the final results. Must be called after using the [DefaultObjectWriter]. // // Exactly one return value is non-nil. By default, server status is returned in res structure. // Any client's internal or transport errors are returned as Go built-in error. 
@@ -204,7 +210,7 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { // - [apistatus.ErrLockNonRegularObject] // - [apistatus.ErrSessionTokenNotFound] // - [apistatus.ErrSessionTokenExpired] -func (x *ObjectWriter) Close() (*ResObjectPut, error) { +func (x *DefaultObjectWriter) Close() error { var err error if x.statisticCallback != nil { defer func() { @@ -219,17 +225,17 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { // message. Server returns an error in response message (in status). if x.err != nil && !errors.Is(x.err, io.EOF) { err = x.err - return nil, err + return err } if x.err = x.stream.Close(); x.err != nil { err = x.err - return nil, err + return err } if x.err = x.client.processResponse(&x.respV2); x.err != nil { err = x.err - return nil, err + return err } const fieldID = "ID" @@ -237,7 +243,7 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { idV2 := x.respV2.GetBody().GetObjectID() if idV2 == nil { err = newErrMissingResponseField(fieldID) - return nil, err + return err } x.err = x.res.obj.ReadFromV2(*idV2) @@ -246,27 +252,33 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { err = x.err } - return &x.res, nil + return nil +} + +// GetResult returns the put operation result. +func (x *DefaultObjectWriter) GetResult() ResObjectPut { + return x.res } // ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol. // -// The call only opens the transmission channel, explicit recording is done using the ObjectWriter. +// The call only opens the transmission channel, explicit recording is done using the [ObjectWriter]. // Exactly one return value is non-nil. Resulting writer must be finally closed. // -// Context is required and must not be nil. It is used for network communication. +// Context is required and must not be nil. 
It will be used for network communication for the whole object transmission, +// including put init (this method) and subsequent object payload writes via ObjectWriter. // // Signer is required and must not be nil. The operation is executed on behalf of // the account corresponding to the specified Signer, which is taken into account, in particular, for access control. // // Returns errors: // - [ErrMissingSigner] -func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (*ObjectWriter, error) { +func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm PrmObjectPutInit) (ObjectWriter, error) { var err error defer func() { c.sendStatistic(stat.MethodObjectPut, err)() }() - var w ObjectWriter + var w DefaultObjectWriter w.statisticCallback = func(err error) { c.sendStatistic(stat.MethodObjectPutStream, err)() } @@ -292,102 +304,10 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, signer us c.prepareRequest(&w.req, &prm.meta) if err = w.writeHeader(hdr); err != nil { - _, _ = w.Close() + _ = w.Close() err = fmt.Errorf("header write: %w", err) return nil, err } return &w, nil } - -type objectWriter struct { - context context.Context - client *Client -} - -func (x *objectWriter) InitDataStream(header object.Object, signer user.Signer) (io.Writer, error) { - var prm PrmObjectPutInit - - stream, err := x.client.ObjectPutInit(x.context, header, signer, prm) - if err != nil { - return nil, fmt.Errorf("init object stream: %w", err) - } - - return &payloadWriter{ - stream: stream, - }, nil -} - -type payloadWriter struct { - stream *ObjectWriter -} - -func (x *payloadWriter) Write(p []byte) (int, error) { - if !x.stream.WritePayloadChunk(p) { - return 0, x.Close() - } - - return len(p), nil -} - -func (x *payloadWriter) Close() error { - _, err := x.stream.Close() - if err != nil { - return err - } - - return nil -} - -// CreateObject creates new NeoFS object 
with given payload data and stores it -// in specified container of the NeoFS network using provided Client connection. -// The object is created on behalf of provided neofscrypto.Signer, and owned by -// the specified user.ID. -// -// In terms of NeoFS, parameterized neofscrypto.Signer represents object owner, -// object signer and request sender. Container SHOULD be public-write or sender -// SHOULD have corresponding rights. -// -// Client connection MUST be opened in advance, see Dial method for details. -// Network communication is carried out within a given context, so it MUST NOT -// be nil. -// -// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the -// future. Be ready to refactor your code regarding imports and call mechanics, -// in essence the operation will not change. -func CreateObject(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID, data io.Reader, attributes ...string) (oid.ID, error) { - s, err := NewDataSlicer(ctx, cli, signer, cnr, owner) - if err != nil { - return oid.ID{}, err - } - - return s.Slice(data, attributes...) -} - -// NewDataSlicer creates slicer.Slicer that saves data in the NeoFS network -// through provided Client. The data is packaged into NeoFS objects stored in -// the specified container. Provided signer is being used to sign the resulting -// objects as a system requirement. Produced objects are owned by the -// parameterized NeoFS user. -// -// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the -// future. Be ready to refactor your code regarding imports and call mechanics, -// in essence the operation will not change. 
-func NewDataSlicer(ctx context.Context, cli *Client, signer user.Signer, cnr cid.ID, owner user.ID) (*slicer.Slicer, error) { - netInfo, err := cli.NetworkInfo(ctx, PrmNetworkInfo{}) - if err != nil { - return nil, fmt.Errorf("read current network info: %w", err) - } - - var opts slicer.Options - opts.SetObjectPayloadLimit(netInfo.MaxObjectSize()) - opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch()) - if !netInfo.HomomorphicHashingDisabled() { - opts.CalculateHomomorphicChecksum() - } - - return slicer.New(signer, cnr, owner, &objectWriter{ - context: ctx, - client: cli, - }, opts), nil -} diff --git a/object/object.go b/object/object.go index c956bad6..db0fe1e0 100644 --- a/object/object.go +++ b/object/object.go @@ -111,6 +111,14 @@ func (o *Object) SetID(v oid.ID) { SetObjectID(&v2) } +// ResetID removes object identifier. +// +// See also [Object.SetID]. +func (o *Object) ResetID() { + (*object.Object)(o). + SetObjectID(nil) +} + // Signature returns signature of the object identifier. // // See also [Object.SetSignature]. @@ -536,6 +544,15 @@ func (o *Object) SetParentID(v oid.ID) { }) } +// ResetParentID removes identifier of the parent object. +// +// See also [Object.SetParentID]. +func (o *Object) ResetParentID() { + o.setSplitFields(func(split *object.SplitHeader) { + split.SetParent(nil) + }) +} + // Parent returns parent object w/o payload. // // See also [Object.SetParent]. diff --git a/object/slicer/options.go b/object/slicer/options.go index 2e6bf888..65e8731f 100644 --- a/object/slicer/options.go +++ b/object/slicer/options.go @@ -1,5 +1,9 @@ package slicer +import ( + "github.com/nspcc-dev/neofs-sdk-go/session" +) + // Options groups Slicer options. 
type Options struct { objectPayloadLimit uint64 @@ -7,6 +11,8 @@ type Options struct { currentNeoFSEpoch uint64 withHomoChecksum bool + + sessionToken *session.Object } // SetObjectPayloadLimit specifies data size limit for produced physically @@ -25,3 +31,28 @@ func (x *Options) SetCurrentNeoFSEpoch(e uint64) { func (x *Options) CalculateHomomorphicChecksum() { x.withHomoChecksum = true } + +// SetSession sets session object. +func (x *Options) SetSession(sess *session.Object) { + x.sessionToken = sess +} + +// ObjectPayloadLimit returns required max object size. +func (x *Options) ObjectPayloadLimit() uint64 { + return x.objectPayloadLimit +} + +// CurrentNeoFSEpoch returns epoch. +func (x *Options) CurrentNeoFSEpoch() uint64 { + return x.currentNeoFSEpoch +} + +// IsHomomorphicChecksumEnabled indicates homomorphic checksum calculation status. +func (x *Options) IsHomomorphicChecksumEnabled() bool { + return x.withHomoChecksum +} + +// Session returns session object. +func (x *Options) Session() *session.Object { + return x.sessionToken +} diff --git a/object/slicer/slicer.go b/object/slicer/slicer.go index 55c4ecb8..8b63ab49 100644 --- a/object/slicer/slicer.go +++ b/object/slicer/slicer.go @@ -2,6 +2,7 @@ package slicer import ( "bytes" + "context" "crypto/sha256" "errors" "fmt" @@ -9,8 +10,10 @@ import ( "io" "github.com/nspcc-dev/neofs-sdk-go/checksum" + "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -20,19 +23,26 @@ import ( ) var ( - // ErrInvalidAttributeAmount indicates wrong number of arguments. Amount of arguments MUST be even number. 
- ErrInvalidAttributeAmount = errors.New("attributes must be even number of strings") + // ErrIncompleteHeader indicates some fields are missing in header. + ErrIncompleteHeader = errors.New("incomplete header") ) // ObjectWriter represents a virtual object recorder. type ObjectWriter interface { - // InitDataStream initializes and returns a stream of writable data associated + // ObjectPutInit initializes and returns a stream of writable data associated // with the object according to its header. Provided header includes at least // container, owner and object ID fields. // // Signer is required and must not be nil. The operation is executed on behalf of // the account corresponding to the specified Signer, which is taken into account, in particular, for access control. - InitDataStream(header object.Object, signer user.Signer) (dataStream io.Writer, err error) + ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) +} + +// NetworkedClient represents a virtual object recorder with possibility to get actual [netmap.NetworkInfo] data. +type NetworkedClient interface { + ObjectWriter + + NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error) } // Slicer converts input raw data streams into NeoFS objects. Working Slicer @@ -40,15 +50,11 @@ type ObjectWriter interface { type Slicer struct { signer user.Signer - cnr cid.ID - - owner user.ID - w ObjectWriter opts Options - sessionToken *session.Object + hdr object.Object } // New constructs Slicer which writes sliced ready-to-go objects owned by @@ -59,8 +65,8 @@ type Slicer struct { // in Slicer.Slice after the payload of any object has been written. In this // case, Slicer.Slice fails immediately on Close error. // -// Options parameter allows you to provide optional parameters which tune -// the default Slicer behavior. They are detailed below. 
+// NetworkedClient parameter allows to extract all required network-depended information for default Slicer behavior +// tuning. // // If payload size limit is specified via Options.SetObjectPayloadLimit, // outgoing objects has payload not bigger than the limit. NeoFS stores the @@ -75,259 +81,204 @@ type Slicer struct { // within the limit, one object is produced. Note that Slicer can write multiple // objects, but returns the root object ID only. // -// If current NeoFS epoch is specified via Options.SetCurrentNeoFSEpoch, it is -// written to the metadata of all resulting objects as a creation epoch. -// -// See also NewSession. -func New(signer user.Signer, cnr cid.ID, owner user.ID, w ObjectWriter, opts Options) *Slicer { +// Parameter sessionToken may be nil, if no session is used. +func New(ctx context.Context, nw NetworkedClient, signer user.Signer, cnr cid.ID, owner user.ID, sessionToken *session.Object) (*Slicer, error) { + ni, err := nw.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + return nil, fmt.Errorf("network info: %w", err) + } + + opts := Options{ + objectPayloadLimit: ni.MaxObjectSize(), + currentNeoFSEpoch: ni.CurrentEpoch(), + sessionToken: sessionToken, + } + + if !ni.HomomorphicHashingDisabled() { + opts.CalculateHomomorphicChecksum() + } + + var hdr object.Object + hdr.SetContainerID(cnr) + hdr.SetType(object.TypeRegular) + hdr.SetOwnerID(&owner) + hdr.SetCreationEpoch(ni.CurrentEpoch()) + hdr.SetSessionToken(sessionToken) return &Slicer{ - signer: signer, - cnr: cnr, - owner: owner, - w: w, opts: opts, - } + w: nw, + signer: signer, + hdr: hdr, + }, nil } -// NewSession creates Slicer which generates objects within provided session. -// NewSession work similar to New with the detail that the session issuer owns -// the produced objects. Specified session token is written to the metadata of -// all resulting objects. In this case, the object is considered to be created -// by a proxy on behalf of the session issuer. 
-func NewSession(signer user.Signer, cnr cid.ID, token session.Object, w ObjectWriter, opts Options) *Slicer { - return &Slicer{ - signer: signer, - cnr: cnr, - owner: token.Issuer(), - w: w, - opts: opts, - sessionToken: &token, - } +// Put creates new NeoFS object from the input data stream, associates the +// object with the configured container and writes the object via underlying +// [ObjectWriter]. After a successful write, Put returns an [oid.ID] which is a +// unique reference to the object in the container. Put sets all required +// calculated fields like payload length, checksum, etc. +// +// Put allows you to specify [object.Attribute] parameters to be written to the +// resulting object's metadata. Keys SHOULD NOT start with system-reserved +// '__NEOFS__' prefix. +// +// See [New] for details. +func (x *Slicer) Put(ctx context.Context, data io.Reader, attrs []object.Attribute) (oid.ID, error) { + x.hdr.SetAttributes(attrs...) + return slice(ctx, x.w, x.hdr, data, x.signer, x.opts) } -// fillCommonMetadata writes to the object metadata common to all objects of the -// same stream. -func (x *Slicer) fillCommonMetadata(obj *object.Object) { - currentVersion := version.Current() - obj.SetVersion(¤tVersion) - obj.SetContainerID(x.cnr) - obj.SetCreationEpoch(x.opts.currentNeoFSEpoch) - obj.SetType(object.TypeRegular) - obj.SetOwnerID(&x.owner) - obj.SetSessionToken(x.sessionToken) +// InitPut works similar to [Slicer.Put] but provides [PayloadWriter] allowing +// the caller to write data himself. +func (x *Slicer) InitPut(ctx context.Context, attrs []object.Attribute) (*PayloadWriter, error) { + x.hdr.SetAttributes(attrs...) + return initPayloadStream(ctx, x.w, x.hdr, x.signer, x.opts) +} + +// Put works similar to [Slicer.Put], but allows flexible configuration of object header. +// The method accepts [Options] for adjusting max object size, epoch, session token, etc. 
+func Put(ctx context.Context, ow ObjectWriter, header object.Object, signer user.Signer, data io.Reader, opts Options) (oid.ID, error) { + return slice(ctx, ow, header, data, signer, opts) +} + +// InitPut works similar to [slicer.Put], but provides [ObjectWriter] allowing +// the caller to write data himself. +func InitPut(ctx context.Context, ow ObjectWriter, header object.Object, signer user.Signer, opts Options) (*PayloadWriter, error) { + return initPayloadStream(ctx, ow, header, signer, opts) } const defaultPayloadSizeLimit = 1 << 20 // childPayloadSizeLimit returns configured size limit of the child object's // payload which defaults to 1MB. -func (x *Slicer) childPayloadSizeLimit() uint64 { - if x.opts.objectPayloadLimit > 0 { - return x.opts.objectPayloadLimit +func childPayloadSizeLimit(opts Options) uint64 { + if opts.objectPayloadLimit > 0 { + return opts.objectPayloadLimit } return defaultPayloadSizeLimit } -// Slice creates new NeoFS object from the input data stream, associates the -// object with the configured container and writes the object via underlying -// ObjectWriter. After a successful write, Slice returns an oid.ID which is a -// unique reference to the object in the container. Slice sets all required -// calculated fields like payload length, checksum, etc. -// -// Slice allows you to specify string key-value pairs to be written to the -// resulting object's metadata as object attributes. Corresponding argument MUST -// NOT be empty or have odd length. Keys SHOULD NOT start with system-reserved -// '__NEOFS__' prefix. -// -// See New for details. 
-func (x *Slicer) Slice(data io.Reader, attributes ...string) (oid.ID, error) { +func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.Reader, signer user.Signer, opts Options) (oid.ID, error) { var rootID oid.ID - if len(attributes)%2 != 0 { - return rootID, ErrInvalidAttributeAmount - } - - if x.opts.objectPayloadLimit == 0 { - x.opts.objectPayloadLimit = 1 << 20 - } + objectPayloadLimit := childPayloadSizeLimit(opts) - var rootHeader object.Object - var offset uint64 - var isSplit bool - var childMeta dynamicObjectMetadata - var writtenChildren []oid.ID - var childHeader object.Object - rootMeta := newDynamicObjectMetadata(x.opts.withHomoChecksum) - bChunk := make([]byte, x.opts.objectPayloadLimit+1) + var n int + bChunk := make([]byte, objectPayloadLimit) - x.fillCommonMetadata(&rootHeader) + writer, err := initPayloadStream(ctx, ow, header, signer, opts) + if err != nil { + return rootID, fmt.Errorf("init writter: %w", err) + } for { - n, err := data.Read(bChunk[offset:]) - if err == nil { - if last := offset + uint64(n); last <= x.opts.objectPayloadLimit { - rootMeta.accumulateNextPayloadChunk(bChunk[offset:last]) - if isSplit { - childMeta.accumulateNextPayloadChunk(bChunk[offset:last]) - } - offset = last - // data is not over, and we expect more bytes to form next object - continue - } - } else { + n, err = data.Read(bChunk) + if err != nil { if !errors.Is(err, io.EOF) { return rootID, fmt.Errorf("read payload chunk: %w", err) } - // there will be no more data - - toSend := offset + uint64(n) - if toSend <= x.opts.objectPayloadLimit { - // we can finalize the root object and send last part - - if len(attributes) > 0 { - attrs := make([]object.Attribute, len(attributes)/2) - - for i := 0; i < len(attrs); i++ { - attrs[i].SetKey(attributes[2*i]) - attrs[i].SetValue(attributes[2*i+1]) - } - - rootHeader.SetAttributes(attrs...) 
- } + // no more data to read - rootID, err = flushObjectMetadata(x.signer, rootMeta, &rootHeader) - if err != nil { - return rootID, fmt.Errorf("form root object: %w", err) - } - - if isSplit { - // when splitting, root object's header is written into its last child - childHeader.SetParent(&rootHeader) - childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1]) - - childID, err := writeInMemObject(x.signer, x.w, childHeader, bChunk[:toSend], childMeta) - if err != nil { - return rootID, fmt.Errorf("write child object: %w", err) - } - - writtenChildren = append(writtenChildren, childID) - } else { - // root object is single (full < limit), so send it directly - rootID, err = writeInMemObject(x.signer, x.w, rootHeader, bChunk[:toSend], rootMeta) - if err != nil { - return rootID, fmt.Errorf("write single root object: %w", err) - } - - return rootID, nil - } - - break + if err = writer.Close(); err != nil { + return rootID, fmt.Errorf("writer close: %w", err) } - // otherwise, form penultimate object, then do one more iteration for - // simplicity: according to io.Reader, we'll get io.EOF again, but the overflow - // will no longer occur, so we'll finish the loop + rootID = writer.ID() + break } - // according to buffer size, here we can overflow the object payload limit, e.g. - // 1. full=11B,limit=10B,read=11B (no objects created yet) - // 2. 
full=21B,limit=10B,read=11B (one object has been already sent with size=10B) - - toSend := offset + uint64(n) - overflow := toSend > x.opts.objectPayloadLimit - if overflow { - toSend = x.opts.objectPayloadLimit + if _, err = writer.Write(bChunk[:n]); err != nil { + return oid.ID{}, err } + } - // we could read some data even in case of io.EOF, so don't forget pick up the tail - if n > 0 { - rootMeta.accumulateNextPayloadChunk(bChunk[offset:toSend]) - if isSplit { - childMeta.accumulateNextPayloadChunk(bChunk[offset:toSend]) - } - } + return rootID, nil +} - if overflow { - isSplitCp := isSplit // we modify it in next condition below but need after it - if !isSplit { - // we send only child object below, but we can get here at the beginning (see - // option 1 described above), so we need to pre-init child resources - isSplit = true - x.fillCommonMetadata(&childHeader) - childHeader.SetSplitID(object.NewSplitID()) - childMeta = rootMeta - // we do shallow copy of rootMeta because below we take this into account and do - // not corrupt it - } else { - childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1]) - } +// headerData extract required fields from header, otherwise throw the error. 
+func headerData(header object.Object) (cid.ID, user.ID, error) { + containerID, isSet := header.ContainerID() + if !isSet { + return cid.ID{}, user.ID{}, fmt.Errorf("container-id: %w", ErrIncompleteHeader) + } - childID, err := writeInMemObject(x.signer, x.w, childHeader, bChunk[:toSend], childMeta) - if err != nil { - return rootID, fmt.Errorf("write child object: %w", err) - } + owner := header.OwnerID() + if owner == nil { + return cid.ID{}, user.ID{}, fmt.Errorf("owner: %w", ErrIncompleteHeader) + } - writtenChildren = append(writtenChildren, childID) + return containerID, *owner, nil +} - // shift overflow bytes to the beginning - if !isSplitCp { - childMeta = newDynamicObjectMetadata(x.opts.withHomoChecksum) // to avoid rootMeta corruption - } - childMeta.reset() - childMeta.accumulateNextPayloadChunk(bChunk[toSend:]) - rootMeta.accumulateNextPayloadChunk(bChunk[toSend:]) - offset = uint64(copy(bChunk, bChunk[toSend:])) - } +func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Object, signer user.Signer, opts Options) (*PayloadWriter, error) { + containerID, owner, err := headerData(header) + if err != nil { + return nil, err } - // linking object - childMeta.reset() - childHeader.ResetPreviousID() - childHeader.SetChildren(writtenChildren...) + var prm client.PrmObjectPutInit - _, err := writeInMemObject(x.signer, x.w, childHeader, nil, childMeta) - if err != nil { - return rootID, fmt.Errorf("write linking object: %w", err) + if opts.sessionToken != nil { + prm.WithinSession(*opts.sessionToken) + header.SetSessionToken(opts.sessionToken) + // session issuer is a container owner. 
+ issuer := opts.sessionToken.Issuer() + owner = issuer + header.SetOwnerID(&owner) } - return rootID, nil -} + header.SetCreationEpoch(opts.currentNeoFSEpoch) + currentVersion := version.Current() + header.SetVersion(¤tVersion) + + var stubObject object.Object + stubObject.SetVersion(¤tVersion) + stubObject.SetContainerID(containerID) + stubObject.SetCreationEpoch(opts.currentNeoFSEpoch) + stubObject.SetType(object.TypeRegular) + stubObject.SetOwnerID(&owner) + stubObject.SetSessionToken(opts.sessionToken) -// InitPayloadStream works similar to Slice but provides PayloadWriter allowing -// the caller to write data himself. -func (x *Slicer) InitPayloadStream(attributes ...string) (*PayloadWriter, error) { res := &PayloadWriter{ - stream: x.w, - signer: x.signer, - container: x.cnr, - owner: x.owner, - currentEpoch: x.opts.currentNeoFSEpoch, - sessionToken: x.sessionToken, - attributes: attributes, - rootMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum), - childMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum), - } - - res.buf.Grow(int(x.childPayloadSizeLimit())) + ctx: ctx, + isHeaderWriteStep: true, + headerObject: header, + stream: ow, + signer: signer, + container: containerID, + owner: owner, + currentEpoch: opts.currentNeoFSEpoch, + sessionToken: opts.sessionToken, + rootMeta: newDynamicObjectMetadata(opts.withHomoChecksum), + childMeta: newDynamicObjectMetadata(opts.withHomoChecksum), + prmObjectPutInit: prm, + stubObject: &stubObject, + } + + maxObjSize := childPayloadSizeLimit(opts) + + res.buf.Grow(int(maxObjSize)) res.rootMeta.reset() - res.currentWriter = newLimitedWriter(io.MultiWriter(&res.buf, &res.rootMeta), x.childPayloadSizeLimit()) + res.currentWriter = newLimitedWriter(io.MultiWriter(&res.buf, &res.rootMeta), maxObjSize) return res, nil } // PayloadWriter is a single-object payload stream provided by Slicer. 
type PayloadWriter struct { + ctx context.Context stream ObjectWriter - rootID oid.ID + rootID oid.ID + headerObject object.Object + isHeaderWriteStep bool signer user.Signer container cid.ID owner user.ID currentEpoch uint64 sessionToken *session.Object - attributes []string buf bytes.Buffer @@ -339,7 +290,9 @@ type PayloadWriter struct { withSplit bool splitID *object.SplitID - writtenChildren []oid.ID + writtenChildren []oid.ID + prmObjectPutInit client.PrmObjectPutInit + stubObject *object.Object } // Write writes next chunk of the object data. Concatenation of all chunks forms @@ -362,14 +315,14 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { // to fill splitInfo in all child objects. x.withSplit = true - err = x.writeIntermediateChild(x.rootMeta) + err = x.writeIntermediateChild(x.ctx, x.rootMeta) if err != nil { return n, fmt.Errorf("write 1st child: %w", err) } x.currentWriter.reset(io.MultiWriter(&x.buf, &x.rootMeta, &x.childMeta)) } else { - err = x.writeIntermediateChild(x.childMeta) + err = x.writeIntermediateChild(x.ctx, x.childMeta) if err != nil { return n, fmt.Errorf("write next child: %w", err) } @@ -379,6 +332,7 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { x.buf.Reset() x.childMeta.reset() + x.isHeaderWriteStep = false n2, err := x.Write(chunk[n:]) // here n > 0 so infinite recursion shouldn't occur @@ -389,9 +343,9 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { // the stream. Reference to the stored object can be obtained by ID method. 
func (x *PayloadWriter) Close() error { if x.withSplit { - return x.writeLastChild(x.childMeta, x.setID) + return x.writeLastChild(x.ctx, x.childMeta, x.setID) } - return x.writeLastChild(x.rootMeta, x.setID) + return x.writeLastChild(x.ctx, x.rootMeta, x.setID) } func (x *PayloadWriter) setID(id oid.ID) { @@ -408,31 +362,26 @@ func (x *PayloadWriter) ID() oid.ID { // writeIntermediateChild writes intermediate split-chain element with specified // dynamicObjectMetadata to the configured ObjectWriter. -func (x *PayloadWriter) writeIntermediateChild(meta dynamicObjectMetadata) error { - return x._writeChild(meta, false, nil) +func (x *PayloadWriter) writeIntermediateChild(ctx context.Context, meta dynamicObjectMetadata) error { + return x._writeChild(ctx, meta, false, nil) } // writeIntermediateChild writes last split-chain element with specified // dynamicObjectMetadata to the configured ObjectWriter. If rootIDHandler is // specified, ID of the resulting root object is passed into it. -func (x *PayloadWriter) writeLastChild(meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error { - return x._writeChild(meta, true, rootIDHandler) +func (x *PayloadWriter) writeLastChild(ctx context.Context, meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error { + return x._writeChild(ctx, meta, true, rootIDHandler) } -func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootIDHandler func(id oid.ID)) error { - currentVersion := version.Current() - - fCommon := func(obj *object.Object) { - obj.SetVersion(&currentVersion) - obj.SetContainerID(x.container) - obj.SetCreationEpoch(x.currentEpoch) - obj.SetType(object.TypeRegular) - obj.SetOwnerID(&x.owner) - obj.SetSessionToken(x.sessionToken) - } +func (x *PayloadWriter) _writeChild(ctx context.Context, meta dynamicObjectMetadata, last bool, rootIDHandler func(id oid.ID)) error { + obj := *x.stubObject + obj.SetSplitID(nil) + obj.ResetPreviousID() + obj.SetParent(nil) + obj.ResetParentID() + 
obj.SetSignature(nil) + obj.ResetID() - var obj object.Object - fCommon(&obj) if x.withSplit { obj.SetSplitID(x.splitID) } @@ -440,27 +389,7 @@ func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootI obj.SetPreviousID(x.writtenChildren[len(x.writtenChildren)-1]) } if last { - var rootObj *object.Object - if x.withSplit { - rootObj = new(object.Object) - } else { - rootObj = &obj - } - - fCommon(rootObj) - - if len(x.attributes) > 0 { - attrs := make([]object.Attribute, len(x.attributes)/2) - - for i := 0; i < len(attrs); i++ { - attrs[i].SetKey(x.attributes[2*i]) - attrs[i].SetValue(x.attributes[2*i+1]) - } - - rootObj.SetAttributes(attrs...) - } - - rootID, err := flushObjectMetadata(x.signer, x.rootMeta, rootObj) + rootID, err := flushObjectMetadata(x.signer, x.rootMeta, &x.headerObject) if err != nil { return fmt.Errorf("form root object: %w", err) } @@ -471,11 +400,21 @@ func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootI if x.withSplit { obj.SetParentID(rootID) - obj.SetParent(rootObj) + obj.SetParent(&x.headerObject) } } - id, err := writeInMemObject(x.signer, x.stream, obj, x.buf.Bytes(), meta) + var id oid.ID + var err error + + // The first object must be a header. Note: if object is less than MaxObjectSize, we don't need to slice it. + // Thus, we have a legitimate situation when, last == true and x.isHeaderWriteStep == true. + if x.isHeaderWriteStep { + id, err = writeInMemObject(ctx, x.signer, x.stream, x.headerObject, x.buf.Bytes(), meta, x.prmObjectPutInit) + } else { + id, err = writeInMemObject(ctx, x.signer, x.stream, obj, x.buf.Bytes(), meta, x.prmObjectPutInit) + } + if err != nil { return fmt.Errorf("write formed object: %w", err) } @@ -486,8 +425,11 @@ func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootI meta.reset() obj.ResetPreviousID() obj.SetChildren(x.writtenChildren...) 
+ // we reuse already written object, we should reset these fields, to eval them one more time in writeInMemObject. + obj.ResetID() + obj.SetSignature(nil) - _, err = writeInMemObject(x.signer, x.stream, obj, nil, meta) + _, err = writeInMemObject(ctx, x.signer, x.stream, obj, nil, meta, x.prmObjectPutInit) if err != nil { return fmt.Errorf("write linking object: %w", err) } @@ -539,13 +481,23 @@ func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata, return id, nil } -func writeInMemObject(signer user.Signer, w ObjectWriter, header object.Object, payload []byte, meta dynamicObjectMetadata) (oid.ID, error) { - id, err := flushObjectMetadata(signer, meta, &header) - if err != nil { - return id, err +func writeInMemObject(ctx context.Context, signer user.Signer, w ObjectWriter, header object.Object, payload []byte, meta dynamicObjectMetadata, prm client.PrmObjectPutInit) (oid.ID, error) { + var ( + id oid.ID + err error + isSet bool + ) + + id, isSet = header.ID() + if !isSet || header.Signature() == nil { + id, err = flushObjectMetadata(signer, meta, &header) + + if err != nil { + return id, err + } } - stream, err := w.InitDataStream(header, signer) + stream, err := w.ObjectPutInit(ctx, header, signer, prm) if err != nil { return id, fmt.Errorf("init data stream for next object: %w", err) } diff --git a/object/slicer/slicer_test.go b/object/slicer/slicer_test.go index 644729be..9c4adbff 100644 --- a/object/slicer/slicer_test.go +++ b/object/slicer/slicer_test.go @@ -2,6 +2,7 @@ package slicer_test import ( "bytes" + "context" "crypto/ecdsa" "crypto/elliptic" cryptorand "crypto/rand" @@ -13,10 +14,13 @@ import ( "math/rand" "testing" + netmapv2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" "github.com/nspcc-dev/neofs-sdk-go/checksum" + "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + 
"github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/object/slicer" @@ -75,8 +79,18 @@ func BenchmarkSliceDataIntoObjects(b *testing.B) { } func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) { + ctx := context.Background() + in, opts := randomInput(b, size, sizeLimit) - s := slicer.NewSession(in.signer, in.container, *sessiontest.ObjectSigned(test.RandomSignerRFC6979(b)), discardObject{}, opts) + s, err := slicer.New( + ctx, + discardObject{opts: opts}, + in.signer, + in.container, + in.owner, + sessiontest.ObjectSigned(test.RandomSignerRFC6979(b)), + ) + require.NoError(b, err) b.Run("reader", func(b *testing.B) { var err error @@ -86,7 +100,7 @@ func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = s.Slice(r, in.attributes...) + _, err = s.Put(ctx, r, in.attributes) b.StopTimer() require.NoError(b, err) b.StartTimer() @@ -101,7 +115,7 @@ func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) { var w *slicer.PayloadWriter for i := 0; i < b.N; i++ { - w, err = s.InitPayloadStream(in.attributes...) 
+ w, err = s.InitPut(ctx, in.attributes) b.StopTimer() require.NoError(b, err) b.StartTimer() @@ -118,27 +132,67 @@ func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) { }) } -type discardObject struct{} +func networkInfoFromOpts(opts slicer.Options) (netmap.NetworkInfo, error) { + var ni netmap.NetworkInfo + var v2 netmapv2.NetworkInfo + var netConfig netmapv2.NetworkConfig + var p1 netmapv2.NetworkParameter + + p1.SetKey(randomData(10)) + p1.SetValue(randomData(10)) + + netConfig.SetParameters(p1) + v2.SetNetworkConfig(&netConfig) + + if err := ni.ReadFromV2(v2); err != nil { + return ni, err + } + + ni.SetCurrentEpoch(opts.CurrentNeoFSEpoch()) + ni.SetMaxObjectSize(opts.ObjectPayloadLimit()) + if !opts.IsHomomorphicChecksumEnabled() { + ni.DisableHomomorphicHashing() + } + + return ni, nil +} + +type discardObject struct { + opts slicer.Options +} -func (discardObject) InitDataStream(object.Object, user.Signer) (io.Writer, error) { +func (discardObject) ObjectPutInit(context.Context, object.Object, user.Signer, client.PrmObjectPutInit) (client.ObjectWriter, error) { return discardPayload{}, nil } +func (o discardObject) NetworkInfo(_ context.Context, _ client.PrmNetworkInfo) (netmap.NetworkInfo, error) { + return networkInfoFromOpts(o.opts) +} + type discardPayload struct{} func (discardPayload) Write(p []byte) (n int, err error) { return len(p), nil } +func (discardPayload) Close() error { + return nil +} + +func (discardPayload) GetResult() client.ResObjectPut { + return client.ResObjectPut{} +} + type input struct { signer user.Signer container cid.ID owner user.ID + objectType object.Type currentEpoch uint64 payloadLimit uint64 sessionToken *session.Object payload []byte - attributes []string + attributes []object.Attribute withHomo bool } @@ -155,11 +209,14 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) } attrNum := rand.Int() % 5 - attrs := make([]string, 2*attrNum) + attrs := 
make([]object.Attribute, 0, attrNum) + + for i := 0; i < attrNum; i++ { + var attr object.Attribute + attr.SetKey(base64.StdEncoding.EncodeToString(randomData(32))) + attr.SetValue(base64.StdEncoding.EncodeToString(randomData(32))) - for i := 0; i < len(attrs); i += 2 { - attrs[i] = base64.StdEncoding.EncodeToString(randomData(32)) - attrs[i+1] = base64.StdEncoding.EncodeToString(randomData(32)) + attrs = append(attrs, attr) } var in input @@ -174,15 +231,16 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) in.payload = randomData(size) in.attributes = attrs + var opts slicer.Options if rand.Int()%2 == 0 { in.sessionToken = sessiontest.ObjectSigned(test.RandomSignerRFC6979(tb)) + opts.SetSession(in.sessionToken) } else { in.owner = *usertest.ID(tb) } in.withHomo = rand.Int()%2 == 0 - var opts slicer.Options opts.SetObjectPayloadLimit(in.payloadLimit) opts.SetCurrentNeoFSEpoch(in.currentEpoch) if in.withHomo { @@ -192,57 +250,125 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) return in, opts } -func testSlicer(tb testing.TB, size, sizeLimit uint64) { - in, opts := randomInput(tb, size, sizeLimit) +func testSlicer(t *testing.T, size, sizeLimit uint64) { + in, opts := randomInput(t, size, sizeLimit) checker := &slicedObjectChecker{ - tb: tb, + opts: opts, + tb: t, input: in, - chainCollector: newChainCollector(tb), + chainCollector: newChainCollector(t), } - var s *slicer.Slicer - if checker.input.sessionToken != nil { - s = slicer.NewSession(in.signer, checker.input.container, *checker.input.sessionToken, checker, opts) - } else { - s = slicer.New(in.signer, checker.input.container, checker.input.owner, checker, opts) + for i := object.TypeRegular; i <= object.TypeLock; i++ { + in.objectType = i + + t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) { + testSlicerByHeaderType(t, checker, in, opts) + }) } +} - // check reader - rootID, err := s.Slice(bytes.NewReader(in.payload), 
in.attributes...) - require.NoError(tb, err) - checker.chainCollector.verify(checker.input, rootID) +func testSlicerByHeaderType(t *testing.T, checker *slicedObjectChecker, in input, opts slicer.Options) { + ctx := context.Background() - // check writer with random written chunk's size - checker.chainCollector = newChainCollector(tb) + t.Run("Slicer.Put", func(t *testing.T) { + checker.chainCollector = newChainCollector(t) + s, err := slicer.New(ctx, checker, checker.input.signer, checker.input.container, checker.input.owner, checker.input.sessionToken) + require.NoError(t, err) - w, err := s.InitPayloadStream(in.attributes...) - require.NoError(tb, err) + rootID, err := s.Put(ctx, bytes.NewReader(in.payload), in.attributes) + require.NoError(t, err) + checker.chainCollector.verify(checker.input, rootID) + }) + + t.Run("slicer.Put", func(t *testing.T) { + checker.chainCollector = newChainCollector(t) + + var hdr object.Object + hdr.SetSessionToken(opts.Session()) + hdr.SetContainerID(in.container) + hdr.SetOwnerID(&in.owner) + hdr.SetAttributes(in.attributes...) 
+ + rootID, err := slicer.Put(ctx, checker, hdr, checker.input.signer, bytes.NewReader(in.payload), opts) + require.NoError(t, err) + checker.chainCollector.verify(checker.input, rootID) + }) - var chunkSize int - if len(in.payload) > 0 { - chunkSize = rand.Int() % len(in.payload) - if chunkSize == 0 { - chunkSize = 1 + t.Run("Slicer.InitPut", func(t *testing.T) { + checker.chainCollector = newChainCollector(t) + + // check writer with random written chunk's size + s, err := slicer.New(ctx, checker, checker.input.signer, checker.input.container, checker.input.owner, checker.input.sessionToken) + require.NoError(t, err) + + w, err := s.InitPut(ctx, in.attributes) + require.NoError(t, err) + + var chunkSize int + if len(in.payload) > 0 { + chunkSize = rand.Int() % len(in.payload) + if chunkSize == 0 { + chunkSize = 1 + } } - } - for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] { - if chunkSize > len(payload) { - chunkSize = len(payload) + for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] { + if chunkSize > len(payload) { + chunkSize = len(payload) + } + n, err := w.Write(payload[:chunkSize]) + require.NoError(t, err) + require.EqualValues(t, chunkSize, n) } - n, err := w.Write(payload[:chunkSize]) - require.NoError(tb, err) - require.EqualValues(tb, chunkSize, n) - } - err = w.Close() - require.NoError(tb, err) + err = w.Close() + require.NoError(t, err) + + checker.chainCollector.verify(checker.input, w.ID()) + }) - checker.chainCollector.verify(checker.input, w.ID()) + t.Run("slicer.InitPut", func(t *testing.T) { + checker.chainCollector = newChainCollector(t) + + var hdr object.Object + hdr.SetSessionToken(opts.Session()) + hdr.SetContainerID(in.container) + hdr.SetOwnerID(&in.owner) + hdr.SetAttributes(in.attributes...) 
+ + // check writer with random written chunk's size + w, err := slicer.InitPut(ctx, checker, hdr, checker.input.signer, opts) + require.NoError(t, err) + + var chunkSize int + if len(in.payload) > 0 { + chunkSize = rand.Int() % len(in.payload) + if chunkSize == 0 { + chunkSize = 1 + } + } + + for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] { + if chunkSize > len(payload) { + chunkSize = len(payload) + } + n, err := w.Write(payload[:chunkSize]) + require.NoError(t, err) + require.EqualValues(t, chunkSize, n) + } + + err = w.Close() + require.NoError(t, err) + + checker.chainCollector.verify(checker.input, w.ID()) + }) } type slicedObjectChecker struct { + opts slicer.Options + tb testing.TB input input @@ -250,7 +376,11 @@ type slicedObjectChecker struct { chainCollector *chainCollector } -func (x *slicedObjectChecker) InitDataStream(hdr object.Object, _ user.Signer) (io.Writer, error) { +func (x *slicedObjectChecker) NetworkInfo(_ context.Context, _ client.PrmNetworkInfo) (netmap.NetworkInfo, error) { + return networkInfoFromOpts(x.opts) +} + +func (x *slicedObjectChecker) ObjectPutInit(_ context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { checkStaticMetadata(x.tb, hdr, x.input) buf := bytes.NewBuffer(nil) @@ -269,7 +399,7 @@ type writeSizeChecker struct { payloadSeen bool } -func newSizeChecker(tb testing.TB, hdr object.Object, base io.Writer, sizeLimit uint64) io.Writer { +func newSizeChecker(tb testing.TB, hdr object.Object, base io.Writer, sizeLimit uint64) *writeSizeChecker { return &writeSizeChecker{ tb: tb, hdr: hdr, @@ -304,6 +434,10 @@ func (x *writeSizeChecker) Close() error { return nil } +func (x *writeSizeChecker) GetResult() client.ResObjectPut { + return client.ResObjectPut{} +} + type payloadWithChecksum struct { r io.Reader cs []checksum.Checksum @@ -455,6 +589,8 @@ func (x *chainCollector) verify(in input, rootID oid.ID) { restoredChain := []oid.ID{x.first} 
restoredPayload := bytes.NewBuffer(make([]byte, 0, rootObj.PayloadSize())) + require.Equal(x.tb, in.objectType, rootObj.Type()) + for { v, ok := x.mPayloads[restoredChain[len(restoredChain)-1]] require.True(x.tb, ok) @@ -494,10 +630,10 @@ func (x *chainCollector) verify(in input, rootID oid.ID) { checkStaticMetadata(x.tb, rootObj, in) attrs := rootObj.Attributes() - require.Len(x.tb, attrs, len(in.attributes)/2) + require.Len(x.tb, attrs, len(in.attributes)) for i := range attrs { - require.Equal(x.tb, in.attributes[2*i], attrs[i].Key()) - require.Equal(x.tb, in.attributes[2*i+1], attrs[i].Value()) + require.Equal(x.tb, in.attributes[i].Key(), attrs[i].Key()) + require.Equal(x.tb, in.attributes[i].Value(), attrs[i].Value()) } require.Equal(x.tb, in.payload, rootObj.Payload()) @@ -505,11 +641,12 @@ func (x *chainCollector) verify(in input, rootID oid.ID) { } type memoryWriter struct { + opts slicer.Options headers []object.Object splitID *object.SplitID } -func (w *memoryWriter) InitDataStream(hdr object.Object, _ user.Signer) (io.Writer, error) { +func (w *memoryWriter) ObjectPutInit(_ context.Context, hdr object.Object, _ user.Signer, _ client.PrmObjectPutInit) (client.ObjectWriter, error) { w.headers = append(w.headers, hdr) if w.splitID == nil && hdr.SplitID() != nil { w.splitID = hdr.SplitID() @@ -518,6 +655,10 @@ func (w *memoryWriter) InitDataStream(hdr object.Object, _ user.Signer) (io.Writ return &memoryPayload{}, nil } +func (w *memoryWriter) NetworkInfo(_ context.Context, _ client.PrmNetworkInfo) (netmap.NetworkInfo, error) { + return networkInfoFromOpts(w.opts) +} + type memoryPayload struct { } @@ -525,9 +666,18 @@ func (p *memoryPayload) Write(data []byte) (int, error) { return len(data), nil } +func (p *memoryPayload) Close() error { + return nil +} + +func (p *memoryPayload) GetResult() client.ResObjectPut { + return client.ResObjectPut{} +} + func TestSlicedObjectsHaveSplitID(t *testing.T) { maxObjectSize := uint64(10) overheadAmount := uint64(3) + 
ctx := context.Background() var containerID cid.ID id := make([]byte, sha256.Size) @@ -549,31 +699,42 @@ func TestSlicedObjectsHaveSplitID(t *testing.T) { } t.Run("slice", func(t *testing.T) { - writer := &memoryWriter{} - sl := slicer.New(signer, containerID, ownerID, writer, opts) + writer := &memoryWriter{ + opts: opts, + } + sl, err := slicer.New(context.Background(), writer, signer, containerID, ownerID, nil) + require.NoError(t, err) payload := make([]byte, maxObjectSize*overheadAmount) _, err = rand.Read(payload) require.NoError(t, err) - _, err = sl.Slice(bytes.NewBuffer(payload)) + _, err = sl.Put(ctx, bytes.NewBuffer(payload), nil) require.NoError(t, err) require.Equal(t, overheadAmount+1, uint64(len(writer.headers))) - for _, h := range writer.headers { + for i, h := range writer.headers { splitID := h.SplitID() - require.NotNil(t, splitID) - require.Equal(t, writer.splitID.ToV2(), splitID.ToV2()) + if i == 0 { + require.Nil(t, splitID) + } else { + require.NotNil(t, splitID) + require.Equal(t, writer.splitID.ToV2(), splitID.ToV2()) + } + checkParentWithoutSplitInfo(h) } }) t.Run("InitPayloadStream", func(t *testing.T) { - writer := &memoryWriter{} - sl := slicer.New(signer, containerID, ownerID, writer, opts) + writer := &memoryWriter{ + opts: opts, + } + sl, err := slicer.New(context.Background(), writer, signer, containerID, ownerID, nil) + require.NoError(t, err) - payloadWriter, err := sl.InitPayloadStream() + payloadWriter, err := sl.InitPut(ctx, nil) require.NoError(t, err) for i := uint64(0); i < overheadAmount; i++ { @@ -588,23 +749,31 @@ func TestSlicedObjectsHaveSplitID(t *testing.T) { require.NoError(t, payloadWriter.Close()) require.Equal(t, overheadAmount+1, uint64(len(writer.headers))) - for _, h := range writer.headers { + for i, h := range writer.headers { splitID := h.SplitID() - require.NotNil(t, splitID) - require.Equal(t, writer.splitID.ToV2(), splitID.ToV2()) + if i == 0 { + require.Nil(t, splitID) + } else { + require.NotNil(t, 
splitID) + require.Equal(t, writer.splitID.ToV2(), splitID.ToV2()) + } + checkParentWithoutSplitInfo(h) } }) t.Run("no split info if no overflow", func(t *testing.T) { - writer := &memoryWriter{} - sl := slicer.New(signer, containerID, ownerID, writer, opts) + writer := &memoryWriter{ + opts: opts, + } + sl, err := slicer.New(context.Background(), writer, signer, containerID, ownerID, nil) + require.NoError(t, err) payload := make([]byte, maxObjectSize-1) _, err = rand.Read(payload) require.NoError(t, err) - _, err = sl.Slice(bytes.NewBuffer(payload)) + _, err = sl.Put(ctx, bytes.NewBuffer(payload), nil) require.NoError(t, err) require.Equal(t, uint64(1), uint64(len(writer.headers))) diff --git a/pool/object.go b/pool/object.go index 15cbdc5a..bfa5f5eb 100644 --- a/pool/object.go +++ b/pool/object.go @@ -30,7 +30,7 @@ func (p *Pool) actualSigner(signer user.Signer) user.Signer { // Operation is executed within a session automatically created by [Pool] unless parameters explicitly override session settings. // // See details in [client.Client.ObjectPutInit]. -func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) { +func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) { c, err := p.sdkClient() if err != nil { return nil, err diff --git a/pool/pool.go b/pool/pool.go index 8e0554d4..e707d62b 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -550,44 +550,57 @@ func (c *clientWrapper) objectPut(ctx context.Context, signer user.Signer, prm P } } - if prm.payload != nil { - const defaultBufferSizePut = 3 << 20 // configure? 
+ if err = writePayload(wObj, prm.payload, sz); err != nil { + c.updateErrorRate(err) - if sz == 0 || sz > defaultBufferSizePut { - sz = defaultBufferSizePut - } + return oid.ID{}, fmt.Errorf("writePayload: %w", err) + } + + err = wObj.Close() + c.updateErrorRate(err) + if err != nil { // here err already carries both status and client errors + return oid.ID{}, fmt.Errorf("client failure: %w", err) + } - buf := make([]byte, sz) + return wObj.GetResult().StoredObjectID(), nil +} - var n int +func writePayload(wObj io.Writer, payload io.Reader, sz uint64) error { + if payload == nil || wObj == nil { + return nil + } - for { - n, err = prm.payload.Read(buf) - if n > 0 { - successWrite := wObj.WritePayloadChunk(buf[:n]) - if !successWrite { - break - } + const defaultBufferSizePut = 3 << 20 // configure? + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } - continue - } + buf := make([]byte, sz) - if errors.Is(err, io.EOF) { - break + var ( + n int + err error + ) + + for { + n, err = payload.Read(buf) + + if err != nil { + if !errors.Is(err, io.EOF) { + return fmt.Errorf("read payload: %w", err) } + } - c.updateErrorRate(err) - return oid.ID{}, fmt.Errorf("read payload: %w", err) + if n == 0 { + break } - } - res, err := wObj.Close() - c.updateErrorRate(err) - if err != nil { // here err already carries both status and client errors - return oid.ID{}, fmt.Errorf("client failure: %w", err) + if _, err = wObj.Write(buf[:n]); err != nil { + return fmt.Errorf("write payload: %w", err) + } } - return res.StoredObjectID(), nil + return nil } // objectDelete invokes sdkClient.ObjectDelete parse response status to error. 
diff --git a/pool/pool_aio_test.go b/pool/pool_aio_test.go index da741d2a..e1389371 100644 --- a/pool/pool_aio_test.go +++ b/pool/pool_aio_test.go @@ -48,7 +48,7 @@ type ( } objectPutIniter interface { - ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) + ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error) } objectDeleter interface { @@ -313,12 +313,13 @@ func TestPoolWaiterWithAIO(t *testing.T) { w, err := pool.ObjectPutInit(ctxTimeout, hdr, signer, prm) require.NoError(t, err) - require.True(t, w.WritePayloadChunk(payload)) + _, err = w.Write(payload) + require.NoError(t, err) - resp, err := w.Close() + err = w.Close() require.NoError(t, err) - objectID = resp.StoredObjectID() + objectID = w.GetResult().StoredObjectID() }) t.Run("download object", func(t *testing.T) { @@ -468,12 +469,13 @@ func TestClientWaiterWithAIO(t *testing.T) { w, err := cl.ObjectPutInit(ctxTimeout, hdr, signer, prm) require.NoError(t, err) - require.True(t, w.WritePayloadChunk(payload)) + _, err = w.Write(payload) + require.NoError(t, err) - resp, err := w.Close() + err = w.Close() require.NoError(t, err) - objectID = resp.StoredObjectID() + objectID = w.GetResult().StoredObjectID() }) t.Run("download object", func(t *testing.T) { @@ -539,12 +541,13 @@ func testObjectPutInit(t *testing.T, ctx context.Context, account user.ID, conta w, err := putter.ObjectPutInit(ctx, hdr, signer, prm) require.NoError(t, err) - require.True(t, w.WritePayloadChunk(payload)) + _, err = w.Write(payload) + require.NoError(t, err) - resp, err := w.Close() + err = w.Close() require.NoError(t, err) - return resp.StoredObjectID() + return w.GetResult().StoredObjectID() } func testCreateContainer(t *testing.T, ctx context.Context, signer neofscrypto.Signer, cont container.Container, creator containerCreator) cid.ID { diff --git a/pool/pool_test.go 
b/pool/pool_test.go index 9d165263..0b0d7f33 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -1,8 +1,10 @@ package pool import ( + "bytes" "context" "errors" + "io" "strconv" "testing" "time" @@ -671,3 +673,124 @@ func TestSwitchAfterErrorThreshold(t *testing.T) { _, err = conn.objectGet(ctx, cid.ID{}, oid.ID{}, signer, PrmObjectGet{}) require.NoError(t, err) } + +type simpleWriter struct { + results []ioResult + data []byte +} + +func (s *simpleWriter) Write(p []byte) (n int, err error) { + if len(s.results) == 0 { + return 0, errors.New("unknown testcase") + } + + s.data = append(s.data, p...) + + d := s.results[0] + s.results = s.results[1:] + + return d.n, d.err +} + +type ioResult struct { + data []byte + n int + err error +} + +type simpleReader struct { + results []ioResult +} + +func (s *simpleReader) Read(p []byte) (n int, err error) { + if len(s.results) == 0 { + return 0, io.EOF + } + + d := s.results[0] + copy(p, d.data) + + s.results = s.results[1:] + return d.n, d.err +} + +func TestWritePayload(t *testing.T) { + t.Run("n > 0, io.EOF", func(t *testing.T) { + writer := simpleWriter{ + results: []ioResult{ + {n: 1, err: nil}, + {n: 1, err: nil}, + }, + } + + reader := simpleReader{ + results: []ioResult{ + {data: []byte{0}, n: 1, err: nil}, + {data: []byte{1}, n: 1, err: io.EOF}, + }, + } + + require.NoError(t, writePayload(&writer, &reader, 1)) + require.True(t, bytes.Equal([]byte{0, 1}, writer.data)) + }) + + t.Run("n == 0, io.EOF", func(t *testing.T) { + writer := simpleWriter{ + results: []ioResult{ + {n: 1, err: nil}, + {n: 1, err: nil}, + }, + } + + reader := simpleReader{ + results: []ioResult{ + {data: []byte{0}, n: 1, err: nil}, + }, + } + + require.NoError(t, writePayload(&writer, &reader, 1)) + require.True(t, bytes.Equal([]byte{0}, writer.data)) + }) + + t.Run("write err", func(t *testing.T) { + writer := simpleWriter{ + results: []ioResult{ + {n: 1, err: nil}, + {n: 0, err: errors.New("some error")}, + }, + } + + reader := 
simpleReader{ + results: []ioResult{ + {data: []byte{0}, n: 1, err: nil}, + {data: []byte{1}, n: 1, err: nil}, + }, + } + + require.Error(t, writePayload(&writer, &reader, 1)) + }) + + t.Run("read err", func(t *testing.T) { + writer := simpleWriter{ + results: []ioResult{ + {n: 1, err: nil}, + }, + } + + reader := simpleReader{ + results: []ioResult{ + {n: 0, err: errors.New("some err")}, + }, + } + + require.Error(t, writePayload(&writer, &reader, 1)) + }) + + t.Run("empty writer or reader", func(t *testing.T) { + writer := simpleWriter{} + require.NoError(t, writePayload(&writer, nil, 0)) + + reader := simpleReader{} + require.NoError(t, writePayload(nil, &reader, 0)) + }) +}