Skip to content

Commit

Permalink
feat: Use fsnotify to detect model/policy files change in casbin plug…
Browse files Browse the repository at this point in the history
…in (#614)

To fix #556
  • Loading branch information
lyt122 authored Jul 17, 2024
1 parent 5c681d1 commit f9cec7a
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 140 deletions.
1 change: 1 addition & 0 deletions plugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/casbin/casbin/v2 v2.88.0
github.com/coreos/go-oidc/v3 v3.10.0
github.com/envoyproxy/envoy v1.29.4
github.com/fsnotify/fsnotify v1.7.0
github.com/google/cel-go v0.20.1
github.com/google/uuid v1.6.0
github.com/gorilla/securecookie v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions plugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7DlmewI=
github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U=
Expand Down
158 changes: 67 additions & 91 deletions plugins/pkg/file/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
package file

import (
"os"
"path/filepath"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/fsnotify/fsnotify"

"mosn.io/htnn/api/pkg/log"
)
Expand All @@ -29,110 +28,87 @@ var (
)

type File struct {
lock sync.RWMutex

Name string
mtime time.Time
}

func (f *File) Mtime() time.Time {
f.lock.RLock()
defer f.lock.RUnlock()
// the returned time.Time should be readonly
return f.mtime
}

func (f *File) SetMtime(t time.Time) {
f.lock.Lock()
f.mtime = t
f.lock.Unlock()
Name string
}

type fs struct {
cache *ttlcache.Cache[string, os.FileInfo]
type Watcher struct {
watcher *fsnotify.Watcher
files map[string]bool
mu sync.Mutex
dir map[string]bool
done chan struct{}
}

func newFS(ttl time.Duration) *fs {
loader := ttlcache.LoaderFunc[string, os.FileInfo](
func(c *ttlcache.Cache[string, os.FileInfo], key string) *ttlcache.Item[string, os.FileInfo] {
info, err := os.Stat(key)
if err != nil {
logger.Error(err, "reload file info to cache", "file", key)
return nil
}
item := c.Set(key, info, ttlcache.DefaultTTL)
logger.Info("update file mtime", "file", key, "mtime", item.Value().ModTime())
return item
},
)
cache := ttlcache.New(
ttlcache.WithTTL[string, os.FileInfo](ttl),
ttlcache.WithLoader[string, os.FileInfo](loader),
)
go cache.Start()

return &fs{
cache: cache,
func NewWatcher() (*Watcher, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &Watcher{
watcher: w,
files: make(map[string]bool),
done: make(chan struct{}),
dir: make(map[string]bool),
}, nil
}

var (
// TODO: rewrite it to use inotify
defaultFs = newFS(10 * time.Second)
)

func IsChanged(files ...*File) bool {
func (w *Watcher) AddFiles(files ...*File) error {
w.mu.Lock()
defer w.mu.Unlock()
for _, file := range files {
changed := defaultFs.isChanged(file)
if changed {
return true
absPath, err := filepath.Abs(file.Name)
if err != nil {
return err
}
if _, exists := w.files[absPath]; !exists {
w.files[absPath] = true
}
}
return false
}

func (f *fs) isChanged(file *File) bool {
item := f.cache.Get(file.Name)
if item == nil {
// As a protection, failed to fetch the real file means file not changed
return false
}

return file.Mtime().Before(item.Value().ModTime())
}

func (f *fs) Stat(path string) (*File, error) {
info, err := os.Stat(path)
if err != nil {
return nil, err
dir := filepath.Dir(absPath)
if _, exists := w.dir[dir]; !exists {
if err := w.watcher.Add(dir); err != nil {
return err
}
w.dir[dir] = true
}
}
f.cache.Set(path, info, ttlcache.DefaultTTL)

return &File{
Name: path,
mtime: info.ModTime(),
}, nil
return nil
}

func Stat(path string) (*File, error) {
return defaultFs.Stat(path)
func (w *Watcher) Start(onChanged func()) {
go func() {
logger.Info("start watching files")
for {
select {
case event := <-w.watcher.Events:
if event.Op == fsnotify.Chmod {
continue // Skip chmod event
}
absPath, err := filepath.Abs(event.Name)
if err != nil {
logger.Error(err, "get file absPath failed")
}
if _, exists := w.files[absPath]; exists {
logger.Info("file changed: ", "event", event)
onChanged()
}
case err := <-w.watcher.Errors:
logger.Error(err, "error watching files")
case <-w.done:
return
}
}
}()
}

func Update(files ...*File) bool {
for _, file := range files {
if !defaultFs.update(file) {
return false
}
}
return true
func (w *Watcher) Stop() error {
logger.Info("stop watcher")
close(w.done)
return w.watcher.Close()
}

func (f *fs) update(file *File) bool {
item := f.cache.Get(file.Name)
if item == nil {
return false
func Stat(file string) *File {
return &File{
Name: file,
}

file.SetMtime(item.Value().ModTime())
return true
}
44 changes: 31 additions & 13 deletions plugins/pkg/file/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,46 @@ package file

import (
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFileMtimeDetection(t *testing.T) {
defaultFs = newFS(2000 * time.Millisecond)
func TestFileIsChanged(t *testing.T) {
changed := false
wg := sync.WaitGroup{}
once := sync.Once{}

watcher, err := NewWatcher()

assert.Nil(t, err)

tmpfile, _ := os.CreateTemp("", "example")
defer os.Remove(tmpfile.Name()) // clean up

f, err := Stat(tmpfile.Name())
file := Stat(tmpfile.Name())

assert.Equal(t, tmpfile.Name(), file.Name)

err = watcher.AddFiles(file)
assert.Nil(t, err)
wg.Add(1)
watcher.Start(func() {
once.Do(func() {
changed = true
wg.Done()
})
})
assert.Nil(t, err)
assert.False(t, IsChanged(f))
time.Sleep(1000 * time.Millisecond)
tmpfile.Write([]byte("bls"))
tmpfile.Close()
assert.False(t, IsChanged(f))
tmpfile.Sync()

wg.Wait()
assert.True(t, changed)

time.Sleep(2500 * time.Millisecond)
assert.True(t, IsChanged(f))
assert.True(t, Update(f))
assert.False(t, IsChanged(f))
err = os.Remove(tmpfile.Name())
assert.Nil(t, err)

err = watcher.Stop()
assert.Nil(t, err)
}
57 changes: 51 additions & 6 deletions plugins/plugins/casbin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package casbin

import (
"runtime"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -51,27 +52,71 @@ type config struct {
modelFile *file.File
policyFile *file.File
updating atomic.Bool

watcher *file.Watcher
}

func (conf *config) Init(cb api.ConfigCallbackHandler) error {
conf.lock = &sync.RWMutex{}

f, err := file.Stat(conf.Rule.Model)
f := file.Stat(conf.Rule.Model)

conf.modelFile = f

f = file.Stat(conf.Rule.Policy)

conf.policyFile = f

e, err := casbin.NewEnforcer(conf.Rule.Model, conf.Rule.Policy)
if err != nil {
return err
}
conf.modelFile = f
conf.enforcer = e

f, err = file.Stat(conf.Rule.Policy)
watcher, err := file.NewWatcher()
if err != nil {
return err
}
conf.policyFile = f

e, err := casbin.NewEnforcer(conf.Rule.Model, conf.Rule.Policy)
conf.watcher = watcher

err = conf.watcher.AddFiles(conf.modelFile, conf.policyFile)
if err != nil {
return err
}
conf.enforcer = e

conf.watcher.Start(conf.reloadEnforcer)

runtime.SetFinalizer(conf, func(conf *config) {
err := conf.watcher.Stop()
if err != nil {
api.LogErrorf("failed to stop watcher, err: %v", err)
}
})
return nil
}

func (conf *config) reloadEnforcer() {
if !conf.updating.Load() {
conf.updating.Store(true)
api.LogWarnf("policy %s or model %s changed, reload enforcer", conf.policyFile.Name, conf.modelFile.Name)

go func() {
defer func() {
if r := recover(); r != nil {
api.LogErrorf("recovered from panic: %v", r)
}
conf.updating.Store(false)
}()
e, err := casbin.NewEnforcer(conf.Rule.Model, conf.Rule.Policy)
if err != nil {
api.LogErrorf("failed to update Enforcer: %v", err)
} else {
conf.lock.Lock()
conf.enforcer = e
conf.lock.Unlock()
api.LogWarnf("policy %s or model %s changed, enforcer reloaded", conf.policyFile.Name, conf.modelFile.Name)
}
}()
}
}
Loading

0 comments on commit f9cec7a

Please sign in to comment.