diff --git a/AUTHORS b/AUTHORS index 08f3b1ca7293b..289f54bf67baf 100644 --- a/AUTHORS +++ b/AUTHORS @@ -13,3 +13,7 @@ Patience Warnick Yehuda Sadeh-Weinraub Greg Farnum +Contributors +------------ + +Loic Dachary diff --git a/COPYING b/COPYING index 20ab537172d4a..888e30e679f4f 100644 --- a/COPYING +++ b/COPYING @@ -98,3 +98,6 @@ License: +Files: test/common/Throttle.cc +Copyright: Copyright (C) 2013 Cloudwatt +License: LGPL2 or later diff --git a/src/Makefile.am b/src/Makefile.am index efff334e0456b..04234229236ac 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -671,6 +671,12 @@ unittest_log_LDADD = libcommon.la ${UNITTEST_LDADD} unittest_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -O2 check_PROGRAMS += unittest_log +unittest_throttle_SOURCES = test/common/Throttle.cc +unittest_throttle_LDFLAGS = $(PTHREAD_CFLAGS) ${AM_LDFLAGS} +unittest_throttle_LDADD = libcommon.la ${LIBGLOBAL_LDA} ${UNITTEST_LDADD} +unittest_throttle_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -O2 +check_PROGRAMS += unittest_throttle + unittest_base64_SOURCES = test/base64.cc unittest_base64_LDFLAGS = $(PTHREAD_CFLAGS) ${AM_LDFLAGS} unittest_base64_LDADD = libcephfs.la -lm ${UNITTEST_LDADD} diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc index 844263aa111d2..82ffe7a9fc58a 100644 --- a/src/common/Throttle.cc +++ b/src/common/Throttle.cc @@ -65,7 +65,7 @@ Throttle::~Throttle() void Throttle::_reset_max(int64_t m) { assert(lock.is_locked()); - if (m < ((int64_t)max.read()) && !cond.empty()) + if (!cond.empty()) cond.front()->SignalOne(); logger->set(l_throttle_max, m); max.set((size_t)m); diff --git a/src/test/common/Throttle.cc b/src/test/common/Throttle.cc new file mode 100644 index 0000000000000..f50ef2e1b7b98 --- /dev/null +++ b/src/test/common/Throttle.cc @@ -0,0 +1,253 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Cloudwatt + * + * Author: Loic Dachary + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Library Public License as published by + * the Free Software Foundation; either version 2, or (at your option) + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Library Public License for more details. + * + */ + +#include +#include +#include "common/Mutex.h" +#include "common/Thread.h" +#include "common/Throttle.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" +#include + +class ThrottleTest : public ::testing::Test { +protected: + + class Thread_get : public Thread { + public: + Throttle &throttle; + int64_t count; + bool waited; + + Thread_get(Throttle& _throttle, int64_t _count) : + throttle(_throttle), + count(_count), + waited(false) + { + } + + virtual void *entry() { + waited = throttle.get(count); + throttle.put(count); + return NULL; + } + }; + +}; + +TEST_F(ThrottleTest, Throttle) { + ASSERT_THROW({ + Throttle throttle(g_ceph_context, "throttle", -1); + }, FailedAssertion); + + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + ASSERT_EQ(throttle.get_max(), throttle_max); + ASSERT_EQ(throttle.get_current(), 0); +} + +TEST_F(ThrottleTest, take) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + ASSERT_THROW(throttle.take(-1), FailedAssertion); + ASSERT_EQ(throttle.take(throttle_max), throttle_max); + ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2); +} + +TEST_F(ThrottleTest, get) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + ASSERT_THROW(throttle.get(-1), FailedAssertion); + ASSERT_FALSE(throttle.get(5)); + ASSERT_EQ(throttle.put(5), 0); + + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_FALSE(throttle.get(1, throttle_max + 1)); + ASSERT_EQ(throttle.put(throttle_max + 1), 0); + ASSERT_FALSE(throttle.get(0, throttle_max)); + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_EQ(throttle.put(throttle_max), 0); + + useconds_t delay = 1; + + bool waited; + + do { + cout << "Trying (1) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, 7); + t.create(); + usleep(delay); + ASSERT_EQ(throttle.put(throttle_max), 0); + t.join(); + + if (!(waited = t.waited)) + delay *= 2; + } while(!waited); + + do { + cout << "Trying (2) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max / 2)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, throttle_max); + t.create(); + usleep(delay); + + Thread_get u(throttle, 1); + u.create(); + usleep(delay); + + throttle.put(throttle_max / 2); + + t.join(); + u.join(); + + if (!(waited = t.waited && u.waited)) + delay *= 2; + } while(!waited); + +} + +TEST_F(ThrottleTest, get_or_fail) { + { + Throttle throttle(g_ceph_context, "throttle"); + + ASSERT_TRUE(throttle.get_or_fail(5)); + ASSERT_TRUE(throttle.get_or_fail(5)); + } + + { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max)); + ASSERT_EQ(throttle.put(throttle_max), 0); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2)); + ASSERT_EQ(throttle.put(throttle_max * 2), 0); + + ASSERT_TRUE(throttle.get_or_fail(throttle_max)); + ASSERT_FALSE(throttle.get_or_fail(1)); + ASSERT_EQ(throttle.put(throttle_max), 0); + } +} + +TEST_F(ThrottleTest, wait) { + int64_t throttle_max = 10; + Throttle throttle(g_ceph_context, "throttle", throttle_max); + + useconds_t delay = 1; + + bool waited; + + do { + cout << "Trying (3) with delay " << delay << "us\n"; + + ASSERT_FALSE(throttle.get(throttle_max / 2)); + ASSERT_FALSE(throttle.get_or_fail(throttle_max)); + + Thread_get t(throttle, throttle_max); + t.create(); + usleep(delay); + + // + // Throttle::_reset_max(int64_t m) used to contain a test + // that blocked the following statement, only if + // the argument was greater than throttle_max. + // Although a value lower than throttle_max would cover + // the same code in _reset_max, the throttle_max * 100 + // value is left here to demonstrate that the problem + // has been solved. + // + throttle.wait(throttle_max * 100); + usleep(delay); + ASSERT_EQ(throttle.get_current(), throttle_max / 2); + + + t.join(); + + if (!(waited = t.waited)) + delay *= 2; + } while(!waited); + +} + +TEST_F(ThrottleTest, destructor) { + Thread_get *t; + { + int64_t throttle_max = 10; + Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max); + + ASSERT_FALSE(throttle->get(5)); + + t = new Thread_get(*throttle, 7); + t->create(); + bool blocked; + useconds_t delay = 1; + do { + usleep(delay); + if (throttle->get_or_fail(1)) { + throttle->put(1); + blocked = false; + } else { + blocked = true; + } + delay *= 2; + } while(!blocked); + delete throttle; + } + + { // + // The thread is left hanging, otherwise it will abort(). + // Deleting the Throttle on which it is waiting creates a + // inconsistency that will be detected: the Throttle object that + // it references no longer exists. + // + pthread_t id = t->get_thread_id(); + ASSERT_EQ(pthread_kill(id, 0), 0); + delete t; + ASSERT_EQ(pthread_kill(id, 0), 0); + } +} + +int main(int argc, char **argv) { + vector args; + argv_to_vec(argc, (const char **)argv, args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +// Local Variables: +// compile-command: "cd ../.. ; make unittest_throttle ; ./unittest_throttle # --gtest_filter=ThrottleTest.destructor --log-to-stderr=true --debug-filestore=20" +// End: