Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Part-1 - LevelDbStore #3414

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 212 additions & 48 deletions src/Plugins/LevelDBStore/IO/Data/LevelDB/DB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,156 @@
// modifications are permitted.

using System;
using System.IO;
using System.Collections;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace Neo.IO.Data.LevelDB
namespace Neo.IO.Storage.LevelDB
{
public class DB : IDisposable
/// <summary>
/// A DB is a persistent ordered map from keys to values.
/// A DB is safe for concurrent access from multiple threads without any external synchronization.
/// </summary>
public class DB : LevelDBHandle, IEnumerable<KeyValuePair<byte[], byte[]>>
{
private IntPtr handle;
static void Throw(IntPtr error)
{
Throw(error, msg => new LevelDBException(msg));
}

static void Throw(IntPtr error, Func<string, Exception> exception)
{
if (error != IntPtr.Zero)
{
try
{
var msg = Marshal.PtrToStringAnsi(error);
throw exception(msg);
}
finally
{
Native.leveldb_free(error);
}
}
}

/// <summary>
/// Return true if haven't got valid handle
/// Open the database with the specified "name".
/// </summary>
public bool IsDisposed => handle == IntPtr.Zero;
public DB(string name, Options options)
{
Handle = Native.leveldb_open(options.Handle, name, out var error);
Throw(error, msg => new UnauthorizedAccessException(msg));
}

private DB(IntPtr handle)
public void Close()
{
this.handle = handle;
(this as IDisposable).Dispose();
}

public void Dispose()
/// <summary>
/// Set the database entry for "key" to "value".
/// </summary>
public void Put(byte[] key, byte[] value)
{
if (handle != IntPtr.Zero)
{
Native.leveldb_close(handle);
handle = IntPtr.Zero;
}
Put(key, value, new WriteOptions());
}

/// <summary>
/// Set the database entry for "key" to "value".
/// Note: consider setting new WriteOptions{ Sync = true }.
/// </summary>
public void Put(byte[] key, byte[] value, WriteOptions options)
{
Native.leveldb_put(Handle, options.Handle, key, checked((IntPtr)key.LongLength), value, checked((IntPtr)value.LongLength), out var error);
Throw(error);
}

public void Delete(WriteOptions options, byte[] key)
/// <summary>
/// Remove the database entry (if any) for "key".
/// It is not an error if "key" did not exist in the database.
/// </summary>
public void Delete(byte[] key)
{
Native.leveldb_delete(handle, options.handle, key, (UIntPtr)key.Length, out IntPtr error);
NativeHelper.CheckError(error);
Delete(key, new WriteOptions());
}

public byte[] Get(ReadOptions options, byte[] key)
/// <summary>
/// Remove the database entry (if any) for "key".
/// It is not an error if "key" did not exist in the database.
/// Note: consider setting new WriteOptions{ Sync = true }.
/// </summary>
public void Delete(byte[] key, WriteOptions options)
{
IntPtr value = Native.leveldb_get(handle, options.handle, key, (UIntPtr)key.Length, out UIntPtr length, out IntPtr error);
try
{
NativeHelper.CheckError(error);
return value.ToByteArray(length);
}
finally
Native.leveldb_delete(Handle, options.Handle, key, checked((nint)key.LongLength), out var error);
Throw(error);
}

public void Write(WriteBatch batch)
{
Write(batch, new WriteOptions());
}

public void Write(WriteBatch batch, WriteOptions options)
{
Native.leveldb_write(Handle, options.Handle, batch.Handle, out var error);
Throw(error);
}

/// <summary>
/// If the database contains an entry for "key" return the value,
/// otherwise return null.
/// </summary>
public byte[] Get(byte[] key)
{
return Get(key, ReadOptions.Default);
}

/// <summary>
/// If the database contains an entry for "key" return the value,
/// otherwise return null.
/// </summary>
public byte[] Get(byte[] key, ReadOptions options)
{
var v = Native.leveldb_get(Handle, options.Handle, key, checked((IntPtr)key.LongLength), out var length, out var error);
Throw(error);

if (v != IntPtr.Zero)
{
if (value != IntPtr.Zero) Native.leveldb_free(value);
try
{

var len = (long)length;
int c = 0, si = 0;

var bytes = new byte[len];

while (si < len)
{
if (len - si > int.MaxValue)
c += int.MaxValue;
else
c += checked((int)(len - si));

// Method has a ~2GB limit.
Marshal.Copy(v, bytes, si, c);

si += c;
}
return bytes;
}
finally
{
Native.leveldb_free(v);
}
}
return null;
}

public bool Contains(ReadOptions options, byte[] key)
public bool Contains(byte[] key, ReadOptions options)
{
IntPtr value = Native.leveldb_get(handle, options.handle, key, (UIntPtr)key.Length, out _, out IntPtr error);
NativeHelper.CheckError(error);
var value = Native.leveldb_get(Handle, options.Handle, key, key.Length, out _, out var error);
Throw(error);

if (value != IntPtr.Zero)
{
Expand All @@ -71,44 +170,109 @@ public bool Contains(ReadOptions options, byte[] key)
return false;
}

public Snapshot GetSnapshot()
/// <summary>
/// Return an iterator over the contents of the database.
/// The result of CreateIterator is initially invalid (caller must
/// call one of the Seek methods on the iterator before using it).
/// </summary>
public Iterator CreateIterator()
{
return CreateIterator(ReadOptions.Default);
}

/// <summary>
/// Return an iterator over the contents of the database.
/// The result of CreateIterator is initially invalid (caller must
/// call one of the Seek methods on the iterator before using it).
/// </summary>
public Iterator CreateIterator(ReadOptions options)
{
return new Snapshot(handle);
return new Iterator(Native.leveldb_create_iterator(Handle, options.Handle));
}

public Iterator NewIterator(ReadOptions options)
/// <summary>
/// Return a handle to the current DB state.
/// Iterators and Gets created with this handle will all observe a stable snapshot of the current DB state.
/// </summary>
public SnapShot CreateSnapshot()
{
return new Iterator(Native.leveldb_create_iterator(handle, options.handle));
return new SnapShot(Native.leveldb_create_snapshot(Handle), this);
}

public static DB Open(string name)
/// <summary>
/// DB implementations can export properties about their state
/// via this method. If "property" is a valid property understood by this
/// DB implementation, fills "*value" with its current value and returns
/// true. Otherwise returns false.
///
/// Valid property names include:
///
/// "leveldb.num-files-at-level<N>" - return the number of files at level </N>,
/// where <N> is an ASCII representation of a level number (e.g. "0")</N>.
/// "leveldb.stats" - returns a multi-line string that describes statistics
/// about the internal operation of the DB.
/// </summary>
public string PropertyValue(string name)
{
return Open(name, Options.Default);
string result = null;
var ptr = Native.leveldb_property_value(Handle, name);
if (ptr != IntPtr.Zero)
{
try
{
return Marshal.PtrToStringAnsi(ptr);
}
finally
{
Native.leveldb_free(ptr);
}
}
return result;
}

public static DB Open(string name, Options options)
/// <summary>
/// If a DB cannot be opened, you may attempt to call this method to
/// resurrect as much of the contents of the database as possible.
/// Some data may be lost, so be careful when calling this function
/// on a database that contains important information.
/// </summary>
public static void Repair(Options options, string name)
{
Native.leveldb_repair_db(options.Handle, name, out var error);
Throw(error);
}

/// <summary>
/// Destroy the contents of the specified database.
/// Be very careful using this method.
/// </summary>
public static void Destroy(Options options, string name)
{
IntPtr handle = Native.leveldb_open(options.handle, Path.GetFullPath(name), out IntPtr error);
NativeHelper.CheckError(error);
return new DB(handle);
Native.leveldb_destroy_db(options.Handle, name, out var error);
Throw(error);
}

public void Put(WriteOptions options, byte[] key, byte[] value)
protected override void FreeUnManagedObjects()
{
Native.leveldb_put(handle, options.handle, key, (UIntPtr)key.Length, value, (UIntPtr)value.Length, out IntPtr error);
NativeHelper.CheckError(error);
if (Handle != default)
Native.leveldb_close(Handle);
}

public static void Repair(string name, Options options)
public IEnumerator<KeyValuePair<byte[], byte[]>> GetEnumerator()
{
Native.leveldb_repair_db(options.handle, Path.GetFullPath(name), out IntPtr error);
NativeHelper.CheckError(error);
using var sn = CreateSnapshot();
using var iterator = CreateIterator(new ReadOptions { Snapshot = sn });
iterator.SeekToFirst();
while (iterator.IsValid())
{
yield return new KeyValuePair<byte[], byte[]>(iterator.Key(), iterator.Value());
iterator.Next();
}
}

public void Write(WriteOptions options, WriteBatch write_batch)
IEnumerator IEnumerable.GetEnumerator()
{
Native.leveldb_write(handle, options.handle, write_batch.handle, out IntPtr error);
NativeHelper.CheckError(error);
return GetEnumerator();
}
}
}
53 changes: 16 additions & 37 deletions src/Plugins/LevelDBStore/IO/Data/LevelDB/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,34 @@
// Redistribution and use in source and binary forms with or without
// modifications are permitted.

using Neo.Persistence;
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace Neo.IO.Data.LevelDB
namespace Neo.IO.Storage.LevelDB
{
public static class Helper
{
public static IEnumerable<T> Seek<T>(this DB db, ReadOptions options, byte[] prefix, SeekDirection direction, Func<byte[], byte[], T> resultSelector)
public static IEnumerable<(byte[], byte[])> Seek(this DB db, byte[] prefix, ReadOptions options)
{
using Iterator it = db.NewIterator(options);
if (direction == SeekDirection.Forward)
{
for (it.Seek(prefix); it.Valid(); it.Next())
yield return resultSelector(it.Key(), it.Value());
}
else
{
// SeekForPrev
using var it = db.CreateIterator(options);

it.Seek(prefix);
if (!it.Valid())
it.SeekToLast();
else if (it.Key().AsSpan().SequenceCompareTo(prefix) > 0)
it.Prev();

for (; it.Valid(); it.Prev())
yield return resultSelector(it.Key(), it.Value());
}
for (it.Seek(prefix); it.IsValid(); it.Next())
yield return new(it.Key(), it.Value());
}

public static IEnumerable<T> FindRange<T>(this DB db, ReadOptions options, byte[] startKey, byte[] endKey, Func<byte[], byte[], T> resultSelector)
public static IEnumerable<(byte[], byte[])> SeekPrev(this DB db, byte[] prefix, ReadOptions options)
{
using Iterator it = db.NewIterator(options);
for (it.Seek(startKey); it.Valid(); it.Next())
{
byte[] key = it.Key();
if (key.AsSpan().SequenceCompareTo(endKey) > 0) break;
yield return resultSelector(key, it.Value());
}
}
using var it = db.CreateIterator(options);

internal static byte[] ToByteArray(this IntPtr data, UIntPtr length)
{
if (data == IntPtr.Zero) return null;
byte[] buffer = new byte[(int)length];
Marshal.Copy(data, buffer, 0, (int)length);
return buffer;
it.Seek(prefix);

if (!it.IsValid())
it.SeekToLast();
else if (it.Key().AsSpan().SequenceCompareTo(prefix) > 0)
it.Prev();

for (; it.IsValid(); it.Prev())
yield return new(it.Key(), it.Value());
}
}
}
Loading
Loading