Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROX-21387: batch insert vulns #1345

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/vulndump/batch_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package vulndump

import (
"encoding/json"
"io"

"github.com/stackrox/scanner/database"
)

// defaultBatchSize is the batch size newOSLoader configures: the maximum
// number of vulnerabilities decoded per call to Next, and the initial
// capacity of the loader's buffer.
const defaultBatchSize = 10_000

// osLoader batch loads OS-level vulnerabilities into a buffer.
type osLoader struct {
	// rc is the underlying source; Close closes it.
	rc io.ReadCloser

	// dec decodes the JSON values read from the source.
	dec *json.Decoder

	// batchSize is the maximum number of vulnerabilities decoded per call to Next.
	batchSize int
	// buf holds the most recently loaded batch; Next truncates and refills it.
	buf []database.Vulnerability
	// done is set once the decoder reports the JSON array has no more values.
	done bool
	// err is the decode error that stopped loading, if any.
	err error
}

// newOSLoader builds an osLoader that streams vulnerabilities out of source,
// which is expected to contain a single JSON array. The array's opening token
// is consumed here so that subsequent decoding starts at the first element.
// It returns any error hit while reading that first token; source is not
// closed in that case.
func newOSLoader(source io.ReadCloser) (*osLoader, error) {
	// The decoder is driven in streaming fashion, as shown in
	// https://pkg.go.dev/encoding/json#example-Decoder.Decode-Stream
	decoder := json.NewDecoder(source)

	// Consume the initial token, which is expected to be "[".
	if _, err := decoder.Token(); err != nil {
		return nil, err
	}

	loader := &osLoader{
		rc:        source,
		dec:       decoder,
		batchSize: defaultBatchSize,
		buf:       make([]database.Vulnerability, 0, defaultBatchSize),
	}
	return loader, nil
}

// Next loads the next batch of at most batchSize vulnerabilities into the
// loader's buffer, replacing the previous batch.
//
// It returns true when a non-empty batch is available through Vulns. It
// returns false when the JSON array has been exhausted or when decoding
// failed; Err is non-nil only in the latter case.
func (l *osLoader) Next() bool {
	if l.done || l.err != nil {
		return false
	}

	// Reuse the backing array from the previous batch.
	l.buf = l.buf[:0]
	for i := 0; i < l.batchSize; i++ {
		if !l.dec.More() {
			// JSON array has no more values.
			l.done = true
			// Only report success if this final batch actually holds
			// something; otherwise an array whose length is a multiple of
			// batchSize (or an empty array) would hand the caller an
			// empty batch to process.
			return len(l.buf) > 0
		}
		// Grow by one zero value and decode straight into it to avoid
		// copying a decoded struct into the slice.
		l.buf = append(l.buf, database.Vulnerability{})
		if err := l.dec.Decode(&l.buf[i]); err != nil {
			l.err = err
			return false
		}
	}

	return true
}

// Vulns returns the batch of vulnerabilities loaded by the most recent call to Next.
// It is expected to be called once for each successful call to Next.
// The returned slice is the loader's internal buffer, which Next truncates
// and refills, so its contents are only valid until the following call to Next.
func (l *osLoader) Vulns() []database.Vulnerability {
	return l.buf
}

// Err returns the error associated with loading vulnerabilities, or nil if
// no decode error has occurred. When Next returns false, a non-nil Err means
// decoding failed, while a nil Err means the input was exhausted.
func (l *osLoader) Err() error {
	return l.err
}

// Close closes the loader: it drops the batch buffer and closes the
// underlying source, returning the error from closing the source.
func (l *osLoader) Close() error {
	l.buf = nil // hint to GC to clean this.
	// Don't bother reading the final token, "]",
	// as it is possible there was a failure reading
	// the JSON array. Just close the file.
	return l.rc.Close()
}
111 changes: 111 additions & 0 deletions pkg/vulndump/batch_loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package vulndump

import (
"archive/zip"
"io"
"net/http"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stackrox/rox/pkg/utils"
"github.com/stackrox/scanner/pkg/ziputil"
"github.com/stretchr/testify/require"
)

const (
	// defURL is the archive downloaded by mustFetchOSVulns.
	defURL = "https://definitions.stackrox.io/93AEC554-29EE-4E24-96D6-744092A98444/diff.zip"
	// offlineURL is the archive downloaded by mustFetchOfflineOSVulns,
	// which extracts the nested scanner-defs.zip from it.
	offlineURL = "https://install.stackrox.io/scanner/scanner-vuln-updates.zip"
)

// mustFetchOSVulns downloads the archive at defURL into a file named
// vulns.zip inside a temporary directory owned by b, failing the benchmark
// immediately on any error. The returned file is still open; the caller is
// responsible for closing it.
func mustFetchOSVulns(b *testing.B) *os.File {
	f, err := os.Create(filepath.Join(b.TempDir(), "vulns.zip"))
	require.NoError(b, err)

	c := &http.Client{Timeout: 30 * time.Second}
	resp, err := c.Get(defURL)
	require.NoError(b, err)
	defer utils.IgnoreError(resp.Body.Close)
	// Get only reports transport-level failures. Without this check an HTTP
	// error page would be saved as vulns.zip and surface later as a
	// confusing zip-parsing failure.
	require.Equal(b, http.StatusOK, resp.StatusCode, "unexpected HTTP status fetching %s: %s", defURL, resp.Status)

	_, err = io.Copy(f, resp.Body)
	require.NoError(b, err)

	return f
}

