Commit

Throttle concurrent reads to avoid having too many open file handles.
LTLA committed Feb 3, 2025
1 parent f60e5a3 commit 61c4386
Showing 5 changed files with 110 additions and 63 deletions.
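In short, the commit swaps a spawn-a-goroutine-per-file pattern for a fixed-size worker pool, so at most "concurrency" metadata files are open at any one time. Below is a minimal, self-contained sketch of the same pattern; loadOne, loadAll, and the byte-count payload are invented for illustration and are not part of this repository:

package main

import (
	"fmt"
	"os"
	"sync"
)

// loadOne stands in for a metadata loader like loadMetadata: it opens one
// file, reads it, and returns the byte count. Invented for illustration.
func loadOne(path string) (int, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	return len(b), nil
}

// loadAll reads every path using at most 'concurrency' workers, so no more
// than 'concurrency' file handles are open simultaneously.
func loadAll(paths []string, concurrency int) []int {
	results := make([]int, len(paths))
	jobs := make(chan int)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for t := 0; t < concurrency; t++ {
		go func() {
			defer wg.Done()
			for i := range jobs { // pull indices until the channel is closed
				if n, err := loadOne(paths[i]); err == nil {
					results[i] = n
				}
			}
		}()
	}

	for i := range paths { // feed all the work, then signal completion
		jobs <- i
	}
	close(jobs)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(loadAll([]string{"a.txt", "b.txt"}, 4))
}

Each worker holds at most one file open at a time, so the pool size directly caps the number of simultaneous handles.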
database.go: 43 changes (27 additions, 16 deletions)
@@ -462,7 +462,7 @@ func compareToExistingPaths(tx *writeTransaction, did int64, all_paths map[strin
 	return new_paths, update_paths, purge_paths, nil
 }
 
-func addDirectoryContents(tx *writeTransaction, path string, did int64, base_names []string, tokenizer* unicodeTokenizer) ([]string, error) {
+func addDirectoryContents(tx *writeTransaction, path string, did int64, base_names []string, tokenizer* unicodeTokenizer, concurrency int) ([]string, error) {
 	all_failures := []string{}
 
 	dir_contents, dir_failures := listMetadata(path, base_names)
@@ -473,27 +473,38 @@ func addDirectoryContents(tx *writeTransaction, path string, did int64, base_nam
 		return nil, err
 	}
 
-	// Loading the metadata into memory.
+	// Loading the metadata into memory; we use a thread pool to avoid opening too many file handles at once.
 	new_assets := make([]*loadedMetadata, len(new_paths))
 	update_assets := make([]*loadedMetadata, len(update_paths))
 	{
 		var wg sync.WaitGroup
-		wg.Add(len(new_paths) + len(update_paths))
+		wg.Add(concurrency)
+		ichannel := make(chan int)
+		uchannel := make(chan int)
 
-		for i, e := range new_paths {
-			go func(i int, e *FileInfoWithPath) {
+		for t := 0; t < concurrency; t++ {
+			go func() {
 				defer wg.Done()
-				new_assets[i] = loadMetadata(e.Path, e.Info)
-			}(i, e)
-		}
-
-		for i, e := range update_paths {
-			go func(i int, e *FileInfoWithPath) {
-				defer wg.Done()
-				update_assets[i] = loadMetadata(e.Path, e.Info)
-			}(i, e)
+				for i := range ichannel {
+					e := new_paths[i]
+					new_assets[i] = loadMetadata(e.Path, e.Info)
+				}
+				for i := range uchannel {
+					e := update_paths[i]
+					update_assets[i] = loadMetadata(e.Path, e.Info)
+				}
+			}()
 		}
+
+		for i := 0; i < len(new_paths); i++ {
+			ichannel <- i
+		}
+		close(ichannel)
+
+		for i := 0; i < len(update_paths); i++ {
+			uchannel <- i
+		}
+		close(uchannel)
 		wg.Wait()
 	}
 
@@ -598,7 +609,7 @@ func addDirectoryContents(tx *writeTransaction, path string, did int64, base_nam
 
 /**********************************************************************/
 
-func addNewDirectory(db *sql.DB, path string, base_names []string, user string, tokenizer* unicodeTokenizer) ([]string, error) {
+func addNewDirectory(db *sql.DB, path string, base_names []string, user string, tokenizer* unicodeTokenizer, concurrency int) ([]string, error) {
 	b, err := json.Marshal(base_names)
 	if err != nil {
 		return nil, fmt.Errorf("failed to encode names as JSON; %w", err)
@@ -636,7 +647,7 @@ func addNewDirectory(db *sql.DB, path string, base_names []string, user string,
 		return nil, fmt.Errorf("failed to insert new directory; %w", err)
 	}
 
-	failures, err := addDirectoryContents(atx, path, did, base_names, tokenizer)
+	failures, err := addDirectoryContents(atx, path, did, base_names, tokenizer, concurrency)
 
 	err = atx.Commit()
 	if err != nil {
@@ -682,7 +693,7 @@ func listDirectories(tx *writeTransaction) ([]*registeredDirectory, error) {
 	return all_dirs, nil
 }
 
-func updateDirectories(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) {
+func updateDirectories(db *sql.DB, tokenizer *unicodeTokenizer, concurrency int) ([]string, error) {
 	atx, err := createWriteTransaction(db)
 	if err != nil {
 		return nil, fmt.Errorf("failed to prepare transaction for update; %w", err)
@@ -696,7 +707,7 @@ func updateDirectories(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error
 
 	all_failures := []string{}
 	for _, d := range all_dirs {
-		curfailures, err := addDirectoryContents(atx, d.Path, d.Id, d.Names, tokenizer)
+		curfailures, err := addDirectoryContents(atx, d.Path, d.Id, d.Names, tokenizer, concurrency)
 		if err != nil {
 			return nil, err
 		}
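Two details of the new worker-pool code are worth noting. First, the ordering in the feeder is load-bearing: each worker drains ichannel completely before it reads uchannel, so all sends on ichannel must finish and the channel must be closed before anything is sent on uchannel; interleaving the two would deadlock whenever update_paths is non-empty. Second, the same cap on open file handles could have been enforced with a buffered-channel semaphore while keeping the original per-file goroutines; a hypothetical sketch reusing the identifiers from the diff:

sem := make(chan struct{}, concurrency) // at most 'concurrency' slots in flight
var wg sync.WaitGroup
for i, e := range new_paths {
	wg.Add(1)
	go func(i int, e *FileInfoWithPath) {
		defer wg.Done()
		sem <- struct{}{}        // acquire a slot before opening the file
		defer func() { <-sem }() // release the slot when done
		new_assets[i] = loadMetadata(e.Path, e.Info)
	}(i, e)
}
wg.Wait()

The committed worker-pool version also bounds the number of goroutines, not just the number of open handles, which matters when new_paths is very large.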

0 comments on commit 61c4386
