From 869076ff603b374d700465340b19ebd87ca601b6 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Wed, 30 Oct 2024 02:21:50 -0400 Subject: [PATCH] flushing correction on clearing memtable and minor commenting a variety of functions --- k4.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/k4.go b/k4.go index f03deb0..99becb9 100644 --- a/k4.go +++ b/k4.go @@ -283,6 +283,7 @@ func (k4 *K4) Close() error { } // printLog prints a log message to the log file or stdout +// takes a string message func (k4 *K4) printLog(msg string) { if k4.logging { log.Println(msg) @@ -290,14 +291,18 @@ func (k4 *K4) printLog(msg string) { } // backgroundWalWriter writes operations to the write ahead log +// This function runs in the background and pops operations from the wal queue and writes +// to write ahead log. The reason we do this is to optimize write speed func (k4 *K4) backgroundWalWriter() { - defer k4.wg.Done() + defer k4.wg.Done() // Defer completion of routine + for { - k4.walQueueLock.Lock() - if len(k4.walQueue) > 0 { - op := k4.walQueue[0] - k4.walQueue = k4.walQueue[1:] - k4.walQueueLock.Unlock() + k4.walQueueLock.Lock() // lock up the wal queue + + if len(k4.walQueue) > 0 { // Check if there are operations in the wal queue + op := k4.walQueue[0] // Get the first operation + k4.walQueue = k4.walQueue[1:] // Remove the first operation + k4.walQueueLock.Unlock() // Unlock the wal queue // Serialize operation data := serializeOp(op.Op, op.Key, op.Value) @@ -305,10 +310,12 @@ func (k4 *K4) backgroundWalWriter() { // Write to WAL _, err := k4.wal.Write(data) if err != nil { - k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err)) + k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err)) // Log error } } else { - k4.walQueueLock.Unlock() + k4.walQueueLock.Unlock() // Unlock the wal queue + // We will sleep for a bit to avoid CPU bursts + time.Sleep(28 * time.Millisecond) } select { @@ -339,7 +346,7 @@ func serializeOp(op OPR_CODE, key, value []byte) []byte { return nil } - return buf.Bytes() + return buf.Bytes() // return the encoded bytes } @@ -351,7 +358,6 @@ func deserializeOp(data []byte) (OPR_CODE, []byte, []byte, error) { dec := gob.NewDecoder(bytes.NewReader(data)) // Create a new decoder err := dec.Decode(&operation) // Decode the operation - if err != nil { return 0, nil, nil, err } @@ -397,9 +403,11 @@ func deserializeKv(data []byte) (key, value []byte, err error) { // loadSSTables loads SSTables from the directory func (k4 *K4) loadSSTables() { - // Open configured directory + // Open configured K4 directory + dir, err := os.Open(k4.directory) if err != nil { + k4.printLog(fmt.Sprintf("Failed to open directory: %v", err)) return } @@ -505,7 +513,7 @@ func (k4 *K4) flushMemtable() error { k4.sstables = append(k4.sstables, sstable) // Clear memtable - skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP) + k4.memtable = skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP) k4.printLog("Flushed memtable")