Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[RFC] Split reflists to share their contents across snapshots #1282

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
BOTO_CONFIG: /dev/null
GO111MODULE: "on"
GOPROXY: "https://proxy.golang.org"
GOVER: '1.21'
GOVER: '1.19'

steps:
- name: Checkout repository
Expand Down Expand Up @@ -99,7 +99,7 @@ jobs:
- name: Ubuntu-20
image: "ubuntu:20.04"
suite: focal
GOVER: '1.21'
GOVER: '1.19'
install: "make ca-certificates git curl"
- name: Ubuntu-22
image: "ubuntu:22.04"
Expand All @@ -108,12 +108,12 @@ jobs:
- name: Debian-10
image: "debian:buster"
suite: buster
GOVER: '1.21'
GOVER: '1.19'
install: "make ca-certificates git curl"
- name: Debian-11
image: "debian:bullseye"
suite: bullseye
GOVER: '1.21'
GOVER: '1.19'
install: "make ca-certificates git curl"
- name: Debian-12
image: "debian:bookworm"
Expand Down
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, p

// Common piece of code to show a list of packages,
// with searching & details if requested
func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {
func showPackages(c *gin.Context, reflist deb.AnyRefList, collectionFactory *deb.CollectionFactory) {
result := []*deb.Package{}

list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)
Expand Down
81 changes: 61 additions & 20 deletions api/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"sort"

"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
Expand All @@ -20,18 +21,22 @@

collectionFactory := context.NewCollectionFactory()

// collect information about referenced packages...
existingPackageRefs := deb.NewPackageRefList()
// collect information about referenced packages and their reflist buckets...
existingPackageRefs := deb.NewSplitRefList()
existingBuckets := deb.NewRefListDigestSet()

reflistMigration := collectionFactory.RefListCollection().NewMigration()

out.Printf("Loading mirrors, local repos, snapshots and published repos...")
err = collectionFactory.RemoteRepoCollection().ForEach(func(repo *deb.RemoteRepo) error {
e := collectionFactory.RemoteRepoCollection().LoadComplete(repo)
if e != nil {
sl := deb.NewSplitRefList()
e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
if e != nil && e != database.ErrNotFound {

Check warning on line 34 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L32-L34

Added lines #L32 - L34 were not covered by tests
return e
}
if repo.RefList() != nil {
existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
}

existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
existingBuckets.AddAllInRefList(sl)

Check warning on line 39 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L38-L39

Added lines #L38 - L39 were not covered by tests

return nil
})
Expand All @@ -40,14 +45,14 @@
}

err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
e := collectionFactory.LocalRepoCollection().LoadComplete(repo)
if e != nil {
sl := deb.NewSplitRefList()
e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
if e != nil && e != database.ErrNotFound {

Check warning on line 50 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L48-L50

Added lines #L48 - L50 were not covered by tests
return e
}

if repo.RefList() != nil {
existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
}
existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
existingBuckets.AddAllInRefList(sl)

Check warning on line 55 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L54-L55

Added lines #L54 - L55 were not covered by tests

return nil
})
Expand All @@ -56,12 +61,14 @@
}

err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
e := collectionFactory.SnapshotCollection().LoadComplete(snapshot)
sl := deb.NewSplitRefList()
e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, snapshot.RefKey(), reflistMigration)

Check warning on line 65 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L64-L65

Added lines #L64 - L65 were not covered by tests
if e != nil {
return e
}

existingPackageRefs = existingPackageRefs.Merge(snapshot.RefList(), false, true)
existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
existingBuckets.AddAllInRefList(sl)

Check warning on line 71 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L70-L71

Added lines #L70 - L71 were not covered by tests

return nil
})
Expand All @@ -73,25 +80,37 @@
if published.SourceKind != deb.SourceLocalRepo {
return nil
}
e := collectionFactory.PublishedRepoCollection().LoadComplete(published, collectionFactory)
if e != nil {
return e
}

for _, component := range published.Components() {
existingPackageRefs = existingPackageRefs.Merge(published.RefList(component), false, true)
sl := deb.NewSplitRefList()
e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, published.RefKey(component), reflistMigration)
if e != nil {
return e
}

Check warning on line 89 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L85-L89

Added lines #L85 - L89 were not covered by tests

existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
existingBuckets.AddAllInRefList(sl)

Check warning on line 92 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
return nil
})
if err != nil {
return nil, err
}

err = reflistMigration.Flush()
if err != nil {
return nil, err
}

Check warning on line 103 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L102-L103

Added lines #L102 - L103 were not covered by tests
if stats := reflistMigration.Stats(); stats.Reflists > 0 {
out.Printf("Split %d reflist(s) into %d bucket(s) (%d segment(s))",
stats.Reflists, stats.Buckets, stats.Segments)
}

Check warning on line 107 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L105-L107

Added lines #L105 - L107 were not covered by tests

// ... and compare it to the list of all packages
out.Printf("Loading list of all packages...")
allPackageRefs := collectionFactory.PackageCollection().AllPackageRefs()

toDelete := allPackageRefs.Subtract(existingPackageRefs)
toDelete := allPackageRefs.Subtract(existingPackageRefs.Flatten())

// delete packages that are no longer referenced
out.Printf("Deleting unreferenced packages (%d)...", toDelete.Len())
Expand All @@ -112,6 +131,28 @@
}
}

