Merge pull request #980 from SiaFoundation/pj/fix-packed-slab-uploads
Only upload a single packed slab synchronously when the buffer limit is reached
ChrisSchinnerl authored Feb 28, 2024
2 parents e031cf4 + c956182 commit 1ef39bc
Showing 7 changed files with 215 additions and 109 deletions.
7 changes: 6 additions & 1 deletion api/setting.go
@@ -126,7 +126,12 @@ func (rs RedundancySettings) Redundancy() float64 {
return float64(rs.TotalShards) / float64(rs.MinShards)
}

// SlabSizeNoRedundancy returns the size of a slab without added redundancy.
// SlabSize returns the size of a slab.
func (rs RedundancySettings) SlabSize() uint64 {
return uint64(rs.TotalShards) * rhpv2.SectorSize
}

// SlabSizeNoRedundancy returns the size of a slab without redundancy.
func (rs RedundancySettings) SlabSizeNoRedundancy() uint64 {
return uint64(rs.MinShards) * rhpv2.SectorSize
}
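
For readers skimming the diff: the new SlabSize helper complements the existing SlabSizeNoRedundancy. A minimal sketch (not part of this PR) of what the two return, assuming a hypothetical 10-of-30 redundancy setting and the 4 MiB rhpv2 sector size:

```go
package main

import (
	"fmt"

	"go.sia.tech/renterd/api"
)

func main() {
	// hypothetical 10-of-30 redundancy settings, for illustration only
	rs := api.RedundancySettings{MinShards: 10, TotalShards: 30}

	// SlabSize is the slab's size including redundancy: TotalShards * SectorSize,
	// i.e. 30 * 4 MiB = 120 MiB with these settings.
	fmt.Println("with redundancy:", rs.SlabSize())

	// SlabSizeNoRedundancy is the user data stored per slab: MinShards * SectorSize,
	// i.e. 10 * 4 MiB = 40 MiB with these settings.
	fmt.Println("without redundancy:", rs.SlabSizeNoRedundancy())
}
```
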
3 changes: 1 addition & 2 deletions internal/test/e2e/pruning_test.go
@@ -9,7 +9,6 @@ import (
"testing"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
@@ -208,7 +207,7 @@ func TestSectorPruning(t *testing.T) {
tt.Retry(100, 100*time.Millisecond, func() error {
res, err = b.PrunableData(context.Background())
tt.OK(err)
if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*uint64(rs.TotalShards)*rhpv2.SectorSize {
if res.TotalPrunable != uint64(math.Ceil(float64(numObjects)/2))*rs.SlabSize() {
return fmt.Errorf("unexpected prunable data %v", n)
}
return nil
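
As a quick sanity check of the assertion above: after pruning, the test expects the prunable data to equal ceil(numObjects/2) full slabs. A small worked example with illustrative values (the test's actual constants are not visible in this diff):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// illustrative values; the test's actual constants are not shown in this diff
	numObjects := 10
	totalShards := uint64(30)
	sectorSize := uint64(1 << 22) // rhpv2.SectorSize, 4 MiB

	slabSize := totalShards * sectorSize // what rs.SlabSize() returns
	expected := uint64(math.Ceil(float64(numObjects)/2)) * slabSize

	// deleting half the objects (rounded up) should make that many full slabs prunable
	fmt.Println("expected prunable bytes:", expected) // 5 * 120 MiB = 600 MiB
}
```
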
45 changes: 36 additions & 9 deletions worker/mocks_test.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync"
"time"
@@ -346,24 +347,27 @@ var _ ObjectStore = (*objectStoreMock)(nil)

type (
objectStoreMock struct {
mu sync.Mutex
objects map[string]map[string]object.Object
partials map[string]packedSlabMock
bufferIDCntr uint // allows marking packed slabs as uploaded
mu sync.Mutex
objects map[string]map[string]object.Object
partials map[string]*packedSlabMock
slabBufferMaxSizeSoft int
bufferIDCntr uint // allows marking packed slabs as uploaded
}

packedSlabMock struct {
parameterKey string // ([minshards]-[totalshards]-[contractset])
bufferID uint
slabKey object.EncryptionKey
data []byte
lockedUntil time.Time
}
)

func newObjectStoreMock(bucket string) *objectStoreMock {
os := &objectStoreMock{
objects: make(map[string]map[string]object.Object),
partials: make(map[string]packedSlabMock),
objects: make(map[string]map[string]object.Object),
partials: make(map[string]*packedSlabMock),
slabBufferMaxSizeSoft: math.MaxInt64,
}
os.objects[bucket] = make(map[string]object.Object)
return os
@@ -421,15 +425,15 @@ func (os *objectStoreMock) AddPartialSlab(ctx context.Context, data []byte, minS
}

// update store
os.partials[ec.String()] = packedSlabMock{
os.partials[ec.String()] = &packedSlabMock{
parameterKey: fmt.Sprintf("%d-%d-%v", minShards, totalShards, contractSet),
bufferID: os.bufferIDCntr,
slabKey: ec,
data: data,
}
os.bufferIDCntr++

return []object.SlabSlice{ss}, false, nil
return []object.SlabSlice{ss}, os.totalSlabBufferSize() > os.slabBufferMaxSizeSoft, nil
}

func (os *objectStoreMock) Object(ctx context.Context, bucket, path string, opts api.GetObjectOptions) (api.ObjectsResponse, error) {
@@ -511,14 +515,22 @@ func (os *objectStoreMock) PackedSlabsForUpload(ctx context.Context, lockingDura
os.mu.Lock()
defer os.mu.Unlock()

if limit == -1 {
limit = math.MaxInt
}

parameterKey := fmt.Sprintf("%d-%d-%v", minShards, totalShards, set)
for _, ps := range os.partials {
if ps.parameterKey == parameterKey {
if ps.parameterKey == parameterKey && time.Now().After(ps.lockedUntil) {
ps.lockedUntil = time.Now().Add(lockingDuration)
pss = append(pss, api.PackedSlab{
BufferID: ps.bufferID,
Data: ps.data,
Key: ps.slabKey,
})
if len(pss) == limit {
break
}
}
}
return
@@ -557,6 +569,21 @@ func (os *objectStoreMock) MultipartUpload(ctx context.Context, uploadID string)
return api.MultipartUpload{}, nil
}

func (os *objectStoreMock) totalSlabBufferSize() (total int) {
for _, p := range os.partials {
if time.Now().After(p.lockedUntil) {
total += len(p.data)
}
}
return
}

func (os *objectStoreMock) setSlabBufferMaxSizeSoft(n int) {
os.mu.Lock()
defer os.mu.Unlock()
os.slabBufferMaxSizeSoft = n
}

func (os *objectStoreMock) forEachObject(fn func(bucket, path string, o object.Object)) {
for bucket, objects := range os.objects {
for path, object := range objects {
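
The mock changes above add a configurable slabBufferMaxSizeSoft so tests can force AddPartialSlab to report that the buffer limit was reached, and PackedSlabsForUpload now honors both the locking duration and the limit. A minimal usage sketch (the test name and the chosen parameters are hypothetical, not part of this PR):

```go
package worker

// Hypothetical test sketch, not part of this PR. It shows how the new
// setSlabBufferMaxSizeSoft helper lets a test force AddPartialSlab to report
// that the buffer size limit has been reached.

import (
	"context"
	"testing"
)

func TestPackedSlabBufferLimitSketch(t *testing.T) {
	os := newObjectStoreMock("default") // bucket name is arbitrary

	// with a zero soft limit, any unlocked buffered data trips the limit
	os.setSlabBufferMaxSizeSoft(0)

	// parameters (1-of-2 shards, contract set "autopilot") are illustrative
	_, limitReached, err := os.AddPartialSlab(context.Background(), []byte("partial slab data"), 1, 2, "autopilot")
	if err != nil {
		t.Fatal(err)
	}
	if !limitReached {
		t.Fatal("expected buffer size limit to be reached")
	}
}
```
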
143 changes: 71 additions & 72 deletions worker/upload.go
@@ -177,119 +177,118 @@ func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.Contra
return "", err
}

// if packing was enabled try uploading packed slabs
if up.packing {
if err := w.tryUploadPackedSlabs(ctx, up.rs, up.contractSet, bufferSizeLimitReached); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
// return early if worker was shut down or if we don't have to consider
// packed uploads
if w.isStopped() || !up.packing {
return eTag, nil
}

// try and upload one slab synchronously
if bufferSizeLimitReached {
mem := w.uploadManager.mm.AcquireMemory(ctx, up.rs.SlabSize())
if mem != nil {
defer mem.Release()

// fetch packed slab to upload
packedSlabs, err := w.bus.PackedSlabsForUpload(ctx, defaultPackedSlabsLockDuration, uint8(up.rs.MinShards), uint8(up.rs.TotalShards), up.contractSet, 1)
if err != nil {
return "", fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
}

// upload packed slab
if len(packedSlabs) > 0 {
if err := w.tryUploadPackedSlab(ctx, mem, packedSlabs[0], up.rs, up.contractSet, lockingPriorityBlockedUpload); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
}
}
}

// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(up.rs, up.contractSet, lockingPriorityBackgroundUpload)
}

return eTag, nil
}

func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSet string, lockPriority int) {
key := fmt.Sprintf("%d-%d_%s", rs.MinShards, rs.TotalShards, contractSet)

w.uploadsMu.Lock()
if w.uploadingPackedSlabs[key] {
if _, ok := w.uploadingPackedSlabs[key]; ok {
w.uploadsMu.Unlock()
return
}
w.uploadingPackedSlabs[key] = true
w.uploadingPackedSlabs[key] = struct{}{}
w.uploadsMu.Unlock()

// make sure we mark uploading packed slabs as false when we're done
defer func() {
w.uploadsMu.Lock()
w.uploadingPackedSlabs[key] = false
delete(w.uploadingPackedSlabs, key)
w.uploadsMu.Unlock()
}()

// keep uploading packed slabs until we're done
for {
uploaded, err := w.uploadPackedSlabs(w.shutdownCtx, defaultPackedSlabsLockDuration, rs, contractSet, lockPriority)
if err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
return
} else if uploaded == 0 {
return
}
}
}

func (w *worker) tryUploadPackedSlabs(ctx context.Context, rs api.RedundancySettings, contractSet string, block bool) (err error) {
// if we want to block, try and upload one packed slab synchronously, we use
// a slightly higher upload priority to avoid reaching the context deadline
if block {
_, err = w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockingPriorityBlockedUpload)
}

// make sure there's a goroutine uploading the remainder of the packed slabs
go w.threadedUploadPackedSlabs(rs, contractSet, lockingPriorityBackgroundUpload)
return
}

func (w *worker) uploadPackedSlabs(ctx context.Context, lockingDuration time.Duration, rs api.RedundancySettings, contractSet string, lockPriority int) (uploaded int, err error) {
// upload packed slabs
var mu sync.Mutex
var errs error

var wg sync.WaitGroup
totalSize := uint64(rs.TotalShards) * rhpv2.SectorSize

// derive a context that we can use as an interrupt in case of an error.
interruptCtx, cancel := context.WithCancel(ctx)
defer cancel()
// derive a context that we can use as an interrupt in case of an error or shutdown.
interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx)
defer interruptCancel()

var wg sync.WaitGroup
for {
// block until we have memory for a slab or until we are interrupted
mem := w.uploadManager.mm.AcquireMemory(interruptCtx, totalSize)
// block until we have memory
mem := w.uploadManager.mm.AcquireMemory(interruptCtx, rs.SlabSize())
if mem == nil {
break // interrupted
}

// fetch packed slabs to upload
var packedSlabs []api.PackedSlab
packedSlabs, err = w.bus.PackedSlabsForUpload(ctx, lockingDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
// fetch packed slab to upload
packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1)
if err != nil {
err = fmt.Errorf("couldn't fetch packed slabs from bus: %v", err)
mu.Lock()
errs = errors.Join(errs, fmt.Errorf("couldn't fetch packed slabs from bus: %v", err))
mu.Unlock()
}

// no more packed slabs to upload
if len(packedSlabs) == 0 {
mem.Release()
break
} else if len(packedSlabs) == 0 {
mem.Release()
break // no more slabs
}
ps := packedSlabs[0]

// launch upload for slab
wg.Add(1)
go func(ps api.PackedSlab) {
defer mem.Release()
defer wg.Done()
err := w.uploadPackedSlab(ctx, rs, ps, mem, contractSet, lockPriority)
mu.Lock()
if err != nil {
defer mem.Release()

// we use the background context here, but apply a sane timeout,
// this ensures ongoing uploads are handled gracefully during
// shutdown
ctx, cancel := context.WithTimeout(context.Background(), defaultPackedSlabsUploadTimeout)
defer cancel()

// try to upload the packed slab
if err := w.tryUploadPackedSlab(ctx, mem, ps, rs, contractSet, lockPriority); err != nil {
mu.Lock()
errs = errors.Join(errs, err)
cancel() // prevent new uploads from being launched
} else {
uploaded++
mu.Unlock()
interruptCancel() // prevent new uploads from being launched
}
mu.Unlock()
}(ps)
}(packedSlabs[0])
}

// wait for all threads to finish
wg.Wait()

// return collected errors
err = errors.Join(err, errs)
// log errors
if err := errors.Join(errs); err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
}
return
}

func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings, ps api.PackedSlab, mem Memory, contractSet string, lockPriority int) error {
// create a context with sane timeout
ctx, cancel := context.WithTimeout(ctx, defaultPackedSlabsUploadTimeout)
defer cancel()

func (w *worker) tryUploadPackedSlab(ctx context.Context, mem Memory, ps api.PackedSlab, rs api.RedundancySettings, contractSet string, lockPriority int) error {
// fetch contracts
contracts, err := w.bus.Contracts(ctx, api.ContractsOpts{ContractSet: contractSet})
if err != nil {
@@ -434,9 +433,9 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
// channel to notify main thread of the number of slabs to wait for
numSlabsChan := make(chan int, 1)

// prepare slab size
size := int64(up.rs.MinShards) * rhpv2.SectorSize
redundantSize := uint64(up.rs.TotalShards) * rhpv2.SectorSize
// prepare slab sizes
slabSizeNoRedundancy := up.rs.SlabSizeNoRedundancy()
slabSize := up.rs.SlabSize()
var partialSlab []byte

// launch uploads in a separate goroutine
Expand All @@ -451,14 +450,14 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
default:
}
// acquire memory
mem := mgr.mm.AcquireMemory(ctx, redundantSize)
mem := mgr.mm.AcquireMemory(ctx, slabSize)
if mem == nil {
return // interrupted
}

// read next slab's data
data := make([]byte, size)
length, err := io.ReadFull(io.LimitReader(cr, size), data)
data := make([]byte, slabSizeNoRedundancy)
length, err := io.ReadFull(io.LimitReader(cr, int64(slabSizeNoRedundancy)), data)
if err == io.EOF {
mem.Release()

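Pulling the worker/upload.go changes together: when AddPartialSlab reports that the slab buffer's soft size limit has been reached, the worker now uploads at most one packed slab synchronously (provided it can acquire the memory for it) and always hands the remainder to a single, deduplicated background goroutine per redundancy/contract-set key. The following is a condensed, self-contained illustration of that control flow under simplified assumptions; the names loosely follow the diff, but none of this is the shipped renterd code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct {
	mu        sync.Mutex
	uploading map[string]struct{} // keys with an active background uploader
	queue     []string            // stand-in for buffered packed slabs
}

func (w *worker) uploadOne(slab string) {
	time.Sleep(10 * time.Millisecond) // stand-in for the actual host uploads
	fmt.Println("uploaded", slab)
}

// finishUpload mirrors the tail end of worker.upload: one synchronous upload
// when the buffer limit was reached, then a background drain of the rest.
func (w *worker) finishUpload(key string, bufferLimitReached bool) {
	if bufferLimitReached {
		if slab, ok := w.pop(); ok {
			w.uploadOne(slab) // blocks the caller for exactly one slab
		}
	}
	go w.drain(key) // the remainder is always handled asynchronously
}

// drain mirrors threadedUploadPackedSlabs: only one drainer per key runs at a time.
func (w *worker) drain(key string) {
	w.mu.Lock()
	if _, ok := w.uploading[key]; ok {
		w.mu.Unlock()
		return // another goroutine is already draining this key
	}
	w.uploading[key] = struct{}{}
	w.mu.Unlock()
	defer func() {
		w.mu.Lock()
		delete(w.uploading, key)
		w.mu.Unlock()
	}()

	for {
		slab, ok := w.pop()
		if !ok {
			return // nothing left to upload
		}
		w.uploadOne(slab)
	}
}

func (w *worker) pop() (string, bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.queue) == 0 {
		return "", false
	}
	s := w.queue[0]
	w.queue = w.queue[1:]
	return s, true
}

func main() {
	w := &worker{
		uploading: make(map[string]struct{}),
		queue:     []string{"slab-1", "slab-2", "slab-3"},
	}
	w.finishUpload("10-30_autopilot", true) // buffer limit reached: slab-1 uploads synchronously
	time.Sleep(100 * time.Millisecond)      // give the background drain time to finish slab-2 and slab-3
}
```

The key behavioral difference from the previous tryUploadPackedSlabs path is that the synchronous portion is capped at one slab, so a caller that trips the buffer limit is not blocked behind an arbitrarily long queue of packed slabs.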
