Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Neo Store UT] Add leveldb thread UT and readme. #3427

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions src/Plugins/LevelDBStore/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
Sure, here's the revised README document in a more natural, native English style:
shargon marked this conversation as resolved.
Show resolved Hide resolved

---

# LevelDB and Snapshot Thread Safety Issues

## Overview

LevelDB is a fast key-value storage library developed by Google. It's designed to provide high performance and support for high concurrency applications. While LevelDB excels in single-threaded environments, it has some limitations when it comes to multi-threaded operations.

## What is LevelDB?

LevelDB offers the following key features:

- **Key-Value Storage**: Supports storage of keys and values of any size.
- **Ordered Storage**: Key-value pairs are sorted by the key in lexicographical order.
- **Efficient Read/Write**: Optimized for disk access to improve read/write performance.
- **Snapshots**: Provides a consistent view of the database at a specific point in time.
- **Batch Writes**: Allows multiple write operations to be grouped into a single atomic operation.

## Snapshots

LevelDB snapshots let you capture the state of the database at a specific moment. This means you can read data from a snapshot without worrying about changes occurring during the read. Snapshots are great for read-only operations that need a consistent view of the data.

## Thread Safety Issues

Despite its many strengths, LevelDB has some limitations regarding multi-threaded operations, especially with writes.

### Multi-Threaded Writes

LevelDB is not thread-safe when it comes to concurrent writes. Multiple threads trying to write to the database at the same time can lead to data corruption, crashes, and other undefined behaviors. Thus, writing to the same database from multiple threads without proper synchronization is unsafe.

### Snapshot Thread Safety

Snapshots, while useful for consistent reads, are not designed for concurrent write operations. Here are the key issues:

1. **Concurrent Write Conflicts**: Multiple threads attempting to write to a snapshot can result in data inconsistencies or corruption.
2. **Lack of Thread Safety**: Snapshots are not inherently thread-safe, so concurrent operations on a single snapshot can lead to unpredictable behavior.

```text
A database may only be opened by one process at a time. The leveldb implementation acquires a lock from the operating system to prevent misuse. Within a single process, the same leveldb::DB object may be safely shared by multiple concurrent threads. I.e., different threads may write into or fetch iterators or call Get on the same database without any external synchronization (the leveldb implementation will automatically do the required synchronization). However other objects (like Iterator and WriteBatch) may require external synchronization. If two threads share such an object, they must protect access to it using their own locking protocol. More details are available in the public header files.
```

## Example

Here's an example test class demonstrating the thread safety issues with LevelDB and snapshots:

```csharp
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Neo.IO.Data.LevelDB;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Plugins.Storage.Tests
{
[TestClass]
public partial class StoreTest
{
[TestMethod]
[ExpectedException(typeof(AggregateException))]
public void TestMultiThreadLevelDbSnapshotPut()
{
using var store = levelDbStore.GetStore(path_leveldb);
using var snapshot = store.GetSnapshot();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var threadCount = 1;
while (true)
{
var tasks = new Task[threadCount];
try
{
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
Thread.Sleep(new Random().Next(1, 10));
snapshot.Put(testKey, value);
snapshot.Commit();
});
}

Task.WaitAll(tasks);
threadCount++;
}
catch (AggregateException)
{
Console.WriteLine($"AggregateException caught with {threadCount} threads.");
throw;
}
catch (LevelDBException ex)
{
Console.WriteLine($"LevelDBException caught with {threadCount} threads: {ex.Message}");
break;
}
catch (Exception ex)
{
Assert.Fail("Unexpected exception: " + ex.Message);
}
}
}
}
}
```

In this test, we increment the number of threads and demonstrate the exceptions that can occur when multiple threads write to a LevelDB snapshot.

## References

