Skip to content

Commit

Permalink
drive: apply gclone mod
Browse files Browse the repository at this point in the history
drive: add parsing drive id from drive url
drive: add log message for service account
drive: add support for drive id length 28
drive: merge service account file path with function instead of concat
stats: add number of files per second
  • Loading branch information
mawaya committed Jun 1, 2020
1 parent 8774381 commit 96ddd31
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 2 deletions.
186 changes: 186 additions & 0 deletions backend/drive/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"mime"
"net/http"
"net/url"
"os"
"path"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -230,6 +232,9 @@ in with the ID of the root folder.
}, {
Name: "service_account_file",
Help: "Service Account Credentials JSON file path \nLeave blank normally.\nNeeded only if you want use SA instead of interactive login.",
}, {
Name: "service_account_file_path",
Help: "Service Account Credentials JSON folder path.",
}, {
Name: "service_account_credentials",
Help: "Service Account Credentials JSON blob\nLeave blank normally.\nNeeded only if you want use SA instead of interactive login.",
Expand Down Expand Up @@ -512,6 +517,7 @@ type Options struct {
Scope string `config:"scope"`
RootFolderID string `config:"root_folder_id"`
ServiceAccountFile string `config:"service_account_file"`
ServiceAccountFilePath string `config:"service_account_file_path"`
ServiceAccountCredentials string `config:"service_account_credentials"`
TeamDriveID string `config:"team_drive"`
AuthOwnerOnly bool `config:"auth_owner_only"`
Expand Down Expand Up @@ -564,6 +570,11 @@ type Fs struct {
grouping int32 // number of IDs to search at once in ListR - read with atomic
listRmu *sync.Mutex // protects listRempties
listRempties map[string]struct{} // IDs of supposedly empty directories which triggered grouping disable
// Mod: service account
ServiceAccountFiles map[string]int
serviceAccountMutex sync.Mutex
FileObj *fs.Object
FileName string
}

type baseObject struct {
Expand Down Expand Up @@ -634,6 +645,15 @@ func (f *Fs) shouldRetry(err error) (bool, error) {
if len(gerr.Errors) > 0 {
reason := gerr.Errors[0].Reason
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
// Mod: change service account
if ok, _ := f.shouldChangeSA(); ok {
f.serviceAccountMutex.Lock()
if e := f.changeServiceAccount(reason); e != nil {
fs.Errorf(nil, "Change service account error: %v", e)
}
f.serviceAccountMutex.Unlock()
return true, err
}
if f.opt.StopOnUploadLimit && gerr.Errors[0].Message == "User rate limit exceeded." {
fs.Errorf(f, "Received upload limit error: %v", err)
return false, fserrors.FatalError(err)
Expand Down Expand Up @@ -1003,6 +1023,8 @@ func createOAuthClient(opt *Options, name string, m configmap.Mapper) (*http.Cli
var oAuthClient *http.Client
var err error

// Mod: [TODO]

// try loading service account credentials from env variable, then from a file
if len(opt.ServiceAccountCredentials) == 0 && opt.ServiceAccountFile != "" {
loadedCreds, err := ioutil.ReadFile(os.ExpandEnv(opt.ServiceAccountFile))
Expand Down Expand Up @@ -1056,12 +1078,56 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
return
}

func parseRootID(s string) (rootID string, err error) {
rootID = s

if strings.HasPrefix(s, "http") {
re := regexp.MustCompile(`\/(folders|files|file\/d)\/([A-Za-z0-9_-]+)\/?`)
if m := re.FindStringSubmatch(s); m != nil {
rootID = m[2]
return
}

re = regexp.MustCompile(`.+id=([A-Za-z0-9_-]+).?`)
if m := re.FindStringSubmatch(s); m != nil {
rootID = m[1]
return
}
}
return
}

// NewFs constructs an Fs from the path, container:path
func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
ctx := context.Background()
// Parse config into Options struct
opt := new(Options)
err := configstruct.Set(m, opt)

// Mod: parse object id from path remote:{ID}
isFileID := false
if path != "" && path[0:1] == "{" && strings.Contains(path, "}") {
idIndex := strings.Index(path, "}")
if idIndex > 0 {
rootID, err := parseRootID(path[1:idIndex])
if err != nil {

} else {
name += rootID
fs.Debugf(nil, "Root ID detected: %s", rootID)
//opt.ServerSideAcrossConfigs = true
if len(rootID) == 33 || len(rootID) == 28 {
isFileID = true
opt.RootFolderID = rootID
} else {
opt.RootFolderID = rootID
opt.TeamDriveID = rootID
}
path = path[idIndex+1:]
}
}
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,6 +1225,28 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
return nil, err
}

// Mod: confirm the object ID is file
if isFileID {
file, err := f.svc.Files.Get(opt.RootFolderID).Fields("name", "id", "size", "mimeType").SupportsAllDrives(true).Do()
if err == nil {
//fmt.Println("file.MimeType", file.MimeType)
if "application/vnd.google-apps.folder" != file.MimeType && file.MimeType != "" {
tempF := *f
newRoot := ""
tempF.dirCache = dircache.New(newRoot, f.rootFolderID, &tempF)
tempF.root = newRoot
f.dirCache = tempF.dirCache
f.root = tempF.root

extension, exportName, exportMimeType, isDocument := f.findExportFormat(file)
obj, _ := f.newObjectWithExportInfo(file.Name, file, extension, exportName, exportMimeType, isDocument)
f.root = "isFile:" + file.Name
f.FileObj = &obj
return f, fs.ErrorIsFile
}
}
}

// Find the current root
err = f.dirCache.FindRoot(ctx, false)
if err != nil {
Expand Down Expand Up @@ -1365,6 +1453,11 @@ func (f *Fs) newObjectWithExportInfo(
// NewObject finds the Object at remote. If it can't be found
// it returns the error fs.ErrorObjectNotFound.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// DriveMod
if f.FileObj != nil {
return *f.FileObj, nil
}

info, extension, exportName, exportMimeType, isDocument, err := f.getRemoteInfoWithExport(ctx, remote)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2803,6 +2896,96 @@ func (f *Fs) changeChunkSize(chunkSizeString string) (err error) {
return err
}

// Mod: shouldChangeSA determines whether multiple service accounts existed
func (f *Fs) shouldChangeSA() (bool, error) {
if len(f.opt.ServiceAccountFilePath) > 0 {
return true, nil
}
return false, nil
}

// Mod:: loadServiceAccountFiles load service account file from folder
func (f *Fs) loadServiceAccountFiles() (map[string]int, error) {
opt := &f.opt

list := make(map[string]int)

// Read service account file from folder
saFolder := opt.ServiceAccountFilePath
if len(saFolder) > 0 {
fs.Debugf(nil, "Loading Service Account File(s) from %q", saFolder)
files, err := ioutil.ReadDir(saFolder)
if err != nil {
return list, errors.Wrap(err, "error loading service account from folder")
}

// Add valid service account file
for i, file := range files {
filePath := path.Join(saFolder, file.Name())
if filePath == opt.ServiceAccountFile || path.Ext(filePath) != ".json" {
continue
}
list[filePath] = i
}
}

fs.Debugf(nil, "Loaded %d Service Account File(s)", len(list))
return list, nil
}

// Mod:: pickServiceAccountFile
func (f *Fs) pickServiceAccountFile() (filePath string, err error) {
list := f.ServiceAccountFiles

if len(list) == 0 {
err = errors.Errorf("no available service account file")
return
}

// Select by random
r := rand.Intn(len(list))
for k := range list {
if r == 0 {
filePath = k
}
r--
}
delete(list, filePath)

return
}

// Mod: changeServiceAccount change service account from list
// Read json from folder if empty.
func (f *Fs) changeServiceAccount(reason string) error {
// opt := &f.opt

// Load
if len(f.ServiceAccountFiles) == 0 {
list, err := f.loadServiceAccountFiles()
if err != nil {
return err
}
f.ServiceAccountFiles = list
}

if len(f.ServiceAccountFiles) == 0 {
return errors.Errorf("no available service account file")
}

filePath, err := f.pickServiceAccountFile()
if err != nil {
return err
}

err = f.changeServiceAccountFile(filePath)
if err == nil {
fs.Debugf(nil, "Service Account Changed (remain: %d, reason: %s)", len(f.ServiceAccountFiles), reason)
}

return err
}

func (f *Fs) changeServiceAccountFile(file string) (err error) {
fs.Debugf(nil, "Changing Service Account File from %s to %s", f.opt.ServiceAccountFile, file)
if file == f.opt.ServiceAccountFile {
Expand All @@ -2829,6 +3012,9 @@ func (f *Fs) changeServiceAccountFile(file string) (err error) {
if err != nil {
return errors.Wrap(err, "drive: failed when making oauth client")
}

f.pacer = newPacer(&f.opt)

f.client = oAuthClient
f.svc, err = drive.New(f.client)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions cmd/copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ recently very efficiently like this:
Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args)
fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)

// Mod
if len(fsrc.Root()) > 7 && "isFile:" == fsrc.Root()[0:7] {
srcFileName = fsrc.Root()[7:]
}

cmd.Run(true, true, command, func() error {
if srcFileName == "" {
return sync.CopyDir(context.Background(), fdst, fsrc, createEmptySrcDirs)
Expand Down
5 changes: 3 additions & 2 deletions fs/accounting/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,9 @@ func (s *StatsInfo) String() string {
_, _ = fmt.Fprintf(buf, "Renamed: %10d\n", s.renames)
}
if s.transfers != 0 || totalTransfer != 0 {
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s\n",
s.transfers, totalTransfer, percent(s.transfers, totalTransfer))
// Mod: Add number of files
_, _ = fmt.Fprintf(buf, "Transferred: %10d / %d, %s, %.2f Files/s\n",
s.transfers, totalTransfer, percent(s.transfers, totalTransfer), float64(s.transfers)/dtSeconds)
}
_, _ = fmt.Fprintf(buf, "Elapsed time: %10ss\n", strings.TrimRight(dt.Truncate(time.Minute).String(), "0s")+fmt.Sprintf("%.1f", dtSecondsOnly.Seconds()))
}
Expand Down

0 comments on commit 96ddd31

Please sign in to comment.