Skip to content

Commit

Permalink
slicer: Added split info to the first child
Browse files Browse the repository at this point in the history
closes #448

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jul 7, 2023
1 parent 2e8790e commit 463272a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
4 changes: 3 additions & 1 deletion object/slicer/slicer.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,16 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {

if !x.withSplit {
x.splitID = object.NewSplitID()
// note: don't move next row, the value of this flag will be used inside writeIntermediateChild
// to fill splitInfo in all child objects.
x.withSplit = true

err = x.writeIntermediateChild(x.rootMeta)
if err != nil {
return n, fmt.Errorf("write 1st child: %w", err)
}

x.currentWriter.reset(io.MultiWriter(&x.buf, &x.rootMeta, &x.childMeta))
x.withSplit = true
} else {
err = x.writeIntermediateChild(x.childMeta)
if err != nil {
Expand Down
105 changes: 105 additions & 0 deletions object/slicer/slicer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,108 @@ func (x *chainCollector) verify(in input, rootID oid.ID) {
require.Equal(x.tb, in.payload, rootObj.Payload())
require.NoError(x.tb, rootObj.VerifyPayloadChecksum(), "payload checksum must be correctly set")
}

type memoryWriter struct {
headers []object.Object
splitID *object.SplitID
}

func (w *memoryWriter) InitDataStream(hdr object.Object) (io.Writer, error) {
w.headers = append(w.headers, hdr)
if w.splitID == nil && hdr.SplitID() != nil {
w.splitID = hdr.SplitID()
}

return &memoryPayload{}, nil
}

type memoryPayload struct {
}

func (p *memoryPayload) Write(data []byte) (int, error) {
return len(data), nil
}

func TestSlicedObjectsHaveSplitID(t *testing.T) {
maxObjectSize := uint64(10)
overheadAmount := uint64(3)

var containerID cid.ID
id := make([]byte, sha256.Size)
_, err := rand.Read(id)
require.NoError(t, err)
containerID.Encode(id)

var ownerID user.ID
signer := test.RandomSignerRFC6979(t)
require.NoError(t, user.IDFromSigner(&ownerID, signer))

opts := slicer.Options{}
opts.SetObjectPayloadLimit(maxObjectSize)
opts.SetCurrentNeoFSEpoch(10)

t.Run("slice", func(t *testing.T) {
writer := &memoryWriter{}
sl := slicer.New(signer, containerID, ownerID, writer, opts)

payload := make([]byte, maxObjectSize*overheadAmount)
_, err = rand.Read(payload)
require.NoError(t, err)

_, err = sl.Slice(bytes.NewBuffer(payload))
require.NoError(t, err)

require.Equal(t, overheadAmount+1, uint64(len(writer.headers)))

for _, h := range writer.headers {
splitID := h.SplitID()
require.NotNil(t, splitID)
require.Equal(t, writer.splitID.ToV2(), splitID.ToV2())
}
})

t.Run("InitPayloadStream", func(t *testing.T) {
writer := &memoryWriter{}
sl := slicer.New(signer, containerID, ownerID, writer, opts)

payloadWriter, err := sl.InitPayloadStream()
require.NoError(t, err)

for i := uint64(0); i < overheadAmount; i++ {
payload := make([]byte, maxObjectSize)
_, err = rand.Read(payload)
require.NoError(t, err)

_, err := payloadWriter.Write(payload)
require.NoError(t, err)
}

require.NoError(t, payloadWriter.Close())
require.Equal(t, overheadAmount+1, uint64(len(writer.headers)))

for _, h := range writer.headers {
splitID := h.SplitID()
require.NotNil(t, splitID)
require.Equal(t, writer.splitID.ToV2(), splitID.ToV2())
}
})

t.Run("no split info if no overflow", func(t *testing.T) {
writer := &memoryWriter{}
sl := slicer.New(signer, containerID, ownerID, writer, opts)

payload := make([]byte, maxObjectSize-1)
_, err = rand.Read(payload)
require.NoError(t, err)

_, err = sl.Slice(bytes.NewBuffer(payload))
require.NoError(t, err)

require.Equal(t, uint64(1), uint64(len(writer.headers)))

for _, h := range writer.headers {
splitID := h.SplitID()
require.Nil(t, splitID)
}
})
}

0 comments on commit 463272a

Please sign in to comment.