Skip to content

Commit

Permalink
chore: batch insert vulns
Browse files Browse the repository at this point in the history
  • Loading branch information
RTann committed Dec 12, 2023
1 parent b75b87d commit 6e461eb
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 8 deletions.
85 changes: 85 additions & 0 deletions pkg/vulndump/batch_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package vulndump

import (
"encoding/json"
"io"

"github.com/stackrox/scanner/database"
)

const defaultBatchSize = 10_000

// osLoader batch loads OS-level vulnerabilities into a buffer.
type osLoader struct {
rc io.ReadCloser

dec *json.Decoder

batchSize int
buf []database.Vulnerability
done bool
err error
}

func newOSLoader(source io.ReadCloser) (*osLoader, error) {
// See https://pkg.go.dev/encoding/json#example-Decoder.Decode-Stream
// for an example of how this decoder will be used.
dec := json.NewDecoder(source)
// Read the initial token, "[".
_, err := dec.Token()
if err != nil {
return nil, err
}

return &osLoader{
rc: source,
dec: dec,
batchSize: defaultBatchSize,
buf: make([]database.Vulnerability, 0, defaultBatchSize),
}, nil
}

// Next loads the next batch of vulnerabilities and returns
// whether it was successful or not.
func (l *osLoader) Next() bool {
if l.done || l.err != nil {
return false
}

l.buf = l.buf[:0]
for i := 0; i < l.batchSize; i++ {
if !l.dec.More() {
// JSON array has no more values.
l.done = true
return true
}
l.buf = append(l.buf, database.Vulnerability{})
if err := l.dec.Decode(&l.buf[i]); err != nil {
l.err = err
return false
}
}

return true
}

// Vulns returns the "next" bath of vulnerabilities.
// It is expected to be called once for each successful call to Next.
func (l *osLoader) Vulns() []database.Vulnerability {
return l.buf
}

// Err returns the error associated with loading vulnerabilities.
// It is expected to be non-nil upon an unsuccessful call to Next.
func (l *osLoader) Err() error {
return l.err
}

// Close closes the loader.
func (l *osLoader) Close() error {
l.buf = nil // hint to GC to clean this.
// Don't bother reading the final token, "]",
// as it is possible there was a failure reading
// the JSON array. Just close the file.
return l.rc.Close()
}
111 changes: 111 additions & 0 deletions pkg/vulndump/batch_loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package vulndump

import (
"archive/zip"
"io"
"net/http"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stackrox/rox/pkg/utils"
"github.com/stackrox/scanner/pkg/ziputil"
"github.com/stretchr/testify/require"
)

const (
defURL = "https://definitions.stackrox.io/93AEC554-29EE-4E24-96D6-744092A98444/diff.zip"
offlineURL = "https://install.stackrox.io/scanner/scanner-vuln-updates.zip"
)

func mustFetchOSVulns(b *testing.B) *os.File {
f, err := os.Create(filepath.Join(b.TempDir(), "vulns.zip"))
require.NoError(b, err)

c := &http.Client{Timeout: 30 * time.Second}
resp, err := c.Get(defURL)
require.NoError(b, err)
defer utils.IgnoreError(resp.Body.Close)

_, err = io.Copy(f, resp.Body)
require.NoError(b, err)

return f
}

func mustFetchOfflineOSVulns(b *testing.B) *os.File {
tmpPath := filepath.Join(b.TempDir(), "tmp.zip")
tmpF, err := os.Create(tmpPath)
require.NoError(b, err)
defer func() {
_ = os.Remove(tmpPath)
}()
defer utils.IgnoreError(tmpF.Close)

c := &http.Client{Timeout: 30 * time.Second}
resp, err := c.Get(offlineURL)
require.NoError(b, err)
defer utils.IgnoreError(resp.Body.Close)

_, err = io.Copy(tmpF, resp.Body)
require.NoError(b, err)

tmpZIP, err := zip.OpenReader(tmpPath)
require.NoError(b, err)
defer utils.IgnoreError(tmpZIP.Close)
rc, err := ziputil.OpenFile(&tmpZIP.Reader, "scanner-defs.zip")
require.NoError(b, err)
defer utils.IgnoreError(rc.Close)

f, err := os.Create(filepath.Join(b.TempDir(), "vulns.zip"))
require.NoError(b, err)

_, err = io.Copy(f, rc)
require.NoError(b, err)

return f
}

func BenchmarkOSLoader(b *testing.B) {
f := mustFetchOSVulns(b)
defer utils.IgnoreError(f.Close)

benchmarkOSLoader(b, f)
}

func BenchmarkOSLoader_Offline(b *testing.B) {
f := mustFetchOfflineOSVulns(b)
defer utils.IgnoreError(f.Close)

benchmarkOSLoader(b, f)
}

