Skip to content

Commit

Permalink
VDDK: Fix NBD status coalescing for large blocks. (kubevirt#3242)
Browse files Browse the repository at this point in the history
* Mock NBD functions in VDDK unit tests.

Also add some example tests for GetBlockStatus.

Signed-off-by: Matthew Arnold <[email protected]>

* Avoid infinite loop by returning whole block.

Also add a unit test to trigger this code. Without the fix, this spins
"No new block status data" messages forever, as reported in the bug.
With the fix, this continues the data transfer without zero-range or
hole-punch optimizations.

Signed-off-by: Matthew Arnold <[email protected]>

* Correctly extend merged block length.

Bring in the previously proposed fix to make sure merged blocks have
correct lengths, avoiding the initial issue.

Signed-off-by: Matthew Arnold <[email protected]>

* Add two more GetBlockStatus unit tests.

Hopefully this makes it more obvious what GetBlockStatus actually does.

Signed-off-by: Matthew Arnold <[email protected]>

* Add unit tests for larger block sizes.

The first test intentionally overflows the result block's length field
and causes an infinite loop in GetBlockStatus, as pointed out in the
problem report. This will be fixed in the next commit.

Signed-off-by: Matthew Arnold <[email protected]>

* Avoid integer overflow and sort out casts.

Increase the size of BlockStatusData.Length to an int64, to avoid
overflow in GetBlockStatus. Also change BlockStatusData.Offset to an
int64 and remove a handful of unnerving integer conversions. Favor int64
over uint64 to match system libraries, and add a few necessary
conversions mostly isolated to libnbd interfaces.

Signed-off-by: Matthew Arnold <[email protected]>

* Fix alignment for multi-stage VDDK documentation.

Signed-off-by: Matthew Arnold <[email protected]>

---------

Signed-off-by: Matthew Arnold <[email protected]>
  • Loading branch information
mrnold committed Nov 5, 2024
1 parent 043fe5d commit 38f1159
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 60 deletions.
12 changes: 6 additions & 6 deletions doc/datavolumes.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ spec:
uuid: "52260566-b032-36cb-55b1-79bf29e30490"
thumbprint: "20:6C:8A:5D:44:40:B3:79:4B:28:EA:76:13:60:90:6E:49:D9:D9:A3" # SSL fingerprint of vCenter/ESX host
secretRef: "vddk-credentials"
finalCheckpoint: true
checkpoints:
- current: "snapshot-1"
previous: ""
- current: "snapshot-2"
previous: "snapshot-1"
finalCheckpoint: true
checkpoints:
- current: "snapshot-1"
previous: ""
- current: "snapshot-2"
previous: "snapshot-1"
pvc:
accessModes:
- ReadWriteOnce
Expand Down
93 changes: 52 additions & 41 deletions pkg/importer/vddk-datasource_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"net/url"
"os"
"regexp"
Expand Down Expand Up @@ -509,7 +510,7 @@ const MaxPreadLengthVC = (2 << 20)
// MaxPreadLength is the maxmimum read size to request from VMware. Default to
// the larger option, and reduce it in createVddkDataSource when connecting to
// vCenter endpoints.
var MaxPreadLength uint32 = MaxPreadLengthESX
var MaxPreadLength int = MaxPreadLengthESX

// NbdOperations provides a mockable interface for the things needed from libnbd.
type NbdOperations interface {
Expand All @@ -521,8 +522,8 @@ type NbdOperations interface {

// BlockStatusData holds zero/hole status for one block of data
type BlockStatusData struct {
Offset uint64
Length uint32
Offset int64
Length int64
Flags uint32
}

Expand All @@ -538,7 +539,13 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
var blocks []*BlockStatusData

// Callback for libnbd.BlockStatus. Needs to modify blocks list above.
updateBlocksCallback := func(metacontext string, offset uint64, extents []uint32, err *int) int {
updateBlocksCallback := func(metacontext string, nbdOffset uint64, extents []uint32, err *int) int {
if nbdOffset > math.MaxInt64 {
klog.Errorf("Block status offset too big for conversion: 0x%x", nbdOffset)
return -2
}
offset := int64(nbdOffset)

if *err != 0 {
klog.Errorf("Block status callback error at offset %d: error code %d", offset, *err)
return *err
Expand All @@ -552,17 +559,17 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
return -1
}
for i := 0; i < len(extents); i += 2 {
length, flags := extents[i], extents[i+1]
length, flags := int64(extents[i]), extents[i+1]
if blocks != nil {
last := len(blocks) - 1
lastBlock := blocks[last]
lastFlags := lastBlock.Flags
lastOffset := lastBlock.Offset + uint64(lastBlock.Length)
lastOffset := lastBlock.Offset + lastBlock.Length
if lastFlags == flags && lastOffset == offset {
// Merge with previous block
blocks[last] = &BlockStatusData{
Offset: lastBlock.Offset,
Length: lastBlock.Length,
Length: lastBlock.Length + length,
Flags: lastFlags,
}
} else {
Expand All @@ -571,46 +578,50 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
} else {
blocks = append(blocks, &BlockStatusData{Offset: offset, Length: length, Flags: flags})
}
offset += uint64(length)
offset += length
}
return 0
}

if extent.Length < 1024*1024 {
blocks = append(blocks, &BlockStatusData{
Offset: uint64(extent.Start),
Length: uint32(extent.Length),
Offset: extent.Start,
Length: extent.Length,
Flags: 0})
return blocks
}

lastOffset := extent.Start
endOffset := extent.Start + extent.Length
for lastOffset < endOffset {
var length uint64
var length int64
missingLength := endOffset - lastOffset
if missingLength > (MaxBlockStatusLength) {
length = (MaxBlockStatusLength)
length = MaxBlockStatusLength
} else {
length = uint64(missingLength)
length = missingLength
}
err := handle.BlockStatus(length, uint64(lastOffset), updateBlocksCallback, &fixedOptArgs)
if err != nil {
klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err)
createWholeBlock := func() []*BlockStatusData {
block := &BlockStatusData{
Offset: uint64(extent.Start),
Length: uint32(extent.Length),
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
}
blocks = []*BlockStatusData{block}
return blocks
}
err := handle.BlockStatus(uint64(length), uint64(lastOffset), updateBlocksCallback, &fixedOptArgs)
if err != nil {
klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err)
return createWholeBlock()
}
last := len(blocks) - 1
newOffset := blocks[last].Offset + uint64(blocks[last].Length)
if uint64(lastOffset) == newOffset {
klog.Infof("No new block status data at offset %d", newOffset)
newOffset := blocks[last].Offset + blocks[last].Length
if lastOffset == newOffset {
klog.Infof("No new block status data at offset %d, returning whole block.", newOffset)
return createWholeBlock()
}
lastOffset = int64(newOffset)
lastOffset = newOffset
}

return blocks
Expand All @@ -636,29 +647,29 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData,
return err
}

buffer := bytes.Repeat([]byte{0}, int(MaxPreadLength))
count := uint32(0)
buffer := bytes.Repeat([]byte{0}, MaxPreadLength)
count := int64(0)
for count < block.Length {
if block.Length-count < MaxPreadLength {
if block.Length-count < int64(MaxPreadLength) {
buffer = bytes.Repeat([]byte{0}, int(block.Length-count))
}
length := len(buffer)

offset := block.Offset + uint64(count)
err := handle.Pread(buffer, offset, nil)
offset := block.Offset + count
err := handle.Pread(buffer, uint64(offset), nil)
if err != nil {
klog.Errorf("Error reading from source at offset %d: %v", offset, err)
return err
}

written, err := sink.Pwrite(buffer, offset)
written, err := sink.Pwrite(buffer, uint64(offset))
if err != nil {
klog.Errorf("Failed to write data block at offset %d to local file: %v", block.Offset, err)
return err
}

updateProgress(written)
count += uint32(length)
count += int64(length)
}
return nil
}
Expand All @@ -669,7 +680,7 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData,
type VDDKDataSink interface {
Pwrite(buf []byte, offset uint64) (int, error)
Write(buf []byte) (int, error)
ZeroRange(offset uint64, length uint32) error
ZeroRange(offset int64, length int64) error
Close()
}

Expand Down Expand Up @@ -726,11 +737,11 @@ func (sink *VDDKFileSink) Write(buf []byte) (int, error) {
}

// ZeroRange fills the destination range with zero bytes
func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error {
punch := func(offset uint64, length uint32) error {
func (sink *VDDKFileSink) ZeroRange(offset int64, length int64) error {
punch := func(offset int64, length int64) error {
klog.Infof("Punching %d-byte hole at offset %d", length, offset)
flags := uint32(unix.FALLOC_FL_PUNCH_HOLE | unix.FALLOC_FL_KEEP_SIZE)
return syscall.Fallocate(int(sink.file.Fd()), flags, int64(offset), int64(length))
return syscall.Fallocate(int(sink.file.Fd()), flags, offset, length)
}

var err error
Expand All @@ -742,31 +753,31 @@ func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error {
if err != nil {
klog.Errorf("Unable to stat destination file: %v", err)
} else { // Filesystem
if offset+uint64(length) > uint64(info.Size()) { // Truncate only if extending the file
err = syscall.Ftruncate(int(sink.file.Fd()), int64(offset+uint64(length)))
if offset+length > info.Size() { // Truncate only if extending the file
err = syscall.Ftruncate(int(sink.file.Fd()), offset+length)
} else { // Otherwise, try to punch a hole in the file
err = punch(offset, length)
}
}
}

if err != nil { // Fall back to regular pwrite
klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+uint64(length), err)
klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+length, err)
err = nil
count := uint32(0)
blocksize := uint32(16 << 20)
buffer := bytes.Repeat([]byte{0}, int(blocksize))
count := int64(0)
const blocksize = 16 << 20
buffer := bytes.Repeat([]byte{0}, blocksize)
for count < length {
remaining := length - count
if remaining < blocksize {
buffer = bytes.Repeat([]byte{0}, int(remaining))
}
written, err := sink.Pwrite(buffer, offset)
written, err := sink.Pwrite(buffer, uint64(offset))
if err != nil {
klog.Errorf("Unable to write %d zeroes at offset %d: %v", length, offset, err)
break
}
count += uint32(written)
count += int64(written)
}
}

Expand Down
Loading

0 comments on commit 38f1159

Please sign in to comment.