diff --git a/pkg/artifact/kit-file.go b/pkg/artifact/kit-file.go index c3508cbb..7e52d474 100644 --- a/pkg/artifact/kit-file.go +++ b/pkg/artifact/kit-file.go @@ -10,11 +10,11 @@ import ( type ( KitFile struct { - ManifestVersion string `json:"manifestVersion"` - Kit ModelKit `json:"package,omitempty"` - Code []Code `json:"code,omitempty"` - DataSets []DataSet `json:"datasets,omitempty"` - Model TrainedModel `json:"model,omitempty"` + ManifestVersion string `json:"manifestVersion"` + Kit ModelKit `json:"package,omitempty"` + Code []Code `json:"code,omitempty"` + DataSets []DataSet `json:"datasets,omitempty"` + Model *TrainedModel `json:"model,omitempty"` } ModelKit struct { @@ -61,9 +61,14 @@ type ( } ) -func (kf *KitFile) LoadModel(file *os.File) error { +func (kf *KitFile) LoadModel(filePath string) error { + modelfile, err := os.Open(filePath) + if err != nil { + return err + } + defer modelfile.Close() // Read the file - data, err := io.ReadAll(file) + data, err := io.ReadAll(modelfile) if err != nil { return err } diff --git a/pkg/artifact/layer.go b/pkg/artifact/layer.go deleted file mode 100644 index d1365bdc..00000000 --- a/pkg/artifact/layer.go +++ /dev/null @@ -1,86 +0,0 @@ -package artifact - -import ( - "archive/tar" - "compress/gzip" - "io" - "os" - "path/filepath" - "strings" -) - -type ModelLayer struct { - BaseDir string - MediaType string -} - -func (layer *ModelLayer) Apply(writers ...io.Writer) error { - // Check if path exists - _, err := os.Stat(layer.BaseDir) - if err != nil { - return err - } - - mw := io.MultiWriter(writers...) - - gzw := gzip.NewWriter(mw) - defer gzw.Close() - - tw := tar.NewWriter(gzw) - defer tw.Close() - - // walk the context dir and tar everything - err = filepath.Walk(layer.BaseDir, func(file string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - // Skip anything that's not a regular file or directory - if !fi.Mode().IsRegular() && !fi.Mode().IsDir() { - return nil - } - // Skip the baseDir itself - if file == layer.BaseDir { - return nil - } - - header, err := tar.FileInfoHeader(fi, fi.Name()) - if err != nil { - return err - } - - // We want the path in the tarball to be relative to the layer's base directory - subPath := strings.TrimPrefix(strings.Replace(file, layer.BaseDir, "", -1), string(filepath.Separator)) - header.Name = subPath - - if err := tw.WriteHeader(header); err != nil { - return err - } - - if fi.Mode().IsRegular() { - err := writeFileToTar(file, tw) - if err != nil { - return err - } - } - - return nil - }) - - if err != nil { - return err - } - return nil -} - -func writeFileToTar(file string, tw *tar.Writer) error { - f, err := os.Open(file) - if err != nil { - return err - } - defer f.Close() - - if _, err := io.Copy(tw, f); err != nil { - return err - } - return nil -} diff --git a/pkg/artifact/model.go b/pkg/artifact/model.go index 7b535197..24b17583 100644 --- a/pkg/artifact/model.go +++ b/pkg/artifact/model.go @@ -1,7 +1,28 @@ package artifact +import "kitops/pkg/lib/constants" + type Model struct { Repository string Layers []ModelLayer Config *KitFile } + +type ModelLayer struct { + Path string + MediaType string +} + +func (l *ModelLayer) Type() string { + switch l.MediaType { + case constants.CodeLayerMediaType: + return "code" + case constants.DataSetLayerMediaType: + return "dataset" + case constants.ModelConfigMediaType: + return "config" + case constants.ModelLayerMediaType: + return "model" + } + return "" +} diff --git a/pkg/cmd/build/build.go b/pkg/cmd/build/build.go index 5c48fe1a..3a24856d 100644 --- a/pkg/cmd/build/build.go +++ b/pkg/cmd/build/build.go @@ -9,18 +9,12 @@ import ( "kitops/pkg/lib/repo" "kitops/pkg/lib/storage" "kitops/pkg/output" - "os" ) func RunBuild(ctx context.Context, options *buildOptions) error { // 1. Read the model file - modelfile, err := os.Open(options.modelFile) - if err != nil { - return err - } - defer modelfile.Close() kitfile := &artifact.KitFile{} - if err = kitfile.LoadModel(modelfile); err != nil { + if err := kitfile.LoadModel(options.modelFile); err != nil { return err } @@ -34,7 +28,7 @@ func RunBuild(ctx context.Context, options *buildOptions) error { return err } layer := &artifact.ModelLayer{ - BaseDir: codePath, + Path: codePath, MediaType: constants.CodeLayerMediaType, } model.Layers = append(model.Layers, *layer) @@ -47,22 +41,24 @@ func RunBuild(ctx context.Context, options *buildOptions) error { return err } layer := &artifact.ModelLayer{ - BaseDir: datasetPath, + Path: datasetPath, MediaType: constants.DataSetLayerMediaType, } model.Layers = append(model.Layers, *layer) } // 4. package the TrainedModel - modelPath, err := filesystem.VerifySubpath(options.contextDir, kitfile.Model.Path) - if err != nil { - return err - } - layer := &artifact.ModelLayer{ - BaseDir: modelPath, - MediaType: constants.ModelLayerMediaType, + if kitfile.Model != nil { + modelPath, err := filesystem.VerifySubpath(options.contextDir, kitfile.Model.Path) + if err != nil { + return err + } + layer := &artifact.ModelLayer{ + Path: modelPath, + MediaType: constants.ModelLayerMediaType, + } + model.Layers = append(model.Layers, *layer) } - model.Layers = append(model.Layers, *layer) tag := "" if options.modelRef != nil { diff --git a/pkg/cmd/export/export.go b/pkg/cmd/export/export.go index 82062ec4..d536692b 100644 --- a/pkg/cmd/export/export.go +++ b/pkg/cmd/export/export.go @@ -100,7 +100,7 @@ func exportConfig(config *artifact.KitFile, exportDir string, overwrite bool) er return nil } -func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportDir string, overwrite bool) error { +func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, exportPath string, overwrite bool) error { rc, err := store.Fetch(ctx, desc) if err != nil { return fmt.Errorf("failed get layer %s: %w", desc.Digest, err) @@ -114,14 +114,13 @@ func exportLayer(ctx context.Context, store content.Storage, desc ocispec.Descri defer gzr.Close() tr := tar.NewReader(gzr) - if fi, exists := filesystem.PathExists(exportDir); exists { + if _, exists := filesystem.PathExists(exportPath); exists { if !overwrite { - return fmt.Errorf("failed to export: path %s already exists", exportDir) - } else if !fi.IsDir() { - return fmt.Errorf("failed to export: path %s exists and is not a directory", exportDir) + return fmt.Errorf("failed to export: path %s already exists", exportPath) } - output.Debugf("Directory %s already exists", exportDir) + output.Debugf("Directory %s already exists", exportPath) } + exportDir := filepath.Dir(exportPath) if err := os.MkdirAll(exportDir, 0755); err != nil { return fmt.Errorf("failed to create directory %s: %w", exportDir, err) } diff --git a/pkg/lib/storage/layer.go b/pkg/lib/storage/layer.go new file mode 100644 index 00000000..454cabdf --- /dev/null +++ b/pkg/lib/storage/layer.go @@ -0,0 +1,146 @@ +package storage + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "kitops/pkg/artifact" + "kitops/pkg/output" + "os" + "path/filepath" + "strings" + + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +func compressLayer(layer *artifact.ModelLayer) (string, ocispec.Descriptor, error) { + pathInfo, err := os.Stat(layer.Path) + if err != nil { + return "", ocispec.DescriptorEmptyJSON, err + } + tempFile, err := os.CreateTemp("", "kitops_layer_*") + if err != nil { + return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to create temporary file: %w", err) + } + tempFileName := tempFile.Name() + output.Debugf("Compressing layer to temporary file %s", tempFileName) + + digester := digest.Canonical.Digester() + mw := io.MultiWriter(tempFile, digester.Hash()) + + // Note: we have to close gzip writer before reading digest from digester as closing is what writes the GZIP footer + gzw := gzip.NewWriter(mw) + tw := tar.NewWriter(gzw) + + // Wrapper function for closing writers before returning an error + handleErr := func(err error) (string, ocispec.Descriptor, error) { + // Don't care about these errors since we'll be deleting the file anyways + _ = tw.Close() + _ = gzw.Close() + _ = tempFile.Close() + removeTempFile(tempFileName) + return "", ocispec.DescriptorEmptyJSON, err + } + + if pathInfo.Mode().IsRegular() { + if err := writeHeaderToTar(pathInfo.Name(), pathInfo, tw); err != nil { + return handleErr(err) + } + if err := writeFileToTar(layer.Path, pathInfo, tw); err != nil { + return handleErr(err) + } + } else if pathInfo.IsDir() { + if err := writeDirToTar(layer.Path, tw); err != nil { + return handleErr(err) + } + } else { + return handleErr(fmt.Errorf("path %s is neither a file nor a directory", layer.Path)) + } + + callAndPrintError(tw.Close, "Failed to close tar writer: %s") + callAndPrintError(gzw.Close, "Failed to close gzip writer: %s") + + tempFileInfo, err := tempFile.Stat() + if err != nil { + removeTempFile(tempFileName) + return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to stat temporary file: %w", err) + } + callAndPrintError(tempFile.Close, "Failed to close temporary file: %s") + + desc := ocispec.Descriptor{ + MediaType: layer.MediaType, + Digest: digester.Digest(), + Size: tempFileInfo.Size(), + } + return tempFileName, desc, nil +} + +func writeDirToTar(basePath string, tw *tar.Writer) error { + // We'll want paths in the tarball to be relative to the *parent* of basePath since we want + // to compress the directory pointed at by basePath + trimPath := filepath.Dir(basePath) + return filepath.Walk(basePath, func(file string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + // Skip anything that's not a regular file or directory + if !fi.Mode().IsRegular() && !fi.Mode().IsDir() { + return nil + } + + relPath := strings.TrimPrefix(strings.Replace(file, trimPath, "", -1), string(filepath.Separator)) + if relPath == "" { + relPath = filepath.Base(basePath) + } + if err := writeHeaderToTar(relPath, fi, tw); err != nil { + return err + } + if fi.IsDir() { + return nil + } + return writeFileToTar(file, fi, tw) + }) +} + +func writeHeaderToTar(name string, fi os.FileInfo, tw *tar.Writer) error { + header, err := tar.FileInfoHeader(fi, "") + if err != nil { + return fmt.Errorf("failed to generate header for %s: %w", name, err) + } + header.Name = name + if err := tw.WriteHeader(header); err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + output.Debugf("Wrote header %s to tar file", header.Name) + return nil +} + +func writeFileToTar(file string, fi os.FileInfo, tw *tar.Writer) error { + f, err := os.Open(file) + if err != nil { + return fmt.Errorf("failed to open file for archiving: %w", err) + } + defer f.Close() + + if written, err := io.Copy(tw, f); err != nil { + return fmt.Errorf("failed to add file to archive: %w", err) + } else if written != fi.Size() { + return fmt.Errorf("error writing file: %w", err) + } + output.Debugf("Wrote file %s to tar file", file) + return nil +} + +func callAndPrintError(f func() error, msg string) { + if err := f(); err != nil { + output.Errorf(msg, err) + } +} + +func removeTempFile(filepath string) { + if err := os.Remove(filepath); err != nil && !os.IsNotExist(err) { + output.Errorf("Failed to clean up temporary file %s: %s", filepath, err) + } +} diff --git a/pkg/lib/storage/local.go b/pkg/lib/storage/local.go index 4bf5b8bd..85ac2337 100644 --- a/pkg/lib/storage/local.go +++ b/pkg/lib/storage/local.go @@ -9,6 +9,7 @@ import ( "kitops/pkg/lib/constants" "kitops/pkg/lib/repo" "kitops/pkg/output" + "os" "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go" @@ -39,34 +40,36 @@ func SaveModel(ctx context.Context, store oras.Target, model *artifact.Model, ta } func saveContentLayer(ctx context.Context, store oras.Target, layer *artifact.ModelLayer) (ocispec.Descriptor, error) { - buf := &bytes.Buffer{} - err := layer.Apply(buf) + // We want to store a gzipped tar file in store, but to do so we need a descriptor, so we have to compress + // to a temporary file. Ideally, we'd also add this to the internal store by moving the file to avoid + // copying if possible. + tempPath, desc, err := compressLayer(layer) if err != nil { return ocispec.DescriptorEmptyJSON, err } + defer func() { + if err := os.Remove(tempPath); err != nil { + output.Errorf("Failed to remove temporary file %s: %s", tempPath, err) + } + }() - // Create a descriptor for the layer - desc := ocispec.Descriptor{ - MediaType: layer.MediaType, - Digest: digest.FromBytes(buf.Bytes()), - Size: int64(buf.Len()), + if exists, err := store.Exists(ctx, desc); err != nil { + return ocispec.DescriptorEmptyJSON, err + } else if exists { + output.Infof("Already saved %s layer: %s", layer.Type(), desc.Digest) + return desc, nil } - exists, err := store.Exists(ctx, desc) + file, err := os.Open(tempPath) if err != nil { - return ocispec.DescriptorEmptyJSON, err - } - if exists { - output.Infof("Model layer already saved: %s", desc.Digest) - } else { - // Does not exist in storage, need to push - err = store.Push(ctx, desc, buf) - if err != nil { - return ocispec.DescriptorEmptyJSON, err - } - output.Infof("Saved model layer: %s", desc.Digest) + return ocispec.DescriptorEmptyJSON, fmt.Errorf("Failed to open temporary file: %s", err) } + defer file.Close() + if err := store.Push(ctx, desc, file); err != nil { + return ocispec.DescriptorEmptyJSON, err + } + output.Infof("Saved %s layer: %s", layer.Type(), desc.Digest) return desc, nil }