Skip to content

Commit

Permalink
feat: [CDE-585]: Support creation and attaching of persistent disks (#538)
Browse files Browse the repository at this point in the history

* feat: [CDE-585]: Support creation and attaching of persistent disks

* feat: [CDE-585]: Support creation and attaching of persistent disks

* feat: [CDE-585]: Add more checks

* feat: [CDE-585]: Handle disk already exists case

* feat: [CDE-585]: Remove testing changes

* feat: [CDE-585]: Remove debug changes

* feat: [CDE-585]: Fix storage type

* feat: [CDE-585]: Skip CDE VMs from cleanup

* feat: [CDE-585]: Fix lint

* feat: [CDE-585]: Fix lint

* feat: [CDE-585]: Address comments

* feat: [CDE-585]: Address comments
  • Loading branch information
vikyathharekal authored Feb 3, 2025
1 parent 41b144a commit ab30e8d
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 42 deletions.
160 changes: 145 additions & 15 deletions app/drivers/google/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package google

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -295,13 +296,29 @@ func (p *config) create(ctx context.Context, opts *types.InstanceCreateOpts, nam
}
}

requestID := uuid.New().String()
op, err := p.insertInstance(ctx, p.projectID, zone, requestID, in)
if opts.StorageOpts.Identifier != "" {
operations, attachDiskErr := p.attachPersistentDisk(ctx, opts, in, zone)
if attachDiskErr != nil {
logr.WithError(attachDiskErr).Errorln("google: failed to attach persistent disk")
return nil, attachDiskErr
}
for _, operation := range operations {
if operation != nil {
// Disk not present, wait for creation
err = p.waitZoneOperation(ctx, operation.Name, zone)
if err != nil {
logr.WithError(err).Errorln("google: persistent disk creation operation failed")
return nil, err
}
}
}
}

op, err := p.insertInstance(ctx, p.projectID, zone, uuid.New().String(), in)
if err != nil {
logr.WithError(err).Errorln("google: failed to provision VM")
return nil, err
}

