From 69e32c5a9c6306df4f08c8fdb0c29d79c1963d0e Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Wed, 13 Dec 2023 17:46:11 +0100 Subject: [PATCH] Optimize sharing for instances on the same stack When two instances are on the same stack and there is a file sharing between them, the stack can optimize the share-upload worker. Before this change, the content of the file was fetched from the VFS of the source instance, then uploaded to the destination instance, which copied it to its VFS. Now, the stack just makes a copy from one VFS to the other. It's a lot lighter on I/O, and should help make the share-upload worker faster. --- model/sharing/member.go | 10 ++ model/sharing/upload.go | 115 ++++++++++++++----- model/vfs/vfs.go | 5 + model/vfs/vfsafero/impl.go | 27 +++++ model/vfs/vfsswift/impl_v3.go | 98 ++++++++++++++++ tests/system/tests/sharing_moves_n_delete.rb | 13 ++- web/sharings/replicator.go | 17 ++- 7 files changed, 248 insertions(+), 37 deletions(-) diff --git a/model/sharing/member.go b/model/sharing/member.go index 9e333c52291..c6696e626f0 100644 --- a/model/sharing/member.go +++ b/model/sharing/member.go @@ -79,6 +79,16 @@ func (m *Member) PrimaryName() string { return m.Email } +// InstanceHost returns the domain part of the Cozy URL of the member, which +// can be used to find the instance in CouchDB. It may include the port. +func (m *Member) InstanceHost() string { + u, err := url.Parse(m.Instance) + if err != nil { + return "" + } + return u.Host +} + // Credentials is the struct with the secret stuff used for authentication & // authorization. 
type Credentials struct { diff --git a/model/sharing/upload.go b/model/sharing/upload.go index 1c8a1ec486b..44ec563e6f5 100644 --- a/model/sharing/upload.go +++ b/model/sharing/upload.go @@ -14,6 +14,7 @@ import ( "github.com/cozy/cozy-stack/client/request" "github.com/cozy/cozy-stack/model/instance" + "github.com/cozy/cozy-stack/model/instance/lifecycle" "github.com/cozy/cozy-stack/model/vfs" "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/consts" @@ -29,6 +30,10 @@ type UploadMsg struct { Errors int `json:"errors"` } +// fileCreatorWithContent is a function that can be used to create a file in +// the given VFS. The content comes from the function closure. +type fileCreatorWithContent func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error + // Upload starts uploading files for this sharing func (s *Sharing) Upload(inst *instance.Instance, errors int) error { mu := config.Lock().ReadWrite(inst, "sharings/"+s.SID+"/upload") @@ -318,6 +323,17 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string if err != nil { return err } + + dstInstance, err := lifecycle.GetInstance(m.InstanceHost()) + if err == nil && onSameStack(inst, dstInstance) { + err := s.optimizedUploadFile(inst, dstInstance, m, fileDoc, file, resBody) + if err != nil { + inst.Logger().WithNamespace("upload"). 
+ Warnf("optimizedUploadFile failed to upload %s to %s (%s): %s", origFileID, m.Instance, s.ID(), err) + } + return err + } + content, err := fs.OpenFile(fileDoc) if err != nil { return err @@ -350,6 +366,43 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string return nil } +func onSameStack(src, dst *instance.Instance) bool { + var srcPort, dstPort string + parts := strings.SplitN(src.Domain, ":", 2) + if len(parts) > 1 { + srcPort = parts[1] + } + parts = strings.SplitN(dst.Domain, ":", 2) + if len(parts) > 1 { + dstPort = parts[1] + } + return srcPort == dstPort +} + +func (s *Sharing) optimizedUploadFile( + srcInstance, dstInstance *instance.Instance, + m *Member, + srcFile *vfs.FileDoc, + dstFile map[string]interface{}, + key KeyToUpload, +) error { + srcInstance.Logger().WithNamespace("upload"). + Debugf("optimizedUploadFile %s to %s (%s)", srcFile.ID(), m.Instance, s.ID()) + + create := func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error { + return fs.CopyFileFromOtherFS(newdoc, olddoc, srcInstance.VFS(), srcFile) + } + + dstSharing, err := FindSharing(dstInstance, s.ID()) + if err != nil { + return err + } + if !dstSharing.Active { + return ErrInvalidSharing + } + return dstSharing.HandleFileUpload(dstInstance, key.Key, create) +} + // FileDocWithRevisions is the struct of the payload for synchronizing a file type FileDocWithRevisions struct { *vfs.FileDoc @@ -508,8 +561,7 @@ func (s *Sharing) updateFileMetadata(inst *instance.Instance, target *FileDocWit // HandleFileUpload is used to receive a file upload when synchronizing just // the metadata was not enough. 
-func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io.ReadCloser) error { - defer body.Close() +func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, create fileCreatorWithContent) error { target, err := getStore().Get(inst, key) if err != nil { return err @@ -533,13 +585,17 @@ func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io. } if current == nil { - return s.UploadNewFile(inst, target, body) + return s.UploadNewFile(inst, target, create) } - return s.UploadExistingFile(inst, target, current, body) + return s.UploadExistingFile(inst, target, current, create) } // UploadNewFile is used to receive a new file. -func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevisions, body io.ReadCloser) error { +func (s *Sharing) UploadNewFile( + inst *instance.Instance, + target *FileDocWithRevisions, + create fileCreatorWithContent, +) error { inst.Logger().WithNamespace("upload").Debugf("UploadNewFile") ref := SharedRef{ Infos: make(map[string]SharedInfo), @@ -617,7 +673,7 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi newdoc.ReferencedBy = append(newdoc.ReferencedBy, ref) } - file, err := fs.CreateFile(newdoc, nil) + err = create(fs, newdoc, nil) if errors.Is(err, os.ErrExist) { pth, errp := newdoc.Path(fs) if errp != nil { @@ -632,7 +688,7 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi newdoc.DocName = name newdoc.ResetFullpath() } - file, err = fs.CreateFile(newdoc, nil) + err = create(fs, newdoc, nil) } if err != nil { inst.Logger().WithNamespace("upload"). 
@@ -640,9 +696,9 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi return err } if s.NbFiles > 0 { - defer s.countReceivedFiles(inst) + s.countReceivedFiles(inst) } - return copyFileContent(inst, file, body) + return nil } // countReceivedFiles counts the number of files received during the initial @@ -695,7 +751,12 @@ func (s *Sharing) countReceivedFiles(inst *instance.Instance) { // than on content: a conflict on different content is resolved by a copy of // the file (which is not what we want), a conflict of name+dir_id, the higher // revision wins and it should be the good one in our case. -func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error { +func (s *Sharing) UploadExistingFile( + inst *instance.Instance, + target *FileDocWithRevisions, + newdoc *vfs.FileDoc, + create fileCreatorWithContent, +) error { inst.Logger().WithNamespace("upload").Debugf("UploadExistingFile") var ref SharedRef err := couchdb.GetDoc(inst, consts.Shared, consts.Files+"/"+target.DocID, &ref) @@ -731,7 +792,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit conflict := detectConflict(newdoc.DocRev, chain) switch conflict { case LostConflict: - return s.uploadLostConflict(inst, target, newdoc, body) + return s.uploadLostConflict(inst, target, newdoc, create) case WonConflict: if err = s.uploadWonConflict(inst, olddoc); err != nil { return err @@ -743,11 +804,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit // Easy case: only the content has changed, not its path if newdoc.DocName == olddoc.DocName && newdoc.DirID == olddoc.DirID { - file, errf := fs.CreateFile(newdoc, olddoc) - if errf != nil { - return errf - } - return copyFileContent(inst, file, body) + return create(fs, newdoc, olddoc) } stash := indexer.StashRevision(false) @@ -755,11 +812,7 @@ func (s *Sharing) UploadExistingFile(inst 
*instance.Instance, target *FileDocWit tmpdoc.DocName = olddoc.DocName tmpdoc.DirID = olddoc.DirID tmpdoc.ResetFullpath() - file, err := fs.CreateFile(tmpdoc, olddoc) - if err != nil { - return err - } - if err = copyFileContent(inst, file, body); err != nil { + if err := create(fs, tmpdoc, olddoc); err != nil { return err } @@ -788,7 +841,12 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit // uploadLostConflict manages an upload where a file is in conflict, and the // uploaded file version goes to a new file. -func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error { +func (s *Sharing) uploadLostConflict( + inst *instance.Instance, + target *FileDocWithRevisions, + newdoc *vfs.FileDoc, + create fileCreatorWithContent, +) error { rev := target.Rev() inst.Logger().WithNamespace("upload").Debugf("uploadLostConflict %s", rev) indexer := newSharingIndexer(inst, &bulkRevs{ @@ -798,21 +856,16 @@ func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWit fs := inst.VFS().UseSharingIndexer(indexer) newdoc.DocID = conflictID(newdoc.DocID, rev) if _, err := fs.FileByID(newdoc.DocID); !errors.Is(err, os.ErrNotExist) { - if err != nil { - return err - } - body.Close() - return nil + return err } newdoc.DocName = conflictName(indexer, newdoc.DirID, newdoc.DocName, true) newdoc.DocRev = "" newdoc.ResetFullpath() - file, err := fs.CreateFile(newdoc, nil) - if err != nil { + if err := create(fs, newdoc, nil); err != nil { + inst.Logger().WithNamespace("upload").Debugf("1. loser = %#v", newdoc) return err } - inst.Logger().WithNamespace("upload").Debugf("1. 
loser = %#v", newdoc) - return copyFileContent(inst, file, body) + return nil } // uploadWonConflict manages an upload where a file is in conflict, and the diff --git a/model/vfs/vfs.go b/model/vfs/vfs.go index dd1cf2cdb85..d173b02b575 100644 --- a/model/vfs/vfs.go +++ b/model/vfs/vfs.go @@ -126,6 +126,11 @@ type Fs interface { // version. ImportFileVersion(version *Version, content io.ReadCloser) error + // CopyFileFromOtherFS creates or updates a file by copying the content of + // a file in another Cozy. It is used for sharings, to optimize I/O when + // two instances are on the same stack. + CopyFileFromOtherFS(olddoc, newdoc *FileDoc, srcFS Fs, srcDoc *FileDoc) error + // Fsck return the list of inconsistencies in the VFS Fsck(func(log *FsckLog), bool) (err error) CheckFilesConsistency(func(*FsckLog), bool) error diff --git a/model/vfs/vfsafero/impl.go b/model/vfs/vfsafero/impl.go index 0d54af1b9ab..7b81e16c2e5 100644 --- a/model/vfs/vfsafero/impl.go +++ b/model/vfs/vfsafero/impl.go @@ -1,3 +1,6 @@ +// Package vfsafero is the implementation of the Virtual File System by using +// afero. Afero is a library for manipulating files and directories on the local +// file system. package vfsafero import ( @@ -572,6 +575,30 @@ func (afs *aferoVFS) RevertFileVersion(doc *vfs.FileDoc, version *vfs.Version) e return nil } +func (afs *aferoVFS) CopyFileFromOtherFS( + newdoc, olddoc *vfs.FileDoc, + srcFS vfs.Fs, + srcDoc *vfs.FileDoc, +) error { + content, err := srcFS.OpenFile(srcDoc) + if err != nil { + return err + } + defer content.Close() + + fd, err := afs.CreateFile(newdoc, olddoc) + if err != nil { + return err + } + + _, err = io.Copy(fd, content) + errc := fd.Close() + if err != nil { + return err + } + return errc +} + // UpdateFileDoc overrides the indexer's one since the afero.Fs is by essence // also indexed by path. When moving a file, the index has to be moved and the // filesystem should also be updated. 
diff --git a/model/vfs/vfsswift/impl_v3.go b/model/vfs/vfsswift/impl_v3.go index fe70757e115..97a345f5d45 100644 --- a/model/vfs/vfsswift/impl_v3.go +++ b/model/vfs/vfsswift/impl_v3.go @@ -1,3 +1,6 @@ +// Package vfsswift is the implementation of the Virtual File System by using +// Swift from the OpenStack project. The file contents are saved in the object +// storage (Swift), and the metadata are indexed in CouchDB. package vfsswift import ( @@ -591,6 +594,101 @@ func (sfs *swiftVFSV3) RevertFileVersion(doc *vfs.FileDoc, version *vfs.Version) return sfs.Indexer.DeleteVersion(version) } +func (sfs *swiftVFSV3) CopyFileFromOtherFS( + newdoc, olddoc *vfs.FileDoc, + srcFS vfs.Fs, + srcDoc *vfs.FileDoc, +) error { + if lockerr := sfs.mu.Lock(); lockerr != nil { + return lockerr + } + defer sfs.mu.Unlock() + + newsize, maxsize, capsize, err := vfs.CheckAvailableDiskSpace(sfs, newdoc) + if err != nil { + return err + } + if newsize > maxsize { + return vfs.ErrFileTooBig + } + + newpath, err := sfs.Indexer.FilePath(newdoc) + if err != nil { + return err + } + if strings.HasPrefix(newpath, vfs.TrashDirName+"/") { + return vfs.ErrParentInTrash + } + + if olddoc == nil { + var exists bool + exists, err = sfs.Indexer.DirChildExists(newdoc.DirID, newdoc.DocName) + if err != nil { + return err + } + if exists { + return os.ErrExist + } + } + + if newdoc.DocID == "" { + uid, err := uuid.NewV7() + if err != nil { + return err + } + newdoc.DocID = uid.String() + } + + newdoc.InternalID = NewInternalID() + + srcName := MakeObjectNameV3(srcDoc.DocID, srcDoc.InternalID) + dstName := MakeObjectNameV3(newdoc.DocID, newdoc.InternalID) + srcContainer := srcFS.(*swiftVFSV3).container + if _, err := sfs.c.ObjectCopy(sfs.ctx, srcContainer, srcName, sfs.container, dstName, nil); err != nil { + return err + } + + var v *vfs.Version + if olddoc != nil { + v = vfs.NewVersion(olddoc) + err = sfs.Indexer.UpdateFileDoc(olddoc, newdoc) + } else { + err = sfs.Indexer.CreateNamedFileDoc(newdoc) + } 
+ if err != nil { + return err + } + + if v != nil { + actionV, toClean, _ := vfs.FindVersionsToClean(sfs, newdoc.DocID, v) + if bytes.Equal(newdoc.MD5Sum, olddoc.MD5Sum) { + actionV = vfs.CleanCandidateVersion + } + if actionV == vfs.KeepCandidateVersion { + if errv := sfs.Indexer.CreateVersion(v); errv != nil { + actionV = vfs.CleanCandidateVersion + } + } + if actionV == vfs.CleanCandidateVersion { + internalID := v.DocID + if parts := strings.SplitN(v.DocID, "/", 2); len(parts) > 1 { + internalID = parts[1] + } + objName := MakeObjectNameV3(newdoc.DocID, internalID) + _ = sfs.c.ObjectDelete(sfs.ctx, sfs.container, objName) + } + for _, old := range toClean { + _ = cleanOldVersion(sfs, newdoc.DocID, old) + } + } + + if capsize > 0 && newsize >= capsize { + vfs.PushDiskQuotaAlert(sfs, true) + } + + return nil +} + // UpdateFileDoc calls the indexer UpdateFileDoc function and adds a few checks // before actually calling this method: // - locks the filesystem for writing diff --git a/tests/system/tests/sharing_moves_n_delete.rb b/tests/system/tests/sharing_moves_n_delete.rb index 7bd8cbf3af2..09f557132ef 100644 --- a/tests/system/tests/sharing_moves_n_delete.rb +++ b/tests/system/tests/sharing_moves_n_delete.rb @@ -7,18 +7,21 @@ Helpers.scenario "moves_n_delete" Helpers.start_mailhog - # Create the instance + # Create the instances inst = Instance.create name: "Alice" inst_bob = Instance.create name: "Bob" - inst_charlie = Instance.create name: "Charlie" + inst_charlie = Instance.create name: "Charlie", port: inst.stack.port # Create hierarchy folder = Folder.create inst folder.couch_id.wont_be_empty subdir = Folder.create inst, dir_id: folder.couch_id - child1 = Folder.create inst, dir_id: subdir.couch_id - child2 = Folder.create inst, dir_id: subdir.couch_id - child3 = Folder.create inst, dir_id: subdir.couch_id + childname1 = "c_#{Faker::Internet.slug}1" + childname2 = "c_#{Faker::Internet.slug}1" + childname3 = "c_#{Faker::Internet.slug}1" + child1 = 
Folder.create inst, dir_id: subdir.couch_id, name: childname1 + child2 = Folder.create inst, dir_id: subdir.couch_id, name: childname2 + child3 = Folder.create inst, dir_id: subdir.couch_id, name: childname3 filename1 = "#{Faker::Internet.slug}1.txt" filename2 = "#{Faker::Internet.slug}2.txt" filename3 = "#{Faker::Internet.slug}3.txt" diff --git a/web/sharings/replicator.go b/web/sharings/replicator.go index e7fe867be79..05f74e03512 100644 --- a/web/sharings/replicator.go +++ b/web/sharings/replicator.go @@ -3,9 +3,11 @@ package sharings import ( "encoding/json" "errors" + "io" "net/http" "github.com/cozy/cozy-stack/model/sharing" + "github.com/cozy/cozy-stack/model/vfs" "github.com/cozy/cozy-stack/pkg/consts" "github.com/cozy/cozy-stack/pkg/jsonapi" "github.com/cozy/cozy-stack/web/middlewares" @@ -127,7 +129,20 @@ func FileHandler(c echo.Context) error { inst.Logger().WithNamespace("replicator").Infof("Sharing was not found: %s", err) return wrapErrors(err) } - if err := s.HandleFileUpload(inst, c.Param("id"), c.Request().Body); err != nil { + + create := func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error { + file, err := fs.CreateFile(newdoc, olddoc) + if err != nil { + return err + } + _, err = io.Copy(file, c.Request().Body) + if cerr := file.Close(); cerr != nil && err == nil { + return cerr + } + return err + } + + if err := s.HandleFileUpload(inst, c.Param("id"), create); err != nil { inst.Logger().WithNamespace("replicator").Infof("Error on file upload: %s", err) return wrapErrors(err) }