Skip to content

Commit

Permalink
Merge pull request #35 from apelisse/temp-dir
Browse files Browse the repository at this point in the history
Create a new "TempDir" option for atomic write
  • Loading branch information
peterbourgon authored Aug 14, 2017
2 parents ed3b67a + 40ecb11 commit 5f041e8
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 12 deletions.
59 changes: 58 additions & 1 deletion basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (BrokenReader) Read(p []byte) (n int, err error) {
return 0, errors.New("failed to read")
}

func TestAtomicWrite(t *testing.T) {
func TestRemovesIncompleteFiles(t *testing.T) {
opts := Options{
BasePath: "test-data",
CacheSizeMax: 1024,
Expand All @@ -277,3 +277,60 @@ func TestAtomicWrite(t *testing.T) {
t.Fatal("Could read the key, but it shouldn't exist")
}
}

func TestTempDir(t *testing.T) {
opts := Options{
BasePath: "test-data",
TempDir: "test-data-temp",
CacheSizeMax: 1024,
}
d := New(opts)
defer d.EraseAll()

k, v := "a", []byte{'b'}
if err := d.Write(k, v); err != nil {
t.Fatalf("write: %s", err)
}
if readVal, err := d.Read(k); err != nil {
t.Fatalf("read: %s", err)
} else if bytes.Compare(v, readVal) != 0 {
t.Fatalf("read: expected %s, got %s", v, readVal)
}
if err := d.Erase(k); err != nil {
t.Fatalf("erase: %s", err)
}
}

type CrashingReader struct{}

func (CrashingReader) Read(p []byte) (n int, err error) {
panic("System has crashed while reading the stream")
}

func TestAtomicWrite(t *testing.T) {
opts := Options{
BasePath: "test-data",
// Test would fail if TempDir is not set here.
TempDir: "test-data-temp",
CacheSizeMax: 1024,
}
d := New(opts)
defer d.EraseAll()

key := "key"
func() {
defer func() {
recover() // Ignore panicking error
}()

stream := CrashingReader{}
d.WriteStream(key, stream, false)
}()

if d.Has(key) {
t.Fatal("Has key, but it shouldn't exist")
}
if _, ok := <-d.Keys(nil); ok {
t.Fatal("Store isn't empty")
}
}
55 changes: 44 additions & 11 deletions diskv.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ type Options struct {
CacheSizeMax uint64 // bytes
PathPerm os.FileMode
FilePerm os.FileMode
// If TempDir is set, it will enable filesystem atomic writes by
// writing temporary files to that location before being moved
// to BasePath.
// Note that TempDir MUST be on the same device/partition as
// BasePath.
TempDir string

Index Index
IndexLess LessFunction
Expand Down Expand Up @@ -115,21 +121,43 @@ func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error {
return d.writeStreamWithLock(key, r, sync)
}

// createKeyFileWithLock either creates the key file directly, or
// creates a temporary file in TempDir if it is set.
func (d *Diskv) createKeyFileWithLock(key string) (*os.File, error) {
if d.TempDir != "" {
if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil {
return nil, fmt.Errorf("temp mkdir: %s", err)
}
f, err := ioutil.TempFile(d.TempDir, "")
if err != nil {
return nil, fmt.Errorf("temp file: %s", err)
}

if err := f.Chmod(d.FilePerm); err != nil {
f.Close() // error deliberately ignored
os.Remove(f.Name()) // error deliberately ignored
return nil, fmt.Errorf("chmod: %s", err)
}
return f, nil
}

mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // overwrite if exists
f, err := os.OpenFile(d.completeFilename(key), mode, d.FilePerm)
if err != nil {
return nil, fmt.Errorf("open file: %s", err)
}
return f, nil
}

// writeStream does no input validation checking.
func (d *Diskv) writeStreamWithLock(key string, r io.Reader, sync bool) error {
if err := d.ensurePathWithLock(key); err != nil {
return fmt.Errorf("ensure path: %s", err)
}

f, err := ioutil.TempFile(d.pathFor(key), fmt.Sprintf("temp-%s-", key))
f, err := d.createKeyFileWithLock(key)
if err != nil {
return fmt.Errorf("temp file: %s", err)
}

if err := f.Chmod(d.FilePerm); err != nil {
f.Close() // error deliberately ignored
os.Remove(f.Name()) // error deliberately ignored
return fmt.Errorf("chmod: %s", err)
return fmt.Errorf("create key file: %s", err)
}

wc := io.WriteCloser(&nopWriteCloser{f})
Expand Down Expand Up @@ -166,9 +194,11 @@ func (d *Diskv) writeStreamWithLock(key string, r io.Reader, sync bool) error {
return fmt.Errorf("file close: %s", err)
}

if err := os.Rename(f.Name(), d.completeFilename(key)); err != nil {
os.Remove(f.Name()) // error deliberately ignored
return fmt.Errorf("rename: %s", err)
if f.Name() != d.completeFilename(key) {
if err := os.Rename(f.Name(), d.completeFilename(key)); err != nil {
os.Remove(f.Name()) // error deliberately ignored
return fmt.Errorf("rename: %s", err)
}
}

if d.Index != nil {
Expand Down Expand Up @@ -403,6 +433,9 @@ func (d *Diskv) EraseAll() error {
defer d.mu.Unlock()
d.cache = make(map[string][]byte)
d.cacheSize = 0
if d.TempDir != "" {
os.RemoveAll(d.TempDir) // errors ignored
}
return os.RemoveAll(d.BasePath)
}

Expand Down

0 comments on commit 5f041e8

Please sign in to comment.