Skip to content

Commit

Permalink
Disable jetstream if disk permission error while writing raft state
Browse files Browse the repository at this point in the history
  • Loading branch information
souravagrawal committed Jan 20, 2025
1 parent 068bd41 commit 491222c
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 23 deletions.
32 changes: 13 additions & 19 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
err := fs.expireMsgsOnRecover()
if err != nil && err == errFileSystemPermissionDenied {
fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
if isPermissionError(err) {
return nil, err
}
fs.startAgeChk()
Expand Down Expand Up @@ -2128,7 +2124,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
return true
})
err := mb.dirtyCloseWithRemove(true)
if err != nil && err == errFileSystemPermissionDenied {
if isPermissionError(err) {
return err
}
deleted++
Expand All @@ -2151,11 +2147,10 @@ func (fs *fileStore) expireMsgsOnRecover() error {
purged += mb.msgs
bytes += mb.bytes
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied {
mb.mu.Unlock()
mb.mu.Unlock()
if isPermissionError(err) {
return err
}
mb.mu.Unlock()
continue
}

Expand Down Expand Up @@ -3725,7 +3720,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}

if err != nil {
if os.IsPermission(err) {
if isPermissionError(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
Expand Down Expand Up @@ -6716,7 +6711,6 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
errStateTooBig = errors.New("store state too big for optional write")
errFileSystemPermissionDenied = errors.New("storage directory not writeable")
)

const (
Expand Down Expand Up @@ -8208,15 +8202,15 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
mb.fss = nil
if mb.mfn != _EMPTY_ {
err := os.Remove(mb.mfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
if isPermissionError(err) {
return err
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
err := os.Remove(mb.kfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
if isPermissionError(err) {
return err
}
}
}
Expand Down Expand Up @@ -8638,8 +8632,8 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
select {
case <-t.C:
err := fs.writeFullState()
if err != nil && os.IsPermission(err) {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
if isPermissionError(err) && fs.srv != nil {
fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
Expand Down Expand Up @@ -8854,8 +8848,8 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// if file system is not writable os.IsPermission is set to true
if err != nil && os.IsPermission(err) {
// if file system is not writable isPermissionError is set to true
if isPermissionError(err) {
return err
}
dios <- struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8687,7 +8687,7 @@ func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) {
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
}

func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
Expand All @@ -8714,7 +8714,7 @@ func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
require_Error(t, err, os.ErrPermission)
}

func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir()},
Expand Down
11 changes: 11 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
cfg.Duplicates = 0
}
}

// handleWritePermissionError reacts to a file system permission failure
// encountered while writing. When JetStream is currently enabled it logs an
// error and disables JetStream; otherwise it does nothing.
func (s *Server) handleWritePermissionError() {
	// TODO: Check whether s.jetStreamOOSPending should also be part of this guard.
	if !s.JetStreamEnabled() {
		return
	}

	s.Errorf("File system permission denied while writing, disabling JetStream")

	// Disabling runs in its own goroutine so this call returns immediately.
	// NOTE(review): presumably this avoids blocking callers that hold locks — confirm.
	go s.DisableJetStream()

	// TODO: Send the respective advisory if needed, same as in handleOutOfSpace.
}
4 changes: 4 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) {
n.error("Critical write error: %v", err)
n.werr = err

if isPermissionError(err) {
go n.s.handleWritePermissionError()
}

if isOutOfSpaceErr(err) {
// For now since this can be happening all under the covers, we will call up and disable JetStream.
go n.s.handleOutOfSpace(nil)
Expand Down
5 changes: 5 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"unsafe"
Expand Down Expand Up @@ -788,3 +789,7 @@ func copyString(s string) string {
copy(b, s)
return bytesToString(b)
}

// isPermissionError reports whether err is, or wraps, a permission-denied
// error. A nil err yields false.
//
// errors.Is is used instead of os.IsPermission because os.IsPermission does
// not unwrap errors created with fmt.Errorf("%w") or other wrappers, so a
// permission failure that gained context on its way up the call stack would
// otherwise go undetected.
func isPermissionError(err error) bool {
	return errors.Is(err, os.ErrPermission)
}
4 changes: 2 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5143,12 +5143,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if err != nil {
if os.IsPermission(err){
if isPermissionError(err) {
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err)
mset.srv.Warnf("File system permission denied while writing msg, disabling JetStream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
Expand Down

0 comments on commit 491222c

Please sign in to comment.