diff --git a/chunk.go b/chunk.go index 4f68bf1..410a9b8 100644 --- a/chunk.go +++ b/chunk.go @@ -5,9 +5,20 @@ import ( "encoding/binary" "errors" "fmt" - "io" "os" "sync" + "time" +) + +const ( + chunkVersion = 1 + versionMarker = 255 + deleted = 42 // flag for removed, tribute 2 dbf +) + +var ( + sizeHeaders = map[int]uint32{0: 8, 1: 12} + sizeHead = sizeHeaders[chunkVersion] ) // chunk - local shard @@ -24,35 +35,158 @@ type addrSize struct { size byte } -func packetMarshal(k, v []byte) (sizeb byte, b []byte) { +type Header struct { + sizeb uint8 + status uint8 + keylen uint16 + vallen uint32 + expire uint32 +} + +// https://github.com/thejerf/gomempool/blob/master/pool.go#L519 +// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 +// suitably modified to work on 32-bit +func nextPowerOf2(v uint32) uint32 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + + return v +} + +// NextPowerOf2 return next power of 2 for v and it's value +// return maxuint32 in case of overflow +func NextPowerOf2(v uint32) (power byte, val uint32) { + if v == 0 { + return 0, 0 + } + for power = 0; power < 32; power++ { + val = 1 << power + if val >= v { + break + } + } + if power == 32 { + //overflow + val = 4294967295 + } + return +} + +func detectChunkVersion(file *os.File) (version int, err error) { + b := make([]byte, 2) + n, errRead := file.Read(b) + if errRead != nil { + return -1, errRead + } + if n != 2 { + return -1, errors.New("File too short") + } + + // 255 version marker + if b[0] == versionMarker { + if b[1] == 0 || b[1] == deleted { + // first version + return 0, nil + } + return int(b[1]), nil + } + if b[1] == 0 || b[1] == deleted { + // first version + return 0, nil + } + return -1, nil +} + +func makeHeader(k, v []byte, expire uint32) (header *Header) { + header = &Header{} + header.status = 0 + header.keylen = uint16(len(k)) + header.vallen = uint32(len(v)) + header.expire = expire + sizeb, _ := NextPowerOf2(uint32(header.keylen) + header.vallen + sizeHead) + header.sizeb = sizeb + return +} + +func parseHeaderV0(b []byte) (header *Header) { + header = &Header{} + header.sizeb = b[0] + header.status = b[1] + header.keylen = binary.BigEndian.Uint16(b[2:4]) + header.vallen = binary.BigEndian.Uint32(b[4:8]) + return +} + +func parseHeader(b []byte) (header *Header) { + header = &Header{} + header.sizeb = b[0] + header.status = b[1] + header.keylen = binary.BigEndian.Uint16(b[2:4]) + header.vallen = binary.BigEndian.Uint32(b[4:8]) + header.expire = binary.BigEndian.Uint32(b[8:12]) + return +} + +func readHeader(file *os.File, version int) (header *Header, err error) { + b := make([]byte, sizeHeaders[version]) + n, errRead := file.Read(b) + if errRead != nil || n != int(sizeHeaders[version]) { + if errRead != nil && errRead.Error() != "EOF" { + err = errRead + } + return + } + switch version { + case 0: + header = parseHeaderV0(b) + case 1: + header = parseHeader(b) + default: + err = fmt.Errorf("Unknov header version %d", version) + } + return +} + +func writeHeader(b []byte, header *Header) { + b[0] = header.sizeb + b[1] = header.status + binary.BigEndian.PutUint16(b[2:4], header.keylen) + binary.BigEndian.PutUint32(b[4:8], header.vallen) + binary.BigEndian.PutUint32(b[8:12], header.expire) + return +} + +// pack addr & size to addrSize +func addrSizeMarshal(addr uint32, size byte) addrSize { + return addrSize{addr, size} +} + +// unpack addr & size +func addrSizeUnmarshal(as addrSize) (addr, size uint32) { + return as.addr, 1 << as.size +} + +func packetMarshal(k, v []byte, expire uint32) (header *Header, b []byte) { // write head - sizeb, size := NextPowerOf2(uint32(len(v) + len(k) + sizeHead)) + header = makeHeader(k, v, expire) + size := 1 << header.sizeb b = make([]byte, size) - b[0] = sizeb - b[1] = byte(0) - //len key in bigendian format - lenk := uint16(len(k)) - b[2] = byte(lenk >> 8) - b[3] = byte(lenk) - //len val in bigendian format - lenv := uint32(len(v)) - b[4] = byte(lenv >> 24) - b[5] = byte(lenv >> 16) - b[6] = byte(lenv >> 8) - b[7] = byte(lenv) + writeHeader(b, header) // write body: val and key copy(b[sizeHead:], v) - copy(b[sizeHead+lenv:], k) + copy(b[sizeHead+header.vallen:], k) return } -func packetUnmarshal(packet []byte) (k, v []byte, sizeb byte) { - _ = packet[7] - sizeb = packet[0] - lenk := binary.BigEndian.Uint16(packet[2:4]) - lenv := binary.BigEndian.Uint32(packet[4:8]) - k = packet[sizeHead+lenv : sizeHead+lenv+uint32(lenk)] - v = packet[sizeHead : sizeHead+lenv] +func packetUnmarshal(packet []byte) (header *Header, k, v []byte) { + header = parseHeader(packet) + k = packet[sizeHead+header.vallen : sizeHead+header.vallen+uint32(header.keylen)] + v = packet[sizeHead : sizeHead+header.vallen] return } @@ -73,49 +207,141 @@ func (c *chunk) init(name string) (err error) { c.h = make(map[uint32]byte) //read if f not empty if fi, e := c.f.Stat(); e == nil { + // new file if fi.Size() == 0 { + // write chunk version info + c.f.Write([]byte{versionMarker, chunkVersion}) return } + //read file var seek int - for { - b := make([]byte, 8) - n, errRead := c.f.Read(b) - if errRead != nil || n != 8 { - if errRead != nil && errRead.Error() != "EOF" { - err = errRead + // detect chunk version + version, errDetect := detectChunkVersion(c.f) + if errDetect != nil { + err = errDetect + return + } + + if version < 0 || version > 1 { + err = errors.New("Unknown chunk version in file " + name) + return + } + + if version == 0 { + // rewind to begin + c.f.Seek(0, 0) + } else { + // real chunk begin + seek = 2 + } + + // if load chunk with old version create file in new format + if version != chunkVersion { + var newfile *os.File + fmt.Printf("Load from old version chunk %s, do inplace upgrade v%d -> v%d\n", name, version, chunkVersion) + newname := name + ".new" + newfile, err = os.OpenFile(newname, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(fileMode)) + if err != nil { + return + } + // write chunk version info + newfile.Write([]byte{versionMarker, chunkVersion}) + seek := 2 + oldsizehead := sizeHeaders[version] + sizediff := sizeHead - oldsizehead + for { + var header *Header + var errRead error + header, errRead = readHeader(c.f, version) + if errRead != nil { + newfile.Close() + return errRead + } + if header == nil { + break } + oldsizedata := (1 << header.sizeb) - oldsizehead + sizeb, size := NextPowerOf2(uint32(sizeHead) + uint32(header.keylen) + header.vallen) + header.sizeb = sizeb + b := make([]byte, size+sizediff) + writeHeader(b, header) + n, errRead := c.f.Read(b[sizeHead : sizeHead+oldsizedata]) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + if n != int(oldsizedata) { + return fmt.Errorf("n != record length: %w", ErrFormat) + } + + // skip deleted or expired entry + if header.status == deleted || (header.expire != 0 && int64(header.expire) < time.Now().Unix()) { + continue + } + keyidx := int(sizeHead) + int(header.vallen) + h := hash(b[keyidx : keyidx+int(header.keylen)]) + c.m[h] = addrSizeMarshal(uint32(seek), header.sizeb) + n, errRead = newfile.Write(b[0:size]) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + seek += n + } + // close old chunk file + errRead := c.f.Close() + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + // set new file for chunk + c.f = newfile + // remove old chunk file from disk + errRead = os.Remove(name) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + // rename new file to old file + errRead = os.Rename(newname, name) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + return + } + + var n int + for { + header, errRead := readHeader(c.f, version) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + if header == nil { break } - //readed header - lenk := binary.BigEndian.Uint16(b[2:4]) - lenv := binary.BigEndian.Uint32(b[4:8]) // skip val - _, seekerr := c.f.Seek(int64(lenv), 1) + _, seekerr := c.f.Seek(int64(header.vallen), 1) if seekerr != nil { return fmt.Errorf("%s: %w", seekerr.Error(), ErrFormat) } // read key - key := make([]byte, lenk) + key := make([]byte, header.keylen) n, errRead = c.f.Read(key) - if errRead != nil || n != int(lenk) { - if errRead != nil { - return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) - } - return fmt.Errorf("n != int(lenk): %w", ErrFormat) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + if n != int(header.keylen) { + return fmt.Errorf("n != key length: %w", ErrFormat) } - shiftv := 1 << byte(b[0]) //2^pow - ret, seekerr := c.f.Seek(int64(shiftv-int(lenk)-int(lenv)-sizeHead), 1) // skip val && key + shiftv := 1 << header.sizeb //2^pow + ret, seekerr := c.f.Seek(int64(shiftv-int(header.keylen)-int(header.vallen)-int(sizeHead)), 1) // skip empty tail if seekerr != nil { return ErrFormat } // map store - if b[1] != deleted { + if header.status != deleted && (header.expire == 0 || int64(header.expire) >= time.Now().Unix()) { h := hash(key) - c.m[h] = addrSizeMarshal(uint32(seek), b[0]) + c.m[h] = addrSizeMarshal(uint32(seek), header.sizeb) } else { //deleted blocks store - c.h[uint32(seek)] = b[0] // seek / size + c.h[uint32(seek)] = header.sizeb // seek / size } seek = int(ret) } @@ -135,12 +361,38 @@ func (c *chunk) fsync() error { return nil } +//expirekeys walk all keys and delete expired +func (c *chunk) expirekeys() error { + c.Lock() + defer c.Unlock() + + for h, addrsize := range c.m { + addr, _ := addrSizeUnmarshal(addrsize) + headerbuf := make([]byte, sizeHead) + _, err := c.f.ReadAt(headerbuf, int64(addr)) + if err != nil { + return err + } + header := parseHeader(headerbuf) + if header.expire != 0 && int64(header.expire) < time.Now().Unix() { + delb := []byte{deleted} + _, err = c.f.WriteAt(delb, int64(addr+1)) + if err != nil { + return err + } + delete(c.m, h) + c.h[addr] = header.sizeb + } + } + return nil +} + // set - write data to file & in map -func (c *chunk) set(k, v []byte, h uint32) (err error) { +func (c *chunk) set(k, v []byte, h uint32, expire uint32) (err error) { c.Lock() defer c.Unlock() c.needFsync = true - sizeb, b := packetMarshal(k, v) + header, b := packetMarshal(k, v, expire) // write at file pos := int64(-1) @@ -151,28 +403,27 @@ func (c *chunk) set(k, v []byte, h uint32) (err error) { if err != nil { return err } - key, _, sizeold := packetUnmarshal(packet) + headerold, key, _ := packetUnmarshal(packet) if !bytes.Equal(key, k) { //println(string(key), string(k)) return ErrCollision } - if err == nil && sizeold == sizeb { + if headerold.sizeb == header.sizeb { //overwrite pos = int64(addr) } else { // mark old k/v as deleted - delb := make([]byte, 1) - delb[0] = deleted + delb := []byte{deleted} _, err = c.f.WriteAt(delb, int64(addr+1)) if err != nil { return err } - c.h[addr] = sizeold + c.h[addr] = headerold.sizeb // try to find optimal empty hole for addrh, sizeh := range c.h { - if sizeh == sizeb { + if sizeh == header.sizeb { pos = int64(addrh) delete(c.h, addrh) break @@ -188,12 +439,47 @@ func (c *chunk) set(k, v []byte, h uint32) (err error) { if err != nil { return err } - c.m[h] = addrSizeMarshal(uint32(pos), sizeb) + c.m[h] = addrSizeMarshal(uint32(pos), header.sizeb) + return +} + +// touch - write data to file & in map +func (c *chunk) touch(k []byte, h uint32, expire uint32) (err error) { + c.Lock() + defer c.Unlock() + + if addrsize, ok := c.m[h]; ok { + addr, size := addrSizeUnmarshal(addrsize) + packet := make([]byte, size) + _, err = c.f.ReadAt(packet, int64(addr)) + if err != nil { + return err + } + header, key, _ := packetUnmarshal(packet) + if !bytes.Equal(key, k) { + return ErrCollision + } + if header.expire != 0 && int64(header.expire) < time.Now().Unix() { + return ErrNotFound + } + + header.expire = expire + b := make([]byte, sizeHead) + writeHeader(b, header) + _, err = c.f.WriteAt(b, int64(addr)) + if err != nil { + return err + } + c.needFsync = true + + } else { + return ErrNotFound + } return } // get return val by key -func (c *chunk) get(k []byte, h uint32) (v []byte, err error) { +func (c *chunk) get(k []byte, h uint32) (v []byte, header *Header, err error) { c.RLock() defer c.RUnlock() if addrsize, ok := c.m[h]; ok { @@ -203,13 +489,22 @@ func (c *chunk) get(k []byte, h uint32) (v []byte, err error) { if err != nil { return } - key, val, _ := packetUnmarshal(packet) + header, key, val := packetUnmarshal(packet) if !bytes.Equal(key, k) { - return nil, ErrCollision + return nil, nil, ErrCollision + } + if header.expire != 0 && int64(header.expire) < time.Now().Unix() { + c.RUnlock() + _, err := c.delete(k, h) + c.RLock() + if err != nil { + return nil, nil, err + } + return nil, nil, ErrNotFound } v = val } else { - return nil, ErrNotFound + return nil, nil, ErrNotFound } return } @@ -250,19 +545,18 @@ func (c *chunk) delete(k []byte, h uint32) (isDeleted bool, err error) { if err != nil { return } - key, _, sizeb := packetUnmarshal(packet) + header, key, _ := packetUnmarshal(packet) if !bytes.Equal(key, k) { return false, ErrCollision } - delb := make([]byte, 1) - delb[0] = deleted + delb := []byte{deleted} _, err = c.f.WriteAt(delb, int64(addr+1)) if err != nil { return } delete(c.m, h) - c.h[addr] = sizeb + c.h[addr] = header.sizeb isDeleted = true } return @@ -272,7 +566,12 @@ func (c *chunk) delete(k []byte, h uint32) (isDeleted bool, err error) { func (c *chunk) incrdecr(k []byte, h uint32, v uint64, isIncr bool) (counter uint64, err error) { mutex.Lock() defer mutex.Unlock() - old, err := c.get(k, h) + old, header, err := c.get(k, h) + expire := uint32(0) + if header != nil { + expire = header.expire + } + if err == ErrNotFound { //create empty counter old = make([]byte, 8) @@ -294,35 +593,54 @@ func (c *chunk) incrdecr(k []byte, h uint32, v uint64, isIncr bool) (counter uin } new := make([]byte, 8) binary.BigEndian.PutUint64(new, counter) - err = c.set(k, new, h) + err = c.set(k, new, h, expire) return } -func (c *chunk) backup() (err error) { +func (c *chunk) backup(file *os.File) (err error) { c.Lock() defer c.Unlock() - name := c.f.Name() + ".bak" - os.Remove(name) - - dest, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, os.FileMode(fileMode)) - if err != nil { - return - } - err = c.f.Sync() - if err != nil { - return - } - _, err = c.f.Seek(0, 0) - if err != nil { - return - } - if _, err = io.Copy(dest, c.f); err != nil { - return + _, seekerr := c.f.Seek(2, 0) + if seekerr != nil { + return fmt.Errorf("%s: %w", seekerr.Error(), ErrFormat) } - err = dest.Sync() - if err != nil { - return + + for { + var header *Header + var errRead error + header, errRead = readHeader(c.f, chunkVersion) + if errRead != nil { + return errRead + } + if header == nil { + break + } + size := int(sizeHead) + int(header.vallen) + int(header.keylen) // record size + b := make([]byte, size) + writeHeader(b, header) + n, errRead := c.f.Read(b[sizeHead:]) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + if n != size-int(sizeHead) { + return fmt.Errorf("n != record length: %d != %d %w", n, size-int(sizeHead), ErrFormat) + } + + shiftv := 1 << header.sizeb //2^pow + _, seekerr := c.f.Seek(int64(shiftv-int(header.keylen)-int(header.vallen)-int(sizeHead)), 1) // skip empty tail + if seekerr != nil { + return ErrFormat + } + + // skip deleted or expired entry + if header.status == deleted || (header.expire != 0 && int64(header.expire) < time.Now().Unix()) { + continue + } + n, errRead = file.Write(b) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } } - return dest.Close() + return nil } diff --git a/sniper.go b/sniper.go index c7f2392..67f61ea 100644 --- a/sniper.go +++ b/sniper.go @@ -13,10 +13,8 @@ import ( "github.com/tidwall/interval" ) -const dirMode = 0777 -const fileMode = 0666 -const sizeHead = 8 -const deleted = 42 // flag for removed, tribute 2 dbf +const dirMode = 0755 +const fileMode = 0644 //ErrCollision - must not happen var ErrCollision = errors.New("Error, hash collision") @@ -31,17 +29,21 @@ var counters sync.Map var mutex = &sync.RWMutex{} //global mutex for counters and so on //var chunkColCnt uint32 //chunks for collisions resolving +var expirechunk = 0 // numbeg chunk for next expiration + // Store struct // data in store sharded by chunks type Store struct { sync.RWMutex - chunksCnt int - chunks []chunk - chunkColCnt int - dir string - syncInterval time.Duration - iv interval.Interval - ss *sortedset.SortedSet + chunksCnt int + chunks []chunk + chunkColCnt int + dir string + syncInterval time.Duration + iv interval.Interval + expireInterval time.Duration + expiv interval.Interval + ss *sortedset.SortedSet //tree *btreeset.BTreeSet } @@ -116,6 +118,27 @@ func SyncInterval(interv time.Duration) OptStore { } } +// ExpireInterval - how often run key expiration process +// expire only one chunk +func ExpireInterval(interv time.Duration) OptStore { + return func(s *Store) error { + s.expireInterval = interv + if interv > 0 { + s.expiv = interval.Set(func(t time.Time) { + err := s.chunks[expirechunk].expirekeys() + if err != nil { + fmt.Printf("Error expire:%s\n", err) + } + expirechunk++ + if expirechunk >= s.chunksCnt { + expirechunk = 0 + } + }, interv) + } + return nil + } +} + func hash(b []byte) uint32 { // TODO race, test and replace with https://github.com/spaolacci/murmur3/pull/28 return murmur3.Sum32WithSeed(b, 0) @@ -137,6 +160,7 @@ func Open(opts ...OptStore) (s *Store, err error) { s = &Store{} //default s.syncInterval = 0 + s.expireInterval = 0 s.chunkColCnt = 4 s.chunksCnt = 256 // call option functions on instance to set options on it @@ -152,28 +176,46 @@ func Open(opts ...OptStore) (s *Store, err error) { } s.chunks = make([]chunk, s.chunksCnt) + chchan := make(chan int, s.chunksCnt) + errchan := make(chan error, 4) + var wg sync.WaitGroup + + exitworkers := false + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + for i := range chchan { + if exitworkers { + break + } + + err := s.chunks[i].init(fmt.Sprintf("%s/%d", s.dir, i)) + if err != nil { + errchan <- err + exitworkers = true + break + } + } + wg.Done() + }() + } + // create chuncks - for i := range s.chunks[:] { + for i := range s.chunks { + chchan <- i + } + close(chchan) - err = s.chunks[i].init(fmt.Sprintf("%s/%d", s.dir, i)) - if err != nil { - return nil, err - } + wg.Wait() + + if len(errchan) > 0 { + err = <-errchan + return } s.ss = sortedset.New() return } -// pack addr & size to addrSize -func addrSizeMarshal(addr uint32, size byte) addrSize { - return addrSize{addr, size} -} - -// unpack addr & size -func addrSizeUnmarshal(as addrSize) (addr, size uint32) { - return as.addr, 1 << as.size -} - func (s *Store) idx(h uint32) uint32 { return uint32((int(h) % (s.chunksCnt - s.chunkColCnt)) + s.chunkColCnt) } @@ -181,13 +223,30 @@ func (s *Store) idx(h uint32) uint32 { // Set - store key and val in shard // max packet size is 2^19, 512kb (524288) // packet size = len(key) + len(val) + 8 -func (s *Store) Set(k, v []byte) (err error) { +func (s *Store) Set(k, v []byte, expire uint32) (err error) { + h := hash(k) + idx := s.idx(h) + err = s.chunks[idx].set(k, v, h, expire) + if err == ErrCollision { + for i := 0; i < int(s.chunkColCnt); i++ { + err = s.chunks[i].set(k, v, h, expire) + if err == ErrCollision { + continue + } + break + } + } + return +} + +// Touch - update key expire +func (s *Store) Touch(k []byte, expire uint32) (err error) { h := hash(k) idx := s.idx(h) - err = s.chunks[idx].set(k, v, h) + err = s.chunks[idx].touch(k, h, expire) if err == ErrCollision { for i := 0; i < int(s.chunkColCnt); i++ { - err = s.chunks[i].set(k, v, h) + err = s.chunks[i].touch(k, h, expire) if err == ErrCollision { continue } @@ -201,10 +260,10 @@ func (s *Store) Set(k, v []byte) (err error) { func (s *Store) Get(k []byte) (v []byte, err error) { h := hash(k) idx := s.idx(h) - v, err = s.chunks[idx].get(k, h) + v, _, err = s.chunks[idx].get(k, h) if err == ErrCollision { for i := 0; i < int(s.chunkColCnt); i++ { - v, err = s.chunks[i].get(k, h) + v, _, err = s.chunks[i].get(k, h) if err == ErrCollision || err == ErrNotFound { continue } @@ -228,6 +287,9 @@ func (s *Store) Close() (err error) { if s.syncInterval > 0 { s.iv.Clear() } + if s.expireInterval > 0 { + s.expiv.Clear() + } for i := range s.chunks[:] { err = s.chunks[i].close() if err != nil { @@ -258,41 +320,6 @@ func (s *Store) FileSize() (fs int64, err error) { return } -// https://github.com/thejerf/gomempool/blob/master/pool.go#L519 -// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 -// suitably modified to work on 32-bit -func nextPowerOf2(v uint32) uint32 { - v-- - v |= v >> 1 - v |= v >> 2 - v |= v >> 4 - v |= v >> 8 - v |= v >> 16 - v++ - - return v -} - -// NextPowerOf2 return next power of 2 for v and it's value -// return maxuint32 in case of overflow -func NextPowerOf2(v uint32) (power byte, val uint32) { - if v > 0 { - val = 1 - } - for power = 0; power < 32; power++ { - val = nextPowerOf2(val) - if val >= v { - break - } - val++ - } - if power == 32 { - //overflow - val = 4294967295 - } - return -} - // Delete - delete item by key func (s *Store) Delete(k []byte) (isDeleted bool, err error) { h := hash(k) @@ -326,11 +353,71 @@ func (s *Store) Decr(k []byte, v uint64) (uint64, error) { return s.chunks[idx].incrdecr(k, h, v, false) } -// Backup is very stupid now. It remove files with same name with bak extension -// and create new backup files -func (s *Store) Backup() (err error) { +// Backup all data into one file +func (s *Store) Backup(name string) (err error) { + file, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_EXCL, os.FileMode(fileMode)) + if err != nil { + return err + } + defer file.Close() + _, err = file.Write([]byte{chunkVersion}) + for i := range s.chunks[:] { + err = s.chunks[i].backup(file) + if err != nil { + return + } + } + return +} + +// Restore from backup file +func (s *Store) Restore(name string) (err error) { + file, err := os.OpenFile(name, os.O_RDONLY, os.FileMode(fileMode)) + if err != nil { + return err + } + defer file.Close() + b := make([]byte, 1) + _, err = file.Read(b) + if int(b[0]) != chunkVersion { + return fmt.Errorf("Bad backup version %d", b[0]) + } + + for { + var header *Header + var errRead error + header, errRead = readHeader(file, chunkVersion) + if errRead != nil { + return errRead + } + if header == nil { + break + } + size := int(sizeHead) + int(header.vallen) + int(header.keylen) // record size + b := make([]byte, size) + writeHeader(b, header) + n, errRead := file.Read(b[sizeHead:]) + if errRead != nil { + return fmt.Errorf("%s: %w", errRead.Error(), ErrFormat) + } + if n != size-int(sizeHead) { + return fmt.Errorf("n != record length: %w", ErrFormat) + } + + // skip deleted or expired entry + if header.status == deleted || (header.expire != 0 && int64(header.expire) < time.Now().Unix()) { + continue + } + _, key, val := packetUnmarshal(b) + s.Set(key, val, header.expire) + } + return +} + +// Expire - remove expired keys from all chunks +func (s *Store) Expire() (err error) { for i := range s.chunks[:] { - err = s.chunks[i].backup() + err = s.chunks[i].expirekeys() if err != nil { return } @@ -377,7 +464,7 @@ func (s *Store) Bucket(name string) (*sortedset.BucketStore, error) { buckets += "," } buckets += name - err = s.Set(bKey, []byte(buckets)) + err = s.Set(bKey, []byte(buckets), 0) if err != nil { return nil, err } @@ -390,7 +477,7 @@ func (s *Store) Bucket(name string) (*sortedset.BucketStore, error) { func (s *Store) Put(bucket *sortedset.BucketStore, k, v []byte) (err error) { key := []byte(bucket.Name) key = append(key, k...) - err = s.Set(key, v) + err = s.Set(key, v, 0) if err == nil { bucket.Put(string(k)) } diff --git a/sniper_test.go b/sniper_test.go index 6f4e102..13ab6b2 100644 --- a/sniper_test.go +++ b/sniper_test.go @@ -8,6 +8,7 @@ import ( "os" "runtime" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/tidwall/lotsa" @@ -88,10 +89,10 @@ func TestCmd(t *testing.T) { s, err := Open(Dir("1")) assert.NoError(t, err) - err = s.Set([]byte("hello"), []byte("go")) + err = s.Set([]byte("hello"), []byte("go"), 0) assert.NoError(t, err) - err = s.Set([]byte("hello"), []byte("world")) + err = s.Set([]byte("hello"), []byte("world"), 0) assert.NoError(t, err) res, err := s.Get([]byte("hello")) @@ -136,14 +137,12 @@ func TestCmd(t *testing.T) { assert.NoError(t, err) assert.Equal(t, uint64(18446744073709551615), uint64(cnt)) - err = s.Backup() - assert.NoError(t, err) - err = s.Close() assert.NoError(t, err) err = DeleteStore("1") assert.NoError(t, err) + sniperBench(seed(100_000)) } @@ -205,7 +204,7 @@ func sniperBench(keys [][]byte, N int) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(i)) //println("set", i, keys[i], b) - err := s.Set(keys[i], b) + err := s.Set(keys[i], b, 0) if err == ErrCollision { coll++ err = nil @@ -255,9 +254,9 @@ func TestSingleFile(t *testing.T) { DeleteStore("2") s, err := Open(Dir("2"), ChunksCollision(0), ChunksTotal(1)) assert.NoError(t, err) - err = s.Set([]byte("mgdbywinfo"), []byte("1")) + err = s.Set([]byte("mgdbywinfo"), []byte("1"), 0) assert.NoError(t, err) - err = s.Set([]byte("uzmqkfjche"), []byte("2")) + err = s.Set([]byte("uzmqkfjche"), []byte("2"), 0) assert.NoError(t, err) v, err := s.Get([]byte("uzmqkfjche")) @@ -307,10 +306,10 @@ func TestEmptyKey(t *testing.T) { s, err := Open(Dir("1")) assert.NoError(t, err) - err = s.Set([]byte(""), []byte("go")) + err = s.Set([]byte(""), []byte("go"), 0) assert.NoError(t, err) - err = s.Set([]byte(""), []byte("world")) + err = s.Set([]byte(""), []byte("world"), 0) assert.NoError(t, err) res, err := s.Get([]byte("")) @@ -326,3 +325,165 @@ func TestEmptyKey(t *testing.T) { err = DeleteStore("1") assert.NoError(t, err) } + +func TestExpireKey(t *testing.T) { + err := DeleteStore("1") + assert.NoError(t, err) + + s, err := Open(Dir("1")) + assert.NoError(t, err) + + unixtime := uint32(time.Now().Unix()) + + // set key with expire 1 sec + err = s.Set([]byte("key1"), []byte("go"), unixtime+1) + assert.NoError(t, err) + + // set key with expire 3 sec + err = s.Set([]byte("key2"), []byte("world"), unixtime+3) + assert.NoError(t, err) + + // try get key1 + res, err := s.Get([]byte("key1")) + assert.NoError(t, err) + + assert.Equal(t, true, bytes.Equal(res, []byte("go"))) + + assert.Equal(t, 2, s.Count()) + + // sleep 2 sec, key1 should expired + time.Sleep(time.Second * 2) + + res, err = s.Get([]byte("key1")) + assert.Equal(t, ErrNotFound, err) + + assert.Equal(t, 1, s.Count()) + + // key2 must exist + res, err = s.Get([]byte("key2")) + assert.NoError(t, err) + + assert.Equal(t, true, bytes.Equal(res, []byte("world"))) + + // sleep 2 sec, key1 should expired + time.Sleep(time.Second * 2) + + res, err = s.Get([]byte("key2")) + assert.Equal(t, ErrNotFound, err) + + // all keys expired + assert.Equal(t, 0, s.Count()) + + /* test Expire method */ + unixtime = uint32(time.Now().Unix()) + + err = s.Set([]byte("key1"), []byte("go"), unixtime+1) + assert.NoError(t, err) + + // sleep 2 sec, key1 should expired + time.Sleep(time.Second * 2) + err = s.Expire() + assert.NoError(t, err) + + // all keys expired + assert.Equal(t, 0, s.Count()) + + /* test touch */ + unixtime = uint32(time.Now().Unix()) + + err = s.Set([]byte("key"), []byte("go"), unixtime+4) + assert.NoError(t, err) + + // sleep 2 sec, key1 should stay + time.Sleep(time.Second * 2) + res, err = s.Get([]byte("key")) + assert.NoError(t, err) + + unixtime = uint32(time.Now().Unix()) + err = s.Touch([]byte("key"), unixtime+3) + assert.NoError(t, err) + + // sleep 3 sec, key1 should stay + time.Sleep(time.Second * 3) + res, err = s.Get([]byte("key")) + assert.NoError(t, err) + + // sleep 3 sec, key should expired + time.Sleep(time.Second * 3) + err = s.Expire() + assert.NoError(t, err) + + // all keys expired + assert.Equal(t, 0, s.Count()) + + err = s.Close() + assert.NoError(t, err) + + err = DeleteStore("1") + assert.NoError(t, err) +} + +func getRandKey(rnd *rand.Rand, n int) []byte { + s := make([]byte, n) + rnd.Read(s) + for i := 0; i < n; i++ { + s[i] = 'a' + (s[i] % 26) + } + return s +} + +func TestBackup(t *testing.T) { + var backup = "data1.backup" + + os.Remove(backup) + err := DeleteStore("1") + assert.NoError(t, err) + + s, err := Open(Dir("1")) + assert.NoError(t, err) + + seed := time.Now().UnixNano() + rng := rand.New(rand.NewSource(seed)) + coll := 0 + + for i := 0; i < 1000000; i++ { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + err := s.Set(getRandKey(rng, 10), b, 0) + if err == ErrCollision { + coll++ + err = nil + } + if err != nil { + panic(err) + } + } + // count keys + keys1 := s.Count() + // create backup + err = s.Backup(backup) + if err != nil { + panic(err) + } + err = s.Close() + assert.NoError(t, err) + + err = DeleteStore("1") + assert.NoError(t, err) + + s, err = Open(Dir("1")) + assert.NoError(t, err) + + err = s.Restore(backup) + keys2 := s.Count() + assert.Equal(t, keys1, keys2) + + err = s.Close() + assert.NoError(t, err) + + err = DeleteStore("1") + assert.NoError(t, err) + + err = os.Remove(backup) + assert.NoError(t, err) +}