Skip to content

Commit

Permalink
Optimize sharing for instances on the same stack (#4260)
Browse files Browse the repository at this point in the history
When two instances are on the same stack and there is a sharing between
for files, the stack can optimize the share-upload worker. Before this
change, the content of the file was fetched from the VFS of the source
instance, then uploaded to the destination instance, which copies it to
its VFS. Now, the stack just makes a copy from one VFS to the other.
It's a lot lighter on I/O, and should help making the share-upload
worker faster.
  • Loading branch information
nono authored Dec 28, 2023
2 parents 5d65611 + 69e32c5 commit 349a71b
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 37 deletions.
10 changes: 10 additions & 0 deletions model/sharing/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func (m *Member) PrimaryName() string {
return m.Email
}

// InstanceHost returns the domain part of the Cozy URL of the member, which
// can be used to find the instance in CouchDB. It may includes the port.
func (m *Member) InstanceHost() string {
u, err := url.Parse(m.Instance)
if err != nil {
return ""
}
return u.Host
}

// Credentials is the struct with the secret stuff used for authentication &
// authorization.
type Credentials struct {
Expand Down
115 changes: 84 additions & 31 deletions model/sharing/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cozy/cozy-stack/client/request"
"github.com/cozy/cozy-stack/model/instance"
"github.com/cozy/cozy-stack/model/instance/lifecycle"
"github.com/cozy/cozy-stack/model/vfs"
"github.com/cozy/cozy-stack/pkg/config/config"
"github.com/cozy/cozy-stack/pkg/consts"
Expand All @@ -29,6 +30,10 @@ type UploadMsg struct {
Errors int `json:"errors"`
}

// fileCreatorWithContent is a function that can be used to create a file in
// the given VFS. The content comes from the function closure.
type fileCreatorWithContent func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error

// Upload starts uploading files for this sharing
func (s *Sharing) Upload(inst *instance.Instance, errors int) error {
mu := config.Lock().ReadWrite(inst, "sharings/"+s.SID+"/upload")
Expand Down Expand Up @@ -318,6 +323,17 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string
if err != nil {
return err
}

dstInstance, err := lifecycle.GetInstance(m.InstanceHost())
if err == nil && onSameStack(inst, dstInstance) {
err := s.optimizedUploadFile(inst, dstInstance, m, fileDoc, file, resBody)
if err != nil {
inst.Logger().WithNamespace("upload").
Warnf("optimizedUploadFile failed to upload %s to %s (%s): %s", origFileID, m.Instance, s.ID(), err)
}
return err
}

content, err := fs.OpenFile(fileDoc)
if err != nil {
return err
Expand Down Expand Up @@ -350,6 +366,43 @@ func (s *Sharing) uploadFile(inst *instance.Instance, m *Member, file map[string
return nil
}

func onSameStack(src, dst *instance.Instance) bool {
var srcPort, dstPort string
parts := strings.SplitN(src.Domain, ":", 2)
if len(parts) > 1 {
srcPort = parts[1]
}
parts = strings.SplitN(dst.Domain, ":", 2)
if len(parts) > 1 {
dstPort = parts[1]
}
return srcPort == dstPort
}

func (s *Sharing) optimizedUploadFile(
srcInstance, dstInstance *instance.Instance,
m *Member,
srcFile *vfs.FileDoc,
dstFile map[string]interface{},
key KeyToUpload,
) error {
srcInstance.Logger().WithNamespace("upload").
Debugf("optimizedUploadFile %s to %s (%s)", srcFile.ID(), m.Instance, s.ID())

create := func(fs vfs.VFS, newdoc, olddoc *vfs.FileDoc) error {
return fs.CopyFileFromOtherFS(newdoc, olddoc, srcInstance.VFS(), srcFile)
}

dstSharing, err := FindSharing(dstInstance, s.ID())
if err != nil {
return err
}
if !dstSharing.Active {
return ErrInvalidSharing
}
return dstSharing.HandleFileUpload(dstInstance, key.Key, create)
}

// FileDocWithRevisions is the struct of the payload for synchronizing a file
type FileDocWithRevisions struct {
*vfs.FileDoc
Expand Down Expand Up @@ -508,8 +561,7 @@ func (s *Sharing) updateFileMetadata(inst *instance.Instance, target *FileDocWit

// HandleFileUpload is used to receive a file upload when synchronizing just
// the metadata was not enough.
func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io.ReadCloser) error {
defer body.Close()
func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, create fileCreatorWithContent) error {
target, err := getStore().Get(inst, key)
if err != nil {
return err
Expand All @@ -533,13 +585,17 @@ func (s *Sharing) HandleFileUpload(inst *instance.Instance, key string, body io.
}

if current == nil {
return s.UploadNewFile(inst, target, body)
return s.UploadNewFile(inst, target, create)
}
return s.UploadExistingFile(inst, target, current, body)
return s.UploadExistingFile(inst, target, current, create)
}

// UploadNewFile is used to receive a new file.
func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevisions, body io.ReadCloser) error {
func (s *Sharing) UploadNewFile(
inst *instance.Instance,
target *FileDocWithRevisions,
create fileCreatorWithContent,
) error {
inst.Logger().WithNamespace("upload").Debugf("UploadNewFile")
ref := SharedRef{
Infos: make(map[string]SharedInfo),
Expand Down Expand Up @@ -617,7 +673,7 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi
newdoc.ReferencedBy = append(newdoc.ReferencedBy, ref)
}

file, err := fs.CreateFile(newdoc, nil)
err = create(fs, newdoc, nil)
if errors.Is(err, os.ErrExist) {
pth, errp := newdoc.Path(fs)
if errp != nil {
Expand All @@ -632,17 +688,17 @@ func (s *Sharing) UploadNewFile(inst *instance.Instance, target *FileDocWithRevi
newdoc.DocName = name
newdoc.ResetFullpath()
}
file, err = fs.CreateFile(newdoc, nil)
err = create(fs, newdoc, nil)
}
if err != nil {
inst.Logger().WithNamespace("upload").
Debugf("Cannot create file: %s", err)
return err
}
if s.NbFiles > 0 {
defer s.countReceivedFiles(inst)
s.countReceivedFiles(inst)
}
return copyFileContent(inst, file, body)
return nil
}

// countReceivedFiles counts the number of files received during the initial
Expand Down Expand Up @@ -695,7 +751,12 @@ func (s *Sharing) countReceivedFiles(inst *instance.Instance) {
// than on content: a conflict on different content is resolved by a copy of
// the file (which is not what we want), a conflict of name+dir_id, the higher
// revision wins and it should be the good one in our case.
func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error {
func (s *Sharing) UploadExistingFile(
inst *instance.Instance,
target *FileDocWithRevisions,
newdoc *vfs.FileDoc,
create fileCreatorWithContent,
) error {
inst.Logger().WithNamespace("upload").Debugf("UploadExistingFile")
var ref SharedRef
err := couchdb.GetDoc(inst, consts.Shared, consts.Files+"/"+target.DocID, &ref)
Expand Down Expand Up @@ -731,7 +792,7 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit
conflict := detectConflict(newdoc.DocRev, chain)
switch conflict {
case LostConflict:
return s.uploadLostConflict(inst, target, newdoc, body)
return s.uploadLostConflict(inst, target, newdoc, create)
case WonConflict:
if err = s.uploadWonConflict(inst, olddoc); err != nil {
return err
Expand All @@ -743,23 +804,15 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit

// Easy case: only the content has changed, not its path
if newdoc.DocName == olddoc.DocName && newdoc.DirID == olddoc.DirID {
file, errf := fs.CreateFile(newdoc, olddoc)
if errf != nil {
return errf
}
return copyFileContent(inst, file, body)
return create(fs, newdoc, olddoc)
}

stash := indexer.StashRevision(false)
tmpdoc := newdoc.Clone().(*vfs.FileDoc)
tmpdoc.DocName = olddoc.DocName
tmpdoc.DirID = olddoc.DirID
tmpdoc.ResetFullpath()
file, err := fs.CreateFile(tmpdoc, olddoc)
if err != nil {
return err
}
if err = copyFileContent(inst, file, body); err != nil {
if err := create(fs, tmpdoc, olddoc); err != nil {
return err
}

Expand Down Expand Up @@ -788,7 +841,12 @@ func (s *Sharing) UploadExistingFile(inst *instance.Instance, target *FileDocWit

// uploadLostConflict manages an upload where a file is in conflict, and the
// uploaded file version goes to a new file.
func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWithRevisions, newdoc *vfs.FileDoc, body io.ReadCloser) error {
func (s *Sharing) uploadLostConflict(
inst *instance.Instance,
target *FileDocWithRevisions,
newdoc *vfs.FileDoc,
create fileCreatorWithContent,
) error {
rev := target.Rev()
inst.Logger().WithNamespace("upload").Debugf("uploadLostConflict %s", rev)
indexer := newSharingIndexer(inst, &bulkRevs{
Expand All @@ -798,21 +856,16 @@ func (s *Sharing) uploadLostConflict(inst *instance.Instance, target *FileDocWit
fs := inst.VFS().UseSharingIndexer(indexer)
newdoc.DocID = conflictID(newdoc.DocID, rev)
if _, err := fs.FileByID(newdoc.DocID); !errors.Is(err, os.ErrNotExist) {
if err != nil {
return err
}
body.Close()
return nil
return err
}
newdoc.DocName = conflictName(indexer, newdoc.DirID, newdoc.DocName, true)
newdoc.DocRev = ""
newdoc.ResetFullpath()
file, err := fs.CreateFile(newdoc, nil)
if err != nil {
if err := create(fs, newdoc, nil); err != nil {
inst.Logger().WithNamespace("upload").Debugf("1. loser = %#v", newdoc)
return err
}
inst.Logger().WithNamespace("upload").Debugf("1. loser = %#v", newdoc)
return copyFileContent(inst, file, body)
return nil
}

// uploadWonConflict manages an upload where a file is in conflict, and the
Expand Down
5 changes: 5 additions & 0 deletions model/vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ type Fs interface {
// version.
ImportFileVersion(version *Version, content io.ReadCloser) error

// CopyFileFromOtherFS creates or updates a file by copying the content of
// a file in another Cozy. It is used for sharings, to optimize I/O when
// two instances are on the same stack.
CopyFileFromOtherFS(olddoc, newdoc *FileDoc, srcFS Fs, srcDoc *FileDoc) error

// Fsck return the list of inconsistencies in the VFS
Fsck(func(log *FsckLog), bool) (err error)
CheckFilesConsistency(func(*FsckLog), bool) error
Expand Down
27 changes: 27 additions & 0 deletions model/vfs/vfsafero/impl.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Package vfsafero is the implementation of the Virtual File System by using
// afero. Afero is a library for manipulating files and directory on the local
// file system.
package vfsafero

import (
Expand Down Expand Up @@ -572,6 +575,30 @@ func (afs *aferoVFS) RevertFileVersion(doc *vfs.FileDoc, version *vfs.Version) e
return nil
}

func (afs *aferoVFS) CopyFileFromOtherFS(
newdoc, olddoc *vfs.FileDoc,
srcFS vfs.Fs,
srcDoc *vfs.FileDoc,
) error {
content, err := srcFS.OpenFile(srcDoc)
if err != nil {
return err
}
defer content.Close()

fd, err := afs.CreateFile(newdoc, olddoc)
if err != nil {
return err
}

_, err = io.Copy(fd, content)
errc := fd.Close()
if err != nil {
return err
}
return errc
}

// UpdateFileDoc overrides the indexer's one since the afero.Fs is by essence
// also indexed by path. When moving a file, the index has to be moved and the
// filesystem should also be updated.
Expand Down
Loading

0 comments on commit 349a71b

Please sign in to comment.