Skip to content

Commit

Permalink
Merge pull request #42 from loopholelabs/allow-custom-agent-service
Browse files (browse the repository at this point in the history)
Allow defining custom agent services
  • Loading branch information
pojntfx authored Oct 3, 2024
2 parents 257ceb6 + dc12384 commit f3cf200
Show file tree
Hide file tree
Showing 22 changed files with 221 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ OCI_IMAGE_URI ?= docker://valkey/valkey:latest
OCI_IMAGE_ARCHITECTURE ?= amd64
OCI_IMAGE_HOSTNAME ?= drafterguest

OS_URL ?= https://buildroot.org/downloads/buildroot-2024.08-rc1.tar.gz
OS_URL ?= https://buildroot.org/downloads/buildroot-2024.08.tar.gz
OS_DEFCONFIG ?= drafteros-firecracker-x86_64_defconfig
OS_BR2_EXTERNAL ?= ../../os

Expand Down
7 changes: 5 additions & 2 deletions cmd/drafter-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func main() {
cancel()
}()

agentClient := ipc.NewAgentClient(
agentClient := ipc.NewAgentClient[struct{}](
struct{}{},

func(ctx context.Context) error {
log.Println("Running pre-suspend command")

Expand Down Expand Up @@ -95,14 +97,15 @@ func main() {
dialCtx, cancelDialCtx := context.WithTimeout(goroutineManager.Context(), *vsockTimeout)
defer cancelDialCtx()

connectedAgentClient, err := ipc.StartAgentClient(
connectedAgentClient, err := ipc.StartAgentClient[*ipc.AgentClientLocal[struct{}], struct{}](
dialCtx,
goroutineManager.Context(),

ipc.VSockCIDHost,
uint32(*vsockPort),

agentClient,
ipc.StartAgentClientHooks[struct{}]{},
)
if err != nil {
return err
Expand Down
10 changes: 7 additions & 3 deletions cmd/drafter-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

"github.com/loopholelabs/drafter/pkg/ipc"
"github.com/loopholelabs/drafter/pkg/mounter"
"github.com/loopholelabs/drafter/pkg/packager"
"github.com/loopholelabs/drafter/pkg/peer"
Expand Down Expand Up @@ -277,7 +278,7 @@ func main() {
writers = []io.Writer{conn}
}

p, err := peer.StartPeer(
p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote[struct{}]](
goroutineManager.Context(),
context.Background(), // Never give up on rescue operations

Expand Down Expand Up @@ -328,9 +329,9 @@ func main() {
}
})

migrateFromDevices := []peer.MigrateFromDevice{}
migrateFromDevices := []peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote[struct{}], struct{}]{}
for _, device := range devices {
migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice{
migrateFromDevices = append(migrateFromDevices, peer.MigrateFromDevice[struct{}, ipc.AgentServerRemote[struct{}], struct{}]{
Name: device.Name,

Base: device.Base,
Expand Down Expand Up @@ -419,6 +420,9 @@ func main() {
*resumeTimeout,
*rescueTimeout,

struct{}{},
ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{},

runner.SnapshotLoadConfiguration{
ExperimentalMapPrivate: *experimentalMapPrivate,

Expand Down
6 changes: 5 additions & 1 deletion cmd/drafter-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/loopholelabs/drafter/pkg/ipc"
"github.com/loopholelabs/drafter/pkg/packager"
"github.com/loopholelabs/drafter/pkg/peer"
"github.com/loopholelabs/drafter/pkg/runner"
Expand Down Expand Up @@ -174,7 +175,7 @@ func main() {
cancel()
}()

r, err := runner.StartRunner(
r, err := runner.StartRunner[struct{}, ipc.AgentServerRemote[struct{}]](
goroutineManager.Context(),
context.Background(), // Never give up on rescue operations

Expand Down Expand Up @@ -303,6 +304,9 @@ func main() {
*rescueTimeout,
packageConfig.AgentVSockPort,

struct{}{},
ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{},

runner.SnapshotLoadConfiguration{
ExperimentalMapPrivate: *experimentalMapPrivate,

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/loopholelabs/goroutine-manager v0.1.1
github.com/loopholelabs/silo v0.0.8
github.com/metal-stack/go-ipam v1.14.0
github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb
github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692
github.com/vishvananda/netlink v1.1.0
github.com/vishvananda/netns v0.0.4
golang.org/x/sys v0.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb h1:2vU/ZbsJ1uZhR2BCjc3mIfzR9GVuAWHSIUrw8jnLK9g=
github.com/pojntfx/panrpc/go v0.0.0-20240816011753-7169be8c89fb/go.mod h1:G9YawT9jiXDf6z7WEv7KMIca+A41mjm8KL8AfBxN37U=
github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692 h1:kiSksMNOL9fQQLen6hrOIC/Mgxcts6mAxcZBunOfCuA=
github.com/pojntfx/panrpc/go v0.0.0-20240913062914-ea5ef6b07692/go.mod h1:G9YawT9jiXDf6z7WEv7KMIca+A41mjm8KL8AfBxN37U=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
Expand Down
8 changes: 0 additions & 8 deletions internal/remotes/agent.go

This file was deleted.

8 changes: 8 additions & 0 deletions internal/vsock/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"sync"

"github.com/loopholelabs/goroutine-manager/pkg/manager"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -35,10 +36,14 @@ func DialContext(
panic(errors.Join(ErrVSockSocketCreationFailed, err))
}

var cLock sync.Mutex
goroutineManager.StartForegroundGoroutine(func(_ context.Context) {
<-goroutineManager.Context().Done()

// Non-happy path; context was cancelled before `connect()` completed
cLock.Lock()
defer cLock.Unlock()

if c == nil {
if err := unix.Shutdown(fd, unix.SHUT_RDWR); err != nil {
// Always close the file descriptor even if shutdown fails
Expand Down Expand Up @@ -70,6 +75,9 @@ func DialContext(
panic(errors.Join(ErrVSockConnectFailed, err))
}

cLock.Lock()
defer cLock.Unlock()

c = &conn{fd}

return
Expand Down
65 changes: 51 additions & 14 deletions pkg/ipc/agent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,63 @@ var (
ErrAgentContextCancelled = errors.New("agent context cancelled")
)

type AgentClient struct {
// AgentClientLocal holds the RPCs the agent server can call on this client.
// It is generic over G, the type of a caller-supplied guest service that is
// stored alongside the built-in suspend/resume hooks.
// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#5-calling-the-clients-rpcs-from-the-server
type AgentClientLocal[G any] struct {
// GuestService is the value passed as guestService to NewAgentClient.
// NOTE(review): presumably its exported methods become additional RPCs the
// server can call - confirm against how panrpc treats nested struct fields.
GuestService G

// Callbacks supplied to NewAgentClient; the BeforeSuspend and AfterResume
// methods forward to them unchanged.
beforeSuspend func(ctx context.Context) error
afterResume func(ctx context.Context) error
}

func NewAgentClient(
// AgentClientRemote describes the RPCs this client can call on the agent server.
// It is declared as `any`, so when used as the constraint for the type
// parameter R it places no restriction on the concrete remote type.
// See https://github.com/pojntfx/panrpc/tree/main?tab=readme-ov-file#4-calling-the-servers-rpcs-from-the-client
type AgentClientRemote any

func NewAgentClient[G any](
guestService G,

beforeSuspend func(ctx context.Context) error,
afterResume func(ctx context.Context) error,
) *AgentClient {
return &AgentClient{
) *AgentClientLocal[G] {
return &AgentClientLocal[G]{
GuestService: guestService,

beforeSuspend: beforeSuspend,
afterResume: afterResume,
}
}

func (l *AgentClient) BeforeSuspend(ctx context.Context) error {
// BeforeSuspend forwards to the beforeSuspend callback stored on the client
// and returns whatever error that callback returns.
// The callback is invoked unconditionally, so it must be non-nil.
func (l *AgentClientLocal[G]) BeforeSuspend(ctx context.Context) error {
return l.beforeSuspend(ctx)
}

func (l *AgentClient) AfterResume(ctx context.Context) error {
// AfterResume forwards to the afterResume callback stored on the client
// and returns whatever error that callback returns.
// The callback is invoked unconditionally, so it must be non-nil.
func (l *AgentClientLocal[G]) AfterResume(ctx context.Context) error {
return l.afterResume(ctx)
}

type ConnectedAgentClient struct {
// ConnectedAgentClient is the handle returned by StartAgentClient.
// R is the remote (server-side) RPC type and G the guest service type; L is
// constrained to *AgentClientLocal[G] but is not referenced by any field here.
type ConnectedAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any] struct {
// Remote is assigned by StartAgentClient from the remote yielded by the
// registry's ForRemotes iteration.
Remote R

// Wait starts out as a no-op returning nil; StartAgentClient later replaces
// it with a function wrapped in sync.OnceValue.
Wait func() error
// Close is set by StartAgentClient.
// NOTE(review): its body is not visible in this diff - presumably it tears
// down the connection; confirm before relying on it.
Close func()
}

func StartAgentClient(
// StartAgentClientHooks carries optional callbacks for StartAgentClient.
// The zero value (all hooks nil) is valid and disables every hook.
type StartAgentClientHooks[R AgentClientRemote] struct {
// OnAfterRegistrySetup, if non-nil, is called by StartAgentClient right after
// the RPC registry is created and receives the registry's ForRemotes function.
// NOTE(review): the call site in StartAgentClient discards this hook's
// returned error - confirm whether it should be propagated.
OnAfterRegistrySetup func(forRemotes func(cb func(remoteID string, remote R) error) error) error
}

func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any](
dialCtx context.Context,
remoteCtx context.Context,

vsockCID uint32,
vsockPort uint32,

agentClient *AgentClient,
) (connectedAgentClient *ConnectedAgentClient, errs error) {
connectedAgentClient = &ConnectedAgentClient{
agentClientLocal L,
hooks StartAgentClientHooks[R],
) (connectedAgentClient *ConnectedAgentClient[L, R, G], errs error) {
connectedAgentClient = &ConnectedAgentClient[L, R, G]{
Wait: func() error {
return nil
},
Expand Down Expand Up @@ -120,8 +139,8 @@ func StartAgentClient(
}
})

registry := rpc.NewRegistry[struct{}, json.RawMessage](
agentClient,
registry := rpc.NewRegistry[R, json.RawMessage](
agentClientLocal,

&rpc.RegistryHooks{
OnClientConnect: func(remoteID string) {
Expand All @@ -130,8 +149,12 @@ func StartAgentClient(
},
)

if hook := hooks.OnAfterRegistrySetup; hook != nil {
hook(registry.ForRemotes)
}

connectedAgentClient.Wait = sync.OnceValue(func() error {
defer conn.Close() // We ignore errors here since we might interrupt a network connection
// We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us
defer cancelLinkCtx(nil)

encoder := json.NewEncoder(conn)
Expand Down Expand Up @@ -199,5 +222,19 @@ func StartAgentClient(
break
}

found := false
if err := registry.ForRemotes(func(remoteID string, r R) error {
connectedAgentClient.Remote = r
found = true

return nil
}); err != nil {
panic(err)
}

if !found {
panic(ErrNoRemoteFound)
}

return
}
Loading

0 comments on commit f3cf200

Please sign in to comment.