Skip to content

Commit

Permalink
datastore: fix get query caching
Browse files Browse the repository at this point in the history
This change replaces caching the DB pgx.Rows, which is an iterator, with
the actual potential vulnerabilities. Previously, subsequent records
manifesting in the same query were given a pgx.Rows object that had been
exhausted.

Signed-off-by: crozzy <[email protected]>
  • Loading branch information
crozzy committed Dec 10, 2024
1 parent 4c76010 commit 5aa95eb
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions datastore/postgres/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
defer tx.Rollback(ctx)
// start a batch
batch := &pgx.Batch{}
resCache := map[string]pgx.Rows{}
resCache := map[string][]*claircore.Vulnerability{}
rqs := []*recordQuery{}
for _, record := range records {
query, err := buildGetQuery(record, &opts)
Expand All @@ -72,7 +72,6 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
resCache[query] = nil
}
// send the batch

start := time.Now()
res := tx.SendBatch(ctx, batch)
// Can't just defer the close, because the batch must be fully handled
Expand All @@ -83,17 +82,28 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
results := make(map[string][]*claircore.Vulnerability)
vulnSet := make(map[string]map[string]struct{})
for _, rq := range rqs {
rows, ok := resCache[rq.query]
rid := rq.record.Package.ID
vulns, ok := resCache[rq.query]
if !ok {
return nil, fmt.Errorf("unexpected vulnerability query: %s", rq.query)
}
if rows == nil {
rows, err = res.Query()
if err != nil {
res.Close()
return nil, err
if vulns != nil { // We already have results we don't need to go back to the DB.
if _, ok := vulnSet[rid]; !ok {
vulnSet[rid] = make(map[string]struct{})
}
resCache[rq.query] = rows
for _, v := range vulns {
if _, ok := vulnSet[rid][v.ID]; !ok {
vulnSet[rid][v.ID] = struct{}{}
results[rid] = append(results[rid], v)
}
}
continue
}
results[rid] = []*claircore.Vulnerability{}
rows, err := res.Query()
if err != nil {
res.Close()
return nil, fmt.Errorf("error getting rows: %w", err)
}

// unpack all returned rows into claircore.Vulnerability structs
Expand Down Expand Up @@ -140,7 +150,6 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
return nil, fmt.Errorf("failed to scan vulnerability: %v", err)
}

rid := rq.record.Package.ID
if _, ok := vulnSet[rid]; !ok {
vulnSet[rid] = make(map[string]struct{})
}
Expand All @@ -149,6 +158,7 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
results[rid] = append(results[rid], v)
}
}
resCache[rq.query] = results[rid]
}
if err := res.Close(); err != nil {
return nil, fmt.Errorf("some weird batch error: %v", err)
Expand Down

0 comments on commit 5aa95eb

Please sign in to comment.