Skip to content

Commit

Permalink
Userspace Convertor: Add Deduplication Version Support
Browse files Browse the repository at this point in the history
Introduce a simple versioning system for the userspace convertor. This
prevents inconsistencies with an existing DB if a new version is used

Signed-off-by: Esteban <[email protected]>
  • Loading branch information
estebanreyl committed Mar 19, 2024
1 parent e68524b commit eb62ab2
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 45 deletions.
9 changes: 5 additions & 4 deletions cmd/convertor/builder/overlaybd_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

testingresources "github.com/containerd/accelerated-container-image/cmd/convertor/testingresources"
"github.com/containerd/accelerated-container-image/pkg/version"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
_ "github.com/containerd/containerd/pkg/testutil" // Handle custom root flag
Expand All @@ -31,7 +32,7 @@ import (

func Test_overlaybd_builder_CheckForConvertedLayer(t *testing.T) {
ctx := context.Background()
db := testingresources.NewLocalDB()
db := testingresources.NewLocalDB(version.GetUserSpaceConsistencyVersion())
resolver := testingresources.GetTestResolver(t, ctx)
fetcher := testingresources.GetTestFetcherFromResolver(t, ctx, resolver, testingresources.DockerV2_Manifest_Simple_Ref)
base := &builderEngineBase{
Expand Down Expand Up @@ -107,7 +108,7 @@ func Test_overlaybd_builder_CheckForConvertedLayer(t *testing.T) {
rc.Close()
})

base.db = testingresources.NewLocalDB() // Reset DB
base.db = testingresources.NewLocalDB(version.GetUserSpaceConsistencyVersion()) // Reset DB
digestNotInRegistry := digest.FromString("Not in reg")
err = base.db.CreateLayerEntry(ctx, e.host, e.repository, digestNotInRegistry, fakeChainId, 10)
if err != nil {
Expand All @@ -124,7 +125,7 @@ func Test_overlaybd_builder_CheckForConvertedLayer(t *testing.T) {

func Test_overlaybd_builder_CheckForConvertedManifest(t *testing.T) {
ctx := context.Background()
db := testingresources.NewLocalDB()
db := testingresources.NewLocalDB(version.GetUserSpaceConsistencyVersion())
resolver := testingresources.GetTestResolver(t, ctx)
fetcher := testingresources.GetTestFetcherFromResolver(t, ctx, resolver, testingresources.DockerV2_Manifest_Simple_Ref)

Expand Down Expand Up @@ -207,7 +208,7 @@ func Test_overlaybd_builder_CheckForConvertedManifest(t *testing.T) {
rc.Close()
})

base.db = testingresources.NewLocalDB() // Reset DB
base.db = testingresources.NewLocalDB(version.GetUserSpaceConsistencyVersion()) // Reset DB
digestNotInRegistry := digest.FromString("Not in reg")
err = base.db.CreateManifestEntry(ctx, e.host, e.repository, outputDesc.MediaType, inputDesc.Digest, digestNotInRegistry, outputDesc.Size)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/convertor/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type LayerEntry struct {
Repository string
ChainID string
Host string
Version string
}

type ManifestEntry struct {
Expand All @@ -51,4 +52,5 @@ type ManifestEntry struct {
Repository string
Host string
MediaType string
Version string
}
33 changes: 18 additions & 15 deletions cmd/convertor/database/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,39 @@ import (
"database/sql"
"fmt"

"github.com/containerd/accelerated-container-image/pkg/version"
"github.com/containerd/containerd/log"
"github.com/opencontainers/go-digest"
)

type sqldb struct {
db *sql.DB
db *sql.DB
version version.UserspaceVersion
}

func NewSqlDB(db *sql.DB) ConversionDatabase {
func NewSqlDB(db *sql.DB, version version.UserspaceVersion) ConversionDatabase {
return &sqldb{
db: db,
db: db,
version: version,
}
}

func (m *sqldb) CreateLayerEntry(ctx context.Context, host, repository string, convertedDigest digest.Digest, chainID string, size int64) error {
_, err := m.db.ExecContext(ctx, "insert into overlaybd_layers(host, repo, chain_id, data_digest, data_size) values(?, ?, ?, ?, ?)", host, repository, chainID, convertedDigest, size)
_, err := m.db.ExecContext(ctx, "insert into overlaybd_layers(host, repo, chain_id, data_digest, data_size, version) values(?, ?, ?, ?, ?, ?)", host, repository, chainID, convertedDigest, size, m.version.LayerVersion)
return err
}

func (m *sqldb) GetLayerEntryForRepo(ctx context.Context, host, repository, chainID string) *LayerEntry {
var entry LayerEntry
row := m.db.QueryRowContext(ctx, "select host, repo, chain_id, data_digest, data_size from overlaybd_layers where host=? and repo=? and chain_id=?", host, repository, chainID)
if err := row.Scan(&entry.Host, &entry.Repository, &entry.ChainID, &entry.ConvertedDigest, &entry.DataSize); err != nil {
row := m.db.QueryRowContext(ctx, "select host, repo, chain_id, data_digest, data_size, version from overlaybd_layers where host=? and repo=? and chain_id=? and version=?", host, repository, chainID, m.version.LayerVersion)
if err := row.Scan(&entry.Host, &entry.Repository, &entry.ChainID, &entry.ConvertedDigest, &entry.DataSize, &entry.Version); err != nil {
return nil
}
return &entry
}

func (m *sqldb) GetCrossRepoLayerEntries(ctx context.Context, host, chainID string) []*LayerEntry {
rows, err := m.db.QueryContext(ctx, "select host, repo, chain_id, data_digest, data_size from overlaybd_layers where host=? and chain_id=?", host, chainID)
rows, err := m.db.QueryContext(ctx, "select host, repo, chain_id, data_digest, data_size, version from overlaybd_layers where host=? and chain_id=? and version=?", host, chainID, m.version.LayerVersion)
if err != nil {
if err == sql.ErrNoRows {
return nil
Expand All @@ -61,7 +64,7 @@ func (m *sqldb) GetCrossRepoLayerEntries(ctx context.Context, host, chainID stri
var entries []*LayerEntry
for rows.Next() {
var entry LayerEntry
err = rows.Scan(&entry.Host, &entry.Repository, &entry.ChainID, &entry.ConvertedDigest, &entry.DataSize)
err = rows.Scan(&entry.Host, &entry.Repository, &entry.ChainID, &entry.ConvertedDigest, &entry.DataSize, &entry.Version)
if err != nil {
continue
}
Expand All @@ -72,29 +75,29 @@ func (m *sqldb) GetCrossRepoLayerEntries(ctx context.Context, host, chainID stri
}

func (m *sqldb) DeleteLayerEntry(ctx context.Context, host, repository string, chainID string) error {
_, err := m.db.Exec("delete from overlaybd_layers where host=? and repo=? and chain_id=?", host, repository, chainID)
_, err := m.db.Exec("delete from overlaybd_layers where host=? and repo=? and chain_id=? and version=?", host, repository, chainID, m.version.LayerVersion)
if err != nil {
return fmt.Errorf("failed to remove invalid record in db: %w", err)
}
return nil
}

func (m *sqldb) CreateManifestEntry(ctx context.Context, host, repository, mediaType string, original, convertedDigest digest.Digest, size int64) error {
_, err := m.db.ExecContext(ctx, "insert into overlaybd_manifests(host, repo, src_digest, out_digest, data_size, mediatype) values(?, ?, ?, ?, ?, ?)", host, repository, original, convertedDigest, size, mediaType)
_, err := m.db.ExecContext(ctx, "insert into overlaybd_manifests(host, repo, src_digest, out_digest, data_size, mediatype, version) values(?, ?, ?, ?, ?, ?, ?)", host, repository, original, convertedDigest, size, mediaType, m.version.ManifestVersion)
return err
}

func (m *sqldb) GetManifestEntryForRepo(ctx context.Context, host, repository, mediaType string, original digest.Digest) *ManifestEntry {
var entry ManifestEntry
row := m.db.QueryRowContext(ctx, "select host, repo, src_digest, out_digest, data_size, mediatype from overlaybd_manifests where host=? and repo=? and src_digest=? and mediatype=?", host, repository, original, mediaType)
if err := row.Scan(&entry.Host, &entry.Repository, &entry.OriginalDigest, &entry.ConvertedDigest, &entry.DataSize, &entry.MediaType); err != nil {
row := m.db.QueryRowContext(ctx, "select host, repo, src_digest, out_digest, data_size, mediatype, version from overlaybd_manifests where host=? and repo=? and src_digest=? and mediatype=? version=?", host, repository, original, mediaType, m.version.ManifestVersion)
if err := row.Scan(&entry.Host, &entry.Repository, &entry.OriginalDigest, &entry.ConvertedDigest, &entry.DataSize, &entry.MediaType, &entry.Version); err != nil {
return nil
}
return &entry
}

func (m *sqldb) GetCrossRepoManifestEntries(ctx context.Context, host, mediaType string, original digest.Digest) []*ManifestEntry {
rows, err := m.db.QueryContext(ctx, "select host, repo, src_digest, out_digest, data_size, mediatype from overlaybd_manifests where host=? and src_digest=? and mediatype=?", host, original, mediaType)
rows, err := m.db.QueryContext(ctx, "select host, repo, src_digest, out_digest, data_size, mediatype, version from overlaybd_manifests where host=? and src_digest=? and mediatype=? and version=?", host, original, mediaType, m.version.ManifestVersion)
if err != nil {
if err == sql.ErrNoRows {
return nil
Expand All @@ -105,7 +108,7 @@ func (m *sqldb) GetCrossRepoManifestEntries(ctx context.Context, host, mediaType
var entries []*ManifestEntry
for rows.Next() {
var entry ManifestEntry
err = rows.Scan(&entry.Host, &entry.Repository, &entry.OriginalDigest, &entry.ConvertedDigest, &entry.DataSize, &entry.MediaType)
err = rows.Scan(&entry.Host, &entry.Repository, &entry.OriginalDigest, &entry.ConvertedDigest, &entry.DataSize, &entry.MediaType, &entry.Version)
if err != nil {
continue
}
Expand All @@ -116,7 +119,7 @@ func (m *sqldb) GetCrossRepoManifestEntries(ctx context.Context, host, mediaType
}

func (m *sqldb) DeleteManifestEntry(ctx context.Context, host, repository, mediaType string, original digest.Digest) error {
_, err := m.db.Exec("delete from overlaybd_manifests where host=? and repo=? and src_digest=? and mediatype=?", host, repository, original, mediaType)
_, err := m.db.Exec("delete from overlaybd_manifests where host=? and repo=? and src_digest=? and mediatype=? and version=?", host, repository, original, mediaType, m.version.ManifestVersion)
if err != nil {
return fmt.Errorf("failed to remove invalid record in db: %w", err)
}
Expand Down
41 changes: 25 additions & 16 deletions cmd/convertor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,31 @@ import (

"github.com/containerd/accelerated-container-image/cmd/convertor/builder"
"github.com/containerd/accelerated-container-image/cmd/convertor/database"
"github.com/containerd/accelerated-container-image/pkg/version"
_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"

"github.com/spf13/cobra"
)

var (
repo string
user string
plain bool
tagInput string
tagOutput string
dir string
oci bool
mkfs bool
verbose bool
vsize int
fastoci string
turboOCI string
overlaybd string
dbstr string
dbType string
repo string
user string
plain bool
tagInput string
tagOutput string
dir string
oci bool
mkfs bool
verbose bool
vsize int
fastoci string
turboOCI string
overlaybd string
dbstr string
dbType string
dbLayerVersion string
dbMnfstVersion string

// certification
certDirs []string
Expand Down Expand Up @@ -114,7 +117,11 @@ var (
logrus.Errorf("failed to open the provided mysql db: %v", err)
os.Exit(1)
}
opt.DB = database.NewSqlDB(db)
userspaceVersion := version.UserspaceVersion{
LayerVersion: dbLayerVersion,
ManifestVersion: dbMnfstVersion,
}
opt.DB = database.NewSqlDB(db, userspaceVersion)
case "":
default:
logrus.Warnf("db-type %s was provided but is not one of known db types. Available: mysql", dbType)
Expand Down Expand Up @@ -179,6 +186,8 @@ func init() {
rootCmd.Flags().BoolVar(&reserve, "reserve", false, "reserve tmp data")
rootCmd.Flags().BoolVar(&noUpload, "no-upload", false, "don't upload layer and manifest")
rootCmd.Flags().BoolVar(&dumpManifest, "dump-manifest", false, "dump manifest")
rootCmd.Flags().StringVar(&dbLayerVersion, "db-layer-version-override", version.GetUserSpaceConsistencyVersion().LayerVersion, "version of db to use for conversion deduplication. Default 1")
rootCmd.Flags().StringVar(&dbMnfstVersion, "db-manifest-version-override", version.GetUserSpaceConsistencyVersion().ManifestVersion, "version of db to use for conversion deduplication. Default 1-1")

rootCmd.MarkFlagRequired("repository")
rootCmd.MarkFlagRequired("input-tag")
Expand Down
6 changes: 4 additions & 2 deletions cmd/convertor/resources/samples/mysql.conf
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
CREATE database conversioncache;
USE conversioncache;
CREATE TABLE `overlaybd_layers` (
`consistency_version` int(6) NOT NULL,
`host` varchar(255) NOT NULL,
`repo` varchar(255) NOT NULL,
`chain_id` varchar(255) NOT NULL COMMENT 'chain-id of the normal image layer',
`data_digest` varchar(255) NOT NULL COMMENT 'digest of overlaybd layer',
`data_size` bigint(20) NOT NULL COMMENT 'size of overlaybd layer',
PRIMARY KEY (`host`,`repo`,`chain_id`),
KEY `index_registry_chainId` (`host`,`chain_id`) USING BTREE
KEY `index_registry_chainId` (`consistency_version`,`host`,`chain_id`) USING BTREE
) DEFAULT CHARSET=utf8;

CREATE TABLE `overlaybd_manifests` (
`consistency_version` int(6) NOT NULL,
`host` varchar(255) NOT NULL,
`repo` varchar(255) NOT NULL,
`src_digest` varchar(255) NOT NULL COMMENT 'digest of the normal image manifest',
`out_digest` varchar(255) NOT NULL COMMENT 'digest of overlaybd manifest',
`data_size` bigint(20) NOT NULL COMMENT 'size of overlaybd manifest',
`mediatype` varchar(255) NOT NULL COMMENT 'mediatype of the converted image manifest',
PRIMARY KEY (`host`,`repo`,`src_digest`, `mediatype`),
KEY `index_registry_src_digest` (`host`,`src_digest`, `mediatype`) USING BTREE
KEY `index_registry_src_digest` (`consistency_version`,`host`,`src_digest`, `mediatype`) USING BTREE
) DEFAULT CHARSET=utf8;
22 changes: 14 additions & 8 deletions cmd/convertor/testingresources/local_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/containerd/accelerated-container-image/cmd/convertor/database"
"github.com/containerd/accelerated-container-image/pkg/version"
"github.com/opencontainers/go-digest"
)

Expand All @@ -29,11 +30,14 @@ type localdb struct {
manifestRecords []*database.ManifestEntry
layerLock sync.Mutex // Protects layerRecords
manifestLock sync.Mutex // Protects manifestRecords
version version.UserspaceVersion
}

// NewLocalDB returns a new local database for testing. This is a simple unoptimized in-memory database.
func NewLocalDB() database.ConversionDatabase {
return &localdb{}
func NewLocalDB(ver version.UserspaceVersion) database.ConversionDatabase {
return &localdb{
version: ver,
}
}

func (l *localdb) CreateLayerEntry(ctx context.Context, host string, repository string, convertedDigest digest.Digest, chainID string, size int64) error {
Expand All @@ -45,6 +49,7 @@ func (l *localdb) CreateLayerEntry(ctx context.Context, host string, repository
ChainID: chainID,
ConvertedDigest: convertedDigest,
DataSize: size,
Version: l.version.LayerVersion,
})
return nil
}
Expand All @@ -53,7 +58,7 @@ func (l *localdb) GetLayerEntryForRepo(ctx context.Context, host string, reposit
l.layerLock.Lock()
defer l.layerLock.Unlock()
for _, entry := range l.layerRecords {
if entry.Host == host && entry.ChainID == chainID && entry.Repository == repository {
if entry.Host == host && entry.ChainID == chainID && entry.Repository == repository && entry.Version == l.version.LayerVersion {
return entry
}
}
Expand All @@ -65,7 +70,7 @@ func (l *localdb) GetCrossRepoLayerEntries(ctx context.Context, host, chainID st
defer l.layerLock.Unlock()
var entries []*database.LayerEntry
for _, entry := range l.layerRecords {
if entry.Host == host && entry.ChainID == chainID {
if entry.Host == host && entry.ChainID == chainID && entry.Version == l.version.LayerVersion {
entries = append(entries, entry)
}
}
Expand All @@ -77,7 +82,7 @@ func (l *localdb) DeleteLayerEntry(ctx context.Context, host, repository, chainI
defer l.layerLock.Unlock()
// host - repo - chainID should be unique
for i, entry := range l.layerRecords {
if entry.Host == host && entry.ChainID == chainID && entry.Repository == repository {
if entry.Host == host && entry.ChainID == chainID && entry.Repository == repository && entry.Version == l.version.LayerVersion {
l.layerRecords = append(l.layerRecords[:i], l.layerRecords[i+1:]...)
return nil
}
Expand All @@ -95,6 +100,7 @@ func (l *localdb) CreateManifestEntry(ctx context.Context, host, repository, med
ConvertedDigest: convertedDigest,
DataSize: size,
MediaType: mediaType,
Version: l.version.LayerVersion,
})
return nil
}
Expand All @@ -103,7 +109,7 @@ func (l *localdb) GetManifestEntryForRepo(ctx context.Context, host, repository,
l.manifestLock.Lock()
defer l.manifestLock.Unlock()
for _, entry := range l.manifestRecords {
if entry.Host == host && entry.OriginalDigest == original && entry.Repository == repository && entry.MediaType == mediaType {
if entry.Host == host && entry.OriginalDigest == original && entry.Repository == repository && entry.MediaType == mediaType && entry.Version == l.version.ManifestVersion {
return entry
}
}
Expand All @@ -115,7 +121,7 @@ func (l *localdb) GetCrossRepoManifestEntries(ctx context.Context, host, mediaTy
defer l.manifestLock.Unlock()
var entries []*database.ManifestEntry
for _, entry := range l.manifestRecords {
if entry.Host == host && entry.OriginalDigest == original && entry.MediaType == mediaType {
if entry.Host == host && entry.OriginalDigest == original && entry.MediaType == mediaType && entry.Version == l.version.ManifestVersion {
entries = append(entries, entry)
}
}
Expand All @@ -127,7 +133,7 @@ func (l *localdb) DeleteManifestEntry(ctx context.Context, host, repository, med
defer l.manifestLock.Unlock()
// Identify indices of items to be deleted.
for i, entry := range l.manifestRecords {
if entry.Host == host && entry.OriginalDigest == original && entry.Repository == repository && entry.MediaType == mediaType {
if entry.Host == host && entry.OriginalDigest == original && entry.Repository == repository && entry.MediaType == mediaType && entry.Version == l.version.ManifestVersion {
l.manifestRecords = append(l.manifestRecords[:i], l.manifestRecords[i+1:]...)
}
}
Expand Down
Loading

0 comments on commit eb62ab2

Please sign in to comment.