diff --git a/cmd/admin_cluster.go b/cmd/admin_cluster.go new file mode 100644 index 00000000..9900a828 --- /dev/null +++ b/cmd/admin_cluster.go @@ -0,0 +1,31 @@ +package cmd + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/qovery/qovery-cli/utils" +) + +var ( + adminClusterCmd = &cobra.Command{ + Use: "cluster", + Short: "Manage clusters", + Run: func(cmd *cobra.Command, args []string) { + utils.Capture(cmd) + + if len(args) == 0 { + _ = cmd.Help() + os.Exit(0) + } + }, + } + // TODO (mzo) add parameter to random deploy clusters + // TODO (mzo) handle pending clusters queue, when clusters couldn't be deployed because not in a final state + // TODO (mzo) handle progression in a file to let resume from the last deployment launched in case of interruption +) + +func init() { + adminCmd.AddCommand(adminClusterCmd) +} diff --git a/cmd/admin_cluster_deploy.go b/cmd/admin_cluster_deploy.go new file mode 100644 index 00000000..d3e164d9 --- /dev/null +++ b/cmd/admin_cluster_deploy.go @@ -0,0 +1,66 @@ +package cmd + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/qovery/qovery-cli/pkg" + "github.com/qovery/qovery-cli/utils" +) + +var ( + adminClusterDeployCmd = &cobra.Command{ + Use: "deploy", + Short: "Deploy or upgrade clusters", + Run: func(cmd *cobra.Command, args []string) { + deployClusters() + }, + } + refreshDelay int + filters map[string]string + executionMode string + newK8sVersion string + parallelRuns int +) + +func init() { + adminClusterDeployCmd.Flags().BoolVarP(&dryRun, "disable-dry-run", "y", false, "Disable dry run mode") + adminClusterDeployCmd.Flags().IntVarP(¶llelRuns, "parallel-run", "n", 5, "Number of clusters to update in parallel - must be set between 1 and 20") + adminClusterDeployCmd.Flags().IntVarP(&refreshDelay, "refresh-delay", "r", 30, "Time in seconds to wait before checking clusters status during deployment - must be between [5-120]") + adminClusterDeployCmd.Flags().StringToStringVarP(&filters, "filters", "f", make(map[string]string), "Value to filter the property selected (property-to-filter must be set as well)") + adminClusterDeployCmd.Flags().StringVarP(&executionMode, "execution-mode", "e", "batch", "Batch execution mode - 'batch' will wait for the N deployments to be finished and ask validation to continue - 'on-the-fly' will deploy continuously as soon as a slot is available") + adminClusterDeployCmd.Flags().StringVarP(&newK8sVersion, "new-k8s-version", "k", "", "K8S version when upgrading clusters") + adminClusterCmd.AddCommand(adminClusterDeployCmd) + +} + +func deployClusters() { + utils.CheckAdminUrl() + + // if no filters is set, enforce to select only RUNNING clusters to avoid mistakes (e.g deploying a stopped cluster) + _, containsKey := filters["ClusterStatus"] + if !containsKey { + filters["CurrentStatus"] = "DEPLOYED" + } + + listService, err := pkg.NewAdminClusterListServiceImpl(filters) + if err != nil { + utils.PrintlnError(err) + os.Exit(1) + panic("unreachable") // staticcheck false positive: https://staticcheck.io/docs/checks#SA5011 + } + deployService, err := pkg.NewAdminClusterBatchDeployServiceImpl(dryRun, parallelRuns, refreshDelay, executionMode, newK8sVersion) + if err != nil { + utils.PrintlnError(err) + os.Exit(1) + panic("unreachable") // staticcheck false positive: https://staticcheck.io/docs/checks#SA5011 + } + + err = pkg.DeployClustersByBatch(listService, deployService) + if err != nil { + utils.PrintlnError(err) + os.Exit(1) + panic("unreachable") // staticcheck false positive: https://staticcheck.io/docs/checks#SA5011 + } +} diff --git a/cmd/admin_cluster_list.go b/cmd/admin_cluster_list.go new file mode 100644 index 00000000..d8722c47 --- /dev/null +++ b/cmd/admin_cluster_list.go @@ -0,0 +1,42 @@ +package cmd + +import ( + "os" + + "github.com/spf13/cobra" + + "github.com/qovery/qovery-cli/pkg" + "github.com/qovery/qovery-cli/utils" +) + +var ( + adminClusterListCmd = &cobra.Command{ + Use: "list", + Short: "List clusters by applying any filter", + Run: func(cmd *cobra.Command, args []string) { + listClusters() + }, + } +) + +func init() { + adminClusterListCmd.Flags().StringToStringVarP(&filters, "filters", "f", make(map[string]string), "Value to filter the property selected (property-to-filter must be set as well)") + adminClusterCmd.AddCommand(adminClusterListCmd) +} + +func listClusters() { + utils.CheckAdminUrl() + + listService, err := pkg.NewAdminClusterListServiceImpl(filters) + if err != nil { + utils.PrintlnError(err) + os.Exit(1) + panic("unreachable") // staticcheck false positive: https://staticcheck.io/docs/checks#SA5011 + } + err = pkg.ListClusters(listService) + if err != nil { + utils.PrintlnError(err) + os.Exit(1) + panic("unreachable") // staticcheck false positive: https://staticcheck.io/docs/checks#SA5011 + } +} diff --git a/pkg/admin_cluster_deploy_by_batch.go b/pkg/admin_cluster_deploy_by_batch.go new file mode 100644 index 00000000..1c02c07d --- /dev/null +++ b/pkg/admin_cluster_deploy_by_batch.go @@ -0,0 +1,52 @@ +package pkg + +import ( + "fmt" + + "github.com/qovery/qovery-cli/utils" +) + +func DeployClustersByBatch(listService AdminClusterListService, deployService AdminClusterBatchDeployService) error { + clusters, err := listService.SelectClusters() + if err != nil { + return err + } + + utils.Println(fmt.Sprintf("%d clusters to deploy:", len(clusters))) + err = PrintClustersTable(clusters) + if err != nil { + return err + } + + deployService.PrintParameters() + + utils.Println("Do you want to continue deploy process ?") + var validated = utils.Validate("deploy") + if !validated { + utils.Println("Exiting: Validation failed") + return nil + } + + deployResult, err := deployService.Deploy(clusters) + if err != nil { + return err + } + + if len(deployResult.PendingClusters) > 0 { + utils.Println(fmt.Sprintf("%d clusters not triggered because in non-terminal state (queue not implemented yet):", len(deployResult.PendingClusters))) + err := PrintClustersTable(deployResult.PendingClusters) + if err != nil { + return err + } + } + + if len(deployResult.ProcessedClusters) > 0 { + utils.Println(fmt.Sprintf("%d clusters deployed:", len(clusters))) + err := PrintClustersTable(deployResult.ProcessedClusters) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/admin_cluster_list.go b/pkg/admin_cluster_list.go new file mode 100644 index 00000000..7691f19b --- /dev/null +++ b/pkg/admin_cluster_list.go @@ -0,0 +1,21 @@ +package pkg + +import ( + "fmt" + + "github.com/qovery/qovery-cli/utils" +) + +func ListClusters(listService AdminClusterListService) error { + clusters, err := listService.SelectClusters() + if err != nil { + return err + } + + utils.Println(fmt.Sprintf("Found %d clusters", len(clusters))) + err = PrintClustersTable(clusters) + if err != nil { + return err + } + return nil +} diff --git a/pkg/admin_cluster_services.go b/pkg/admin_cluster_services.go new file mode 100644 index 00000000..73ffcf05 --- /dev/null +++ b/pkg/admin_cluster_services.go @@ -0,0 +1,463 @@ +package pkg + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "reflect" + "strconv" + "strings" + "time" + + "github.com/qovery/qovery-cli/utils" +) + +// +// DTO + +type ListOfClustersEligibleToUpdate struct { + Results []ClusterDetails +} +type ClusterDetails struct { + OrganizationId string `json:"organization_id"` + OrganizationName string `json:"organization_name"` + OrganizationPlan string `json:"organization_plan"` + ClusterId string `json:"cluster_id"` + ClusterName string `json:"cluster_name"` + ClusterType string `json:"cluster_type"` + ClusterCreatedAt string `json:"cluster_created_at"` + ClusterLastDeployedAt string `json:"cluster_last_deployed_at"` + ClusterK8sVersion string `json:"cluster_k8s_version"` + Mode string `json:"mode"` + IsProduction bool `json:"is_production"` + CurrentStatus string `json:"current_status"` +} + +// PrintClustersTable global method to output clusters table +func PrintClustersTable(clusters []ClusterDetails) error { + var data [][]string + + utils.Println("") + for _, cluster := range clusters { + data = append(data, []string{ + cluster.OrganizationId, + cluster.OrganizationName, + cluster.OrganizationPlan, + cluster.ClusterId, + cluster.ClusterName, + cluster.ClusterType, + cluster.ClusterK8sVersion, + cluster.Mode, + strconv.FormatBool(cluster.IsProduction), + cluster.CurrentStatus, + cluster.ClusterCreatedAt, + cluster.ClusterLastDeployedAt, + }) + } + + err := utils.PrintTable([]string{ + "OrganizationId", + "OrganizationName", + "OrganizationPlan", + "ClusterId", + "ClusterName", + "ClusterType", + "ClusterK8sVersion", + "Mode", + "IsProduction", + "CurrentStatus", + "ClusterCreatedAt", + "ClusterLastDeployedAt", + }, data) + + if err != nil { + return fmt.Errorf("cannot print clusters %s", err) + } + return nil +} + +// Service to list clusters +var allowedFilterProperties = map[string]bool{ + "OrganizationId": true, + "OrganizationName": true, + "OrganizationPlan": true, + "ClusterId": true, + "ClusterName": true, + "ClusterType": true, + "ClusterK8sVersion": true, + "CurrentStatus": true, + "Mode": true, + "IsProduction": true, +} + +type AdminClusterListService interface { + SelectClusters() ([]ClusterDetails, error) +} + +type AdminClusterListServiceImpl struct { + // Filters based on ClusterDetails struct fields (reflection is used to filter fields) + Filters map[string]string +} + +func NewAdminClusterListServiceImpl(filters map[string]string) (*AdminClusterListServiceImpl, error) { + if len(filters) > 0 { + for key := range filters { + _, keyIsPresent := allowedFilterProperties[key] + if !keyIsPresent { + keys := make([]string, len(allowedFilterProperties)) + i := 0 + for k := range allowedFilterProperties { + keys[i] = k + i++ + } + err := fmt.Sprintf("Filter property '%s' not available: valid values are: "+strings.Join(keys, ", "), key) + return nil, fmt.Errorf(err) + } + } + } + + return &AdminClusterListServiceImpl{ + Filters: filters, + }, nil +} + +func (service AdminClusterListServiceImpl) SelectClusters() ([]ClusterDetails, error) { + clustersFetched, err := service.fetchClustersEligibleToUpdate() + if err != nil { + return nil, err + } + clusters := service.filterByPredicates(clustersFetched, service.Filters) + return clusters, nil +} + +func (service AdminClusterListServiceImpl) fetchClustersEligibleToUpdate() ([]ClusterDetails, error) { + tokenType, token, err := utils.GetAccessToken() + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodGet, utils.AdminUrl+"/listClustersEligibleToUpdate", nil) + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", utils.GetAuthorizationHeaderValue(tokenType, token)) + req.Header.Set("Content-Type", "application/json") + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + return nil, fmt.Errorf(fmt.Sprintf("cannot fetch clusters (status_code=%d)", res.StatusCode)) + } + + list := ListOfClustersEligibleToUpdate{} + err = json.NewDecoder(res.Body).Decode(&list) + if err != nil { + return nil, err + } + + return list.Results, nil +} + +func (service AdminClusterListServiceImpl) filterByPredicates(clusters []ClusterDetails, filters map[string]string) []ClusterDetails { + var filteredClusters []ClusterDetails + for _, cluster := range clusters { + var matchAllFilters = true + for filterProperty, filterValue := range filters { + filterValuesSet := service.filterValueToHashSet(filterValue) + clusterProperty := reflect.Indirect(reflect.ValueOf(cluster)).FieldByName(filterProperty) + + // hack for IsProduction field (boolean needs to be converted to string) + if filterProperty == "IsProduction" { + boolToString := strconv.FormatBool(clusterProperty.Bool()) + if _, ok := filterValuesSet[boolToString]; !ok { + matchAllFilters = false + } + } else { + if _, ok := filterValuesSet[clusterProperty.String()]; !ok { + matchAllFilters = false + } + } + + if !matchAllFilters { + break + } + } + + if matchAllFilters { + filteredClusters = append(filteredClusters, cluster) + } + } + return filteredClusters +} + +// filterValueToHashSet Actually it's a hashmap but golang has no hashset +func (service AdminClusterListServiceImpl) filterValueToHashSet(filterValue string) map[string]bool { + splitFilterValue := strings.Split(filterValue, ",") + hashmap := make(map[string]bool, len(splitFilterValue)) + + for _, value := range splitFilterValue { + hashmap[value] = true + } + + return hashmap +} + +// +// Service to deploy clusters + +type ClusterBatchDeployResult struct { + // ProcessedClusters clusters that have been processed, non matter the final state created + ProcessedClusters []ClusterDetails + // PendingClusters clusters in the pending queue (their state were not in ready state) + PendingClusters []ClusterDetails +} + +type AdminClusterBatchDeployService interface { + Deploy(clusters []ClusterDetails) (*ClusterBatchDeployResult, error) + PrintParameters() +} + +type AdminClusterBatchDeployServiceImpl struct { + // DryRunDisabled disable dry run + DryRunDisabled bool + // ParallelRun the number of parallel requests to be processed + ParallelRun int + // RefreshDelay the delay to fetch cluster status in process + RefreshDelay int + // CompleteBatchBeforeContinue to block on N parallel runs to be processed: true = 'batch' mode / false = 'on-the-fly' mode + CompleteBatchBeforeContinue bool + // UpgradeClusterNewK8sVersion indicates next version to trigger a cluster upgrade + UpgradeClusterNewK8sVersion *string + // UpgradeMode indicates if the cluster needs to be upgraded + UpgradeMode bool +} + +func NewAdminClusterBatchDeployServiceImpl( + dryRun bool, + parallelRun int, + refreshDelay int, + executionMode string, + newK8sversionStr string, +) (*AdminClusterBatchDeployServiceImpl, error) { + // set at least 1 parallel run + if parallelRun < 1 { + parallelRun = 1 + } + // set maximum 100 parallel runs + if parallelRun > 100 { + parallelRun = 100 + } + if parallelRun > 20 { + utils.Println("") + utils.Println(fmt.Sprintf("Please increase the cluster engine autoscaler to %d, then type 'yes' to continue", parallelRun)) + var validated = utils.Validate("autoscaler-increase") + if !validated { + utils.Println("Exiting") + return nil, fmt.Errorf("exit on autoscaler validation failed") + } + utils.Println("") + } + + var newK8sVersion *string = nil + var upgradeMode = false + if newK8sversionStr != "" { + newK8sVersion = &newK8sversionStr + upgradeMode = true + } + + var completeBatchBeforeContinue = true + if executionMode == "on-the-fly" && + // Do not authorize "on-the-fly" for upgrade mode, it's too risky + !upgradeMode { + completeBatchBeforeContinue = false + } + + return &AdminClusterBatchDeployServiceImpl{ + DryRunDisabled: dryRun, + ParallelRun: parallelRun, + RefreshDelay: refreshDelay, + CompleteBatchBeforeContinue: completeBatchBeforeContinue, + UpgradeClusterNewK8sVersion: newK8sVersion, + UpgradeMode: upgradeMode, + }, nil +} + +func (service AdminClusterBatchDeployServiceImpl) PrintParameters() { + utils.Println("-------------------------------------------") + utils.Println(fmt.Sprintf("- DryRunDisabled: %t", service.DryRunDisabled)) + utils.Println(fmt.Sprintf("- ParallelRun: %d", service.ParallelRun)) + utils.Println(fmt.Sprintf("- RefreshDelay: %d", service.RefreshDelay)) + utils.Println(fmt.Sprintf("- BatchMode: %t", service.CompleteBatchBeforeContinue)) + if service.UpgradeMode { + utils.Println(fmt.Sprintf("- UpgradeMode: true (NewK8sVersion = : %s)", *service.UpgradeClusterNewK8sVersion)) + } else { + utils.Println("- UpgradeMode: false") + } + utils.Println("-------------------------------------------") +} +func (service AdminClusterBatchDeployServiceImpl) Deploy(clusters []ClusterDetails) (*ClusterBatchDeployResult, error) { + if !service.DryRunDisabled { + utils.Println("dry-run-disabled is false: following information is purely indicative, no cluster will be deployed at all") + } + + // store final state of clusters in a hashmap + var processedClusters []ClusterDetails + // store the current status for each cluster deployed, to be able to execute next parallel runs + var currentDeployingClustersByClusterId = make(map[string]ClusterDetails) + // clusters having a non-terminal state when trying to deploy them + var pendingClustersById []ClusterDetails + + var indexCurrentClusterToDeploy = -1 + for { + tokenType, token, err := utils.GetAccessToken() + if err != nil { + return nil, err + } + client := utils.GetQoveryClient(tokenType, token) + + // boolean to wait for current batch to continue, according to 'execution-mode' command flag + var waitToTriggerCluster = false + if service.CompleteBatchBeforeContinue && indexCurrentClusterToDeploy != -1 { + if len(currentDeployingClustersByClusterId) > 0 { + waitToTriggerCluster = true + } else { + utils.Println(fmt.Sprintf("Do you want to continue next batch of %d deployments ?", service.ParallelRun)) + var validated = utils.Validate("deploy") + if !validated { + utils.Println("Exiting") + return nil, fmt.Errorf("user stopped the command after batch terminated") + } + } + } + + // if enough space to start a new cluster deployment + if !waitToTriggerCluster && len(currentDeployingClustersByClusterId) < service.ParallelRun && indexCurrentClusterToDeploy < len(clusters)-1 { + // fill the hashmap according to parallel runs + for i := len(currentDeployingClustersByClusterId); i < service.ParallelRun; i++ { + indexCurrentClusterToDeploy += 1 + + // check status in case a deployment has occurred in the meantime + var cluster = clusters[indexCurrentClusterToDeploy] + clusterStatus, response, err := client.ClustersAPI.GetClusterStatus(context.Background(), cluster.OrganizationId, cluster.ClusterId).Execute() + if response.StatusCode > 200 || err != nil { + return nil, err + } + + // Trigger a deployment only when the target status is in terminal state + if utils.IsTerminalClusterState(*clusterStatus.Status) { + utils.Println(fmt.Sprintf("[Organization '%s' - Cluster '%s'] - Starting deployment - https://console.qovery.com/organization/%s/cluster/%s/logs", cluster.OrganizationName, cluster.ClusterName, cluster.OrganizationId, cluster.ClusterId)) + if service.DryRunDisabled { + var err error + if service.UpgradeClusterNewK8sVersion != nil { + err = service.upgradeCluster(cluster.ClusterId, *service.UpgradeClusterNewK8sVersion) + } else { + err = service.deployCluster(cluster.ClusterId) + } + if err != nil { + utils.Println(fmt.Sprintf("[Organization '%s' - Cluster '%s'] - Error on deploy: %s ", cluster.OrganizationName, cluster.ClusterName, err)) + } + } + cluster.CurrentStatus = "DEPLOYING" + currentDeployingClustersByClusterId[cluster.ClusterId] = cluster + } else { + var status = fmt.Sprintf("%v", *clusterStatus.Status) // only solution to get the underlying enum's string value + utils.Println(fmt.Sprintf("[Organization '%s' - Cluster '%s'] - Cluster's state is '%s' (not a terminal state), sending it to waiting queue to be processed later", cluster.OrganizationName, cluster.ClusterName, status)) + pendingClustersById = append(pendingClustersById, cluster) + } + + // if last cluster has been reached, break + if indexCurrentClusterToDeploy == len(clusters)-1 { + break + } + } + } + + // sleep some time before fetching statuses + if service.DryRunDisabled { + time.Sleep(time.Duration(service.RefreshDelay) * time.Second) + } else { + time.Sleep(time.Duration(1) * time.Second) + } + + // wait for clusters statuses + var clustersToRemoveFromMap []string + for clusterId, cluster := range currentDeployingClustersByClusterId { + clusterStatus, response, err := client.ClustersAPI.GetClusterStatus(context.Background(), cluster.OrganizationId, cluster.ClusterId).Execute() + if response.StatusCode > 200 || err != nil { + return nil, err + } + + // set cluster status + var status = fmt.Sprintf("%v", *clusterStatus.Status) // only solution to get the underlying enum's string value + cluster.CurrentStatus = status + // Mark the deployment as finished only if terminal state OR status is "INTERNAL_ERROR" (specific case) + if utils.IsTerminalClusterState(*clusterStatus.Status) || cluster.CurrentStatus == "INTERNAL_ERROR" { + utils.Println(fmt.Sprintf("[Organization '%s' - Cluster '%s'] - Cluster deployed with '%s' status ", cluster.OrganizationName, cluster.ClusterName, *clusterStatus.Status)) + + processedClusters = append(processedClusters, cluster) + clustersToRemoveFromMap = append(clustersToRemoveFromMap, clusterId) + } + } + + // remove deployed clusters + for _, clusterId := range clustersToRemoveFromMap { + delete(currentDeployingClustersByClusterId, clusterId) + } + + // check if every cluster has been deployed + if len(currentDeployingClustersByClusterId) == 0 && indexCurrentClusterToDeploy == len(clusters)-1 { + break + } + } + + utils.Println("No more deployment to process") + + return &ClusterBatchDeployResult{ + ProcessedClusters: processedClusters, + PendingClusters: pendingClustersById, + }, nil +} + +func (service AdminClusterBatchDeployServiceImpl) deployCluster(clusterId string) error { + response := deploy(utils.AdminUrl+"/cluster/deploy/"+clusterId, http.MethodPost, true) + if !strings.Contains(response.Status, "200") { + result, _ := io.ReadAll(response.Body) + return fmt.Errorf("could not deploy cluster : %s. %s", response.Status, string(result)) + } + return nil +} + +func (service AdminClusterBatchDeployServiceImpl) upgradeCluster(clusterId string, targetVersion string) error { + tokenType, token, err := utils.GetAccessToken() + if err != nil { + utils.PrintlnError(err) + os.Exit(0) + } + + body := bytes.NewBuffer([]byte(fmt.Sprintf("{ \"metadata\": { \"dry_run_deploy\": false, \"target_version\": \"%s\" } }", targetVersion))) + request, err := http.NewRequest(http.MethodPost, utils.AdminUrl+"/cluster/update/"+clusterId, body) + if err != nil { + return err + } + + request.Header.Set("Authorization", utils.GetAuthorizationHeaderValue(tokenType, token)) + request.Header.Set("Content-Type", "application/json") + + response, err := http.DefaultClient.Do(request) + if err != nil { + return err + } + + if !strings.Contains(response.Status, "200") { + result, _ := io.ReadAll(response.Body) + return fmt.Errorf("could not deploy cluster : %s. %s", response.Status, string(result)) + } + return nil +}