diff --git a/README.md b/README.md index 57a2bb8..25010c6 100644 --- a/README.md +++ b/README.md @@ -1,78 +1,116 @@ # Flux -Flux is a lightweight self-hosted pseudo-paas for golang web apps that emphasizes simplicity and speed. Flux is built on top of [Buildpacks](https://buildpacks.io/) and [Docker](https://docs.docker.com/get-docker/). It is designed on top of blue-green deployments designed with the "set it and forget it" principle in mind. +Flux is a lightweight self-hosted pseudo-PaaS for hosting Golang web apps with ease. Built on top of [Buildpacks](https://buildpacks.io/) and [Docker](https://docs.docker.com/get-docker/), Flux simplifies the deployment process with a focus on similicity, speed, and reliability. -(I'll make this better later school starts in 30 minutes LMAO) +## Features -## Usage +- **Blue-Green Deployments**: Deploy new versions of your app without downtime +- **Simplify Deployment**: Flux takes care of the deployment process, so you can focus on writing your app +- **Flexible Configuration**: Easily configure your app with `flux.json` +- **Automatic Container Management**: Steamline your app with automatic container management -To get started you'll want [ZQDGR](https://github.com/juls0730/zqdgr), and you can start the daemon either with: +## Dependencies -``` +- [Go](https://golang.org/dl/) +- [ZQDGR](https://github.com/juls0730/zqdgr) +- [Buildpacks](https://buildpacks.io/) (daemon only) +- [Docker](https://docs.docker.com/get-docker/) (daemon only) + +## Intallation + +### Daemon + +To install and start the Flux daemon using ZQDGR, run the following command: + +```bash +# method 1 zqdgr build:daemon sudo ./fluxd -``` -or with - -``` +# method 2 FLUXD_ROOT_DIR=$PWD/fluxdd zqdgr run:daemon ``` -To get started with the cli you can run +### CLI -``` +Install the CLI using the following command: + +```bash go install github.com/juls0730/flux/cmd/flux@latest ``` -TODO: `go install` instructions and a docker image (sowwy) +## Configuration ### Daemon -The daemon is a HTTP server that listens for incoming HTTP requests. It handles deploying new apps and managing their containers. +Flux daemon looks for a confgiuration file in `/var/fluxd/config.json` but can be configured by setting `$FLUXD_ROOT_DIR` to the directory where you want all fluxd files to be stored. + +```json +{ + "builder": "paketobuildpacks/builder-jammy-tiny" +} +``` -To run the daemon, simply run `fluxd` in the root directory of this repository. The daemon will listen on port 5647, and the reverse proxy will listen on port 7465, but is configurable with the environment variable `FLUXD_PROXY_PORT`. Once you deploy an app, you must point the domain to the reverse proxy (make sure the Host header is sent). +- `builder`: The buildpack builder to use (default: `paketobuildpacks/builder-jammy-tiny`) -#### Configuration +#### Daemon Settings -The daemon will look for a `config.json` in ~/.config/flux, all this file contains is the builder to use for building the app's image, by default this is `paketobuildpacks/builder-jammy-tiny`. +- **Default port**: 5647 (Daemon server) +- **Reverse Proxy Port**: 7465 (configurable via `FLUXD_PROXY_PORT` environment variable) ### CLI -The CLI is a command-line interface for interacting with the daemon. +The CLI looks for a configuration file in `~/.config/flux/config.json`: -``` -flux +```json +{ + "daemon_url": "http://127.0.0.1:5647" +} ``` -The following commands are available: +- `daemon_url`: The URL of the daemon to connect to (default: `http://127.0.0.1:5647`) -- `init`: Initialize a new project -- `deploy`: Deploy an app -- `start`: Start a deployed app (apps are automatically started when deployed) -- `stop`: Stop a deployed app -- `delete`: Delete a deployed app -- `list`: List all deployed apps +### Commands -#### Configuration +```bash +Flux +``` -The CLI will look for a `config.json` in ~/.config/flux, all this file contains is the URL of the daemon, by default this is http://127.0.0.1:5647 but for most real use cases, this will be a server. +Available commands: -#### flux.json +- `init`: Initialize a new project +- `deploy`: Deploy an application +- `start`: Start an application +- `stop`: Stop an application +- `delete`: Delete an application +- `list`: View application logs + +### Project Configuration (`flux.json`) + +flux.json is the configuration file in the root of your proejct that defines deployment settings: + +```json +{ + "name": "my-app", + "url": "myapp.example.com", + "port": 8080, + "env_file": ".env", + "environment": ["DEBUG=true"] +} +``` -flux.json is the configuration file for a project, it contains the name of the project, the URL it should listen to, and the port it should listen to. You can also specify an env file and environment variables to set. All the available options are shown below: +#### Configuration Options - `name`: The name of the project -- `url`: The URL the project should listen to -- `port`: The port the web server is listening on -- `env_file`: The path to an env file to load environment variables from (relative to the project directory) -- `environment`: An array of environment variables to set +- `url`: Domain for the application +- `port`: Web server's listening port +- `env_file`: Path to environment variable file +- `environment`: Additional environment variables -## Dependencies +## Deployment Notes -- [Go](https://golang.org/dl/) -- [Buildpacks](https://buildpacks.io/) (daemon only) -- [Docker](https://docs.docker.com/get-docker/) (daemon only) +- After deploying an app, point your domain to the Flux reverse proxy +- Ensure the Host header is sent with your requests ## Contributing diff --git a/cmd/flux/main.go b/cmd/flux/main.go index 7053206..777bfa4 100644 --- a/cmd/flux/main.go +++ b/cmd/flux/main.go @@ -17,6 +17,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/briandowns/spinner" @@ -177,18 +178,18 @@ func getProjectName(command string, args []string) (string, error) { if len(args) == 0 { if _, err := os.Stat("flux.json"); err != nil { - return "", fmt.Errorf("Usage: flux %[1]s , or run flux %[1]s in the project directory\n", command) + return "", fmt.Errorf("Usage: flux %[1]s , or run flux %[1]s in the project directory", command) } fluxConfigFile, err := os.Open("flux.json") if err != nil { - return "", fmt.Errorf("Failed to open flux.json: %v\n", err) + return "", fmt.Errorf("Failed to open flux.json: %v", err) } defer fluxConfigFile.Close() var config pkg.ProjectConfig if err := json.NewDecoder(fluxConfigFile).Decode(&config); err != nil { - return "", fmt.Errorf("Failed to decode flux.json: %v\n", err) + return "", fmt.Errorf("Failed to decode flux.json: %v", err) } projectName = config.Name @@ -199,8 +200,83 @@ func getProjectName(command string, args []string) (string, error) { return projectName, nil } +type CustomSpinnerWriter struct { + currentSpinnerMsg string + lock sync.Mutex +} + +func (w *CustomSpinnerWriter) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + + n, err = os.Stdout.Write(p) + if err != nil { + return n, err + } + + w.currentSpinnerMsg = string(p) + + return len(p), nil +} + +type CustomStdout struct { + spinner *CustomSpinnerWriter + lock sync.Mutex +} + +func (w *CustomStdout) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + + n, err = os.Stdout.Write([]byte(fmt.Sprintf("\033[2K\r%s", p))) + if err != nil { + return n, err + } + + nn, err := os.Stdout.Write([]byte(w.spinner.currentSpinnerMsg)) + if err != nil { + return n, err + } + + n = nn + n + + return n, nil +} + +func (w *CustomStdout) Printf(format string, a ...interface{}) (n int, err error) { + str := fmt.Sprintf(format, a...) + return w.Write([]byte(str)) +} + +var helpStr = `Usage: + flux + +Available Commands: + init Initialize a new project + deploy Deploy a new version of the app + stop Stop a container + start Start a container + delete Delete a container + list List all containers + +Flags: + -h, --help help for flux + +Use "flux --help" for more information about a command.` + func runCommand(command string, args []string, config Config, info pkg.Info) error { - loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond) + seekingHelp := false + if len(args) > 0 && (args[len(args)-1] == "--help" || args[len(args)-1] == "-h") { + seekingHelp = true + args = args[:len(args)-1] + } + + spinnerWriter := CustomSpinnerWriter{ + currentSpinnerMsg: "", + lock: sync.Mutex{}, + } + + loadingSpinner := spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(&spinnerWriter)) defer func() { if loadingSpinner.Active() { loadingSpinner.Stop() @@ -220,8 +296,16 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err switch command { case "deploy": + if seekingHelp { + fmt.Println(`Usage: + flux deploy + + Flux will deploy the app in the current directory, and start routing traffic to it.`) + return nil + } + if _, err := os.Stat("flux.json"); err != nil { - return fmt.Errorf("No flux.json found, please run flux init first\n") + return fmt.Errorf("No flux.json found, please run flux init first") } loadingSpinner.Suffix = " Deploying" @@ -229,7 +313,7 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err buf, err := compressDirectory(info.Compression) if err != nil { - return fmt.Errorf("Failed to compress directory: %v\n", err) + return fmt.Errorf("Failed to compress directory: %v", err) } body := &bytes.Buffer{} @@ -237,54 +321,96 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err configPart, err := writer.CreateFormFile("config", "flux.json") if err != nil { - return fmt.Errorf("Failed to create config part: %v\n", err) + return fmt.Errorf("Failed to create config part: %v", err) } fluxConfigFile, err := os.Open("flux.json") if err != nil { - return fmt.Errorf("Failed to open flux.json: %v\n", err) + return fmt.Errorf("Failed to open flux.json: %v", err) } defer fluxConfigFile.Close() if _, err := io.Copy(configPart, fluxConfigFile); err != nil { - return fmt.Errorf("Failed to write config part: %v\n", err) + return fmt.Errorf("Failed to write config part: %v", err) } codePart, err := writer.CreateFormFile("code", "code.tar.gz") if err != nil { - return fmt.Errorf("Failed to create code part: %v\n", err) + return fmt.Errorf("Failed to create code part: %v", err) } if _, err := codePart.Write(buf); err != nil { - return fmt.Errorf("Failed to write code part: %v\n", err) + return fmt.Errorf("Failed to write code part: %v", err) } if err := writer.Close(); err != nil { - return fmt.Errorf("Failed to close writer: %v\n", err) + return fmt.Errorf("Failed to close writer: %v", err) } - resp, err := http.Post(config.DeamonURL+"/deploy", "multipart/form-data; boundary="+writer.Boundary(), body) + req, err := http.NewRequest("POST", config.DeamonURL+"/deploy", body) + req.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("Failed to send request: %v\n", err) + return fmt.Errorf("Failed to send request: %v", err) } defer resp.Body.Close() + customWriter := &CustomStdout{ + spinner: &spinnerWriter, + lock: sync.Mutex{}, + } + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "data: ") { + var event pkg.DeploymentEvent + if err := json.Unmarshal([]byte(line[6:]), &event); err == nil { + switch event.Stage { + case "complete": + loadingSpinner.Stop() + var deploymentResponse struct { + App pkg.App `json:"app"` + } + if err := json.Unmarshal([]byte(event.Message), &deploymentResponse); err != nil { + return fmt.Errorf("Failed to parse deployment response: %v", err) + } + + fmt.Printf("App %s deployed successfully!\n", deploymentResponse.App.Name) + + return nil + case "cmd_output": + customWriter.Printf("... %s\n", event.Message) + case "error": + loadingSpinner.Stop() + return fmt.Errorf("Deployment failed: %s\n", event.Error) + default: + customWriter.Printf("%s\n", event.Message) + } + } + } + } + if resp.StatusCode != http.StatusOK { responseBody, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("error reading response body: %v\n", err) + return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - return fmt.Errorf("Deploy failed: %s\n", responseBody) + return fmt.Errorf("Deploy failed: %s", responseBody) } - - loadingSpinner.Stop() - fmt.Println("Deployed successfully!") case "stop": + if seekingHelp { + fmt.Println(`Usage: + flux stop + + Flux will stop the deployment of the app in the current directory.`) + return nil + } + projectName, err := getProjectName(command, args) if err != nil { return err @@ -292,25 +418,31 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err req, err := http.Post(config.DeamonURL+"/stop/"+projectName, "application/json", nil) if err != nil { - return fmt.Errorf("Failed to stop app: %v\n", err) + return fmt.Errorf("Failed to stop app: %v", err) } defer req.Body.Close() if req.StatusCode != http.StatusOK { responseBody, err := io.ReadAll(req.Body) if err != nil { - return fmt.Errorf("error reading response body: %v\n", err) + return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - return fmt.Errorf("Stop failed: %s\n", responseBody) + return fmt.Errorf("Stop failed: %s", responseBody) } fmt.Printf("Successfully stopped %s\n", projectName) case "start": + if seekingHelp { + fmt.Println(`Usage: + flux start + + Flux will start the deployment of the app in the current directory.`) + return nil + } + projectName, err := getProjectName(command, args) if err != nil { return err @@ -318,25 +450,35 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err req, err := http.Post(config.DeamonURL+"/start/"+projectName, "application/json", nil) if err != nil { - return fmt.Errorf("Failed to start app: %v\n", err) + return fmt.Errorf("Failed to start app: %v", err) } defer req.Body.Close() if req.StatusCode != http.StatusOK { responseBody, err := io.ReadAll(req.Body) if err != nil { - return fmt.Errorf("error reading response body: %v\n", err) + return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) - return fmt.Errorf("Start failed: %s\n", responseBody) + return fmt.Errorf("Start failed: %s", responseBody) } fmt.Printf("Successfully started %s\n", projectName) case "delete": + if seekingHelp { + fmt.Println(`Usage: + flux delete [project-name | all] + + Options: + project-name: The name of the project to delete + all: Delete all projects + + Flux will delete the deployment of the app in the current directory or the specified project.`) + return nil + } + if len(args) == 1 { if args[0] == "all" { var response string @@ -360,7 +502,7 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err req, err := http.NewRequest("DELETE", config.DeamonURL+"/deployments", nil) if err != nil { - return fmt.Errorf("Failed to delete deployments: %v\n", err) + return fmt.Errorf("Failed to delete deployments: %v", err) } resp, err := http.DefaultClient.Do(req) if err != nil { @@ -371,12 +513,10 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err if resp.StatusCode != http.StatusOK { responseBody, err := io.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("error reading response body: %v\n", err) + return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) return fmt.Errorf("delete failed: %s", responseBody) } @@ -417,15 +557,21 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) return fmt.Errorf("delete failed: %s", responseBody) } fmt.Printf("Successfully deleted %s\n", projectName) case "list": + if seekingHelp { + fmt.Println(`Usage: + flux list + + Flux will list all the apps in the daemon.`) + return nil + } + resp, err := http.Get(config.DeamonURL + "/apps") if err != nil { return fmt.Errorf("failed to get apps: %v", err) @@ -437,9 +583,7 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err return fmt.Errorf("error reading response body: %v", err) } - if len(responseBody) > 0 && responseBody[len(responseBody)-1] == '\n' { - responseBody = responseBody[:len(responseBody)-1] - } + responseBody = []byte(strings.TrimSuffix(string(responseBody), "\n")) return fmt.Errorf("list failed: %s", responseBody) } @@ -458,6 +602,17 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err fmt.Printf("%s (%s)\n", app.Name, app.DeploymentStatus) } case "init": + if seekingHelp { + fmt.Println(`Usage: + flux init [project-name] + + Options: + project-name: The name of the project to initialize + + Flux will initialize a new project in the current directory or the specified project.`) + return nil + } + var projectConfig pkg.ProjectConfig var response string @@ -498,7 +653,7 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err fmt.Printf("Successfully initialized project %s\n", projectConfig.Name) default: - return fmt.Errorf("unknown command: %s", command) + return fmt.Errorf("unknown command: %s\n%s", command, helpStr) } return nil @@ -506,10 +661,15 @@ func runCommand(command string, args []string, config Config, info pkg.Info) err func main() { if len(os.Args) < 2 { - fmt.Println("Usage: flux ") + fmt.Println(helpStr) os.Exit(1) } + if os.Args[1] == "--help" || os.Args[1] == "-h" { + fmt.Println(helpStr) + os.Exit(0) + } + if _, err := os.Stat(filepath.Join(configPath, "config.json")); err != nil { if err := os.MkdirAll(configPath, 0755); err != nil { fmt.Printf("Failed to create config directory: %v\n", err) diff --git a/pkg/responses.go b/pkg/responses.go index 7152a28..becf493 100644 --- a/pkg/responses.go +++ b/pkg/responses.go @@ -15,3 +15,9 @@ type Compression struct { type Info struct { Compression Compression `json:"compression"` } + +type DeploymentEvent struct { + Stage string `json:"stage"` + Message string `json:"message"` + Error string `json:"error,omitempty"` +} diff --git a/server/app.go b/server/app.go index 5d52441..5bf8f13 100644 --- a/server/app.go +++ b/server/app.go @@ -2,7 +2,6 @@ package server import ( "context" - "database/sql" "fmt" "log" "os" @@ -14,7 +13,7 @@ import ( type App struct { ID int64 `json:"id,omitempty"` - Deployment Deployment `json:"-"` + Deployment Deployment `json:"deployment,omitempty"` Name string `json:"name,omitempty"` DeploymentID int64 `json:"deployment_id,omitempty"` } @@ -161,14 +160,14 @@ func (am *AppManager) DeleteApp(name string) error { return nil } -func (am *AppManager) Init(db *sql.DB) { +func (am *AppManager) Init() { log.Printf("Initializing deployments...\n") - if db == nil { + if Flux.db == nil { log.Panicf("DB is nil") } - rows, err := db.Query("SELECT id, name, deployment_id FROM apps") + rows, err := Flux.db.Query("SELECT id, name, deployment_id FROM apps") if err != nil { log.Printf("Failed to query apps: %v\n", err) return @@ -188,10 +187,10 @@ func (am *AppManager) Init(db *sql.DB) { for _, app := range apps { var deployment Deployment var headContainer *Container - db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) + Flux.db.QueryRow("SELECT id, url, port FROM deployments WHERE id = ?", app.DeploymentID).Scan(&deployment.ID, &deployment.URL, &deployment.Port) deployment.Containers = make([]Container, 0) - rows, err = db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) + rows, err = Flux.db.Query("SELECT id, container_id, deployment_id, head FROM containers WHERE deployment_id = ?", app.DeploymentID) if err != nil { log.Printf("Failed to query containers: %v\n", err) return @@ -214,7 +213,7 @@ func (am *AppManager) Init(db *sql.DB) { for i, container := range deployment.Containers { var volumes []Volume - rows, err := db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID) + rows, err := Flux.db.Query("SELECT id, volume_id, container_id FROM volumes WHERE container_id = ?", container.ID) if err != nil { log.Printf("Failed to query volumes: %v\n", err) return diff --git a/server/container.go b/server/container.go index 4a5fd48..797b2e1 100644 --- a/server/container.go +++ b/server/container.go @@ -13,13 +13,10 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" - "github.com/docker/docker/client" "github.com/joho/godotenv" "github.com/juls0730/flux/pkg" ) -var dockerClient *client.Client - type Volume struct { ID int64 `json:"id"` VolumeID string `json:"volume_id"` @@ -27,26 +24,16 @@ type Volume struct { } type Container struct { - ID int64 `json:"id"` - Head bool `json:"head"` // if the container is the head of the deployment - Deployment *Deployment - Volumes []Volume `json:"volumes"` - ContainerID [64]byte `json:"container_id"` - DeploymentID int64 `json:"deployment_id"` -} - -func init() { - log.Printf("Initializing Docker client...\n") - - var err error - dockerClient, err = client.NewClientWithOpts(client.FromEnv) - if err != nil { - log.Fatalf("Failed to create Docker client: %v", err) - } + ID int64 `json:"id"` + Head bool `json:"head"` // if the container is the head of the deployment + Deployment *Deployment `json:"-"` + Volumes []Volume `json:"volumes"` + ContainerID [64]byte `json:"container_id"` + DeploymentID int64 `json:"deployment_id"` } func CreateDockerVolume(ctx context.Context, name string) (vol *Volume, err error) { - dockerVolume, err := dockerClient.VolumeCreate(ctx, volume.CreateOptions{ + dockerVolume, err := Flux.dockerClient.VolumeCreate(ctx, volume.CreateOptions{ Driver: "local", DriverOpts: map[string]string{}, Name: name, @@ -89,7 +76,7 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p vol, err := CreateDockerVolume(ctx, fmt.Sprintf("flux_%s-volume", projectConfig.Name)) log.Printf("Creating container %s...\n", containerName) - resp, err := dockerClient.ContainerCreate(ctx, &container.Config{ + resp, err := Flux.dockerClient.ContainerCreate(ctx, &container.Config{ Image: imageName, Env: projectConfig.Environment, Volumes: map[string]struct{}{ @@ -126,11 +113,11 @@ func CreateDockerContainer(ctx context.Context, imageName, projectPath string, p } func (c *Container) Start(ctx context.Context) error { - return dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{}) + return Flux.dockerClient.ContainerStart(ctx, string(c.ContainerID[:]), container.StartOptions{}) } func (c *Container) Stop(ctx context.Context) error { - return dockerClient.ContainerStop(ctx, string(c.ContainerID[:]), container.StopOptions{}) + return Flux.dockerClient.ContainerStop(ctx, string(c.ContainerID[:]), container.StopOptions{}) } func (c *Container) Remove(ctx context.Context) error { @@ -178,7 +165,7 @@ func (c *Container) Wait(ctx context.Context, port uint16) error { } func (c *Container) Status(ctx context.Context) (string, error) { - containerJSON, err := dockerClient.ContainerInspect(ctx, string(c.ContainerID[:])) + containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, string(c.ContainerID[:])) if err != nil { return "", err } @@ -188,11 +175,11 @@ func (c *Container) Status(ctx context.Context) (string, error) { // RemoveContainer stops and removes a container, but be warned that this will not remove the container from the database func RemoveDockerContainer(ctx context.Context, containerID string) error { - if err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { + if err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{}); err != nil { return fmt.Errorf("Failed to stop container (%s): %v", containerID[:12], err) } - if err := dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { + if err := Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}); err != nil { return fmt.Errorf("Failed to remove container (%s): %v", containerID[:12], err) } @@ -210,7 +197,7 @@ func WaitForDockerContainer(ctx context.Context, containerID string, containerPo return fmt.Errorf("container failed to become ready in time") default: - containerJSON, err := dockerClient.ContainerInspect(ctx, containerID) + containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } @@ -229,7 +216,7 @@ func WaitForDockerContainer(ctx context.Context, containerID string, containerPo func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) error { timeout := 30 - err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ + err := Flux.dockerClient.ContainerStop(ctx, containerID, container.StopOptions{ Timeout: &timeout, }) if err != nil { @@ -242,15 +229,15 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er for { select { case <-ctx.Done(): - return dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) default: - containerJSON, err := dockerClient.ContainerInspect(ctx, containerID) + containerJSON, err := Flux.dockerClient.ContainerInspect(ctx, containerID) if err != nil { return err } if !containerJSON.State.Running { - return dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) + return Flux.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{}) } time.Sleep(time.Second) @@ -261,7 +248,7 @@ func GracefullyRemoveDockerContainer(ctx context.Context, containerID string) er func RemoveVolume(ctx context.Context, volumeID string) error { log.Printf("Removed volume %s\n", volumeID) - if err := dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { + if err := Flux.dockerClient.VolumeRemove(ctx, volumeID, true); err != nil { return fmt.Errorf("Failed to remove volume (%s): %v", volumeID, err) } @@ -269,7 +256,7 @@ func RemoveVolume(ctx context.Context, volumeID string) error { } func findExistingDockerContainers(ctx context.Context, containerPrefix string) (map[string]bool, error) { - containers, err := dockerClient.ContainerList(ctx, container.ListOptions{ + containers, err := Flux.dockerClient.ContainerList(ctx, container.ListOptions{ All: true, }) if err != nil { diff --git a/server/deploy.go b/server/deploy.go index 9f52a08..051a0fc 100644 --- a/server/deploy.go +++ b/server/deploy.go @@ -1,13 +1,17 @@ package server import ( + "bufio" + "context" "database/sql" "encoding/json" "fmt" + "io" "log" "mime/multipart" "net/http" "os/exec" + "sync" "github.com/juls0730/flux/pkg" ) @@ -25,7 +29,59 @@ type DeployResponse struct { App App `json:"app"` } +type DeploymentLock struct { + mu sync.Mutex + deployed map[string]context.CancelFunc +} + +func NewDeploymentLock() *DeploymentLock { + return &DeploymentLock{ + deployed: make(map[string]context.CancelFunc), + } +} + +func (dt *DeploymentLock) StartDeployment(appName string, ctx context.Context) (context.Context, error) { + dt.mu.Lock() + defer dt.mu.Unlock() + + // Check if the app is already being deployed + if _, exists := dt.deployed[appName]; exists { + return nil, fmt.Errorf("app %s is already being deployed", appName) + } + + // Create a context that can be cancelled + ctx, cancel := context.WithCancel(ctx) + + // Store the cancel function + dt.deployed[appName] = cancel + + return ctx, nil +} + +func (dt *DeploymentLock) CompleteDeployment(appName string) { + dt.mu.Lock() + defer dt.mu.Unlock() + + // Remove the app from deployed tracking + if cancel, exists := dt.deployed[appName]; exists { + // Cancel the context + cancel() + // Remove from map + delete(dt.deployed, appName) + } +} + +var deploymentLock = NewDeploymentLock() + func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { + if Flux.appManager == nil { + panic("App manager is nil") + } + + w.Header().Set("Content-Type", "test/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + err := r.ParseMultipartForm(10 << 30) // 10 GiB if err != nil { log.Printf("Failed to parse multipart form: %v\n", err) @@ -33,7 +89,6 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { return } - // bind to DeployRequest struct var deployRequest DeployRequest deployRequest.Config, _, err = r.FormFile("config") if err != nil { @@ -42,21 +97,81 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { } defer deployRequest.Config.Close() - deployRequest.Code, _, err = r.FormFile("code") - if err != nil { - http.Error(w, "No code archive found", http.StatusBadRequest) - return - } - defer deployRequest.Code.Close() - var projectConfig pkg.ProjectConfig if err := json.NewDecoder(deployRequest.Config).Decode(&projectConfig); err != nil { log.Printf("Failed to decode config: %v\n", err) + http.Error(w, "Invalid flux.json", http.StatusBadRequest) return } + ctx, err := deploymentLock.StartDeployment(projectConfig.Name, r.Context()) + if err != nil { + // This will happen if the app is already being deployed + http.Error(w, err.Error(), http.StatusConflict) + return + } + + defer deploymentLock.CompleteDeployment(projectConfig.Name) + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + eventChannel := make(chan pkg.DeploymentEvent, 10) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case event, ok := <-eventChannel: + if !ok { + return + } + + eventJSON, err := json.Marshal(event) + if err != nil { + fmt.Fprintf(w, "data: %s\n\n", err.Error()) + flusher.Flush() + return + } + + fmt.Fprintf(w, "data: %s\n\n", eventJSON) + flusher.Flush() + case <-ctx.Done(): + return + } + } + }() + + eventChannel <- pkg.DeploymentEvent{ + Stage: "start", + Message: "Uploading code", + } + + deployRequest.Code, _, err = r.FormFile("code") + if err != nil { + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: "No code archive found", + Error: err.Error(), + } + http.Error(w, "No code archive found", http.StatusBadRequest) + return + } + defer deployRequest.Code.Close() + if projectConfig.Name == "" || projectConfig.Url == "" || projectConfig.Port == 0 { + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: "Invalid flux.json, a name, url, and port must be specified", + Error: "Invalid flux.json, a name, url, and port must be specified", + } http.Error(w, "Invalid flux.json, a name, url, and port must be specified", http.StatusBadRequest) return } @@ -66,58 +181,198 @@ func (s *FluxServer) DeployHandler(w http.ResponseWriter, r *http.Request) { projectPath, err := s.UploadAppCode(deployRequest.Code, projectConfig) if err != nil { log.Printf("Failed to upload code: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: "Failed to upload code", + Error: err.Error(), + } http.Error(w, err.Error(), http.StatusInternalServerError) return } + streamPipe := func(pipe io.ReadCloser) { + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + line := scanner.Text() + eventChannel <- pkg.DeploymentEvent{ + Stage: "cmd_output", + Message: fmt.Sprintf("%s", line), + } + } + + if err := scanner.Err(); err != nil { + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to read pipe: %s", err), + } + log.Printf("Error reading pipe: %s\n", err) + } + } + log.Printf("Preparing project %s...\n", projectConfig.Name) + eventChannel <- pkg.DeploymentEvent{ + Stage: "preparing", + Message: "Preparing project", + } + prepareCmd := exec.Command("go", "generate") prepareCmd.Dir = projectPath + cmdOut, err := prepareCmd.StdoutPipe() + if err != nil { + log.Printf("Failed to get stdout pipe: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), + Error: err.Error(), + } + + http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError) + return + } + cmdErr, err := prepareCmd.StderrPipe() + if err != nil { + log.Printf("Failed to get stderr pipe: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), + Error: err.Error(), + } + + http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError) + return + } + + go streamPipe(cmdOut) + go streamPipe(cmdErr) + err = prepareCmd.Run() if err != nil { log.Printf("Failed to prepare project: %s\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to prepare project: %s", err), + Error: err.Error(), + } + http.Error(w, fmt.Sprintf("Failed to prepare project: %s", err), http.StatusInternalServerError) return } + cmdOut.Close() + cmdErr.Close() + + eventChannel <- pkg.DeploymentEvent{ + Stage: "building", + Message: "Building project image", + } log.Printf("Building image for project %s...\n", projectConfig.Name) imageName := fmt.Sprintf("flux_%s-image", projectConfig.Name) buildCmd := exec.Command("pack", "build", imageName, "--builder", s.config.Builder) buildCmd.Dir = projectPath + cmdOut, err = buildCmd.StdoutPipe() + if err != nil { + log.Printf("Failed to get stdout pipe: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stdout pipe: %s", err), + Error: err.Error(), + } + + http.Error(w, fmt.Sprintf("Failed to get stdout pipe: %s", err), http.StatusInternalServerError) + return + } + cmdErr, err = buildCmd.StderrPipe() + if err != nil { + log.Printf("Failed to get stderr pipe: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to get stderr pipe: %s", err), + Error: err.Error(), + } + + http.Error(w, fmt.Sprintf("Failed to get stderr pipe: %s", err), http.StatusInternalServerError) + return + } + + go streamPipe(cmdOut) + go streamPipe(cmdErr) + err = buildCmd.Run() if err != nil { log.Printf("Failed to build image: %s\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to build image: %s", err), + Error: err.Error(), + } + http.Error(w, fmt.Sprintf("Failed to build image: %s", err), http.StatusInternalServerError) return } - - if Flux.appManager == nil { - panic("App manager is nil") - } + cmdOut.Close() + cmdErr.Close() app := Flux.appManager.GetApp(projectConfig.Name) + eventChannel <- pkg.DeploymentEvent{ + Stage: "creating", + Message: "Creating deployment", + } + if app == nil { - app, err = CreateApp(r.Context(), imageName, projectPath, projectConfig) + app, err = CreateApp(ctx, imageName, projectPath, projectConfig) if err != nil { log.Printf("Failed to create app: %v", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to create app: %s", err), + Error: err.Error(), + } + http.Error(w, err.Error(), http.StatusInternalServerError) return } } else { - err = app.Upgrade(r.Context(), projectConfig, imageName, projectPath) + err = app.Upgrade(ctx, projectConfig, imageName, projectPath) if err != nil { - log.Printf("Failed to upgrade deployment: %v", err) + log.Printf("Failed to upgrade app: %v", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to upgrade app: %s", err), + Error: err.Error(), + } + http.Error(w, err.Error(), http.StatusInternalServerError) return } } - log.Printf("App %s deployed successfully!\n", app.Name) - - json.NewEncoder(w).Encode(DeployResponse{ + responseJSON, err := json.Marshal(DeployResponse{ App: *app, }) + if err != nil { + log.Printf("Failed to marshal deploy response: %v\n", err) + eventChannel <- pkg.DeploymentEvent{ + Stage: "error", + Message: fmt.Sprintf("Failed to marshal deploy response: %s", err), + Error: err.Error(), + } + + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + eventChannel <- pkg.DeploymentEvent{ + Stage: "complete", + Message: fmt.Sprintf("%s", responseJSON), + } + + log.Printf("App %s deployed successfully!\n", app.Name) + + close(eventChannel) + + // make sure all the events are flushed + wg.Wait() } func (s *FluxServer) StartDeployHandler(w http.ResponseWriter, r *http.Request) { diff --git a/server/deployment.go b/server/deployment.go index d256ef2..c052cca 100644 --- a/server/deployment.go +++ b/server/deployment.go @@ -18,7 +18,7 @@ var ( type Deployment struct { ID int64 `json:"id"` - Containers []Container `json:"-"` + Containers []Container `json:"containers,omitempty"` Proxy *DeploymentProxy `json:"-"` URL string `json:"url"` Port uint16 `json:"port"` diff --git a/server/proxy.go b/server/proxy.go index 2571f40..b75424f 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -44,7 +44,11 @@ type DeploymentProxy struct { } func NewDeploymentProxy(deployment *Deployment, head *Container) (*DeploymentProxy, error) { - containerJSON, err := dockerClient.ContainerInspect(context.Background(), string(head.ContainerID[:])) + if deployment == nil { + return nil, fmt.Errorf("Deployment is nil") + } + + containerJSON, err := Flux.dockerClient.ContainerInspect(context.Background(), string(head.ContainerID[:])) if err != nil { return nil, err } diff --git a/server/server.go b/server/server.go index 84dbf92..3160a7b 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "archive/tar" "compress/gzip" + "context" "database/sql" "encoding/json" "fmt" @@ -14,6 +15,8 @@ import ( _ "embed" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/client" "github.com/juls0730/flux/pkg" _ "github.com/mattn/go-sqlite3" ) @@ -37,25 +40,28 @@ type FluxServerConfig struct { } type FluxServer struct { - config FluxServerConfig - db *sql.DB - proxy *Proxy - rootDir string - appManager *AppManager + config FluxServerConfig + db *sql.DB + proxy *Proxy + rootDir string + appManager *AppManager + dockerClient *client.Client } func NewServer() *FluxServer { + Flux = new(FluxServer) + var serverConfig FluxServerConfig - rootDir := os.Getenv("FLUXD_ROOT_DIR") - if rootDir == "" { - rootDir = "/var/fluxd" + Flux.rootDir = os.Getenv("FLUXD_ROOT_DIR") + if Flux.rootDir == "" { + Flux.rootDir = "/var/fluxd" } // parse config, if it doesnt exist, create it and use the default config - configPath := filepath.Join(rootDir, "config.json") + configPath := filepath.Join(Flux.rootDir, "config.json") if _, err := os.Stat(configPath); err != nil { - if err := os.MkdirAll(rootDir, 0755); err != nil { + if err := os.MkdirAll(Flux.rootDir, 0755); err != nil { log.Fatalf("Failed to create fluxd directory: %v\n", err) } @@ -79,28 +85,47 @@ func NewServer() *FluxServer { log.Fatalf("Failed to parse config file: %v\n", err) } - if err := os.MkdirAll(filepath.Join(rootDir, "apps"), 0755); err != nil { + Flux.config = serverConfig + + Flux.dockerClient, err = client.NewClientWithOpts(client.FromEnv) + if err != nil { + log.Fatalf("Failed to create docker client: %v\n", err) + } + + log.Printf("Pulling builder image %s, this may take a while...\n", serverConfig.Builder) + + events, err := Flux.dockerClient.ImagePull(context.Background(), fmt.Sprintf("%s:latest", serverConfig.Builder), image.PullOptions{}) + if err != nil { + log.Fatalf("Failed to pull builder image: %v\n", err) + } + + // wait for the iamge to be pulled + io.Copy(io.Discard, events) + + log.Printf("Successfully pulled builder image %s\n", serverConfig.Builder) + + if err := os.MkdirAll(filepath.Join(Flux.rootDir, "apps"), 0755); err != nil { log.Fatalf("Failed to create apps directory: %v\n", err) } - db, err := sql.Open("sqlite3", filepath.Join(rootDir, "fluxd.db")) + Flux.db, err = sql.Open("sqlite3", filepath.Join(Flux.rootDir, "fluxd.db")) if err != nil { log.Fatalf("Failed to open database: %v\n", err) } - _, err = db.Exec(string(schemaBytes)) + _, err = Flux.db.Exec(string(schemaBytes)) if err != nil { log.Fatalf("Failed to create database schema: %v\n", err) } - appManager := new(AppManager) - appManager.Init(db) + Flux.appManager = new(AppManager) + Flux.appManager.Init() - proxy := &Proxy{} + Flux.proxy = &Proxy{} - appManager.Range(func(key, value interface{}) bool { + Flux.appManager.Range(func(key, value interface{}) bool { app := value.(*App) - proxy.AddDeployment(&app.Deployment) + Flux.proxy.AddDeployment(&app.Deployment) return true }) @@ -111,19 +136,11 @@ func NewServer() *FluxServer { go func() { log.Printf("Proxy server starting on http://127.0.0.1:%s\n", port) - if err := http.ListenAndServe(fmt.Sprintf(":%s", port), proxy); err != nil && err != http.ErrServerClosed { + if err := http.ListenAndServe(fmt.Sprintf(":%s", port), Flux.proxy); err != nil && err != http.ErrServerClosed { log.Fatalf("Proxy server error: %v", err) } }() - Flux = &FluxServer{ - config: serverConfig, - db: db, - proxy: proxy, - appManager: appManager, - rootDir: rootDir, - } - return Flux } diff --git a/zqdgr.config.json b/zqdgr.config.json index 01c63d2..91edc8b 100644 --- a/zqdgr.config.json +++ b/zqdgr.config.json @@ -5,10 +5,10 @@ "author": "juls0730", "license": "MIT", "scripts": { - "build:daemon": "go build -o fluxd cmd/daemon/main.go", - "build:cli": "go build -o flux cmd/cli/main.go", - "run:daemon": "go run cmd/daemon/main.go", - "run:cli": "go run cmd/cli/main.go" + "build:daemon": "go build -o fluxd cmd/fluxd/main.go", + "build:cli": "go build -o flux cmd/flux/main.go", + "run:daemon": "go run cmd/fluxd/main.go", + "run:cli": "go run cmd/flux/main.go" }, "pattern": "**/*.go", "excluded_dirs": []