Skip to content

Commit

Permalink
cachemanager fixes (#1277)
Browse files Browse the repository at this point in the history
  • Loading branch information
anhowe authored Jul 16, 2021
1 parent 3d3fb8d commit d5d49cc
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 30 deletions.
5 changes: 3 additions & 2 deletions src/go/cmd/cachewarmer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ On the controller or jumpbox, execute the following steps
```bash
export BOOTSTRAP_PATH=/nfs/node0
export STORAGE_ACCOUNT_RESOURCE_GROUP=
export STORAGE_ACCOUNT=
export STORAGE_KEY=''
export QUEUE_PREFIX=
export BOOTSTRAP_EXPORT_PATH=/nfs1data
Expand Down Expand Up @@ -136,6 +136,7 @@ export STORAGE_ACCOUNT=
export STORAGE_KEY=''
export QUEUE_PREFIX=
```

2. Run the following script:
```bash
bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh
Expand All @@ -146,5 +147,5 @@ bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh
To submit a job, run a command similar to the following command, where the warm target variables are the Avere junction to warm:

```bash
sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountName "STORAGEACCOUNTREPLACE" -storageKey "STORAGEKEYREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island"
sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountResourceGroup "STORAGERGREPLACE" -storageAccountName "STORAGEACCOUNTREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island"
```
18 changes: 12 additions & 6 deletions src/go/cmd/cachewarmer/cachewarmer-jobsubmitter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ

var maxFileSizeBytes = flag.Int64("maxFileSizeBytes", 0, "the maximum file size in bytes to warm.")

var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group")
var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue")
var storageKey = flag.String("storageKey", "", "the storage key to access the queue")
var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically")

var blockUntilWarm = flag.Bool("blockUntilWarm", false, "the job submitter will not return until there are no more jobs")
Expand Down Expand Up @@ -72,14 +72,14 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
os.Exit(1)
}

if len(*storageAccount) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
if len(*storageAccountResourceGroup) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n")
usage()
os.Exit(1)
}

if len(*storageKey) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n")
if len(*storageAccount) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
usage()
os.Exit(1)
}
Expand All @@ -96,6 +96,12 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
os.Exit(1)
}

primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err)
os.Exit(1)
}

warmJobPath := cachewarmer.InitializeWarmPathJob(
*warmTargetMountAddresses,
*warmTargetExportPath,
Expand All @@ -107,7 +113,7 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues(
ctx,
*storageAccount,
*storageKey,
primaryKey,
*queueNamePrefix)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err)
Expand Down
22 changes: 15 additions & 7 deletions src/go/cmd/cachewarmer/cachewarmer-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
var bootstrapMountAddress = flag.String("bootstrapMountAddress", "", "the mount address that hosts the worker bootstrap script")
var bootstrapExportPath = flag.String("bootstrapExportPath", "", "the export path that hosts the worker bootstrap script")
var bootstrapScriptPath = flag.String("bootstrapScriptPath", "", "the path to the worker bootstrap script")
var workerCount = flag.Int64("workerCount", 12, "the worker count to warm the cache")

var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group")
var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue")
var storageKey = flag.String("storageKey", "", "the storage key to access the queue")
var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically")

var vmssUserName = flag.String("vmssUserName", "", "the username for the vmss vms")
Expand Down Expand Up @@ -70,14 +71,14 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
os.Exit(1)
}

if len(*storageAccount) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
if len(*storageAccountResourceGroup) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n")
usage()
os.Exit(1)
}

if len(*storageKey) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n")
if len(*storageAccount) == 0 {
fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
usage()
os.Exit(1)
}
Expand Down Expand Up @@ -112,10 +113,16 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
os.Exit(1)
}

primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err)
os.Exit(1)
}

cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues(
ctx,
*storageAccount,
*storageKey,
primaryKey,
*queueNamePrefix)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err)
Expand All @@ -124,6 +131,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa

return cachewarmer.InitializeWarmPathManager(
azureClients,
*workerCount,
cacheWarmerQueues,
*bootstrapMountAddress,
*bootstrapExportPath,
Expand All @@ -133,7 +141,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
*vmssSshPublicKey,
*vmssSubnetName,
*storageAccount,
*storageKey,
primaryKey,
*queueNamePrefix,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ LimitNOFILE=16384
Restart=always
RestartSec=2

ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountName "STORAGE_ACCOUNT_REPLACE" -storageKey "STORAGE_KEY_REPLACE" -queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE
ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountResourceGroup "STORAGE_RG_REPLACE" -storageAccountName "STORAGE_ACCOUNT_REPLACE"-queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE

# make sure log directory exists and owned by syslog
PermissionsStartOnly=true
Expand Down
20 changes: 13 additions & 7 deletions src/go/pkg/cachewarmer/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,25 @@ const (
MountRetrySleepSeconds = 10

// this size is the most common, and will stand up the fastest
VMSSNodeSize = "Standard_D2s_v3"
VmssName = "cwvmss"
NodesPerNFSMountAddress = 2
VMSSNodeSize = "Standard_D2s_v3"
VmssName = "cwvmss"

/* by default Ubuntu doesn't install NFS and we need a distro with NFS installed by default for airgapped environments
MarketPlacePublisher = "Canonical"
MarketPlaceOffer = "UbuntuServer"
MarketPlaceSku = "18.04-LTS"
/*PlanName = ""
PlanPublisherName = ""
PlanProductName = ""
PlanName = ""
PlanPublisherName = ""
PlanProductName = ""
*/

// the controller will work in an airgapped environment
MarketPlacePublisher = "microsoft-avere"
MarketPlaceOffer = "vfxt"
MarketPlaceSku = "avere-vfxt-controller"
PlanName = "avere-vfxt-controller"
PlanPublisherName = "microsoft-avere"
PlanProductName = "vfxt"*/
PlanProductName = "vfxt"

tick = time.Duration(1) * time.Millisecond // 1ms
timeBetweenJobCheck = time.Duration(2) * time.Second // 2 seconds between checking for jobs
Expand All @@ -66,4 +70,6 @@ const (

WorkerMultiplier = 2
MinimumJobsBeforeRefill = 100

SubscriptionIdEnvVar = "AZURE_SUBSCRIPTION_ID"
)
41 changes: 41 additions & 0 deletions src/go/pkg/cachewarmer/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,51 @@ package cachewarmer
import (
"context"
"fmt"
"os"

"github.com/Azure/Avere/src/go/pkg/azure"
"github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/storage/mgmt/storage"
"github.com/Azure/go-autorest/autorest/azure/auth"
)

func GetSubscriptionID() (string, error) {
// try environment
if v := os.Getenv(SubscriptionIdEnvVar); v != "" {
return v, nil
}

// try az cli file
if fileSettings, err := auth.GetSettingsFromFile(); err == nil && fileSettings.GetSubscriptionID() != "" {
return fileSettings.GetSubscriptionID(), nil
}

// try metadata
if computeMetadata, err := GetComputeMetadata(); err == nil {
return computeMetadata.SubscriptionId, nil
}

return "", fmt.Errorf("unable to get subscription from env var '%s', az cli login file, or instance meta data. Set the environment variable '%s' or run 'az login' to resolve", SubscriptionIdEnvVar, SubscriptionIdEnvVar)
}

func GetPrimaryStorageKey(ctx context.Context, resourceGroup string, accountName string) (string, error) {
subscriptionId, err := GetSubscriptionID()
if err != nil {
return "", err
}
authorizer, err := auth.NewAuthorizerFromEnvironment()
if err != nil {
return "", fmt.Errorf("ERROR: authorizer from environment failed: %s", err)
}
accountsClient := storage.NewAccountsClient(subscriptionId)
accountsClient.Authorizer = authorizer
response, err := accountsClient.ListKeys(ctx, resourceGroup, accountName)
if err != nil {
return "", err
}

return *(((*response.Keys)[0]).Value), nil
}

type CacheWarmerQueues struct {
jobQueue *azure.Queue
workQueue *azure.Queue
Expand Down
14 changes: 14 additions & 0 deletions src/go/pkg/cachewarmer/vmss.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ func createCacheWarmerVmssModel(
publisher string,
offer string,
sku string,
planName string,
planPublisher string,
planProduct string,
priority compute.VirtualMachinePriorityTypes,
evictionPolicy compute.VirtualMachineEvictionPolicyTypes,
subnetId string,
Expand All @@ -236,6 +239,16 @@ func createCacheWarmerVmssModel(
}
}

var computePlan *compute.Plan
computePlan = nil
if len(planName) == 0 || len(planPublisher) == 0 || len(planProduct) == 0 {
computePlan = &compute.Plan{
Name: to.StringPtr(planName),
Publisher: to.StringPtr(planPublisher),
Product: to.StringPtr(planProduct),
}
}

// create the vmss model
return compute.VirtualMachineScaleSet{
Name: to.StringPtr(vmssName),
Expand All @@ -244,6 +257,7 @@ func createCacheWarmerVmssModel(
Name: to.StringPtr(vmssSKU),
Capacity: to.Int64Ptr(nodeCount),
},
Plan: computePlan,
VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{
Overprovision: to.BoolPtr(false),
UpgradePolicy: &compute.UpgradePolicy{
Expand Down
17 changes: 10 additions & 7 deletions src/go/pkg/cachewarmer/warmpathmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// WarmPathManager contains the information for the manager
type WarmPathManager struct {
AzureClients *AzureClients
WorkerCount int64
Queues *CacheWarmerQueues
bootstrapMountAddress string
bootstrapExportPath string
Expand All @@ -37,6 +38,7 @@ type WarmPathManager struct {
// InitializeWarmPathManager initializes the job submitter structure
func InitializeWarmPathManager(
azureClients *AzureClients,
workerCount int64,
queues *CacheWarmerQueues,
bootstrapMountAddress string,
bootstrapExportPath string,
Expand All @@ -50,6 +52,7 @@ func InitializeWarmPathManager(
queueNamePrefix string) *WarmPathManager {
return &WarmPathManager{
AzureClients: azureClients,
WorkerCount: workerCount,
Queues: queues,
bootstrapMountAddress: bootstrapMountAddress,
bootstrapExportPath: bootstrapExportPath,
Expand Down Expand Up @@ -310,8 +313,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn
if workerJob == nil {
continue
}
mountCount := len(workerJob.WarmTargetMountAddresses)
m.EnsureVmssRunning(ctx, mountCount)
m.EnsureVmssRunning(ctx)
lastJobSeen = time.Now()
}
lastReadQueueSuccess = time.Now()
Expand All @@ -320,7 +322,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn
}
}

func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int) {
func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context) {
vmssExists, err := VmssExists(ctx, m.AzureClients, VmssName)
if err != nil {
log.Error.Printf("checking VMSS existence failed with error %v", err)
Expand Down Expand Up @@ -357,26 +359,27 @@ func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int)
return
}

vmssCount := int64(mountCount * NodesPerNFSMountAddress)

cacheWarmerVmss := createCacheWarmerVmssModel(
VmssName, // vmssName string,
m.AzureClients.LocalMetadata.Location, // location string,
VMSSNodeSize, // vmssSKU string,
vmssCount, // nodeCount int64,
m.WorkerCount, // nodeCount int64,
m.vmssUserName, // userName string,
m.vmssPassword, // password string,
m.vmssSshPublicKey, // sshKeyData string,
MarketPlacePublisher, // publisher string,
MarketPlaceOffer, // offer string,
MarketPlaceSku, // sku string,
PlanName, // planName string,
PlanPublisherName, // planPublisherName string,
PlanProductName, // planProductName string,
compute.Spot, // priority compute.VirtualMachinePriorityTypes,
compute.Delete, // evictionPolicy compute.VirtualMachineEvictionPolicyTypes
vmssSubnetId, // subnetId string
customData,
)

log.Info.Printf("create VMSS with %d workers", vmssCount)
log.Info.Printf("create VMSS with %d workers", m.WorkerCount)
if _, err := CreateVmss(ctx, m.AzureClients, cacheWarmerVmss); err != nil {
log.Error.Printf("error creating vmss: %v", err)
return
Expand Down

0 comments on commit d5d49cc

Please sign in to comment.