Skip to content

Commit

Permalink
First stab at refactoring mounter
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod committed Jan 16, 2025
1 parent c0597ce commit c99afba
Show file tree
Hide file tree
Showing 15 changed files with 403 additions and 1,132 deletions.
31 changes: 28 additions & 3 deletions cmd/drafter-mounter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log"
"net"
Expand All @@ -15,9 +16,11 @@ import (
"sync"
"time"

"github.com/loopholelabs/drafter/pkg/common"
"github.com/loopholelabs/drafter/pkg/mounter"
"github.com/loopholelabs/drafter/pkg/packager"
"github.com/loopholelabs/goroutine-manager/pkg/manager"
"github.com/loopholelabs/silo/pkg/storage/migrator"
)

type CompositeDevices struct {
Expand Down Expand Up @@ -195,7 +198,7 @@ func main() {
)
defer goroutineManager.Wait()
defer goroutineManager.StopAllGoroutines()
defer goroutineManager.CreateBackgroundPanicCollector()()
// defer goroutineManager.CreateBackgroundPanicCollector()()

bubbleSignals := false

Expand Down Expand Up @@ -445,13 +448,13 @@ l:

defer migratableMounter.Close()

migrateToDevices := []mounter.MigrateToDevice{}
migrateToDevices := []common.MigrateToDevice{}
for _, device := range devices {
if !device.MakeMigratable {
continue
}

migrateToDevices = append(migrateToDevices, mounter.MigrateToDevice{
migrateToDevices = append(migrateToDevices, common.MigrateToDevice{
Name: device.Name,

MaxDirtyBlocks: device.MaxDirtyBlocks,
Expand Down Expand Up @@ -530,9 +533,31 @@ l:
OnAllMigrationsCompleted: func() {
log.Println("Completed all device migrations")
},
OnProgress: func(p map[string]*migrator.MigrationProgress) {
totalSize := 0
totalDone := 0
for _, prog := range p {
totalSize += (prog.TotalBlocks * prog.BlockSize)
totalDone += (prog.ReadyBlocks * prog.BlockSize)
}

perc := float64(0.0)
if totalSize > 0 {
perc = float64(totalDone) * 100 / float64(totalSize)
}
// Report overall migration progress
log.Printf("# Overall migration Progress # (%d / %d) %.1f%%\n", totalDone, totalSize, perc)

// Report individual devices
for name, prog := range p {
log.Printf(" [%s] Progress Migrated Blocks (%d/%d) %d ready %d total\n", name, prog.MigratedBlocks, prog.TotalBlocks, prog.ReadyBlocks, prog.TotalMigratedBlocks)
}

},
},
)
}(); err != nil {
fmt.Printf("ERROR %v\n", err)
panic(err)
}
}
Expand Down
17 changes: 2 additions & 15 deletions cmd/drafter-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,26 +558,13 @@ func main() {

log.Println("Migrating to", conn.RemoteAddr())

makeMigratableDevices := []mounter.MakeMigratableDevice{}
migrateToDevices := []common.MigrateToDevice{}
for _, device := range devices {
if !device.MakeMigratable || device.Shared {
continue
}

makeMigratableDevices = append(makeMigratableDevices, mounter.MakeMigratableDevice{
Name: device.Name,

Expiry: device.Expiry,
})
}

migrateToDevices := []mounter.MigrateToDevice{}
for _, device := range devices {
if !device.MakeMigratable || device.Shared {
continue
}

migrateToDevices = append(migrateToDevices, mounter.MigrateToDevice{
migrateToDevices = append(migrateToDevices, common.MigrateToDevice{
Name: device.Name,

MaxDirtyBlocks: device.MaxDirtyBlocks,
Expand Down
14 changes: 14 additions & 0 deletions hack/migrate_mount.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
sudo drafter-mounter --laddr '' --raddr 'localhost:1337' --devices '[
{
"name": "testdata",
"base": "testdata2",
"blockSize": 1048576,
"expiry": 1000000000,
"maxDirtyBlocks": 200,
"minCycles": 5,
"maxCycles": 20,
"cycleThrottle": 500000000,
"makeMigratable": true,
"shared": false
}
]'
14 changes: 14 additions & 0 deletions hack/mount.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
sudo drafter-mounter --raddr '' --laddr 'localhost:1337' --devices '[
{
"name": "testdata",
"base": "testdata",
"blockSize": 1048576,
"expiry": 1000000000,
"maxDirtyBlocks": 200,
"minCycles": 5,
"maxCycles": 20,
"cycleThrottle": 500000000,
"makeMigratable": true,
"shared": false
}
]'
7 changes: 5 additions & 2 deletions pkg/common/dirty_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"sync"
"time"

"github.com/loopholelabs/drafter/pkg/mounter"
"github.com/loopholelabs/drafter/pkg/packager"
)

var (
ErrCouldNotSuspendAndMsyncVM = errors.New("could not suspend and msync VM")
)

type DeviceStatus struct {
TotalCycles int
CycleThrottle time.Duration
Expand Down Expand Up @@ -168,7 +171,7 @@ func (dm *DirtyManager) PostMigrateDirty(name string, blocks []uint) (bool, erro
dm.suspendLock.Unlock()

if err != nil {
return true, errors.Join(mounter.ErrCouldNotSuspendAndMsyncVM, err)
return true, errors.Join(ErrCouldNotSuspendAndMsyncVM, err)
}
} else {
dm.suspendLock.Unlock()
Expand Down
38 changes: 29 additions & 9 deletions pkg/common/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/loopholelabs/drafter/pkg/mounter"
"github.com/loopholelabs/drafter/pkg/snapshotter"
"github.com/loopholelabs/logging/types"
"github.com/loopholelabs/silo/pkg/storage/config"
Expand All @@ -22,6 +22,23 @@ import (
"golang.org/x/sys/unix"
)

var (
ErrCouldNotCreateDeviceDirectory = errors.New("could not create device directory")
ErrCouldNotGetBaseDeviceStat = errors.New("could not get base device statistics")
ErrCouldNotCreateOverlayDirectory = errors.New("could not create overlay directory")
ErrCouldNotCreateStateDirectory = errors.New("could not create state directory")
)

type MigrateToDevice struct {
Name string `json:"name"`

MaxDirtyBlocks int `json:"maxDirtyBlocks"`
MinCycles int `json:"minCycles"`
MaxCycles int `json:"maxCycles"`

CycleThrottle time.Duration `json:"cycleThrottle"`
}

type MigrateFromDevice struct {
Name string `json:"name"`
Base string `json:"base"`
Expand All @@ -38,6 +55,9 @@ var (

// expose a Silo Device as a file within the vm directory
func ExposeSiloDeviceAsFile(vmpath string, name string, devicePath string) error {
if vmpath == "" {
return nil
}
deviceInfo, err := os.Stat(devicePath)
if err != nil {
return errors.Join(snapshotter.ErrCouldNotGetDeviceStat, err)
Expand All @@ -64,7 +84,7 @@ func ExposeSiloDeviceAsFile(vmpath string, name string, devicePath string) error
func CreateSiloDevSchema(i *MigrateFromDevice) (*config.DeviceSchema, error) {
stat, err := os.Stat(i.Base)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotGetBaseDeviceStat, err)
return nil, errors.Join(ErrCouldNotGetBaseDeviceStat, err)
}

ds := &config.DeviceSchema{
Expand All @@ -76,19 +96,19 @@ func CreateSiloDevSchema(i *MigrateFromDevice) (*config.DeviceSchema, error) {
if strings.TrimSpace(i.Overlay) == "" || strings.TrimSpace(i.State) == "" {
err := os.MkdirAll(filepath.Dir(i.Base), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateDeviceDirectory, err)
return nil, errors.Join(ErrCouldNotCreateDeviceDirectory, err)
}
ds.System = "file"
ds.Location = i.Base
} else {
err := os.MkdirAll(filepath.Dir(i.Overlay), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateOverlayDirectory, err)
return nil, errors.Join(ErrCouldNotCreateOverlayDirectory, err)
}

err = os.MkdirAll(filepath.Dir(i.State), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateStateDirectory, err)
return nil, errors.Join(ErrCouldNotCreateStateDirectory, err)
}

ds.System = "sparsefile"
Expand All @@ -114,19 +134,19 @@ func CreateIncomingSiloDevSchema(i *MigrateFromDevice, schema *config.DeviceSche
if strings.TrimSpace(i.Overlay) == "" || strings.TrimSpace(i.State) == "" {
err := os.MkdirAll(filepath.Dir(i.Base), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateDeviceDirectory, err)
return nil, errors.Join(ErrCouldNotCreateDeviceDirectory, err)
}
ds.System = "file"
ds.Location = i.Base
} else {
err := os.MkdirAll(filepath.Dir(i.Overlay), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateOverlayDirectory, err)
return nil, errors.Join(ErrCouldNotCreateOverlayDirectory, err)
}

err = os.MkdirAll(filepath.Dir(i.State), os.ModePerm)
if err != nil {
return nil, errors.Join(mounter.ErrCouldNotCreateStateDirectory, err)
return nil, errors.Join(ErrCouldNotCreateStateDirectory, err)
}

ds.System = "sparsefile"
Expand Down Expand Up @@ -276,7 +296,7 @@ func MigrateFromPipe(log types.Logger, met metrics.SiloMetrics, vmpath string,
*/
func MigrateToPipe(ctx context.Context, readers []io.Reader, writers []io.Writer,
dg *devicegroup.DeviceGroup, concurrency int, onProgress func(p map[string]*migrator.MigrationProgress),
vmState *VMStateMgr, devices []mounter.MigrateToDevice, getCustomPayload func() []byte) error {
vmState *VMStateMgr, devices []MigrateToDevice, getCustomPayload func() []byte) error {

// Create a protocol for use by Silo
pro := protocol.NewRW(ctx, readers, writers, nil)
Expand Down
3 changes: 1 addition & 2 deletions pkg/common/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"testing"
"time"

"github.com/loopholelabs/drafter/pkg/mounter"
"github.com/loopholelabs/silo/pkg/storage/config"
"github.com/loopholelabs/silo/pkg/storage/devicegroup"
"github.com/loopholelabs/silo/pkg/storage/migrator"
Expand Down Expand Up @@ -144,7 +143,7 @@ func TestMigrateFromFsThenBetween(t *testing.T) {
fmt.Printf("Progress...\n")
}

devices := []mounter.MigrateToDevice{
devices := []MigrateToDevice{
{
Name: "test",
MaxDirtyBlocks: 10,
Expand Down
12 changes: 12 additions & 0 deletions pkg/common/vm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func NewVMStateMgr(ctx context.Context,
}
}

func NewDummyVMStateMgr(ctx context.Context) *VMStateMgr {
return &VMStateMgr{
ctx: ctx,
suspendFunc: func(context.Context, time.Duration) error { return nil },
suspendTimeout: 10 * time.Second,
msyncFunc: func(context.Context) error { return nil },
onBeforeSuspend: func() {},
onAfterSuspend: func() {},
suspendedCh: make(chan struct{}),
}
}

func (sm *VMStateMgr) GetSuspsnededVMCh() chan struct{} {
return sm.suspendedCh
}
Expand Down
Loading

0 comments on commit c99afba

Please sign in to comment.