diff --git a/src/engine_leveldb.cpp b/src/engine_leveldb.cpp index 19eae136f..9ed5ca774 100644 --- a/src/engine_leveldb.cpp +++ b/src/engine_leveldb.cpp @@ -194,43 +194,47 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + safe_section("Getting Snapshot List", c.error, [&] { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - level_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - std::size_t snapshots_count = db.snapshots.size(); - *c.count = static_cast(snapshots_count); + level_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + std::size_t snapshots_count = db.snapshots.size(); + *c.count = static_cast(snapshots_count); - // For every snapshot we also need to export IDs - auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); - return_if_error_m(c.error); + // For every snapshot we also need to export IDs + auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); + return_if_error_m(c.error); - std::size_t i = 0; - for (const auto& [id, _] : db.snapshots) - ids[i++] = id; + std::size_t i = 0; + for (const auto& [id, _] : db.snapshots) + ids[i++] = id; + }); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - level_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - auto it = db.snapshots.find(*c.id); - if (it != db.snapshots.end()) - return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); + safe_section("Creating Snapshot", c.error, [&] { + level_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + auto it = db.snapshots.find(*c.id); + if (it != db.snapshots.end()) + return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); - level_snapshot_t* level_snapshot = nullptr; - safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); }); - return_if_error_m(c.error); + level_snapshot_t* level_snapshot = nullptr; + safe_section("Allocating snapshot handle", c.error, [&] { level_snapshot = new level_snapshot_t(); }); + return_if_error_m(c.error); - level_snapshot->snapshot = db.native->GetSnapshot(); - if (!level_snapshot->snapshot) - *c.error = "Couldn't get a snapshot!"; + level_snapshot->snapshot = db.native->GetSnapshot(); + if (!level_snapshot->snapshot) + *c.error = "Couldn't get a snapshot!"; - *c.id = reinterpret_cast(level_snapshot); - db.snapshots[*c.id] = level_snapshot; + *c.id = reinterpret_cast(level_snapshot); + db.snapshots[*c.id] = level_snapshot; + }); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { @@ -272,25 +276,25 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { - if (!c_ptr) - return; ustore_snapshot_drop_t& c = *c_ptr; if (!c.id) return; - level_db_t& db = *reinterpret_cast(c.db); - level_snapshot_t& snap = *reinterpret_cast(c.id); - if (!snap.snapshot) - return; + safe_section("Dropping Snapshot", c.error, [&] { + level_db_t& db = *reinterpret_cast(c.db); + level_snapshot_t& snap = *reinterpret_cast(c.id); + if (!snap.snapshot) + return; - db.native->ReleaseSnapshot(snap.snapshot); - snap.snapshot = nullptr; + db.native->ReleaseSnapshot(snap.snapshot); + snap.snapshot = nullptr; - auto id = reinterpret_cast(c.id); - db.mutex.lock(); - db.snapshots.erase(id); - db.mutex.unlock(); + auto id = reinterpret_cast(c.id); + db.mutex.lock(); + db.snapshots.erase(id); + db.mutex.unlock(); + }); } void write_one( // @@ -624,25 +628,27 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Collections not supported by LevelDB!"); - level_db_t& db = *reinterpret_cast(c.db); + safe_section("Dropping Collection", c.error, [&] { + level_db_t& db = *reinterpret_cast(c.db); - leveldb::WriteBatch batch; - auto it = std::unique_ptr(db.native->NewIterator(leveldb::ReadOptions())); + leveldb::WriteBatch batch; + auto it = std::unique_ptr(db.native->NewIterator(leveldb::ReadOptions())); - if (c.mode == ustore_drop_keys_vals_k) { - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Delete(it->key()); - } + if (c.mode == ustore_drop_keys_vals_k) { + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Delete(it->key()); + } - else if (c.mode == ustore_drop_vals_k) { - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Put(it->key(), leveldb::Slice()); - } + else if (c.mode == ustore_drop_vals_k) { + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Put(it->key(), leveldb::Slice()); + } - leveldb::WriteOptions options; - options.sync = true; - level_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); + leveldb::WriteOptions options; + options.sync = true; + level_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { diff --git a/src/engine_rocksdb.cpp b/src/engine_rocksdb.cpp index 83d6b029f..0e31b36c2 100644 --- a/src/engine_rocksdb.cpp +++ b/src/engine_rocksdb.cpp @@ -244,21 +244,24 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.ids, c.error, args_combo_k, "Need outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + auto list_snap = [&] { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - rocks_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - std::size_t snapshots_count = db.snapshots.size(); - *c.count = static_cast(snapshots_count); + rocks_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + std::size_t snapshots_count = db.snapshots.size(); + *c.count = static_cast(snapshots_count); - // For every snapshot we also need to export IDs - auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); - return_if_error_m(c.error); + // For every snapshot we also need to export IDs + auto ids = arena.alloc_or_dummy(snapshots_count, c.error, c.ids); + return_if_error_m(c.error); - std::size_t i = 0; - for (const auto& [id, _] : db.snapshots) - ids[i++] = id; + std::size_t i = 0; + for (const auto& [id, _] : db.snapshots) + ids[i++] = id; + }; + safe_section("Getting Snapshot List", c.error, list_snap); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { @@ -266,29 +269,33 @@ void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rocks_db_t& db = *reinterpret_cast(c.db); - std::lock_guard locker(db.mutex); - auto it = db.snapshots.find(*c.id); - if (it != db.snapshots.end()) - return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); + auto create_snap = [&] { + rocks_db_t& db = *reinterpret_cast(c.db); + std::lock_guard locker(db.mutex); + auto it = db.snapshots.find(*c.id); + if (it != db.snapshots.end()) + return_error_if_m(it->second, c.error, args_wrong_k, "Such snapshot already exists!"); - rocks_snapshot_t* rocks_snapshot = nullptr; - safe_section("Allocating snapshot handle", c.error, [&] { rocks_snapshot = new rocks_snapshot_t(); }); - return_if_error_m(c.error); + rocks_snapshot_t* rocks_snapshot = nullptr; + safe_section("Allocating snapshot handle", c.error, [&] { rocks_snapshot = new rocks_snapshot_t(); }); + return_if_error_m(c.error); - rocks_snapshot->snapshot = db.native->GetSnapshot(); - if (!rocks_snapshot->snapshot) - *c.error = "Couldn't get a snapshot!"; + rocks_snapshot->snapshot = db.native->GetSnapshot(); + if (!rocks_snapshot->snapshot) + *c.error = "Couldn't get a snapshot!"; - *c.id = reinterpret_cast(rocks_snapshot); - db.snapshots[*c.id] = rocks_snapshot; + *c.id = reinterpret_cast(rocks_snapshot); + db.snapshots[*c.id] = rocks_snapshot; + }; + safe_section("Creating Snapshot", c.error, create_snap); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { + ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + auto export_snap = [&] { rocks_db_t& db = *reinterpret_cast(c.db); rocksdb::Checkpoint* chp_ptr = nullptr; @@ -306,33 +313,31 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { rocks_status_t status = chp_ptr->CreateCheckpoint(c.path, 0, &snapshot_id); export_error(status, c.error); - } - catch (...) { - *c.error = "Snapshot Export Failure"; - } + }; + safe_section("Exporting Snapshot", c.error, export_snap); } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { - if (!c_ptr) - return; - ustore_snapshot_drop_t& c = *c_ptr; if (!c.id) return; - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_snapshot_t& snap = *reinterpret_cast(c.id); - if (!snap.snapshot) - return; + auto drop_snap = [&] { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_snapshot_t& snap = *reinterpret_cast(c.id); + if (!snap.snapshot) + return; - db.native->ReleaseSnapshot(snap.snapshot); - snap.snapshot = nullptr; + db.native->ReleaseSnapshot(snap.snapshot); + snap.snapshot = nullptr; - auto id = reinterpret_cast(c.id); - db.mutex.lock(); - db.snapshots.erase(id); - db.mutex.unlock(); + auto id = reinterpret_cast(c.id); + db.mutex.lock(); + db.snapshots.erase(id); + db.mutex.unlock(); + }; + safe_section("Dropping Snapshot", c.error, drop_snap); } void write_one( // @@ -773,21 +778,26 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rocks_db_t& db = *reinterpret_cast(c.db); + safe_section("Creating Collection", c.error, [&] { + rocks_db_t& db = *reinterpret_cast(c.db); - for (auto handle : db.columns) { - if (handle) - return_error_if_m(handle->GetName() != c.name, c.error, args_wrong_k, "Such collection already exists!"); - } + for (auto handle : db.columns) { + if (handle) + return_error_if_m(handle->GetName() != c.name, + c.error, + args_wrong_k, + "Such collection already exists!"); + } - rocks_collection_t* collection = nullptr; - auto cf_options = rocksdb::ColumnFamilyOptions(); - cf_options.comparator = &key_comparator_k; - rocks_status_t status = db.native->CreateColumnFamily(std::move(cf_options), c.name, &collection); - if (!export_error(status, c.error)) { - db.columns.push_back(collection); - *c.id = reinterpret_cast(collection); - } + rocks_collection_t* collection = nullptr; + auto cf_options = rocksdb::ColumnFamilyOptions(); + cf_options.comparator = &key_comparator_k; + rocks_status_t status = db.native->CreateColumnFamily(std::move(cf_options), c.name, &collection); + if (!export_error(status, c.error)) { + db.columns.push_back(collection); + *c.id = reinterpret_cast(collection); + } + }); } void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { @@ -801,56 +811,58 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { args_combo_k, "Default collection can't be invalidated."); - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_collection_t* collection_ptr = reinterpret_cast(c.id); - rocks_collection_t* collection_ptr_to_clear = nullptr; + safe_section("Dropping Collection", c.error, [&] { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_collection_t* collection_ptr = reinterpret_cast(c.id); + rocks_collection_t* collection_ptr_to_clear = nullptr; - if (c.id == ustore_collection_main_k) - collection_ptr_to_clear = db.native->DefaultColumnFamily(); - else { - for (auto it = db.columns.begin(); it != db.columns.end(); it++) { - collection_ptr_to_clear = reinterpret_cast(*it); - if (collection_ptr_to_clear == collection_ptr) - break; + if (c.id == ustore_collection_main_k) + collection_ptr_to_clear = db.native->DefaultColumnFamily(); + else { + for (auto it = db.columns.begin(); it != db.columns.end(); it++) { + collection_ptr_to_clear = reinterpret_cast(*it); + if (collection_ptr_to_clear == collection_ptr) + break; + } } - } - rocksdb::WriteOptions options; - options.sync = true; - - if (c.mode == ustore_drop_keys_vals_handle_k) { - for (auto it = db.columns.begin(); it != db.columns.end(); it++) { - if (collection_ptr_to_clear == *it) { - rocks_status_t status = db.native->DropColumnFamily(collection_ptr_to_clear); - if (export_error(status, c.error)) - return; - db.columns.erase(it); - break; + rocksdb::WriteOptions options; + options.sync = true; + + if (c.mode == ustore_drop_keys_vals_handle_k) { + for (auto it = db.columns.begin(); it != db.columns.end(); it++) { + if (collection_ptr_to_clear == *it) { + rocks_status_t status = db.native->DropColumnFamily(collection_ptr_to_clear); + if (export_error(status, c.error)) + return; + db.columns.erase(it); + break; + } } + return; + } + else if (c.mode == ustore_drop_keys_vals_k) { + rocksdb::WriteBatch batch; + auto it = std::unique_ptr( + db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Delete(collection_ptr_to_clear, it->key()); + rocks_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + return; } - return; - } - else if (c.mode == ustore_drop_keys_vals_k) { - rocksdb::WriteBatch batch; - auto it = - std::unique_ptr(db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Delete(collection_ptr_to_clear, it->key()); - rocks_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); - return; - } - else if (c.mode == ustore_drop_vals_k) { - rocksdb::WriteBatch batch; - auto it = - std::unique_ptr(db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); - for (it->SeekToFirst(); it->Valid(); it->Next()) - batch.Put(collection_ptr_to_clear, it->key(), rocksdb::Slice()); - rocks_status_t status = db.native->Write(options, &batch); - export_error(status, c.error); - return; - } + else if (c.mode == ustore_drop_vals_k) { + rocksdb::WriteBatch batch; + auto it = std::unique_ptr( + db.native->NewIterator(rocksdb::ReadOptions(), collection_ptr_to_clear)); + for (it->SeekToFirst(); it->Valid(); it->Next()) + batch.Put(collection_ptr_to_clear, it->key(), rocksdb::Slice()); + rocks_status_t status = db.native->Write(options, &batch); + export_error(status, c.error); + return; + } + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { @@ -859,42 +871,44 @@ void ustore_collection_list(ustore_collection_list_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.count && c.names, c.error, args_combo_k, "Need names and outputs!"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); - - rocks_db_t& db = *reinterpret_cast(c.db); - std::size_t collections_count = db.columns.size() - 1; - *c.count = static_cast(collections_count); + safe_section("Getting Collection List", c.error, [&] { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - // Every string will be null-terminated - std::size_t strings_length = 0; - for (auto const& column : db.columns) - strings_length += column->GetName().size() + 1; + rocks_db_t& db = *reinterpret_cast(c.db); + std::size_t collections_count = db.columns.size() - 1; + *c.count = static_cast(collections_count); - auto names = arena.alloc(strings_length, c.error).begin(); - *c.names = names; - return_if_error_m(c.error); + // Every string will be null-terminated + std::size_t strings_length = 0; + for (auto const& column : db.columns) + strings_length += column->GetName().size() + 1; - // For every collection we also need to export IDs and offsets - auto ids = arena.alloc_or_dummy(collections_count, c.error, c.ids); - return_if_error_m(c.error); - auto offs = arena.alloc_or_dummy(collections_count + 1, c.error, c.offsets); - return_if_error_m(c.error); + auto names = arena.alloc(strings_length, c.error).begin(); + *c.names = names; + return_if_error_m(c.error); - std::size_t i = 0; - for (auto const& column : db.columns) { - if (column->GetName() == rocksdb::kDefaultColumnFamilyName) - continue; + // For every collection we also need to export IDs and offsets + auto ids = arena.alloc_or_dummy(collections_count, c.error, c.ids); + return_if_error_m(c.error); + auto offs = arena.alloc_or_dummy(collections_count + 1, c.error, c.offsets); + return_if_error_m(c.error); - auto len = column->GetName().size(); - std::memcpy(names, column->GetName().data(), len); - names[len] = '\0'; - ids[i] = reinterpret_cast(column); + std::size_t i = 0; + for (auto const& column : db.columns) { + if (column->GetName() == rocksdb::kDefaultColumnFamilyName) + continue; + + auto len = column->GetName().size(); + std::memcpy(names, column->GetName().data(), len); + names[len] = '\0'; + ids[i] = reinterpret_cast(column); + offs[i] = static_cast(names - *c.names); + names += len + 1; + ++i; + } offs[i] = static_cast(names - *c.names); - names += len + 1; - ++i; - } - offs[i] = static_cast(names - *c.names); + }); } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -911,19 +925,21 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { validate_transaction_begin(c.transaction, c.options, c.error); return_if_error_m(c.error); - bool const safe = c.options & ustore_option_write_flush_k; - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_txn_t& txn = **reinterpret_cast(c.transaction); - rocksdb::OptimisticTransactionOptions txn_options; - txn_options.set_snapshot = false; - rocksdb::WriteOptions options; - options.sync = safe; - options.disableWAL = !safe; - auto new_txn = db.native->BeginTransaction(options, txn_options, &txn); - if (!new_txn) - *c.error = "Couldn't start a transaction!"; - else - *c.transaction = new_txn; + safe_section("Initializing Transaction", c.error, [&] { + bool const safe = c.options & ustore_option_write_flush_k; + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_txn_t& txn = **reinterpret_cast(c.transaction); + rocksdb::OptimisticTransactionOptions txn_options; + txn_options.set_snapshot = false; + rocksdb::WriteOptions options; + options.sync = safe; + options.disableWAL = !safe; + auto new_txn = db.native->BeginTransaction(options, txn_options, &txn); + if (!new_txn) + *c.error = "Couldn't start a transaction!"; + else + *c.transaction = new_txn; + }); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -934,18 +950,20 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { validate_transaction_commit(c.transaction, c.options, c.error); return_if_error_m(c.error); - rocks_db_t& db = *reinterpret_cast(c.db); - rocks_txn_t& txn = *reinterpret_cast(c.transaction); + safe_section("Committing Transaction", c.error, [&] { + rocks_db_t& db = *reinterpret_cast(c.db); + rocks_txn_t& txn = *reinterpret_cast(c.transaction); - if (c.sequence_number) - db.mutex.lock(); - rocks_status_t status = txn.Commit(); - export_error(status, c.error); - if (c.sequence_number) { - if (status.ok()) - *c.sequence_number = db.native->GetLatestSequenceNumber(); - db.mutex.unlock(); - } + if (c.sequence_number) + db.mutex.lock(); + rocks_status_t status = txn.Commit(); + export_error(status, c.error); + if (c.sequence_number) { + if (status.ok()) + *c.sequence_number = db.native->GetLatestSequenceNumber(); + db.mutex.unlock(); + } + }); } void ustore_arena_free(ustore_arena_t c_arena) { diff --git a/src/flight_client.cpp b/src/flight_client.cpp index 17f9b7153..60e585d15 100644 --- a/src/flight_client.cpp +++ b/src/flight_client.cpp @@ -1311,31 +1311,33 @@ void ustore_collection_create(ustore_collection_create_t* c_ptr) { auto name_len = c.name ? std::strlen(c.name) : 0; return_error_if_m(name_len, c.error, args_wrong_k, "Default collection is always present"); - rpc_client_t& db = *reinterpret_cast(c.db); - - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightColCreate, kParamCollectionName, c.name); - if (c.config) - action.body = std::make_shared(std::string_view {c.config}); + safe_section("Creating Collection", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); - } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_collection_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_collection_t)); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightColCreate, kParamCollectionName, c.name); + if (c.config) + action.body = std::make_shared(std::string_view {c.config}); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); + + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_collection_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_collection_t)); + }); } void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { @@ -1343,77 +1345,82 @@ void ustore_collection_drop(ustore_collection_drop_t* c_ptr) { ustore_collection_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - std::string_view mode; - switch (c.mode) { - case ustore_drop_vals_k: mode = kParamDropModeValues; break; - case ustore_drop_keys_vals_k: mode = kParamDropModeContents; break; - case ustore_drop_keys_vals_handle_k: mode = kParamDropModeCollection; break; - } + safe_section("Dropping Collection", c.error, [&] { + std::string_view mode; + switch (c.mode) { + case ustore_drop_vals_k: mode = kParamDropModeValues; break; + case ustore_drop_keys_vals_k: mode = kParamDropModeContents; break; + case ustore_drop_keys_vals_handle_k: mode = kParamDropModeCollection; break; + } - rpc_client_t& db = *reinterpret_cast(c.db); + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), - "{}?{}=0x{:0>16x}&{}={}", - kFlightColDrop, - kParamCollectionID, - c.id, - kParamDropMode, - mode); - - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), + "{}?{}=0x{:0>16x}&{}={}", + kFlightColDrop, + kParamCollectionID, + c.id, + kParamDropMode, + mode); + + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + }); } void ustore_collection_list(ustore_collection_list_t* c_ptr) { ustore_collection_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); - if (!(c.options & ustore_option_dont_discard_memory_k)) - db.readers.clear(); - - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); - ar::Status ar_status; - arrow_mem_pool_t pool(arena); - arf::FlightCallOptions options = arrow_call_options(pool); + safe_section("Getting Collection List", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); + if (!(c.options & ustore_option_dont_discard_memory_k)) + db.readers.clear(); - arf::Ticket ticket {kFlightListCols}; - if (c.transaction) - fmt::format_to(std::back_inserter(ticket.ticket), - "?{}=0x{:0>16x}", - kParamTransactionID, - std::uintptr_t(c.transaction)); + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - auto maybe_stream = db.flight->DoGet(options, ticket); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Status ar_status; + arrow_mem_pool_t pool(arena); + arf::FlightCallOptions options = arrow_call_options(pool); - auto maybe_table = stream_ptr->ToTable(); - return_error_if_m(maybe_table.ok(), c.error, error_unknown_k, "Failed to create table"); - auto table = maybe_table.ValueUnsafe(); + arf::Ticket ticket {kFlightListCols}; + if (c.transaction) + fmt::format_to(std::back_inserter(ticket.ticket), + "?{}=0x{:0>16x}", + kParamTransactionID, + std::uintptr_t(c.transaction)); - if (c.count) - *c.count = static_cast(table->num_rows()); - if (c.names) { - auto array = std::static_pointer_cast(table->column(1)->chunk(0)); - return_error_if_m(table->column(1)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); - *c.names = (ustore_str_span_t)array->value_data()->data(); - if (c.offsets) - *c.offsets = (ustore_length_t*)array->value_offsets()->data(); - } - if (c.ids) { - auto array = std::static_pointer_cast>(table->column(0)->chunk(0)); - return_error_if_m(table->column(0)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); - *c.ids = (ustore_collection_t*)array->raw_values(); - } + auto maybe_stream = db.flight->DoGet(options, ticket); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + + auto maybe_table = stream_ptr->ToTable(); + return_error_if_m(maybe_table.ok(), c.error, error_unknown_k, "Failed to create table"); + auto table = maybe_table.ValueUnsafe(); + + if (c.count) + *c.count = static_cast(table->num_rows()); + if (c.names) { + auto array = std::static_pointer_cast(table->column(1)->chunk(0)); + return_error_if_m(table->column(1)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); + *c.names = (ustore_str_span_t)array->value_data()->data(); + if (c.offsets) + *c.offsets = (ustore_length_t*)array->value_offsets()->data(); + } + if (c.ids) { + auto array = std::static_pointer_cast>(table->column(0)->chunk(0)); + return_error_if_m(table->column(0)->num_chunks() == 1, c.error, network_k, "Expected one chunk"); + *c.ids = (ustore_collection_t*)array->raw_values(); + } - db.readers.push_back(std::move(stream_ptr)); + db.readers.push_back(std::move(stream_ptr)); + }); } void ustore_database_control(ustore_database_control_t* c_ptr) { @@ -1433,70 +1440,74 @@ void ustore_snapshot_list(ustore_snapshot_list_t* c_ptr) { ustore_snapshot_list_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); - return_if_error_m(c.error); + safe_section("Getting Snapshot List", c.error, [&] { + linked_memory_lock_t arena = linked_memory(c.arena, c.options, c.error); + return_if_error_m(c.error); - ar::Status ar_status; - arrow_mem_pool_t pool(arena); - arf::FlightCallOptions options = arrow_call_options(pool); + ar::Status ar_status; + arrow_mem_pool_t pool(arena); + arf::FlightCallOptions options = arrow_call_options(pool); - rpc_client_t& db = *reinterpret_cast(c.db); + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Ticket ticket {kFlightListSnap}; - ar::Result> maybe_stream = db.flight->DoGet(options, ticket); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Ticket ticket {kFlightListSnap}; + ar::Result> maybe_stream = db.flight->DoGet(options, ticket); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_table = stream_ptr->ToTable(); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_table = stream_ptr->ToTable(); - ArrowSchema schema_c; - ArrowArray batch_c; - ar_status = unpack_table(maybe_table, schema_c, batch_c, &pool); - return_error_if_m(ar_status.ok(), c.error, args_combo_k, "Failed to unpack list of snapshots"); + ArrowSchema schema_c; + ArrowArray batch_c; + ar_status = unpack_table(maybe_table, schema_c, batch_c, &pool); + return_error_if_m(ar_status.ok(), c.error, args_combo_k, "Failed to unpack list of snapshots"); - auto ids_column_idx = column_idx(schema_c, kArgSnaps); - return_error_if_m(ids_column_idx, c.error, args_combo_k, "Expecting one column"); + auto ids_column_idx = column_idx(schema_c, kArgSnaps); + return_error_if_m(ids_column_idx, c.error, args_combo_k, "Expecting one column"); - if (c.count) - *c.count = static_cast(batch_c.length); - if (c.ids) - *c.ids = (ustore_collection_t*)batch_c.children[*ids_column_idx]->buffers[1]; + if (c.count) + *c.count = static_cast(batch_c.length); + if (c.ids) + *c.ids = (ustore_collection_t*)batch_c.children[*ids_column_idx]->buffers[1]; + }); } void ustore_snapshot_create(ustore_snapshot_create_t* c_ptr) { ustore_snapshot_create_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); - - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}", kFlightSnapCreate); + safe_section("Creating Snapshot", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); - } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_snapshot_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_snapshot_t)); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}", kFlightSnapCreate); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); + + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_snapshot_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.id, id_ptr->body->data(), sizeof(ustore_snapshot_t)); + }); } void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { ustore_snapshot_export_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - try { + safe_section("Exporting Snapshot", c.error, [&] { rpc_client_t& db = *reinterpret_cast(c.db); arf::Action action; @@ -1513,26 +1524,25 @@ void ustore_snapshot_export(ustore_snapshot_export_t* c_ptr) { arf::FlightCallOptions options = arrow_call_options(pool); ar::Result> maybe_stream = db.flight->DoAction(options, action); return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - } - catch (...) { - *c.error = "Snapshot Export Failure"; - } + }); } void ustore_snapshot_drop(ustore_snapshot_drop_t* c_ptr) { ustore_snapshot_drop_t& c = *c_ptr; return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + safe_section("Dropping Collection", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightSnapDrop, kParamSnapshotID, c.id); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), "{}?{}={}", kFlightSnapDrop, kParamSnapshotID, c.id); - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + }); } /*********************************************************/ @@ -1545,35 +1555,37 @@ void ustore_transaction_init(ustore_transaction_init_t* c_ptr) { return_error_if_m(c.db, c.error, uninitialized_state_k, "DataBase is uninitialized"); return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + safe_section("Initializing Transaction", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); + + arf::Action action; + ustore_size_t txn_id = *reinterpret_cast(c.transaction); + fmt::format_to(std::back_inserter(action.type), "{}?", kFlightTxnBegin); + if (txn_id != 0) + fmt::format_to(std::back_inserter(action.type), "{}=0x{:0>16x}&", kParamTransactionID, txn_id); + if (c.options & ustore_option_transaction_dont_watch_k) + fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagDontWatch); + + ar::Result> maybe_stream; + { + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + maybe_stream = db.flight->DoAction(options, action); + } + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - arf::Action action; - ustore_size_t txn_id = *reinterpret_cast(c.transaction); - fmt::format_to(std::back_inserter(action.type), "{}?", kFlightTxnBegin); - if (txn_id != 0) - fmt::format_to(std::back_inserter(action.type), "{}=0x{:0>16x}&", kParamTransactionID, txn_id); - if (c.options & ustore_option_transaction_dont_watch_k) - fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagDontWatch); + auto& stream_ptr = maybe_stream.ValueUnsafe(); + ar::Result> maybe_id = stream_ptr->Next(); + return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - ar::Result> maybe_stream; - { - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - maybe_stream = db.flight->DoAction(options, action); - } - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); - - auto& stream_ptr = maybe_stream.ValueUnsafe(); - ar::Result> maybe_id = stream_ptr->Next(); - return_error_if_m(maybe_id.ok(), c.error, network_k, "No response received"); - - auto& id_ptr = maybe_id.ValueUnsafe(); - return_error_if_m(id_ptr->body->size() == sizeof(ustore_transaction_t), - c.error, - error_unknown_k, - "Inadequate response"); - std::memcpy(c.transaction, id_ptr->body->data(), sizeof(ustore_transaction_t)); + auto& id_ptr = maybe_id.ValueUnsafe(); + return_error_if_m(id_ptr->body->size() == sizeof(ustore_transaction_t), + c.error, + error_unknown_k, + "Inadequate response"); + std::memcpy(c.transaction, id_ptr->body->data(), sizeof(ustore_transaction_t)); + }); } void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { @@ -1581,22 +1593,24 @@ void ustore_transaction_commit(ustore_transaction_commit_t* c_ptr) { ustore_transaction_commit_t& c = *c_ptr; return_error_if_m(c.transaction, c.error, uninitialized_state_k, "Transaction is uninitialized"); - rpc_client_t& db = *reinterpret_cast(c.db); + safe_section("Commiting Transaction", c.error, [&] { + rpc_client_t& db = *reinterpret_cast(c.db); - arf::Action action; - fmt::format_to(std::back_inserter(action.type), - "{}?{}=0x{:0>16x}&", - kFlightTxnCommit, - kParamTransactionID, - std::uintptr_t(c.transaction)); - if (c.options & ustore_option_write_flush_k) - fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagFlushWrite); - - std::lock_guard lk(db.arena_lock); - arrow_mem_pool_t pool(db.arena); - arf::FlightCallOptions options = arrow_call_options(pool); - ar::Result> maybe_stream = db.flight->DoAction(options, action); - return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + arf::Action action; + fmt::format_to(std::back_inserter(action.type), + "{}?{}=0x{:0>16x}&", + kFlightTxnCommit, + kParamTransactionID, + std::uintptr_t(c.transaction)); + if (c.options & ustore_option_write_flush_k) + fmt::format_to(std::back_inserter(action.type), "{}&", kParamFlagFlushWrite); + + std::lock_guard lk(db.arena_lock); + arrow_mem_pool_t pool(db.arena); + arf::FlightCallOptions options = arrow_call_options(pool); + ar::Result> maybe_stream = db.flight->DoAction(options, action); + return_error_if_m(maybe_stream.ok(), c.error, network_k, "Failed to act on Arrow server"); + }); } /*********************************************************/