Skip to content

Commit

Permalink
[wip] more concurrent catalogers
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Goodman <[email protected]>
  • Loading branch information
wagoodman committed Oct 1, 2024
1 parent 7815d8e commit edd910f
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 45 deletions.
3 changes: 3 additions & 0 deletions cmd/syft/internal/commands/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/wagoodman/go-progress"

"github.com/anchore/clio"
"github.com/anchore/go-sync"
"github.com/anchore/stereoscope"
"github.com/anchore/syft/cmd/syft/internal/options"
"github.com/anchore/syft/cmd/syft/internal/ui"
Expand Down Expand Up @@ -252,6 +253,8 @@ func generateSBOMForAttestation(ctx context.Context, id clio.Identification, opt
return nil, fmt.Errorf("attest requires use of an OCI registry directly, one or more of the specified sources is unsupported: %v", opts.From)
}

ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))

src, err := getSource(ctx, opts, userInput, stereoscope.RegistryTag)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/syft/internal/commands/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/anchore/clio"
"github.com/anchore/go-collections"
"github.com/anchore/go-sync"
"github.com/anchore/stereoscope"
"github.com/anchore/stereoscope/pkg/image"
"github.com/anchore/syft/cmd/syft/internal/options"
Expand Down Expand Up @@ -167,6 +168,8 @@ func validateArgs(cmd *cobra.Command, args []string, error string) error {
}

func runScan(ctx context.Context, id clio.Identification, opts *scanOptions, userInput string) error {
ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))

writer, err := opts.SBOMWriter()
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion cmd/syft/internal/options/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package options

import (
"fmt"
"runtime"
"sort"
"strings"

Expand Down Expand Up @@ -72,7 +73,7 @@ func DefaultCatalog() Catalog {
File: defaultFileConfig(),
Relationships: defaultRelationshipsConfig(),
Source: defaultSourceConfig(),
Parallelism: 1,
Parallelism: runtime.NumCPU() * 3,
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ require (
github.com/BurntSushi/toml v1.4.0
github.com/OneOfOne/xxhash v1.2.8
github.com/adrg/xdg v0.5.0
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9
github.com/magiconair/properties v1.8.7
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj
github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA=
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9 h1:CEONkqYICLuKgiIgHIcY9cxSDvQWMtDnIY01HSVCJhc=
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9/go.mod h1:43zWHVYBx8GXzjjISSD4rMpACGBpUZmE3Yy+qUNnhn4=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ=
github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods=
Expand Down
61 changes: 39 additions & 22 deletions internal/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"

"github.com/hashicorp/go-multierror"

"github.com/anchore/go-sync"
"github.com/anchore/syft/internal/log"
"github.com/anchore/syft/internal/sbomsync"
"github.com/anchore/syft/syft/event/monitor"
Expand All @@ -35,31 +35,48 @@ func NewTaskExecutor(tasks []Task, numWorkers int) *Executor {
}

func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error {
var errs error
wg := &sync.WaitGroup{}
for i := 0; i < p.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for {
tsk, ok := <-p.tasks
if !ok {
return
}

if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", err))
prog.SetError(err)
}
prog.Increment()
exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[error](exec)

run := func(tsk Task) sync.ProviderFunc[error] {
return func() error {
if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
prog.SetError(err)
return err
}
}()
prog.Increment()
return nil
}
}

for {
tsk, ok := <-p.tasks
if !ok {
break
}

collector.Provide(run(tsk))
}

wg.Wait()
errs := collector.Collect()

if len(errs) == 0 {
return nil
}

var nonNilErrs []error
for _, err := range errs {
if err != nil {
nonNilErrs = append(nonNilErrs, err)
}
}

if len(nonNilErrs) == 0 {
return nil
}

return errs
return multierror.Append(nil, nonNilErrs...)
}

func runTaskSafely(ctx context.Context, t Task, resolver file.Resolver, s sbomsync.Builder) (err error) {
Expand Down
62 changes: 49 additions & 13 deletions syft/file/cataloger/filedigest/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/dustin/go-humanize"

"github.com/anchore/go-sync"
stereoscopeFile "github.com/anchore/stereoscope/pkg/file"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/bus"
Expand All @@ -30,6 +31,12 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger {
}
}

type result struct {
coordinates file.Coordinates
digests []file.Digest
err error
}

func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) {
results := make(map[file.Coordinates][]file.Digest)
var locations []file.Location
Expand All @@ -46,37 +53,66 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin
}
}

exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[result](exec)

prog := catalogingProgress(int64(len(locations)))
for _, location := range locations {
result, err := i.catalogLocation(resolver, location)
collector.Provide(i.run(resolver, location, prog))
}

if errors.Is(err, ErrUndigestableFile) {
for _, r := range collector.Collect() {
if r.err != nil {
log.Warnf("failed to process file %q: %+v", r.coordinates.RealPath, r.err)
continue
}

results[r.coordinates] = append(results[r.coordinates], r.digests...)
}

log.Debugf("file digests cataloger processed %d files", prog.Current())

prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current())))
prog.SetCompleted()

return results, nil
}

func (i *Cataloger) run(resolver file.Resolver, location file.Location, prog *monitor.CatalogerTaskProgress) sync.ProviderFunc[result] {
return func() result {
digests, err := i.catalogLocation(resolver, location)

if errors.Is(err, ErrUndigestableFile) {
return result{
coordinates: location.Coordinates,
}
}

prog.AtomicStage.Set(location.Path())

if internal.IsErrPathPermission(err) {
log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err)
continue
return result{
coordinates: location.Coordinates,
}
}

if err != nil {
prog.SetError(err)
return nil, fmt.Errorf("failed to process file %q: %w", location.RealPath, err)
return result{
coordinates: location.Coordinates,
err: fmt.Errorf("failed to process file %q: %w", location.RealPath, err),
}
}

prog.Increment()

results[location.Coordinates] = result
return result{
coordinates: location.Coordinates,
digests: digests,
err: err,
}
}

log.Debugf("file digests cataloger processed %d files", prog.Current())

prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current())))
prog.SetCompleted()

return results, nil
}

func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) {
Expand Down
48 changes: 39 additions & 9 deletions syft/pkg/cataloger/generic/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/anchore/go-logger"
"github.com/anchore/go-sync"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/log"
"github.com/anchore/syft/syft/artifact"
Expand Down Expand Up @@ -147,36 +148,65 @@ func (c *Cataloger) Name() string {
return c.upstreamCataloger
}

type parserResult struct {
packages []pkg.Package
relationships []artifact.Relationship
err error
}

// Catalog is given an object to resolve file references and content, this function returns any discovered Packages after analyzing the catalog source.
func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg.Package, []artifact.Relationship, error) {
var packages []pkg.Package
var relationships []artifact.Relationship

logger := log.Nested("cataloger", c.upstreamCataloger)

env := Environment{
// TODO: consider passing into the cataloger, this would affect the cataloger interface (and all implementations). This can be deferred until later.
LinuxRelease: linux.IdentifyRelease(resolver),
}

exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[parserResult](exec)

for _, req := range c.selectFiles(resolver) {
collector.Provide(c.run(ctx, resolver, req, env))
}

results := collector.Collect()

for _, result := range results {
packages = append(packages, result.packages...)
relationships = append(relationships, result.relationships...)
}

return c.process(ctx, resolver, packages, relationships, nil)
}

func (c *Cataloger) run(ctx context.Context, resolver file.Resolver, req request, env Environment) sync.ProviderFunc[parserResult] {
return func() parserResult {
lgr := log.Nested("cataloger", c.upstreamCataloger)

location, parser := req.Location, req.Parser

log.WithFields("path", location.RealPath).Trace("parsing file contents")

discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, logger, parser, &env)
discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, lgr, parser, &env)
if err != nil {
continue // logging is handled within invokeParser
// note: logging is handled within invokeParser
return parserResult{
err: err,
}
}

for _, p := range discoveredPackages {
p.FoundBy = c.upstreamCataloger
packages = append(packages, p)
for i := range discoveredPackages {
discoveredPackages[i].FoundBy = c.upstreamCataloger
}

relationships = append(relationships, discoveredRelationships...)
return parserResult{
packages: discoveredPackages,
relationships: discoveredRelationships,
}
}
return c.process(ctx, resolver, packages, relationships, nil)
}

func (c *Cataloger) process(ctx context.Context, resolver file.Resolver, pkgs []pkg.Package, rels []artifact.Relationship, err error) ([]pkg.Package, []artifact.Relationship, error) {
Expand Down

0 comments on commit edd910f

Please sign in to comment.