Skip to content

Commit

Permalink
flushing correction on clearing memtable and minor commenting a varie…
Browse files Browse the repository at this point in the history
…ty of functions
  • Loading branch information
Alex Gaetano Padula committed Oct 30, 2024
1 parent af5809c commit 869076f
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions k4.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,32 +283,39 @@ func (k4 *K4) Close() error {
}

// printLog prints a log message to the log file or stdout
// takes a string message
func (k4 *K4) printLog(msg string) {
if k4.logging {
log.Println(msg)
}
}

// backgroundWalWriter writes operations to the write ahead log
// This function runs in the background and pops operations from the wal queue and writes
// to write ahead log. The reason we do this is to optimize write speed
func (k4 *K4) backgroundWalWriter() {
defer k4.wg.Done()
defer k4.wg.Done() // Defer completion of routine

for {
k4.walQueueLock.Lock()
if len(k4.walQueue) > 0 {
op := k4.walQueue[0]
k4.walQueue = k4.walQueue[1:]
k4.walQueueLock.Unlock()
k4.walQueueLock.Lock() // lock up the wal queue

if len(k4.walQueue) > 0 { // Check if there are operations in the wal queue
op := k4.walQueue[0] // Get the first operation
k4.walQueue = k4.walQueue[1:] // Remove the first operation
k4.walQueueLock.Unlock() // Unlock the wal queue

// Serialize operation
data := serializeOp(op.Op, op.Key, op.Value)

// Write to WAL
_, err := k4.wal.Write(data)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err))
k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err)) // Log error
}
} else {
k4.walQueueLock.Unlock()
k4.walQueueLock.Unlock() // Unlock the wal queue
// We will sleep for a bit to avoid CPU bursts
time.Sleep(28 * time.Millisecond)
}

select {
Expand Down Expand Up @@ -339,7 +346,7 @@ func serializeOp(op OPR_CODE, key, value []byte) []byte {
return nil
}

return buf.Bytes()
return buf.Bytes() // return the encoded bytes

}

Expand All @@ -351,7 +358,6 @@ func deserializeOp(data []byte) (OPR_CODE, []byte, []byte, error) {
dec := gob.NewDecoder(bytes.NewReader(data)) // Create a new decoder

err := dec.Decode(&operation) // Decode the operation

if err != nil {
return 0, nil, nil, err
}
Expand Down Expand Up @@ -397,9 +403,11 @@ func deserializeKv(data []byte) (key, value []byte, err error) {

// loadSSTables loads SSTables from the directory
func (k4 *K4) loadSSTables() {
// Open configured directory
// Open configured K4 directory

dir, err := os.Open(k4.directory)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to open directory: %v", err))
return
}

Expand Down Expand Up @@ -505,7 +513,7 @@ func (k4 *K4) flushMemtable() error {
k4.sstables = append(k4.sstables, sstable)

// Clear memtable
skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP)
k4.memtable = skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP)

k4.printLog("Flushed memtable")

Expand Down

0 comments on commit 869076f

Please sign in to comment.