Skip to content

Commit

Permalink
Rework build process to support both directories and single files
Browse files Browse the repository at this point in the history
Rework kit build to handle both layers specified as directories and as
regular files. In particular, we need to be able to package and extract
both

  model:
    path: ./model (directory)

and

  model:
    path: ./model.onnx (file)

This requires significant reworking of the build process as previously
we assumed we could always 1) create a directory and 2) extract files
into it.

In addition, this commit changes the build process to stream the build
to a temporary file rather than an in-memory buffer to avoid requiring a
lot of memory.
  • Loading branch information
amisevsk committed Mar 1, 2024
1 parent ff9fbc1 commit d7529ae
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 114 deletions.
86 changes: 0 additions & 86 deletions pkg/artifact/layer.go

This file was deleted.

21 changes: 21 additions & 0 deletions pkg/artifact/model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
package artifact

import "kitops/pkg/lib/constants"

type Model struct {
Repository string
Layers []ModelLayer
Config *KitFile
}

type ModelLayer struct {
Path string
MediaType string
}

func (l *ModelLayer) Type() string {
switch l.MediaType {
case constants.CodeLayerMediaType:
return "code"
case constants.DataSetLayerMediaType:
return "dataset"
case constants.ModelConfigMediaType:
return "config"
case constants.ModelLayerMediaType:
return "model"
}
return "<unknown>"
}
6 changes: 3 additions & 3 deletions pkg/cmd/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error {
return err
}
layer := &artifact.ModelLayer{
BaseDir: codePath,
Path: codePath,
MediaType: constants.CodeLayerMediaType,
}
model.Layers = append(model.Layers, *layer)
Expand All @@ -41,7 +41,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error {
return err
}
layer := &artifact.ModelLayer{
BaseDir: datasetPath,
Path: datasetPath,
MediaType: constants.DataSetLayerMediaType,
}
model.Layers = append(model.Layers, *layer)
Expand All @@ -54,7 +54,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error {
return err
}
layer := &artifact.ModelLayer{
BaseDir: modelPath,
Path: modelPath,
MediaType: constants.ModelLayerMediaType,
}
model.Layers = append(model.Layers, *layer)
Expand Down
11 changes: 5 additions & 6 deletions pkg/cmd/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func exportConfig(config *artifact.KitFile, exportDir string, overwrite bool) er
return nil
}

