From bc9629289b7363c2803fc61d65749d1bc51f7075 Mon Sep 17 00:00:00 2001 From: Angel Misevski Date: Thu, 29 Feb 2024 20:16:17 -0500 Subject: [PATCH 1/2] Fixup issue during build if Kitfile does not define models Since switching from a list of models to a single model per Kitfile, Kitfiles that do not define models will deserialize to a Kitfile with an empty model. To avoid this, we change the `model` field on Kitfile to a pointer so that it deserializes to nil when not present. --- pkg/artifact/kit-file.go | 19 ++++++++++++------- pkg/cmd/build/build.go | 26 +++++++++++--------------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pkg/artifact/kit-file.go b/pkg/artifact/kit-file.go index c3508cbb..7e52d474 100644 --- a/pkg/artifact/kit-file.go +++ b/pkg/artifact/kit-file.go @@ -10,11 +10,11 @@ import ( type ( KitFile struct { - ManifestVersion string `json:"manifestVersion"` - Kit ModelKit `json:"package,omitempty"` - Code []Code `json:"code,omitempty"` - DataSets []DataSet `json:"datasets,omitempty"` - Model TrainedModel `json:"model,omitempty"` + ManifestVersion string `json:"manifestVersion"` + Kit ModelKit `json:"package,omitempty"` + Code []Code `json:"code,omitempty"` + DataSets []DataSet `json:"datasets,omitempty"` + Model *TrainedModel `json:"model,omitempty"` } ModelKit struct { @@ -61,9 +61,14 @@ type ( } ) -func (kf *KitFile) LoadModel(file *os.File) error { +func (kf *KitFile) LoadModel(filePath string) error { + modelfile, err := os.Open(filePath) + if err != nil { + return err + } + defer modelfile.Close() // Read the file - data, err := io.ReadAll(file) + data, err := io.ReadAll(modelfile) if err != nil { return err } diff --git a/pkg/cmd/build/build.go b/pkg/cmd/build/build.go index 5c48fe1a..8d024d02 100644 --- a/pkg/cmd/build/build.go +++ b/pkg/cmd/build/build.go @@ -9,18 +9,12 @@ import ( "kitops/pkg/lib/repo" "kitops/pkg/lib/storage" "kitops/pkg/output" - "os" ) func RunBuild(ctx context.Context, options *buildOptions) error { // 
1. Read the model file - modelfile, err := os.Open(options.modelFile) - if err != nil { - return err - } - defer modelfile.Close() kitfile := &artifact.KitFile{} - if err = kitfile.LoadModel(modelfile); err != nil { + if err := kitfile.LoadModel(options.modelFile); err != nil { return err } @@ -54,15 +48,17 @@ func RunBuild(ctx context.Context, options *buildOptions) error { } // 4. package the TrainedModel - modelPath, err := filesystem.VerifySubpath(options.contextDir, kitfile.Model.Path) - if err != nil { - return err - } - layer := &artifact.ModelLayer{ - BaseDir: modelPath, - MediaType: constants.ModelLayerMediaType, + if kitfile.Model != nil { + modelPath, err := filesystem.VerifySubpath(options.contextDir, kitfile.Model.Path) + if err != nil { + return err + } + layer := &artifact.ModelLayer{ + BaseDir: modelPath, + MediaType: constants.ModelLayerMediaType, + } + model.Layers = append(model.Layers, *layer) } - model.Layers = append(model.Layers, *layer) tag := "" if options.modelRef != nil { From ba47d3ad4fbb99554321e2ef8c3081e375961fef Mon Sep 17 00:00:00 2001 From: Angel Misevski Date: Thu, 29 Feb 2024 20:18:30 -0500 Subject: [PATCH 2/2] Rework build process to support both directories and single files Rework kit build to handle both layers specified as directories and as regular files. In particular, we need to be able to package and extract both model: path: ./model (directory) and model: path: ./model.onnx (file) This requires significant reworking of the build process as previously we assumed we could always 1) create a directory and 2) extract files into it. In addition, this commit changes the build process to stream the build to a temporary file rather than an in-memory buffer to avoid requiring a lot of memory. 
--- pkg/artifact/layer.go | 86 ----------------------- pkg/artifact/model.go | 21 ++++++ pkg/cmd/build/build.go | 6 +- pkg/cmd/export/export.go | 11 ++- pkg/lib/storage/layer.go | 146 +++++++++++++++++++++++++++++++++++++++ pkg/lib/storage/local.go | 41 ++++++----- 6 files changed, 197 insertions(+), 114 deletions(-) delete mode 100644 pkg/artifact/layer.go create mode 100644 pkg/lib/storage/layer.go diff --git a/pkg/artifact/layer.go b/pkg/artifact/layer.go deleted file mode 100644 index d1365bdc..00000000 --- a/pkg/artifact/layer.go +++ /dev/null @@ -1,86 +0,0 @@ -package artifact - -import ( - "archive/tar" - "compress/gzip" - "io" - "os" - "path/filepath" - "strings" -) - -type ModelLayer struct { - BaseDir string - MediaType string -} - -func (layer *ModelLayer) Apply(writers ...io.Writer) error { - // Check if path exists - _, err := os.Stat(layer.BaseDir) - if err != nil { - return err - } - - mw := io.MultiWriter(writers...) - - gzw := gzip.NewWriter(mw) - defer gzw.Close() - - tw := tar.NewWriter(gzw) - defer tw.Close() - - // walk the context dir and tar everything - err = filepath.Walk(layer.BaseDir, func(file string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - // Skip anything that's not a regular file or directory - if !fi.Mode().IsRegular() && !fi.Mode().IsDir() { - return nil - } - // Skip the baseDir itself - if file == layer.BaseDir { - return nil - } - - header, err := tar.FileInfoHeader(fi, fi.Name()) - if err != nil { - return err - } - - // We want the path in the tarball to be relative to the layer's base directory - subPath := strings.TrimPrefix(strings.Replace(file, layer.BaseDir, "", -1), string(filepath.Separator)) - header.Name = subPath - - if err := tw.WriteHeader(header); err != nil { - return err - } - - if fi.Mode().IsRegular() { - err := writeFileToTar(file, tw) - if err != nil { - return err - } - } - - return nil - }) - - if err != nil { - return err - } - return nil -} - -func writeFileToTar(file 
string, tw *tar.Writer) error { - f, err := os.Open(file) - if err != nil { - return err - } - defer f.Close() - - if _, err := io.Copy(tw, f); err != nil { - return err - } - return nil -} diff --git a/pkg/artifact/model.go b/pkg/artifact/model.go index 7b535197..24b17583 100644 --- a/pkg/artifact/model.go +++ b/pkg/artifact/model.go @@ -1,7 +1,28 @@ package artifact +import "kitops/pkg/lib/constants" + type Model struct { Repository string Layers []ModelLayer Config *KitFile } + +type ModelLayer struct { + Path string + MediaType string +} + +func (l *ModelLayer) Type() string { + switch l.MediaType { + case constants.CodeLayerMediaType: + return "code" + case constants.DataSetLayerMediaType: + return "dataset" + case constants.ModelConfigMediaType: + return "config" + case constants.ModelLayerMediaType: + return "model" + } + return "" +} diff --git a/pkg/cmd/build/build.go b/pkg/cmd/build/build.go index 8d024d02..3a24856d 100644 --- a/pkg/cmd/build/build.go +++ b/pkg/cmd/build/build.go @@ -28,7 +28,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error { return err } layer := &artifact.ModelLayer{ - BaseDir: codePath, + Path: codePath, MediaType: constants.CodeLayerMediaType, } model.Layers = append(model.Layers, *layer) @@ -41,7 +41,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error { return err } layer := &artifact.ModelLayer{ - BaseDir: datasetPath, + Path: datasetPath, MediaType: constants.DataSetLayerMediaType, } model.Layers = append(model.Layers, *layer) @@ -54,7 +54,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error { return err } layer := &artifact.ModelLayer{ - BaseDir: modelPath, + Path: modelPath, MediaType: constants.ModelLayerMediaType, } model.Layers = append(model.Layers, *layer) diff --git a/pkg/cmd/export/export.go b/pkg/cmd/export/export.go index 82062ec4..d536692b 100644 --- a/pkg/cmd/export/export.go +++ b/pkg/cmd/export/export.go @@ -100,7 +100,7 @@ func exportConfig(config 
*artifact.KitFile, exportDir string, overwrite bool) er return nil } -func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportDir string, overwrite bool) error { +func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportPath string, overwrite bool) error { rc, err := store.Fetch(ctx, desc) if err != nil { return fmt.Errorf("failed get layer %s: %w", desc.Digest, err) @@ -114,14 +114,13 @@ func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descri defer gzr.Close() tr := tar.NewReader(gzr) - if fi, exists := filesystem.PathExists(exportDir); exists { + if _, exists := filesystem.PathExists(exportPath); exists { if !overwrite { - return fmt.Errorf("failed to export: path %s already exists", exportDir) - } else if !fi.IsDir() { - return fmt.Errorf("failed to export: path %s exists and is not a directory", exportDir) + return fmt.Errorf("failed to export: path %s already exists", exportPath) } - output.Debugf("Directory %s already exists", exportDir) + output.Debugf("Directory %s already exists", exportPath) } + exportDir := filepath.Dir(exportPath) if err := os.MkdirAll(exportDir, 0755); err != nil { return fmt.Errorf("failed to create directory %s: %w", exportDir, err) } diff --git a/pkg/lib/storage/layer.go b/pkg/lib/storage/layer.go new file mode 100644 index 00000000..454cabdf --- /dev/null +++ b/pkg/lib/storage/layer.go @@ -0,0 +1,146 @@ +package storage + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "kitops/pkg/artifact" + "kitops/pkg/output" + "os" + "path/filepath" + "strings" + + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func compressLayer(layer *artifact.ModelLayer) (string, ocispec.Descriptor, error) { + pathInfo, err := os.Stat(layer.Path) + if err != nil { + return "", ocispec.DescriptorEmptyJSON, err + } + tempFile, err := os.CreateTemp("", "kitops_layer_*") + if err != nil { + return 
"", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to create temporary file: %w", err) + } + tempFileName := tempFile.Name() + output.Debugf("Compressing layer to temporary file %s", tempFileName) + + digester := digest.Canonical.Digester() + mw := io.MultiWriter(tempFile, digester.Hash()) + + // Note: we have to close gzip writer before reading digest from digester as closing is what writes the GZIP footer + gzw := gzip.NewWriter(mw) + tw := tar.NewWriter(gzw) + + // Wrapper function for closing writers before returning an error + handleErr := func(err error) (string, ocispec.Descriptor, error) { + // Don't care about these errors since we'll be deleting the file anyways + _ = tw.Close() + _ = gzw.Close() + _ = tempFile.Close() + removeTempFile(tempFileName) + return "", ocispec.DescriptorEmptyJSON, err + } + + if pathInfo.Mode().IsRegular() { + if err := writeHeaderToTar(pathInfo.Name(), pathInfo, tw); err != nil { + return handleErr(err) + } + if err := writeFileToTar(layer.Path, pathInfo, tw); err != nil { + return handleErr(err) + } + } else if pathInfo.IsDir() { + if err := writeDirToTar(layer.Path, tw); err != nil { + return handleErr(err) + } + } else { + return handleErr(fmt.Errorf("path %s is neither a file nor a directory", layer.Path)) + } + + callAndPrintError(tw.Close, "Failed to close tar writer: %s") + callAndPrintError(gzw.Close, "Failed to close gzip writer: %s") + + tempFileInfo, err := tempFile.Stat() + if err != nil { + removeTempFile(tempFileName) + return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to stat temporary file: %w", err) + } + callAndPrintError(tempFile.Close, "Failed to close temporary file: %s") + + desc := ocispec.Descriptor{ + MediaType: layer.MediaType, + Digest: digester.Digest(), + Size: tempFileInfo.Size(), + } + return tempFileName, desc, nil +} + +func writeDirToTar(basePath string, tw *tar.Writer) error { + // We'll want paths in the tarball to be relative to the *parent* of basePath since we want + // to 
compress the directory pointed at by basePath + trimPath := filepath.Dir(basePath) + return filepath.Walk(basePath, func(file string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + // Skip anything that's not a regular file or directory + if !fi.Mode().IsRegular() && !fi.Mode().IsDir() { + return nil + } + + relPath := strings.TrimPrefix(strings.TrimPrefix(file, trimPath), string(filepath.Separator)) // strip only the leading parent dir + if relPath == "" { + relPath = filepath.Base(basePath) + } + if err := writeHeaderToTar(relPath, fi, tw); err != nil { + return err + } + if fi.IsDir() { + return nil + } + return writeFileToTar(file, fi, tw) + }) +} + +func writeHeaderToTar(name string, fi os.FileInfo, tw *tar.Writer) error { + header, err := tar.FileInfoHeader(fi, "") + if err != nil { + return fmt.Errorf("failed to generate header for %s: %w", name, err) + } + header.Name = name + if err := tw.WriteHeader(header); err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + output.Debugf("Wrote header %s to tar file", header.Name) + return nil +} + +func writeFileToTar(file string, fi os.FileInfo, tw *tar.Writer) error { + f, err := os.Open(file) + if err != nil { + return fmt.Errorf("failed to open file for archiving: %w", err) + } + defer f.Close() + + if written, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("failed to add file to archive: %w", err) + } else if written != fi.Size() { // err is nil in this branch, so report the byte counts + return fmt.Errorf("error writing file %s: wrote %d of %d bytes", file, written, fi.Size()) + } + output.Debugf("Wrote file %s to tar file", file) + return nil +} + +func callAndPrintError(f func() error, msg string) { + if err := f(); err != nil { + output.Errorf(msg, err) + } +} + +func removeTempFile(filepath string) { + if err := os.Remove(filepath); err != nil && !os.IsNotExist(err) { + output.Errorf("Failed to clean up temporary file %s: %s", filepath, err) + } +} diff --git a/pkg/lib/storage/local.go b/pkg/lib/storage/local.go index 4bf5b8bd..85ac2337 100644 --- a/pkg/lib/storage/local.go +++ 
b/pkg/lib/storage/local.go @@ -9,6 +9,7 @@ import ( "kitops/pkg/lib/constants" "kitops/pkg/lib/repo" "kitops/pkg/output" + "os" "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go" @@ -39,34 +40,36 @@ func SaveModel(ctx context.Context, store oras.Target, model *artifact.Model, ta } func saveContentLayer(ctx context.Context, store oras.Target, layer *artifact.ModelLayer) (ocispec.Descriptor, error) { - buf := &bytes.Buffer{} - err := layer.Apply(buf) + // We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress + // to a temporary file. Ideally, we'd also add this to the internal store by moving the file to avoid + // copying if possible. + tempPath, desc, err := compressLayer(layer) if err != nil { return ocispec.DescriptorEmptyJSON, err } + defer func() { + if err := os.Remove(tempPath); err != nil { + output.Errorf("Failed to remove temporary file %s: %s", tempPath, err) + } + }() - // Create a descriptor for the layer - desc := ocispec.Descriptor{ - MediaType: layer.MediaType, - Digest: digest.FromBytes(buf.Bytes()), - Size: int64(buf.Len()), + if exists, err := store.Exists(ctx, desc); err != nil { + return ocispec.DescriptorEmptyJSON, err + } else if exists { + output.Infof("Already saved %s layer: %s", layer.Type(), desc.Digest) + return desc, nil } - exists, err := store.Exists(ctx, desc) + file, err := os.Open(tempPath) if err != nil { - return ocispec.DescriptorEmptyJSON, err - } - if exists { - output.Infof("Model layer already saved: %s", desc.Digest) - } else { - // Does not exist in storage, need to push - err = store.Push(ctx, desc, buf) - if err != nil { - return ocispec.DescriptorEmptyJSON, err - } - output.Infof("Saved model layer: %s", desc.Digest) + return ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to open temporary file: %w", err) } + defer file.Close() + if err := store.Push(ctx, desc, file); err != nil { + return ocispec.DescriptorEmptyJSON, err + } + 
output.Infof("Saved %s layer: %s", layer.Type(), desc.Digest) return desc, nil }