// mustFetchOfflineOSVulns downloads the archive at offlineURL, extracts the
// nested scanner-defs.zip from it, and writes that entry to a file named
// vulns.zip inside a temporary directory owned by b. The intermediate
// download is removed before returning. Any error fails the benchmark
// immediately. The returned file is still open; the caller is responsible
// for closing it.
func mustFetchOfflineOSVulns(b *testing.B) *os.File {
	tmpPath := filepath.Join(b.TempDir(), "tmp.zip")
	tmpF, err := os.Create(tmpPath)
	require.NoError(b, err)
	// Deferred calls run last-in-first-out, so the file is closed before
	// it is removed.
	defer func() {
		_ = os.Remove(tmpPath)
	}()
	defer utils.IgnoreError(tmpF.Close)

	c := &http.Client{Timeout: 30 * time.Second}
	resp, err := c.Get(offlineURL)
	require.NoError(b, err)
	defer utils.IgnoreError(resp.Body.Close)
	// Get only reports transport-level failures. Without this check an HTTP
	// error page would be saved as the archive and surface later as a
	// confusing zip-parsing failure.
	require.Equal(b, http.StatusOK, resp.StatusCode, "unexpected HTTP status fetching %s: %s", offlineURL, resp.Status)

	_, err = io.Copy(tmpF, resp.Body)
	require.NoError(b, err)

	tmpZIP, err := zip.OpenReader(tmpPath)
	require.NoError(b, err)
	defer utils.IgnoreError(tmpZIP.Close)
	rc, err := ziputil.OpenFile(&tmpZIP.Reader, "scanner-defs.zip")
	require.NoError(b, err)
	defer utils.IgnoreError(rc.Close)

	f, err := os.Create(filepath.Join(b.TempDir(), "vulns.zip"))
	require.NoError(b, err)

	_, err = io.Copy(f, rc)
	require.NoError(b, err)

	return f
}

// BenchmarkOSLoader runs the batch-loader benchmark against the archive
// fetched by mustFetchOSVulns.
func BenchmarkOSLoader(b *testing.B) {
	dump := mustFetchOSVulns(b)
	defer utils.IgnoreError(dump.Close)

	benchmarkOSLoader(b, dump)
}

// BenchmarkOSLoader_Offline runs the batch-loader benchmark against the
// archive fetched by mustFetchOfflineOSVulns.
func BenchmarkOSLoader_Offline(b *testing.B) {
	dump := mustFetchOfflineOSVulns(b)
	defer utils.IgnoreError(dump.Close)

	benchmarkOSLoader(b, dump)
}

// benchmarkOSLoader measures streaming every OS vulnerability out of the zip
// archive stored in f using an osLoader.
//
// A loader is single-use: once it has drained its input, Next keeps
// returning false. Each iteration therefore opens the vulnerabilities entry
// and builds a fresh loader, so that every one of the b.N iterations does
// the full amount of work.
func benchmarkOSLoader(b *testing.B, f *os.File) {
	zipR, err := zip.OpenReader(f.Name())
	require.NoError(b, err)
	defer utils.IgnoreError(zipR.Close)

	// Start from a clean heap so earlier setup does not skew the numbers.
	runtime.GC()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		vulnsF, err := ziputil.OpenFile(&zipR.Reader, OSVulnsFileName)
		require.NoError(b, err)

		loader, err := newOSLoader(vulnsF)
		if err != nil {
			// The loader never took ownership of the entry; release it
			// before failing the benchmark.
			utils.IgnoreError(vulnsF.Close)
		}
		require.NoError(b, err)

		var n int
		for loader.Next() {
			vulns := loader.Vulns()
			n += len(vulns)
		}

		// Close before asserting so the entry is released even when
		// loading failed. The loader owns the entry, so it is closed
		// exactly once.
		loadErr := loader.Err()
		closeErr := loader.Close()
		require.NoError(b, loadErr)
		require.NoError(b, closeErr)

		b.Logf("Loaded %d vulns", n)
	}
}
46 changes: 38 additions & 8 deletions pkg/vulndump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ func LoadOSVulnsFromDump(zipR *zip.Reader) ([]database.Vulnerability, error) {
return vulns, nil
}

// getOSLoader opens the OS vulnerabilities file inside the dump zipR and
// returns an osLoader that streams it. On success the loader owns the opened
// file and closing the loader closes it. On failure nothing is left open.
func getOSLoader(zipR *zip.Reader) (*osLoader, error) {
	osVulnsFile, err := ziputil.OpenFile(zipR, OSVulnsFileName)
	if err != nil {
		return nil, errors.Wrap(err, "opening OS vulns file")
	}

	loader, err := newOSLoader(osVulnsFile)
	if err != nil {
		// newOSLoader does not close its source on failure, and no loader
		// exists for the caller to close, so release the file here.
		utils.IgnoreError(osVulnsFile.Close)
		return nil, errors.Wrap(err, "initializing OS vulns loader")
	}

	return loader, nil
}

