From d7550b5d63d5333841e4335827c94cbfbe64a597 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 7 Mar 2024 16:18:52 -0700 Subject: [PATCH] Unlock single thread access to transaction earlier and dump freelist if it is too old --- dependencies/lmdb/libraries/liblmdb/lmdb.h | 2 +- dependencies/lmdb/libraries/liblmdb/mdb.c | 16 +++++++----- src/writer.cpp | 29 +++++++++++----------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/dependencies/lmdb/libraries/liblmdb/lmdb.h b/dependencies/lmdb/libraries/liblmdb/lmdb.h index 7ffc132b0..745460818 100644 --- a/dependencies/lmdb/libraries/liblmdb/lmdb.h +++ b/dependencies/lmdb/libraries/liblmdb/lmdb.h @@ -1067,7 +1067,7 @@ int mdb_env_set_assert(MDB_env *env, MDB_assert_func *func); // typedef int (MDB_check_fd)(const mdb_filehandle_t fd, MDB_env* env); -typedef void (MDB_txn_visible)(const void* ctx); +typedef void (MDB_txn_visible)(const void* ctx, int finished); int mdb_env_set_callback(MDB_env *env, MDB_check_fd *func); int mdb_txn_set_callback(MDB_txn *txn, MDB_txn_visible *func, void* ctx); int mdb_env_set_freespace_options(MDB_env *env, unsigned int max_to_load, unsigned int max_to_retain); diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index 717e606c1..07908559e 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -3978,8 +3978,8 @@ mdb_txn_end(MDB_txn *txn, unsigned mode) mdb_midl_shrink(&txn->mt_free_pgs); env->me_free_pgs = txn->mt_free_pgs; /* me_pgstate: */ - if (env->me_pghead && env->me_pghead[0] > env->me_maxfreepgs_to_retain) { - fprintf(stderr, "Free list too large %u, dumping from memory\n", env->me_pghead[0]); + if (env->me_pghead && env->me_pghead[0] > env->me_maxfreepgs_to_retain || env->me_freelist_end + 100 < txn->mt_txnid) { + //fprintf(stderr, "Free list too large %u or out of date (%u / %u), dumping from memory\n", env->me_pghead ? env->me_pghead[0] : 0, env->me_freelist_end, txn->mt_txnid); // if it is too large, reset it env->me_pghead = NULL; env->me_freelist_start = 0; @@ -4926,9 +4926,10 @@ mdb_txn_commit(MDB_txn *txn) goto fail; // - if (txn->mt_callback) { - MDB_txn_visible* callback = txn->mt_callback; - callback(txn->mt_ctx); + MDB_txn_visible* callback = txn->mt_callback; + void* ctx = txn->mt_ctx; + if (callback) { + callback(ctx, 0); txn->mt_callback = NULL; } if (!F_ISSET(txn->mt_flags, MDB_TXN_NOSYNC)) @@ -4953,7 +4954,7 @@ mdb_txn_commit(MDB_txn *txn) } if ((txn->mt_flags & MDB_NOSYNC) && (env->me_flags & MDB_OVERLAPPINGSYNC)) { size_t txn_id = txn->mt_txnid; - if (dirty_pages * (txn->mt_txnid - env->me_synced_txn_id) > 100) { + if (dirty_pages * (txn->mt_txnid - env->me_synced_txn_id) > 250) { // for bigger txns we wait for the flush before allowing next txn LOCK_MUTEX(rc, env, env->me_sync_mutex); mdb_txn_end(txn, end_mode); @@ -4961,6 +4962,9 @@ mdb_txn_commit(MDB_txn *txn) mdb_txn_end(txn, end_mode); LOCK_MUTEX(rc, env, env->me_sync_mutex); } + if (callback) { + callback(ctx, 1); + } if (rc) return rc; if (env->me_synced_txn_id < txn_id) { // check to see if we still need a sync diff --git a/src/writer.cpp b/src/writer.cpp index 2453f21c0..a4141faa9 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -428,15 +428,19 @@ next_inst: start = instruction++; } bool WriteWorker::threadSafeCallsEnabled = false; -void txn_visible(const void* data) { +void txn_callback(const void* data, int finished) { auto worker = (WriteWorker*) data; - worker->SendUpdate(); - /*if (worker->txn) { - worker->txn = nullptr; - worker->interruptionStatus = 0; - pthread_cond_signal(worker->envForTxn->writingCond); // in case there a sync txn waiting for us - pthread_mutex_unlock(worker->envForTxn->writingLock); - }*/ + if (finished) { + // we don't want to release our lock until *after* the txn lock is released to give other threads a better chance + // at executing next + if (worker->txn) { + worker->txn = nullptr; + worker->interruptionStatus = 0; + pthread_cond_signal(worker->envForTxn->writingCond); // in case there a sync txn waiting for us + pthread_mutex_unlock(worker->envForTxn->writingLock); + } + } else // transaction is visible (to readers), but not unlocked + worker->SendUpdate(); } @@ -485,7 +489,7 @@ void WriteWorker::Write() { progressStatus = 1; #ifdef MDB_OVERLAPPINGSYNC if (envForTxn->jsFlags & MDB_OVERLAPPINGSYNC) { - mdb_txn_set_callback(txn, txn_visible, this); + mdb_txn_set_callback(txn, txn_callback, this); } #endif if (rc || resultCode) { @@ -499,12 +503,7 @@ void WriteWorker::Write() { if (rc == MDB_EMPTY_TXN) rc = 0; #endif - if (txn) { - txn = nullptr; - interruptionStatus = 0; - pthread_cond_signal(envForTxn->writingCond); // in case there a sync txn waiting for us - pthread_mutex_unlock(envForTxn->writingLock); - } + txn_callback(this, 1); if (rc || resultCode) { std::atomic_fetch_or((std::atomic*) instructions, (uint32_t) TXN_HAD_ERROR); if (rc)