err = p.waitZoneOperation(ctx, op.Name, zone)
if err != nil {
logr.WithError(err).Errorln("instance insert operation failed")
Expand All @@ -321,7 +338,11 @@ func (p *config) create(ctx context.Context, opts *types.InstanceCreateOpts, nam
return nil, err
}

instanceMap := p.mapToInstance(vm, zone, opts, enableNestedVirtualization)
instanceMap, err := p.mapToInstance(vm, zone, opts, enableNestedVirtualization)
if err != nil {
logr.WithError(err).Errorln("google: failed to map VM to instance")
return nil, err
}
logr.
WithField("ip", instanceMap.Address).
WithField("time", fmt.Sprintf("%.2fs", time.Since(startTime).Seconds())).
Expand All @@ -330,6 +351,49 @@ func (p *config) create(ctx context.Context, opts *types.InstanceCreateOpts, nam
return &instanceMap, nil
}

// attachPersistentDisk ensures that every persistent disk named in
// opts.StorageOpts.Identifier (a comma-separated list of disk names) exists
// in diskZone, and appends a matching AttachedDisk entry to the instance
// definition so the disks are attached when the VM is created.
//
// It returns one pending creation operation per disk that did not already
// exist; the caller must wait on these before inserting the instance. Disks
// that already exist contribute no operation.
func (p *config) attachPersistentDisk(
	ctx context.Context,
	opts *types.InstanceCreateOpts,
	in *compute.Instance,
	diskZone string,
) ([]*compute.Operation, error) {
	// Size and type are the same for every disk, so resolve them once up
	// front and fail fast on a malformed size instead of inside the loop.
	diskSize, err := strconv.ParseInt(opts.StorageOpts.Size, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parsing disk size %q: %w", opts.StorageOpts.Size, err)
	}
	diskType := fmt.Sprintf("projects/%s/zones/%s/diskTypes/%s", p.projectID, diskZone, opts.StorageOpts.Type)

	storageIdentifiers := strings.Split(opts.StorageOpts.Identifier, ",")
	var operations []*compute.Operation
	for i, diskName := range storageIdentifiers {
		persistentDisk := &compute.Disk{
			Name:   diskName,
			SizeGb: diskSize,
			Type:   diskType,
			Zone:   diskZone,
		}
		op, err := p.createPersistentDiskIfNotExists(ctx, p.projectID, diskZone, uuid.New().String(), persistentDisk)
		if err != nil {
			return nil, err
		}
		if op != nil {
			// A non-nil operation means a creation request was submitted;
			// the caller is responsible for waiting on it.
			operations = append(operations, op)
		}

		// Declare the disk on the instance so GCE attaches it at creation.
		attachedDisk := &compute.AttachedDisk{
			DeviceName: fmt.Sprintf("disk-%d", i),
			Boot:       false,
			Type:       "PERSISTENT",
			Source:     "projects/" + p.projectID + "/zones/" + diskZone + "/disks/" + diskName,
			Mode:       "READ_WRITE",
		}
		in.Disks = append(in.Disks, attachedDisk)
	}
	return operations, nil
}

// Set the instance metadata (not network tags)
func (p *config) SetTags(ctx context.Context, instance *types.Instance, tags map[string]string) error {
logr := logger.FromContext(ctx).
Expand Down Expand Up @@ -376,7 +440,8 @@ func (p *config) Destroy(ctx context.Context, instances []*types.Instance) (err
return p.DestroyInstanceAndStorage(ctx, instances, nil)
}

func (p *config) DestroyInstanceAndStorage(ctx context.Context, instances []*types.Instance, _ *storage.CleanupType) (err error) {
func (p *config) DestroyInstanceAndStorage(ctx context.Context, instances []*types.Instance, storageCleanupType *storage.CleanupType) error {
var err error
if len(instances) == 0 {
return errors.New("no instances provided")
}
Expand All @@ -394,21 +459,53 @@ func (p *config) DestroyInstanceAndStorage(ctx context.Context, instances []*typ
}
}

requestID := uuid.New().String()

_, err = p.deleteInstance(ctx, p.projectID, zone, instance.ID, requestID)
if err != nil {
instanceDeleteOperation, deleteInstanceErr := p.deleteInstance(ctx, p.projectID, zone, instance.ID, uuid.New().String())
if deleteInstanceErr != nil {
// https://github.com/googleapis/google-api-go-client/blob/master/googleapi/googleapi.go#L135
if gerr, ok := err.(*googleapi.Error); ok &&
if gerr, ok := deleteInstanceErr.(*googleapi.Error); ok &&
gerr.Code == http.StatusNotFound {
logr.WithError(err).Errorln("google: VM not found")
logr.WithError(deleteInstanceErr).Errorln("google: VM not found")
} else {
logr.WithError(err).Errorln("google: failed to delete the VM")
logr.WithError(deleteInstanceErr).Errorln("google: failed to delete the VM")
}
}
err = deleteInstanceErr
logr.Info("google: sent delete instance request")

if storageCleanupType != nil && *storageCleanupType == storage.Delete && instance.StorageIdentifier != "" {
logr.Info("google: waiting for instance deletion")
err = p.waitZoneOperation(ctx, instanceDeleteOperation.Name, zone)
if err != nil {
logr.WithError(err).Errorln("google: could not delete instance. skipping disk deletion")
return err
}
logr.Info("google: deleting persistent disk")
storageIdentifiers := strings.Split(instance.StorageIdentifier, ",")
for _, storageIdentifier := range storageIdentifiers {
diskDeleteOperation, diskDeletionErr := p.deletePersistentDisk(
ctx,
p.projectID,
zone,
storageIdentifier,
uuid.New().String(),
)
if diskDeletionErr != nil {
var googleErr *googleapi.Error
if errors.As(diskDeletionErr, &googleErr) &&
googleErr.Code == http.StatusNotFound {
logr.WithError(diskDeletionErr).
Errorln("google: persistent disk %s not found", storageIdentifier)
}
}
err = p.waitZoneOperation(ctx, diskDeleteOperation.Name, zone)
if err != nil {
logr.WithError(err).Errorln("google: could not delete persistent disk %s", storageIdentifier)
return err
}
}
}
}
return
return err
}

func (p *config) Hibernate(ctx context.Context, instanceID, _ string) error {
Expand Down Expand Up @@ -514,7 +611,33 @@ func (p *config) deleteInstance(ctx context.Context, projectID, zone, instanceID
})
}

func (p *config) mapToInstance(vm *compute.Instance, zone string, opts *types.InstanceCreateOpts, enableNestedVitualization bool) types.Instance {
// createPersistentDiskIfNotExists creates the given disk in the zone unless a
// disk with the same name already exists there.
//
// It returns the pending insert operation when a creation request was
// submitted, or (nil, nil) when the disk already exists. Any failure other
// than a 404 on the existence probe is returned wrapped.
func (p *config) createPersistentDiskIfNotExists(ctx context.Context, projectID, zone, requestID string, disk *compute.Disk) (*compute.Operation, error) {
	// Probe for an existing disk with this name.
	_, err := retry(ctx, getRetries, secSleep, func() (*compute.Disk, error) {
		return p.service.Disks.Get(projectID, zone, disk.Name).Context(ctx).Do()
	})
	if err == nil {
		// Disk already exists; nothing to create.
		return nil, nil
	}

	// Use http.StatusNotFound rather than a magic 404, consistent with the
	// rest of this file's googleapi error handling.
	var getErr *googleapi.Error
	if !errors.As(err, &getErr) || getErr.Code != http.StatusNotFound {
		return nil, fmt.Errorf("failed to check disk existence: %w", err)
	}

	// Disk doesn't exist; submit a creation request.
	return retry(ctx, insertRetries, secSleep, func() (*compute.Operation, error) {
		return p.service.Disks.Insert(projectID, zone, disk).RequestId(requestID).Context(ctx).Do()
	})
}

// deletePersistentDisk issues a zonal delete request for diskName, retrying
// transient failures, and returns the resulting pending operation.
func (p *config) deletePersistentDisk(ctx context.Context, projectID, zone, diskName, requestID string) (*compute.Operation, error) {
	deleteFn := func() (*compute.Operation, error) {
		call := p.service.Disks.Delete(projectID, zone, diskName)
		return call.RequestId(requestID).Context(ctx).Do()
	}
	return retry(ctx, deleteRetries, secSleep, deleteFn)
}

func (p *config) mapToInstance(vm *compute.Instance, zone string, opts *types.InstanceCreateOpts, enableNestedVitualization bool) (types.Instance, error) {
network := vm.NetworkInterfaces[0]
instanceIP := ""
if p.privateIP {
Expand All @@ -523,6 +646,11 @@ func (p *config) mapToInstance(vm *compute.Instance, zone string, opts *types.In
instanceIP = network.AccessConfigs[0].NatIP
}

labelsBytes, marshalErr := json.Marshal(opts.Labels)
if marshalErr != nil {
return types.Instance{}, fmt.Errorf("scheduler: could not marshal labels: %v, err: %w", opts.Labels, marshalErr)
}

started, _ := time.Parse(time.RFC3339, vm.CreationTimestamp)
return types.Instance{
ID: strconv.FormatUint(vm.Id, 10),
Expand All @@ -544,7 +672,9 @@ func (p *config) mapToInstance(vm *compute.Instance, zone string, opts *types.In
IsHibernated: false,
Port: lehelper.LiteEnginePort,
EnableNestedVirtualization: enableNestedVitualization,
}
StorageIdentifier: opts.StorageOpts.Identifier,
Labels: labelsBytes,
}, nil
}

func (p *config) findInstanceZone(ctx context.Context, instanceID string) (
Expand Down
1 change: 1 addition & 0 deletions app/drivers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ func (m *Manager) setupInstance(
CephPoolIdentifier: storageConfig.CephPoolIdentifier,
Identifier: storageConfig.Identifier,
Size: storageConfig.Size,
Type: storageConfig.Type,
}
}
createOptions.AutoInjectionBinaryURI = m.autoInjectionBinaryURI
Expand Down
19 changes: 10 additions & 9 deletions command/harness/common.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package harness

type InstanceInfo struct {
ID string `json:"id"`
Name string `json:"name"`
IPAddress string `json:"ip_address"`
Port int64 `json:"port"`
OS string `json:"os"`
Arch string `json:"arch"`
Provider string `json:"provider"`
PoolName string `json:"pool_name"`
Zone string `json:"zone"`
ID string `json:"id"`
Name string `json:"name"`
IPAddress string `json:"ip_address"`
Port int64 `json:"port"`
OS string `json:"os"`
Arch string `json:"arch"`
Provider string `json:"provider"`
PoolName string `json:"pool_name"`
Zone string `json:"zone"`
StorageIdentifier string `json:"storage_identifier"`
}
19 changes: 13 additions & 6 deletions command/harness/delegate/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
errors "github.com/drone-runners/drone-runner-aws/app/types"
"github.com/drone-runners/drone-runner-aws/command/config"
"github.com/drone-runners/drone-runner-aws/command/harness"
"github.com/drone-runners/drone-runner-aws/command/harness/storage"
"github.com/drone-runners/drone-runner-aws/engine/resource"
"github.com/drone-runners/drone-runner-aws/metric"
"github.com/drone-runners/drone-runner-aws/store"
Expand Down Expand Up @@ -235,11 +236,12 @@ func (c *delegateCommand) handleStep(w http.ResponseWriter, r *http.Request) {
func (c *delegateCommand) handleDestroy(w http.ResponseWriter, r *http.Request) {
// TODO: Change the java object to match VmCleanupRequest
rs := &struct {
ID string `json:"id"`
InstanceID string `json:"instance_id"`
PoolID string `json:"pool_id"`
CorrelationID string `json:"correlation_id"`
InstanceInfo harness.InstanceInfo `json:"instance_info"`
ID string `json:"id"`
InstanceID string `json:"instance_id"`
PoolID string `json:"pool_id"`
CorrelationID string `json:"correlation_id"`
InstanceInfo harness.InstanceInfo `json:"instance_info"`
StorageCleanupType storage.CleanupType `json:"storage_cleanup_type"`
}{}
if err := json.NewDecoder(r.Body).Decode(rs); err != nil {
logrus.WithError(err).Error("could not decode VM destroy request body")
Expand All @@ -248,7 +250,12 @@ func (c *delegateCommand) handleDestroy(w http.ResponseWriter, r *http.Request)
}
logrus.Infoln("Received destroy request with taskId " + rs.CorrelationID)

req := &harness.VMCleanupRequest{PoolID: rs.PoolID, StageRuntimeID: rs.ID, InstanceInfo: rs.InstanceInfo}
req := &harness.VMCleanupRequest{
PoolID: rs.PoolID,
StageRuntimeID: rs.ID,
InstanceInfo: rs.InstanceInfo,
StorageCleanupType: rs.StorageCleanupType,
}
req.Context.TaskID = rs.CorrelationID

ctx := r.Context()
Expand Down
7 changes: 4 additions & 3 deletions command/harness/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func buildInstanceFromRequest(instanceInfo InstanceInfo) *types.Instance { //nol
OS: instanceInfo.OS,
Arch: instanceInfo.Arch,
},
IsHibernated: false,
Port: instanceInfo.Port,
Zone: instanceInfo.Zone,
IsHibernated: false,
Port: instanceInfo.Port,
Zone: instanceInfo.Zone,
StorageIdentifier: instanceInfo.StorageIdentifier,
}
}
19 changes: 10 additions & 9 deletions command/harness/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,16 @@ func HandleSetup(

metrics.BuildCount.WithLabelValues(selectedPool, instance.OS, instance.Arch, string(instance.Provider), strconv.FormatBool(poolManager.IsDistributed()), instance.Zone, owner).Inc()
instanceInfo := InstanceInfo{
ID: instance.ID,
Name: instance.Name,
IPAddress: instance.Address,
Port: instance.Port,
OS: platform.OS,
Arch: platform.Arch,
Provider: string(instance.Provider),
PoolName: selectedPool,
Zone: instance.Zone,
ID: instance.ID,
Name: instance.Name,
IPAddress: instance.Address,
Port: instance.Port,
OS: platform.OS,
Arch: platform.Arch,
Provider: string(instance.Provider),
PoolName: selectedPool,
Zone: instance.Zone,
StorageIdentifier: instance.StorageIdentifier,
}
resp := &SetupVMResponse{InstanceID: instance.ID, IPAddress: instance.Address, GitspacesPortMappings: instance.GitspacePortMappings, InstanceInfo: instanceInfo}

Expand Down
2 changes: 2 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type StorageOpts struct {
CephPoolIdentifier string
Identifier string
Size string
Type string
}

type GitspaceAgentConfig struct {
Expand All @@ -157,4 +158,5 @@ type StorageConfig struct {
CephPoolIdentifier string `json:"ceph_pool_identifier"`
Identifier string `json:"identifier"`
Size string `json:"size"`
Type string `json:"type" default:"pd-balanced"`
}

0 comments on commit ab30e8d

Please sign in to comment.