Skip to content

Commit

Permalink
cache: rework ACL state cache into a generic EACL cache
Browse files Browse the repository at this point in the history
It's much more useful this way especially if we have many requests with EACLs.
ACL status can be deduced from EACL, so we don't need to cache it separately
(at least for now, walking through the list takes some time, but it's nothing
like getting EACL from the network that we did previously in many cases).

Signed-off-by: Roman Khimov <[email protected]>
  • Loading branch information
roman-khimov committed Nov 5, 2024
1 parent 77795c6 commit 8008392
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 152 deletions.
26 changes: 13 additions & 13 deletions api/cache/eacls.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"time"

"github.com/bluele/gcache"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"go.uber.org/zap"
)

type (
// BucketACLStateCache contains cache with bucket ACL state.
BucketACLStateCache struct {
// BucketACLCache contains cache with bucket EACL.
BucketACLCache struct {
cache gcache.Cache
logger *zap.Logger
}
Expand All @@ -25,30 +25,30 @@ const (
DefaultEACLCacheLifetime = time.Minute
)

// DefaultBucketACLStateCacheConfig returns new default cache expiration values.
func DefaultBucketACLStateCacheConfig(logger *zap.Logger) *Config {
// DefaultBucketACLCacheConfig returns new default cache expiration values.
func DefaultBucketACLCacheConfig(logger *zap.Logger) *Config {
return &Config{
Size: DefaultEACLCacheSize,
Lifetime: DefaultEACLCacheLifetime,
Logger: logger,
}
}

// NewEACLCache creates an object of BucketACLStateCache.
func NewEACLCache(config *Config) *BucketACLStateCache {
return &BucketACLStateCache{
// NewEACLCache creates an object of BucketACLCache.
func NewEACLCache(config *Config) *BucketACLCache {
return &BucketACLCache{
cache: gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build(),
logger: config.Logger}
}

// Get returns a cached state.
func (o *BucketACLStateCache) Get(id cid.ID) *data.BucketACLState {
func (o *BucketACLCache) Get(id cid.ID) *eacl.Table {
entry, err := o.cache.Get(id.String())
if err != nil {
return nil
}

result, ok := entry.(*data.BucketACLState)
result, ok := entry.(*eacl.Table)
if !ok {
o.logger.Warn("invalid cache entry type",
zap.String("actual", fmt.Sprintf("%T", entry)),
Expand All @@ -61,11 +61,11 @@ func (o *BucketACLStateCache) Get(id cid.ID) *data.BucketACLState {
}

// Put puts a state to cache.
func (o *BucketACLStateCache) Put(id cid.ID, v data.BucketACLState) error {
return o.cache.Set(id.String(), &v)
func (o *BucketACLCache) Put(id cid.ID, v *eacl.Table) error {
return o.cache.Set(id.String(), v)
}

// Delete deletes a state from cache.
func (o *BucketACLStateCache) Delete(id cid.ID) bool {
func (o *BucketACLCache) Delete(id cid.ID) bool {
return o.cache.Remove(id.String())
}
22 changes: 10 additions & 12 deletions api/handler/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ var actionToOpMap = map[string][]eacl.Operation{

var (
errInvalidPublicKey = errors.New("invalid public key")

errBucketOwnerEnforced = errors.New("bucket owner enforced")
)

var rawWildcardJSON = []byte(`"*"`)
Expand Down Expand Up @@ -250,13 +248,13 @@ func (h *handler) PutBucketACLHandler(w http.ResponseWriter, r *http.Request) {
return
}

state, err := h.cachedACLGetter.GetState(r.Context(), bktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down Expand Up @@ -381,13 +379,13 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
return
}

state, err := h.cachedACLGetter.GetState(r.Context(), bktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down Expand Up @@ -1698,9 +1696,9 @@ func bucketACLObjectWriterRecord() *eacl.Record {
return markerRecord
}

func checkACLRestrictions(table *eacl.Table) error {
func isBucketOwnerForced(table *eacl.Table) bool {
if table == nil {
return nil
return false
}

for _, r := range table.Records() {
Expand All @@ -1710,11 +1708,11 @@ func checkACLRestrictions(table *eacl.Table) error {
f.Value() == amzBucketOwnerEnforced &&
f.From() == eacl.HeaderFromRequest &&
f.Matcher() == eacl.MatchStringNotEqual {
return errBucketOwnerEnforced
return true
}
}
}
}

return nil
return false
}
67 changes: 0 additions & 67 deletions api/handler/acl_provider.go

This file was deleted.

20 changes: 9 additions & 11 deletions api/handler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (

type (
handler struct {
log *zap.Logger
obj layer.Client
notificator Notificator
cfg *Config
cachedACLGetter ACLStateProvider
log *zap.Logger
obj layer.Client
notificator Notificator
cfg *Config
}

Notificator interface {
Expand Down Expand Up @@ -66,7 +65,7 @@ const (
var _ api.Handler = (*handler)(nil)

// New creates new api.Handler using given logger and client.
func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg *Config, cachedACLGetter ACLStateProvider) (api.Handler, error) {
func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg *Config) (api.Handler, error) {
switch {
case obj == nil:
return nil, errors.New("empty NeoFS Object Layer")
Expand All @@ -81,10 +80,9 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg *Config
}

return &handler{
log: log,
obj: obj,
cfg: cfg,
notificator: notificator,
cachedACLGetter: cachedACLGetter,
log: log,
obj: obj,
cfg: cfg,
notificator: notificator,
}, nil
}
6 changes: 3 additions & 3 deletions api/handler/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
}

if containsACL {
state, err := h.cachedACLGetter.GetState(r.Context(), dstBktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), dstBktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down
21 changes: 2 additions & 19 deletions api/handler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ import (

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -37,19 +34,6 @@ func (r *contResolver) ResolveCID(_ context.Context, name string) (cid.ID, error
return r.layer.ContainerID(name)
}

type eaclGetter struct {
neofs *layer.TestNeoFS
}

func (e *eaclGetter) ContainerEACL(ctx context.Context, id cid.ID, _ client.PrmContainerEACL) (eacl.Table, error) {
var t eacl.Table
tp, err := e.neofs.ContainerEACL(ctx, id)
if tp != nil {
t = *tp
}
return t, err
}

type handlerContext struct {
owner user.ID
t *testing.T
Expand Down Expand Up @@ -115,9 +99,8 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
require.NoError(t, err)

h := &handler{
log: l,
cachedACLGetter: NewACLCachedProvider(l, &eaclGetter{tp}, cache.NewEACLCache(&cache.Config{Size: 1, Lifetime: 0, Logger: l})),
obj: layer.NewLayer(l, tp, layerCfg),
log: l,
obj: layer.NewLayer(l, tp, layerCfg),
cfg: &Config{
Policy: &placementPolicyMock{defaultPolicy: pp},
},
Expand Down
6 changes: 3 additions & 3 deletions api/handler/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
}

if containsACLHeaders(r) {
state, err := h.cachedACLGetter.GetState(r.Context(), bktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down
12 changes: 6 additions & 6 deletions api/handler/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}

if containsACL {
state, err := h.cachedACLGetter.GetState(r.Context(), bktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down Expand Up @@ -459,13 +459,13 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}

if containsACL {
state, err := h.cachedACLGetter.GetState(r.Context(), bktInfo.CID)
eacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket acl state", reqInfo, err)
h.logAndSendError(w, "could not get bucket eacl", reqInfo, err)
return
}

if state == data.BucketACLBucketOwnerEnforced {
if isBucketOwnerForced(eacl.EACL) {
if !isValidOwnerEnforced(r) {
h.logAndSendError(w, "access control list not supported", reqInfo, s3errors.GetAPIError(s3errors.ErrAccessControlListNotSupported))
return
Expand Down
Loading

0 comments on commit 8008392

Please sign in to comment.