Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] 7 speedy pushes #13

Merged
merged 15 commits into from
Oct 4, 2023
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
contents: read
pull-requests: write
steps:
- uses: coderabbitai/ai-pr-reviewer@d7f9486e8bfddc6e55f9879d4e93b8eaac6f120f # v1.16.0
- uses: coderabbitai/ai-pr-reviewer@44244a9e06f5acf72a93f661c7dbb8d8d808143d # v1.16.2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
OPENAI_API_KEY: ${{ secrets.AZURE_OPENAI_API_KEY }}
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.direnv/
.env
.vscode
.vscode/
bin/
dist/
example/input.zip
result
.vscode
3 changes: 1 addition & 2 deletions cmd/cloudexec/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func InitLaunchConfig() error {

// Write the default launch config to the file
_, err = launchConfigFile.WriteString(`

# Set the directory to upload to the droplet.
[input]
directory = ""
Expand Down Expand Up @@ -124,7 +123,7 @@ func Launch(user *user.User, config config.Config, dropletSize string, dropletRe
// upload local files to the bucket
sourcePath := lc.Input.Directory // TODO: verify that this path exists & throw informative error if not
destPath := fmt.Sprintf("job-%v", thisJobId)
fmt.Printf("Uploading contents of directory %s to bucket %s/%s...\n", sourcePath, bucketName, destPath)
fmt.Printf("Compressing and uploading contents of directory %s to bucket %s/%s...\n", sourcePath, bucketName, destPath)
err = UploadDirectoryToSpaces(config, bucketName, sourcePath, destPath)
if err != nil {
return fmt.Errorf("Failed to upload files: %w", err)
Expand Down
24 changes: 11 additions & 13 deletions cmd/cloudexec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ func main() {
return nil
},
},
{
Name: "init",
Usage: "Create a new cloudexec.toml launch configuration in the current directory",
Action: func(c *cli.Context) error {
err := InitLaunchConfig()
if err != nil {
return err
}
return nil
},
},
{
Name: "launch",
Usage: "Launch a droplet and start a job",
Expand All @@ -95,19 +106,6 @@ func main() {
Usage: "Optional droplet region",
},
},
Subcommands: []*cli.Command{
{
Name: "init",
Usage: "Create a new cloudexec.toml launch configuration in the current directory",
Action: func(c *cli.Context) error {
err := InitLaunchConfig()
if err != nil {
return err
}
return nil
},
},
},
Action: func(c *cli.Context) error {
// Abort on configuration error
if configErr != nil {
Expand Down
125 changes: 113 additions & 12 deletions cmd/cloudexec/push.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,141 @@
package main

import (
"archive/zip"
"fmt"
"io"
"os"
"path/filepath"

"github.com/crytic/cloudexec/pkg/config"
"github.com/crytic/cloudexec/pkg/s3"
)

func UploadDirectoryToSpaces(config config.Config, bucketName string, sourcePath string, destPath string) error {
// Walk the directory and upload files recursively
return filepath.Walk(sourcePath, func(path string, info os.FileInfo, err error) error {
func UploadDirectoryToSpaces(config config.Config, bucket string, sourcePath string, destPath string) error {
// Compute the path for the zipped archive of sourcePath
zipFileName := "input.zip"
zipFilePath := filepath.Join(os.TempDir(), zipFileName)

// Create a file where we will write the zipped archive
fmt.Printf("Creating zipped archive at %s\n", zipFilePath)
zipFile, err := os.Create(zipFilePath)
if err != nil {
return err
}
defer zipFile.Close()

// Create a new zip writer
zipWriter := zip.NewWriter(zipFile)
defer zipWriter.Close()

// Walk the directory and recursively add files to the zipped archive
err = filepath.Walk(sourcePath, func(path string, info os.FileInfo, err error) error {
target := path
if err != nil {
return err
}

if !info.IsDir() {
// Read the file
fileBytes, err := os.ReadFile(path)
// If it's a symbolic link, resolve the target
if info.Mode()&os.ModeSymlink == os.ModeSymlink {
target, err = os.Readlink(path)
fmt.Printf("Resolved link from %s to %s\n", path, target)
if err != nil {
return fmt.Errorf("Failed to read file %s: %w", path, err)
return err
}
}

// Compute the destination key (path) in the bucket
relativePath, _ := filepath.Rel(sourcePath, path)
destinationKey := filepath.Join(destPath, "input", relativePath)
// If this is a subdirectory, make sure the path ends with a trailing slash before we create it
// See https://pkg.go.dev/archive/zip#Writer.Create for details
targetInfo, err := os.Stat(target)
if err != nil {
return err
}

err = s3.PutObject(config, bucketName, destinationKey, fileBytes)
if targetInfo.IsDir() {
cleanPath := filepath.Clean(path) + string(filepath.Separator)
fmt.Printf("Creating directory %s in the zipped archive\n", cleanPath)
_, err = zipWriter.Create(cleanPath)
if err != nil {
return err
}
return nil
}

// Don't recursively add this zipped archive
if filepath.Base(path) == zipFileName {
return nil
}

fmt.Printf("Adding %s to the zipped archive\n", target)

fmt.Printf("Successfully uploaded %s to %s/%s\n", path, bucketName, destinationKey)
// Create a new file entry in the zipped archive
zipFileEntry, err := zipWriter.Create(path)
if err != nil {
return err
}

// Open the file we're adding to the zipped archive
file, err := os.Open(target)
if err != nil {
return err
}

// Write this file to the zipped archive
_, err = io.Copy(zipFileEntry, file)
if err != nil {
return err
}

// Explicitly close the file once we're done to prevent a "too many open files" error
err = file.Close()
if err != nil {
return err
}

return nil
})
if err != nil {
return err
}
fmt.Printf("Successfully added all files from %s to zipped archive at %s\n", sourcePath, zipFilePath)

// Make sure all prior writes are sync'd to the filesystem
// This is necessary bc we're going to read the file right after writing it
err = zipWriter.Flush()
if err != nil {
return err
}
err = zipFile.Sync()
if err != nil {
return err
}

// Manually Closing is necessary to prevent zip file corruption during upload
err = zipWriter.Close()
if err != nil {
return err
}
err = zipFile.Close()
if err != nil {
return err
}

// Read the zipped archive
fileBytes, err := os.ReadFile(zipFilePath)
if err != nil {
return fmt.Errorf("Failed to read zipped archive %s: %w", zipFilePath, err)
}
if len(fileBytes) == 0 {
return fmt.Errorf("Failed to read zipped archive at %s: read zero bytes of data", zipFilePath)
}
bohendo marked this conversation as resolved.
Show resolved Hide resolved

// Upload the zipped archive
destKey := filepath.Join(destPath, "input.zip")
fmt.Printf("Uploading archive (%v bytes) to %s\n", len(fileBytes), destKey)
err = s3.PutObject(config, bucket, destKey, fileBytes)
if err != nil {
return err
}
bohendo marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
33 changes: 23 additions & 10 deletions cmd/cloudexec/user_data.sh.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ cleanup() {
update_state "failed"
fi

echo "Uploading results..."
# shellcheck disable=SC2310
s3cmd put -r ~/output/* "s3://${BUCKET_NAME}/job-${JOB_ID}/output/" || true # proceed to destroy droplet on error

echo "Uploading logs..."
# shellcheck disable=SC2310
s3cmd put /var/log/cloud-init-output.log "s3://${BUCKET_NAME}/job-${JOB_ID}/logs/" || true # proceed to destroy droplet on error
output_files="$(ls -A ~/output/)"
if [[ -n ${output_files} ]]; then
echo "Uploading results..."
s3cmd put -r ~/output/* "s3://${BUCKET_NAME}/job-${JOB_ID}/output/"
else
echo "Skipping results upload, no files found in ~/ouput"
fi

if [[ -s ${stdout_log} ]]; then
echo
Expand All @@ -150,6 +150,13 @@ cleanup() {
echo "No error logs generated"
fi

if [[ -s "/var/log/cloud-init-output.log" ]]; then
echo "Uploading logs..."
s3cmd put /var/log/cloud-init-output.log "s3://${BUCKET_NAME}/job-${JOB_ID}/logs/"
else
echo "No logs to upload.."
fi

echo
echo "Destroying droplet..."
THIS_DROPLET_ID=$(curl -s http://169.254.169.254/metadata/v1/id)
Expand Down Expand Up @@ -195,11 +202,17 @@ echo "Running setup..."
mkdir -p ~/output
eval "${SETUP_COMMANDS}"

echo "Downloading inputs..."
s3cmd get -r "s3://${BUCKET_NAME}/job-${JOB_ID}/input" ~/
echo "Downloading input archive..."
s3cmd get -r "s3://${BUCKET_NAME}/job-${JOB_ID}/input.zip" ~/
if [[ ! -s ~/input.zip ]]; then
echo "Error: Failed to download input archive"
exit 1
fi

echo "Unzipping input archive..."
unzip ~/input.zip -d ~/
if [[ ! -d ~/input ]]; then
echo "Error: Failed to download inputs directory"
echo "Error: Failed to unzip inputs directory"
exit 1
fi

Expand Down
3 changes: 2 additions & 1 deletion pkg/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func PutObject(config config.Config, bucket string, key string, value []byte) er
if err != nil {
return fmt.Errorf("Failed to create %s directory in bucket %s: %w", key, bucket, err)
}
fmt.Printf("Successfully created directory in s3 bucket: %s/%s\n", bucket, key)
return nil
}

Expand All @@ -179,8 +180,8 @@ func PutObject(config config.Config, bucket string, key string, value []byte) er
if err != nil {
return fmt.Errorf("Failed to upload file %s to bucket %s: %w", key, bucket, err)
}
// fmt.Printf("Successfully uploaded %s to %s\n", key, bucket)

fmt.Printf("Successfully uploaded file to s3 bucket: %s/%s\n", bucket, key)
return nil
}

Expand Down