Skip to content

Commit

Permalink
- put back old code
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Sep 18, 2023
1 parent 2bacb0e commit 5fca300
Showing 1 changed file with 14 additions and 53 deletions.
67 changes: 14 additions & 53 deletions leveldb/leveldbSerial.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/core/closing"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
Expand All @@ -26,14 +25,11 @@ type SerialDB struct {
maxBatchSize int
batchDelaySeconds int
sizeBatch int

accessBatch types.Batcher
writingBatch types.Batcher
mutBatch sync.RWMutex

dbAccess chan serialQueryer
cancel context.CancelFunc
closer core.SafeCloser
batch types.Batcher
mutBatch sync.RWMutex
dbAccess chan serialQueryer
cancel context.CancelFunc
closer core.SafeCloser
}

// NewSerialDB is a constructor for the leveldb persister
Expand Down Expand Up @@ -84,7 +80,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi
closer: closing.NewSafeChanCloser(),
}

dbStore.accessBatch = NewBatch()
dbStore.batch = NewBatch()

go dbStore.batchTimeoutHandle(ctx)
go dbStore.processLoop(ctx)
Expand Down Expand Up @@ -146,7 +142,7 @@ func (s *SerialDB) Put(key, val []byte) error {
}

s.mutBatch.RLock()
err := s.accessBatch.Put(key, val)
err := s.batch.Put(key, val)
s.mutBatch.RUnlock()
if err != nil {
return err
Expand All @@ -162,12 +158,12 @@ func (s *SerialDB) Get(key []byte) ([]byte, error) {
}

s.mutBatch.RLock()
if s.isRemoved(key) {
if s.batch.IsRemoved(key) {
s.mutBatch.RUnlock()
return nil, common.ErrKeyNotFound
}

data := s.getFromBatches(key)
data := s.batch.Get(key)
s.mutBatch.RUnlock()

if data != nil {
Expand Down Expand Up @@ -204,12 +200,12 @@ func (s *SerialDB) Has(key []byte) error {
}

s.mutBatch.RLock()
if s.isRemoved(key) {
if s.batch.IsRemoved(key) {
s.mutBatch.RUnlock()
return common.ErrKeyNotFound
}

data := s.getFromBatches(key)
data := s.batch.Get(key)
s.mutBatch.RUnlock()

if data != nil {
Expand All @@ -232,30 +228,6 @@ func (s *SerialDB) Has(key []byte) error {
return result
}

func (s *SerialDB) isRemoved(key []byte) bool {
if s.accessBatch.IsRemoved(key) {
return true
}
if check.IfNil(s.writingBatch) {
return false
}

return s.writingBatch.IsRemoved(key)
}

func (s *SerialDB) getFromBatches(key []byte) []byte {
// start testing the access batch as it will contain the most up-to-date variant
data := s.accessBatch.Get(key)
if data != nil {
return data
}
if check.IfNil(s.writingBatch) {
return nil
}

return s.writingBatch.Get(key)
}

func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error {
select {
case s.dbAccess <- req:
Expand All @@ -268,20 +240,13 @@ func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error {
// putBatch writes the Batch data into the database
func (s *SerialDB) putBatch() error {
s.mutBatch.Lock()
if !check.IfNil(s.writingBatch) {
s.mutBatch.Unlock()
return nil
}

s.writingBatch = s.accessBatch

dbBatch, ok := s.writingBatch.(*batch)
dbBatch, ok := s.batch.(*batch)
if !ok {
s.mutBatch.Unlock()
return common.ErrInvalidBatch
}
s.sizeBatch = 0
s.accessBatch = NewBatch()
s.batch = NewBatch()
s.mutBatch.Unlock()

ch := make(chan error)
Expand All @@ -297,10 +262,6 @@ func (s *SerialDB) putBatch() error {
result := <-ch
close(ch)

s.mutBatch.Lock()
s.writingBatch = nil
s.mutBatch.Unlock()

return result
}

Expand All @@ -326,7 +287,7 @@ func (s *SerialDB) Remove(key []byte) error {
}

s.mutBatch.Lock()
_ = s.accessBatch.Delete(key)
_ = s.batch.Delete(key)
s.mutBatch.Unlock()

return s.updateBatchWithIncrement()
Expand Down

0 comments on commit 5fca300

Please sign in to comment.