bucketsToDelete, err := collectionFactory.RefListCollection().AllBucketDigests()
if err != nil {
return nil, err
}

Check warning on line 137 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L136-L137

Added lines #L136 - L137 were not covered by tests

bucketsToDelete.RemoveAll(existingBuckets)

out.Printf("Deleting unreferenced reflist buckets (%d)...", bucketsToDelete.Len())
if bucketsToDelete.Len() > 0 {
batch := db.CreateBatch()
err := bucketsToDelete.ForEach(func(digest []byte) error {
return collectionFactory.RefListCollection().UnsafeDropBucket(digest, batch)
})
if err != nil {
return nil, err
}

Check warning on line 149 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L143-L149

Added lines #L143 - L149 were not covered by tests

if err := batch.Write(); err != nil {
return nil, err
}

Check warning on line 153 in api/db.go

View check run for this annotation

Codecov / codecov/patch

api/db.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}

// now, build a list of files that should be present in Repository (package pool)
out.Printf("Building list of files referenced by packages...")
referencedFiles := make([]string, 0, existingPackageRefs.Len())
Expand Down
6 changes: 1 addition & 5 deletions api/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,7 @@ func apiFilesListFiles(c *gin.Context) {
listLock := &sync.Mutex{}
root := filepath.Join(context.UploadPath(), c.Params.ByName("dir"))

err := filepath.Walk(root, func(path string, _ os.FileInfo, err error) error {
if err != nil {
return err
}

err := walker.Walk(root, func(path string, _ os.FileInfo) error {
if path == root {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func countPackagesByRepos() {

components := repo.Components()
for _, c := range components {
count := float64(len(repo.RefList(c).Refs))
count := float64(repo.RefList(c).Len())
apiReposPackageCountGauge.WithLabelValues(fmt.Sprintf("%s", (repo.SourceNames())), repo.Distribution, c).Set(count)
}

Expand Down
12 changes: 6 additions & 6 deletions api/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func apiMirrorsCreate(c *gin.Context) {
return
}

err = collection.Add(repo)
err = collection.Add(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to add mirror: %s", err))
return
Expand Down Expand Up @@ -184,7 +184,7 @@ func apiMirrorsShow(c *gin.Context) {
return
}

err = collection.LoadComplete(repo)
err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
}
Expand All @@ -204,7 +204,7 @@ func apiMirrorsPackages(c *gin.Context) {
return
}

err = collection.LoadComplete(repo)
err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
}
Expand Down Expand Up @@ -402,12 +402,12 @@ func apiMirrorsUpdate(c *gin.Context) {
e := context.ReOpenDatabase()
if e == nil {
remote.MarkAsIdle()
collection.Update(remote)
collection.Update(remote, collectionFactory.RefListCollection())
}
}()

remote.MarkAsUpdating()
err = collection.Update(remote)
err = collection.Update(remote, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
Expand Down Expand Up @@ -565,7 +565,7 @@ func apiMirrorsUpdate(c *gin.Context) {

log.Info().Msgf("%s: Finalizing download...", b.Name)
remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
err = collectionFactory.RemoteRepoCollection().Update(remote, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
Expand Down
10 changes: 5 additions & 5 deletions api/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
err = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
return
Expand All @@ -165,7 +165,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}

resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
err = localCollection.LoadComplete(localRepo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}

err = collection.Add(published)
err = collection.Add(published, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}

err2 = snapshotCollection.LoadComplete(snapshot)
err2 = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
if err2 != nil {
AbortWithJSONError(c, 500, err2)
return
Expand Down Expand Up @@ -348,7 +348,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}

err = collection.Update(published)
err = collection.Update(published, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
Expand Down
20 changes: 10 additions & 10 deletions api/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@

collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
err := collection.Add(repo)
err := collection.Add(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 400, err)
return
Expand Down Expand Up @@ -132,7 +132,7 @@
repo.DefaultComponent = *b.DefaultComponent
}

err = collection.Update(repo)
err = collection.Update(repo, collectionFactory.RefListCollection())

Check warning on line 135 in api/repos.go

View check run for this annotation

Codecov / codecov/patch

api/repos.go#L135

Added line #L135 was not covered by tests
if err != nil {
AbortWithJSONError(c, 500, err)
return
Expand Down Expand Up @@ -201,7 +201,7 @@
return
}

err = collection.LoadComplete(repo)
err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, err)
return
Expand Down Expand Up @@ -229,7 +229,7 @@
return
}

err = collection.LoadComplete(repo)
err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, err)
return
Expand Down Expand Up @@ -261,9 +261,9 @@
}
}

repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

err = collectionFactory.LocalRepoCollection().Update(repo)
err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
Expand Down Expand Up @@ -320,7 +320,7 @@
return
}

err = collection.LoadComplete(repo)
err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
if err != nil {
AbortWithJSONError(c, 500, err)
return
Expand Down Expand Up @@ -369,9 +369,9 @@
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import package files: %s", err)
}

repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

err = collectionFactory.LocalRepoCollection().Update(repo)
err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
Expand Down Expand Up @@ -489,7 +489,7 @@
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
collectionFactory.RefListCollection(), context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

if err != nil {
Expand Down
Loading
Loading