diff --git a/BASHO_RELEASES b/BASHO_RELEASES index 1e438fcd..077012d3 100644 --- a/BASHO_RELEASES +++ b/BASHO_RELEASES @@ -1,3 +1,45 @@ +github.com tag 2.0.34 - February 15, 2017 +----------------------------------------- +mv-hot-backup2: - correct MakeTieredDbname() within db/filename.cc + for case where dbname input is blank and fast/slow + already populated in options. Corrects issue + with hot backup in non-tiered storage situations + +github.com tag 2.0.33 - November 21, 2016 +----------------------------------------- +mv-bucket-expiry: - partial branch to enable X-Riak-Meta-Expiry-Base-Seconds + property within enterprise edition + +github.com tag 2.0.32 - November 8, 2016 +---------------------------------------- + - version shipped with Riak 2.2 +** additional race condition hardening when faced with two threads on same iterator +** (one iterating async_iterator_move() and one closing async_iterator_close()) + - wrap async_iterator_move operations with locked CloseMutex + - create and then use new manual SpinLock within RetrieveItrObject + - adapt origin spin lock code for Solaris / SmartOS compiling + +github.com tag 2.0.31 - November 1, 2016 +---------------------------------------- + - includes leveldb 2.0.31 (mv-no-md-expiry & mv-tuning8) +mv-ref-hardening: - series of thread hardening changes + related to AAE use of iterators. Biggest + fix was isolating AAE using one thread to + close while another was still moving, then + defending against it + +github.com tag 2.0.30 - October 11, 2016 +---------------------------------------- + - includes leveldb 2.0.30 (mv-delayed-bloom) + +github.com tag 2.0.28 - September 7, 2016 +----------------------------------------- +Clarify which compression algorithm used for default: + 1. leveldb open source users: lz4 default + 2. eleveldb open source users: snappy default + 3. riak.conf / app.config users of older generation: snappy default + 4. riak.conf from Riak 2.2: lz4 default + github.com tag 2.0.27 - August 22, 2016 --------------------------------------- mv-mem-fences: fix iterator double delete bug in eleveldb and diff --git a/c_src/atoms.h b/c_src/atoms.h index 48a18679..7ec2da11 100644 --- a/c_src/atoms.h +++ b/c_src/atoms.h @@ -2,7 +2,7 @@ // // eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) // -// Copyright (c) 2011-2013 Basho Technologies, Inc. All Rights Reserved. +// Copyright (c) 2011-2017 Basho Technologies, Inc. All Rights Reserved. // // This file is provided to you under the Apache License, // Version 2.0 (the "License"); you may not use this file @@ -23,9 +23,16 @@ #ifndef ATOMS_H #define ATOMS_H +extern "C" { + +#include "erl_nif.h" + +} + namespace eleveldb { // Atoms (initialized in on_load) +// This is mirror of non-externs in eleveldb.cc extern ERL_NIF_TERM ATOM_TRUE; extern ERL_NIF_TERM ATOM_FALSE; extern ERL_NIF_TERM ATOM_OK; @@ -35,15 +42,15 @@ extern ERL_NIF_TERM ATOM_BADARG; extern ERL_NIF_TERM ATOM_CREATE_IF_MISSING; extern ERL_NIF_TERM ATOM_ERROR_IF_EXISTS; extern ERL_NIF_TERM ATOM_WRITE_BUFFER_SIZE; -extern ERL_NIF_TERM ATOM_MAX_OPEN_FILES; -extern ERL_NIF_TERM ATOM_BLOCK_SIZE; /* DEPRECATED */ extern ERL_NIF_TERM ATOM_SST_BLOCK_SIZE; +extern ERL_NIF_TERM ATOM_BLOCK_SIZE_STEPS; extern ERL_NIF_TERM ATOM_BLOCK_RESTART_INTERVAL; extern ERL_NIF_TERM ATOM_ERROR_DB_OPEN; extern ERL_NIF_TERM ATOM_ERROR_DB_PUT; extern ERL_NIF_TERM ATOM_NOT_FOUND; extern ERL_NIF_TERM ATOM_VERIFY_CHECKSUMS; extern ERL_NIF_TERM ATOM_FILL_CACHE; +extern ERL_NIF_TERM ATOM_ITERATOR_REFRESH; extern ERL_NIF_TERM ATOM_SYNC; extern ERL_NIF_TERM ATOM_ERROR_DB_DELETE; extern ERL_NIF_TERM ATOM_CLEAR; @@ -57,15 +64,45 @@ extern ERL_NIF_TERM ATOM_FIRST; extern ERL_NIF_TERM ATOM_LAST; extern ERL_NIF_TERM ATOM_NEXT; extern ERL_NIF_TERM ATOM_PREV; +extern ERL_NIF_TERM ATOM_PREFETCH; +extern ERL_NIF_TERM ATOM_PREFETCH_STOP; extern ERL_NIF_TERM ATOM_INVALID_ITERATOR; -extern ERL_NIF_TERM ATOM_CACHE_SIZE; extern ERL_NIF_TERM ATOM_PARANOID_CHECKS; +extern ERL_NIF_TERM ATOM_VERIFY_COMPACTIONS; extern ERL_NIF_TERM ATOM_ERROR_DB_DESTROY; extern ERL_NIF_TERM ATOM_KEYS_ONLY; extern ERL_NIF_TERM ATOM_COMPRESSION; +extern ERL_NIF_TERM ATOM_ON; +extern ERL_NIF_TERM ATOM_OFF; +extern ERL_NIF_TERM ATOM_SNAPPY; +extern ERL_NIF_TERM ATOM_LZ4; extern ERL_NIF_TERM ATOM_ERROR_DB_REPAIR; extern ERL_NIF_TERM ATOM_USE_BLOOMFILTER; - +extern ERL_NIF_TERM ATOM_TOTAL_MEMORY; +extern ERL_NIF_TERM ATOM_TOTAL_LEVELDB_MEM; +extern ERL_NIF_TERM ATOM_TOTAL_LEVELDB_MEM_PERCENT; +extern ERL_NIF_TERM ATOM_BLOCK_CACHE_THRESHOLD; +extern ERL_NIF_TERM ATOM_IS_INTERNAL_DB; +extern ERL_NIF_TERM ATOM_LIMITED_DEVELOPER_MEM; +extern ERL_NIF_TERM ATOM_ELEVELDB_THREADS; +extern ERL_NIF_TERM ATOM_FADVISE_WILLNEED; +extern ERL_NIF_TERM ATOM_DELETE_THRESHOLD; +extern ERL_NIF_TERM ATOM_TIERED_SLOW_LEVEL; +extern ERL_NIF_TERM ATOM_TIERED_FAST_PREFIX; +extern ERL_NIF_TERM ATOM_TIERED_SLOW_PREFIX; +extern ERL_NIF_TERM ATOM_CACHE_OBJECT_WARMING; +extern ERL_NIF_TERM ATOM_EXPIRATION; +extern ERL_NIF_TERM ATOM_DEFAULT_TIME_TO_LIVE; +extern ERL_NIF_TERM ATOM_EXPIRATION_MODE; +extern ERL_NIF_TERM ATOM_ENABLED; +extern ERL_NIF_TERM ATOM_WHOLE_FILE; +extern ERL_NIF_TERM ATOM_PER_ITEM; +extern ERL_NIF_TERM ATOM_INVOKE; +extern ERL_NIF_TERM ATOM_UNLIMITED; +extern ERL_NIF_TERM ATOM_EXPIRY_ENABLED; +extern ERL_NIF_TERM ATOM_EXPIRY_MINUTES; +extern ERL_NIF_TERM ATOM_WHOLE_FILE_EXPIRY; +extern ERL_NIF_TERM ATOM_BUCKET_PROPS; } // namespace eleveldb diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index c488f2c5..bb5363f9 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -65,13 +65,15 @@ case "$1" in ;; *) + export MACOSX_DEPLOYMENT_TARGET=10.8 + if [ ! -d snappy-$SNAPPY_VSN ]; then tar -xzf snappy-$SNAPPY_VSN.tar.gz - (cd snappy-$SNAPPY_VSN && ./configure --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) + (cd snappy-$SNAPPY_VSN && ./configure --disable-shared --prefix=$BASEDIR/system --libdir=$BASEDIR/system/lib --with-pic) fi if [ ! -f system/lib/libsnappy.a ]; then - (cd snappy-$SNAPPY_VSN && $MAKE && $MAKE install) + (cd snappy-$SNAPPY_VSN && $MAKE -stdlib=libc++ && $MAKE -stdlib=libc++ install) fi export CFLAGS="$CFLAGS -I $BASEDIR/system/include" diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index c8c7cf3b..de935606 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -2,7 +2,7 @@ // // eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) // -// Copyright (c) 2011-2016 Basho Technologies, Inc. All Rights Reserved. +// Copyright (c) 2011-2017 Basho Technologies, Inc. All Rights Reserved. // // This file is provided to you under the Apache License, // Version 2.0 (the "License"); you may not use this file @@ -37,18 +37,23 @@ #include "leveldb/db.h" #include "leveldb/comparator.h" #include "leveldb/env.h" +#include "leveldb/expiry.h" #include "leveldb/write_batch.h" #include "leveldb/cache.h" #include "leveldb/filter_policy.h" #include "leveldb/perf_count.h" #define LEVELDB_PLATFORM_POSIX #include "util/hot_threads.h" -#include "leveldb_os/expiry_os.h" +#include "util/expiry_os.h" #ifndef INCL_WORKITEMS_H #include "workitems.h" #endif +#ifndef INCL_ROUTER_H + #include "router.h" +#endif + #ifndef ATOMS_H #include "atoms.h" #endif @@ -73,13 +78,21 @@ static ErlNifFunc nif_funcs[] = {"async_iterator", 3, eleveldb::async_iterator}, {"async_iterator", 4, eleveldb::async_iterator}, - {"async_iterator_move", 3, eleveldb::async_iterator_move} + {"async_iterator_move", 3, eleveldb::async_iterator_move}, + + {"property_cache", 2, eleveldb::property_cache}, + {"property_cache_get", 1, eleveldb::property_cache_get}, + {"property_cache_flush", 0, eleveldb::property_cache_flush}, + {"set_metadata_pid", 2, eleveldb::set_metadata_pid}, + {"remove_metadata_pid", 2, eleveldb::remove_metadata_pid}, + {"get_metadata_pid", 1, eleveldb::get_metadata_pid} }; namespace eleveldb { // Atoms (initialized in on_load) +// This is mirror of externs in atoms.h ERL_NIF_TERM ATOM_TRUE; ERL_NIF_TERM ATOM_FALSE; ERL_NIF_TERM ATOM_OK; @@ -138,12 +151,25 @@ ERL_NIF_TERM ATOM_TIERED_SLOW_LEVEL; ERL_NIF_TERM ATOM_TIERED_FAST_PREFIX; ERL_NIF_TERM ATOM_TIERED_SLOW_PREFIX; ERL_NIF_TERM ATOM_CACHE_OBJECT_WARMING; +ERL_NIF_TERM ATOM_EXPIRATION; +ERL_NIF_TERM ATOM_DEFAULT_TIME_TO_LIVE; +ERL_NIF_TERM ATOM_EXPIRATION_MODE; +ERL_NIF_TERM ATOM_ENABLED; +ERL_NIF_TERM ATOM_WHOLE_FILE; +ERL_NIF_TERM ATOM_PER_ITEM; +ERL_NIF_TERM ATOM_INVOKE; +ERL_NIF_TERM ATOM_UNLIMITED; ERL_NIF_TERM ATOM_EXPIRY_ENABLED; ERL_NIF_TERM ATOM_EXPIRY_MINUTES; ERL_NIF_TERM ATOM_WHOLE_FILE_EXPIRY; -} // namespace eleveldb +ERL_NIF_TERM ATOM_BUCKET_PROPS; +// defining ServiceCallback here in eleveldb.cc to guarantee initialization timing +ServiceCallback gBucketPropCallback; + +} // namespace eleveldb + using std::nothrow; struct eleveldb_itr_handle; @@ -175,6 +201,7 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, leveldb::Slice s) return result; } + /** struct for grabbing eleveldb environment options via fold * ... then loading said options into eleveldb_priv_data */ @@ -227,11 +254,11 @@ class eleveldb_priv_data leveldb::HotThreadPool thread_pool; explicit eleveldb_priv_data(EleveldbOptions & Options) - : m_Opts(Options), - thread_pool(Options.m_EleveldbThreads, "Eleveldb", - leveldb::ePerfElevelDirect, leveldb::ePerfElevelQueued, - leveldb::ePerfElevelDequeued, leveldb::ePerfElevelWeighted) - {} + : m_Opts(Options), + thread_pool(Options.m_EleveldbThreads, "Eleveldb", + leveldb::ePerfElevelDirect, leveldb::ePerfElevelQueued, + leveldb::ePerfElevelDequeued, leveldb::ePerfElevelWeighted) + {}; private: eleveldb_priv_data(); // no default constructor @@ -250,7 +277,7 @@ ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOption { if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -347,7 +374,7 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio } else if (option[0] == eleveldb::ATOM_BLOCK_CACHE_THRESHOLD) { - size_t memory_sz; + unsigned long memory_sz; if (enif_get_ulong(env, option[1], &memory_sz)) { if (memory_sz != 0) @@ -485,16 +512,18 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio else if (option[0] == eleveldb::ATOM_EXPIRY_ENABLED) { - if (option[1] == eleveldb::ATOM_TRUE) + if (option[1] == eleveldb::ATOM_ENABLED + || option[1] == eleveldb::ATOM_ON + || option[1] == eleveldb::ATOM_TRUE) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); - ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->expiry_enabled = true; + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule(&eleveldb::leveldb_callback)); + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetExpiryEnabled(true); } // if else { if (NULL!=opts.expiry_module.get()) - ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->expiry_enabled = false; + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetExpiryEnabled(false); } // else } // else if else if (option[0] == eleveldb::ATOM_EXPIRY_MINUTES) @@ -503,25 +532,32 @@ ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Optio if (enif_get_ulong(env, option[1], &minutes)) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); - ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->expiry_minutes = minutes; + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule(&eleveldb::leveldb_callback)); + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetExpiryMinutes(minutes); } // if + else if (option[1] == eleveldb::ATOM_UNLIMITED) + { + if (NULL==opts.expiry_module.get()) + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule(&eleveldb::leveldb_callback)); + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetExpiryUnlimited(true); + } // else if + } // else if else if (option[0] == eleveldb::ATOM_WHOLE_FILE_EXPIRY) { - if (option[1] == eleveldb::ATOM_TRUE) + if (option[1] == eleveldb::ATOM_WHOLE_FILE) { if (NULL==opts.expiry_module.get()) - opts.expiry_module.assign(new leveldb::ExpiryModuleOS); - ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->whole_file_expiry = true; + opts.expiry_module.assign(leveldb::ExpiryModule::CreateExpiryModule(&eleveldb::leveldb_callback)); + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetWholeFileExpiryEnabled(true); } // if - else + else if (option[1] == eleveldb::ATOM_PER_ITEM) { if (NULL!=opts.expiry_module.get()) - ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->whole_file_expiry = false; - } // else + ((leveldb::ExpiryModuleOS *)opts.expiry_module.get())->SetWholeFileExpiryEnabled(false); + } // else if + // else take default setting ... do nothing. } // else if - } return eleveldb::ATOM_OK; @@ -612,6 +648,21 @@ ERL_NIF_TERM send_reply(ErlNifEnv *env, ERL_NIF_TERM ref, ERL_NIF_TERM reply) return ATOM_OK; } +// Boilerplate for submitting to the thread queue. +// Takes ownership of the item. assumes allocated through new + +ERL_NIF_TERM +submit_to_thread_queue(eleveldb::WorkTask *work_item, ErlNifEnv* env, ERL_NIF_TERM caller_ref){ + eleveldb_priv_data& data = *static_cast(enif_priv_data(env)); + if(false == data.thread_pool.Submit(work_item)) + { + delete work_item; + return send_reply(env, caller_ref, + enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); + } // if + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM async_open( ErlNifEnv* env, @@ -666,15 +717,7 @@ async_open( eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; + return submit_to_thread_queue(work_item, env, caller_ref); } // async_open @@ -705,8 +748,6 @@ async_write( if(NULL == db_ptr->m_Db) return send_reply(env, caller_ref, error_einval(env)); - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - // Construct a write batch: leveldb::WriteBatch* batch = new leveldb::WriteBatch; @@ -727,17 +768,8 @@ async_write( fold(env, argv[3], parse_write_option, *opts); eleveldb::WorkTask* work_item = new eleveldb::WriteTask(env, caller_ref, - db_ptr.get(), batch, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - // work_item contains "batch" and the delete below gets both memory allocations - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if - - return eleveldb::ATOM_OK; + db_ptr, batch, opts); + return submit_to_thread_queue(work_item, env, caller_ref); } @@ -770,18 +802,8 @@ async_get( fold(env, opts_ref, parse_read_option, opts); eleveldb::WorkTask *work_item = new eleveldb::GetTask(env, caller_ref, - db_ptr.get(), key_ref, opts); - - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } // if - - return eleveldb::ATOM_OK; + db_ptr, key_ref, opts); + return submit_to_thread_queue(work_item, env, caller_ref); } // async_get @@ -817,19 +839,8 @@ async_iterator( fold(env, options_ref, parse_read_option, opts); eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref, - db_ptr.get(), keys_only, opts); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if - - return ATOM_OK; - + db_ptr, keys_only, opts); + return submit_to_thread_queue(work_item, env, caller_ref); } // async_iterator @@ -849,11 +860,14 @@ async_iterator_move( ReferencePtr itr_ptr; - itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_handle_ref)); + ItrObject::RetrieveItrObject(env, itr_handle_ref, false, itr_ptr); if(NULL==itr_ptr.get() || 0!=itr_ptr->GetCloseRequested()) return enif_make_badarg(env); + // Nov 2, 2016: Hack against AAE using iterator on two threads + leveldb::MutexLock lock(&itr_ptr->m_CloseMutex); + // Reuse ref from iterator creation const ERL_NIF_TERM& caller_ref = itr_ptr->itr_ref; @@ -873,12 +887,12 @@ async_iterator_move( } // if // debug syslog(LOG_ERR, "move state: %d, %d, %d", - // action, itr_ptr->m_Iter->m_PrefetchStarted, itr_ptr->m_Iter->m_HandoffAtomic); + // action, itr_ptr->m_Wrap.m_PrefetchStarted, itr_ptr->m_Wrap.m_HandoffAtomic); // must set this BEFORE call to compare_and_swap ... or have potential // for an "extra" message coming out of prefetch - prefetch_state = itr_ptr->m_Iter->m_PrefetchStarted; - itr_ptr->m_Iter->m_PrefetchStarted = prefetch_state && (eleveldb::MoveTask::PREFETCH_STOP != action ); + prefetch_state = itr_ptr->m_Wrap.m_PrefetchStarted; + itr_ptr->m_Wrap.m_PrefetchStarted = prefetch_state && (eleveldb::MoveTask::PREFETCH_STOP != action ); // // Three situations: @@ -899,14 +913,14 @@ async_iterator_move( ret_term = enif_make_copy(env, itr_ptr->itr_ref); // force reply to be a message - itr_ptr->m_Iter->m_HandoffAtomic=1; - itr_ptr->m_Iter->m_PrefetchStarted=false; + itr_ptr->m_Wrap.m_HandoffAtomic=1; + itr_ptr->m_Wrap.m_PrefetchStarted=false; } // if // case #2 // before we launch a background job for "next iteration", see if there is a // prefetch waiting for us - else if (leveldb::compare_and_swap(&itr_ptr->m_Iter->m_HandoffAtomic, 0, 1)) + else if (leveldb::compare_and_swap(&itr_ptr->m_Wrap.m_HandoffAtomic, 0, 1)) { // nope, no prefetch ... await a message to erlang queue // NOTE: "else" clause of MoveTask::DoWork() could be running simultaneously @@ -931,8 +945,8 @@ async_iterator_move( // (this is an absolute must since worker thread could change to false if // hits end of key space and its execution overlaps this block's execution) int cas_temp((eleveldb::MoveTask::PREFETCH_STOP != action ) // needed for Solaris CAS - && itr_ptr->m_Iter->Valid()); - leveldb::compare_and_swap(&itr_ptr->m_Iter->m_PrefetchStarted, + && itr_ptr->m_Wrap.Valid()); + leveldb::compare_and_swap(&itr_ptr->m_Wrap.m_PrefetchStarted, prefetch_state, cas_temp); } // else if @@ -943,34 +957,34 @@ async_iterator_move( // why yes there is. copy the key/value info into a return tuple before // we launch the iterator for "next" again // NOTE: worker thread is inactive at this time - if(!itr_ptr->m_Iter->Valid()) + if(!itr_ptr->m_Wrap.Valid()) ret_term=enif_make_tuple2(env, ATOM_ERROR, ATOM_INVALID_ITERATOR); - else if (itr_ptr->m_Iter->m_KeysOnly) - ret_term=enif_make_tuple2(env, ATOM_OK, slice_to_binary(env, itr_ptr->m_Iter->key())); + else if (itr_ptr->keys_only) + ret_term=enif_make_tuple2(env, ATOM_OK, slice_to_binary(env, itr_ptr->m_Wrap.key())); else ret_term=enif_make_tuple3(env, ATOM_OK, - slice_to_binary(env, itr_ptr->m_Iter->key()), - slice_to_binary(env, itr_ptr->m_Iter->value())); + slice_to_binary(env, itr_ptr->m_Wrap.key()), + slice_to_binary(env, itr_ptr->m_Wrap.value())); // reset for next race - itr_ptr->m_Iter->m_HandoffAtomic=0; + itr_ptr->m_Wrap.m_HandoffAtomic=0; // old MoveItem could still be active on its thread, cannot // reuse ... but the current Iterator is good itr_ptr->ReleaseReuseMove(); if (eleveldb::MoveTask::PREFETCH_STOP != action - && itr_ptr->m_Iter->Valid()) + && itr_ptr->m_Wrap.Valid()) { submit_new_request=true; } // if else { submit_new_request=false; - itr_ptr->m_Iter->m_HandoffAtomic=0; - itr_ptr->m_Iter->m_PrefetchStarted=false; + itr_ptr->m_Wrap.m_HandoffAtomic=0; + itr_ptr->m_Wrap.m_PrefetchStarted=false; } // else @@ -983,7 +997,7 @@ async_iterator_move( eleveldb::MoveTask * move_item; move_item = new eleveldb::MoveTask(env, caller_ref, - itr_ptr->m_Iter.get(), action); + itr_ptr, action); // prevent deletes during worker loop move_item->RefInc(); @@ -1046,16 +1060,9 @@ async_close( && db_ptr->ClaimCloseFromCThread()) { eleveldb::WorkTask *work_item = new eleveldb::CloseTask(env, caller_ref, - db_ptr.get()); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); + db_ptr); + return submit_to_thread_queue(work_item, env, caller_ref); - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if } // if else if (!term_ok) { @@ -1078,29 +1085,23 @@ async_iterator_close( ReferencePtr itr_ptr; - itr_ptr.assign(ItrObject::RetrieveItrObject(env, itr_ref)); + ItrObject::RetrieveItrObject(env, itr_ref, false, itr_ptr); if(NULL==itr_ptr.get() || 0!=itr_ptr->GetCloseRequested()) { - leveldb::gPerfCounters->Inc(leveldb::ePerfDebug4); return enif_make_badarg(env); } + // Nov 2, 2016: Hack against AAE using iterator on two threads + leveldb::MutexLock lock(&itr_ptr->m_CloseMutex); + // verify that Erlang has not called ItrObjectResourceCleanup AND // that a database close has not already started death proceedings if (itr_ptr->ClaimCloseFromCThread()) { eleveldb::WorkTask *work_item = new eleveldb::ItrCloseTask(env, caller_ref, - itr_ptr.get()); - - // Now-boilerplate setup (we'll consolidate this pattern soon, I hope): - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, enif_make_tuple2(env, ATOM_ERROR, caller_ref)); - } // if + itr_ptr); + return submit_to_thread_queue(work_item, env, caller_ref); } // if // this close/cleanup call is way late ... bad programmer! @@ -1130,25 +1131,15 @@ async_destroy( ERL_NIF_TERM caller_ref = argv[0]; - eleveldb_priv_data& priv = *static_cast(enif_priv_data(env)); - leveldb::Options *opts = new leveldb::Options; fold(env, argv[2], parse_open_option, *opts); eleveldb::WorkTask *work_item = new eleveldb::DestroyTask(env, caller_ref, db_name, opts); - - if(false == priv.thread_pool.Submit(work_item)) - { - delete work_item; - return send_reply(env, caller_ref, - enif_make_tuple2(env, eleveldb::ATOM_ERROR, caller_ref)); - } - - return eleveldb::ATOM_OK; - + return submit_to_thread_queue(work_item, env, caller_ref); } // async_destroy + } // namespace eleveldb @@ -1272,11 +1263,16 @@ eleveldb_is_empty( static void on_unload(ErlNifEnv *env, void *priv_data) { + // disable service request messages + eleveldb::gBucketPropCallback.Disable(); + + leveldb::Env::Shutdown(); + eleveldb_priv_data *p = static_cast(priv_data); delete p; - leveldb::Env::Shutdown(); -} + return; +} // on_unload static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) @@ -1355,24 +1351,41 @@ try ATOM(eleveldb::ATOM_TIERED_FAST_PREFIX, "tiered_fast_prefix"); ATOM(eleveldb::ATOM_TIERED_SLOW_PREFIX, "tiered_slow_prefix"); ATOM(eleveldb::ATOM_CACHE_OBJECT_WARMING, "cache_object_warming"); + ATOM(eleveldb::ATOM_EXPIRATION, "expiration"); + ATOM(eleveldb::ATOM_DEFAULT_TIME_TO_LIVE, "default_time_to_live"); + ATOM(eleveldb::ATOM_EXPIRATION_MODE, "expiration_mode"); + ATOM(eleveldb::ATOM_ENABLED, "enabled"); + ATOM(eleveldb::ATOM_WHOLE_FILE, "whole_file"); + ATOM(eleveldb::ATOM_PER_ITEM, "per_item"); + ATOM(eleveldb::ATOM_INVOKE, "invoke"); + ATOM(eleveldb::ATOM_UNLIMITED, "unlimited"); ATOM(eleveldb::ATOM_EXPIRY_ENABLED, "expiry_enabled"); ATOM(eleveldb::ATOM_EXPIRY_MINUTES, "expiry_minutes"); ATOM(eleveldb::ATOM_WHOLE_FILE_EXPIRY, "whole_file_expiry"); + ATOM(eleveldb::ATOM_BUCKET_PROPS, "bucket_props"); #undef ATOM + ERL_NIF_TERM option_list; + bool good_params(false); + + if (enif_is_list(env, load_info)) + { + option_list=load_info; + good_params=true; + } // if + // read options that apply to global eleveldb environment - if(enif_is_list(env, load_info)) + if(good_params) { EleveldbOptions load_options; - fold(env, load_info, parse_init_option, load_options); + fold(env, option_list, parse_init_option, load_options); /* Spin up the thread pool, set up all private data: */ eleveldb_priv_data *priv = new eleveldb_priv_data(load_options); *priv_data = priv; - } // if else diff --git a/c_src/refobjects.cc b/c_src/refobjects.cc index 05f14031..7871b2a0 100644 --- a/c_src/refobjects.cc +++ b/c_src/refobjects.cc @@ -165,7 +165,7 @@ ErlRefObject::RefDec() return(cur_count); -} // DbObject::RefDec +} // ErlRefObject::RefDec /** @@ -341,7 +341,6 @@ DbObject::Shutdown() // if (leveldb::compare_and_swap(itr_ptr->m_ErlangThisPtr, itr_ptr, (ItrObject *)NULL)) if (itr_ptr->ClaimCloseFromCThread()) { - itr_ptr->m_Iter->LogIterator(); itr_ptr->ItrObject::InitiateCloseRequest(); } // if } // if @@ -388,51 +387,23 @@ DbObject::RemoveReference( */ LevelIteratorWrapper::LevelIteratorWrapper( - ItrObject * ItrPtr, - bool KeysOnly, - leveldb::ReadOptions & Options, - ERL_NIF_TERM itr_ref) - : m_DbPtr(ItrPtr->m_DbPtr.get()), m_ItrPtr(ItrPtr), m_Snapshot(NULL), m_Iterator(NULL), - m_HandoffAtomic(0), m_KeysOnly(KeysOnly), m_PrefetchStarted(false), - m_Options(Options), itr_ref(itr_ref), + DbObjectPtr_t & DbPtr, //!< db access for local iterator rebuild + leveldb::ReadOptions & Options) //!< options to use in iterator rebuild + : m_DbPtr(DbPtr), m_Options(Options), + m_Snapshot(NULL), m_Iterator(NULL), + m_HandoffAtomic(0), m_PrefetchStarted(false), m_IteratorStale(0), m_StillUse(true), - m_IteratorCreated(0), m_LastLogReport(0), m_MoveCount(0), m_IsValid(false) + m_IsValid(false) { - struct timeval tv; - - gettimeofday(&tv, NULL); - m_IteratorCreated=tv.tv_sec; - m_LastLogReport=tv.tv_sec; RebuildIterator(); } // LevelIteratorWrapper::LevelIteratorWrapper -/** - * put info about this iterator into leveldb LOG - */ - -void -LevelIteratorWrapper::LogIterator() -{ -#if 0 // available in different branch - struct tm created; - - localtime_r(&m_IteratorCreated, &created); - - leveldb::Log(m_DbPtr->m_Db->GetLogger(), - "Iterator created %d/%d/%d %d:%d:%d, move operations %zd (%p)", - created.tm_mon, created.tm_mday, created.tm_year-100, - created.tm_hour, created.tm_min, created.tm_sec, - m_MoveCount, m_Iterator); -#endif -} // LevelIteratorWrapper::LogIterator() - /** * Iterator management object (Erlang memory) */ - ErlNifResourceType * ItrObject::m_Itr_RESOURCE(NULL); @@ -453,18 +424,21 @@ ItrObject::CreateItrObjectType( void * ItrObject::CreateItrObject( - DbObject * DbPtr, + DbObjectPtr_t & DbPtr, bool KeysOnly, leveldb::ReadOptions & Options) { + ItrObjErlang * erl_ptr; ItrObject * ret_ptr; void * alloc_ptr; // the alloc call initializes the reference count to "one" - alloc_ptr=enif_alloc_resource(m_Itr_RESOURCE, sizeof(ItrObject *)); + alloc_ptr=enif_alloc_resource(m_Itr_RESOURCE, sizeof(ItrObjErlang)); + erl_ptr=(ItrObjErlang *)alloc_ptr; ret_ptr=new ItrObject(DbPtr, KeysOnly, Options); - *(ItrObject **)alloc_ptr=ret_ptr; + erl_ptr->m_ItrPtr=ret_ptr; + erl_ptr->m_SpinLock=0; // manual reference increase to keep active until "eleveldb_iterator_close" called ret_ptr->RefInc(); @@ -478,18 +452,28 @@ ItrObject::CreateItrObject( ItrObject * ItrObject::RetrieveItrObject( ErlNifEnv * Env, - const ERL_NIF_TERM & ItrTerm, bool ItrClosing) + const ERL_NIF_TERM & ItrTerm, + bool ItrClosing, + ItrObjectPtr_t & counted_ptr) { - ItrObject ** itr_ptr_ptr, * ret_ptr; + ItrObjErlang * erl_ptr; + ItrObject * ret_ptr; ret_ptr=NULL; - if (enif_get_resource(Env, ItrTerm, m_Itr_RESOURCE, (void **)&itr_ptr_ptr)) + if (enif_get_resource(Env, ItrTerm, m_Itr_RESOURCE, (void **)&erl_ptr)) { - ret_ptr=*itr_ptr_ptr; + ret_ptr=erl_ptr->m_ItrPtr; + // only continue if close sequence not started if (NULL!=ret_ptr) { + // need to use "const int" instead of literals for + // solaris and smartos compare_and_swap to compile + const int zero(0), one(1); + // lock access ... spin + while(!leveldb::compare_and_swap(&erl_ptr->m_SpinLock, zero, one)) ; + // has close been requested? if (ret_ptr->GetCloseRequested() || (!ItrClosing && ret_ptr->m_DbPtr->GetCloseRequested())) @@ -497,6 +481,12 @@ ItrObject::RetrieveItrObject( // object already closing ret_ptr=NULL; } // if + + // set during spin lock + counted_ptr.assign(ret_ptr); + + // use cas for memory fencing, we own the lock + leveldb::compare_and_swap(&erl_ptr->m_SpinLock, one, zero); } // if } // if @@ -510,17 +500,17 @@ ItrObject::ItrObjectResourceCleanup( ErlNifEnv * Env, void * Arg) { - ItrObject * volatile * erl_ptr; + + ItrObjErlang * erl_ptr; ItrObject * itr_ptr; - erl_ptr=(ItrObject * volatile *)Arg; - itr_ptr=*erl_ptr; + erl_ptr=(ItrObjErlang *)Arg; + itr_ptr=erl_ptr->m_ItrPtr; // is Erlang first to initiate close? - if (leveldb::compare_and_swap(erl_ptr, itr_ptr, (ItrObject *)NULL) + if (leveldb::compare_and_swap(&erl_ptr->m_ItrPtr, itr_ptr, (ItrObject *)NULL) && NULL!=itr_ptr) { - leveldb::gPerfCounters->Inc(leveldb::ePerfDebug3); itr_ptr->InitiateCloseRequest(); } // if @@ -530,13 +520,15 @@ ItrObject::ItrObjectResourceCleanup( ItrObject::ItrObject( - DbObject * DbPtr, + DbObjectPtr_t & DbPtr, bool KeysOnly, leveldb::ReadOptions & Options) - : keys_only(KeysOnly), m_ReadOptions(Options), reuse_move(NULL), + : keys_only(KeysOnly), m_ReadOptions(Options), + m_Wrap(DbPtr, m_ReadOptions), + reuse_move(NULL), m_DbPtr(DbPtr), itr_ref_env(NULL) { - if (NULL!=DbPtr) + if (NULL!=DbPtr.get()) DbPtr->AddReference(this); } // ItrObject::ItrObject @@ -564,6 +556,37 @@ ItrObject::~ItrObject() } // ItrObject::~ItrObject +/** + * matthewv - This is a hack to compensate for Riak AAE + * having two active processes using the same iterator. + * One process attempts a close while the other iterates along. + * This is to help the close succeed. (October 2016) + */ +uint32_t +ItrObject::RefDec() +{ + uint32_t cur_count; + + // Race condition: + // Thread trying to close gets into InitiateCloseRequest() and + // finishes call to Shutdown(). Thread iterating gets far enough + // into async_iterator_move() to not see GetCloseRequest() set, but + // is able to create a new MoveItem within reuse_move. + // This hack knows that async_iterator_move() uses ItrObjectPtr_t that + // holds "this" until the end of the function. ItrObjectPtr_t will + // call RefDec in its destructor. Gives a chance to cleanup a tad. + if (1==GetCloseRequested()) + ReleaseReuseMove(); + + // WARNING: the following call could delete this object. + // make no references to object members afterward + cur_count=ErlRefObject::RefDec(); + + return(cur_count); + +} // ItrObject::RefDec + + void ItrObject::Shutdown() { @@ -572,9 +595,6 @@ ItrObject::Shutdown() // release when move object destructs) ReleaseReuseMove(); - // ItrObject and m_Iter each hold pointers to other, release ours - m_Iter.assign(NULL); - return; } // ItrObject::Shutdown diff --git a/c_src/refobjects.h b/c_src/refobjects.h index d020136b..142b2021 100644 --- a/c_src/refobjects.h +++ b/c_src/refobjects.h @@ -123,7 +123,7 @@ class ReferencePtr : t(NULL) {}; - ReferencePtr(TargetT *_t) + explicit ReferencePtr(TargetT *_t) : t(_t) { if (NULL!=t) @@ -207,43 +207,42 @@ class DbObject : public ErlRefObject DbObject& operator=(const DbObject&); // nocopyassign }; // class DbObject +typedef ReferencePtr DbObjectPtr_t; + /** * A self deleting wrapper to contain leveldb iterator. * Used when an ItrObject needs to skip around and might * have a background MoveItem performing a prefetch on existing * iterator. + * + * Oct 17, 2016: new usage model does not require the Wrapper + * be replaced for reuse after Seeks. Converting to static object */ -class LevelIteratorWrapper : public RefObject +class LevelIteratorWrapper { public: - ReferencePtr m_DbPtr; //!< need to keep db open for delete of this object - ReferencePtr m_ItrPtr; //!< shared itr_ref requires we hold ItrObject + DbObjectPtr_t m_DbPtr; //!< access to db for iterator rebuild + leveldb::ReadOptions & m_Options; //!< ItrObject's ReadOptions struct + // (updates "snapshot" member + const leveldb::Snapshot * m_Snapshot; leveldb::Iterator * m_Iterator; volatile uint32_t m_HandoffAtomic; //!< matthew's atomic foreground/background prefetch flag. - bool m_KeysOnly; //!< only return key values + // m_PrefetchStarted must use uint32_t instead of bool for Solaris CAS operations volatile uint32_t m_PrefetchStarted; //!< true after first prefetch command - leveldb::ReadOptions m_Options; //!< local copy of ItrObject::options - ERL_NIF_TERM itr_ref; //!< shared copy of ItrObject::itr_ref // only used if m_Options.iterator_refresh == true std::string m_RecentKey; //!< Most recent key returned time_t m_IteratorStale; //!< time iterator should refresh bool m_StillUse; //!< true if no error or key end seen - // debug data for hung iteratos - time_t m_IteratorCreated; //!< time constructor called - time_t m_LastLogReport; //!< LOG message was last written - size_t m_MoveCount; //!< number of calls to MoveItem - // read by Erlang thread, maintained by eleveldb MoveItem::DoWork volatile bool m_IsValid; //!< iterator state after last operation - LevelIteratorWrapper(ItrObject * ItrPtr, bool KeysOnly, - leveldb::ReadOptions & Options, ERL_NIF_TERM itr_ref); + LevelIteratorWrapper(DbObjectPtr_t & DbPtr, leveldb::ReadOptions & Options); virtual ~LevelIteratorWrapper() { @@ -294,15 +293,13 @@ class LevelIteratorWrapper : public RefObject m_Iterator = m_DbPtr->m_Db->NewIterator(m_Options); } // RebuildIterator - // hung iterator debug - void LogIterator(); - private: LevelIteratorWrapper(const LevelIteratorWrapper &); // no copy LevelIteratorWrapper& operator=(const LevelIteratorWrapper &); // no assignment }; // LevelIteratorWrapper +typedef ReferencePtr LevelIteratorWrapperPtr_t; /** @@ -311,10 +308,9 @@ class LevelIteratorWrapper : public RefObject class ItrObject : public ErlRefObject { public: - ReferencePtr m_Iter; - bool keys_only; leveldb::ReadOptions m_ReadOptions; //!< local copy, pass to LevelIteratorWrapper only + LevelIteratorWrapper m_Wrap; volatile class MoveTask * reuse_move; //!< iterator work object that is reused instead of lots malloc/free @@ -328,18 +324,21 @@ class ItrObject : public ErlRefObject static ErlNifResourceType* m_Itr_RESOURCE; public: - ItrObject(DbObject *, bool, leveldb::ReadOptions &); + ItrObject(DbObjectPtr_t &, bool, leveldb::ReadOptions &); virtual ~ItrObject(); // needs to perform free_itr + virtual uint32_t RefDec(); + virtual void Shutdown(); static void CreateItrObjectType(ErlNifEnv * Env); - static void * CreateItrObject(DbObject * Db, bool KeysOnly, leveldb::ReadOptions & Options); + static void * CreateItrObject(DbObjectPtr_t & Db, bool KeysOnly, leveldb::ReadOptions & Options); static ItrObject * RetrieveItrObject(ErlNifEnv * Env, const ERL_NIF_TERM & DbTerm, - bool ItrClosing=false); + bool ItrClosing, + ReferencePtr & CountedPtr); static void ItrObjectResourceCleanup(ErlNifEnv *Env, void * Arg); @@ -352,6 +351,21 @@ class ItrObject : public ErlRefObject }; // class ItrObject + +typedef ReferencePtr ItrObjectPtr_t; + + +/** + * Container stored in Erlang heap. Used + * to allow erlang heap to destroy iterator if process(s) holding + * iterator go away. + */ +struct ItrObjErlang +{ + ItrObject * m_ItrPtr; + volatile uint32_t m_SpinLock; +}; + } // namespace eleveldb diff --git a/c_src/router.cc b/c_src/router.cc new file mode 100644 index 00000000..b66f260e --- /dev/null +++ b/c_src/router.cc @@ -0,0 +1,454 @@ +// ------------------------------------------------------------------- +// +// eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) +// +// Copyright (c) 2016-2017 Basho Technologies, Inc. All Rights Reserved. +// +// This file is provided to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// ------------------------------------------------------------------- + +#include + +#ifndef INCL_ROUTER_H + #include "router.h" +#endif + +#include "leveldb/env.h" // for Log() +#include "util/expiry_os.h" +#include "util/prop_cache.h" // hmm, not in OS builds + +namespace eleveldb { + +static ERL_NIF_TERM parse_expiry_properties(ErlNifEnv* env, ERL_NIF_TERM item, + leveldb::ExpiryModuleOS& opts); + +// based upon error_tuple in c_src/eleveldb.cc +static ERL_NIF_TERM +error_tuple_message( + ErlNifEnv* Env, + ERL_NIF_TERM Error, + const char * Message) +{ + ERL_NIF_TERM err_term, msg_term; + + if (NULL==Message || '\0'==*Message) + Message="(empty message)"; + + msg_term = enif_make_string(Env, Message, ERL_NIF_LATIN1); + + err_term = enif_make_tuple2(Env, eleveldb::ATOM_ERROR, + enif_make_tuple2(Env, Error, msg_term)); + return(err_term); + +} // error_tuple_message + + + +bool +leveldb_callback( + leveldb::EleveldbRouterActions_t Action, + int ParamCount, + const void ** Params) +{ + bool ret_flag(false); + ErlNifPid pid_ptr; + + switch(Action) + { + // 0 - type string, 1 - bucket string, 2 - slice for key + case leveldb::eGetBucketProperties: + { + ERL_NIF_TERM callback_pid; + + // defensive test + if (3==ParamCount && NULL!=Params[1] && NULL!=Params[2] + && gBucketPropCallback.GetPid(callback_pid)) + { + ERL_NIF_TERM bucket_term, type_term, key_term, tuple_term; + ErlNifEnv *msg_env = enif_alloc_env(); + int ret_val; + unsigned char * temp_ptr; + leveldb::Slice * key_slice; + + // build bucket and key first since used by both messages + // (no documented fail case to enif_make_new_binary ... ouch) + temp_ptr=enif_make_new_binary(msg_env,strlen((const char *)Params[1]),&bucket_term); + memcpy(temp_ptr, Params[1], strlen((const char *)Params[1])); + key_slice=(leveldb::Slice *)Params[2]; + temp_ptr=enif_make_new_binary(msg_env,key_slice->size(),&key_term); + memcpy(temp_ptr, key_slice->data(), key_slice->size()); + + // bucket only + if (NULL==Params[0] || '\0'==*(const char *)Params[0]) + { + // make some arrays + tuple_term=enif_make_tuple3(msg_env, + ATOM_INVOKE, + enif_make_list1(msg_env, bucket_term), + enif_make_list1(msg_env, key_term)); + } // if + + // bucket type and bucket + else + { + // build type binary + temp_ptr=enif_make_new_binary(msg_env,strlen((const char *)Params[0]),&type_term); + memcpy(temp_ptr, Params[0], strlen((const char *)Params[0])); + tuple_term=enif_make_tuple2(msg_env, type_term, bucket_term); + // Make some arrays + + tuple_term=enif_make_tuple3(msg_env, + ATOM_INVOKE, + enif_make_list1(msg_env, tuple_term), + enif_make_list1(msg_env, key_term)); + } // else + + ret_val=enif_get_local_pid(msg_env, callback_pid, &pid_ptr); + if (0!=ret_val) + ret_val=enif_send(NULL, &pid_ptr, msg_env, tuple_term); + + ret_flag=(0!=ret_val); + enif_free_env(msg_env); + } // if + break; + } // eGetBucketProperties + + // no default case ... just leave ret_flag as false + + } // switch + + return(ret_flag); + +} // leveldb_callback + + +/** + * Convert Riak Erlang properties into ExpiryModule object. + * Insert object into cache. + */ +ERL_NIF_TERM +property_cache( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret_term(ATOM_BADARG); + + // ignore if bad params + if (argc==2 && enif_is_binary(env, argv[0]) && enif_is_list(env, argv[1])) + { + leveldb::ExpiryPropPtr_t cache; + + const ERL_NIF_TERM& composite_bucket = argv[0]; + const ERL_NIF_TERM& properties = argv[1]; + ErlNifBinary key_bin; + + enif_inspect_binary(env, composite_bucket, &key_bin); + leveldb::Slice key_slice((const char *)key_bin.data, key_bin.size); + + // reduce property list to struct we care about + // (use options fold thingie?) + leveldb::ExpiryModuleOS * opt = + (leveldb::ExpiryModuleOS *)leveldb::ExpiryModule::CreateExpiryModule( + &eleveldb::leveldb_callback); + + fold(env, properties, parse_expiry_properties, *opt); + + // send insert command to prop_cache ... insert should broadcast to Wait() + if (cache.Insert(key_slice, opt)) + ret_term=ATOM_OK; + else + ret_term=error_tuple_message(env, ATOM_EINVAL, + "eleveldb::property_cache cache.Insert() failed"); + } // if + else + { + ret_term=error_tuple_message(env, ATOM_BADARG, + "eleveldb::property_cache called with bad arg count or arg types"); + } // else + + return(ret_term); + +} // property_cache + + +static ERL_NIF_TERM +parse_expiry_properties( + ErlNifEnv* env, + ERL_NIF_TERM item, + leveldb::ExpiryModuleOS& opts) +{ + int arity; + const ERL_NIF_TERM* option; + + + if (enif_get_tuple(env, item, &arity, &option) && 2==arity) + { + char buffer[65]={""}; + + // what if property set via json + if (enif_is_binary(env, option[1])) + { + ErlNifBinary bin; + if (0!=enif_inspect_binary(env, option[1], &bin)) + { + strncpy(buffer,(char *)bin.data,(bin.size<65?bin.size:64)); + buffer[bin.size<65?bin.size:64]='\0'; + } //if + else + { + *buffer='\0'; + } // else + } // if + + if (option[0] == eleveldb::ATOM_EXPIRATION) + { + opts.SetExpiryEnabled(option[1] == eleveldb::ATOM_ENABLED + || option[1] == eleveldb::ATOM_ON + || option[1] == eleveldb::ATOM_TRUE + || 0==strcmp(buffer, "enabled") + || 0==strcmp(buffer, "on") + || 0==strcmp(buffer, "true")); + } // else if + else if (option[0] == eleveldb::ATOM_DEFAULT_TIME_TO_LIVE) + { + if (option[1] == eleveldb::ATOM_UNLIMITED + || 0==strcmp(buffer, "unlimited")) + { + opts.SetExpiryUnlimited(true); + } // else if + + // assume it is a cuttlefish duration string + else if ('\0' != *buffer) + { + opts.SetExpiryMinutes(leveldb::CuttlefishDurationMinutes(buffer)); + } // else + } // else if + else if (option[0] == eleveldb::ATOM_EXPIRATION_MODE) + { + if (eleveldb::ATOM_WHOLE_FILE == option[1] + || 0==strcmp(buffer, "whole_file")) + opts.SetWholeFileExpiryEnabled(true); + else if (eleveldb::ATOM_PER_ITEM == option[1] + || 0==strcmp(buffer,"per_item")) + opts.SetWholeFileExpiryEnabled(false); + // else do nothing ... use global setting + + } // else if + } // if + + return eleveldb::ATOM_OK; + +} // parse_expiry_properties + + +/** + * This routine retrieves data from the property cache + * and formats into a list of property pairs. This routine + * is intended for unit tests, not production. + */ +ERL_NIF_TERM +property_cache_get( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret_term(ATOM_BADARG); + + // ignore if bad params + if (argc==1 && enif_is_binary(env, argv[0])) + { + leveldb::ExpiryPropPtr_t cache; + + const ERL_NIF_TERM& composite_bucket = argv[0]; + ErlNifBinary key_bin; + + enif_inspect_binary(env, composite_bucket, &key_bin); + leveldb::Slice key_slice((const char *)key_bin.data, key_bin.size); + + if (cache.Lookup(key_slice)) + { + ERL_NIF_TERM enabled_tuple, minutes_tuple, whole_file_tuple; + + // enabled + if (cache->IsExpiryEnabled()) + enabled_tuple=enif_make_tuple2(env, ATOM_EXPIRY_ENABLED, ATOM_ENABLED); + else + enabled_tuple=enif_make_tuple2(env, ATOM_EXPIRY_ENABLED, ATOM_OFF); + + // minutes + if (cache->IsExpiryUnlimited()) + minutes_tuple=enif_make_tuple2(env, ATOM_EXPIRY_MINUTES, ATOM_UNLIMITED); + else + { + ERL_NIF_TERM minutes; + minutes=enif_make_int(env, cache->GetExpiryMinutes()); + minutes_tuple=enif_make_tuple2(env, ATOM_EXPIRY_MINUTES, minutes); + } // else + + // whole file + if (cache->IsWholeFileExpiryEnabled()) + whole_file_tuple=enif_make_tuple2(env, ATOM_EXPIRATION_MODE, ATOM_WHOLE_FILE); + else + whole_file_tuple=enif_make_tuple2(env, ATOM_EXPIRATION_MODE, ATOM_PER_ITEM); + + ret_term=enif_make_list3(env, enabled_tuple, minutes_tuple, whole_file_tuple); + } //if + else + { + ret_term=ATOM_EINVAL; + } // else + } // if + + return(ret_term); + +} // property_cache_get + + +/** + * NEVER USE THIS IN PRODUCTION + * this is to support integration testing + */ +ERL_NIF_TERM +property_cache_flush( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + leveldb::PropertyCache::Flush(); + + return(ATOM_OK); + +} // property_cache_flush + + +ERL_NIF_TERM +set_metadata_pid( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret_term; + + // ignore if bad params + if (argc==2 && enif_is_pid(env, argv[1])) + { + // would use switch(argv[0]) but ATOM_BUCKET_PROPS is actually a + // variable, not a constant + if (argv[0]==ATOM_BUCKET_PROPS) + { + gBucketPropCallback.SetPid(argv[1]); + ret_term=ATOM_OK; + } // if + else + { + ret_term=error_tuple_message(env, ATOM_BADARG, + "eleveldb::set_metadata_pid called with unknown atom"); + } // else + } // if + else + { + ret_term=error_tuple_message(env, ATOM_BADARG, + "eleveldb::set_metadata_pid called with bad arg count or pid"); + } // else + + return(ret_term); + +} // set_metadata_pid + + +ERL_NIF_TERM +remove_metadata_pid( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret_term(ATOM_BADARG); + + // ignore if bad params + if (argc==2 && enif_is_pid(env, argv[1])) + { + ERL_NIF_TERM cur_pid; + + // would use switch(argv[0]) but ATOM_BUCKET_PROPS is actually a + // variable, not a constant + if (argv[0]==ATOM_BUCKET_PROPS) + { + if (gBucketPropCallback.GetPid(cur_pid) && argv[1]==cur_pid) + gBucketPropCallback.Disable(); + ret_term=ATOM_OK; + } // if + else + { + ret_term=error_tuple_message(env, ATOM_BADARG, + "eleveldb::remove_metadata_pid called with unknown atom"); + } // else + } // if + else + { + ret_term=error_tuple_message(env, ATOM_BADARG, + "eleveldb::remove_metadata_pid called with bad arg count or pid"); + } // else + + return ret_term; + +} // remove_metadata_pid + + +/** + * get_metadata_pid is used by unit tests to verify + * actions of set_metadata_pid and remove_metadata_pid. + * No production code is known to use this. + */ +ERL_NIF_TERM +get_metadata_pid( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM ret_term(ATOM_BADARG); + + // ignore if bad params + if (argc==1) + { + ERL_NIF_TERM cur_pid; + + // would use switch(argv[0]) but ATOM_BUCKET_PROPS is actually a + // variable, not a constant + if (argv[0]==ATOM_BUCKET_PROPS) + { + if (gBucketPropCallback.GetPid(cur_pid)) + ret_term=cur_pid; + else + ret_term=ATOM_EINVAL; + } // if + else + { + ret_term=ATOM_BADARG; + } // else + } // if + else + { + ret_term=ATOM_BADARG; + } // else + + return ret_term; + +} // get_metadata_pid + +} // namespace eleveldb + + diff --git a/c_src/router.h b/c_src/router.h new file mode 100644 index 00000000..03888c30 --- /dev/null +++ b/c_src/router.h @@ -0,0 +1,72 @@ +// ------------------------------------------------------------------- +// +// eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) +// +// Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved. +// +// This file is provided to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// ------------------------------------------------------------------- + +#ifndef INCL_ROUTER_H +#define INCL_ROUTER_H + +#ifndef ATOMS_H + #include "atoms.h" +#endif + +// options.h brings in expiry.h +#include "leveldb/options.h" +#include "port/port.h" +#include "util/mutexlock.h" + +namespace eleveldb { + +// leveldb's interface to Riak functions +bool leveldb_callback(leveldb::EleveldbRouterActions_t, int , const void **); + +ERL_NIF_TERM property_cache(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM property_cache_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM property_cache_flush(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM set_metadata_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM remove_metadata_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM get_metadata_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + + +struct ServiceCallback +{ + leveldb::port::Spin m_RaceLock; // protection against Erlang vs eleveldb thread races + bool m_PidSet; // true if Riak service initialized pid + ERL_NIF_TERM m_CallbackPid; // destination for callback messages + + ServiceCallback() + : m_PidSet(false), m_CallbackPid(0) {}; + + ~ServiceCallback() {m_PidSet=false;}; + + void SetPid(const ERL_NIF_TERM & Pid) + {leveldb::SpinLock l(&m_RaceLock); m_CallbackPid=Pid; m_PidSet=true;}; + bool GetPid(ERL_NIF_TERM & Pid) + {leveldb::SpinLock l(&m_RaceLock); Pid=m_CallbackPid; return(m_PidSet);}; + void Disable() + {leveldb::SpinLock l(&m_RaceLock); m_PidSet=false;}; +}; + +extern ServiceCallback gBucketPropCallback; + +} // namespace eleveldb + + +#endif // INCL_ROUTER_H diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 4ef4d11e..911a45cd 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -79,7 +79,7 @@ WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref) } // WorkTask::WorkTask -WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObject * DbPtr) +WorkTask::WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObjectPtr_t & DbPtr) : m_DbPtr(DbPtr), terms_set(false) { if (NULL!=caller_env) @@ -178,67 +178,181 @@ OpenTask::DoWork() } // OpenTask::DoWork() +/** + * WriteTask functions + */ + +WriteTask::WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + leveldb::WriteBatch* _batch, + leveldb::WriteOptions* _options) + : WorkTask(_owner_env, _caller_ref, _db_handle), + batch(_batch), + options(_options) +{} + +WriteTask::~WriteTask() +{ + delete batch; + delete options; +} + +work_result +WriteTask::DoWork() +{ + leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + + return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); +} + +/** + * GetTask functions + */ + +GetTask::GetTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + ERL_NIF_TERM _key_term, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + options(_options) +{ + ErlNifBinary key; + + enif_inspect_binary(_caller_env, _key_term, &key); + m_Key.assign((const char *)key.data, key.size); +} + +GetTask::~GetTask() {} + +work_result +GetTask::DoWork() +{ + ERL_NIF_TERM value_bin; + BinaryValue value(local_env(), value_bin); + leveldb::Slice key_slice(m_Key); + + leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); + + if(!status.ok()){ + if ( status.IsNotFound() ) + return work_result(ATOM_NOT_FOUND); + else + return work_result(local_env(), ATOM_ERROR, status); + } + return work_result(local_env(), ATOM_OK, value_bin); +} + +/** + * IterTask functions + */ + +IterTask::IterTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + const bool _keys_only, + leveldb::ReadOptions &_options) + : WorkTask(_caller_env, _caller_ref, _db_handle), + keys_only(_keys_only), options(_options) +{} + +IterTask::~IterTask() {} + +work_result +IterTask::DoWork() +{ + ItrObject * itr_ptr=0; + void * itr_ptr_ptr=0; + + // NOTE: transferring ownership of options to ItrObject + itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr, keys_only, options); + + // Copy caller_ref to reuse in future iterator_move calls + itr_ptr=((ItrObjErlang*)itr_ptr_ptr)->m_ItrPtr; + itr_ptr->itr_ref_env = enif_alloc_env(); + itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + + ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); + + // release reference created during CreateItrObject() + enif_release_resource(itr_ptr_ptr); + + return work_result(local_env(), ATOM_OK, result); +} // operator() /** * MoveTask functions */ +// Constructor with no seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +// Constructor with seek target: + +MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & Iter, action_t& _action, + std::string& _seek_target) + : WorkTask(NULL, _caller_ref, Iter->m_DbPtr), + m_Itr(Iter), action(_action), + seek_target(_seek_target) +{ + // special case construction + local_env_=NULL; + enif_self(_caller_env, &local_pid); +} + +MoveTask::~MoveTask() {}; + work_result MoveTask::DoWork() { leveldb::Iterator* itr; - itr=m_ItrWrap->get(); + itr=m_Itr->m_Wrap.get(); - ++m_ItrWrap->m_MoveCount; // // race condition of prefetch clearing db iterator while // async_iterator_move looking at it. // // iterator_refresh operation - if (m_ItrWrap->m_Options.iterator_refresh && m_ItrWrap->m_StillUse) + if (m_Itr->m_Wrap.m_Options.iterator_refresh && m_Itr->m_Wrap.m_StillUse) { struct timeval tv; gettimeofday(&tv, NULL); - if (m_ItrWrap->m_IteratorStale < tv.tv_sec || NULL==itr) + if (m_Itr->m_Wrap.m_IteratorStale < tv.tv_sec || NULL==itr) { - m_ItrWrap->RebuildIterator(); - itr=m_ItrWrap->get(); + m_Itr->m_Wrap.RebuildIterator(); + itr=m_Itr->m_Wrap.get(); // recover position - if (NULL!=itr && 0!=m_ItrWrap->m_RecentKey.size()) + if (NULL!=itr && 0!=m_Itr->m_Wrap.m_RecentKey.size()) { - leveldb::Slice key_slice(m_ItrWrap->m_RecentKey); + leveldb::Slice key_slice(m_Itr->m_Wrap.m_RecentKey); itr->Seek(key_slice); - m_ItrWrap->m_StillUse=itr->Valid(); - if (!m_ItrWrap->m_StillUse) + m_Itr->m_Wrap.m_StillUse=itr->Valid(); + if (!m_Itr->m_Wrap.m_StillUse) { itr=NULL; - m_ItrWrap->PurgeIterator(); + m_Itr->m_Wrap.PurgeIterator(); } // if } // if } // if } // if - // hung iterator debug - { - struct timeval tv; - - gettimeofday(&tv, NULL); - - // 14400 is 4 hours in seconds ... 60*60*4 - if ((m_ItrWrap->m_LastLogReport + 14400) < tv.tv_sec && NULL!=m_ItrWrap->get()) - { - m_ItrWrap->LogIterator(); - m_ItrWrap->m_LastLogReport=tv.tv_sec; - } // if - } - // back to normal operation if(NULL == itr) return work_result(local_env(), ATOM_ERROR, ATOM_ITERATOR_CLOSED); @@ -273,30 +387,30 @@ MoveTask::DoWork() } // switch // set state for Erlang side to read - m_ItrWrap->SetValid(itr->Valid()); + m_Itr->m_Wrap.SetValid(itr->Valid()); // Post processing before telling the world the results // (while only one thread might be looking at objects) - if (m_ItrWrap->m_Options.iterator_refresh) + if (m_Itr->m_Wrap.m_Options.iterator_refresh) { if (itr->Valid()) { - m_ItrWrap->m_RecentKey.assign(itr->key().data(), itr->key().size()); + m_Itr->m_Wrap.m_RecentKey.assign(itr->key().data(), itr->key().size()); } // if else if (PREFETCH_STOP!=action) { // release iterator now, not later - m_ItrWrap->m_StillUse=false; - m_ItrWrap->PurgeIterator(); + m_Itr->m_Wrap.m_StillUse=false; + m_Itr->m_Wrap.PurgeIterator(); itr=NULL; } // else } // if // debug syslog(LOG_ERR, " MoveItem::DoWork() %d, %d, %d", - // action, m_ItrWrap->m_StillUse, m_ItrWrap->m_HandoffAtomic); + // action, m_Itr->m_Wrap.m_StillUse, m_Itr->m_Wrap.m_HandoffAtomic); // who got back first, us or the erlang loop - if (leveldb::compare_and_swap(&m_ItrWrap->m_HandoffAtomic, 0, 1)) + if (leveldb::compare_and_swap(&m_Itr->m_Wrap.m_HandoffAtomic, 0, 1)) { // this is prefetch of next iteration. It returned faster than actual // request to retrieve it. Stop and wait for erlang to catch up. @@ -305,15 +419,15 @@ MoveTask::DoWork() else { // setup next race for the response - m_ItrWrap->m_HandoffAtomic=0; + m_Itr->m_Wrap.m_HandoffAtomic=0; if(NULL!=itr && itr->Valid()) { - if (PREFETCH==action && m_ItrWrap->m_PrefetchStarted) + if (PREFETCH==action && m_Itr->m_Wrap.m_PrefetchStarted) m_ResubmitWork=true; // erlang is waiting, send message - if(m_ItrWrap->m_KeysOnly) + if(m_Itr->keys_only) return work_result(local_env(), ATOM_OK, slice_to_binary(local_env(), itr->key())); return work_result(local_env(), ATOM_OK, @@ -324,7 +438,7 @@ MoveTask::DoWork() { // using compare_and_swap as a hardware locking "set to false" // (a little heavy handed, but not executed often) - leveldb::compare_and_swap(&m_ItrWrap->m_PrefetchStarted, (int)true, (int)false); + leveldb::compare_and_swap(&m_Itr->m_Wrap.m_PrefetchStarted, (int)true, (int)false); return work_result(local_env(), ATOM_ERROR, ATOM_INVALID_ITERATOR); } // else @@ -342,7 +456,7 @@ MoveTask::local_env() if (!terms_set) { - caller_ref_term = enif_make_copy(local_env_, m_ItrWrap->itr_ref); + caller_ref_term = enif_make_copy(local_env_, m_Itr->itr_ref); caller_pid_term = enif_make_pid(local_env_, &local_pid); terms_set=true; } // if @@ -374,6 +488,83 @@ MoveTask::recycle() } // MoveTask::recycle +/** + * CloseTask functions + */ + +CloseTask::CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle) + : WorkTask(_owner_env, _caller_ref, _db_handle) +{} + +CloseTask::~CloseTask() +{ +} + +work_result +CloseTask::DoWork() +{ + DbObject * db_ptr; + + // get db pointer then clear reference count to it + db_ptr=m_DbPtr.get(); + m_DbPtr.assign(NULL); + + if (NULL!=db_ptr) + { + // set closing flag, this is blocking + db_ptr->InitiateCloseRequest(); + + // db_ptr no longer valid + db_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + +/** + * ItrCloseTask functions + */ + +ItrCloseTask::ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, + ItrObjectPtr_t & _itr_handle) + : WorkTask(_owner_env, _caller_ref), + m_ItrPtr(_itr_handle) +{} + +ItrCloseTask::~ItrCloseTask() +{ +} + +work_result +ItrCloseTask::DoWork() +{ + ItrObject * itr_ptr; + + // get iterator pointer then clear reference count to it + itr_ptr=m_ItrPtr.get(); + m_ItrPtr.assign(NULL); + + if (NULL!=itr_ptr) + { + // set closing flag, this is blocking + itr_ptr->InitiateCloseRequest(); + + // itr_ptr no longer valid + itr_ptr=NULL; + + return(work_result(ATOM_OK)); + } // if + else + { + return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); + } // else +} + /** * DestroyTask functions */ diff --git a/c_src/workitems.h b/c_src/workitems.h index fb84284a..475b7512 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -69,7 +69,7 @@ class WorkTask : public leveldb::ThreadTask public: WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref); - WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObject * DbPtr); + WorkTask(ErlNifEnv *caller_env, ERL_NIF_TERM& caller_ref, DbObjectPtr_t & DbPtr); virtual ~WorkTask(); @@ -132,32 +132,23 @@ class WriteTask : public WorkTask { protected: leveldb::WriteBatch* batch; - leveldb::WriteOptions* options; + leveldb::WriteOptions* options; public: - WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - DbObject * _db_handle, + DbObjectPtr_t & _db_handle, leveldb::WriteBatch* _batch, - leveldb::WriteOptions* _options) - : WorkTask(_owner_env, _caller_ref, _db_handle), - batch(_batch), - options(_options) - {} + leveldb::WriteOptions* _options); - virtual ~WriteTask() - { - delete batch; - delete options; - } + virtual ~WriteTask(); protected: - virtual work_result DoWork() - { - leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch); + virtual work_result DoWork(); - return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status)); - } +private: + WriteTask(); + WriteTask(const WriteTask &); + WriteTask & operator=(const WriteTask &); }; // class WriteTask @@ -207,36 +198,13 @@ class GetTask : public WorkTask public: GetTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - DbObject *_db_handle, + DbObjectPtr_t & _db_handle, ERL_NIF_TERM _key_term, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - options(_options) - { - ErlNifBinary key; + leveldb::ReadOptions &_options); - enif_inspect_binary(_caller_env, _key_term, &key); - m_Key.assign((const char *)key.data, key.size); - } - - virtual ~GetTask() - { - } - -protected: - virtual work_result DoWork() - { - ERL_NIF_TERM value_bin; - BinaryValue value(local_env(), value_bin); - leveldb::Slice key_slice(m_Key); + virtual ~GetTask(); - leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value); - - if(!status.ok()) - return work_result(ATOM_NOT_FOUND); - - return work_result(local_env(), ATOM_OK, value_bin); - } + virtual work_result DoWork(); }; // class GetTask @@ -256,41 +224,13 @@ class IterTask : public WorkTask public: IterTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - DbObject *_db_handle, + DbObjectPtr_t & _db_handle, const bool _keys_only, - leveldb::ReadOptions &_options) - : WorkTask(_caller_env, _caller_ref, _db_handle), - keys_only(_keys_only), options(_options) - {} - - virtual ~IterTask() - { - } - -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - void * itr_ptr_ptr; - - // NOTE: transfering ownership of options to ItrObject - itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr.get(), keys_only, options); + leveldb::ReadOptions &_options); - // Copy caller_ref to reuse in future iterator_move calls - itr_ptr=*(ItrObject**)itr_ptr_ptr; - itr_ptr->itr_ref_env = enif_alloc_env(); - itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref()); + virtual ~IterTask(); - itr_ptr->m_Iter.assign(new LevelIteratorWrapper(itr_ptr, keys_only, - options, itr_ptr->itr_ref)); - - ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr); - - // release reference created during CreateItrObject() - enif_release_resource(itr_ptr_ptr); - - return work_result(local_env(), ATOM_OK, result); - } + virtual work_result DoWork(); }; // class IterTask @@ -301,7 +241,7 @@ class MoveTask : public WorkTask typedef enum { FIRST, LAST, NEXT, PREV, SEEK, PREFETCH, PREFETCH_STOP } action_t; protected: - ReferencePtr m_ItrWrap; //!< access to database, and holds reference + ItrObjectPtr_t m_Itr; public: action_t action; @@ -311,28 +251,14 @@ class MoveTask : public WorkTask // No seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - LevelIteratorWrapper * IterWrap, action_t& _action) - : WorkTask(NULL, _caller_ref, IterWrap->m_DbPtr.get()), - m_ItrWrap(IterWrap), action(_action) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } + ItrObjectPtr_t & Iter, action_t& _action); // With seek target: MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref, - LevelIteratorWrapper * IterWrap, action_t& _action, - std::string& _seek_target) - : WorkTask(NULL, _caller_ref, IterWrap->m_DbPtr.get()), - m_ItrWrap(IterWrap), action(_action), - seek_target(_seek_target) - { - // special case construction - local_env_=NULL; - enif_self(_caller_env, &local_pid); - } - virtual ~MoveTask() {}; + ItrObjectPtr_t & Iter, action_t& _action, + std::string& _seek_target); + + virtual ~MoveTask(); virtual ErlNifEnv *local_env(); @@ -355,38 +281,11 @@ class CloseTask : public WorkTask public: CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - DbObject * _db_handle) - : WorkTask(_owner_env, _caller_ref, _db_handle) - {} - - virtual ~CloseTask() - { - } + DbObjectPtr_t & _db_handle); -protected: - virtual work_result DoWork() - { - DbObject * db_ptr; + virtual ~CloseTask(); - // get db pointer then clear reference count to it - db_ptr=m_DbPtr.get(); - m_DbPtr.assign(NULL); - - if (NULL!=db_ptr) - { - // set closing flag, this is blocking - db_ptr->InitiateCloseRequest(); - - // db_ptr no longer valid - db_ptr=NULL; - - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class CloseTask @@ -401,41 +300,12 @@ class ItrCloseTask : public WorkTask ReferencePtr m_ItrPtr; public: - ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref, - ItrObject * _itr_handle) - : WorkTask(_owner_env, _caller_ref), - m_ItrPtr(_itr_handle) - {} + ItrObjectPtr_t & _itr_handle); - virtual ~ItrCloseTask() - { - } + virtual ~ItrCloseTask(); -protected: - virtual work_result DoWork() - { - ItrObject * itr_ptr; - - // get iterator pointer then clear reference count to it - itr_ptr=m_ItrPtr.get(); - m_ItrPtr.assign(NULL); - - if (NULL!=itr_ptr) - { - // set closing flag, this is blocking - itr_ptr->InitiateCloseRequest(); - - // itr_ptr no longer valid - itr_ptr=NULL; - - return(work_result(ATOM_OK)); - } // if - else - { - return work_result(local_env(), ATOM_ERROR, ATOM_BADARG); - } // else - } + virtual work_result DoWork(); }; // class ItrCloseTask diff --git a/priv/eleveldb.schema b/priv/eleveldb.schema index 074011fb..36ffdbec 100644 --- a/priv/eleveldb.schema +++ b/priv/eleveldb.schema @@ -278,15 +278,6 @@ hidden ]}. -{translation, "eleveldb.expiry_minutes", - fun(Conf) -> - case cuttlefish:conf_get("leveldb.expiration.retention_time", Conf) of - unlimited -> 0; - I -> I - end - end -}. - %% @doc Expire entire .sst table file. Authorizes leveldb to %% eliminate entire files that contain expired data (delete files diff --git a/priv/eleveldb_multi.schema b/priv/eleveldb_multi.schema index 6c6af943..2e388f53 100644 --- a/priv/eleveldb_multi.schema +++ b/priv/eleveldb_multi.schema @@ -117,13 +117,14 @@ "multi_backend.$name.leveldb.compression", "riak_kv.multi_backend", [ {default, on}, + {commented, on}, {datatype, flag} ]}. {mapping, "multi_backend.$name.leveldb.compression.algorithm", "riak_kv.multi_backend", [ - {new_conf_value, lz4}, + {commented, lz4}, {datatype, {enum, [snappy, lz4]}} ]}. diff --git a/rebar.config.script b/rebar.config.script index 60a64b9d..9e75fbd2 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -6,7 +6,7 @@ %% actually running. case os:type() of {unix,darwin} -> - Opt = " -mmacosx-version-min=10.8", + Opt = " -mmacosx-version-min=10.8 -stdlib=libc++", [Mjr|_] = string:tokens(os:cmd("/usr/bin/uname -r"), "."), Major = list_to_integer(Mjr), if diff --git a/src/eleveldb.erl b/src/eleveldb.erl index a21edd6c..b6f591e1 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2012 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2010-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,6 +17,8 @@ %% under the License. %% %% ------------------------------------------------------------------- + +%% @doc Erlang NIF wrapper for LevelDB -module(eleveldb). -export([open/2, @@ -43,6 +43,13 @@ iterator_move/2, iterator_close/1]). +-export([property_cache/2, + property_cache_get/1, + property_cache_flush/0, + set_metadata_pid/2, + remove_metadata_pid/2, + get_metadata_pid/1]). + -export_type([db_ref/0, itr_ref/0]). @@ -52,11 +59,18 @@ -compile(export_all). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). --define(QC_OUT(P), - eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). --endif. +-define(QC_OUT(P), eqc:on_output(fun terminal_format/2, P)). +-endif. % EQC -include_lib("eunit/include/eunit.hrl"). --endif. + +%% Maximum number of distinct database instances to create in any test. +%% The highest runtime limit used is the lower of this value or +%% ((num-schedulers x 4) + 1). +%% This limit is driven by filesystem size constraints on builders - MvM's +%% trials have shown this value to work on the majority of builders the +%% majority of the time. +-define(MAX_TEST_OPEN, 21). +-endif. % TEST %% This cannot be a separate function. Code must be inline to trigger %% Erlang compiler's use of optimized selective receive. @@ -66,6 +80,7 @@ end). -define(COMPRESSION_ENUM, [snappy, lz4, false]). +-define(EXPIRY_ENUM, [unlimited, integer]). -spec init() -> ok | {error, any()}. init() -> @@ -316,7 +331,7 @@ option_types(open) -> {tiered_slow_prefix, any}, {cache_object_warming, bool}, {expiry_enabled, bool}, - {expiry_minutes, integer}, + {expiry_minutes, ?EXPIRY_ENUM}, {whole_file_expiry, bool}]; option_types(read) -> @@ -335,6 +350,31 @@ validate_options(Type, Opts) -> validate_type(KType, V) end, Opts). +-spec property_cache(string(), string()) -> ok. +property_cache(_BucketKey, _Properties) -> + erlang:nif_error({error, not_loaded}). + +-spec property_cache_get(string()) -> badarg | einval | [{atom(), any()}]. +property_cache_get(_BucketKey) -> + erlang:nif_error({error, not_loaded}). + +%% do NOT use property_cache_flush in production ... +%% it's a segfault waiting to happen +-spec property_cache_flush() -> ok. +property_cache_flush() -> + erlang:nif_error({error, not_loaded}). + +-spec set_metadata_pid(atom(),pid()) -> ok | {error, any()}. +set_metadata_pid(_Context, _Pid) -> + erlang:nif_error({error, not_loaded}). + +-spec remove_metadata_pid(atom(),pid()) -> ok | {error, any()}. +remove_metadata_pid(_Context, _Pid) -> + erlang:nif_error({error, not_loaded}). + +-spec get_metadata_pid(atom()) -> badarg | einval | pid(). +get_metadata_pid(_Context) -> + erlang:nif_error({error, not_loaded}). %% =================================================================== @@ -370,7 +410,12 @@ do_fold(Itr, Fun, Acc0, Opts) -> true = is_binary(Start) or (Start == first), fold_loop(iterator_move(Itr, Start), Itr, Fun, Acc0) after - iterator_close(Itr) + %% This clause shouldn't change the operation's result. + %% If the iterator has been invalidated by it or the db being closed, + %% the try clause above will raise an exception, and that's the one we + %% want to propagate. Catch the exception this raises in that case and + %% ignore it so we don't obscure the original. + catch iterator_close(Itr) end. fold_loop({error, iterator_closed}, _Itr, _Fun, Acc0) -> @@ -384,116 +429,293 @@ fold_loop({ok, K, V}, Itr, Fun, Acc0) -> Acc = Fun({K, V}, Acc0), fold_loop(iterator_move(Itr, prefetch), Itr, Fun, Acc). -validate_type({_Key, bool}, true) -> true; -validate_type({_Key, bool}, false) -> true; -validate_type({_Key, integer}, Value) when is_integer(Value) -> true; -validate_type({_Key, any}, _Value) -> true; -validate_type({_Key, ?COMPRESSION_ENUM}, snappy) -> true; -validate_type({_Key, ?COMPRESSION_ENUM}, lz4) -> true; -validate_type({_Key, ?COMPRESSION_ENUM}, false) -> true; -validate_type(_, _) -> false. +validate_type({_Key, bool}, true) -> true; +validate_type({_Key, bool}, false) -> true; +validate_type({_Key, integer}, Value) when is_integer(Value) -> true; +validate_type({_Key, any}, _Value) -> true; +validate_type({_Key, ?COMPRESSION_ENUM}, snappy) -> true; +validate_type({_Key, ?COMPRESSION_ENUM}, lz4) -> true; +validate_type({_Key, ?COMPRESSION_ENUM}, false) -> true; +validate_type({_Key, ?EXPIRY_ENUM}, unlimited) -> true; +validate_type({_Key, ?EXPIRY_ENUM}, Value) when is_integer(Value) -> true; +validate_type(_, _) -> false. %% =================================================================== -%% EUnit tests +%% Tests %% =================================================================== -ifdef(TEST). -open_test() -> [{open_test_Z(), l} || l <- lists:seq(1, 20)]. -open_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.open.test"), - {ok, Ref} = open("/tmp/eleveldb.open.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - {ok, <<"123">>} = ?MODULE:get(Ref, <<"abc">>, []), - not_found = ?MODULE:get(Ref, <<"def">>, []). - -fold_test() -> [{fold_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [{<<"abc">>, <<"123">>}, - {<<"def">>, <<"456">>}, - {<<"hij">>, <<"789">>}] = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])). - -fold_keys_test() -> [{fold_keys_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_keys_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.keys.test"), - {ok, Ref} = open("/tmp/eleveldb.fold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"abc">>, <<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [])). - -fold_from_key_test() -> [{fold_from_key_test_Z(), l} || l <- lists:seq(1, 20)]. -fold_from_key_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.fold.fromkeys.test"), - {ok, Ref} = open("/tmp/eleveldb.fromfold.keys.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - ok = ?MODULE:put(Ref, <<"abc">>, <<"123">>, []), - ok = ?MODULE:put(Ref, <<"hij">>, <<"789">>, []), - [<<"def">>, <<"hij">>] = lists:reverse(fold_keys(Ref, - fun(K, Acc) -> [K | Acc] end, - [], [{first_key, <<"d">>}])). - -destroy_test() -> [{destroy_test_Z(), l} || l <- lists:seq(1, 20)]. -destroy_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.destroy.test"), - {ok, Ref} = open("/tmp/eleveldb.destroy.test", [{create_if_missing, true}]), - ok = ?MODULE:put(Ref, <<"def">>, <<"456">>, []), - {ok, <<"456">>} = ?MODULE:get(Ref, <<"def">>, []), - close(Ref), - ok = ?MODULE:destroy("/tmp/eleveldb.destroy.test", []), - {error, {db_open, _}} = open("/tmp/eleveldb.destroy.test", [{error_if_exists, true}]). - -compression_test() -> [{compression_test_Z(), l} || l <- lists:seq(1, 20)]. -compression_test_Z() -> - CompressibleData = list_to_binary([0 || _X <- lists:seq(1,20)]), - os:cmd("rm -rf /tmp/eleveldb.compress.0 /tmp/eleveldb.compress.1"), - {ok, Ref0} = open("/tmp/eleveldb.compress.0", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, false}]), - [ok = ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - {ok, Ref1} = open("/tmp/eleveldb.compress.1", [{write_buffer_size, 5}, - {create_if_missing, true}, - {compression, true}]), - [ok = ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}]) || - I <- lists:seq(1,10)], - %% Check both of the LOG files created to see if the compression option was correctly - %% passed down - MatchCompressOption = - fun(File, Expected) -> - {ok, Contents} = file:read_file(File), - case re:run(Contents, "Options.compression: " ++ Expected) of - {match, _} -> match; - nomatch -> nomatch - end - end, - Log0Option = MatchCompressOption("/tmp/eleveldb.compress.0/LOG", "0"), - Log1Option = MatchCompressOption("/tmp/eleveldb.compress.1/LOG", "1"), - ?assert(Log0Option =:= match andalso Log1Option =:= match). - - -close_test() -> [{close_test_Z(), l} || l <- lists:seq(1, 20)]. -close_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close.test"), - {ok, Ref} = open("/tmp/eleveldb.close.test", [{create_if_missing, true}]), - ?assertEqual(ok, close(Ref)), - ?assertEqual({error, einval}, close(Ref)). - -close_fold_test() -> [{close_fold_test_Z(), l} || l <- lists:seq(1, 20)]. -close_fold_test_Z() -> - os:cmd("rm -rf /tmp/eleveldb.close_fold.test"), - {ok, Ref} = open("/tmp/eleveldb.close_fold.test", [{create_if_missing, true}]), - ok = eleveldb:put(Ref, <<"k">>,<<"v">>,[]), - ?assertException(throw, {iterator_closed, ok}, % ok is returned by close as the acc - eleveldb:fold(Ref, fun(_,_A) -> eleveldb:close(Ref) end, undefined, [])). +%% =================================================================== +%% Exported Test Helpers +%% =================================================================== + +-spec assert_close(DbRef :: db_ref()) -> ok | no_return(). +%% +%% Closes DbRef inside an ?assert... macro. +%% +assert_close(DbRef) -> + ?assertEqual(ok, ?MODULE:close(DbRef)). + +-spec assert_open(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, creating the database directory if needed. +%% +assert_open(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}]). + +-spec assert_open(DbPath :: string(), OpenOpts :: open_options()) + -> db_ref() | no_return(). +%% +%% Opens DbPath, with OpenOpts, inside an ?assert... macro. +%% +assert_open(DbPath, OpenOpts) -> + OpenRet = ?MODULE:open(DbPath, OpenOpts), + ?assertMatch({ok, _}, OpenRet), + {_, DbRef} = OpenRet, + DbRef. + +-spec assert_open_small(DbPath :: string()) -> db_ref() | no_return(). +%% +%% Opens Path inside an ?assert... macro, using a limited storage footprint +%% and creating the database directory if needed. +%% +assert_open_small(DbPath) -> + assert_open(DbPath, [{create_if_missing, true}, {limited_developer_mem, true}]). + +-spec create_test_dir() -> string() | no_return(). +%% +%% Creates a new, empty, uniquely-named directory for testing and returns +%% its full path. This operation *should* never fail, but would raise an +%% ?assert...-ish exception if it did. +%% +create_test_dir() -> + string:strip(?cmd("mktemp -d /tmp/" ?MODULE_STRING ".XXXXXXX"), both, $\n). + +-spec delete_test_dir(Dir :: string()) -> ok | no_return(). +%% +%% Deletes a test directory fully, whether or not it exists. +%% This operation *should* never fail, but would raise an ?assert...-ish +%% exception if it did. +%% +delete_test_dir(Dir) -> + ?assertCmd("rm -rf " ++ Dir). + +-spec terminal_format(Fmt :: io:format(), Args :: list()) -> ok. +%% +%% Writes directly to the terminal, bypassing EUnit hooks. +%% +terminal_format(Fmt, Args) -> + io:format(user, Fmt, Args). + +%% =================================================================== +%% EUnit Tests +%% =================================================================== + +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). +-define(max_test_open(Calc), erlang:min(?MAX_TEST_OPEN, Calc)). + +eleveldb_test_() -> + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + ?local_test(test_open), + ?local_test(test_close), + ?local_test(test_destroy), + ?local_test(test_fold), + ?local_test(test_fold_keys), + ?local_test(test_fold_from_key), + ?local_test(test_close_fold), + % On weak machines the following can take a while, so we tweak + % them a bit to avoid timeouts. On anything resembling a competent + % computer, these should complete in a small fraction of a second, + % but on some lightweight VMs used for validation, that can be + % extended by orders of magnitude. + ?local_test(15, test_compression), + fun(TestRoot) -> + TestName = "test_open_many", + TestDir = filename:join(TestRoot, TestName), + Count = ?max_test_open(erlang:system_info(schedulers) * 4 + 1), + Title = lists:flatten(io_lib:format("~s(~b)", [TestName, Count])), + {Title, {timeout, 30, fun() -> test_open_many(TestDir, Count) end}} + end + ] + }. + +%% fold accumulator used in a few tests +accumulate(Val, Acc) -> + [Val | Acc]. + +%% +%% Individual tests +%% + +test_open(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual({ok, <<"123">>}, ?MODULE:get(Ref, <<"abc">>, [])), + ?assertEqual(not_found, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref). + +test_open_many(TestDir, HowMany) -> + Insts = lists:seq(1, HowMany), + KNonce = erlang:make_ref(), + VNonce = erlang:self(), + WorkSet = [ + begin + D = lists:flatten(io_lib:format("~s.~b", [TestDir, N])), + T = os:timestamp(), + K = erlang:phash2([T, N, KNonce], 1 bsl 32), + V = erlang:phash2([N, T, VNonce], 1 bsl 32), + {assert_open_small(D), + <>, <>} + end || N <- Insts], + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, WorkSet), + lists:foreach( + fun({Ref, Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, WorkSet), + lists:foreach(fun assert_close/1, [R || {R, _, _} <- WorkSet]). + +test_close(TestDir) -> + Ref = assert_open(TestDir, [{create_if_missing, true}]), + assert_close(Ref), + ?assertError(badarg, ?MODULE:close(Ref)). + +test_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [{<<"abc">>, <<"123">>}, {<<"def">>, <<"456">>}, {<<"hij">>, <<"789">>}], + lists:reverse(?MODULE:fold(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_keys(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual( + [<<"abc">>, <<"def">>, <<"hij">>], + lists:reverse(?MODULE:fold_keys(Ref, fun accumulate/2, [], []))), + assert_close(Ref). + +test_fold_from_key(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"abc">>, <<"123">>, [])), + ?assertEqual(ok, ?MODULE:put(Ref, <<"hij">>, <<"789">>, [])), + ?assertEqual([<<"def">>, <<"hij">>], lists:reverse( + ?MODULE:fold_keys(Ref, fun accumulate/2, [], [{first_key, <<"d">>}]))), + assert_close(Ref). + +test_destroy(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"def">>, <<"456">>, [])), + ?assertEqual({ok, <<"456">>}, ?MODULE:get(Ref, <<"def">>, [])), + assert_close(Ref), + ?assertEqual(ok, ?MODULE:destroy(TestDir, [])), + ?assertMatch({error, {db_open, _}}, ?MODULE:open(TestDir, [{error_if_exists, true}])). + +test_compression(TestDir) -> + IntSeq = lists:seq(1, 10), + CompressibleData = list_to_binary(lists:duplicate(20, 0)), + + Ref0 = assert_open(TestDir ++ ".0", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, false}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref0, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + Ref1 = assert_open(TestDir ++ ".1", [ + {write_buffer_size, 5}, {create_if_missing, true}, {compression, true}]), + lists:foreach( + fun(I) -> + ?assertEqual(ok, + ?MODULE:put(Ref1, <>, CompressibleData, [{sync, true}])) + end, IntSeq), + + %% Check both of the LOG files created to see if the compression option was + %% passed down correctly + lists:foreach( + fun(Val) -> + File = filename:join(TestDir ++ [$. | Val], "LOG"), + RRet = file:read_file(File), + ?assertMatch({ok, _}, RRet), + {_, Data} = RRet, + Pattern = "Options.compression: " ++ Val, + ?assertMatch({match, _}, re:run(Data, Pattern)) + end, ["0", "1"]), + assert_close(Ref0), + assert_close(Ref1). + +test_close_fold(TestDir) -> + Ref = assert_open(TestDir), + ?assertEqual(ok, ?MODULE:put(Ref, <<"k">>,<<"v">>,[])), + ?assertError(badarg, + ?MODULE:fold(Ref, fun(_,_) -> assert_close(Ref) end, undefined, [])). + +%% +%% Parallel tests +%% + +parallel_test_() -> + ParaCnt = ?max_test_open(erlang:system_info(schedulers) * 2 + 1), + LoadCnt = 99, + TestSeq = lists:seq(1, ParaCnt), + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [fun(TestRoot) -> + {inparallel, [begin + T = lists:flatten(io_lib:format("load proc ~b", [N])), + D = filename:join(TestRoot, io_lib:format("parallel_test.~b", [N])), + S = lists:seq(N, (N + LoadCnt - 1)), + {T, fun() -> run_load(D, S) end} + end || N <- TestSeq]} + end] + }. + +run_load(TestDir, IntSeq) -> + KNonce = [os:timestamp(), erlang:self()], + Ref = assert_open_small(TestDir), + VNonce = [erlang:make_ref(), os:timestamp()], + KVIn = [ + begin + K = erlang:phash2([N | KNonce], 1 bsl 32), + V = erlang:phash2([N | VNonce], 1 bsl 32), + {<>, <>} + end || N <- IntSeq], + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, ?MODULE:put(Ref, Key, Val, [])) + end, KVIn), + {L, R} = lists:split(erlang:hd(IntSeq), KVIn), + KVOut = R ++ L, + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) + end, KVOut), + assert_close(Ref). + +%% =================================================================== +%% QuickCheck Tests +%% =================================================================== -ifdef(EQC). @@ -512,56 +734,81 @@ ops(Keys, Values) -> apply_kv_ops([], _Ref, Acc0) -> Acc0; apply_kv_ops([{put, K, V} | Rest], Ref, Acc0) -> - ok = eleveldb:put(Ref, K, V, []), + ?assertEqual(ok, ?MODULE:put(Ref, K, V, [])), apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); apply_kv_ops([{async_put, K, V} | Rest], Ref, Acc0) -> MyRef = make_ref(), Context = {my_context, MyRef}, - ok = eleveldb:async_put(Ref, Context, K, V, []), + ?assertEqual(ok, ?MODULE:async_put(Ref, Context, K, V, [])), receive {Context, ok} -> apply_kv_ops(Rest, Ref, orddict:store(K, V, Acc0)); Msg -> - error({unexpected_msg, Msg}) + erlang:error({unexpected_msg, Msg}) end; apply_kv_ops([{delete, K, _} | Rest], Ref, Acc0) -> - ok = eleveldb:delete(Ref, K, []), + ?assertEqual(ok, ?MODULE:delete(Ref, K, [])), apply_kv_ops(Rest, Ref, orddict:store(K, deleted, Acc0)). -prop_put_delete() -> +prop_put_delete(TestDir) -> ?LET({Keys, Values}, {keys(), values()}, - ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), - begin - ?cmd("rm -rf /tmp/eleveldb.putdelete.qc"), - {ok, Ref} = eleveldb:open("/tmp/eleveldb.putdelete.qc", - [{create_if_missing, true}]), - Model = apply_kv_ops(Ops, Ref, []), - - %% Valdiate that all deleted values return not_found - F = fun({K, deleted}) -> - ?assertEqual(not_found, eleveldb:get(Ref, K, [])); - ({K, V}) -> - ?assertEqual({ok, V}, eleveldb:get(Ref, K, [])) - end, - lists:map(F, Model), - - %% Validate that a fold returns sorted values - Actual = lists:reverse(fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, - [], [])), - ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], - Actual), - ok = eleveldb:close(Ref), - true - end)). + ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), + begin + delete_test_dir(TestDir), + Ref = assert_open(TestDir, [{create_if_missing, true}]), + Model = apply_kv_ops(Ops, Ref, []), + + %% Validate that all deleted values return not_found + lists:foreach( + fun({K, deleted}) -> + ?assertEqual(not_found, ?MODULE:get(Ref, K, [])); + ({K, V}) -> + ?assertEqual({ok, V}, ?MODULE:get(Ref, K, [])) + end, Model), + + %% Validate that a fold returns sorted values + Actual = lists:reverse( + ?MODULE:fold(Ref, fun({K, V}, Acc) -> [{K, V} | Acc] end, [], [])), + ?assertEqual([{K, V} || {K, V} <- Model, V /= deleted], Actual), + assert_close(Ref), + true + end)). prop_put_delete_test_() -> Timeout1 = 10, Timeout2 = 15, - %% We use the ?ALWAYS(300, ...) wrapper around the second test as a - %% regression test. - [{timeout, 3*Timeout1, {"No ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout1,prop_put_delete())) end}}, - {timeout, 10*Timeout2, {"With ?ALWAYS()", fun() -> qc(eqc:testing_time(Timeout2,?ALWAYS(150,prop_put_delete()))) end}}]. - --endif. - --endif. + {foreach, + fun create_test_dir/0, + fun delete_test_dir/1, + [ + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout1, + OuterTO = (InnerTO * 3), + Title = "Without ?ALWAYS()", + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, prop_put_delete(TestDir))) + end, + {timeout, OuterTO, {Title, TestFun}} + end, + fun(TestRoot) -> + TestDir = filename:join(TestRoot, "putdelete.qc"), + InnerTO = Timeout2, + OuterTO = (InnerTO * 10), + AwCount = (InnerTO * 9), + %% We use the ?ALWAYS(AwCount, ...) wrapper as a regression test. + %% It's not clear how this is effectively different than the first + %% fixture, but I'm leaving it here in case I'm missing something. + Title = lists:flatten(io_lib:format("With ?ALWAYS(~b)", [AwCount])), + TestFun = fun() -> + qc(eqc:testing_time(InnerTO, + ?ALWAYS(AwCount, prop_put_delete(TestDir)))) + end, + {timeout, OuterTO, {Title, TestFun}} + end + ] + }. + +-endif. % EQC + +-endif. % TEST diff --git a/src/eleveldb_metadata.erl b/src/eleveldb_metadata.erl new file mode 100644 index 00000000..f11fa31f --- /dev/null +++ b/src/eleveldb_metadata.erl @@ -0,0 +1,32 @@ +%% ------------------------------------------------------------------- +%% +%% eleveldb_metadata: Erlang interface to allow LevelDB to retrieve riak_core metadata +%% +%% Copyright (c) 2017 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(eleveldb_metadata). + +-export([handle_metadata_response/1]). + +%% When eleveldb needs metadata, it routes through a `riak_core_info_service_process` process to get +%% the required data. This handles the response from that process, which will call the function given +%% in the initialization (currently done in `riak_core_app` - TODO: Make this more generic) +%% and call this function to return the results. +-spec handle_metadata_response({Props::proplists:proplist(), _SourceParams::list(), Key::term()}) -> ok. +handle_metadata_response({Props, _SourceParams, [Key]}) -> + eleveldb:property_cache(Key, Props). diff --git a/test/bucket_expiry.erl b/test/bucket_expiry.erl new file mode 100644 index 00000000..2b15ae67 --- /dev/null +++ b/test/bucket_expiry.erl @@ -0,0 +1,159 @@ +%% ------------------------------------------------------------------- +%% +%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) +%% +%% Copyright (c) 2017 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(bucket_expiry). + +-compile(export_all). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +iterator_test_() -> + {spawn, + [{setup, + fun setup/0, + fun cleanup/1, + fun(Ref) -> + [ + set_remove_pid(Ref), + set_verify_prop(Ref) + ] + end}] + }. + + +%% Property Cache needs database to be running, but does not care +%% if there is data in the db or not +setup() -> + os:cmd("rm -rf be_test"), % NOTE + {ok, Ref} = eleveldb:open("be_test", [{create_if_missing, true}]), + Ref. + +cleanup(Ref) -> + eleveldb:close(Ref). + +set_remove_pid(_Ref) -> + fun() -> + % validate default return expectations + ?assertEqual( einval, eleveldb:get_metadata_pid(bucket_props)), + ?assertEqual( badarg, eleveldb:get_metadata_pid(no_arg)), + + % wrong property name atom + Reply1 = eleveldb:set_metadata_pid(mystery_props, list_to_pid("<0.2.0>")), + ?assertMatch({error, {badarg, _}}, Reply1), + + % wrong second parameter + Reply2 = eleveldb:set_metadata_pid(bucket_props, 42), + ?assertMatch({error, {badarg, _}}, Reply2), + + % simple set + ok = eleveldb:set_metadata_pid(bucket_props, list_to_pid("<0.2.0>")), + ?assertEqual(list_to_pid("<0.2.0>"), eleveldb:get_metadata_pid(bucket_props)), + + % update existing + ok = eleveldb:set_metadata_pid(bucket_props, list_to_pid("<0.231.0>")), + ?assertEqual(list_to_pid("<0.231.0>"), eleveldb:get_metadata_pid(bucket_props)), + + % remove: wrong property name atom + Reply3 = eleveldb:remove_metadata_pid(mystery_props, list_to_pid("<0.2.0>")), + ?assertMatch({error, {badarg, _}}, Reply3), + + % remove: wrong second parameter + Reply4 = eleveldb:remove_metadata_pid(bucket_props, 42), + ?assertMatch({error, {badarg, _}}, Reply4), + + % remove old (should do nothing) + ok = eleveldb:remove_metadata_pid(bucket_props, list_to_pid("<0.2.0>")), + ?assertEqual(list_to_pid("<0.231.0>"), eleveldb:get_metadata_pid(bucket_props)), + + % remove active + ok = eleveldb:remove_metadata_pid(bucket_props, list_to_pid("<0.231.0>")), + ?assertEqual(einval, eleveldb:get_metadata_pid(bucket_props)) + end. + + +%% +%% Currently this test ALWAYS FAILS in open source build. This is +%% due to the fact that property cache is not available in open source. +%% +set_verify_prop(_Ref) -> + %% riak_core_bucket:get_bucket(<<"default">>). + + %% WARNING: binary strings used for unit test ARE not representative + %% of binary strings passed from leveldb + + fun() -> + %% test 1: simulated default properties, using atoms + ok=eleveldb:property_cache(<<"default">>, [{name,<<"default">>}, + {allow_mult,false}, + {basic_quorum,false}, + {big_vclock,50}, + {chash_keyfun,{riak_core_util,chash_std_keyfun}}, + {default_time_to_live, <<"2345m">>}, + {dvv_enabled,false}, + {dw,quorum}, + {expiration, true}, + {expiration_mode, whole_file}, + {last_write_wins,false}, + {linkfun,{modfun,riak_kv_wm_link_walker,mapreduce_linkfun}}, + {n_val,3}, + {notfound_ok,true}, + {old_vclock,86400}, + {postcommit,[]}, + {pr,0}, + {precommit,[]}, + {pw,0}, + {r,quorum}, + {rw,quorum}, + {small_vclock,50}, + {w,quorum}, + {write_once,false}, + {young_vclock,20}]), + ?assertEqual([{expiry_enabled, enabled}, {expiry_minutes, 2345}, {expiration_mode, whole_file}], + eleveldb:property_cache_get(<<"default">>)), + + %% test 2: only relevant properties, using JSON strings + ok=eleveldb:property_cache(<<"test2">>, [{default_time_to_live, <<"2h5m">>}, + {expiration, <<"true">>}, + {expiration_mode, <<"whole_file">>}]), + ?assertEqual([{expiry_enabled, enabled}, {expiry_minutes, 125}, {expiration_mode, whole_file}], + eleveldb:property_cache_get(<<"test2">>)), + + %% test 3: different JSON strings, different property order + ok=eleveldb:property_cache(<<"test3">>, [{expiration, <<"on">>}, + {expiration_mode, per_item}, + {default_time_to_live, <<"30d">>}]), + ?assertEqual([{expiry_enabled, enabled}, {expiry_minutes, 43200}, {expiration_mode, per_item}], + eleveldb:property_cache_get(<<"test3">>)), + + + %% test 4: be sure test 2 is still live + ?assertEqual([{expiry_enabled, enabled}, {expiry_minutes, 125}, {expiration_mode, whole_file}], + eleveldb:property_cache_get(<<"test2">>)), + + %% test 5: does flush work? + ok=eleveldb:property_cache_flush(), + ?assertEqual(einval, eleveldb:property_cache_get(<<"test2">>)) + + end. + +-endif. diff --git a/test/cacheleak.erl b/test/cacheleak.erl index 19b3bce1..7b24675a 100644 --- a/test/cacheleak.erl +++ b/test/cacheleak.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2012-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,60 +17,80 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(cacheleak). --compile(export_all). +-module(cacheleak). +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-define(KV_PAIRS, (1000 * 10)). +-define(VAL_SIZE, (1024 * 10)). +-define(MAX_RSS, (1000 * 500)). % driven by ?KV_PAIRS and ?VAL_SIZE ? + +-define(TEST_LOOPS, 10). +-define(TIMEOUT, (?TEST_LOOPS * 60)). + cacheleak_test_() -> - {timeout, 10*60, fun() -> - [] = os:cmd("rm -rf /tmp/eleveldb.cacheleak.test"), - Blobs = [{<>, compressible_bytes(10240)} || - I <- lists:seq(1, 10000)], - cacheleak_loop(10, Blobs, 500000) - end}. + TestRoot = eleveldb:create_test_dir(), + TestDir = filename:join(TestRoot, ?MODULE), + {setup, + fun() -> TestRoot end, + fun eleveldb:delete_test_dir/1, + {timeout, ?TIMEOUT, fun() -> + Bytes = compressible_bytes(?VAL_SIZE), + Blobs = [{<>, Bytes} || I <- lists:seq(1, ?KV_PAIRS)], + eleveldb:terminal_format("RSS limit: ~b\n", [?MAX_RSS]), + cacheleak_loop(0, Blobs, ?MAX_RSS, TestDir) + end}}. %% It's very important for this test that the data is compressible. Otherwise, -%% the file will be mmaped, and nothing will fill up the cache. +%% the file will be mmapped, and nothing will fill up the cache. compressible_bytes(Count) -> - list_to_binary([0 || _I <- lists:seq(1, Count)]). + erlang:list_to_binary(lists:duplicate(Count, 0)). -cacheleak_loop(0, _Blobs, _MaxFinalRSS) -> - ok; -cacheleak_loop(Count, Blobs, MaxFinalRSS) -> +cacheleak_loop(Count, Blobs, MaxFinalRSS, TestDir) when Count < ?TEST_LOOPS -> %% We spawn a process to open a LevelDB instance and do a series of %% reads/writes to fill up the cache. When the process exits, the LevelDB %% ref will get GC'd and we can re-evaluate the memory footprint of the %% process to make sure everything got cleaned up as expected. F = fun() -> - - {ok, Ref} = eleveldb:open("/tmp/eleveldb.cacheleak.test", - [{create_if_missing, true}, - {limited_developer_mem, true}]), - [ok = eleveldb:put(Ref, I, B, []) || {I, B} <- Blobs], - eleveldb:fold(Ref, fun({_K, _V}, A) -> A end, [], [{fill_cache, true}]), - [{ok, B} = eleveldb:get(Ref, I, []) || {I, B} <- Blobs], - ok = eleveldb:close(Ref), - erlang:garbage_collect(), - io:format(user, "RSS1: ~p\n", [rssmem()]) - end, - {_Pid, Mref} = spawn_monitor(F), + Ref = eleveldb:assert_open_small(TestDir), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual(ok, eleveldb:put(Ref, Key, Val, [])) + end, Blobs), + ?assertEqual([], eleveldb:fold(Ref, + fun({_K, _V}, A) -> A end, [], [{fill_cache, true}])), + lists:foreach( + fun({Key, Val}) -> + ?assertEqual({ok, Val}, eleveldb:get(Ref, Key, [])) + end, Blobs), + eleveldb:assert_close(Ref), + erlang:garbage_collect(), + eleveldb:terminal_format("RSS ~2b: ~p\n", [Count, rssmem()]) + end, + {_Pid, Mon} = erlang:spawn_monitor(F), receive - {'DOWN', Mref, process, _, _} -> + {'DOWN', Mon, process, _, _} -> ok end, RSS = rssmem(), ?assert(MaxFinalRSS > RSS), - cacheleak_loop(Count-1, Blobs, MaxFinalRSS). + cacheleak_loop((Count + 1), Blobs, MaxFinalRSS, TestDir); + +cacheleak_loop(_Count, _Blobs, _MaxFinalRSS, _TestDir) -> + ok. rssmem() -> Cmd = io_lib:format("ps -o rss= -p ~s", [os:getpid()]), - S = string:strip(os:cmd(Cmd), both), + % Don't try to use eunit's ?cmd macro here, it won't do the right thing. + S = string:strip(os:cmd(Cmd), left), % only matters that the 1st character is $0-$9 case string:to_integer(S) of {error, _} -> - io:format(user, "Error parsing integer in: ~s\n", [S]), + eleveldb:terminal_format("Error parsing integer in: ~s\n", [S]), error; {I, _} -> I end. + +-endif. % TEST diff --git a/test/cleanup.erl b/test/cleanup.erl index 9a1c6af3..abb3f8b4 100644 --- a/test/cleanup.erl +++ b/test/cleanup.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -20,148 +18,141 @@ %% %% ------------------------------------------------------------------- -%% Test various scenarios that properly and improperly close LevelDB DB/iterator -%% handles and ensure everything cleans up properly. - +%% Test various scenarios that properly and improperly close LevelDB +%% DB/iterator handles and ensure everything cleans up properly. -module(cleanup). --compile(export_all). - +-ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). --define(COMMON_INSTANCE_DIR, "/tmp/eleveldb.cleanup.test"). +-define(local_test(Timeout, TestFunc), + fun(TestRoot) -> + Title = erlang:atom_to_list(TestFunc), + TestDir = filename:join(TestRoot, TestFunc), + {Title, {timeout, Timeout, fun() -> TestFunc(TestDir) end}} + end +). +-define(local_test(TestFunc), ?local_test(10, TestFunc)). + +cleanup_test_() -> + {foreach, + fun eleveldb:create_test_dir/0, + fun eleveldb:delete_test_dir/1, + [ + ?local_test(test_open_twice), + ?local_test(test_open_close), + ?local_test(test_open_exit), + ?local_test(test_iterator), + ?local_test(15, test_iterator_db_close), + ?local_test(15, test_iterator_exit) + ] + }. %% Purposely reopen an already opened database to test failure assumption -assumption_test() -> - DB = open(), - try - io:format(user, "assumption_test: top\n", []), - ok = failed_open(), - io:format(user, "assumption_test: bottom\n", []), - ok - after - eleveldb:close(DB), - timer:sleep(500) - end. +test_open_twice(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertMatch({error, {db_open, _}}, + eleveldb:open(TestDir, [{create_if_missing, true}])), + eleveldb:assert_close(DB). %% Open/close -open_close_test() -> - DB = open(), - eleveldb:close(DB), - check(). +test_open_close(TestDir) -> + check_open_close(TestDir), + check_open_close(TestDir). %% Open w/o close -open_exit_test() -> - spawn_wait(fun() -> - _DB = open() - end), - timer:sleep(500), - check(). +test_open_exit(TestDir) -> + spawn_wait(fun() -> eleveldb:assert_open(TestDir) end), + check_open_close(TestDir). %% Iterator open/close -iterator_test() -> - DB = open(), - try - write(100, DB), - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr), - eleveldb:iterator_close(Itr), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + ItrRet = eleveldb:iterator(DB, []), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + ?assertEqual(ok, iterate(Itr)), + ?assertEqual(ok, eleveldb:iterator_close(Itr)), + eleveldb:assert_close(DB), + check_open_close(TestDir). %% Close DB while iterator running %% Expected: reopen should fail while iterator reference alive %% however, iterator should fail after DB is closed %% once iterator process exits, open should succeed -iterator_db_close_test() -> - DB = open(), - try - write(100, DB), - Parent = self(), - spawn_monitor(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - Parent ! continue, - try - iterate(Itr, 10) - catch - error:badarg -> - ok - end, - try - eleveldb:iterator_close(Itr) - catch - error:badarg -> - ok - end - end), - receive continue -> ok end, - eleveldb:close(DB), - %%failed_open(), - wait_down(), - erlang:garbage_collect(), - timer:sleep(500), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_db_close(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + Parent = self(), + {Pid, Mon} = Proc = erlang:spawn_monitor( + fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + Parent ! continue, + try + iterate(Itr, 10) + catch + error:badarg -> + ok + end, + try + eleveldb:iterator_close(Itr) + catch + error:badarg -> + ok + end + end), + ?assertEqual(ok, receive + continue -> + ok; + {'DOWN', Mon, process, Pid, Info} -> + Info + end), + eleveldb:assert_close(DB), + ?assertEqual(ok, wait_down(Proc)), + check_open_close(TestDir). %% Iterate open, iterator process exit w/o close -iterator_exit_test() -> - DB = open(), - try - write(100, DB), - spawn_wait(fun() -> - {ok, Itr} = eleveldb:iterator(DB, []), - iterate(Itr) - end), - eleveldb:close(DB), - check(), - ok - after - catch eleveldb:close(DB), - timer:sleep(500) - end. +test_iterator_exit(TestDir) -> + DB = eleveldb:assert_open(TestDir), + ?assertEqual(ok, write(100, DB)), + spawn_wait(fun() -> + {ok, Itr} = eleveldb:iterator(DB, []), + iterate(Itr) + end), + eleveldb:assert_close(DB), + check_open_close(TestDir). spawn_wait(F) -> - spawn_monitor(F), - wait_down(). + wait_down(erlang:spawn_monitor(F)). -wait_down() -> - receive {'DOWN', _, process, _, _} -> +wait_down({Pid, Mon}) when erlang:is_pid(Pid) andalso erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, Pid, _} -> + ok + end; +wait_down(Mon) when erlang:is_reference(Mon) -> + receive + {'DOWN', Mon, process, _, _} -> + ok + end; +wait_down(Pid) when erlang:is_pid(Pid) -> + receive + {'DOWN', _, process, Pid, _} -> ok end. -check() -> - timer:sleep(500), - DB = open(), - eleveldb:close(DB), - timer:sleep(500), - ok. - -open() -> - {ok, Ref} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - Ref. - -failed_open() -> - {error, {db_open, _}} = eleveldb:open(?COMMON_INSTANCE_DIR, - [{create_if_missing, true}]), - ok. +check_open_close(TestDir) -> + eleveldb:assert_close(eleveldb:assert_open(TestDir)). write(N, DB) -> write(0, N, DB). write(Same, Same, _DB) -> ok; write(N, End, DB) -> - eleveldb:put(DB, <>, <>, []), - write(N+1, End, DB). + KV = <>, + ?assertEqual(ok, eleveldb:put(DB, KV, KV, [])), + write((N + 1), End, DB). iterate(Itr) -> iterate(Itr, 0). @@ -174,5 +165,6 @@ do_iterate({ok, K, _V}, {Itr, Expected, Delay}) -> <> = K, ?assertEqual(Expected, N), (Delay == 0) orelse timer:sleep(Delay), - do_iterate(eleveldb:iterator_move(Itr, next), - {Itr, Expected + 1, Delay}). + do_iterate(eleveldb:iterator_move(Itr, next), {Itr, (Expected + 1), Delay}). + +-endif. % TEST diff --git a/test/eleveldb_schema_tests.erl b/test/eleveldb_schema_tests.erl index 593f6f70..abb2f7f4 100644 --- a/test/eleveldb_schema_tests.erl +++ b/test/eleveldb_schema_tests.erl @@ -39,7 +39,7 @@ basic_schema_test() -> cuttlefish_unit:assert_not_configured(Config, "eleveldb.tiered_fast_prefix"), cuttlefish_unit:assert_not_configured(Config, "eleveldb.tiered_slow_prefix"), cuttlefish_unit:assert_config(Config, "eleveldb.expiry_enabled", false), - cuttlefish_unit:assert_config(Config, "eleveldb.expiry_minutes", 0), + cuttlefish_unit:assert_config(Config, "eleveldb.expiry_minutes", unlimited), cuttlefish_unit:assert_config(Config, "eleveldb.whole_file_expiry", true), %% Make sure no multi_backend @@ -205,7 +205,7 @@ expiry_minutes_schema_test() -> ["../priv/eleveldb.schema"], Case2, context(), predefined_schema()), cuttlefish_unit:assert_config(Config2, "eleveldb.expiry_enabled", true), - cuttlefish_unit:assert_config(Config2, "eleveldb.expiry_minutes", 0), + cuttlefish_unit:assert_config(Config2, "eleveldb.expiry_minutes", unlimited), cuttlefish_unit:assert_config(Config2, "eleveldb.whole_file_expiry", false), ok. @@ -241,7 +241,7 @@ multi_backend_test() -> cuttlefish_unit:assert_config(DefaultBackend, "fadvise_willneed", false), cuttlefish_unit:assert_config(DefaultBackend, "delete_threshold", 1000), cuttlefish_unit:assert_config(DefaultBackend, "expiry_enabled", false), - cuttlefish_unit:assert_config(DefaultBackend, "expiry_minutes", 0), + cuttlefish_unit:assert_config(DefaultBackend, "expiry_minutes", unlimited), cuttlefish_unit:assert_config(DefaultBackend, "whole_file_expiry", true), ok. diff --git a/test/iterators.erl b/test/iterators.erl index 438f77b4..ce303308 100644 --- a/test/iterators.erl +++ b/test/iterators.erl @@ -1,8 +1,6 @@ %% ------------------------------------------------------------------- %% -%% eleveldb: Erlang Wrapper for LevelDB (http://code.google.com/p/leveldb/) -%% -%% Copyright (c) 2010-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2013-2017 Basho Technologies, Inc. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -19,128 +17,131 @@ %% under the License. %% %% ------------------------------------------------------------------- --module(iterators). --compile(export_all). +-module(iterators). -ifdef(TEST). - -include_lib("eunit/include/eunit.hrl"). iterator_test_() -> - {spawn, - [{setup, - fun setup/0, - fun cleanup/1, - fun(Ref) -> - [ - prev_test_case(Ref), - seek_and_next_test_case(Ref), - basic_prefetch_test_case(Ref), - seek_and_prefetch_test_case(Ref), - aae_prefetch1(Ref), - aae_prefetch2(Ref), - aae_prefetch3(Ref) - ] - end}] - }. + {spawn, [ + {setup, + fun setup/0, + fun cleanup/1, + fun({_, Ref}) -> [ + prev_test_case(Ref), + seek_and_next_test_case(Ref), + basic_prefetch_test_case(Ref), + seek_and_prefetch_test_case(Ref), + aae_prefetch1(Ref), + aae_prefetch2(Ref), + aae_prefetch3(Ref) + ] end + }]}. setup() -> - os:cmd("rm -rf ltest"), % NOTE - {ok, Ref} = eleveldb:open("ltest", [{create_if_missing, true}]), - eleveldb:put(Ref, <<"a">>, <<"w">>, []), - eleveldb:put(Ref, <<"b">>, <<"x">>, []), - eleveldb:put(Ref, <<"c">>, <<"y">>, []), - eleveldb:put(Ref, <<"d">>, <<"z">>, []), - Ref. - -cleanup(Ref) -> - eleveldb:close(Ref). + Dir = eleveldb:create_test_dir(), + Ref = eleveldb:assert_open(Dir), + ?assertEqual(ok, eleveldb:put(Ref, <<"a">>, <<"w">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"b">>, <<"x">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"c">>, <<"y">>, [])), + ?assertEqual(ok, eleveldb:put(Ref, <<"d">>, <<"z">>, [])), + {Dir, Ref}. + +cleanup({Dir, Ref}) -> + eleveldb:assert_close(Ref), + eleveldb:delete_test_dir(Dir). + +assert_iterator(DbRef, ItrOpts) -> + ItrRet = eleveldb:iterator(DbRef, ItrOpts), + ?assertMatch({ok, _}, ItrRet), + {_, Itr} = ItrRet, + Itr. prev_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, prev)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, prev)) end. seek_and_next_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, next)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, next)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, next)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, next)) end. basic_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<>>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<>>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)) end. seek_and_prefetch_test_case(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)), - ?assertEqual({ok, <<"a">>, <<"w">>},eleveldb:iterator_move(I, <<"a">>)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)), + ?assertEqual({ok, <<"a">>, <<"w">>}, eleveldb:iterator_move(I, <<"a">>)) end. aae_prefetch1(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch_stop)), + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch_stop)), - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch2(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({ok, <<"b">>, <<"x">>},eleveldb:iterator_move(I, <<"b">>)), - ?assertEqual({ok, <<"c">>, <<"y">>},eleveldb:iterator_move(I, prefetch)), - ?assertEqual({ok, <<"d">>, <<"z">>},eleveldb:iterator_move(I, prefetch_stop)), - - {ok, J} = eleveldb:iterator(Ref, []), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, <<"z">>)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch)), - ?assertEqual({error, invalid_iterator},eleveldb:iterator_move(J, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({ok, <<"b">>, <<"x">>}, eleveldb:iterator_move(I, <<"b">>)), + ?assertEqual({ok, <<"c">>, <<"y">>}, eleveldb:iterator_move(I, prefetch)), + ?assertEqual({ok, <<"d">>, <<"z">>}, eleveldb:iterator_move(I, prefetch_stop)), + + J = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, <<"z">>)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch)), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(J, prefetch_stop)) end. aae_prefetch3(Ref) -> fun() -> - {ok, I} = eleveldb:iterator(Ref, []), - ?assertEqual({error,invalid_iterator},eleveldb:iterator_move(I, prefetch_stop)) + I = assert_iterator(Ref, []), + ?assertEqual({error, invalid_iterator}, eleveldb:iterator_move(I, prefetch_stop)) end. --endif. +-endif. % TEST diff --git a/test/rand_gen_1.erl b/test/rand_gen_1.erl index 704f0e88..37410601 100644 --- a/test/rand_gen_1.erl +++ b/test/rand_gen_1.erl @@ -24,13 +24,13 @@ random_bin(_Id, Size) -> %% Make keys that look like this: <<"001328681207_012345">> %% The suffix part (after the underscore) will be assigned either -%% erlang:now/0's milliseconds or an integer between 0 and MaxSuffix. +%% os:timestamp/0's milliseconds or an integer between 0 and MaxSuffix. %% The integer between 0 & MaxSuffix will be chosen PercentAlmostSeq %% percent of the time. almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> fun() -> - {A, B, C} = now(), + {A, B, C} = os:timestamp(), TimeT = (A*1000000) + B, End = case random:uniform(100) of N when N < PercentAlmostSeq -> @@ -45,8 +45,8 @@ almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> %% Make keys that look like this: <<"001328681207_012345">>. %% %% With probability of 1 - (MillionNotSequential/1000000), the keys -%% will be generated using erlang:now/0, where the suffix is exactly -%% equal to the microseconds portion of erlang:now/0's return value. +%% will be generated using os:timestamp/0, where the suffix is exactly +%% equal to the microseconds portion of os:timestamp/0's return value. %% Such keys will be perfectly sorted for time series-style keys: each %% key will be "greater than" any previous key. %% @@ -61,7 +61,7 @@ almost_completely_sequential(_Id, MaxSuffix, PercentAlmostSeq) -> mostly_sequential(_Id, MillionNotSequential) -> fun() -> - {A, B, C} = now(), + {A, B, C} = os:timestamp(), {X, Y, Z} = case random:uniform(1000*1000) of N when N < MillionNotSequential -> {A - random:uniform(3),