Skip to content

Commit

Permalink
Relax Throttle::_reset_max conditions and associated unit tests
Browse files Browse the repository at this point in the history
Removes a condition in Throttle::_reset_max by which the waiting queue is only
Signal()ed if the new maximum is lower than the current maximum.
There is no evidence of a use case where such a restriction would be
useful. In addition waking up a thread when the maximum increases
gives it a chance to immediately continue the suspended process
instead of waiting for the next put().

Create a new test file covering 100% of src/Throttle.{cc,h} lines of code.
The following methods are tested:

* Throttle::Throttle with and without a maximum
* Throttle::~Throttle when each pending Cond is deleted
* Throttle::take
* Throttle::get when updating the maximum ( lower or higher ),
  when going to sleep waiting for the count to lower under
  the maximum, when going to sleep because another thread is
  already asleep waiting
* Throttle::get_or_fail when there is no maximum,
  when requesting a count that is larger than the maximum, either
  when the current value is under the maximum or above the maximum.
* Throttle::wait when used to reset the maximum and wake up
  another thread asleep waiting

All asserts checking the arguments sanity are exercised ( negative argument
for Throttle::take etc. ).
Adds the LGPLv2+ licensing terms to COPYING along with the others.
Adds a Contributors section to the AUTHORS file.

Notes:
Testing asserts outputs verbose error messages that should be silenced
but it does not seem possible.

Signed-off-by: Loic Dachary <[email protected]>
  • Loading branch information
Loic Dachary committed Feb 5, 2013
1 parent 4b4dba3 commit 64ded02
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 1 deletion.
4 changes: 4 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ Patience Warnick <[email protected]>
Yehuda Sadeh-Weinraub <[email protected]>
Greg Farnum <[email protected]>

Contributors
------------

Loic Dachary <[email protected]>
3 changes: 3 additions & 0 deletions COPYING
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@ License:



Files: test/common/Throttle.cc
Copyright: Copyright (C) 2013 Cloudwatt <[email protected]>
License: LGPL2 or later
6 changes: 6 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,12 @@ unittest_log_LDADD = libcommon.la ${UNITTEST_LDADD}
unittest_log_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -O2
check_PROGRAMS += unittest_log

unittest_throttle_SOURCES = test/common/Throttle.cc
unittest_throttle_LDFLAGS = $(PTHREAD_CFLAGS) ${AM_LDFLAGS}
unittest_throttle_LDADD = libcommon.la ${LIBGLOBAL_LDA} ${UNITTEST_LDADD}
unittest_throttle_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -O2
check_PROGRAMS += unittest_throttle

