Skip to content

Commit

Permalink
Unlock single thread access to transaction earlier and dump freelist …
Browse files Browse the repository at this point in the history
…if it is too old
  • Loading branch information
kriszyp committed Mar 7, 2024
1 parent e718d44 commit d7550b5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dependencies/lmdb/libraries/liblmdb/lmdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ int mdb_env_set_assert(MDB_env *env, MDB_assert_func *func);

//<lmdb-js>
typedef int (MDB_check_fd)(const mdb_filehandle_t fd, MDB_env* env);
typedef void (MDB_txn_visible)(const void* ctx);
typedef void (MDB_txn_visible)(const void* ctx, int finished);
int mdb_env_set_callback(MDB_env *env, MDB_check_fd *func);
int mdb_txn_set_callback(MDB_txn *txn, MDB_txn_visible *func, void* ctx);
int mdb_env_set_freespace_options(MDB_env *env, unsigned int max_to_load, unsigned int max_to_retain);
Expand Down
16 changes: 10 additions & 6 deletions dependencies/lmdb/libraries/liblmdb/mdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3978,8 +3978,8 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
mdb_midl_shrink(&txn->mt_free_pgs);
env->me_free_pgs = txn->mt_free_pgs;
/* me_pgstate: */
if (env->me_pghead && env->me_pghead[0] > env->me_maxfreepgs_to_retain) {
fprintf(stderr, "Free list too large %u, dumping from memory\n", env->me_pghead[0]);
if (env->me_pghead && env->me_pghead[0] > env->me_maxfreepgs_to_retain || env->me_freelist_end + 100 < txn->mt_txnid) {
//fprintf(stderr, "Free list too large %u or out of date (%u / %u), dumping from memory\n", env->me_pghead ? env->me_pghead[0] : 0, env->me_freelist_end, txn->mt_txnid);
// if it is too large, reset it
env->me_pghead = NULL;
env->me_freelist_start = 0;
Expand Down Expand Up @@ -4926,9 +4926,10 @@ mdb_txn_commit(MDB_txn *txn)
goto fail;

//<lmdb-js>
if (txn->mt_callback) {
MDB_txn_visible* callback = txn->mt_callback;
callback(txn->mt_ctx);
MDB_txn_visible* callback = txn->mt_callback;
void* ctx = txn->mt_ctx;
if (callback) {
callback(ctx, 0);
txn->mt_callback = NULL;
}
if (!F_ISSET(txn->mt_flags, MDB_TXN_NOSYNC))
Expand All @@ -4953,14 +4954,17 @@ mdb_txn_commit(MDB_txn *txn)
}
if ((txn->mt_flags & MDB_NOSYNC) && (env->me_flags & MDB_OVERLAPPINGSYNC)) {
size_t txn_id = txn->mt_txnid;
if (dirty_pages * (txn->mt_txnid - env->me_synced_txn_id) > 100) {
if (dirty_pages * (txn->mt_txnid - env->me_synced_txn_id) > 250) {
// for bigger txns we wait for the flush before allowing next txn
LOCK_MUTEX(rc, env, env->me_sync_mutex);
mdb_txn_end(txn, end_mode);
} else {
mdb_txn_end(txn, end_mode);
LOCK_MUTEX(rc, env, env->me_sync_mutex);
}
if (callback) {
callback(ctx, 1);
}
if (rc)
return rc;
if (env->me_synced_txn_id < txn_id) { // check to see if we still need a sync
Expand Down
29 changes: 14 additions & 15 deletions src/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,19 @@ next_inst: start = instruction++;
}

bool WriteWorker::threadSafeCallsEnabled = false;
void txn_visible(const void* data) {
void txn_callback(const void* data, int finished) {
auto worker = (WriteWorker*) data;
worker->SendUpdate();
/*if (worker->txn) {
worker->txn = nullptr;
worker->interruptionStatus = 0;
pthread_cond_signal(worker->envForTxn->writingCond); // in case there a sync txn waiting for us
pthread_mutex_unlock(worker->envForTxn->writingLock);
}*/
if (finished) {
// we don't want to release our lock until *after* the txn lock is released to give other threads a better chance
// at executing next
if (worker->txn) {
worker->txn = nullptr;
worker->interruptionStatus = 0;
pthread_cond_signal(worker->envForTxn->writingCond); // in case there a sync txn waiting for us
pthread_mutex_unlock(worker->envForTxn->writingLock);
}
} else // transaction is visible (to readers), but not unlocked
worker->SendUpdate();
}


Expand Down Expand Up @@ -485,7 +489,7 @@ void WriteWorker::Write() {
progressStatus = 1;
#ifdef MDB_OVERLAPPINGSYNC
if (envForTxn->jsFlags & MDB_OVERLAPPINGSYNC) {
mdb_txn_set_callback(txn, txn_visible, this);
mdb_txn_set_callback(txn, txn_callback, this);
}
#endif
if (rc || resultCode) {
Expand All @@ -499,12 +503,7 @@ void WriteWorker::Write() {
if (rc == MDB_EMPTY_TXN)
rc = 0;
#endif
if (txn) {
txn = nullptr;
interruptionStatus = 0;
pthread_cond_signal(envForTxn->writingCond); // in case there a sync txn waiting for us
pthread_mutex_unlock(envForTxn->writingLock);
}
txn_callback(this, 1);
if (rc || resultCode) {
std::atomic_fetch_or((std::atomic<uint32_t>*) instructions, (uint32_t) TXN_HAD_ERROR);
if (rc)
Expand Down

0 comments on commit d7550b5

Please sign in to comment.