Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Neo Store UT] Add leveldb thread UT and readme. #3427

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
158 changes: 112 additions & 46 deletions tests/Neo.Plugins.Storage.Tests/StoreTest.MultiThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,101 +12,157 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Neo.IO.Data.LevelDB;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Plugins.Storage.Tests;

/*
* LevelDB Thread Safety Explanation:
*
* LevelDB is designed to be a fast key-value storage library. However, it has
* some limitations regarding thread safety. Specifically, LevelDB is not thread-safe
* when multiple threads are attempting to write to the database concurrently. This can
* lead to data corruption, crashes, and other undefined behaviors.
*
* LevelDB provides snapshots and batch writes. Snapshots allow
* a consistent view of the database at a point in time, but they are not designed for
* concurrent write operations. Batch writes can be used to perform atomic updates,
* but they also need to be managed carefully to avoid concurrency issues.
*
* In this test class, we demonstrate these thread safety issues and how to mitigate
* them using different approaches such as locking mechanisms and creating separate
* snapshots for each thread.
*/
partial class StoreTest
{

[TestMethod]
[ExpectedException(typeof(AggregateException))]
public void TestMultiThreadLevelDbSnapshotPut()
{
using var store = levelDbStore.GetStore(path_leveldb);
var snapshot = store.GetSnapshot();
using var snapshot = store.GetSnapshot();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var tasks = new Task[100];
for (var i = 0; i < tasks.Length; i++)
var threadCount = 1;
while (true)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
var tasks = new Task[threadCount];
try
{
snapshot.Put(testKey, value);
snapshot.Commit();
});
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
// Introduce delay to increase conflict chance
Thread.Sleep(new Random().Next(1, 10));
Jim8y marked this conversation as resolved.
Show resolved Hide resolved
// Attempt to write to the snapshot and commit
snapshot.Put(testKey, value);
snapshot.Commit();
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
threadCount++;
}
catch (AggregateException)
{
// AggregateException is expected due to concurrent access
Console.WriteLine($"AggregateException caught with {threadCount} threads.");
throw;
}
catch (LevelDBException ex)
{
// LevelDBException is also possible due to LevelDB being thread-unsafe
Console.WriteLine($"LevelDBException caught with {threadCount} threads: {ex.Message}");
break;
}
catch (Exception ex)
{
Assert.Fail("Unexpected exception: " + ex.Message);
}
}
Task.WaitAll(tasks);
snapshot.Dispose();
}

[TestMethod]
public void TestMultiThreadLevelDbSnapshotPutWithoutCommit()
public void TestMultiThreadLevelDbSnapshotPutUntilException()
{
using var store = levelDbStore.GetStore(path_leveldb);
var snapshot = store.GetSnapshot();
using var snapshot = store.GetSnapshot();
var testKey = new byte[] { 0x01, 0x02, 0x03 };

var tasks = new Task[100];
for (var i = 0; i < tasks.Length; i++)
var threadCount = 1;
while (true)
shargon marked this conversation as resolved.
Show resolved Hide resolved
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
var tasks = new Task[threadCount];
try
{
snapshot.Put(testKey, value);
});
}
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
// Introduce delay to increase conflict chance
Thread.Sleep(new Random().Next(1, 100));
Jim8y marked this conversation as resolved.
Show resolved Hide resolved
// Attempt to write to the snapshot without committing
snapshot.Put(testKey, value);
});
}

try
{
Task.WaitAll(tasks);
snapshot.Commit();
}
catch (AggregateException ae)
{
var innerExceptions = ae.InnerExceptions;
var hasExpectedException = innerExceptions.Any(innerException => innerException is AggregateException or LevelDBException);
// Wait for all tasks to complete
Task.WaitAll(tasks);

if (!hasExpectedException)
// Attempt to commit the changes
snapshot.Commit();
threadCount++;
}
catch (AggregateException ex)
{
// Re-throw if none of the expected exceptions were found
throw;
// AggregateException is expected due to concurrent access
Console.WriteLine($"AggregateException caught with {threadCount} threads.");
break;
}
catch (LevelDBException ex)
{
// LevelDBException is also possible due to LevelDB being thread-unsafe
Console.WriteLine($"LevelDBException caught with {threadCount} threads.");
break;
}
catch (Exception ex)
{
Assert.Fail("Unexpected exception: " + ex.Message);
}
}
finally
{
snapshot.Dispose();
}
}


[TestMethod]
public void TestMultiThreadLevelDbSnapshotPutWithLocker()
{
using var store = levelDbStore.GetStore(path_leveldb);

object locker = new();
var snapshot = store.GetSnapshot();

var testKey = new byte[] { 0x01, 0x02, 0x03 };

var tasks = new Task[100];
var tasks = new Task[10];
for (var i = 0; i < tasks.Length; i++)
{
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
using var snapshot = store.GetSnapshot();
// Use a lock to ensure thread-safe access to the snapshot
lock (locker)
{
snapshot.Put(testKey, value);
snapshot.Commit();
}
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
snapshot.Dispose();
}

[TestMethod]
Expand All @@ -121,12 +177,22 @@ public void TestOneSnapshotPerThreadLevelDbSnapshotPut()
var value = new byte[] { 0x04, 0x05, 0x06, (byte)i };
tasks[i] = Task.Run(() =>
{
var snapshot = store.GetSnapshot();
snapshot.Put(testKey, value);
snapshot.Commit();
snapshot.Dispose();
try
{
// Create a new snapshot for each thread to avoid concurrent access issues
using var snapshot = store.GetSnapshot();
snapshot.Put(testKey, value);
snapshot.Commit();
}
catch (Exception ex)
{
Console.WriteLine($"Task {i} encountered an exception: {ex}");
throw;
}
});
}

// Wait for all tasks to complete
Task.WaitAll(tasks);
}
}
Loading