Skip to content

Commit

Permalink
Merge pull request bf2fc6cc711aee1a0c2a#1705 from MikeEdgar/in-memory…
Browse files Browse the repository at this point in the history
…-storage

fix: create or update in-memory storage for both Lock and Store ops
  • Loading branch information
machi1990 authored Apr 18, 2023
2 parents 0cfd412 + 4b4a720 commit ce2958d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package kafkatlscertmgmt
import (
"context"
"fmt"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"time"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/config"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared"
Expand Down Expand Up @@ -188,7 +189,7 @@ func NewKafkaTLSCertificateManagementService(
Path: "secrets/tls/",
}
case config.InMemoryTLSCertStorageType:
storage = newInMemoryStorage()
storage = newInMemoryStorage(connectionFactory)
case config.SecureTLSCertStorageType:
storage, err = newSecureStorage(connectionFactory, awsConfig, kafkaTLSCertificateManagementConfig.AutomaticCertificateManagementConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/config"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"github.com/caddyserver/certmagic"
"github.com/onsi/gomega"
)
Expand All @@ -20,7 +21,7 @@ func Test_kafkaTLSCertificateManagementService_GetCertificate(t *testing.T) {
request GetCertificateRequest
}

storageWithCerts := newInMemoryStorage()
storageWithCerts := newInMemoryStorage(db.NewMockConnectionFactory(nil))
crtRef := "some-crt-ref"
keyRef := "some-key-ref"

Expand Down Expand Up @@ -102,7 +103,7 @@ func Test_kafkaTLSCertificateManagementService_GetCertificate(t *testing.T) {
{
name: "should return an error when loading from the storage returns an error",
fields: fields{
storage: newInMemoryStorage(),
storage: newInMemoryStorage(db.NewMockConnectionFactory(nil)),
config: &config.KafkaTLSCertificateManagementConfig{
CertificateManagementStrategy: config.AutomaticCertificateManagement,
},
Expand Down Expand Up @@ -144,7 +145,7 @@ func Test_kafkaTLSCertificateManagementService_RevokeCertificate(t *testing.T) {
reason CertificateRevocationReason
}

inMemoryStorage := newInMemoryStorage()
inMemoryStorage := newInMemoryStorage(db.NewMockConnectionFactory(nil))
certKey := "cert-key"
privateKey := "private-key"
_ = inMemoryStorage.Store(context.Background(), certKey, []byte{})
Expand Down
49 changes: 18 additions & 31 deletions internal/kafka/internal/services/kafkatlscertmgmt/memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,52 @@ import (
"context"
"io/fs"
"strings"
"sync"
"time"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/sync"
"github.com/caddyserver/certmagic"
)

var _ certmagic.Storage = &inMemoryStorage{}

type inMemoryStorageItem struct {
value []byte
mu *sync.Mutex
sync.Locker
value []byte
lastModified time.Time
}

func (item inMemoryStorageItem) Lock() {
item.mu.Lock()
}

func (item inMemoryStorageItem) Unlock() {
item.mu.Unlock()
}

type inMemoryStorage struct {
store map[string]inMemoryStorageItem
lock sync.DistributedLockMgr
}

func newInMemoryStorage() *inMemoryStorage {
func newInMemoryStorage(connectionFactory *db.ConnectionFactory) *inMemoryStorage {
return &inMemoryStorage{
store: map[string]inMemoryStorageItem{},
lock: sync.NewDistributedLockMgr(connectionFactory.New()),
}
}

func (storage *inMemoryStorage) Lock(ctx context.Context, key string) error {
mu, ok := storage.store[key]
if !ok {
return fs.ErrNotExist
}

mu.Lock()
return nil
return storage.lock.Lock(key)
}

func (storage *inMemoryStorage) Unlock(ctx context.Context, key string) error {
mu, ok := storage.store[key]
if !ok {
return fs.ErrNotExist
}

mu.Unlock()
return nil
return storage.lock.Unlock(key)
}

func (storage *inMemoryStorage) Store(ctx context.Context, key string, value []byte) error {
storage.store[key] = inMemoryStorageItem{
value: value,
lastModified: time.Now(),
mu: &sync.Mutex{},
mu, ok := storage.store[key]
if !ok {
mu = inMemoryStorageItem{}
}
mu.value = value
mu.lastModified = time.Now()
if strings.HasPrefix(key, "acme/") {
logger.Logger.Infof("storing key '%s' with value %v", key, string(value))
}
storage.store[key] = mu
return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kafkatlscertmgmt

import (
"context"
"testing"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"github.com/onsi/gomega"
)

func Test_memoryStorage_Load(t *testing.T) {
type args struct {
key string
value []byte
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "successfully loads the same value stored",
args: args{
key: "some-key",
value: []byte("some byte"),
},
wantErr: false,
},
}
for _, tt := range tests {
testcase := tt
t.Run(testcase.name, func(t *testing.T) {
t.Parallel()
g := gomega.NewWithT(t)
storage := newInMemoryStorage(db.NewMockConnectionFactory(nil))

storeErr := storage.Store(context.Background(), testcase.args.key, testcase.args.value)
g.Expect(storeErr != nil).To(gomega.Equal(testcase.wantErr))

outputValue, loadErr := storage.Load(context.Background(), testcase.args.key)
g.Expect(loadErr == nil)
g.Expect(outputValue).To(gomega.Equal(testcase.args.value))
})
}
}

0 comments on commit ce2958d

Please sign in to comment.