From d2fbedf9984d9555351c9fc53c8f59858ca30f2a Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Fri, 6 Sep 2024 20:38:37 -0700 Subject: [PATCH 1/6] feat: Allow passing in custom service definitions into IP agent and server instead of hard-coded hooks Signed-off-by: Felicitas Pojtinger --- cmd/drafter-agent/main.go | 4 +-- internal/remotes/agent.go | 8 ------ pkg/ipc/agent_client.go | 53 +++++++++++++++++++++++++++++---------- pkg/ipc/agent_server.go | 40 ++++++++++++++++++++--------- pkg/runner/resume.go | 8 +++--- pkg/snapshotter/create.go | 6 +++-- 6 files changed, 79 insertions(+), 40 deletions(-) delete mode 100644 internal/remotes/agent.go diff --git a/cmd/drafter-agent/main.go b/cmd/drafter-agent/main.go index 4a58c1b..06e301f 100644 --- a/cmd/drafter-agent/main.go +++ b/cmd/drafter-agent/main.go @@ -55,7 +55,7 @@ func main() { cancel() }() - agentClient := ipc.NewAgentClient( + agentClient := ipc.NewHookAgentClient( func(ctx context.Context) error { log.Println("Running pre-suspend command") @@ -95,7 +95,7 @@ func main() { dialCtx, cancelDialCtx := context.WithTimeout(goroutineManager.Context(), *vsockTimeout) defer cancelDialCtx() - connectedAgentClient, err := ipc.StartAgentClient( + connectedAgentClient, err := ipc.StartAgentClient[ipc.AgentClientLocal, struct{}]( dialCtx, goroutineManager.Context(), diff --git a/internal/remotes/agent.go b/internal/remotes/agent.go deleted file mode 100644 index 6f0a84a..0000000 --- a/internal/remotes/agent.go +++ /dev/null @@ -1,8 +0,0 @@ -package remotes - -import "context" - -type AgentRemote struct { - BeforeSuspend func(ctx context.Context) error - AfterResume func(ctx context.Context) error -} diff --git a/pkg/ipc/agent_client.go b/pkg/ipc/agent_client.go index d663182..25a19f8 100644 --- a/pkg/ipc/agent_client.go +++ b/pkg/ipc/agent_client.go @@ -19,44 +19,57 @@ var ( ErrAgentContextCancelled = errors.New("agent context cancelled") ) -type AgentClient struct { +// The RPCs the agent server can call on this client +// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#5-calling-the-clients-rpcs-from-the-server +type AgentClientLocal interface { + BeforeSuspend(ctx context.Context) error + AfterResume(ctx context.Context) error +} + +// The RPCs this client can call on the agent server +// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#4-calling-the-servers-rpcs-from-the-client +type AgentClientRemote any + +type HookAgentClient struct { beforeSuspend func(ctx context.Context) error afterResume func(ctx context.Context) error } -func NewAgentClient( +func NewHookAgentClient( beforeSuspend func(ctx context.Context) error, afterResume func(ctx context.Context) error, -) *AgentClient { - return &AgentClient{ +) *HookAgentClient { + return &HookAgentClient{ beforeSuspend: beforeSuspend, afterResume: afterResume, } } -func (l *AgentClient) BeforeSuspend(ctx context.Context) error { +func (l *HookAgentClient) BeforeSuspend(ctx context.Context) error { return l.beforeSuspend(ctx) } -func (l *AgentClient) AfterResume(ctx context.Context) error { +func (l *HookAgentClient) AfterResume(ctx context.Context) error { return l.afterResume(ctx) } -type ConnectedAgentClient struct { +type ConnectedAgentClient[L AgentClientLocal, R AgentClientRemote] struct { + Remote R + Wait func() error Close func() } -func StartAgentClient( +func StartAgentClient[L AgentClientLocal, R AgentClientRemote]( dialCtx context.Context, remoteCtx context.Context, vsockCID uint32, vsockPort uint32, - agentClient *AgentClient, -) (connectedAgentClient *ConnectedAgentClient, errs error) { - connectedAgentClient = &ConnectedAgentClient{ + agentClientLocal L, +) (connectedAgentClient *ConnectedAgentClient[L, R], errs error) { + connectedAgentClient = &ConnectedAgentClient[L, R]{ Wait: func() error { return nil }, @@ -120,8 +133,8 @@ func StartAgentClient( } }) - registry := rpc.NewRegistry[struct{}, json.RawMessage]( - agentClient, + registry := rpc.NewRegistry[R, json.RawMessage]( + agentClientLocal, &rpc.RegistryHooks{ OnClientConnect: func(remoteID string) { @@ -199,5 +212,19 @@ func StartAgentClient( break } + found := false + if err := registry.ForRemotes(func(remoteID string, r R) error { + connectedAgentClient.Remote = r + found = true + + return nil + }); err != nil { + panic(err) + } + + if !found { + panic(ErrNoRemoteFound) + } + return } diff --git a/pkg/ipc/agent_server.go b/pkg/ipc/agent_server.go index 5f94362..6ca0971 100644 --- a/pkg/ipc/agent_server.go +++ b/pkg/ipc/agent_server.go @@ -9,7 +9,6 @@ import ( "os" "sync" - "github.com/loopholelabs/drafter/internal/remotes" "github.com/loopholelabs/goroutine-manager/pkg/manager" "github.com/pojntfx/panrpc/go/pkg/rpc" ) @@ -23,7 +22,18 @@ var ( ErrCouldNotLinkRegistry = errors.New("could not link registry") ) -type AgentServer struct { +// The RPCs the agent client can call on this server +// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#5-calling-the-clients-rpcs-from-the-server +type AgentServerLocal any + +// The RPCs this server can call on the agent client +// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#4-calling-the-servers-rpcs-from-the-client +type AgentServerRemote struct { + BeforeSuspend func(ctx context.Context) error + AfterResume func(ctx context.Context) error +} + +type AgentServer[L AgentServerLocal, R AgentServerRemote] struct { VSockPath string Close func() @@ -32,18 +42,24 @@ type AgentServer struct { closed bool closeLock sync.Mutex + + agentServerLocal L } -func StartAgentServer( +func StartAgentServer[L AgentServerLocal, R AgentServerRemote]( vsockPath string, vsockPort uint32, + + agentServerLocal L, ) ( - agentServer *AgentServer, + agentServer *AgentServer[L, R], err error, ) { - agentServer = &AgentServer{ + agentServer = &AgentServer[L, R]{ Close: func() {}, + + agentServerLocal: agentServerLocal, } agentServer.VSockPath = fmt.Sprintf("%s_%d", vsockPath, vsockPort) @@ -68,15 +84,15 @@ func StartAgentServer( return } -type AcceptingAgentServer struct { - Remote remotes.AgentRemote +type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote] struct { + Remote R Wait func() error Close func() error } -func (agentServer *AgentServer) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer, errs error) { - acceptingAgentServer = &AcceptingAgentServer{ +func (agentServer *AgentServer[L, R]) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer[L, R], errs error) { + acceptingAgentServer = &AcceptingAgentServer[L, R]{ Wait: func() error { return nil }, @@ -170,8 +186,8 @@ func (agentServer *AgentServer) Accept(acceptCtx context.Context, remoteCtx cont } }) - registry := rpc.NewRegistry[remotes.AgentRemote, json.RawMessage]( - &struct{}{}, + registry := rpc.NewRegistry[R, json.RawMessage]( + agentServer.agentServerLocal, &rpc.RegistryHooks{ OnClientConnect: func(remoteID string) { @@ -246,7 +262,7 @@ func (agentServer *AgentServer) Accept(acceptCtx context.Context, remoteCtx cont } found := false - if err := registry.ForRemotes(func(remoteID string, r remotes.AgentRemote) error { + if err := registry.ForRemotes(func(remoteID string, r R) error { acceptingAgentServer.Remote = r found = true diff --git a/pkg/runner/resume.go b/pkg/runner/resume.go index 907d712..9304b30 100644 --- a/pkg/runner/resume.go +++ b/pkg/runner/resume.go @@ -25,8 +25,8 @@ type ResumedRunner struct { runner *Runner - agent *ipc.AgentServer - acceptingAgent *ipc.AcceptingAgentServer + agent *ipc.AgentServer[struct{}, ipc.AgentServerRemote] + acceptingAgent *ipc.AcceptingAgentServer[struct{}, ipc.AgentServerRemote] createSnapshot func(ctx context.Context) error } @@ -191,9 +191,11 @@ func (runner *Runner) Resume( }) var err error - resumedRunner.agent, err = ipc.StartAgentServer( + resumedRunner.agent, err = ipc.StartAgentServer[struct{}, ipc.AgentServerRemote]( filepath.Join(runner.server.VMPath, snapshotter.VSockName), uint32(agentVSockPort), + + struct{}{}, ) if err != nil { panic(errors.Join(snapshotter.ErrCouldNotStartAgentServer, err)) diff --git a/pkg/snapshotter/create.go b/pkg/snapshotter/create.go index edf46bf..ab35f9e 100644 --- a/pkg/snapshotter/create.go +++ b/pkg/snapshotter/create.go @@ -141,9 +141,11 @@ func CreateSnapshot( panic(errors.Join(ErrCouldNotChownLivenessServerVSock, err)) } - agent, err := ipc.StartAgentServer( + agent, err := ipc.StartAgentServer[struct{}, ipc.AgentServerRemote]( filepath.Join(server.VMPath, VSockName), uint32(agentConfiguration.AgentVSockPort), + + struct{}{}, ) if err != nil { panic(errors.Join(ErrCouldNotStartAgentServer, err)) @@ -247,7 +249,7 @@ func CreateSnapshot( } } - var acceptingAgent *ipc.AcceptingAgentServer + var acceptingAgent *ipc.AcceptingAgentServer[struct{}, ipc.AgentServerRemote] { acceptCtx, cancel := context.WithTimeout(goroutineManager.Context(), agentConfiguration.ResumeTimeout) defer cancel() From 8cebe5dcd8d3e0bc3c5404299e264223908c0ffa Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Fri, 6 Sep 2024 21:37:42 -0700 Subject: [PATCH 2/6] feat: Allow passing custom service definitions all the way up to the peer and runner APIs Signed-off-by: Felicitas Pojtinger --- cmd/drafter-peer/main.go | 9 ++++++--- cmd/drafter-runner/main.go | 5 ++++- pkg/peer/make_migratable.go | 13 ++++++++----- pkg/peer/migrate_from.go | 15 ++++++++------- pkg/peer/migrate_to.go | 9 +++++---- pkg/peer/resume.go | 18 ++++++++++++------ pkg/peer/start.go | 13 +++++++------ pkg/peer/suspend.go | 2 +- pkg/runner/msync.go | 2 +- pkg/runner/resume.go | 29 +++++++++++++++++++---------- pkg/runner/start.go | 9 +++++---- pkg/runner/suspend.go | 9 +++++++-- 12 files changed, 83 insertions(+), 50 deletions(-) diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index 1e74389..bd8f9c0 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/packager" "github.com/loopholelabs/drafter/pkg/peer" @@ -277,7 +278,7 @@ func main() { writers = []io.Writer{conn} } - p, err := peer.StartPeer( + p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote]( goroutineManager.Context(), context.Background(), // Never give up on rescue operations @@ -328,9 +329,9 @@ func main() { } }) - migrateFromDevices := []peer.MigrateFromDevice{} + migrateFromDevices := []peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote]{} for _, device := range devices { - migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice{ + migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote]{ Name: device.Name, Base: device.Base, @@ -419,6 +420,8 @@ func main() { *resumeTimeout, *rescueTimeout, + struct{}{}, + runner.SnapshotLoadConfiguration{ ExperimentalMapPrivate: *experimentalMapPrivate, diff --git a/cmd/drafter-runner/main.go b/cmd/drafter-runner/main.go index d91014a..2d7cda6 100644 --- a/cmd/drafter-runner/main.go +++ b/cmd/drafter-runner/main.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/packager" "github.com/loopholelabs/drafter/pkg/peer" "github.com/loopholelabs/drafter/pkg/runner" @@ -174,7 +175,7 @@ func main() { cancel() }() - r, err := runner.StartRunner( + r, err := runner.StartRunner[struct{}, ipc.AgentServerRemote]( goroutineManager.Context(), context.Background(), // Never give up on rescue operations @@ -303,6 +304,8 @@ func main() { *rescueTimeout, packageConfig.AgentVSockPort, + struct{}{}, + runner.SnapshotLoadConfiguration{ ExperimentalMapPrivate: *experimentalMapPrivate, diff --git a/pkg/peer/make_migratable.go b/pkg/peer/make_migratable.go index 2655b57..ab2d565 100644 --- a/pkg/peer/make_migratable.go +++ b/pkg/peer/make_migratable.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/loopholelabs/drafter/internal/utils" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/runner" "github.com/loopholelabs/goroutine-manager/pkg/manager" @@ -14,21 +15,23 @@ import ( "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" ) -type ResumedPeer struct { +type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { + Remote R + Wait func() error Close func() error - resumedRunner *runner.ResumedRunner + resumedRunner *runner.ResumedRunner[L, R] stage2Inputs []migrateFromStage } -func (resumedPeer *ResumedPeer) MakeMigratable( +func (resumedPeer *ResumedPeer[L, R]) MakeMigratable( ctx context.Context, devices []mounter.MakeMigratableDevice, -) (migratablePeer *MigratablePeer, errs error) { - migratablePeer = &MigratablePeer{ +) (migratablePeer *MigratablePeer[L, R], errs error) { + migratablePeer = &MigratablePeer[L, R]{ Close: func() {}, resumedPeer: resumedPeer, diff --git a/pkg/peer/migrate_from.go b/pkg/peer/migrate_from.go index de476bb..02f147b 100644 --- a/pkg/peer/migrate_from.go +++ b/pkg/peer/migrate_from.go @@ -14,6 +14,7 @@ import ( "syscall" "github.com/loopholelabs/drafter/internal/utils" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/registry" "github.com/loopholelabs/drafter/pkg/snapshotter" @@ -28,7 +29,7 @@ import ( "golang.org/x/sys/unix" ) -type MigrateFromDevice struct { +type MigrateFromDevice[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { Name string `json:"name"` Base string `json:"base"` @@ -40,21 +41,21 @@ type MigrateFromDevice struct { Shared bool `json:"shared"` } -func (peer *Peer) MigrateFrom( +func (peer *Peer[L, R]) MigrateFrom( ctx context.Context, - devices []MigrateFromDevice, + devices []MigrateFromDevice[L, R], readers []io.Reader, writers []io.Writer, hooks mounter.MigrateFromHooks, ) ( - migratedPeer *MigratedPeer, + migratedPeer *MigratedPeer[L, R], errs error, ) { - migratedPeer = &MigratedPeer{ + migratedPeer = &MigratedPeer[L, R]{ Wait: func() error { return nil }, @@ -410,7 +411,7 @@ func (peer *Peer) MigrateFrom( break } - stage1Inputs := []MigrateFromDevice{} + stage1Inputs := []MigrateFromDevice[L, R]{} for _, input := range devices { if slices.ContainsFunc( migratedPeer.stage2Inputs, @@ -430,7 +431,7 @@ func (peer *Peer) MigrateFrom( _, deferFuncs, err := utils.ConcurrentMap( stage1Inputs, - func(index int, input MigrateFromDevice, _ *struct{}, addDefer func(deferFunc func() error)) error { + func(index int, input MigrateFromDevice[L, R], _ *struct{}, addDefer func(deferFunc func() error)) error { if hook := hooks.OnLocalDeviceRequested; hook != nil { hook(uint32(index), input.Name) } diff --git a/pkg/peer/migrate_to.go b/pkg/peer/migrate_to.go index 561df47..6eae3f6 100644 --- a/pkg/peer/migrate_to.go +++ b/pkg/peer/migrate_to.go @@ -9,6 +9,7 @@ import ( "time" "github.com/loopholelabs/drafter/internal/utils" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/mounter" "github.com/loopholelabs/drafter/pkg/packager" "github.com/loopholelabs/drafter/pkg/registry" @@ -37,15 +38,15 @@ type MigrateToHooks struct { OnAllMigrationsCompleted func() } -type MigratablePeer struct { +type MigratablePeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { Close func() - resumedPeer *ResumedPeer + resumedPeer *ResumedPeer[L, R] stage4Inputs []makeMigratableDeviceStage - resumedRunner *runner.ResumedRunner + resumedRunner *runner.ResumedRunner[L, R] } -func (migratablePeer *MigratablePeer) MigrateTo( +func (migratablePeer *MigratablePeer[L, R]) MigrateTo( ctx context.Context, devices []mounter.MigrateToDevice, diff --git a/pkg/peer/resume.go b/pkg/peer/resume.go index 8e799f3..2234e50 100644 --- a/pkg/peer/resume.go +++ b/pkg/peer/resume.go @@ -8,30 +8,33 @@ import ( "strings" "time" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/packager" "github.com/loopholelabs/drafter/pkg/runner" "github.com/loopholelabs/drafter/pkg/snapshotter" ) -type MigratedPeer struct { +type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { Wait func() error Close func() error - devices []MigrateFromDevice - runner *runner.Runner + devices []MigrateFromDevice[L, R] + runner *runner.Runner[L, R] stage2Inputs []migrateFromStage } -func (migratedPeer *MigratedPeer) Resume( +func (migratedPeer *MigratedPeer[L, R]) Resume( ctx context.Context, resumeTimeout, rescueTimeout time.Duration, + agentServerLocal L, + snapshotLoadConfiguration runner.SnapshotLoadConfiguration, -) (resumedPeer *ResumedPeer, errs error) { - resumedPeer = &ResumedPeer{ +) (resumedPeer *ResumedPeer[L, R], errs error) { + resumedPeer = &ResumedPeer[L, R]{ Wait: func() error { return nil }, @@ -73,11 +76,14 @@ func (migratedPeer *MigratedPeer) Resume( rescueTimeout, packageConfig.AgentVSockPort, + agentServerLocal, + snapshotLoadConfiguration, ) if err != nil { return nil, errors.Join(ErrCouldNotResumeRunner, err) } + resumedPeer.Remote = resumedPeer.resumedRunner.Remote resumedPeer.Wait = resumedPeer.resumedRunner.Wait resumedPeer.Close = resumedPeer.resumedRunner.Close diff --git a/pkg/peer/start.go b/pkg/peer/start.go index fdcfc58..d742bb7 100644 --- a/pkg/peer/start.go +++ b/pkg/peer/start.go @@ -4,12 +4,13 @@ import ( "context" "errors" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/runner" "github.com/loopholelabs/drafter/pkg/snapshotter" "github.com/loopholelabs/goroutine-manager/pkg/manager" ) -type Peer struct { +type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { VMPath string VMPid int @@ -18,10 +19,10 @@ type Peer struct { hypervisorCtx context.Context - runner *runner.Runner + runner *runner.Runner[L, R] } -func StartPeer( +func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( hypervisorCtx context.Context, rescueCtx context.Context, @@ -30,11 +31,11 @@ func StartPeer( stateName string, memoryName string, ) ( - peer *Peer, + peer *Peer[L, R], errs error, ) { - peer = &Peer{ + peer = &Peer[L, R]{ hypervisorCtx: hypervisorCtx, Wait: func() error { @@ -55,7 +56,7 @@ func StartPeer( defer goroutineManager.CreateBackgroundPanicCollector()() var err error - peer.runner, err = runner.StartRunner( + peer.runner, err = runner.StartRunner[L, R]( hypervisorCtx, rescueCtx, diff --git a/pkg/peer/suspend.go b/pkg/peer/suspend.go index 322bdb0..bc5a244 100644 --- a/pkg/peer/suspend.go +++ b/pkg/peer/suspend.go @@ -5,7 +5,7 @@ import ( "time" ) -func (resumedPeer *ResumedPeer) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { +func (resumedPeer *ResumedPeer[L, R]) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { return resumedPeer.resumedRunner.SuspendAndCloseAgentServer( ctx, diff --git a/pkg/runner/msync.go b/pkg/runner/msync.go index 92755a5..251658b 100644 --- a/pkg/runner/msync.go +++ b/pkg/runner/msync.go @@ -8,7 +8,7 @@ import ( "github.com/loopholelabs/drafter/pkg/snapshotter" ) -func (resumedRunner *ResumedRunner) Msync(ctx context.Context) error { +func (resumedRunner *ResumedRunner[L, R]) Msync(ctx context.Context) error { if !resumedRunner.snapshotLoadConfiguration.ExperimentalMapPrivate { if err := firecracker.CreateSnapshot( ctx, diff --git a/pkg/runner/resume.go b/pkg/runner/resume.go index 9304b30..9054f9e 100644 --- a/pkg/runner/resume.go +++ b/pkg/runner/resume.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "reflect" "time" "github.com/lithammer/shortuuid/v4" @@ -17,34 +18,38 @@ import ( "github.com/loopholelabs/goroutine-manager/pkg/manager" ) -type ResumedRunner struct { +type ResumedRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { + Remote R + Wait func() error Close func() error snapshotLoadConfiguration SnapshotLoadConfiguration - runner *Runner + runner *Runner[L, R] - agent *ipc.AgentServer[struct{}, ipc.AgentServerRemote] - acceptingAgent *ipc.AcceptingAgentServer[struct{}, ipc.AgentServerRemote] + agent *ipc.AgentServer[L, R] + acceptingAgent *ipc.AcceptingAgentServer[L, R] createSnapshot func(ctx context.Context) error } -func (runner *Runner) Resume( +func (runner *Runner[L, R]) Resume( ctx context.Context, resumeTimeout time.Duration, rescueTimeout time.Duration, agentVSockPort uint32, + agentServerLocal L, + snapshotLoadConfiguration SnapshotLoadConfiguration, ) ( - resumedRunner *ResumedRunner, + resumedRunner *ResumedRunner[L, R], errs error, ) { - resumedRunner = &ResumedRunner{ + resumedRunner = &ResumedRunner[L, R]{ Wait: func() error { return nil }, Close: func() error { return nil }, @@ -191,11 +196,11 @@ func (runner *Runner) Resume( }) var err error - resumedRunner.agent, err = ipc.StartAgentServer[struct{}, ipc.AgentServerRemote]( + resumedRunner.agent, err = ipc.StartAgentServer[L, R]( filepath.Join(runner.server.VMPath, snapshotter.VSockName), uint32(agentVSockPort), - struct{}{}, + agentServerLocal, ) if err != nil { panic(errors.Join(snapshotter.ErrCouldNotStartAgentServer, err)) @@ -234,6 +239,7 @@ func (runner *Runner) Resume( if err != nil { panic(errors.Join(ErrCouldNotAcceptAgent, err)) } + resumedRunner.Remote = resumedRunner.acceptingAgent.Remote } // We intentionally don't call `wg.Add` and `wg.Done` here since we return the process's wait method @@ -263,7 +269,10 @@ func (runner *Runner) Resume( afterResumeCtx, cancelAfterResumeCtx := context.WithTimeout(goroutineManager.Context(), resumeTimeout) defer cancelAfterResumeCtx() - if err := resumedRunner.acceptingAgent.Remote.AfterResume(afterResumeCtx); err != nil { + // This is a safe type cast because R is constrained by ipc.AgentServerRemote, so this specific AfterResume field + // must be defined or there will be a compile-time error. + // The Go Generics system can't catch this here however, it can only catch it once the type is concrete, so we need to manually cast. + if err := reflect.ValueOf(resumedRunner.acceptingAgent.Remote).Interface().(ipc.AgentServerRemote).AfterResume(afterResumeCtx); err != nil { panic(errors.Join(ErrCouldNotCallAfterResumeRPC, err)) } } diff --git a/pkg/runner/start.go b/pkg/runner/start.go index e014abe..4f70dfb 100644 --- a/pkg/runner/start.go +++ b/pkg/runner/start.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/loopholelabs/drafter/internal/firecracker" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/snapshotter" "github.com/loopholelabs/goroutine-manager/pkg/manager" ) @@ -21,7 +22,7 @@ type SnapshotLoadConfiguration struct { ExperimentalMapPrivateMemoryOutput string } -type Runner struct { +type Runner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { VMPath string VMPid int @@ -41,7 +42,7 @@ type Runner struct { rescueCtx context.Context } -func StartRunner( +func StartRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( hypervisorCtx context.Context, rescueCtx context.Context, @@ -50,11 +51,11 @@ func StartRunner( stateName string, memoryName string, ) ( - runner *Runner, + runner *Runner[L, R], errs error, ) { - runner = &Runner{ + runner = &Runner[L, R]{ Wait: func() error { return nil }, Close: func() error { return nil }, diff --git a/pkg/runner/suspend.go b/pkg/runner/suspend.go index 36d1df0..aed8276 100644 --- a/pkg/runner/suspend.go +++ b/pkg/runner/suspend.go @@ -3,16 +3,21 @@ package runner import ( "context" "errors" + "reflect" "time" + "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/snapshotter" ) -func (resumedRunner *ResumedRunner) SuspendAndCloseAgentServer(ctx context.Context, suspendTimeout time.Duration) error { +func (resumedRunner *ResumedRunner[L, R]) SuspendAndCloseAgentServer(ctx context.Context, suspendTimeout time.Duration) error { suspendCtx, cancelSuspendCtx := context.WithTimeout(ctx, suspendTimeout) defer cancelSuspendCtx() - if err := resumedRunner.acceptingAgent.Remote.BeforeSuspend(suspendCtx); err != nil { + // This is a safe type cast because R is constrained by ipc.AgentServerRemote, so this specific BeforeSuspend field + // must be defined or there will be a compile-time error. + // The Go Generics system can't catch this here however, it can only catch it once the type is concrete, so we need to manually cast. + if err := reflect.ValueOf(resumedRunner.acceptingAgent.Remote).Interface().(ipc.AgentServerRemote).BeforeSuspend(suspendCtx); err != nil { return errors.Join(ErrCouldNotCallBeforeSuspendRPC, err) } From bee558b8fae09b6202d66a05b3308fe688d13520 Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Thu, 12 Sep 2024 23:54:12 -0700 Subject: [PATCH 3/6] feat: Switch over agents to dynamically extensible RPC system via new panrpc support for nested services Signed-off-by: Felicitas Pojtinger --- cmd/drafter-agent/main.go | 6 +- cmd/drafter-peer/main.go | 6 +- cmd/drafter-runner/main.go | 2 +- go.mod | 35 ++++++++++- go.sum | 115 ++++++++++++++++++++++++++++++++++++ pkg/ipc/agent_client.go | 33 ++++++----- pkg/ipc/agent_server.go | 18 +++--- pkg/peer/make_migratable.go | 10 ++-- pkg/peer/migrate_from.go | 14 ++--- pkg/peer/migrate_to.go | 8 +-- pkg/peer/resume.go | 12 ++-- pkg/peer/start.go | 10 ++-- pkg/peer/suspend.go | 2 +- pkg/runner/msync.go | 2 +- pkg/runner/resume.go | 19 +++--- pkg/runner/start.go | 8 +-- pkg/runner/suspend.go | 7 ++- pkg/snapshotter/create.go | 4 +- 18 files changed, 233 insertions(+), 78 deletions(-) diff --git a/cmd/drafter-agent/main.go b/cmd/drafter-agent/main.go index 06e301f..16695bf 100644 --- a/cmd/drafter-agent/main.go +++ b/cmd/drafter-agent/main.go @@ -55,7 +55,9 @@ func main() { cancel() }() - agentClient := ipc.NewHookAgentClient( + agentClient := ipc.NewAgentClient[struct{}]( + struct{}{}, + func(ctx context.Context) error { log.Println("Running pre-suspend command") @@ -95,7 +97,7 @@ func main() { dialCtx, cancelDialCtx := context.WithTimeout(goroutineManager.Context(), *vsockTimeout) defer cancelDialCtx() - connectedAgentClient, err := ipc.StartAgentClient[ipc.AgentClientLocal, struct{}]( + connectedAgentClient, err := ipc.StartAgentClient[*ipc.AgentClientLocal[struct{}], struct{}]( dialCtx, goroutineManager.Context(), diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index bd8f9c0..348ea3a 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -278,7 +278,7 @@ func main() { writers = []io.Writer{conn} } - p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote]( + p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote[struct{}]]( goroutineManager.Context(), context.Background(), // Never give up on rescue operations @@ -329,9 +329,9 @@ func main() { } }) - migrateFromDevices := []peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote]{} + migrateFromDevices := []peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote[struct{}], struct{}]{} for _, device := range devices { - migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote]{ + migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote[struct{}], struct{}]{ Name: device.Name, Base: device.Base, diff --git a/cmd/drafter-runner/main.go b/cmd/drafter-runner/main.go index 2d7cda6..1ee45f9 100644 --- a/cmd/drafter-runner/main.go +++ b/cmd/drafter-runner/main.go @@ -175,7 +175,7 @@ func main() { cancel() }() - r, err := runner.StartRunner[struct{}, ipc.AgentServerRemote]( + r, err := runner.StartRunner[struct{}, ipc.AgentServerRemote[struct{}]]( goroutineManager.Context(), context.Background(), // Never give up on rescue operations diff --git a/go.mod b/go.mod index 88a8370..bd47ff4 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/loopholelabs/goroutine-manager v0.1.1 github.com/loopholelabs/silo v0.0.8 github.com/metal-stack/go-ipam v1.14.0 - github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb + github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692 github.com/vishvananda/netlink v1.1.0 github.com/vishvananda/netns v0.0.4 golang.org/x/sys v0.22.0 @@ -28,8 +28,10 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -37,20 +39,49 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/hcl/v2 v2.20.0 // indirect github.com/jmoiron/sqlx v1.4.0 // indirect github.com/josharian/native v1.1.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lib/pq v1.10.9 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mdlayher/genetlink v1.3.2 // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.4.1 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/minio-go/v7 v7.0.73 // indirect github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.7.1 // indirect + github.com/pion/datachannel v1.5.8 // indirect + github.com/pion/dtls/v2 v2.2.12 // indirect + github.com/pion/ice/v2 v2.3.31 // indirect + github.com/pion/interceptor v0.1.29 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/mdns v0.0.12 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.14 // indirect + github.com/pion/rtp v1.8.7 // indirect + github.com/pion/sctp v1.8.19 // indirect + github.com/pion/sdp/v3 v3.0.9 // indirect + github.com/pion/srtp/v2 v2.0.20 // indirect + github.com/pion/stun v0.6.1 // indirect + github.com/pion/transport/v2 v2.2.9 // indirect + github.com/pion/turn/v2 v2.1.6 // indirect + github.com/pion/webrtc/v3 v3.2.50 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pojntfx/weron v0.2.6 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect github.com/rs/xid v1.5.0 // indirect + github.com/rs/zerolog v1.33.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect + github.com/wlynxg/anet v0.0.3 // indirect + github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect @@ -73,4 +104,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect google.golang.org/grpc v1.64.1 // indirect google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + nhooyr.io/websocket v1.8.11 // indirect ) diff --git a/go.sum b/go.sum index a550e86..71d136a 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,7 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -64,6 +65,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/freddierice/go-losetup/v2 v2.0.1 h1:wPDx/Elu9nDV8y/CvIbEDz5Xi5Zo80y4h7MKbi3XaAI= github.com/freddierice/go-losetup/v2 v2.0.1/go.mod h1:TEyBrvlOelsPEhfWD5rutNXDmUszBXuFnwT1kIQF4J8= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -87,11 +90,15 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl/v2 v2.20.0 h1:l++cRs/5jQOiKVvqXZm/P1ZEfVXJmvLS9WSVxkaeTb4= github.com/hashicorp/hcl/v2 v2.20.0/go.mod h1:WmcD/Ym72MDOOx5F62Ly+leloeu6H7m0pG7VBiU6pQk= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= @@ -100,6 +107,8 @@ github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= @@ -107,8 +116,11 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= @@ -125,6 +137,12 @@ github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae h1:dIZY4ULFcto4tA github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy56gw= @@ -153,6 +171,11 @@ github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= @@ -165,12 +188,57 @@ github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8= github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= +github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo= +github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI= +github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= +github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= +github.com/pion/ice/v2 v2.3.31 h1:qag/YqiOn5qPi0kgeVdsytxjx8szuriWSIeXKu8dDQc= +github.com/pion/ice/v2 v2.3.31/go.mod h1:8fac0+qftclGy1tYd/nfwfHC729BLaxtVqMdMVCAVPU= +github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= +github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8= +github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE= +github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.7 h1:qslKkG8qxvQ7hqaxkmL7Pl0XcUm+/Er7nMnu6Vq+ZxM= +github.com/pion/rtp v1.8.7/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/sctp v1.8.19 h1:2CYuw+SQ5vkQ9t0HdOPccsCz1GQMDuVy5PglLgKVBW8= +github.com/pion/sctp v1.8.19/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= +github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= +github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= +github.com/pion/srtp/v2 v2.0.20 h1:HNNny4s+OUmG280ETrCdgFndp4ufx3/uy85EawYEhTk= +github.com/pion/srtp/v2 v2.0.20/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA= +github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= +github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= +github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v2 v2.2.8/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E= +github.com/pion/transport/v2 v2.2.9 h1:WEDygVovkJlV2CCunM9KS2kds+kcl7zdIefQA5y/nkE= +github.com/pion/transport/v2 v2.2.9/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc= +github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= +github.com/pion/webrtc/v3 v3.2.50 h1:C/rwL2mBfCxHv6tlLzDAO3krJpQXfVx8A8WHnGJ2j34= +github.com/pion/webrtc/v3 v3.2.50/go.mod h1:dytYYoSBy7ZUWhJMbndx9UckgYvzNAfL7xgVnrIKxqo= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb h1:2vU/ZbsJ1uZhR2BCjc3mIfzR9GVuAWHSIUrw8jnLK9g= github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb/go.mod h1:G9YawT9jiXDf6z7WEv7KMIca+A41mjm8KL8AfBxN37U= +github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692 h1:kiSksMNOL9fQQLen6hrOIC/Mgxcts6mAxcZBunOfCuA= +github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692/go.mod h1:G9YawT9jiXDf6z7WEv7KMIca+A41mjm8KL8AfBxN37U= +github.com/pojntfx/weron v0.2.6 h1:WdxChtSlGk/04pRlBerWc5d1D5020bHpaZvzV0MOimw= +github.com/pojntfx/weron v0.2.6/go.mod h1:dnxOXCpQ0bgITpa7Uax/UYdpBJ+I32LzBhmHu+RT6gA= github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= @@ -179,6 +247,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= @@ -187,6 +257,15 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/testcontainers/testcontainers-go v0.31.0 h1:W0VwIhcEVhRflwL9as3dhY6jXjVCA27AkmbnZ+UTh3U= @@ -200,6 +279,10 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg= +github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -249,11 +332,15 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -262,12 +349,18 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -278,15 +371,32 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -294,6 +404,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -309,9 +420,13 @@ google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvy google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A= k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/pkg/ipc/agent_client.go b/pkg/ipc/agent_client.go index 25a19f8..1c5ed62 100644 --- a/pkg/ipc/agent_client.go +++ b/pkg/ipc/agent_client.go @@ -21,46 +21,47 @@ var ( // The RPCs the agent server can call on this client // See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#5-calling-the-clients-rpcs-from-the-server -type AgentClientLocal interface { - BeforeSuspend(ctx context.Context) error - AfterResume(ctx context.Context) error +type AgentClientLocal[G any] struct { + GuestService G + + beforeSuspend func(ctx context.Context) error + afterResume func(ctx context.Context) error } // The RPCs this client can call on the agent server // See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#4-calling-the-servers-rpcs-from-the-client type AgentClientRemote any -type HookAgentClient struct { - beforeSuspend func(ctx context.Context) error - afterResume func(ctx context.Context) error -} +func NewAgentClient[G any]( + guestService G, -func NewHookAgentClient( beforeSuspend func(ctx context.Context) error, afterResume func(ctx context.Context) error, -) *HookAgentClient { - return &HookAgentClient{ +) *AgentClientLocal[G] { + return &AgentClientLocal[G]{ + GuestService: guestService, + beforeSuspend: beforeSuspend, afterResume: afterResume, } } -func (l *HookAgentClient) BeforeSuspend(ctx context.Context) error { +func (l *AgentClientLocal[G]) BeforeSuspend(ctx context.Context) error { return l.beforeSuspend(ctx) } -func (l *HookAgentClient) AfterResume(ctx context.Context) error { +func (l *AgentClientLocal[G]) AfterResume(ctx context.Context) error { return l.afterResume(ctx) } -type ConnectedAgentClient[L AgentClientLocal, R AgentClientRemote] struct { +type ConnectedAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any] struct { Remote R Wait func() error Close func() } -func StartAgentClient[L AgentClientLocal, R AgentClientRemote]( +func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any]( dialCtx context.Context, remoteCtx context.Context, @@ -68,8 +69,8 @@ func StartAgentClient[L AgentClientLocal, R AgentClientRemote]( vsockPort uint32, agentClientLocal L, -) (connectedAgentClient *ConnectedAgentClient[L, R], errs error) { - connectedAgentClient = &ConnectedAgentClient[L, R]{ +) (connectedAgentClient *ConnectedAgentClient[L, R, G], errs error) { + connectedAgentClient = &ConnectedAgentClient[L, R, G]{ Wait: func() error { return nil }, diff --git a/pkg/ipc/agent_server.go b/pkg/ipc/agent_server.go index 6ca0971..35fba10 100644 --- a/pkg/ipc/agent_server.go +++ b/pkg/ipc/agent_server.go @@ -28,12 +28,14 @@ type AgentServerLocal any // The RPCs this server can call on the agent client // See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#4-calling-the-servers-rpcs-from-the-client -type AgentServerRemote struct { +type AgentServerRemote[G any] struct { + GuestService G + BeforeSuspend func(ctx context.Context) error AfterResume func(ctx context.Context) error } -type AgentServer[L AgentServerLocal, R AgentServerRemote] struct { +type AgentServer[L AgentServerLocal, R AgentServerRemote[G], G any] struct { VSockPath string Close func() @@ -46,17 +48,17 @@ type AgentServer[L AgentServerLocal, R AgentServerRemote] struct { agentServerLocal L } -func StartAgentServer[L AgentServerLocal, R AgentServerRemote]( +func StartAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any]( vsockPath string, vsockPort uint32, agentServerLocal L, ) ( - agentServer *AgentServer[L, R], + agentServer *AgentServer[L, R, G], err error, ) { - agentServer = &AgentServer[L, R]{ + agentServer = &AgentServer[L, R, G]{ Close: func() {}, agentServerLocal: agentServerLocal, @@ -84,15 +86,15 @@ func StartAgentServer[L AgentServerLocal, R AgentServerRemote]( return } -type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote] struct { +type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any] struct { Remote R Wait func() error Close func() error } -func (agentServer *AgentServer[L, R]) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer[L, R], errs error) { - acceptingAgentServer = &AcceptingAgentServer[L, R]{ +func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer[L, R, G], errs error) { + acceptingAgentServer = &AcceptingAgentServer[L, R, G]{ Wait: func() error { return nil }, diff --git a/pkg/peer/make_migratable.go b/pkg/peer/make_migratable.go index ab2d565..d1a6714 100644 --- a/pkg/peer/make_migratable.go +++ b/pkg/peer/make_migratable.go @@ -15,23 +15,23 @@ import ( "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" ) -type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { Remote R Wait func() error Close func() error - resumedRunner *runner.ResumedRunner[L, R] + resumedRunner *runner.ResumedRunner[L, R, G] stage2Inputs []migrateFromStage } -func (resumedPeer *ResumedPeer[L, R]) MakeMigratable( +func (resumedPeer *ResumedPeer[L, R, G]) MakeMigratable( ctx context.Context, devices []mounter.MakeMigratableDevice, -) (migratablePeer *MigratablePeer[L, R], errs error) { - migratablePeer = &MigratablePeer[L, R]{ +) (migratablePeer *MigratablePeer[L, R, G], errs error) { + migratablePeer = &MigratablePeer[L, R, G]{ Close: func() {}, resumedPeer: resumedPeer, diff --git a/pkg/peer/migrate_from.go b/pkg/peer/migrate_from.go index 02f147b..0dc71d5 100644 --- a/pkg/peer/migrate_from.go +++ b/pkg/peer/migrate_from.go @@ -29,7 +29,7 @@ import ( "golang.org/x/sys/unix" ) -type MigrateFromDevice[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type MigrateFromDevice[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { Name string `json:"name"` Base string `json:"base"` @@ -41,21 +41,21 @@ type MigrateFromDevice[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { Shared bool `json:"shared"` } -func (peer *Peer[L, R]) MigrateFrom( +func (peer *Peer[L, R, G]) MigrateFrom( ctx context.Context, - devices []MigrateFromDevice[L, R], + devices []MigrateFromDevice[L, R, G], readers []io.Reader, writers []io.Writer, hooks mounter.MigrateFromHooks, ) ( - migratedPeer *MigratedPeer[L, R], + migratedPeer *MigratedPeer[L, R, G], errs error, ) { - migratedPeer = &MigratedPeer[L, R]{ + migratedPeer = &MigratedPeer[L, R, G]{ Wait: func() error { return nil }, @@ -411,7 +411,7 @@ func (peer *Peer[L, R]) MigrateFrom( break } - stage1Inputs := []MigrateFromDevice[L, R]{} + stage1Inputs := []MigrateFromDevice[L, R, G]{} for _, input := range devices { if slices.ContainsFunc( migratedPeer.stage2Inputs, @@ -431,7 +431,7 @@ func (peer *Peer[L, R]) MigrateFrom( _, deferFuncs, err := utils.ConcurrentMap( stage1Inputs, - func(index int, input MigrateFromDevice[L, R], _ *struct{}, addDefer func(deferFunc func() error)) error { + func(index int, input MigrateFromDevice[L, R, G], _ *struct{}, addDefer func(deferFunc func() error)) error { if hook := hooks.OnLocalDeviceRequested; hook != nil { hook(uint32(index), input.Name) } diff --git a/pkg/peer/migrate_to.go b/pkg/peer/migrate_to.go index 6eae3f6..27d6dd6 100644 --- a/pkg/peer/migrate_to.go +++ b/pkg/peer/migrate_to.go @@ -38,15 +38,15 @@ type MigrateToHooks struct { OnAllMigrationsCompleted func() } -type MigratablePeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type MigratablePeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { Close func() - resumedPeer *ResumedPeer[L, R] + resumedPeer *ResumedPeer[L, R, G] stage4Inputs []makeMigratableDeviceStage - resumedRunner *runner.ResumedRunner[L, R] + resumedRunner *runner.ResumedRunner[L, R, G] } -func (migratablePeer *MigratablePeer[L, R]) MigrateTo( +func (migratablePeer *MigratablePeer[L, R, G]) MigrateTo( ctx context.Context, devices []mounter.MigrateToDevice, diff --git a/pkg/peer/resume.go b/pkg/peer/resume.go index 2234e50..e399653 100644 --- a/pkg/peer/resume.go +++ b/pkg/peer/resume.go @@ -14,17 +14,17 @@ import ( "github.com/loopholelabs/drafter/pkg/snapshotter" ) -type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { Wait func() error Close func() error - devices []MigrateFromDevice[L, R] - runner *runner.Runner[L, R] + devices []MigrateFromDevice[L, R, G] + runner *runner.Runner[L, R, G] stage2Inputs []migrateFromStage } -func (migratedPeer *MigratedPeer[L, R]) Resume( +func (migratedPeer *MigratedPeer[L, R, G]) Resume( ctx context.Context, resumeTimeout, @@ -33,8 +33,8 @@ func (migratedPeer *MigratedPeer[L, R]) Resume( agentServerLocal L, snapshotLoadConfiguration runner.SnapshotLoadConfiguration, -) (resumedPeer *ResumedPeer[L, R], errs error) { - resumedPeer = &ResumedPeer[L, R]{ +) (resumedPeer *ResumedPeer[L, R, G], errs error) { + resumedPeer = &ResumedPeer[L, R, G]{ Wait: func() error { return nil }, diff --git a/pkg/peer/start.go b/pkg/peer/start.go index d742bb7..e410739 100644 --- a/pkg/peer/start.go +++ b/pkg/peer/start.go @@ -10,7 +10,7 @@ import ( "github.com/loopholelabs/goroutine-manager/pkg/manager" ) -type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { VMPath string VMPid int @@ -19,10 +19,10 @@ type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { hypervisorCtx context.Context - runner *runner.Runner[L, R] + runner *runner.Runner[L, R, G] } -func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( +func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any]( hypervisorCtx context.Context, rescueCtx context.Context, @@ -31,11 +31,11 @@ func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( stateName string, memoryName string, ) ( - peer *Peer[L, R], + peer *Peer[L, R, G], errs error, ) { - peer = &Peer[L, R]{ + peer = &Peer[L, R, G]{ hypervisorCtx: hypervisorCtx, Wait: func() error { diff --git a/pkg/peer/suspend.go b/pkg/peer/suspend.go index bc5a244..de69ba4 100644 --- a/pkg/peer/suspend.go +++ b/pkg/peer/suspend.go @@ -5,7 +5,7 @@ import ( "time" ) -func (resumedPeer *ResumedPeer[L, R]) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { +func (resumedPeer *ResumedPeer[L, R, G]) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { return resumedPeer.resumedRunner.SuspendAndCloseAgentServer( ctx, diff --git a/pkg/runner/msync.go b/pkg/runner/msync.go index 251658b..beef62a 100644 --- a/pkg/runner/msync.go +++ b/pkg/runner/msync.go @@ -8,7 +8,7 @@ import ( "github.com/loopholelabs/drafter/pkg/snapshotter" ) -func (resumedRunner *ResumedRunner[L, R]) Msync(ctx context.Context) error { +func (resumedRunner *ResumedRunner[L, R, G]) Msync(ctx context.Context) error { if !resumedRunner.snapshotLoadConfiguration.ExperimentalMapPrivate { if err := firecracker.CreateSnapshot( ctx, diff --git a/pkg/runner/resume.go b/pkg/runner/resume.go index 9054f9e..7c21cb9 100644 --- a/pkg/runner/resume.go +++ b/pkg/runner/resume.go @@ -6,8 +6,8 @@ import ( "io" "os" "path/filepath" - "reflect" "time" + "unsafe" "github.com/lithammer/shortuuid/v4" "github.com/loopholelabs/drafter/internal/firecracker" @@ -18,7 +18,7 @@ import ( "github.com/loopholelabs/goroutine-manager/pkg/manager" ) -type ResumedRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type ResumedRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { Remote R Wait func() error @@ -26,15 +26,15 @@ type ResumedRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { snapshotLoadConfiguration SnapshotLoadConfiguration - runner *Runner[L, R] + runner *Runner[L, R, G] - agent *ipc.AgentServer[L, R] - acceptingAgent *ipc.AcceptingAgentServer[L, R] + agent *ipc.AgentServer[L, R, G] + acceptingAgent *ipc.AcceptingAgentServer[L, R, G] createSnapshot func(ctx context.Context) error } -func (runner *Runner[L, R]) Resume( +func (runner *Runner[L, R, G]) Resume( ctx context.Context, resumeTimeout time.Duration, @@ -45,11 +45,11 @@ func (runner *Runner[L, R]) Resume( snapshotLoadConfiguration SnapshotLoadConfiguration, ) ( - resumedRunner *ResumedRunner[L, R], + resumedRunner *ResumedRunner[L, R, G], errs error, ) { - resumedRunner = &ResumedRunner[L, R]{ + resumedRunner = &ResumedRunner[L, R, G]{ Wait: func() error { return nil }, Close: func() error { return nil }, @@ -272,7 +272,8 @@ func (runner *Runner[L, R]) Resume( // This is a safe type cast because R is constrained by ipc.AgentServerRemote, so this specific AfterResume field // must be defined or there will be a compile-time error. // The Go Generics system can't catch this here however, it can only catch it once the type is concrete, so we need to manually cast. - if err := reflect.ValueOf(resumedRunner.acceptingAgent.Remote).Interface().(ipc.AgentServerRemote).AfterResume(afterResumeCtx); err != nil { + remote := *(*ipc.AgentServerRemote[G])(unsafe.Pointer(&resumedRunner.acceptingAgent.Remote)) + if err := remote.AfterResume(afterResumeCtx); err != nil { panic(errors.Join(ErrCouldNotCallAfterResumeRPC, err)) } } diff --git a/pkg/runner/start.go b/pkg/runner/start.go index 4f70dfb..23ddf08 100644 --- a/pkg/runner/start.go +++ b/pkg/runner/start.go @@ -22,7 +22,7 @@ type SnapshotLoadConfiguration struct { ExperimentalMapPrivateMemoryOutput string } -type Runner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { +type Runner[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { VMPath string VMPid int @@ -42,7 +42,7 @@ type Runner[L ipc.AgentServerLocal, R ipc.AgentServerRemote] struct { rescueCtx context.Context } -func StartRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( +func StartRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any]( hypervisorCtx context.Context, rescueCtx context.Context, @@ -51,11 +51,11 @@ func StartRunner[L ipc.AgentServerLocal, R ipc.AgentServerRemote]( stateName string, memoryName string, ) ( - runner *Runner[L, R], + runner *Runner[L, R, G], errs error, ) { - runner = &Runner[L, R]{ + runner = &Runner[L, R, G]{ Wait: func() error { return nil }, Close: func() error { return nil }, diff --git a/pkg/runner/suspend.go b/pkg/runner/suspend.go index aed8276..2fd00d3 100644 --- a/pkg/runner/suspend.go +++ b/pkg/runner/suspend.go @@ -3,21 +3,22 @@ package runner import ( "context" "errors" - "reflect" "time" + "unsafe" "github.com/loopholelabs/drafter/pkg/ipc" "github.com/loopholelabs/drafter/pkg/snapshotter" ) -func (resumedRunner *ResumedRunner[L, R]) SuspendAndCloseAgentServer(ctx context.Context, suspendTimeout time.Duration) error { +func (resumedRunner *ResumedRunner[L, R, G]) SuspendAndCloseAgentServer(ctx context.Context, suspendTimeout time.Duration) error { suspendCtx, cancelSuspendCtx := context.WithTimeout(ctx, suspendTimeout) defer cancelSuspendCtx() // This is a safe type cast because R is constrained by ipc.AgentServerRemote, so this specific BeforeSuspend field // must be defined or there will be a compile-time error. // The Go Generics system can't catch this here however, it can only catch it once the type is concrete, so we need to manually cast. - if err := reflect.ValueOf(resumedRunner.acceptingAgent.Remote).Interface().(ipc.AgentServerRemote).BeforeSuspend(suspendCtx); err != nil { + remote := *(*ipc.AgentServerRemote[G])(unsafe.Pointer(&resumedRunner.acceptingAgent.Remote)) + if err := remote.BeforeSuspend(suspendCtx); err != nil { return errors.Join(ErrCouldNotCallBeforeSuspendRPC, err) } diff --git a/pkg/snapshotter/create.go b/pkg/snapshotter/create.go index ab35f9e..2a08452 100644 --- a/pkg/snapshotter/create.go +++ b/pkg/snapshotter/create.go @@ -141,7 +141,7 @@ func CreateSnapshot( panic(errors.Join(ErrCouldNotChownLivenessServerVSock, err)) } - agent, err := ipc.StartAgentServer[struct{}, ipc.AgentServerRemote]( + agent, err := ipc.StartAgentServer[struct{}, ipc.AgentServerRemote[struct{}]]( filepath.Join(server.VMPath, VSockName), uint32(agentConfiguration.AgentVSockPort), @@ -249,7 +249,7 @@ func CreateSnapshot( } } - var acceptingAgent *ipc.AcceptingAgentServer[struct{}, ipc.AgentServerRemote] + var acceptingAgent *ipc.AcceptingAgentServer[struct{}, ipc.AgentServerRemote[struct{}], struct{}] { acceptCtx, cancel := context.WithTimeout(goroutineManager.Context(), agentConfiguration.ResumeTimeout) defer cancel() From 46fe29c7262cc6b8221656c092e95b1c6a0d0e35 Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Tue, 17 Sep 2024 22:49:20 -0700 Subject: [PATCH 4/6] refactor: Drop all uses of manual `.Close` calls (see https://github.com/firecracker-microvm/firecracker/issues/4811) Signed-off-by: Felicitas Pojtinger --- internal/vsock/dialer.go | 8 ++++++++ pkg/ipc/agent_client.go | 2 +- pkg/ipc/agent_server.go | 2 +- pkg/ipc/liveness_client.go | 7 ++----- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/internal/vsock/dialer.go b/internal/vsock/dialer.go index 29c8dee..bcf1ea8 100644 --- a/internal/vsock/dialer.go +++ b/internal/vsock/dialer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "sync" "github.com/loopholelabs/goroutine-manager/pkg/manager" "golang.org/x/sys/unix" @@ -35,10 +36,14 @@ func DialContext( panic(errors.Join(ErrVSockSocketCreationFailed, err)) } + var cLock sync.Mutex goroutineManager.StartForegroundGoroutine(func(_ context.Context) { <-goroutineManager.Context().Done() // Non-happy path; context was cancelled before `connect()` completed + cLock.Lock() + defer cLock.Unlock() + if c == nil { if err := unix.Shutdown(fd, unix.SHUT_RDWR); err != nil { // Always close the file descriptor even if shutdown fails @@ -70,6 +75,9 @@ func DialContext( panic(errors.Join(ErrVSockConnectFailed, err)) } + cLock.Lock() + defer cLock.Unlock() + c = &conn{fd} return diff --git a/pkg/ipc/agent_client.go b/pkg/ipc/agent_client.go index 1c5ed62..bf4d394 100644 --- a/pkg/ipc/agent_client.go +++ b/pkg/ipc/agent_client.go @@ -145,7 +145,7 @@ func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any]( ) connectedAgentClient.Wait = sync.OnceValue(func() error { - defer conn.Close() // We ignore errors here since we might interrupt a network connection + // We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us defer cancelLinkCtx(nil) encoder := json.NewEncoder(conn) diff --git a/pkg/ipc/agent_server.go b/pkg/ipc/agent_server.go index 35fba10..33f05c2 100644 --- a/pkg/ipc/agent_server.go +++ b/pkg/ipc/agent_server.go @@ -199,7 +199,7 @@ func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remot ) acceptingAgentServer.Wait = sync.OnceValue(func() error { - defer conn.Close() // We ignore errors here since we might interrupt a network connection + // We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us defer cancelLinkCtx(nil) encoder := json.NewEncoder(conn) diff --git a/pkg/ipc/liveness_client.go b/pkg/ipc/liveness_client.go index ce00809..ab42aeb 100644 --- a/pkg/ipc/liveness_client.go +++ b/pkg/ipc/liveness_client.go @@ -17,14 +17,11 @@ func SendLivenessPing( vsockCID uint32, vsockPort uint32, ) error { - conn, err := vsock.DialContext(ctx, vsockCID, vsockPort) - if err != nil { + if _, err := vsock.DialContext(ctx, vsockCID, vsockPort); err != nil { return errors.Join(ErrCouldNotDialLivenessVSockConnection, err) } - if err := conn.Close(); err != nil { - return errors.Join(ErrCouldNotCloseLivenessVSockConnection, err) - } + // We don't `conn.Close` here since Firecracker handles resetting active VSock connections for us return nil } From 17d4a2f374154665dce7ae0dfc7ef4095044d4ce Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Fri, 20 Sep 2024 00:23:03 -0700 Subject: [PATCH 5/6] chore: Bump buildroot to latest release Signed-off-by: Felicitas Pojtinger --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c744193..307d1cd 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ OCI_IMAGE_URI ?= docker://valkey/valkey:latest OCI_IMAGE_ARCHITECTURE ?= amd64 OCI_IMAGE_HOSTNAME ?= drafterguest -OS_URL ?= https://buildroot.org/downloads/buildroot-2024.08-rc1.tar.gz +OS_URL ?= https://buildroot.org/downloads/buildroot-2024.08.tar.gz OS_DEFCONFIG ?= drafteros-firecracker-x86_64_defconfig OS_BR2_EXTERNAL ?= ../../os From dc12384efeb8f13956d2ca955c03bb57a85798f3 Mon Sep 17 00:00:00 2001 From: Felicitas Pojtinger Date: Thu, 26 Sep 2024 22:12:09 -0700 Subject: [PATCH 6/6] feat: Add support for bi-directional RPC guest services by exposing the registry through hooks in agent client and server Signed-off-by: Felicitas Pojtinger --- cmd/drafter-agent/main.go | 1 + cmd/drafter-peer/main.go | 1 + cmd/drafter-runner/main.go | 1 + pkg/ipc/agent_client.go | 9 +++++++++ pkg/ipc/agent_server.go | 15 ++++++++++++++- pkg/peer/resume.go | 2 ++ pkg/runner/resume.go | 8 +++++++- pkg/snapshotter/create.go | 7 ++++++- 8 files changed, 41 insertions(+), 3 deletions(-) diff --git a/cmd/drafter-agent/main.go b/cmd/drafter-agent/main.go index 16695bf..fd64453 100644 --- a/cmd/drafter-agent/main.go +++ b/cmd/drafter-agent/main.go @@ -105,6 +105,7 @@ func main() { uint32(*vsockPort), agentClient, + ipc.StartAgentClientHooks[struct{}]{}, ) if err != nil { return err diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index 348ea3a..075085d 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -421,6 +421,7 @@ func main() { *rescueTimeout, struct{}{}, + ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{}, runner.SnapshotLoadConfiguration{ ExperimentalMapPrivate: *experimentalMapPrivate, diff --git a/cmd/drafter-runner/main.go b/cmd/drafter-runner/main.go index 1ee45f9..cf1db04 100644 --- a/cmd/drafter-runner/main.go +++ b/cmd/drafter-runner/main.go @@ -305,6 +305,7 @@ func main() { packageConfig.AgentVSockPort, struct{}{}, + ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{}, runner.SnapshotLoadConfiguration{ ExperimentalMapPrivate: *experimentalMapPrivate, diff --git a/pkg/ipc/agent_client.go b/pkg/ipc/agent_client.go index bf4d394..5017f94 100644 --- a/pkg/ipc/agent_client.go +++ b/pkg/ipc/agent_client.go @@ -61,6 +61,10 @@ type ConnectedAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any] st Close func() } +type StartAgentClientHooks[R AgentClientRemote] struct { + OnAfterRegistrySetup func(forRemotes func(cb func(remoteID string, remote R) error) error) error +} + func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any]( dialCtx context.Context, remoteCtx context.Context, @@ -69,6 +73,7 @@ func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any]( vsockPort uint32, agentClientLocal L, + hooks StartAgentClientHooks[R], ) (connectedAgentClient *ConnectedAgentClient[L, R, G], errs error) { connectedAgentClient = &ConnectedAgentClient[L, R, G]{ Wait: func() error { @@ -144,6 +149,10 @@ func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any]( }, ) + if hook := hooks.OnAfterRegistrySetup; hook != nil { + hook(registry.ForRemotes) + } + connectedAgentClient.Wait = sync.OnceValue(func() error { // We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us defer cancelLinkCtx(nil) diff --git a/pkg/ipc/agent_server.go b/pkg/ipc/agent_server.go index 33f05c2..9289151 100644 --- a/pkg/ipc/agent_server.go +++ b/pkg/ipc/agent_server.go @@ -86,6 +86,10 @@ func StartAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any]( return } +type AgentServerAcceptHooks[R AgentServerRemote[G], G any] struct { + OnAfterRegistrySetup func(forRemotes func(cb func(remoteID string, remote R) error) error) error +} + type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any] struct { Remote R @@ -93,7 +97,12 @@ type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any] str Close func() error } -func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer[L, R, G], errs error) { +func (agentServer *AgentServer[L, R, G]) Accept( + acceptCtx context.Context, + remoteCtx context.Context, + + hooks AgentServerAcceptHooks[R, G], +) (acceptingAgentServer *AcceptingAgentServer[L, R, G], errs error) { acceptingAgentServer = &AcceptingAgentServer[L, R, G]{ Wait: func() error { return nil @@ -198,6 +207,10 @@ func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remot }, ) + if hook := hooks.OnAfterRegistrySetup; hook != nil { + hook(registry.ForRemotes) + } + acceptingAgentServer.Wait = sync.OnceValue(func() error { // We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us defer cancelLinkCtx(nil) diff --git a/pkg/peer/resume.go b/pkg/peer/resume.go index e399653..573df21 100644 --- a/pkg/peer/resume.go +++ b/pkg/peer/resume.go @@ -31,6 +31,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Resume( rescueTimeout time.Duration, agentServerLocal L, + agentServerHooks ipc.AgentServerAcceptHooks[R, G], snapshotLoadConfiguration runner.SnapshotLoadConfiguration, ) (resumedPeer *ResumedPeer[L, R, G], errs error) { @@ -77,6 +78,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Resume( packageConfig.AgentVSockPort, agentServerLocal, + agentServerHooks, snapshotLoadConfiguration, ) diff --git a/pkg/runner/resume.go b/pkg/runner/resume.go index 7c21cb9..8ac738b 100644 --- a/pkg/runner/resume.go +++ b/pkg/runner/resume.go @@ -42,6 +42,7 @@ func (runner *Runner[L, R, G]) Resume( agentVSockPort uint32, agentServerLocal L, + agentServerHooks ipc.AgentServerAcceptHooks[R, G], snapshotLoadConfiguration SnapshotLoadConfiguration, ) ( @@ -235,7 +236,12 @@ func (runner *Runner[L, R, G]) Resume( suspendOnPanicWithError = true - resumedRunner.acceptingAgent, err = resumedRunner.agent.Accept(resumeSnapshotAndAcceptCtx, ctx) + resumedRunner.acceptingAgent, err = resumedRunner.agent.Accept( + resumeSnapshotAndAcceptCtx, + ctx, + + agentServerHooks, + ) if err != nil { panic(errors.Join(ErrCouldNotAcceptAgent, err)) } diff --git a/pkg/snapshotter/create.go b/pkg/snapshotter/create.go index 2a08452..e59ae71 100644 --- a/pkg/snapshotter/create.go +++ b/pkg/snapshotter/create.go @@ -254,7 +254,12 @@ func CreateSnapshot( acceptCtx, cancel := context.WithTimeout(goroutineManager.Context(), agentConfiguration.ResumeTimeout) defer cancel() - acceptingAgent, err = agent.Accept(acceptCtx, goroutineManager.Context()) + acceptingAgent, err = agent.Accept( + acceptCtx, + goroutineManager.Context(), + + ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{}, + ) if err != nil { panic(errors.Join(ErrCouldNotAcceptAgentConnection, err)) }