- [LevelDB Documentation](https://github.com/google/leveldb)
- [LevelDB Snapshots](https://github.com/google/leveldb/blob/main/doc/index.md)
198 changes: 198 additions & 0 deletions tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright (C) 2015-2024 The Neo Project.
//
// StoreSnapshotTest.cs file belongs to the neo project and is free
// software distributed under the MIT software license, see the
// accompanying file LICENSE in the main directory of the
// repository or http://www.opensource.org/licenses/mit-license.php
// for more details.
//
// Redistribution and use in source and binary forms with or without
// modifications are permitted.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using Neo.IO.Data.LevelDB;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Plugins.Storage.Tests;

/*
* LevelDB Thread Safety Explanation:
*
* LevelDB is designed to be a fast key-value storage library. However, it has
* some limitations regarding thread safety. Specifically, LevelDB is not thread-safe
* when multiple threads are attempting to write to the database concurrently. This can
* lead to data corruption, crashes, and other undefined behaviors.
*
* LevelDB provides snapshots and batch writes. Snapshots allow
* a consistent view of the database at a point in time, but they are not designed for
* concurrent write operations. Batch writes can be used to perform atomic updates,
* but they also need to be managed carefully to avoid concurrency issues.
*
* In this test class, we demonstrate these thread safety issues and how to mitigate
* them using different approaches such as locking mechanisms and creating separate
* snapshots for each thread.
*/
partial class StoreTest
{
[TestMethod]
[ExpectedException(typeof(AggregateException))]
public void TestMultiThreadLevelDbSnapshotPut()
{
using var store = levelDbStore.GetStore(path_leveldb);
using var snapshot = store.GetSnapshot();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var threadCount = 1;
while (true)
{
var tasks = new Task[threadCount];
try
{
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
// Introduce delay to increase conflict chance
Thread.Sleep(new Random().Next(1, 10));
Jim8y marked this conversation as resolved.
Show resolved Hide resolved
// Attempt to write to the snapshot and commit
snapshot.Put(testKey, value);
snapshot.Commit();
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
threadCount++;
}
catch (AggregateException)
{
// AggregateException is expected due to concurrent access
Console.WriteLine($"AggregateException caught with {threadCount} threads.");
throw;
}
catch (LevelDBException ex)
{
// LevelDBException is also possible due to LevelDB being thread-unsafe
Console.WriteLine($"LevelDBException caught with {threadCount} threads: {ex.Message}");
break;
}
catch (Exception ex)
{
Assert.Fail("Unexpected exception: " + ex.Message);
}
}
}

[TestMethod]
public void TestMultiThreadLevelDbSnapshotPutUntilException()
{
using var store = levelDbStore.GetStore(path_leveldb);
using var snapshot = store.GetSnapshot();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var threadCount = 1;
while (true)
shargon marked this conversation as resolved.
Show resolved Hide resolved
{
var tasks = new Task[threadCount];
try
{
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
// Introduce delay to increase conflict chance
Thread.Sleep(new Random().Next(1, 100));
Jim8y marked this conversation as resolved.
Show resolved Hide resolved
// Attempt to write to the snapshot without committing
snapshot.Put(testKey, value);
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);

// Attempt to commit the changes
snapshot.Commit();
threadCount++;
}
catch (AggregateException ex)
{
// AggregateException is expected due to concurrent access
Console.WriteLine($"AggregateException caught with {threadCount} threads.");
break;
}
catch (LevelDBException ex)
{
// LevelDBException is also possible due to LevelDB being thread-unsafe
Console.WriteLine($"LevelDBException caught with {threadCount} threads.");
break;
}
catch (Exception ex)
{
Assert.Fail("Unexpected exception: " + ex.Message);
}
}
}

[TestMethod]
public void TestMultiThreadLevelDbSnapshotPutWithLocker()
{
using var store = levelDbStore.GetStore(path_leveldb);

object locker = new();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var tasks = new Task[10];
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
using var snapshot = store.GetSnapshot();
// Use a lock to ensure thread-safe access to the snapshot
lock (locker)
{
snapshot.Put(testKey, value);
snapshot.Commit();
}
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
}

[TestMethod]
public void TestOneSnapshotPerThreadLevelDbSnapshotPut()
{
using var store = levelDbStore.GetStore(path_leveldb);
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var tasks = new Task[1000];
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
try
{
// Create a new snapshot for each thread to avoid concurrent access issues
using var snapshot = store.GetSnapshot();
snapshot.Put(testKey, value);
snapshot.Commit();
}
catch (Exception ex)
{
Console.WriteLine($"Task {i} encountered an exception: {ex}");
throw;
}
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
}
}
2 changes: 1 addition & 1 deletion tests/Neo.Plugins.Storage.Tests/StoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace Neo.Plugins.Storage.Tests
{
[TestClass]
public class StoreTest
public partial class StoreTest
{
private const string path_leveldb = "Data_LevelDB_UT";
private const string path_rocksdb = "Data_RocksDB_UT";
Expand Down
Loading