func benchmarkOSLoader(b *testing.B, f *os.File) {
zipR, err := zip.OpenReader(f.Name())
require.NoError(b, err)
defer utils.IgnoreError(zipR.Close)
vulnsF, err := ziputil.OpenFile(&zipR.Reader, OSVulnsFileName)
require.NoError(b, err)
defer utils.IgnoreError(vulnsF.Close)

runtime.GC()

loader, err := newOSLoader(vulnsF)
require.NoError(b, err)
defer func() {
require.NoError(b, loader.Close())
}()

b.ResetTimer()
for i := 0; i < b.N; i++ {
var n int
for loader.Next() {
vulns := loader.Vulns()
n += len(vulns)
}
require.NoError(b, loader.Err())
b.Logf("Loaded %d vulns", n)
}
}
46 changes: 38 additions & 8 deletions pkg/vulndump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ func LoadOSVulnsFromDump(zipR *zip.Reader) ([]database.Vulnerability, error) {
return vulns, nil
}

func getOSLoader(zipR *zip.Reader) (*osLoader, error) {
osVulnsFile, err := ziputil.OpenFile(zipR, OSVulnsFileName)
if err != nil {
return nil, errors.Wrap(err, "opening OS vulns file")
}

return newOSLoader(osVulnsFile)
}

func renew(sig *concurrency.Signal, db database.Datastore, interval time.Duration, expiration time.Time, instanceName string) {
// Give a buffer for this instance to renew the lock
expirationDuration := time.Until(expiration) - 10*time.Second
Expand Down Expand Up @@ -188,16 +197,27 @@ func startVulnLoad(manifest *Manifest, db database.Datastore, updateInterval tim

func loadOSVulns(zipR *zip.Reader, db database.Datastore) error {
log.Info("Loading OS vulns...")
osVulns, err := LoadOSVulnsFromDump(zipR)
loader, err := getOSLoader(zipR)
if err != nil {
return err
}
log.Infof("Done loading OS vulns. There are %d vulns to insert into the DB", len(osVulns))
defer utils.IgnoreError(loader.Close)

var n int
for loader.Next() {
osVulns := loader.Vulns()

n += len(osVulns)
log.Infof("Loaded %d OS-level vulns. Total OS-level vulns: %d", len(osVulns), n)

if err := db.InsertVulnerabilities(osVulns); err != nil {
return errors.Wrap(err, "inserting vulns into the DB")
if err := db.InsertVulnerabilities(osVulns); err != nil {
return errors.Wrap(err, "inserting vulns into the DB")
}
}
if loader.Err() != nil {
return loader.Err()
}
log.Info("Done inserting OS vulns into the DB")

return nil
}

Expand Down Expand Up @@ -275,19 +295,21 @@ func UpdateFromVulnDump(zipPath string, db database.Datastore, updateInterval ti
}
defer utils.IgnoreError(zipR.Close)

log.Info("Loading manifest...")
log.Info("Loading vulnerability manifest")
manifest, err := LoadManifestFromDump(&zipR.Reader)
if err != nil {
return err
}
log.Info("Loaded manifest")
log.Info("Successfully loaded vulnerability manifest")

if db != nil {
performUpdate, finishFn, err := startVulnLoad(manifest, db, updateInterval, instanceName)
if err != nil {
return errors.Wrap(err, "error beginning vuln loading")
}
if performUpdate {
log.Info("Loading OS-level vulnerabilities")

if err := loadRHELv2Vulns(db, &zipR.Reader, repoToCPE); err != nil {
_ = finishFn(err)
return errors.Wrap(err, "error loading RHEL vulns")
Expand All @@ -301,15 +323,23 @@ func UpdateFromVulnDump(zipPath string, db database.Datastore, updateInterval ti
if err := finishFn(nil); err != nil {
return errors.Wrap(err, "error ending vuln loading")
}

log.Info("Loaded OS-level vulnerabilities successfully")
}
}

log.Info("Loading application-level vulnerabilities")
errorList := errorhelpers.NewErrorList("loading application-level caches")
for _, appCache := range caches {
if err := loadApplicationUpdater(appCache, manifest, &zipR.Reader); err != nil {
errorList.AddError(errors.Wrapf(err, "error loading into in-mem cache %q", appCache.Dir()))
}
}
if err := errorList.ToError(); err != nil {
return err
}

return errorList.ToError()
log.Info("Successfully loaded application-level vulnerabilities")

return nil
}
40 changes: 40 additions & 0 deletions pkg/vulndump/loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package vulndump

import (
"archive/zip"
"os"
"runtime"
"testing"

"github.com/stackrox/rox/pkg/utils"
"github.com/stretchr/testify/require"
)

func BenchmarkLoadOSVulnsFromDump(b *testing.B) {
f := mustFetchOSVulns(b)
defer utils.IgnoreError(f.Close)

benchmarkLoadOSVulnsFromDump(b, f)
}

func BenchmarkLoadOSVulnsFromDump_Offline(b *testing.B) {
f := mustFetchOfflineOSVulns(b)
defer utils.IgnoreError(f.Close)

benchmarkLoadOSVulnsFromDump(b, f)
}

func benchmarkLoadOSVulnsFromDump(b *testing.B, f *os.File) {
zipR, err := zip.OpenReader(f.Name())
require.NoError(b, err)
defer utils.IgnoreError(zipR.Close)

runtime.GC()

b.ResetTimer()
for i := 0; i < b.N; i++ {
vulns, err := LoadOSVulnsFromDump(&zipR.Reader)
require.NoError(b, err)
b.Logf("Loaded %d vulns", len(vulns))
}
}

0 comments on commit 6e461eb

Please sign in to comment.