func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportDir string, overwrite bool) error {
func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportPath string, overwrite bool) error {
rc, err := store.Fetch(ctx, desc)
if err != nil {
return fmt.Errorf("failed get layer %s: %w", desc.Digest, err)
Expand All @@ -114,14 +114,13 @@ func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descri
defer gzr.Close()
tr := tar.NewReader(gzr)

if fi, exists := filesystem.PathExists(exportDir); exists {
if _, exists := filesystem.PathExists(exportPath); exists {
if !overwrite {
return fmt.Errorf("failed to export: path %s already exists", exportDir)
} else if !fi.IsDir() {
return fmt.Errorf("failed to export: path %s exists and is not a directory", exportDir)
return fmt.Errorf("failed to export: path %s already exists", exportPath)
}
output.Debugf("Directory %s already exists", exportDir)
output.Debugf("Directory %s already exists", exportPath)
}
exportDir := filepath.Dir(exportPath)
if err := os.MkdirAll(exportDir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", exportDir, err)
}
Expand Down
146 changes: 146 additions & 0 deletions pkg/lib/storage/layer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package storage

import (
"archive/tar"
"compress/gzip"
"fmt"
"io"
"kitops/pkg/artifact"
"kitops/pkg/output"
"os"
"path/filepath"
"strings"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

func compressLayer(layer *artifact.ModelLayer) (string, ocispec.Descriptor, error) {
pathInfo, err := os.Stat(layer.Path)
if err != nil {
return "", ocispec.DescriptorEmptyJSON, err
}
tempFile, err := os.CreateTemp("", "kitops_layer_*")
if err != nil {
return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to create temporary file: %w", err)
}
tempFileName := tempFile.Name()
output.Debugf("Compressing layer to temporary file %s", tempFileName)

digester := digest.Canonical.Digester()
mw := io.MultiWriter(tempFile, digester.Hash())

// Note: we have to close gzip writer before reading digest from digester as closing is what writes the GZIP footer
gzw := gzip.NewWriter(mw)
tw := tar.NewWriter(gzw)

// Wrapper function for closing writers before returning an error
handleErr := func(err error) (string, ocispec.Descriptor, error) {
// Don't care about these errors since we'll be deleting the file anyways
_ = tw.Close()
_ = gzw.Close()
_ = tempFile.Close()
removeTempFile(tempFileName)
return "", ocispec.DescriptorEmptyJSON, err
}

if pathInfo.Mode().IsRegular() {
if err := writeHeaderToTar(pathInfo.Name(), pathInfo, tw); err != nil {
return handleErr(err)
}
if err := writeFileToTar(layer.Path, pathInfo, tw); err != nil {
return handleErr(err)
}
} else if pathInfo.IsDir() {
if err := writeDirToTar(layer.Path, tw); err != nil {
return handleErr(err)
}
} else {
return handleErr(fmt.Errorf("path %s is neither a file nor a directory", layer.Path))
}

callAndPrintError(tw.Close, "Failed to close tar writer: %s")
callAndPrintError(gzw.Close, "Failed to close gzip writer: %s")

tempFileInfo, err := tempFile.Stat()
if err != nil {
removeTempFile(tempFileName)
return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to stat temporary file: %w", err)
}
callAndPrintError(tempFile.Close, "Failed to close temporary file: %s")

desc := ocispec.Descriptor{
MediaType: layer.MediaType,
Digest: digester.Digest(),
Size: tempFileInfo.Size(),
}
return tempFileName, desc, nil
}

func writeDirToTar(basePath string, tw *tar.Writer) error {
// We'll want paths in the tarball to be relative to the *parent* of basePath since we want
// to compress the directory pointed at by basePath
trimPath := filepath.Dir(basePath)
return filepath.Walk(basePath, func(file string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip anything that's not a regular file or directory
if !fi.Mode().IsRegular() && !fi.Mode().IsDir() {
return nil
}

relPath := strings.TrimPrefix(strings.Replace(file, trimPath, "", -1), string(filepath.Separator))
if relPath == "" {
relPath = filepath.Base(basePath)
}
if err := writeHeaderToTar(relPath, fi, tw); err != nil {
return err
}
if fi.IsDir() {
return nil
}
return writeFileToTar(file, fi, tw)
})
}

func writeHeaderToTar(name string, fi os.FileInfo, tw *tar.Writer) error {
header, err := tar.FileInfoHeader(fi, "")
if err != nil {
return fmt.Errorf("failed to generate header for %s: %w", name, err)
}
header.Name = name
if err := tw.WriteHeader(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
output.Debugf("Wrote header %s to tar file", header.Name)
return nil
}

func writeFileToTar(file string, fi os.FileInfo, tw *tar.Writer) error {
f, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed to open file for archiving: %w", err)
}
defer f.Close()

if written, err := io.Copy(tw, f); err != nil {
return fmt.Errorf("failed to add file to archive: %w", err)
} else if written != fi.Size() {
return fmt.Errorf("error writing file: %w", err)
}
output.Debugf("Wrote file %s to tar file", file)
return nil
}

func callAndPrintError(f func() error, msg string) {
if err := f(); err != nil {
output.Errorf(msg, err)
}
}

func removeTempFile(filepath string) {
if err := os.Remove(filepath); err != nil && !os.IsNotExist(err) {
output.Errorf("Failed to clean up temporary file %s: %s", filepath, err)
}
}
41 changes: 22 additions & 19 deletions pkg/lib/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"kitops/pkg/lib/constants"
"kitops/pkg/lib/repo"
"kitops/pkg/output"
"os"

"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go"
Expand Down Expand Up @@ -39,34 +40,36 @@ func SaveModel(ctx context.Context, store oras.Target, model *artifact.Model, ta
}

func saveContentLayer(ctx context.Context, store oras.Target, layer *artifact.ModelLayer) (ocispec.Descriptor, error) {
buf := &bytes.Buffer{}
err := layer.Apply(buf)
// We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress
// to a temporary file. Ideally, we'd also add this to the internal store by moving the file to avoid
// copying if possible.
tempPath, desc, err := compressLayer(layer)
if err != nil {
return ocispec.DescriptorEmptyJSON, err
}
defer func() {
if err := os.Remove(tempPath); err != nil {
output.Errorf("Failed to remove temporary file %s: %s", tempPath, err)
}
}()

// Create a descriptor for the layer
desc := ocispec.Descriptor{
MediaType: layer.MediaType,
Digest: digest.FromBytes(buf.Bytes()),
Size: int64(buf.Len()),
if exists, err := store.Exists(ctx, desc); err != nil {
return ocispec.DescriptorEmptyJSON, err
} else if exists {
output.Infof("Already saved %s layer: %s", layer.Type(), desc.Digest)
return desc, nil
}

exists, err := store.Exists(ctx, desc)
file, err := os.Open(tempPath)
if err != nil {
return ocispec.DescriptorEmptyJSON, err
}
if exists {
output.Infof("Model layer already saved: %s", desc.Digest)
} else {
// Does not exist in storage, need to push
err = store.Push(ctx, desc, buf)
if err != nil {
return ocispec.DescriptorEmptyJSON, err
}
output.Infof("Saved model layer: %s", desc.Digest)
return ocispec.DescriptorEmptyJSON, fmt.Errorf("Failed to open temporary file: %s", err)
}
defer file.Close()

if err := store.Push(ctx, desc, file); err != nil {
return ocispec.DescriptorEmptyJSON, err
}
output.Infof("Saved %s layer: %s", layer.Type(), desc.Digest)
return desc, nil
}

Expand Down

0 comments on commit d7529ae

Please sign in to comment.