unittest_base64_SOURCES = test/base64.cc
unittest_base64_LDFLAGS = $(PTHREAD_CFLAGS) ${AM_LDFLAGS}
unittest_base64_LDADD = libcephfs.la -lm ${UNITTEST_LDADD}
Expand Down
2 changes: 1 addition & 1 deletion src/common/Throttle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Throttle::~Throttle()
void Throttle::_reset_max(int64_t m)
{
assert(lock.is_locked());
if (m < ((int64_t)max.read()) && !cond.empty())
if (!cond.empty())
cond.front()->SignalOne();
logger->set(l_throttle_max, m);
max.set((size_t)m);
Expand Down
253 changes: 253 additions & 0 deletions src/test/common/Throttle.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2013 Cloudwatt <[email protected]>
*
* Author: Loic Dachary <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Library Public License as published by
* the Free Software Foundation; either version 2, or (at your option)
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Library Public License for more details.
*
*/

#include <stdio.h>
#include <signal.h>
#include "common/Mutex.h"
#include "common/Thread.h"
#include "common/Throttle.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include <gtest/gtest.h>

class ThrottleTest : public ::testing::Test {
protected:

class Thread_get : public Thread {
public:
Throttle &throttle;
int64_t count;
bool waited;

Thread_get(Throttle& _throttle, int64_t _count) :
throttle(_throttle),
count(_count),
waited(false)
{
}

virtual void *entry() {
waited = throttle.get(count);
throttle.put(count);
return NULL;
}
};

};

TEST_F(ThrottleTest, Throttle) {
ASSERT_THROW({
Throttle throttle(g_ceph_context, "throttle", -1);
}, FailedAssertion);

int64_t throttle_max = 10;
Throttle throttle(g_ceph_context, "throttle", throttle_max);
ASSERT_EQ(throttle.get_max(), throttle_max);
ASSERT_EQ(throttle.get_current(), 0);
}

TEST_F(ThrottleTest, take) {
int64_t throttle_max = 10;
Throttle throttle(g_ceph_context, "throttle", throttle_max);
ASSERT_THROW(throttle.take(-1), FailedAssertion);
ASSERT_EQ(throttle.take(throttle_max), throttle_max);
ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
}

TEST_F(ThrottleTest, get) {
int64_t throttle_max = 10;
Throttle throttle(g_ceph_context, "throttle", throttle_max);
ASSERT_THROW(throttle.get(-1), FailedAssertion);
ASSERT_FALSE(throttle.get(5));
ASSERT_EQ(throttle.put(5), 0);

ASSERT_FALSE(throttle.get(throttle_max));
ASSERT_FALSE(throttle.get_or_fail(1));
ASSERT_FALSE(throttle.get(1, throttle_max + 1));
ASSERT_EQ(throttle.put(throttle_max + 1), 0);
ASSERT_FALSE(throttle.get(0, throttle_max));
ASSERT_FALSE(throttle.get(throttle_max));
ASSERT_FALSE(throttle.get_or_fail(1));
ASSERT_EQ(throttle.put(throttle_max), 0);

useconds_t delay = 1;

bool waited;

do {
cout << "Trying (1) with delay " << delay << "us\n";

ASSERT_FALSE(throttle.get(throttle_max));
ASSERT_FALSE(throttle.get_or_fail(throttle_max));

Thread_get t(throttle, 7);
t.create();
usleep(delay);
ASSERT_EQ(throttle.put(throttle_max), 0);
t.join();

if (!(waited = t.waited))
delay *= 2;
} while(!waited);

do {
cout << "Trying (2) with delay " << delay << "us\n";

ASSERT_FALSE(throttle.get(throttle_max / 2));
ASSERT_FALSE(throttle.get_or_fail(throttle_max));

Thread_get t(throttle, throttle_max);
t.create();
usleep(delay);

Thread_get u(throttle, 1);
u.create();
usleep(delay);

throttle.put(throttle_max / 2);

t.join();
u.join();

if (!(waited = t.waited && u.waited))
delay *= 2;
} while(!waited);

}

TEST_F(ThrottleTest, get_or_fail) {
{
Throttle throttle(g_ceph_context, "throttle");

ASSERT_TRUE(throttle.get_or_fail(5));
ASSERT_TRUE(throttle.get_or_fail(5));
}

{
int64_t throttle_max = 10;
Throttle throttle(g_ceph_context, "throttle", throttle_max);

ASSERT_TRUE(throttle.get_or_fail(throttle_max));
ASSERT_EQ(throttle.put(throttle_max), 0);

ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
ASSERT_FALSE(throttle.get_or_fail(1));
ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
ASSERT_EQ(throttle.put(throttle_max * 2), 0);

ASSERT_TRUE(throttle.get_or_fail(throttle_max));
ASSERT_FALSE(throttle.get_or_fail(1));
ASSERT_EQ(throttle.put(throttle_max), 0);
}
}

TEST_F(ThrottleTest, wait) {
int64_t throttle_max = 10;
Throttle throttle(g_ceph_context, "throttle", throttle_max);

useconds_t delay = 1;

bool waited;

do {
cout << "Trying (3) with delay " << delay << "us\n";

ASSERT_FALSE(throttle.get(throttle_max / 2));
ASSERT_FALSE(throttle.get_or_fail(throttle_max));

Thread_get t(throttle, throttle_max);
t.create();
usleep(delay);

//
// Throttle::_reset_max(int64_t m) used to contain a test
// that blocked the following statement, only if
// the argument was greater than throttle_max.
// Although a value lower than throttle_max would cover
// the same code in _reset_max, the throttle_max * 100
// value is left here to demonstrate that the problem
// has been solved.
//
throttle.wait(throttle_max * 100);
usleep(delay);
ASSERT_EQ(throttle.get_current(), throttle_max / 2);


t.join();

if (!(waited = t.waited))
delay *= 2;
} while(!waited);

}

TEST_F(ThrottleTest, destructor) {
Thread_get *t;
{
int64_t throttle_max = 10;
Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max);

ASSERT_FALSE(throttle->get(5));

t = new Thread_get(*throttle, 7);
t->create();
bool blocked;
useconds_t delay = 1;
do {
usleep(delay);
if (throttle->get_or_fail(1)) {
throttle->put(1);
blocked = false;
} else {
blocked = true;
}
delay *= 2;
} while(!blocked);
delete throttle;
}

{ //
// The thread is left hanging, otherwise it will abort().
// Deleting the Throttle on which it is waiting creates a
// inconsistency that will be detected: the Throttle object that
// it references no longer exists.
//
pthread_t id = t->get_thread_id();
ASSERT_EQ(pthread_kill(id, 0), 0);
delete t;
ASSERT_EQ(pthread_kill(id, 0), 0);
}
}

int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);

global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);

::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

// Local Variables:
// compile-command: "cd ../.. ; make unittest_throttle ; ./unittest_throttle # --gtest_filter=ThrottleTest.destructor --log-to-stderr=true --debug-filestore=20"
// End:

0 comments on commit 64ded02

Please sign in to comment.