Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 58 additions & 52 deletions src/engine_leveldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,43 +194,47 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) {
return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized");
return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!");

linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error);
return_if_error_m(c.error);
safe_section("Getting Snapshot List", c.error, [&] {
linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error);
return_if_error_m(c.error);

level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
std::lock_guard<std::mutex> locker(db.mutex);
std::size_t snapshots_count = db.snapshots.size();
*c.count = static_cast<ustore_size_t>(snapshots_count);
level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
std::lock_guard<std::mutex> locker(db.mutex);
std::size_t snapshots_count = db.snapshots.size();
*c.count = static_cast<ustore_size_t>(snapshots_count);

// For every snapshot we also need to export IDs
auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids);
return_if_error_m(c.error);
// For every snapshot we also need to export IDs
auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids);
return_if_error_m(c.error);

std::size_t i = 0;
for (const auto& [id, _] : db.snapshots)
ids[i++] = id;
std::size_t i = 0;
for (const auto& [id, _] : db.snapshots)
ids[i++] = id;
});
}

void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) {
ustore_snapshot_create_t& c = *c_ptr;
return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized");

level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
std::lock_guard<std::mutex> locker(db.mutex);
auto it = db.snapshots.find(*c.id);
if (it != db.snapshots.end())
return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!");
safe_section("Creating Snapshot", c.error, [&] {
level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
std::lock_guard<std::mutex> locker(db.mutex);
auto it = db.snapshots.find(*c.id);
if (it != db.snapshots.end())
return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!");

level_snapshot_t* level_snapshot = nullptr;
safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); });
return_if_error_m(c.error);
level_snapshot_t* level_snapshot = nullptr;
safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); });
return_if_error_m(c.error);

level_snapshot->snapshot = db.native->GetSnapshot();
if (!level_snapshot->snapshot)
*c.error = "Couldn't get a snapshot!";
level_snapshot->snapshot = db.native->GetSnapshot();
if (!level_snapshot->snapshot)
*c.error = "Couldn't get a snapshot!";

*c.id = reinterpret_cast<ustore_snapshot_t>(level_snapshot);
db.snapshots[*c.id] = level_snapshot;
*c.id = reinterpret_cast<ustore_snapshot_t>(level_snapshot);
db.snapshots[*c.id] = level_snapshot;
});
}

void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) {
Expand Down Expand Up @@ -272,25 +276,25 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) {
}

void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) {
if (!c_ptr)
return;

ustore_snapshot_drop_t& c = *c_ptr;
if (!c.id)
return;

level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
level_snapshot_t& snap = *reinterpret_cast<level_snapshot_t*>(c.id);
if (!snap.snapshot)
return;
safe_section("Dropping Snapshot", c.error, [&] {
level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
level_snapshot_t& snap = *reinterpret_cast<level_snapshot_t*>(c.id);
if (!snap.snapshot)
return;

db.native->ReleaseSnapshot(snap.snapshot);
snap.snapshot = nullptr;
db.native->ReleaseSnapshot(snap.snapshot);
snap.snapshot = nullptr;

auto id = reinterpret_cast<ustore_size_t>(c.id);
db.mutex.lock();
db.snapshots.erase(id);
db.mutex.unlock();
auto id = reinterpret_cast<ustore_size_t>(c.id);
db.mutex.lock();
db.snapshots.erase(id);
db.mutex.unlock();
});
}

void write_one( //
Expand Down Expand Up @@ -624,25 +628,27 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) {
args_combo_k,
"Collections not supported by LevelDB!");

level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);
safe_section("Dropping Collection", c.error, [&] {
level_db_t& db = *reinterpret_cast<level_db_t*>(c.db);

leveldb::WriteBatch batch;
auto it = std::unique_ptr<leveldb::Iterator>(db.native->NewIterator(leveldb::ReadOptions()));
leveldb::WriteBatch batch;
auto it = std::unique_ptr<leveldb::Iterator>(db.native->NewIterator(leveldb::ReadOptions()));

if (c.mode == ustore_drop_keys_vals_k) {
for (it->SeekToFirst(); it->Valid(); it->Next())
batch.Delete(it->key());
}
if (c.mode == ustore_drop_keys_vals_k) {
for (it->SeekToFirst(); it->Valid(); it->Next())
batch.Delete(it->key());
}

else if (c.mode == ustore_drop_vals_k) {
for (it->SeekToFirst(); it->Valid(); it->Next())
batch.Put(it->key(), leveldb::Slice());
}
else if (c.mode == ustore_drop_vals_k) {
for (it->SeekToFirst(); it->Valid(); it->Next())
batch.Put(it->key(), leveldb::Slice());
}

leveldb::WriteOptions options;
options.sync = true;
level_status_t status = db.native->Write(options, &batch);
export_error(status, c.error);
leveldb::WriteOptions options;
options.sync = true;
level_status_t status = db.native->Write(options, &batch);
export_error(status, c.error);
});
}

void ustore_collection_list(ustore_collection_list_t* c_ptr) {
Expand Down
Loading