Skip to content

Commit

Permalink
Disable jetstream if filesystem permission denied detected during write
Browse files Browse the repository at this point in the history
Signed-off-by: Sourabh Agrawal <[email protected]>
  • Loading branch information
souravagrawal committed Dec 24, 2024
1 parent 0e11988 commit c162327
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 6 deletions.
15 changes: 10 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
bytes += mb.bytes
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{
mb.mu.Unlock()
return err
}
mb.mu.Unlock()
Expand Down Expand Up @@ -8365,7 +8366,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return
}

case <-qch:
return
}
Expand Down Expand Up @@ -8575,10 +8584,6 @@ func (fs *fileStore) _writeFullState(force bool) error {
err := os.WriteFile(fn, buf, defaultFilePerms)
// if file system is not writable os.IsPermission is set to true
if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return err
}
dios <- struct{}{}
Expand Down
116 changes: 115 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math/bits"
"math/rand"
"os"
Expand Down Expand Up @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) {
func TestFileStoreMsgHeaders(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)

require_NoError(t, err)
defer fs.Stop()

subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
Expand Down Expand Up @@ -8296,3 +8297,116 @@ func TestFileStoreNumPendingMulti(t *testing.T) {
}
require_Equal(t, total, checkTotal)
}

func TestFileStoreExpireMsgsOnRecoverFailIfFSModeReadOnly(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.*"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), JetStreamDisableOnDiskError: true},
cfg)
require_NoError(t, err)
defer fs.Stop()

totalMsgs := 100_000
totalSubjects := 10_000

subjects := make([]string, 0, totalSubjects)
for i := 0; i < totalSubjects; i++ {
subjects = append(subjects, fmt.Sprintf("ev.%s", nuid.Next()))
}

// Put in 100k msgs with random subjects.
msg := bytes.Repeat([]byte("ZZZ"), 333)
for i := 0; i < totalMsgs; i++ {
_, _, err = fs.StoreMsg(subjects[rand.Intn(totalSubjects)], nil, msg)
require_NoError(t, err)
}
time.Sleep(1 * time.Second)

directory := fs.fcfg.StoreDir
ORIGINAL_FILE_MODE , _ := os.Stat(directory)
READONLY_MODE := os.FileMode(0o555)
changeDirectoryPermission(directory, READONLY_MODE)
err = fs.expireMsgsOnRecover()
changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode())
require_Error(t, err, errFileSystemPermissionDenied)

}


func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), JetStreamDisableOnDiskError: true},
cfg)

require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("Z"), 1024)
directory := fs.fcfg.StoreDir
ORIGINAL_FILE_MODE , _ := os.Stat(directory)
READONLY_MODE := os.FileMode(0o555)
changeDirectoryPermission(directory, READONLY_MODE)
require_NoError(t, err)
totalMsgs := 10000
i:=0
for i = 0; i < totalMsgs; i++ {
_, _, err = fs.StoreMsg("ev.1", nil, msg)
if err != nil {
break;
}
}
changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode())
require_Error(t, err, os.ErrPermission)
}

func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), JetStreamDisableOnDiskError: true},
cfg)

require_NoError(t, err)
defer fs.Stop()

msg := bytes.Repeat([]byte("Z"), 1024)
directory := fs.fcfg.StoreDir
ORIGINAL_FILE_MODE , _ := os.Stat(directory)
READONLY_MODE := os.FileMode(0o555)
require_NoError(t, err)
totalMsgs := 10000
i:=0
for i = 0; i < totalMsgs; i++ {
_, _, err = fs.StoreMsg("ev.1", nil, msg)
if err != nil {
break;
}
}
changeDirectoryPermission(directory, READONLY_MODE)
err = fs.writeFullState()
changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode())
require_Error(t, err, os.ErrPermission)
}

func changeDirectoryPermission(directory string, mode fs.FileMode) error {
err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error accessing path %q: %w", path, err)
}

// Check if the path is a directory or file and set permissions accordingly
if info.IsDir() {
err = os.Chmod(path, mode)
if err != nil {
return fmt.Errorf("error changing directory permissions for %q: %w", path, err)
}
} else {
err = os.Chmod(path, mode)
if err != nil {
return fmt.Errorf("error changing file permissions for %q: %w", path, err)
}
}
return nil
})
return err
}

0 comments on commit c162327

Please sign in to comment.