From 7689999413e4d2ea6265ab1d1cb69c4cc5eae168 Mon Sep 17 00:00:00 2001 From: juls0730 <62722391+juls0730@users.noreply.github.com> Date: Fri, 13 Dec 2024 03:33:04 -0600 Subject: [PATCH] fix steaming, stale data, proxy bugs and more --- .gitignore | 2 +- README.md | 3 + cmd/flux/main.go | 724 ++++++++++++++++++++++++------------------- go.mod | 1 + go.sum | 2 + pkg/responses.go | 2 - server/app.go | 101 +++--- server/container.go | 158 ++++++++-- server/deploy.go | 200 ++++++------ server/deployment.go | 132 ++------ server/proxy.go | 30 +- server/schema.sql | 1 + server/server.go | 6 - 13 files changed, 751 insertions(+), 611 deletions(-) diff --git a/.gitignore b/.gitignore index bffa5f4..d976da8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ fluxd flux !cmd/flux !cmd/fluxd -fluxdd/ \ No newline at end of file +fluxdd* \ No newline at end of file diff --git a/README.md b/README.md index 25010c6..6bbceed 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,9 @@ sudo ./fluxd FLUXD_ROOT_DIR=$PWD/fluxdd zqdgr run:daemon ``` +> [!IMPORTANT] +> CGO is required to build the daemon due to the use of [mattn/go-sqlite3](https://github.com/mattn/go-sqlite3) + ### CLI Install the CLI using the following command: diff --git a/cmd/flux/main.go b/cmd/flux/main.go index 777bfa4..d8bad12 100644 --- a/cmd/flux/main.go +++ b/cmd/flux/main.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/agnivade/levenshtein" "github.com/briandowns/spinner" "github.com/juls0730/flux/pkg" ) @@ -178,18 +179,18 @@ func getProjectName(command string, args []string) (string, error) { if len(args) == 0 { if _, err := os.Stat("flux.json"); err != nil { - return "", fmt.Errorf("Usage: flux %[1]s , or run flux %[1]s in the project directory", command) + return "", fmt.Errorf("usage: flux %[1]s , or run flux %[1]s in the project directory", command) } fluxConfigFile, err := os.Open("flux.json") if err != nil { - return "", fmt.Errorf("Failed to open flux.json: %v", err) + return "", fmt.Errorf("failed to open flux.json: %v", err) } defer fluxConfigFile.Close() var config pkg.ProjectConfig if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - return "", fmt.Errorf("Failed to decode flux.json: %v", err) + return "", fmt.Errorf("failed to decode flux.json: %v", err) } projectName = config.Name @@ -248,415 +249,490 @@ func (w *CustomStdout) Printf(format string, a ...interface{}) (n int, err error return w.Write([]byte(str)) } -var helpStr = `Usage: - flux +func DeployCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux deploy + + Flux will deploy the app in the current directory, and start routing traffic to it.`) + return nil + } -Available Commands: - init Initialize a new project - deploy Deploy a new version of the app - stop Stop a container - start Start a container - delete Delete a container - list List all containers + if _, err := os.Stat("flux.json"); err != nil { + return fmt.Errorf("no flux.json found, please run flux init first") + } -Flags: - -h, --help help for flux + loadingSpinner.Suffix = " Deploying" + loadingSpinner.Start() -Use "flux --help" for more information about a command.` + buf, err := compressDirectory(info.Compression) + if err != nil { + return fmt.Errorf("failed to compress directory: %v", err) + } -func runCommand(command string, args []string, config Config, info pkg.Info) error { - seekingHelp := false - if len(args) > 0 && (args[len(args)-1] == "--help" || args[len(args)-1] == "-h") { - seekingHelp = true - args = args[:len(args)-1] + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + configPart, err := writer.CreateFormFile("config", "flux.json") + + if err != nil { + return fmt.Errorf("failed to create config part: %v", err) } - spinnerWriter := CustomSpinnerWriter{ - currentSpinnerMsg: "", - lock: sync.Mutex{}, + fluxConfigFile, err := os.Open("flux.json") + if err != nil { + return fmt.Errorf("failed to open flux.json: %v", err) } + defer fluxConfigFile.Close() - loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(&spinnerWriter)) - defer func() { - if loadingSpinner.Active() { - loadingSpinner.Stop() - } - }() + if _, err := io.Copy(configPart, fluxConfigFile); err != nil { + return fmt.Errorf("failed to write config part: %v", err) + } - signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, os.Interrupt) - go func() { - <-signalChannel - if loadingSpinner.Active() { - loadingSpinner.Stop() - } + codePart, err := writer.CreateFormFile("code", "code.tar.gz") + if err != nil { + return fmt.Errorf("failed to create code part: %v", err) + } - os.Exit(0) - }() + if _, err := codePart.Write(buf); err != nil { + return fmt.Errorf("failed to write code part: %v", err) + } - switch command { - case "deploy": - if seekingHelp { - fmt.Println(`Usage: - flux deploy - - Flux will deploy the app in the current directory, and start routing traffic to it.`) - return nil - } + if err := writer.Close(); err != nil { + return fmt.Errorf("failed to close writer: %v", err) + } - if _, err := os.Stat("flux.json"); err != nil { - return fmt.Errorf("No flux.json found, please run flux init first") - } + req, err := http.NewRequest("POST", config.DeamonURL+"/deploy", body) + req.Header.Set("Content-Type", writer.FormDataContentType()) - loadingSpinner.Suffix = " Deploying" - loadingSpinner.Start() + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } - buf, err := compressDirectory(info.Compression) - if err != nil { - return fmt.Errorf("Failed to compress directory: %v", err) - } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %v", err) + } + defer resp.Body.Close() - body := &bytes.Buffer{} - writer := multipart.NewWriter(body) - configPart, err := writer.CreateFormFile("config", "flux.json") + customWriter := &CustomStdout{ + spinner: spinnerWriter, + lock: sync.Mutex{}, + } - if err != nil { - return fmt.Errorf("Failed to create config part: %v", err) + scanner := bufio.NewScanner(resp.Body) + var event string + var data pkg.DeploymentEvent + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "data: ") { + if err := json.Unmarshal([]byte(line[6:]), &data); err != nil { + return fmt.Errorf("failed to parse deployment event: %v", err) + } + + switch event { + case "complete": + loadingSpinner.Stop() + var deploymentResponse struct { + App pkg.App `json:"app"` + } + if err := json.Unmarshal([]byte(data.Message), &deploymentResponse); err != nil { + return fmt.Errorf("failed to parse deployment response: %v", err) + } + fmt.Printf("App %s deployed successfully!\n", deploymentResponse.App.Name) + return nil + case "cmd_output": + customWriter.Printf("... %s\n", data.Message) + case "error": + loadingSpinner.Stop() + return fmt.Errorf("deployment failed: %s", data.Message) + default: + customWriter.Printf("%s\n", data.Message) + } + event = "" + } else if strings.HasPrefix(line, "event: ") { + event = strings.TrimPrefix(line, "event: ") } + } - fluxConfigFile, err := os.Open("flux.json") + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("Failed to open flux.json: %v", err) + return fmt.Errorf("error reading response body: %v", err) } - defer fluxConfigFile.Close() - if _, err := io.Copy(configPart, fluxConfigFile); err != nil { - return fmt.Errorf("Failed to write config part: %v", err) - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - codePart, err := writer.CreateFormFile("code", "code.tar.gz") - if err != nil { - return fmt.Errorf("Failed to create code part: %v", err) - } + return fmt.Errorf("deploy failed: %s", responseBody) + } - if _, err := codePart.Write(buf); err != nil { - return fmt.Errorf("Failed to write code part: %v", err) - } + return nil +} - if err := writer.Close(); err != nil { - return fmt.Errorf("Failed to close writer: %v", err) - } +func StopCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux stop + + Flux will stop the deployment of the app in the current directory.`) + return nil + } - req, err := http.NewRequest("POST", config.DeamonURL+"/deploy", body) - req.Header.Set("Content-Type", writer.FormDataContentType()) + projectName, err := getProjectName("stop", args) + if err != nil { + return err + } + + req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("failed to stop app: %v", err) + } + defer req.Body.Close() - resp, err := http.DefaultClient.Do(req) + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) if err != nil { - return fmt.Errorf("Failed to send request: %v", err) + return fmt.Errorf("error reading response body: %v", err) } - defer resp.Body.Close() - customWriter := &CustomStdout{ - spinner: &spinnerWriter, - lock: sync.Mutex{}, - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - scanner := bufio.NewScanner(resp.Body) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "data: ") { - var event pkg.DeploymentEvent - if err := json.Unmarshal([]byte(line[6:]), &event); err == nil { - switch event.Stage { - case "complete": - loadingSpinner.Stop() - var deploymentResponse struct { - App pkg.App `json:"app"` - } - if err := json.Unmarshal([]byte(event.Message), &deploymentResponse); err != nil { - return fmt.Errorf("Failed to parse deployment response: %v", err) - } - - fmt.Printf("App %s deployed successfully!\n", deploymentResponse.App.Name) - - return nil - case "cmd_output": - customWriter.Printf("... %s\n", event.Message) - case "error": - loadingSpinner.Stop() - return fmt.Errorf("Deployment failed: %s\n", event.Error) - default: - customWriter.Printf("%s\n", event.Message) - } - } - } - } + return fmt.Errorf("stop failed: %s", responseBody) + } - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } + fmt.Printf("Successfully stopped %s\n", projectName) + return nil +} - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) +func StartCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux start + + Flux will start the deployment of the app in the current directory.`) + return nil + } - return fmt.Errorf("Deploy failed: %s", responseBody) - } - case "stop": - if seekingHelp { - fmt.Println(`Usage: - flux stop - - Flux will stop the deployment of the app in the current directory.`) - return nil - } + projectName, err := getProjectName("start", args) + if err != nil { + return err + } - projectName, err := getProjectName(command, args) - if err != nil { - return err - } + req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) + if err != nil { + return fmt.Errorf("failed to start app: %v", err) + } + defer req.Body.Close() - req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) + if req.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(req.Body) if err != nil { - return fmt.Errorf("Failed to stop app: %v", err) + return fmt.Errorf("error reading response body: %v", err) } - defer req.Body.Close() - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + return fmt.Errorf("start failed: %s", responseBody) + } - return fmt.Errorf("Stop failed: %s", responseBody) - } + fmt.Printf("Successfully started %s\n", projectName) - fmt.Printf("Successfully stopped %s\n", projectName) - case "start": - if seekingHelp { - fmt.Println(`Usage: - flux start - - Flux will start the deployment of the app in the current directory.`) - return nil - } + return nil +} - projectName, err := getProjectName(command, args) - if err != nil { - return err - } +func DeleteCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux delete [project-name | all] - req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) - if err != nil { - return fmt.Errorf("Failed to start app: %v", err) - } - defer req.Body.Close() + Options: + project-name: The name of the project to delete + all: Delete all projects + + Flux will delete the deployment of the app in the current directory or the specified project.`) + return nil + } - if req.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(req.Body) + if len(args) == 1 { + if args[0] == "all" { + var response string + fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + response = "" + + fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") + fmt.Scanln(&response) + + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } + + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) + if err != nil { + return fmt.Errorf("failed to delete deployments: %v", err) + } + resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("error reading response body: %v", err) + return fmt.Errorf("failed to delete deployments: %v", err) } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - return fmt.Errorf("Start failed: %s", responseBody) - } + return fmt.Errorf("delete failed: %s", responseBody) + } - fmt.Printf("Successfully started %s\n", projectName) - case "delete": - if seekingHelp { - fmt.Println(`Usage: - flux delete [project-name | all] - - Options: - project-name: The name of the project to delete - all: Delete all projects - - Flux will delete the deployment of the app in the current directory or the specified project.`) + fmt.Printf("Successfully deleted all projects\n") return nil } + } - if len(args) == 1 { - if args[0] == "all" { - var response string - fmt.Print("Are you sure you want to delete all projects? this will delete all volumes and containers associated and cannot be undone. \n[y/N] ") - fmt.Scanln(&response) + projectName, err := getProjectName("delete", args) + if err != nil { + return err + } - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } + // ask for confirmation + fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) + var response string + fmt.Scanln(&response) - response = "" + if strings.ToLower(response) != "y" { + fmt.Println("Aborting...") + return nil + } - fmt.Printf("Are you really sure you want to delete all projects? \n[y/N] ") - fmt.Scanln(&response) + req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to delete app: %v", err) + } + defer resp.Body.Close() - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) - if err != nil { - return fmt.Errorf("Failed to delete deployments: %v", err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to delete deployments: %v", err) - } - defer resp.Body.Close() + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } + return fmt.Errorf("delete failed: %s", responseBody) + } - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + fmt.Printf("Successfully deleted %s\n", projectName) - return fmt.Errorf("delete failed: %s", responseBody) - } + return nil +} - fmt.Printf("Successfully deleted all projects\n") - return nil - } - } +func ListCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux list - projectName, err := getProjectName(command, args) + Flux will list all the apps in the daemon.`) + return nil + } + + resp, err := http.Get(config.DeamonURL + "/apps") + if err != nil { + return fmt.Errorf("failed to get apps: %v", err) + } + + if resp.StatusCode != http.StatusOK { + responseBody, err := io.ReadAll(resp.Body) if err != nil { - return err + return fmt.Errorf("error reading response body: %v", err) } - // ask for confirmation - fmt.Printf("Are you sure you want to delete %s? this will delete all volumes and containers associated with the deployment, and cannot be undone. \n[y/N] ", projectName) - var response string + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + + return fmt.Errorf("list failed: %s", responseBody) + } + + var apps []pkg.App + if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { + return fmt.Errorf("failed to decode apps: %v", err) + } + + if len(apps) == 0 { + fmt.Println("No apps found") + return nil + } + + for _, app := range apps { + fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) + } + + return nil +} + +func InitCommand(seekingHelp bool, config Config, info pkg.Info, loadingSpinner *spinner.Spinner, spinnerWriter *CustomSpinnerWriter, args []string) error { + if seekingHelp { + fmt.Println(`Usage: + flux init [project-name] + + Options: + project-name: The name of the project to initialize + + Flux will initialize a new project in the current directory or the specified project.`) + return nil + } + + var projectConfig pkg.ProjectConfig + + var response string + if len(args) > 1 { + response = args[0] + } else { + fmt.Println("What is the name of your project?") fmt.Scanln(&response) + } - if strings.ToLower(response) != "y" { - fmt.Println("Aborting...") - return nil - } + projectConfig.Name = response - req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments/"+projectName, nil) - if err != nil { - return fmt.Errorf("failed to delete app: %v", err) - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("failed to delete app: %v", err) - } - defer resp.Body.Close() + fmt.Println("What URL should your project listen to?") + fmt.Scanln(&response) + if strings.HasPrefix(response, "http") { + response = strings.TrimPrefix(response, "http://") + response = strings.TrimPrefix(response, "https://") + } - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } + response = strings.Split(response, "/")[0] - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) + projectConfig.Url = response - return fmt.Errorf("delete failed: %s", responseBody) - } + fmt.Println("What port does your project listen to?") + fmt.Scanln(&response) + port, err := strconv.ParseUint(response, 10, 16) + projectConfig.Port = uint16(port) + if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { + return fmt.Errorf("that doesnt look like a valid port, try a number between 1 and 65535") + } - fmt.Printf("Successfully deleted %s\n", projectName) - case "list": - if seekingHelp { - fmt.Println(`Usage: - flux list + configBytes, err := json.MarshalIndent(projectConfig, "", " ") + if err != nil { + return fmt.Errorf("failed to parse project config: %v", err) + } - Flux will list all the apps in the daemon.`) - return nil - } + os.WriteFile("flux.json", configBytes, 0644) - resp, err := http.Get(config.DeamonURL + "/apps") - if err != nil { - return fmt.Errorf("failed to get apps: %v", err) - } + fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) - if resp.StatusCode != http.StatusOK { - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } + return nil +} - responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) +var helpStr = `Usage: + flux - return fmt.Errorf("list failed: %s", responseBody) - } +Available Commands: + init Initialize a new project + deploy Deploy a new version of the app + stop Stop a container + start Start a container + delete Delete a container + list List all containers - var apps []pkg.App - if err := json.NewDecoder(resp.Body).Decode(&apps); err != nil { - return fmt.Errorf("failed to decode apps: %v", err) - } +Flags: + -h, --help help for flux - if len(apps) == 0 { - fmt.Println("No apps found") - return nil - } +Use "flux --help" for more information about a command.` - for _, app := range apps { - fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) - } - case "init": - if seekingHelp { - fmt.Println(`Usage: - flux init [project-name] - - Options: - project-name: The name of the project to initialize - - Flux will initialize a new project in the current directory or the specified project.`) - return nil - } +var maxDistance = 3 - var projectConfig pkg.ProjectConfig +type CommandHandler struct { + commands map[string]func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error +} - var response string - if len(args) > 1 { - response = args[0] - } else { - fmt.Println("What is the name of your project?") - fmt.Scanln(&response) - } +func (h *CommandHandler) RegisterCmd(name string, handler func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error) { + h.commands[name] = handler +} - projectConfig.Name = response +func runCommand(command string, args []string, config Config, info pkg.Info, cmdHandler CommandHandler, try int) error { + if try == 2 { + return fmt.Errorf("Unknown command: %s", command) + } - fmt.Println("What URL should your project listen to?") - fmt.Scanln(&response) - if strings.HasPrefix(response, "http") { - strings.TrimPrefix(response, "http://") - strings.TrimPrefix(response, "https://") - } + seekingHelp := false + if len(args) > 0 && (args[len(args)-1] == "--help" || args[len(args)-1] == "-h") { + seekingHelp = true + args = args[:len(args)-1] + } - response = strings.Split(response, "/")[0] + spinnerWriter := CustomSpinnerWriter{ + currentSpinnerMsg: "", + lock: sync.Mutex{}, + } - projectConfig.Url = response + loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(&spinnerWriter)) + defer func() { + if loadingSpinner.Active() { + loadingSpinner.Stop() + } + }() - fmt.Println("What port does your project listen to?") - fmt.Scanln(&response) - port, err := strconv.ParseUint(response, 10, 16) - projectConfig.Port = uint16(port) - if err != nil || projectConfig.Port < 1 || projectConfig.Port > 65535 { - return fmt.Errorf("That doesnt look like a valid port, try a number between 1 and 65535") + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt) + go func() { + <-signalChannel + if loadingSpinner.Active() { + loadingSpinner.Stop() } - configBytes, err := json.MarshalIndent(projectConfig, "", " ") - if err != nil { - return fmt.Errorf("failed to parse project config: %v", err) + os.Exit(0) + }() + + handler, ok := cmdHandler.commands[command] + if ok { + return handler(seekingHelp, config, info, loadingSpinner, &spinnerWriter, args) + } + + // diff the command against the list of commands and if we find a command that is more than 80% similar, ask if that's what the user meant + var closestMatch struct { + name string + score int + } + for cmdName := range cmdHandler.commands { + distance := levenshtein.ComputeDistance(cmdName, command) + + if distance <= maxDistance { + if closestMatch.name == "" || distance < closestMatch.score { + closestMatch.name = cmdName + closestMatch.score = distance + } } + } - os.WriteFile("flux.json", configBytes, 0644) + if closestMatch.name == "" { + return fmt.Errorf("unknown command: %s", command) + } + + var response string + fmt.Printf("No command found with the name '%s'. Did you mean '%s'?\n", command, closestMatch.name) + fmt.Scanln(&response) - fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) - default: - return fmt.Errorf("unknown command: %s\n%s", command, helpStr) + if strings.ToLower(response) == "y" || strings.ToLower(response) == "yes" { + command = closestMatch.name + } else { + return nil } - return nil + return runCommand(command, args, config, info, cmdHandler, try+1) } func main() { @@ -720,7 +796,17 @@ func main() { os.Exit(1) } - err = runCommand(command, args, config, info) + cmdHandler := CommandHandler{ + commands: make(map[string]func(bool, Config, pkg.Info, *spinner.Spinner, *CustomSpinnerWriter, []string) error), + } + + cmdHandler.RegisterCmd("deploy", DeployCommand) + cmdHandler.RegisterCmd("stop", StopCommand) + cmdHandler.RegisterCmd("start", StartCommand) + cmdHandler.RegisterCmd("delete", DeleteCommand) + cmdHandler.RegisterCmd("init", InitCommand) + + err = runCommand(command, args, config, info, cmdHandler, 0) if err != nil { fmt.Printf("%v\n", err) os.Exit(1) diff --git a/go.mod b/go.mod index 4d9fd96..9ae452f 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( require ( github.com/Microsoft/go-winio v0.4.14 // indirect + github.com/agnivade/levenshtein v1.2.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect diff --git a/go.sum b/go.sum index 8b9cfaa..c983eaa 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/agnivade/levenshtein v1.2.0 h1:U9L4IOT0Y3i0TIlUIDJ7rVUziKi/zPbrJGaFrtYH3SY= +github.com/agnivade/levenshtein v1.2.0/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/briandowns/spinner v1.23.1 h1:t5fDPmScwUjozhDj4FA46p5acZWIPXYE30qW2Ptu650= github.com/briandowns/spinner v1.23.1/go.mod h1:LaZeM4wm2Ywy6vO571mvhQNRcWfRUnXOs0RcKV0wYKM= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= diff --git a/pkg/responses.go b/pkg/responses.go index becf493..d9847f9 100644 --- a/pkg/responses.go +++ b/pkg/responses.go @@ -17,7 +17,5 @@ type Info struct { } type DeploymentEvent struct { - Stage string `json:"stage"` Message string `json:"message"` - Error string `json:"error,omitempty"` } diff --git a/server/app.go b/server/app.go index 5bf8f13..41735be 100644 --- a/server/app.go +++ b/server/app.go @@ -12,10 +12,10 @@ import ( ) type App struct { - ID int64 `json:"id,omitempty"` - Deployment Deployment `json:"deployment,omitempty"` - Name string `json:"name,omitempty"` - DeploymentID int64 `json:"deployment_id,omitempty"` + ID int64 `json:"id,omitempty"` + Deployment *Deployment `json:"deployment,omitempty"` + Name string `json:"name,omitempty"` + DeploymentID int64 `json:"deployment_id,omitempty"` } func CreateApp(ctx context.Context, imageName string, projectPath string, projectConfig pkg.ProjectConfig) (*App, error) { @@ -24,51 +24,36 @@ func CreateApp(ctx context.Context, imageName string, projectPath string, projec } log.Printf("Creating deployment %s...\n", app.Name) - container, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig) - if err != nil || container == nil { - - return nil, fmt.Errorf("Failed to create container: %v", err) - } - - deployment, err := CreateDeployment(*container, projectConfig.Port, projectConfig.Url, Flux.db) + deployment, err := CreateDeployment(projectConfig.Port, projectConfig.Url, Flux.db) app.Deployment = deployment if err != nil { log.Printf("Failed to create deployment: %v", err) return nil, err } + container, err := CreateContainer(ctx, imageName, projectPath, projectConfig, true, deployment) + if err != nil || container == nil { + return nil, fmt.Errorf("failed to create container: %v", err) + } + if appInsertStmt == nil { appInsertStmt, err = Flux.db.Prepare("INSERT INTO apps (name, deployment_id) VALUES ($1, $2) RETURNING id, name, deployment_id") if err != nil { - return nil, fmt.Errorf("Failed to prepare statement: %v", err) + return nil, fmt.Errorf("failed to prepare statement: %v", err) } } // create app in the database err = appInsertStmt.QueryRow(projectConfig.Name, deployment.ID).Scan(&app.ID, &app.Name, &app.DeploymentID) if err != nil { - return nil, fmt.Errorf("Failed to insert app: %v", err) + return nil, fmt.Errorf("failed to insert app: %v", err) } err = deployment.Start(ctx) if err != nil { - return nil, fmt.Errorf("Failed to start deployment: %v", err) + return nil, fmt.Errorf("failed to start deployment: %v", err) } - var headContainer *Container - for _, container := range deployment.Containers { - if container.Head { - headContainer = &container - } - } - - deployment.Proxy, err = NewDeploymentProxy(&deployment, headContainer) - if err != nil { - return nil, fmt.Errorf("Failed to create deployment proxy: %v", err) - } - - Flux.proxy.AddDeployment(&deployment) - Flux.appManager.AddApp(app.Name, app) return app, nil @@ -79,16 +64,20 @@ func (app *App) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, im // if deploy is not started, start it deploymentStatus, err := app.Deployment.Status(ctx) - if deploymentStatus != "running" || err != nil { + if err != nil { + return fmt.Errorf("failed to get deployment status: %v", err) + } + + if deploymentStatus != "running" { err = app.Deployment.Start(ctx) if err != nil { - return fmt.Errorf("Failed to start deployment: %v", err) + return fmt.Errorf("failed to start deployment: %v", err) } } err = app.Deployment.Upgrade(ctx, projectConfig, imageName, projectPath) if err != nil { - return fmt.Errorf("Failed to upgrade deployment: %v", err) + return fmt.Errorf("failed to upgrade deployment: %v", err) } return nil @@ -110,7 +99,7 @@ func (app *App) Remove(ctx context.Context) error { projectPath := filepath.Join(Flux.rootDir, "apps", app.Name) err = os.RemoveAll(projectPath) if err != nil { - return fmt.Errorf("Failed to remove project directory: %v", err) + return fmt.Errorf("failed to remove project directory: %v", err) } return nil @@ -141,13 +130,17 @@ func (am *AppManager) GetAllApps() []*App { } func (am *AppManager) AddApp(name string, app *App) { + if app.Deployment.Containers == nil || app.Deployment.Head == nil || len(app.Deployment.Containers) == 0 { + panic("nil containers") + } + am.Store(name, app) } func (am *AppManager) DeleteApp(name string) error { app := am.GetApp(name) if app == nil { - return fmt.Errorf("App not found") + return fmt.Errorf("app not found") } err := app.Remove(context.Background()) @@ -185,10 +178,10 @@ func (am *AppManager) Init() { } for _, app := range apps { - var deployment Deployment + deployment := &Deployment{} var headContainer *Container Flux.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) - deployment.Containers = make([]Container, 0) + deployment.Containers = make([]*Container, 0) rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { @@ -201,19 +194,18 @@ func (am *AppManager) Init() { var container Container var containerIDString string rows.Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) - container.Deployment = &deployment + container.Deployment = deployment copy(container.ContainerID[:], containerIDString) if container.Head { + if headContainer != nil { + log.Fatalf("Several containers are marked as head") + } + headContainer = &container } - deployment.Containers = append(deployment.Containers, container) - } - - for i, container := range deployment.Containers { - var volumes []Volume - rows, err := Flux.db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID) + rows, err := Flux.db.Query("SELECT id, volume_id, container_id, mountpoint FROM volumes WHERE container_id = ?", container.ContainerID[:]) if err != nil { log.Printf("Failed to query volumes: %v\n", err) return @@ -222,17 +214,32 @@ func (am *AppManager) Init() { for rows.Next() { var volume Volume - rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID) - volumes = append(volumes, volume) + rows.Scan(&volume.ID, &volume.VolumeID, &volume.ContainerID, &volume.Mountpoint) + container.Volumes = append(container.Volumes, volume) } - deployment.Containers[i].Volumes = volumes + deployment.Containers = append(deployment.Containers, &container) } - deployment.Proxy, _ = NewDeploymentProxy(&deployment, headContainer) + if headContainer == nil { + log.Fatalf("head container is nil!") + } + deployment.Head = headContainer app.Deployment = deployment - am.AddApp(app.Name, &app) + + status, err := deployment.Status(context.Background()) + if err != nil { + log.Printf("Failed to get deployment status: %v\n", err) + continue + } + + if status != "running" { + continue + } + + deployment.Proxy, _ = deployment.NewDeploymentProxy() + Flux.proxy.AddDeployment(deployment) } } diff --git a/server/container.go b/server/container.go index 797b2e1..fa51756 100644 --- a/server/container.go +++ b/server/container.go @@ -2,6 +2,7 @@ package server import ( "context" + "database/sql" "fmt" "log" "net/http" @@ -17,10 +18,17 @@ import ( "github.com/juls0730/flux/pkg" ) +var ( + volumeInsertStmt *sql.Stmt + volumeUpdateStmt *sql.Stmt + containerInsertStmt *sql.Stmt +) + type Volume struct { ID int64 `json:"id"` VolumeID string `json:"volume_id"` - ContainerID int64 `json:"container_id"` + Mountpoint string `json:"mountpoint"` + ContainerID string `json:"container_id"` } type Container struct { @@ -32,14 +40,13 @@ type Container struct { DeploymentID int64 `json:"deployment_id"` } -func CreateDockerVolume(ctx context.Context, name string) (vol *Volume, err error) { +func CreateDockerVolume(ctx context.Context) (vol *Volume, err error) { dockerVolume, err := Flux.dockerClient.VolumeCreate(ctx, volume.CreateOptions{ Driver: "local", DriverOpts: map[string]string{}, - Name: name, }) if err != nil { - return nil, fmt.Errorf("Failed to create volume: %v", err) + return nil, fmt.Errorf("failed to create volume: %v", err) } log.Printf("Volume %s created at %s\n", dockerVolume.Name, dockerVolume.Mountpoint) @@ -51,21 +58,19 @@ func CreateDockerVolume(ctx context.Context, name string) (vol *Volume, err erro return vol, nil } -func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (c *Container, err error) { - log.Printf("Deploying container with image %s\n", imageName) - +func CreateDockerContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, vol *Volume) (*Container, error) { containerName := fmt.Sprintf("%s-%s", projectConfig.Name, time.Now().Format("20060102-150405")) if projectConfig.EnvFile != "" { envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile)) if err != nil { - return nil, fmt.Errorf("Failed to open env file: %v", err) + return nil, fmt.Errorf("failed to open env file: %v", err) } defer envBytes.Close() envVars, err := godotenv.Parse(envBytes) if err != nil { - return nil, fmt.Errorf("Failed to parse env file: %v", err) + return nil, fmt.Errorf("failed to parse env file: %v", err) } for key, value := range envVars { @@ -73,8 +78,6 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p } } - vol, err := CreateDockerVolume(ctx, fmt.Sprintf("flux_%s-volume", projectConfig.Name)) - log.Printf("Creating container %s...\n", containerName) resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, @@ -90,7 +93,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p { Type: mount.TypeVolume, Source: vol.VolumeID, - Target: "/workspace", + Target: vol.Mountpoint, ReadOnly: false, }, }, @@ -100,18 +103,131 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p containerName, ) if err != nil { - return nil, fmt.Errorf("Failed to create container: %v", err) + return nil, err } - c = &Container{ + c := &Container{ ContainerID: [64]byte([]byte(resp.ID)), Volumes: []Volume{*vol}, } - log.Printf("Created new container: %s\n", containerName) return c, nil } +func CreateContainer(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig, head bool, deployment *Deployment) (c *Container, err error) { + log.Printf("Creating container with image %s\n", imageName) + + if projectConfig.EnvFile != "" { + envBytes, err := os.Open(filepath.Join(projectPath, projectConfig.EnvFile)) + if err != nil { + return nil, fmt.Errorf("failed to open env file: %v", err) + } + defer envBytes.Close() + + envVars, err := godotenv.Parse(envBytes) + if err != nil { + return nil, fmt.Errorf("failed to parse env file: %v", err) + } + + for key, value := range envVars { + projectConfig.Environment = append(projectConfig.Environment, fmt.Sprintf("%s=%s", key, value)) + } + } + + var vol *Volume + vol, err = CreateDockerVolume(ctx) + if err != nil { + return nil, err + } + + vol.Mountpoint = "/workspace" + + if volumeInsertStmt == nil { + volumeInsertStmt, err = Flux.db.Prepare("INSERT INTO volumes (volume_id, mountpoint, container_id) VALUES (?, ?, ?) RETURNING id, volume_id, mountpoint, container_id") + if err != nil { + log.Printf("Failed to prepare statement: %v\n", err) + return nil, err + } + } + + c, err = CreateDockerContainer(ctx, imageName, projectPath, projectConfig, vol) + if err != nil { + return nil, err + } + + if containerInsertStmt == nil { + containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, head, deployment_id) VALUES ($1, $2, $3) RETURNING id, container_id, head, deployment_id") + if err != nil { + return nil, err + } + } + + var containerIDString string + err = containerInsertStmt.QueryRow(c.ContainerID[:], head, deployment.ID).Scan(&c.ID, &containerIDString, &c.Head, &c.DeploymentID) + if err != nil { + return nil, err + } + copy(c.ContainerID[:], containerIDString) + + err = volumeInsertStmt.QueryRow(vol.VolumeID, vol.Mountpoint, c.ContainerID[:]).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) + if err != nil { + return nil, err + } + + c.Deployment = deployment + if head { + deployment.Head = c + } + deployment.Containers = append(deployment.Containers, c) + + return c, nil +} + +func (c *Container) Upgrade(ctx context.Context, imageName, projectPath string, projectConfig pkg.ProjectConfig) (*Container, error) { + // Create new container with new image + log.Printf("Upgrading container %s...\n", c.ContainerID[:12]) + if c.Volumes == nil { + return nil, fmt.Errorf("no volumes found for container %s", c.ContainerID[:12]) + } + + vol := &c.Volumes[0] + + newContainer, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig, vol) + if err != nil { + return nil, err + } + newContainer.Deployment = c.Deployment + + if containerInsertStmt == nil { + containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, head, deployment_id) VALUES ($1, $2, $3) RETURNING id, container_id, head, deployment_id") + if err != nil { + return nil, err + } + } + + var containerIDString string + err = containerInsertStmt.QueryRow(newContainer.ContainerID[:], c.Head, c.Deployment.ID).Scan(&newContainer.ID, &containerIDString, &newContainer.Head, &newContainer.DeploymentID) + if err != nil { + log.Printf("Failed to insert container: %v\n", err) + return nil, err + } + copy(newContainer.ContainerID[:], containerIDString) + + if volumeUpdateStmt == nil { + volumeUpdateStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, mountpoint, container_id") + if err != nil { + return nil, err + } + } + + vol = &newContainer.Volumes[0] + volumeUpdateStmt.QueryRow(newContainer.ContainerID[:], vol.ID).Scan(&vol.ID, &vol.VolumeID, &vol.Mountpoint, &vol.ContainerID) + + log.Printf("Upgraded container") + + return newContainer, nil +} + func (c *Container) Start(ctx context.Context) error { return Flux.dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{}) } @@ -124,7 +240,7 @@ func (c *Container) Remove(ctx context.Context) error { err := RemoveDockerContainer(ctx, string(c.ContainerID[:])) if err != nil { - return fmt.Errorf("Failed to remove container (%s): %v", c.ContainerID[:12], err) + return fmt.Errorf("failed to remove container (%s): %v", c.ContainerID[:12], err) } tx, err := Flux.db.Begin() @@ -142,7 +258,7 @@ func (c *Container) Remove(ctx context.Context) error { for _, volume := range c.Volumes { if err := RemoveVolume(ctx, volume.VolumeID); err != nil { tx.Rollback() - return fmt.Errorf("Failed to remove volume (%s): %v", volume.VolumeID, err) + return fmt.Errorf("failed to remove volume (%s): %v", volume.VolumeID, err) } _, err = tx.Exec("DELETE FROM volumes WHERE volume_id = ?", volume.VolumeID) @@ -176,11 +292,11 @@ func (c *Container) Status(ctx context.Context) (string, error) { // RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database func RemoveDockerContainer(ctx context.Context, containerID string) error { if err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { - return fmt.Errorf("Failed to stop container (%s): %v", containerID[:12], err) + return fmt.Errorf("failed to stop container (%s): %v", containerID[:12], err) } if err := Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { - return fmt.Errorf("Failed to remove container (%s): %v", containerID[:12], err) + return fmt.Errorf("failed to remove container (%s): %v", containerID[:12], err) } return nil @@ -220,7 +336,7 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er Timeout: &timeout, }) if err != nil { - return fmt.Errorf("Failed to stop container: %v", err) + return fmt.Errorf("failed to stop container: %v", err) } ctx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) @@ -249,7 +365,7 @@ func RemoveVolume(ctx context.Context, volumeID string) error { log.Printf("Removed volume %s\n", volumeID) if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { - return fmt.Errorf("Failed to remove volume (%s): %v", volumeID, err) + return fmt.Errorf("failed to remove volume (%s): %v", volumeID, err) } return nil diff --git a/server/deploy.go b/server/deploy.go index 051a0fc..4075c1e 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -73,6 +73,12 @@ func (dt *DeploymentLock) CompleteDeployment(appName string) { var deploymentLock = NewDeploymentLock() +type DeploymentEvent struct { + Stage string `json:"stage"` + Message string `json:"message"` + StatusCode int `json:"status,omitempty"` +} + func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { if Flux.appManager == nil { panic("App manager is nil") @@ -120,59 +126,84 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - eventChannel := make(chan pkg.DeploymentEvent, 10) + w.WriteHeader(http.StatusMultiStatus) + + eventChannel := make(chan DeploymentEvent, 10) + defer close(eventChannel) + var wg sync.WaitGroup + defer wg.Wait() wg.Add(1) - go func() { + go func(w http.ResponseWriter, flusher http.Flusher) { defer wg.Done() for { select { + case <-ctx.Done(): + return case event, ok := <-eventChannel: if !ok { return } - eventJSON, err := json.Marshal(event) + ev := struct { + Message string `json:"message"` + }{ + Message: event.Message, + } + + eventJSON, err := json.Marshal(ev) if err != nil { + // Write error directly to ResponseWriter + jsonErr := json.NewEncoder(w).Encode(err) + if jsonErr != nil { + fmt.Fprint(w, "data: {\"message\": \"Error encoding error\"}\n\n") + return + } + fmt.Fprintf(w, "data: %s\n\n", err.Error()) - flusher.Flush() + if flusher != nil { + flusher.Flush() + } return } + fmt.Fprintf(w, "event: %s\n", event.Stage) fmt.Fprintf(w, "data: %s\n\n", eventJSON) - flusher.Flush() - case <-ctx.Done(): - return + if flusher != nil { + flusher.Flush() + } + + if event.Stage == "error" || event.Stage == "complete" { + return + } } } - }() + }(w, flusher) - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "start", Message: "Uploading code", } deployRequest.Code, _, err = r.FormFile("code") if err != nil { - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: "No code archive found", - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: "No code archive found", + StatusCode: http.StatusBadRequest, } - http.Error(w, "No code archive found", http.StatusBadRequest) return } defer deployRequest.Code.Close() if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: "Invalid flux.json, a name, url, and port must be specified", - Error: "Invalid flux.json, a name, url, and port must be specified", + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: "Invalid flux.json, a name, url, and port must be specified", + StatusCode: http.StatusBadRequest, } - http.Error(w, "Invalid flux.json, a name, url, and port must be specified", http.StatusBadRequest) return } @@ -181,27 +212,32 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig) if err != nil { log.Printf("Failed to upload code: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: "Failed to upload code", - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upload code: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, err.Error(), http.StatusInternalServerError) return } + // Streams the each line of the pipe into the eventChannel, this closes the pipe when the function exits streamPipe := func(pipe io.ReadCloser) { + // we need a wait group because otherwise the function *could* exit before the pipe is closed + // and wreck havoc on every future request + wg.Add(1) + defer wg.Done() + scanner := bufio.NewScanner(pipe) for scanner.Scan() { line := scanner.Text() - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "cmd_output", - Message: fmt.Sprintf("%s", line), + Message: line, } } if err := scanner.Err(); err != nil { - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "error", Message: fmt.Sprintf("Failed to read pipe: %s", err), } @@ -210,7 +246,7 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } log.Printf("Preparing project %s...\n", projectConfig.Name) - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "preparing", Message: "Preparing project", } @@ -220,25 +256,22 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { cmdOut, err := prepareCmd.StdoutPipe() if err != nil { log.Printf("Failed to get stdout pipe: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError) return } cmdErr, err := prepareCmd.StderrPipe() if err != nil { log.Printf("Failed to get stderr pipe: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), + StatusCode: http.StatusInternalServerError, } - - http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError) return } @@ -248,19 +281,16 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { err = prepareCmd.Run() if err != nil { log.Printf("Failed to prepare project: %s\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to prepare project: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, fmt.Sprintf("Failed to prepare project: %s", err), http.StatusInternalServerError) return } - cmdOut.Close() - cmdErr.Close() - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "building", Message: "Building project image", } @@ -272,25 +302,23 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { cmdOut, err = buildCmd.StdoutPipe() if err != nil { log.Printf("Failed to get stdout pipe: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError) return } cmdErr, err = buildCmd.StderrPipe() if err != nil { log.Printf("Failed to get stderr pipe: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError) return } @@ -300,21 +328,18 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { err = buildCmd.Run() if err != nil { log.Printf("Failed to build image: %s\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to build image: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, fmt.Sprintf("Failed to build image: %s", err), http.StatusInternalServerError) return } - cmdOut.Close() - cmdErr.Close() app := Flux.appManager.GetApp(projectConfig.Name) - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "creating", Message: "Creating deployment", } @@ -323,26 +348,24 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { app, err = CreateApp(ctx, imageName, projectPath, projectConfig) if err != nil { log.Printf("Failed to create app: %v", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to create app: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to create app: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, err.Error(), http.StatusInternalServerError) return } } else { err = app.Upgrade(ctx, projectConfig, imageName, projectPath) if err != nil { log.Printf("Failed to upgrade app: %v", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to upgrade app: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upgrade app: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, err.Error(), http.StatusInternalServerError) return } } @@ -352,27 +375,21 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { }) if err != nil { log.Printf("Failed to marshal deploy response: %v\n", err) - eventChannel <- pkg.DeploymentEvent{ - Stage: "error", - Message: fmt.Sprintf("Failed to marshal deploy response: %s", err), - Error: err.Error(), + eventChannel <- DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to marshal deploy response: %s", err), + StatusCode: http.StatusInternalServerError, } - http.Error(w, err.Error(), http.StatusInternalServerError) return } - eventChannel <- pkg.DeploymentEvent{ + eventChannel <- DeploymentEvent{ Stage: "complete", - Message: fmt.Sprintf("%s", responseJSON), + Message: string(responseJSON), } log.Printf("App %s deployed successfully!\n", app.Name) - - close(eventChannel) - - // make sure all the events are flushed - wg.Wait() } func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) { @@ -402,14 +419,7 @@ func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) } if app.Deployment.Proxy == nil { - var headContainer *Container - for _, container := range app.Deployment.Containers { - if container.Head { - headContainer = &container - } - } - - app.Deployment.Proxy, _ = NewDeploymentProxy(&app.Deployment, headContainer) + app.Deployment.Proxy, _ = app.Deployment.NewDeploymentProxy() } w.WriteHeader(http.StatusOK) diff --git a/server/deployment.go b/server/deployment.go index c052cca..086571f 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -11,21 +11,19 @@ import ( var ( deploymentInsertStmt *sql.Stmt - containerInsertStmt *sql.Stmt - volumeInsertStmt *sql.Stmt - updateVolumeStmt *sql.Stmt ) type Deployment struct { ID int64 `json:"id"` - Containers []Container `json:"containers,omitempty"` + Head *Container `json:"head,omitempty"` + Containers []*Container `json:"containers,omitempty"` Proxy *DeploymentProxy `json:"-"` URL string `json:"url"` Port uint16 `json:"port"` } // Creates a deployment and containers in the database -func CreateDeployment(container Container, port uint16, appUrl string, db *sql.DB) (Deployment, error) { +func CreateDeployment(port uint16, appUrl string, db *sql.DB) (*Deployment, error) { var deployment Deployment var err error @@ -33,115 +31,33 @@ func CreateDeployment(container Container, port uint16, appUrl string, db *sql.D deploymentInsertStmt, err = db.Prepare("INSERT INTO deployments (url, port) VALUES ($1, $2) RETURNING id, url, port") if err != nil { log.Printf("Failed to prepare statement: %v\n", err) - return Deployment{}, err + return nil, err } } err = deploymentInsertStmt.QueryRow(appUrl, port).Scan(&deployment.ID, &deployment.URL, &deployment.Port) if err != nil { log.Printf("Failed to insert deployment: %v\n", err) - return Deployment{}, err + return nil, err } - if containerInsertStmt == nil { - containerInsertStmt, err = db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") - if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) - return Deployment{}, err - } - } - - var containerIDString string - err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) - if err != nil { - log.Printf("Failed to get container id: %v\n", err) - return Deployment{}, err - } - copy(container.ContainerID[:], containerIDString) - - for i, volume := range container.Volumes { - if volumeInsertStmt == nil { - volumeInsertStmt, err = db.Prepare("INSERT INTO volumes (volume_id, container_id) VALUES (?, ?) RETURNING id, volume_id, container_id") - if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) - return Deployment{}, err - } - } - - if err := volumeInsertStmt.QueryRow(volume.VolumeID, container.ID).Scan(&container.Volumes[i].ID, &container.Volumes[i].VolumeID, &container.Volumes[i].ContainerID); err != nil { - log.Printf("Failed to insert volume: %v\n", err) - return Deployment{}, err - } - } - - container.Deployment = &deployment - deployment.Containers = append(deployment.Containers, container) - - return deployment, nil + return &deployment, nil } func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.ProjectConfig, imageName string, projectPath string) error { existingContainers, err := findExistingDockerContainers(ctx, projectConfig.Name) if err != nil { - return fmt.Errorf("Failed to find existing containers: %v", err) - } - - // Deploy new container before deleting old one - c, err := CreateDockerContainer(ctx, imageName, projectPath, projectConfig) - if err != nil || c == nil { - log.Printf("Failed to create container: %v\n", err) - return err - } - - var container Container = *c - if containerInsertStmt == nil { - containerInsertStmt, err = Flux.db.Prepare("INSERT INTO containers (container_id, deployment_id, head) VALUES ($1, $2, $3) RETURNING id, container_id, deployment_id, head") - if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) - return err - } + return fmt.Errorf("failed to find existing containers: %v", err) } - var containerIDString string - err = containerInsertStmt.QueryRow(container.ContainerID[:], deployment.ID, true).Scan(&container.ID, &containerIDString, &container.DeploymentID, &container.Head) + container, err := deployment.Head.Upgrade(ctx, imageName, projectPath, projectConfig) if err != nil { - log.Printf("Failed to get container id: %v\n", err) + log.Printf("Failed to upgrade container: %v\n", err) return err } - container.Deployment = deployment - - // the space time complexity of this is pretty bad, but it works - for _, existingContainer := range deployment.Containers { - if !existingContainer.Head { - continue - } - for _, volume := range existingContainer.Volumes { - var targetVolume *Volume - for i, volume := range container.Volumes { - if volume.VolumeID == volume.VolumeID { - targetVolume = &container.Volumes[i] - break - } - } - - if updateVolumeStmt == nil { - updateVolumeStmt, err = Flux.db.Prepare("UPDATE volumes SET container_id = ? WHERE id = ? RETURNING id, volume_id, container_id") - if err != nil { - log.Printf("Failed to prepare statement: %v\n", err) - return err - } - } - - err := updateVolumeStmt.QueryRow(container.ID, volume.ID).Scan(&targetVolume.ID, &targetVolume.VolumeID, &targetVolume.ContainerID) - if err != nil { - log.Printf("Failed to update volume: %v\n", err) - return err - } - } - } - - copy(container.ContainerID[:], containerIDString) + // copy(container.ContainerID[:], containerIDString) + deployment.Head = container deployment.Containers = append(deployment.Containers, container) log.Printf("Starting container %s...\n", container.ContainerID[:12]) @@ -163,7 +79,7 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro // Create a new proxy that points to the new head, and replace the old one, but ensure that the old one is gracefully shutdown oldProxy := deployment.Proxy - deployment.Proxy, err = NewDeploymentProxy(deployment, &container) + deployment.Proxy, err = deployment.NewDeploymentProxy() if err != nil { log.Printf("Failed to create deployment proxy: %v\n", err) return err @@ -175,16 +91,17 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro return err } - var containers []Container + var containers []*Container var oldContainers []*Container for _, container := range deployment.Containers { if existingContainers[string(container.ContainerID[:])] { log.Printf("Deleting container from db: %s\n", container.ContainerID[:12]) _, err = tx.Exec("DELETE FROM containers WHERE id = ?", container.ID) - oldContainers = append(oldContainers, &container) + oldContainers = append(oldContainers, container) if err != nil { + log.Printf("Failed to delete container: %v\n", err) tx.Rollback() return err } @@ -212,9 +129,6 @@ func (deployment *Deployment) Upgrade(ctx context.Context, projectConfig pkg.Pro } deployment.Containers = containers - - Flux.proxy.AddDeployment(deployment) - return nil } @@ -245,6 +159,11 @@ func (d *Deployment) Start(ctx context.Context) error { } } + if d.Proxy == nil { + d.Proxy, _ = d.NewDeploymentProxy() + Flux.proxy.AddDeployment(d) + } + return nil } @@ -257,19 +176,20 @@ func (d *Deployment) Stop(ctx context.Context) error { } } + Flux.proxy.RemoveDeployment(d) + d.Proxy = nil + return nil } func (d *Deployment) Status(ctx context.Context) (string, error) { var status string if d == nil { - fmt.Printf("Deployment is nil\n") - return "stopped", nil + return "", fmt.Errorf("deployment is nil") } if d.Containers == nil { - fmt.Printf("Containers are nil\n") - return "stopped", nil + return "", fmt.Errorf("containers are nil") } for _, container := range d.Containers { @@ -281,7 +201,7 @@ func (d *Deployment) Status(ctx context.Context) (string, error) { // if not all containers are in the same state if status != "" && status != containerStatus { - return "", fmt.Errorf("Malformed deployment") + return "", fmt.Errorf("malformed deployment") } status = containerStatus diff --git a/server/proxy.go b/server/proxy.go index b75424f..ed5c75c 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -16,7 +16,15 @@ type Proxy struct { deployments sync.Map } +func (p *Proxy) RemoveDeployment(deployment *Deployment) { + p.deployments.Delete(deployment.URL) +} + func (p *Proxy) AddDeployment(deployment *Deployment) { + if deployment.Containers == nil { + panic("containers is nil") + } + log.Printf("Adding deployment %s\n", deployment.URL) p.deployments.Store(deployment.URL, deployment) } @@ -37,24 +45,23 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { type DeploymentProxy struct { deployment *Deployment - currentHead *Container proxy *httputil.ReverseProxy gracePeriod time.Duration activeRequests int64 } -func NewDeploymentProxy(deployment *Deployment, head *Container) (*DeploymentProxy, error) { +func (deployment *Deployment) NewDeploymentProxy() (*DeploymentProxy, error) { if deployment == nil { - return nil, fmt.Errorf("Deployment is nil") + return nil, fmt.Errorf("deployment is nil") } - containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(head.ContainerID[:])) + containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(deployment.Head.ContainerID[:])) if err != nil { return nil, err } if containerJSON.NetworkSettings.IPAddress == "" { - return nil, fmt.Errorf("No IP address found for container %s", head.ContainerID[:12]) + return nil, fmt.Errorf("no IP address found for container %s", deployment.Head.ContainerID[:12]) } containerUrl, err := url.Parse(fmt.Sprintf("http://%s:%d", containerJSON.NetworkSettings.IPAddress, deployment.Port)) @@ -80,7 +87,6 @@ func NewDeploymentProxy(deployment *Deployment, head *Container) (*DeploymentPro return &DeploymentProxy{ deployment: deployment, - currentHead: head, proxy: proxy, gracePeriod: time.Second * 30, activeRequests: 0, @@ -91,22 +97,18 @@ func (dp *DeploymentProxy) GracefulShutdown(oldContainers []*Container) { ctx, cancel := context.WithTimeout(context.Background(), dp.gracePeriod) defer cancel() - // Create a channel to signal when wait group is done - for { + done := false + for !done { select { case <-ctx.Done(): - break + done = true default: if atomic.LoadInt64(&dp.activeRequests) == 0 { - break + done = true } time.Sleep(time.Second) } - - if atomic.LoadInt64(&dp.activeRequests) == 0 || ctx.Err() != nil { - break - } } for _, container := range oldContainers { diff --git a/server/schema.sql b/server/schema.sql index e6cd682..662e73e 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS containers ( CREATE TABLE IF NOT EXISTS volumes ( id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE, volume_id TEXT NOT NULL, + mountpoint TEXT NOT NULL, container_id INTEGER NOT NULL, FOREIGN KEY(container_id) REFERENCES containers(id) ); \ No newline at end of file diff --git a/server/server.go b/server/server.go index 3160a7b..c12c2e8 100644 --- a/server/server.go +++ b/server/server.go @@ -123,12 +123,6 @@ func NewServer() *FluxServer { Flux.proxy = &Proxy{} - Flux.appManager.Range(func(key, value interface{}) bool { - app := value.(*App) - Flux.proxy.AddDeployment(&app.Deployment) - return true - }) - port := os.Getenv("FLUXD_PROXY_PORT") if port == "" { port = "7465"