From c99afba0cce679c24c8545371457d46274eea18b Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Thu, 16 Jan 2025 12:57:53 +0000 Subject: [PATCH] First stab at refactoring mounter Signed-off-by: Jimmy Moore --- cmd/drafter-mounter/main.go | 31 +- cmd/drafter-peer/main.go | 17 +- hack/migrate_mount.sh | 14 + hack/mount.sh | 14 + pkg/common/dirty_manager.go | 7 +- pkg/common/migrations.go | 38 ++- pkg/common/migrations_test.go | 3 +- pkg/common/vm_state.go | 12 + pkg/mounter/make_migratable.go | 128 -------- pkg/mounter/migratable_mounter.go | 70 ++++ pkg/mounter/migrate_from.go | 524 ------------------------------ pkg/mounter/migrate_to.go | 406 ----------------------- pkg/mounter/migrated_mounter.go | 227 +++++++++++++ pkg/mounter/stages.go | 41 --- pkg/peer/resumed_peer.go | 3 +- 15 files changed, 403 insertions(+), 1132 deletions(-) create mode 100755 hack/migrate_mount.sh create mode 100755 hack/mount.sh delete mode 100644 pkg/mounter/make_migratable.go create mode 100644 pkg/mounter/migratable_mounter.go delete mode 100644 pkg/mounter/migrate_from.go delete mode 100644 pkg/mounter/migrate_to.go create mode 100644 pkg/mounter/migrated_mounter.go delete mode 100644 pkg/mounter/stages.go diff --git a/cmd/drafter-mounter/main.go b/cmd/drafter-mounter/main.go index b47c421..2f8a9b5 100644 --- a/cmd/drafter-mounter/main.go +++ b/cmd/drafter-mounter/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "flag" + "fmt" "io" "log" "net" @@ -15,9 +16,11 @@ import ( "sync" "time" + "github.com/loopholelabs/drafter/pkg/common" "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/packager" "github.com/loopholelabs/goroutine-manager/pkg/manager" + "github.com/loopholelabs/silo/pkg/storage/migrator" ) type CompositeDevices struct { @@ -195,7 +198,7 @@ func main() { ) defer goroutineManager.Wait() defer goroutineManager.StopAllGoroutines() - defer goroutineManager.CreateBackgroundPanicCollector()() + // defer goroutineManager.CreateBackgroundPanicCollector()() bubbleSignals := false @@ -445,13 +448,13 @@ l: defer migratableMounter.Close() - migrateToDevices := []mounter.MigrateToDevice{} + migrateToDevices := []common.MigrateToDevice{} for _, device := range devices { if !device.MakeMigratable { continue } - migrateToDevices = append(migrateToDevices, mounter.MigrateToDevice{ + migrateToDevices = append(migrateToDevices, common.MigrateToDevice{ Name: device.Name, MaxDirtyBlocks: device.MaxDirtyBlocks, @@ -530,9 +533,31 @@ l: OnAllMigrationsCompleted: func() { log.Println("Completed all device migrations") }, + OnProgress: func(p map[string]*migrator.MigrationProgress) { + totalSize := 0 + totalDone := 0 + for _, prog := range p { + totalSize += (prog.TotalBlocks * prog.BlockSize) + totalDone += (prog.ReadyBlocks * prog.BlockSize) + } + + perc := float64(0.0) + if totalSize > 0 { + perc = float64(totalDone) * 100 / float64(totalSize) + } + // Report overall migration progress + log.Printf("# Overall migration Progress # (%d / %d) %.1f%%\n", totalDone, totalSize, perc) + + // Report individual devices + for name, prog := range p { + log.Printf(" [%s] Progress Migrated Blocks (%d/%d) %d ready %d total\n", name, prog.MigratedBlocks, prog.TotalBlocks, prog.ReadyBlocks, prog.TotalMigratedBlocks) + } + + }, }, ) }(); err != nil { + fmt.Printf("ERROR %v\n", err) panic(err) } } diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index 082a035..1f900e0 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -558,26 +558,13 @@ func main() { log.Println("Migrating to", conn.RemoteAddr()) - makeMigratableDevices := []mounter.MakeMigratableDevice{} + migrateToDevices := []common.MigrateToDevice{} for _, device := range devices { if !device.MakeMigratable || device.Shared { continue } - makeMigratableDevices = append(makeMigratableDevices, mounter.MakeMigratableDevice{ - Name: device.Name, - - Expiry: device.Expiry, - }) - } - - migrateToDevices := []mounter.MigrateToDevice{} - for _, device := range devices { - if !device.MakeMigratable || device.Shared { - continue - } - - migrateToDevices = append(migrateToDevices, mounter.MigrateToDevice{ + migrateToDevices = append(migrateToDevices, common.MigrateToDevice{ Name: device.Name, MaxDirtyBlocks: device.MaxDirtyBlocks, diff --git a/hack/migrate_mount.sh b/hack/migrate_mount.sh new file mode 100755 index 0000000..1196341 --- /dev/null +++ b/hack/migrate_mount.sh @@ -0,0 +1,14 @@ +sudo drafter-mounter --laddr '' --raddr 'localhost:1337' --devices '[ + { + "name": "testdata", + "base": "testdata2", + "blockSize": 1048576, + "expiry": 1000000000, + "maxDirtyBlocks": 200, + "minCycles": 5, + "maxCycles": 20, + "cycleThrottle": 500000000, + "makeMigratable": true, + "shared": false + } +]' diff --git a/hack/mount.sh b/hack/mount.sh new file mode 100755 index 0000000..8ff04e4 --- /dev/null +++ b/hack/mount.sh @@ -0,0 +1,14 @@ +sudo drafter-mounter --raddr '' --laddr 'localhost:1337' --devices '[ + { + "name": "testdata", + "base": "testdata", + "blockSize": 1048576, + "expiry": 1000000000, + "maxDirtyBlocks": 200, + "minCycles": 5, + "maxCycles": 20, + "cycleThrottle": 500000000, + "makeMigratable": true, + "shared": false + } +]' diff --git a/pkg/common/dirty_manager.go b/pkg/common/dirty_manager.go index c7c7fd0..f536031 100644 --- a/pkg/common/dirty_manager.go +++ b/pkg/common/dirty_manager.go @@ -5,10 +5,13 @@ import ( "sync" "time" - "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/packager" ) +var ( + ErrCouldNotSuspendAndMsyncVM = errors.New("could not suspend and msync VM") +) + type DeviceStatus struct { TotalCycles int CycleThrottle time.Duration @@ -168,7 +171,7 @@ func (dm *DirtyManager) PostMigrateDirty(name string, blocks []uint) (bool, erro dm.suspendLock.Unlock() if err != nil { - return true, errors.Join(mounter.ErrCouldNotSuspendAndMsyncVM, err) + return true, errors.Join(ErrCouldNotSuspendAndMsyncVM, err) } } else { dm.suspendLock.Unlock() diff --git a/pkg/common/migrations.go b/pkg/common/migrations.go index 8038051..1211422 100644 --- a/pkg/common/migrations.go +++ b/pkg/common/migrations.go @@ -9,8 +9,8 @@ import ( "path/filepath" "strings" "syscall" + "time" - "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/snapshotter" "github.com/loopholelabs/logging/types" "github.com/loopholelabs/silo/pkg/storage/config" @@ -22,6 +22,23 @@ import ( "golang.org/x/sys/unix" ) +var ( + ErrCouldNotCreateDeviceDirectory = errors.New("could not create device directory") + ErrCouldNotGetBaseDeviceStat = errors.New("could not get base device statistics") + ErrCouldNotCreateOverlayDirectory = errors.New("could not create overlay directory") + ErrCouldNotCreateStateDirectory = errors.New("could not create state directory") +) + +type MigrateToDevice struct { + Name string `json:"name"` + + MaxDirtyBlocks int `json:"maxDirtyBlocks"` + MinCycles int `json:"minCycles"` + MaxCycles int `json:"maxCycles"` + + CycleThrottle time.Duration `json:"cycleThrottle"` +} + type MigrateFromDevice struct { Name string `json:"name"` Base string `json:"base"` @@ -38,6 +55,9 @@ var ( // expose a Silo Device as a file within the vm directory func ExposeSiloDeviceAsFile(vmpath string, name string, devicePath string) error { + if vmpath == "" { + return nil + } deviceInfo, err := os.Stat(devicePath) if err != nil { return errors.Join(snapshotter.ErrCouldNotGetDeviceStat, err) @@ -64,7 +84,7 @@ func ExposeSiloDeviceAsFile(vmpath string, name string, devicePath string) error func CreateSiloDevSchema(i *MigrateFromDevice) (*config.DeviceSchema, error) { stat, err := os.Stat(i.Base) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotGetBaseDeviceStat, err) + return nil, errors.Join(ErrCouldNotGetBaseDeviceStat, err) } ds := &config.DeviceSchema{ @@ -76,19 +96,19 @@ func CreateSiloDevSchema(i *MigrateFromDevice) (*config.DeviceSchema, error) { if strings.TrimSpace(i.Overlay) == "" || strings.TrimSpace(i.State) == "" { err := os.MkdirAll(filepath.Dir(i.Base), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateDeviceDirectory, err) + return nil, errors.Join(ErrCouldNotCreateDeviceDirectory, err) } ds.System = "file" ds.Location = i.Base } else { err := os.MkdirAll(filepath.Dir(i.Overlay), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateOverlayDirectory, err) + return nil, errors.Join(ErrCouldNotCreateOverlayDirectory, err) } err = os.MkdirAll(filepath.Dir(i.State), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateStateDirectory, err) + return nil, errors.Join(ErrCouldNotCreateStateDirectory, err) } ds.System = "sparsefile" @@ -114,19 +134,19 @@ func CreateIncomingSiloDevSchema(i *MigrateFromDevice, schema *config.DeviceSche if strings.TrimSpace(i.Overlay) == "" || strings.TrimSpace(i.State) == "" { err := os.MkdirAll(filepath.Dir(i.Base), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateDeviceDirectory, err) + return nil, errors.Join(ErrCouldNotCreateDeviceDirectory, err) } ds.System = "file" ds.Location = i.Base } else { err := os.MkdirAll(filepath.Dir(i.Overlay), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateOverlayDirectory, err) + return nil, errors.Join(ErrCouldNotCreateOverlayDirectory, err) } err = os.MkdirAll(filepath.Dir(i.State), os.ModePerm) if err != nil { - return nil, errors.Join(mounter.ErrCouldNotCreateStateDirectory, err) + return nil, errors.Join(ErrCouldNotCreateStateDirectory, err) } ds.System = "sparsefile" @@ -276,7 +296,7 @@ func MigrateFromPipe(log types.Logger, met metrics.SiloMetrics, vmpath string, */ func MigrateToPipe(ctx context.Context, readers []io.Reader, writers []io.Writer, dg *devicegroup.DeviceGroup, concurrency int, onProgress func(p map[string]*migrator.MigrationProgress), - vmState *VMStateMgr, devices []mounter.MigrateToDevice, getCustomPayload func() []byte) error { + vmState *VMStateMgr, devices []MigrateToDevice, getCustomPayload func() []byte) error { // Create a protocol for use by Silo pro := protocol.NewRW(ctx, readers, writers, nil) diff --git a/pkg/common/migrations_test.go b/pkg/common/migrations_test.go index 802e36d..44eb4d7 100644 --- a/pkg/common/migrations_test.go +++ b/pkg/common/migrations_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/silo/pkg/storage/config" "github.com/loopholelabs/silo/pkg/storage/devicegroup" "github.com/loopholelabs/silo/pkg/storage/migrator" @@ -144,7 +143,7 @@ func TestMigrateFromFsThenBetween(t *testing.T) { fmt.Printf("Progress...\n") } - devices := []mounter.MigrateToDevice{ + devices := []MigrateToDevice{ { Name: "test", MaxDirtyBlocks: 10, diff --git a/pkg/common/vm_state.go b/pkg/common/vm_state.go index e5f4f9e..0ac2e0a 100644 --- a/pkg/common/vm_state.go +++ b/pkg/common/vm_state.go @@ -41,6 +41,18 @@ func NewVMStateMgr(ctx context.Context, } } +func NewDummyVMStateMgr(ctx context.Context) *VMStateMgr { + return &VMStateMgr{ + ctx: ctx, + suspendFunc: func(context.Context, time.Duration) error { return nil }, + suspendTimeout: 10 * time.Second, + msyncFunc: func(context.Context) error { return nil }, + onBeforeSuspend: func() {}, + onAfterSuspend: func() {}, + suspendedCh: make(chan struct{}), + } +} + func (sm *VMStateMgr) GetSuspsnededVMCh() chan struct{} { return sm.suspendedCh } diff --git a/pkg/mounter/make_migratable.go b/pkg/mounter/make_migratable.go deleted file mode 100644 index 6fb2e5f..0000000 --- a/pkg/mounter/make_migratable.go +++ /dev/null @@ -1,128 +0,0 @@ -package mounter - -import ( - "context" - "errors" - "time" - - iutils "github.com/loopholelabs/drafter/internal/utils" - "github.com/loopholelabs/goroutine-manager/pkg/manager" - "github.com/loopholelabs/silo/pkg/storage/blocks" - "github.com/loopholelabs/silo/pkg/storage/dirtytracker" - "github.com/loopholelabs/silo/pkg/storage/modules" - "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" -) - -type MigratedDevice struct { - Name string `json:"name"` - Path string `json:"path"` -} - -type MakeMigratableDevice struct { - Name string `json:"name"` - - Expiry time.Duration `json:"expiry"` -} - -type MigratedMounter struct { - Devices []MigratedDevice - - Wait func() error - Close func() error - - stage2Inputs []migrateFromAndMountStage -} - -func (migratedMounter *MigratedMounter) MakeMigratable( - ctx context.Context, - - devices []MakeMigratableDevice, -) (migratableMounter *MigratableMounter, errs error) { - migratableMounter = &MigratableMounter{ - Close: func() {}, - } - - goroutineManager := manager.NewGoroutineManager( - ctx, - &errs, - manager.GoroutineManagerHooks{}, - ) - defer goroutineManager.Wait() - defer goroutineManager.StopAllGoroutines() - defer goroutineManager.CreateBackgroundPanicCollector()() - - stage3Inputs := []makeMigratableFilterStage{} - for _, input := range migratedMounter.stage2Inputs { - var makeMigratableDevice *MakeMigratableDevice - for _, device := range devices { - if device.Name == input.name { - makeMigratableDevice = &device - - break - } - } - - // We don't want to make this device migratable - if makeMigratableDevice == nil { - continue - } - - stage3Inputs = append(stage3Inputs, makeMigratableFilterStage{ - prev: input, - - makeMigratableDevice: *makeMigratableDevice, - }) - } - - var ( - deferFuncs [][]func() error - err error - ) - migratableMounter.stage4Inputs, deferFuncs, err = iutils.ConcurrentMap( - stage3Inputs, - func(index int, input makeMigratableFilterStage, output *makeMigratableDeviceStage, addDefer func(deferFunc func() error)) error { - output.prev = input - - dirtyLocal, dirtyRemote := dirtytracker.NewDirtyTracker(input.prev.storage, int(input.prev.blockSize)) - output.dirtyRemote = dirtyRemote - monitor := volatilitymonitor.NewVolatilityMonitor(dirtyLocal, int(input.prev.blockSize), input.makeMigratableDevice.Expiry) - - local := modules.NewLockable(monitor) - output.storage = local - addDefer(func() error { - local.Unlock() - - return nil - }) - - input.prev.device.SetProvider(local) - - totalBlocks := (int(local.Size()) + int(input.prev.blockSize) - 1) / int(input.prev.blockSize) - output.totalBlocks = totalBlocks - - orderer := blocks.NewPriorityBlockOrder(totalBlocks, monitor) - output.orderer = orderer - orderer.AddAll() - - return nil - }, - ) - - migratableMounter.Close = func() { - // Make sure that we schedule the `deferFuncs` even if we get an error - for _, deferFuncs := range deferFuncs { - for _, deferFunc := range deferFuncs { - defer deferFunc() // We can safely ignore errors here since we never call `addDefer` with a function that could return an error - } - } - } - - if err != nil { - // Make sure that we schedule the `deferFuncs` even if we get an error - migratableMounter.Close() - - panic(errors.Join(ErrCouldNotCreateMigratableMounter, err)) - } - - return -} diff --git a/pkg/mounter/migratable_mounter.go b/pkg/mounter/migratable_mounter.go new file mode 100644 index 0000000..0784462 --- /dev/null +++ b/pkg/mounter/migratable_mounter.go @@ -0,0 +1,70 @@ +package mounter + +import ( + "context" + "fmt" + "io" + "runtime/debug" + "sync" + + "github.com/loopholelabs/drafter/pkg/common" + "github.com/loopholelabs/silo/pkg/storage/devicegroup" + "github.com/loopholelabs/silo/pkg/storage/migrator" +) + +type MigratableMounter struct { + Close func() + + Dg *devicegroup.DeviceGroup + DgLock sync.Mutex +} + +type MounterMigrateToHooks struct { + OnBeforeGetDirtyBlocks func(deviceID uint32, remote bool) + + OnDeviceSent func(deviceID uint32, remote bool) + OnDeviceAuthoritySent func(deviceID uint32, remote bool) + OnDeviceInitialMigrationProgress func(deviceID uint32, remote bool, ready int, total int) + OnDeviceContinousMigrationProgress func(deviceID uint32, remote bool, delta int) + OnDeviceFinalMigrationProgress func(deviceID uint32, remote bool, delta int) + OnDeviceMigrationCompleted func(deviceID uint32, remote bool) + + OnProgress func(p map[string]*migrator.MigrationProgress) + + OnAllDevicesSent func() + OnAllMigrationsCompleted func() +} + +func (migratableMounter *MigratableMounter) MigrateTo( + ctx context.Context, + + devices []common.MigrateToDevice, + + concurrency int, + + readers []io.Reader, + writers []io.Writer, + + hooks MounterMigrateToHooks, +) (errs error) { + defer func() { + err := recover() + if err != nil { + fmt.Printf("ERROR in MigrateTo %v\n", err) + debug.PrintStack() + } + }() + + vmStateMgr := common.NewDummyVMStateMgr(ctx) + err := common.MigrateToPipe(ctx, readers, writers, migratableMounter.Dg, concurrency, hooks.OnProgress, vmStateMgr, devices, nil) + + if err != nil { + return err + } + + if hooks.OnAllMigrationsCompleted != nil { + hooks.OnAllMigrationsCompleted() + } + + return +} diff --git a/pkg/mounter/migrate_from.go b/pkg/mounter/migrate_from.go deleted file mode 100644 index b5d7fd2..0000000 --- a/pkg/mounter/migrate_from.go +++ /dev/null @@ -1,524 +0,0 @@ -package mounter - -import ( - "context" - "errors" - "fmt" - "io" - "os" - "path/filepath" - "slices" - "strings" - "sync" - "sync/atomic" - - iutils "github.com/loopholelabs/drafter/internal/utils" - "github.com/loopholelabs/drafter/pkg/registry" - "github.com/loopholelabs/drafter/pkg/terminator" - "github.com/loopholelabs/goroutine-manager/pkg/manager" - "github.com/loopholelabs/silo/pkg/storage" - "github.com/loopholelabs/silo/pkg/storage/config" - "github.com/loopholelabs/silo/pkg/storage/device" - "github.com/loopholelabs/silo/pkg/storage/protocol" - "github.com/loopholelabs/silo/pkg/storage/protocol/packets" - "github.com/loopholelabs/silo/pkg/storage/waitingcache" -) - -type MigrateFromAndMountDevice struct { - Name string `json:"name"` - - Base string `json:"base"` - Overlay string `json:"overlay"` - State string `json:"state"` - - BlockSize uint32 `json:"blockSize"` -} - -type MigrateFromHooks struct { - OnRemoteDeviceReceived func(remoteDeviceID uint32, name string) - OnRemoteDeviceExposed func(remoteDeviceID uint32, path string) - OnRemoteDeviceAuthorityReceived func(remoteDeviceID uint32) - OnRemoteDeviceMigrationCompleted func(remoteDeviceID uint32) - - OnRemoteAllDevicesReceived func() - OnRemoteAllMigrationsCompleted func() - - OnLocalDeviceRequested func(localDeviceID uint32, name string) - OnLocalDeviceExposed func(localDeviceID uint32, path string) - - OnLocalAllDevicesRequested func() - - OnXferCustomData func([]byte) -} - -func MigrateFromAndMount( - mounterCtx context.Context, - migrateFromCtx context.Context, - - devices []MigrateFromAndMountDevice, - - readers []io.Reader, - writers []io.Writer, - - hooks MigrateFromHooks, -) ( - migratedMounter *MigratedMounter, - - errs error, -) { - migratedMounter = &MigratedMounter{ - Devices: []MigratedDevice{}, - - Wait: func() error { - return nil - }, - Close: func() error { - return nil - }, - } - - var ( - allRemoteDevicesReceived = make(chan struct{}) - signalAllRemoteDevicesReceived = sync.OnceFunc(func() { - close(allRemoteDevicesReceived) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d - }) - - allRemoteDevicesReady = make(chan struct{}) - signalAllRemoteDevicesReady = sync.OnceFunc(func() { - close(allRemoteDevicesReady) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d - }) - ) - - // We don't `defer cancelProtocolCtx()` this because we cancel in the wait function - protocolCtx, cancelProtocolCtx := context.WithCancel(migrateFromCtx) - - // We overwrite this further down, but this is so that we don't leak the `protocolCtx` if we `panic()` before we set `WaitForMigrationsToComplete` - migratedMounter.Wait = func() error { - cancelProtocolCtx() - - return nil - } - - goroutineManager := manager.NewGoroutineManager( - migrateFromCtx, - &errs, - manager.GoroutineManagerHooks{}, - ) - defer goroutineManager.Wait() - defer goroutineManager.StopAllGoroutines() - defer goroutineManager.CreateBackgroundPanicCollector()() - - // Use an atomic counter and `allDevicesReady` and instead of a WaitGroup so that we can `select {}` without leaking a goroutine - var ( - receivedButNotReadyRemoteDevices atomic.Int32 - - deviceCloseFuncsLock sync.Mutex - deviceCloseFuncs []func() error - - stage2InputsLock sync.Mutex - - pro *protocol.RW - ) - if len(readers) > 0 && len(writers) > 0 { // Only open the protocol if we want passed in readers and writers - pro = protocol.NewRW( - protocolCtx, // We don't track this because we return the wait function - readers, - writers, - func(ctx context.Context, p protocol.Protocol, index uint32) { - var ( - from *protocol.FromProtocol - local *waitingcache.Local - ) - from = protocol.NewFromProtocol( - ctx, - index, - func(di *packets.DevInfo) storage.Provider { - // No need to `defer goroutineManager.HandlePanics` here - panics bubble upwards - - base := "" - for _, device := range devices { - if di.Name == device.Name { - base = device.Base - - break - } - } - - if strings.TrimSpace(base) == "" { - panic(terminator.ErrUnknownDeviceName) - } - - receivedButNotReadyRemoteDevices.Add(1) - - if hook := hooks.OnRemoteDeviceReceived; hook != nil { - hook(index, di.Name) - } - - if err := os.MkdirAll(filepath.Dir(base), os.ModePerm); err != nil { - panic(errors.Join(ErrCouldNotCreateDeviceDirectory, err)) - } - - src, device, err := device.NewDevice(&config.DeviceSchema{ - Name: di.Name, - System: "file", - Location: base, - Size: fmt.Sprintf("%v", di.Size), - BlockSize: fmt.Sprintf("%v", di.BlockSize), - Expose: true, - }) - if err != nil { - panic(errors.Join(terminator.ErrCouldNotCreateDevice, err)) - } - deviceCloseFuncsLock.Lock() - deviceCloseFuncs = append(deviceCloseFuncs, device.Shutdown) // defer device.Shutdown() - deviceCloseFuncsLock.Unlock() - - var remote *waitingcache.Remote - local, remote = waitingcache.NewWaitingCache(src, int(di.BlockSize)) - local.NeedAt = func(offset int64, length int32) { - // Only access the `from` protocol if it's not already closed - select { - case <-protocolCtx.Done(): - return - - default: - } - - if err := from.NeedAt(offset, length); err != nil { - panic(errors.Join(ErrCouldNotRequestBlock, err)) - } - } - local.DontNeedAt = func(offset int64, length int32) { - // Only access the `from` protocol if it's not already closed - select { - case <-protocolCtx.Done(): - return - - default: - } - - if err := from.DontNeedAt(offset, length); err != nil { - panic(errors.Join(ErrCouldNotReleaseBlock, err)) - } - } - - device.SetProvider(local) - - stage2InputsLock.Lock() - migratedMounter.stage2Inputs = append(migratedMounter.stage2Inputs, migrateFromAndMountStage{ - name: di.Name, - - blockSize: di.BlockSize, - - id: index, - remote: true, - - storage: local, - device: device, - }) - stage2InputsLock.Unlock() - - devicePath := filepath.Join("/dev", device.Device()) - - if hook := hooks.OnRemoteDeviceExposed; hook != nil { - hook(index, devicePath) - } - - return remote - }, - p, - ) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := from.HandleReadAt(); err != nil { - panic(errors.Join(terminator.ErrCouldNotHandleReadAt, err)) - } - }) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := from.HandleWriteAt(); err != nil { - panic(errors.Join(terminator.ErrCouldNotHandleWriteAt, err)) - } - }) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := from.HandleDevInfo(); err != nil { - panic(errors.Join(terminator.ErrCouldNotHandleDevInfo, err)) - } - }) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := from.HandleEvent(func(e *packets.Event) { - switch e.Type { - case packets.EventCustom: - switch e.CustomType { - case byte(registry.EventCustomAllDevicesSent): - signalAllRemoteDevicesReceived() - - if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { - hook() - } - - case byte(registry.EventCustomTransferAuthority): - if receivedButNotReadyRemoteDevices.Add(-1) <= 0 { - signalAllRemoteDevicesReady() - } - - if hook := hooks.OnRemoteDeviceAuthorityReceived; hook != nil { - hook(index) - } - } - - case packets.EventCompleted: - if hook := hooks.OnRemoteDeviceMigrationCompleted; hook != nil { - hook(index) - } - } - }); err != nil { - panic(errors.Join(terminator.ErrCouldNotHandleEvent, err)) - } - }) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := from.HandleDirtyList(func(blocks []uint) { - if local != nil { - local.DirtyBlocks(blocks) - } - }); err != nil { - panic(errors.Join(terminator.ErrCouldNotHandleDirtyList, err)) - } - }) - }) - } - - migratedMounter.Wait = sync.OnceValue(func() error { - defer cancelProtocolCtx() - - // If we haven't opened the protocol, don't wait for it - if pro != nil { - if err := pro.Handle(); err != nil && !errors.Is(err, io.EOF) { - return err - } - } - - // If it hasn't sent any devices, the remote Silo mounter doesn't send `EventCustomAllDevicesSent` - // After the protocol has closed without errors, we can safely assume that we won't receive any - // additional devices, so we mark all devices as received and ready - select { - case <-allRemoteDevicesReceived: - default: - signalAllRemoteDevicesReceived() - - // We need to call the hook manually too since we would otherwise only call if we received at least one device - if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { - hook() - } - } - - signalAllRemoteDevicesReady() - - if hook := hooks.OnRemoteAllMigrationsCompleted; hook != nil { - hook() - } - - return nil - }) - migratedMounter.Close = func() (errs error) { - // We have to close the runner before we close the devices - defer func() { - if err := migratedMounter.Wait(); err != nil { - errs = errors.Join(errs, err) - } - }() - - deviceCloseFuncsLock.Lock() - defer deviceCloseFuncsLock.Unlock() - - for _, closeFunc := range deviceCloseFuncs { - defer func(closeFunc func() error) { - if err := closeFunc(); err != nil { - errs = errors.Join(errs, err) - } - }(closeFunc) - } - - return - } - - // We don't track this because we return the wait function - goroutineManager.StartBackgroundGoroutine(func(_ context.Context) { - if err := migratedMounter.Wait(); err != nil { - panic(errors.Join(registry.ErrCouldNotWaitForMigrationCompletion, err)) - } - }) - - // We don't track this because we return the close function - goroutineManager.StartBackgroundGoroutine(func(ctx context.Context) { - select { - // Failure case; we cancelled the internal context before all devices are ready - case <-ctx.Done(): - if err := migratedMounter.Close(); err != nil { - panic(errors.Join(ErrCouldNotCloseMigratedMounter, err)) - } - - // Happy case; all devices are ready and we want to wait with closing the devices until we stop the mounter - case <-allRemoteDevicesReady: - <-mounterCtx.Done() - - if err := migratedMounter.Close(); err != nil { - panic(errors.Join(ErrCouldNotCloseMigratedMounter, err)) - } - - break - } - }) - - select { - case <-goroutineManager.Context().Done(): - if err := goroutineManager.Context().Err(); err != nil { - panic(errors.Join(ErrMounterContextCancelled, err)) - } - - return - case <-allRemoteDevicesReceived: - break - } - - stage1Inputs := []MigrateFromAndMountDevice{} - for _, input := range devices { - if slices.ContainsFunc( - migratedMounter.stage2Inputs, - func(r migrateFromAndMountStage) bool { - return input.Name == r.name - }, - ) { - continue - } - - stage1Inputs = append(stage1Inputs, input) - } - - // Use an atomic counter instead of a WaitGroup so that we can wait without leaking a goroutine - var remainingRequestedLocalDevices atomic.Int32 - remainingRequestedLocalDevices.Add(int32(len(stage1Inputs))) - - _, deferFuncs, err := iutils.ConcurrentMap( - stage1Inputs, - func(index int, input MigrateFromAndMountDevice, _ *struct{}, addDefer func(deferFunc func() error)) error { - if hook := hooks.OnLocalDeviceRequested; hook != nil { - hook(uint32(index), input.Name) - } - - if remainingRequestedLocalDevices.Add(-1) <= 0 { - if hook := hooks.OnLocalAllDevicesRequested; hook != nil { - hook() - } - } - - stat, err := os.Stat(input.Base) - if err != nil { - return errors.Join(ErrCouldNotGetBaseDeviceStat, err) - } - - var ( - local storage.Provider - dev storage.ExposedStorage - ) - if strings.TrimSpace(input.Overlay) == "" || strings.TrimSpace(input.State) == "" { - local, dev, err = device.NewDevice(&config.DeviceSchema{ - Name: input.Name, - System: "file", - Location: input.Base, - Size: fmt.Sprintf("%v", stat.Size()), - BlockSize: fmt.Sprintf("%v", input.BlockSize), - Expose: true, - }) - } else { - if err := os.MkdirAll(filepath.Dir(input.Overlay), os.ModePerm); err != nil { - return errors.Join(ErrCouldNotCreateOverlayDirectory, err) - } - - if err := os.MkdirAll(filepath.Dir(input.State), os.ModePerm); err != nil { - return errors.Join(ErrCouldNotCreateStateDirectory, err) - } - - local, dev, err = device.NewDevice(&config.DeviceSchema{ - Name: input.Name, - System: "sparsefile", - Location: input.Overlay, - Size: fmt.Sprintf("%v", stat.Size()), - BlockSize: fmt.Sprintf("%v", input.BlockSize), - Expose: true, - ROSource: &config.DeviceSchema{ - Name: input.State, - System: "file", - Location: input.Base, - Size: fmt.Sprintf("%v", stat.Size()), - }, - }) - } - if err != nil { - return errors.Join(ErrCouldNotCreateLocalDevice, err) - } - addDefer(local.Close) - addDefer(dev.Shutdown) - - dev.SetProvider(local) - - stage2InputsLock.Lock() - migratedMounter.stage2Inputs = append(migratedMounter.stage2Inputs, migrateFromAndMountStage{ - name: input.Name, - - blockSize: input.BlockSize, - - id: uint32(index), - remote: false, - - storage: local, - device: dev, - }) - stage2InputsLock.Unlock() - - devicePath := filepath.Join("/dev", dev.Device()) - - if hook := hooks.OnLocalDeviceExposed; hook != nil { - hook(uint32(index), devicePath) - } - - return nil - }, - ) - - // Make sure that we schedule the `deferFuncs` even if we get an error during device setup - for _, deferFuncs := range deferFuncs { - for _, deferFunc := range deferFuncs { - deviceCloseFuncsLock.Lock() - deviceCloseFuncs = append(deviceCloseFuncs, deferFunc) // defer deferFunc() - deviceCloseFuncsLock.Unlock() - } - } - - if err != nil { - panic(errors.Join(ErrCouldNotSetupDevices, err)) - } - - select { - case <-goroutineManager.Context().Done(): - if err := goroutineManager.Context().Err(); err != nil { - panic(errors.Join(ErrMounterContextCancelled, err)) - } - - return - case <-allRemoteDevicesReady: - break - } - - for _, input := range migratedMounter.stage2Inputs { - migratedMounter.Devices = append(migratedMounter.Devices, MigratedDevice{ - Name: input.name, - Path: filepath.Join("/dev", input.device.Device()), - }) - } - - return -} diff --git a/pkg/mounter/migrate_to.go b/pkg/mounter/migrate_to.go deleted file mode 100644 index e83efd0..0000000 --- a/pkg/mounter/migrate_to.go +++ /dev/null @@ -1,406 +0,0 @@ -package mounter - -import ( - "context" - "errors" - "io" - "sync" - "sync/atomic" - "time" - - iutils "github.com/loopholelabs/drafter/internal/utils" - "github.com/loopholelabs/drafter/pkg/registry" - "github.com/loopholelabs/goroutine-manager/pkg/manager" - "github.com/loopholelabs/silo/pkg/storage" - "github.com/loopholelabs/silo/pkg/storage/migrator" - "github.com/loopholelabs/silo/pkg/storage/protocol" - "github.com/loopholelabs/silo/pkg/storage/protocol/packets" -) - -type MounterMigrateToHooks struct { - OnBeforeGetDirtyBlocks func(deviceID uint32, remote bool) - - OnDeviceSent func(deviceID uint32, remote bool) - OnDeviceAuthoritySent func(deviceID uint32, remote bool) - OnDeviceInitialMigrationProgress func(deviceID uint32, remote bool, ready int, total int) - OnDeviceContinousMigrationProgress func(deviceID uint32, remote bool, delta int) - OnDeviceFinalMigrationProgress func(deviceID uint32, remote bool, delta int) - OnDeviceMigrationCompleted func(deviceID uint32, remote bool) - - OnAllDevicesSent func() - OnAllMigrationsCompleted func() -} - -type MigrateToDevice struct { - Name string `json:"name"` - - MaxDirtyBlocks int `json:"maxDirtyBlocks"` - MinCycles int `json:"minCycles"` - MaxCycles int `json:"maxCycles"` - - CycleThrottle time.Duration `json:"cycleThrottle"` -} - -type MigratableMounter struct { - Close func() - - stage4Inputs []makeMigratableDeviceStage -} - -func (migratableMounter *MigratableMounter) MigrateTo( - ctx context.Context, - - devices []MigrateToDevice, - - concurrency int, - - readers []io.Reader, - writers []io.Writer, - - hooks MounterMigrateToHooks, -) (errs error) { - goroutineManager := manager.NewGoroutineManager( - ctx, - &errs, - manager.GoroutineManagerHooks{}, - ) - defer goroutineManager.Wait() - defer goroutineManager.StopAllGoroutines() - defer goroutineManager.CreateBackgroundPanicCollector()() - - pro := protocol.NewRW( - goroutineManager.Context(), - readers, - writers, - nil, - ) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := pro.Handle(); err != nil && !errors.Is(err, io.EOF) { - panic(errors.Join(registry.ErrCouldNotHandleProtocol, err)) - } - }) - - var ( - devicesLeftToSend atomic.Int32 - devicesLeftToTransferAuthorityFor atomic.Int32 - - suspendedVMLock sync.Mutex - suspendedVM bool - ) - - suspendedVMCh := make(chan struct{}) - - suspendAndMsyncVM := sync.OnceValue(func() error { - suspendedVMLock.Lock() - suspendedVM = true - suspendedVMLock.Unlock() - - close(suspendedVMCh) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d - - return nil - }) - - stage5Inputs := []migrateToStage{} - for _, input := range migratableMounter.stage4Inputs { - var migrateToDevice *MigrateToDevice - for _, device := range devices { - if device.Name == input.prev.prev.name { - migrateToDevice = &device - - break - } - } - - // We don't want to serve this device - if migrateToDevice == nil { - continue - } - - stage5Inputs = append(stage5Inputs, migrateToStage{ - prev: input, - - migrateToDevice: *migrateToDevice, - }) - } - - _, deferFuncs, err := iutils.ConcurrentMap( - stage5Inputs, - func(index int, input migrateToStage, _ *struct{}, _ func(deferFunc func() error)) error { - to := protocol.NewToProtocol(input.prev.storage.Size(), uint32(index), pro) - - if err := to.SendDevInfo(input.prev.prev.prev.name, input.prev.prev.prev.blockSize, ""); err != nil { - return errors.Join(ErrCouldNotSendDevInfo, err) - } - - if hook := hooks.OnDeviceSent; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote) - } - - devicesLeftToSend.Add(1) - if devicesLeftToSend.Load() >= int32(len(stage5Inputs)) { - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := to.SendEvent(&packets.Event{ - Type: packets.EventCustom, - CustomType: byte(registry.EventCustomAllDevicesSent), - }); err != nil { - panic(errors.Join(ErrCouldNotSendAllDevicesSentEvent, err)) - } - - if hook := hooks.OnAllDevicesSent; hook != nil { - hook() - } - }) - } - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := to.HandleNeedAt(func(offset int64, length int32) { - // Prioritize blocks - endOffset := uint64(offset + int64(length)) - if endOffset > uint64(input.prev.storage.Size()) { - endOffset = uint64(input.prev.storage.Size()) - } - - startBlock := int(offset / int64(input.prev.prev.prev.blockSize)) - endBlock := int((endOffset-1)/uint64(input.prev.prev.prev.blockSize)) + 1 - for b := startBlock; b < endBlock; b++ { - input.prev.orderer.PrioritiseBlock(b) - } - }); err != nil { - panic(errors.Join(registry.ErrCouldNotHandleNeedAt, err)) - } - }) - - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - if err := to.HandleDontNeedAt(func(offset int64, length int32) { - // Deprioritize blocks - endOffset := uint64(offset + int64(length)) - if endOffset > uint64(input.prev.storage.Size()) { - endOffset = uint64(input.prev.storage.Size()) - } - - startBlock := int(offset / int64(input.prev.storage.Size())) - endBlock := int((endOffset-1)/uint64(input.prev.storage.Size())) + 1 - for b := startBlock; b < endBlock; b++ { - input.prev.orderer.Remove(b) - } - }); err != nil { - panic(errors.Join(registry.ErrCouldNotHandleDontNeedAt, err)) - } - }) - - cfg := migrator.NewConfig().WithBlockSize(int(input.prev.prev.prev.blockSize)) - cfg.Concurrency = map[int]int{ - storage.BlockTypeAny: concurrency, - storage.BlockTypeStandard: concurrency, - storage.BlockTypeDirty: concurrency, - storage.BlockTypePriority: concurrency, - } - cfg.LockerHandler = func() { - defer goroutineManager.CreateBackgroundPanicCollector()() - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventPreLock, - }); err != nil { - panic(errors.Join(ErrCouldNotSendPreLockEvent, err)) - } - - input.prev.storage.Lock() - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventPostLock, - }); err != nil { - panic(errors.Join(ErrCouldNotSendPostLockEvent, err)) - } - } - cfg.UnlockerHandler = func() { - defer goroutineManager.CreateBackgroundPanicCollector()() - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventPreUnlock, - }); err != nil { - panic(errors.Join(ErrCouldNotSendPreUnlockEvent, err)) - } - - input.prev.storage.Unlock() - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventPostUnlock, - }); err != nil { - panic(errors.Join(ErrCouldNotSendPostUnlockEvent, err)) - } - } - cfg.ErrorHandler = func(b *storage.BlockInfo, err error) { - defer goroutineManager.CreateBackgroundPanicCollector()() - - if err != nil { - panic(errors.Join(registry.ErrCouldNotContinueWithMigration, err)) - } - } - cfg.ProgressHandler = func(p *migrator.MigrationProgress) { - if hook := hooks.OnDeviceInitialMigrationProgress; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote, p.ReadyBlocks, p.TotalBlocks) - } - } - - mig, err := migrator.NewMigrator(input.prev.dirtyRemote, to, input.prev.orderer, cfg) - if err != nil { - return errors.Join(registry.ErrCouldNotCreateMigrator, err) - } - - if err := mig.Migrate(input.prev.totalBlocks); err != nil { - return errors.Join(ErrCouldNotMigrateBlocks, err) - } - - if err := mig.WaitForCompletion(); err != nil { - return errors.Join(registry.ErrCouldNotWaitForMigrationCompletion, err) - } - - markDeviceAsReadyForAuthorityTransfer := sync.OnceFunc(func() { - devicesLeftToTransferAuthorityFor.Add(1) - }) - - var ( - cyclesBelowDirtyBlockTreshold = 0 - totalCycles = 0 - ongoingMigrationsWg sync.WaitGroup - ) - for { - ongoingMigrationsWg.Wait() - - if hook := hooks.OnBeforeGetDirtyBlocks; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote) - } - - blocks := mig.GetLatestDirty() - if blocks == nil { - mig.Unlock() - - suspendedVMLock.Lock() - if suspendedVM { - suspendedVMLock.Unlock() - - break - } - suspendedVMLock.Unlock() - } - - if blocks != nil { - if err := to.DirtyList(int(input.prev.prev.prev.blockSize), blocks); err != nil { - return errors.Join(ErrCouldNotSendDirtyList, err) - } - - ongoingMigrationsWg.Add(1) - goroutineManager.StartForegroundGoroutine(func(_ context.Context) { - defer ongoingMigrationsWg.Done() - - if err := mig.MigrateDirty(blocks); err != nil { - panic(errors.Join(ErrCouldNotMigrateDirtyBlocks, err)) - } - - suspendedVMLock.Lock() - defer suspendedVMLock.Unlock() - - if suspendedVM { - if hook := hooks.OnDeviceFinalMigrationProgress; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote, len(blocks)) - } - } else { - if hook := hooks.OnDeviceContinousMigrationProgress; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote, len(blocks)) - } - } - }) - } - - suspendedVMLock.Lock() - if !suspendedVM && !(devicesLeftToTransferAuthorityFor.Load() >= int32(len(stage5Inputs))) { - suspendedVMLock.Unlock() - - // We use the background context here instead of the internal context because we want to distinguish - // between a context cancellation from the outside and getting a response - cycleThrottleCtx, cancelCycleThrottleCtx := context.WithTimeout(context.Background(), input.migrateToDevice.CycleThrottle) - defer cancelCycleThrottleCtx() - - select { - case <-cycleThrottleCtx.Done(): - break - - case <-suspendedVMCh: - break - - case <-goroutineManager.Context().Done(): // ctx is the goroutineManager.Context() here - if err := goroutineManager.Context().Err(); err != nil { - return errors.Join(ErrMounterContextCancelled, err) - } - - return nil - } - } else { - suspendedVMLock.Unlock() - } - - totalCycles++ - if len(blocks) < input.migrateToDevice.MaxDirtyBlocks { - cyclesBelowDirtyBlockTreshold++ - if cyclesBelowDirtyBlockTreshold > input.migrateToDevice.MinCycles { - markDeviceAsReadyForAuthorityTransfer() - } - } else if totalCycles > input.migrateToDevice.MaxCycles { - markDeviceAsReadyForAuthorityTransfer() - } else { - cyclesBelowDirtyBlockTreshold = 0 - } - - if devicesLeftToTransferAuthorityFor.Load() >= int32(len(stage5Inputs)) { - if err := suspendAndMsyncVM(); err != nil { - return errors.Join(ErrCouldNotSuspendAndMsyncVM, err) - } - } - } - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventCustom, - CustomType: byte(registry.EventCustomTransferAuthority), - }); err != nil { - panic(errors.Join(ErrCouldNotSendTransferAuthorityEvent, err)) - } - - if hook := hooks.OnDeviceAuthoritySent; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote) - } - - if err := mig.WaitForCompletion(); err != nil { - return errors.Join(registry.ErrCouldNotWaitForMigrationCompletion, err) - } - - if err := to.SendEvent(&packets.Event{ - Type: packets.EventCompleted, - }); err != nil { - return errors.Join(ErrCouldNotSendCompletedEvent, err) - } - - if hook := hooks.OnDeviceMigrationCompleted; hook != nil { - hook(uint32(index), input.prev.prev.prev.remote) - } - - return nil - }, - ) - - if err != nil { - panic(errors.Join(ErrCouldNotMigrateToDevice, err)) - } - - for _, deferFuncs := range deferFuncs { - for _, deferFunc := range deferFuncs { - defer deferFunc() // We can safely ignore errors here since we never call `addDefer` with a function that could return an error - } - } - - if hook := hooks.OnAllMigrationsCompleted; hook != nil { - hook() - } - - return -} diff --git a/pkg/mounter/migrated_mounter.go b/pkg/mounter/migrated_mounter.go new file mode 100644 index 0000000..16e82cc --- /dev/null +++ b/pkg/mounter/migrated_mounter.go @@ -0,0 +1,227 @@ +package mounter + +import ( + "context" + "fmt" + "io" + "path" + "sync" + "time" + + "github.com/loopholelabs/drafter/pkg/common" + "github.com/loopholelabs/logging/types" + "github.com/loopholelabs/silo/pkg/storage/config" + "github.com/loopholelabs/silo/pkg/storage/devicegroup" + "github.com/loopholelabs/silo/pkg/storage/metrics" +) + +type MigratedDevice struct { + Name string `json:"name"` + Path string `json:"path"` +} + +type MakeMigratableDevice struct { + Name string `json:"name"` + + Expiry time.Duration `json:"expiry"` +} +type MigratedMounter struct { + Devices []MigratedDevice + + Wait func() error + Close func() error + + Dg *devicegroup.DeviceGroup + DgLock sync.Mutex +} + +type MigrateFromAndMountDevice struct { + Name string `json:"name"` + + Base string `json:"base"` + Overlay string `json:"overlay"` + State string `json:"state"` + + BlockSize uint32 `json:"blockSize"` +} + +type MigrateFromHooks struct { + OnRemoteDeviceReceived func(remoteDeviceID uint32, name string) + OnRemoteDeviceExposed func(remoteDeviceID uint32, path string) + OnRemoteDeviceAuthorityReceived func(remoteDeviceID uint32) + OnRemoteDeviceMigrationCompleted func(remoteDeviceID uint32) + + OnRemoteAllDevicesReceived func() + OnRemoteAllMigrationsCompleted func() + + OnLocalDeviceRequested func(localDeviceID uint32, name string) + OnLocalDeviceExposed func(localDeviceID uint32, path string) + + OnLocalAllDevicesRequested func() + + OnXferCustomData func([]byte) +} + +func MigrateFromAndMount( + mounterCtx context.Context, + migrateFromCtx context.Context, + + devices []MigrateFromAndMountDevice, + + readers []io.Reader, + writers []io.Writer, + + hooks MigrateFromHooks, +) ( + migratedMounter *MigratedMounter, + + errs error, +) { + + // TODO: Pass these in + // TODO: This schema tweak function should be exposed / passed in + var log types.Logger + var met metrics.SiloMetrics + tweakRemote := func(index int, name string, schema *config.DeviceSchema) *config.DeviceSchema { + + for _, d := range devices { + if d.Name == schema.Name { + // Convert it... + m := &common.MigrateFromDevice{ + Name: d.Name, + Base: d.Base, + Overlay: d.Overlay, + State: d.State, + BlockSize: d.BlockSize, + Shared: false, + } + newSchema, err := common.CreateIncomingSiloDevSchema(m, schema) + if err == nil { + fmt.Printf("Tweaked schema %s\n", newSchema.EncodeAsBlock()) + return newSchema + } + } + } + + // FIXME: Error. We didn't find the local device, or couldn't set it up. + + fmt.Printf("ERROR, didn't find local device defined %s\n", name) + + return schema + } + // TODO: Add the sync stuff here... + tweakLocal := func(index int, name string, schema *config.DeviceSchema) *config.DeviceSchema { + return schema + } + + migratedMounter = &MigratedMounter{ + Devices: []MigratedDevice{}, + + Wait: func() error { + return nil + }, + Close: func() error { + return nil + }, + } + + migratedMounter.Close = func() (errs error) { + + // Close any Silo devices + migratedMounter.DgLock.Lock() + if migratedMounter.Dg != nil { + err := migratedMounter.Dg.CloseAll() + if err != nil { + migratedMounter.DgLock.Unlock() + return err + } + } + migratedMounter.DgLock.Unlock() + return nil + } + + // Migrate the devices from a protocol + if len(readers) > 0 && len(writers) > 0 { + protocolCtx, cancelProtocolCtx := context.WithCancel(migrateFromCtx) + + dg, err := common.MigrateFromPipe(log, met, "", protocolCtx, readers, writers, tweakRemote, hooks.OnXferCustomData) + if err != nil { + return nil, err + } + + migratedMounter.Wait = sync.OnceValue(func() error { + defer cancelProtocolCtx() + + if dg != nil { + err := dg.WaitForCompletion() + if err != nil { + return err + } + } + return nil + }) + + // Save dg for future migrations, AND for things like reading config + migratedMounter.DgLock.Lock() + migratedMounter.Dg = dg + migratedMounter.DgLock.Unlock() + } + + // + // IF all devices are local + // + + if len(readers) == 0 && len(writers) == 0 { + newDevices := make([]common.MigrateFromDevice, 0) + for _, d := range devices { + newDevices = append(newDevices, common.MigrateFromDevice{ + Name: d.Name, + Base: d.Base, + Overlay: d.Overlay, + State: d.State, + BlockSize: d.BlockSize, + Shared: false, + }) + } + + dg, err := common.MigrateFromFS(log, met, "", newDevices, tweakLocal) + if err != nil { + return nil, err + } + + // Save dg for later usage, when we want to migrate from here etc + migratedMounter.DgLock.Lock() + migratedMounter.Dg = dg + migratedMounter.DgLock.Unlock() + + if hook := hooks.OnLocalAllDevicesRequested; hook != nil { + hook() + } + } + + migratedMounter.DgLock.Lock() + lookupDg := migratedMounter.Dg + migratedMounter.DgLock.Unlock() + if lookupDg != nil { + for i, d := range devices { + if hooks.OnLocalDeviceExposed != nil { + exp := lookupDg.GetExposedDeviceByName(d.Name) + if exp != nil { + hooks.OnLocalDeviceExposed(uint32(i), path.Join("/dev", exp.Device())) + } + } + } + } + + return +} + +func (migratedMounter *MigratedMounter) MakeMigratable( + ctx context.Context, + devices []MakeMigratableDevice, +) (migratableMounter *MigratableMounter, errs error) { + return &MigratableMounter{ + Dg: migratedMounter.Dg, + Close: func() {}, + }, nil +} diff --git a/pkg/mounter/stages.go b/pkg/mounter/stages.go deleted file mode 100644 index dc44a5e..0000000 --- a/pkg/mounter/stages.go +++ /dev/null @@ -1,41 +0,0 @@ -package mounter - -import ( - "github.com/loopholelabs/silo/pkg/storage" - "github.com/loopholelabs/silo/pkg/storage/blocks" - "github.com/loopholelabs/silo/pkg/storage/dirtytracker" - "github.com/loopholelabs/silo/pkg/storage/modules" -) - -type migrateFromAndMountStage struct { - name string - - blockSize uint32 - - id uint32 - remote bool - - storage storage.Provider - device storage.ExposedStorage -} - -type makeMigratableFilterStage struct { - prev migrateFromAndMountStage - - makeMigratableDevice MakeMigratableDevice -} - -type makeMigratableDeviceStage struct { - prev makeMigratableFilterStage - - storage *modules.Lockable - orderer *blocks.PriorityBlockOrder - totalBlocks int - dirtyRemote *dirtytracker.Remote -} - -type migrateToStage struct { - prev makeMigratableDeviceStage - - migrateToDevice MigrateToDevice -} diff --git a/pkg/peer/resumed_peer.go b/pkg/peer/resumed_peer.go index d49ce37..f0582cb 100644 --- a/pkg/peer/resumed_peer.go +++ b/pkg/peer/resumed_peer.go @@ -7,7 +7,6 @@ import ( "github.com/loopholelabs/drafter/pkg/common" "github.com/loopholelabs/drafter/pkg/ipc" - "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/runner" "github.com/loopholelabs/silo/pkg/storage/devicegroup" "github.com/loopholelabs/silo/pkg/storage/migrator" @@ -45,7 +44,7 @@ type MigrateToHooks struct { */ func (migratablePeer *ResumedPeer[L, R, G]) MigrateTo( ctx context.Context, - devices []mounter.MigrateToDevice, + devices []common.MigrateToDevice, suspendTimeout time.Duration, concurrency int, readers []io.Reader,