func renew(sig *concurrency.Signal, db database.Datastore, interval time.Duration, expiration time.Time, instanceName string) {
// Give a buffer for this instance to renew the lock
expirationDuration := time.Until(expiration) - 10*time.Second
Expand Down Expand Up @@ -188,16 +197,27 @@ func startVulnLoad(manifest *Manifest, db database.Datastore, updateInterval tim

func loadOSVulns(zipR *zip.Reader, db database.Datastore) error {
log.Info("Loading OS vulns...")
osVulns, err := LoadOSVulnsFromDump(zipR)
loader, err := getOSLoader(zipR)
if err != nil {
return err
}
log.Infof("Done loading OS vulns. There are %d vulns to insert into the DB", len(osVulns))
defer utils.IgnoreError(loader.Close)

var n int
for loader.Next() {
osVulns := loader.Vulns()

n += len(osVulns)
log.Infof("Loaded %d OS-level vulns. Total OS-level vulns: %d", len(osVulns), n)

if err := db.InsertVulnerabilities(osVulns); err != nil {
return errors.Wrap(err, "inserting vulns into the DB")
if err := db.InsertVulnerabilities(osVulns); err != nil {
return errors.Wrap(err, "inserting vulns into the DB")
}
}
if loader.Err() != nil {
return loader.Err()
}
log.Info("Done inserting OS vulns into the DB")

return nil
}

Expand Down Expand Up @@ -275,19 +295,21 @@ func UpdateFromVulnDump(zipPath string, db database.Datastore, updateInterval ti
}
defer utils.IgnoreError(zipR.Close)

log.Info("Loading manifest...")
log.Info("Loading vulnerability manifest")
manifest, err := LoadManifestFromDump(&zipR.Reader)
if err != nil {
return err
}
log.Info("Loaded manifest")
log.Info("Successfully loaded vulnerability manifest")

if db != nil {
performUpdate, finishFn, err := startVulnLoad(manifest, db, updateInterval, instanceName)
if err != nil {
return errors.Wrap(err, "error beginning vuln loading")
}
if performUpdate {
log.Info("Loading OS-level vulnerabilities")

if err := loadRHELv2Vulns(db, &zipR.Reader, repoToCPE); err != nil {
_ = finishFn(err)
return errors.Wrap(err, "error loading RHEL vulns")
Expand All @@ -301,15 +323,23 @@ func UpdateFromVulnDump(zipPath string, db database.Datastore, updateInterval ti
if err := finishFn(nil); err != nil {
return errors.Wrap(err, "error ending vuln loading")
}

log.Info("Loaded OS-level vulnerabilities successfully")
}
}

log.Info("Loading application-level vulnerabilities")
errorList := errorhelpers.NewErrorList("loading application-level caches")
for _, appCache := range caches {
if err := loadApplicationUpdater(appCache, manifest, &zipR.Reader); err != nil {
errorList.AddError(errors.Wrapf(err, "error loading into in-mem cache %q", appCache.Dir()))
}
}
if err := errorList.ToError(); err != nil {
return err
}

return errorList.ToError()
log.Info("Successfully loaded application-level vulnerabilities")

return nil
}
40 changes: 40 additions & 0 deletions pkg/vulndump/loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package vulndump

import (
"archive/zip"
"os"
"runtime"
"testing"

"github.com/stackrox/rox/pkg/utils"
"github.com/stretchr/testify/require"
)

// BenchmarkLoadOSVulnsFromDump runs the LoadOSVulnsFromDump benchmark
// against the archive fetched by mustFetchOSVulns.
func BenchmarkLoadOSVulnsFromDump(b *testing.B) {
	dump := mustFetchOSVulns(b)
	defer utils.IgnoreError(dump.Close)

	benchmarkLoadOSVulnsFromDump(b, dump)
}

// BenchmarkLoadOSVulnsFromDump_Offline runs the LoadOSVulnsFromDump
// benchmark against the archive fetched by mustFetchOfflineOSVulns.
func BenchmarkLoadOSVulnsFromDump_Offline(b *testing.B) {
	dump := mustFetchOfflineOSVulns(b)
	defer utils.IgnoreError(dump.Close)

	benchmarkLoadOSVulnsFromDump(b, dump)
}

// benchmarkLoadOSVulnsFromDump times LoadOSVulnsFromDump against the zip
// archive stored in f. The archive is opened once up front; each of the b.N
// iterations loads the vulnerabilities from it and logs how many were read.
func benchmarkLoadOSVulnsFromDump(b *testing.B, f *os.File) {
	archive, openErr := zip.OpenReader(f.Name())
	require.NoError(b, openErr)
	defer utils.IgnoreError(archive.Close)

	// Collect garbage left over from setup before the timed section begins.
	runtime.GC()

	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		loaded, loadErr := LoadOSVulnsFromDump(&archive.Reader)
		require.NoError(b, loadErr)
		b.Logf("Loaded %d vulns", len(loaded))
	}
}