Skip to content

Commit

Permalink
queue: Add occupancy histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
dtnaylor committed May 15, 2019
1 parent 1642bcb commit 6abb18a
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 6 deletions.
103 changes: 103 additions & 0 deletions bessctl/module_tests/queue_occupancy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2016-2019, Nefeli Networks, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the names of the copyright holders nor the names of their
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from test_utils import *


class BessQueueOccupancyTest(BessModuleTestCase):
    # Exercises the Queue module's occupancy histogram: enabling/disabling
    # tracking, bucket sizing, percentile summaries, and clearing.
    #
    # NOTE: `#` comments are used instead of docstrings on the test methods
    # so that the verbose test runner keeps printing the method names.

    def _send_packets(self, q):
        # Push a fixed batch of identical TCP packets through queue `q`
        # (input gate 0, output gate 0) and return how many were sent.
        eth = scapy.Ether(src='02:1e:67:9f:4d:ae', dst='06:16:3e:1b:72:32')
        ip = scapy.IP(src='172.16.0.2', dst='8.8.8.8')
        tcp = scapy.TCP(sport=52428, dport=80)
        l7 = 'helloworld'
        pkt = eth / ip / tcp / l7

        pkts = [pkt] * 100
        self.run_module(q, 0, pkts, [0])
        return len(pkts)

    def test_hist_enable(self):
        # With tracking on, the histogram sample count is expected to equal
        # the number of packets pushed through the queue.
        q = Queue(size=1024, track_occupancy=True)
        sent = self._send_packets(q)
        resp = q.get_status()
        self.assertEqual(resp.enqueued, sent)
        self.assertEqual(resp.dequeued, sent)
        self.assertEqual(resp.occupancy_summary.count, sent)

    def test_hist_disable(self):
        # With tracking off, packets still flow but no samples are recorded.
        q = Queue(size=1024, track_occupancy=False)
        sent = self._send_packets(q)
        resp = q.get_status()
        self.assertEqual(resp.enqueued, sent)
        self.assertEqual(resp.dequeued, sent)
        self.assertEqual(resp.occupancy_summary.count, 0)

    def test_hist_size(self):
        # Bucket width is expected to be queue size / number of buckets.
        # Default bucket count: 1024 / 32 == 32.
        q = Queue(size=1024, track_occupancy=True)
        resp = q.get_status()
        self.assertEqual(resp.size, 1024)
        self.assertEqual(resp.occupancy_summary.num_buckets, 32)
        self.assertEqual(resp.occupancy_summary.bucket_width, 32)

        # Resizing the queue keeps the bucket count and widens the buckets.
        q.set_size(size=2048)
        resp = q.get_status()
        self.assertEqual(resp.size, 2048)
        self.assertEqual(resp.occupancy_summary.num_buckets, 32)
        self.assertEqual(resp.occupancy_summary.bucket_width, 64)

        # An explicit bucket count overrides the default: 1024 / 64 == 16.
        q = Queue(size=1024, track_occupancy=True, occupancy_hist_buckets=64)
        resp = q.get_status()
        self.assertEqual(resp.size, 1024)
        self.assertEqual(resp.occupancy_summary.num_buckets, 64)
        self.assertEqual(resp.occupancy_summary.bucket_width, 16)

    def test_hist_summary(self):
        q = Queue(size=1024, track_occupancy=True)
        sent = self._send_packets(q)

        # One percentile value is returned per requested percentile.
        # NOTE(review): the proto documents percentiles as values in
        # [0.0, 100.0], so 0.5/0.9/0.99 look like fractions rather than
        # the 50th/90th/99th percentiles -- confirm the intent.
        resp = q.get_status(occupancy_percentiles=[0.5, 0.9, 0.99])
        self.assertEqual(resp.occupancy_summary.count, sent)
        self.assertEqual(len(resp.occupancy_summary.percentile_values), 3)

        resp = q.get_status(occupancy_percentiles=[0, 0.5, 0.9, 0.99])
        self.assertEqual(resp.occupancy_summary.count, sent)
        self.assertEqual(len(resp.occupancy_summary.percentile_values), 4)

        # clear_hist still reports the samples gathered so far ...
        resp = q.get_status(clear_hist=True)
        self.assertEqual(resp.occupancy_summary.count, sent)

        # ... and the following read sees an emptied histogram.
        resp = q.get_status()
        self.assertEqual(resp.occupancy_summary.count, 0)


# Build the suite from the test case above and run it verbosely on stdout.
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(BessQueueOccupancyTest)
runner = unittest.TextTestRunner(verbosity=2, stream=sys.stdout)
results = runner.run(suite)

# Signal any failed or errored test to the caller through the exit status.
if results.failures or results.errors:
    sys.exit(1)
61 changes: 57 additions & 4 deletions core/modules/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

#include "../utils/format.h"

#define DEFAULT_QUEUE_SIZE 1024

const Commands Queue::cmds = {
{"set_burst", "QueueCommandSetBurstArg",
MODULE_CMD_FUNC(&Queue::CommandSetBurst), Command::THREAD_SAFE},
Expand Down Expand Up @@ -79,6 +77,10 @@ int Queue::Resize(int slots) {
queue_ = new_queue;
size_ = slots;

if (track_occupancy_) {
occupancy_hist_.Resize(occupancy_buckets_, slots / occupancy_buckets_);
}

if (backpressure_) {
AdjustWaterLevels();
}
Expand All @@ -97,6 +99,15 @@ CommandResponse Queue::Init(const bess::pb::QueueArg &arg) {

burst_ = bess::PacketBatch::kMaxBurst;

if (arg.track_occupancy()) {
track_occupancy_ = true;
occupancy_buckets_ = kDefaultBuckets;
if (arg.occupancy_hist_buckets() != 0) {
occupancy_buckets_ = arg.occupancy_hist_buckets();
}
VLOG(1) << "Occupancy tracking enabled for " << name() << "::Queue (" << occupancy_buckets_ << " buckets)";
}

if (arg.backpressure()) {
VLOG(1) << "Backpressure enabled for " << name() << "::Queue";
backpressure_ = true;
Expand Down Expand Up @@ -191,7 +202,19 @@ struct task_result Queue::RunTask(Context *ctx, bess::PacketBatch *batch,

RunNextModule(ctx, batch);

if (backpressure_ && llring_count(queue_) < low_water_) {
uint32_t occupancy;
if (track_occupancy_ || backpressure_) {
occupancy = llring_count(queue_);
}

if (track_occupancy_) {
mcslock_node_t mynode;
mcs_lock(&lock_, &mynode);
occupancy_hist_.Insert(occupancy);
mcs_unlock(&lock_, &mynode);
}

if (backpressure_ && occupancy < low_water_) {
SignalUnderload();
}

Expand Down Expand Up @@ -236,16 +259,46 @@ CommandResponse Queue::CommandSetSize(
}

// Handles the `get_status` command.
//
// Replies with the current ring occupancy, the queue size, the
// enqueued/dequeued/dropped counters and a summary of the occupancy
// histogram at the percentiles requested in `arg`. Fails with EINVAL when
// IsValidPercentiles() rejects the requested percentiles. When
// `arg.clear_hist()` is set, the histogram is reset after it has been
// summarized.
CommandResponse Queue::CommandGetStatus(
    const bess::pb::QueueCommandGetStatusArg &arg) {
  bess::pb::QueueCommandGetStatusResponse resp;

  // Copy the repeated protobuf field into a plain vector for validation and
  // summarization.
  const auto &requested = arg.occupancy_percentiles();
  std::vector<double> occupancy_percentiles(requested.begin(),
                                            requested.end());
  if (!IsValidPercentiles(occupancy_percentiles)) {
    return CommandFailure(EINVAL, "invalid 'occupancy_percentiles'");
  }

  // Summarize first, then read the remaining counters.
  const auto &hist_summary = occupancy_hist_.Summarize(occupancy_percentiles);

  resp.set_count(llring_count(queue_));
  resp.set_size(size_);
  resp.set_enqueued(stats_.enqueued);
  resp.set_dequeued(stats_.dequeued);
  resp.set_dropped(stats_.dropped);
  SetSummary(resp.mutable_occupancy_summary(), hist_summary);

  if (arg.clear_hist()) {
    // Samples inserted between Summarize() above and the lock taken inside
    // ClearOccupancyHist() may be lost; that is accepted in exchange for a
    // smaller critical section.
    ClearOccupancyHist();
  }

  return CommandSuccess(resp);
}

void Queue::ClearOccupancyHist() {
// vector initialization is expensive thus should be out of critical section
decltype(occupancy_hist_) new_occupancy_hist(occupancy_hist_.num_buckets(),
occupancy_hist_.bucket_width());

// Use move semantics to minimize critical section
mcslock_node_t mynode;
mcs_lock(&lock_, &mynode);
occupancy_hist_ = std::move(new_occupancy_hist);
mcs_unlock(&lock_, &mynode);
}

void Queue::AdjustWaterLevels() {
high_water_ = static_cast<uint64_t>(size_ * kHighWaterRatio);
low_water_ = static_cast<uint64_t>(size_ * kLowWaterRatio);
Expand Down
18 changes: 17 additions & 1 deletion core/modules/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "../kmod/llring.h"
#include "../module.h"
#include "../pb/module_msg.pb.h"
#include "../utils/histogram.h"
#include "../utils/mcslock.h"

#define DEFAULT_QUEUE_SIZE 1024

class Queue : public Module {
public:
Expand All @@ -48,7 +52,9 @@ class Queue : public Module {
size_(),
high_water_(),
low_water_(),
stats_() {
stats_(),
track_occupancy_(),
occupancy_hist_(kDefaultBuckets, kDefaultBucketWidth) {
is_task_ = true;
propagate_workers_ = false;
max_allowed_workers_ = Worker::kMaxWorkers;
Expand Down Expand Up @@ -77,6 +83,8 @@ class Queue : public Module {

int Resize(int slots);

void ClearOccupancyHist();

// Readjusts the water level according to `size_`.
void AdjustWaterLevels();

Expand Down Expand Up @@ -105,6 +113,14 @@ class Queue : public Module {
uint64_t dequeued;
uint64_t dropped;
} stats_;

// Queue occupancy statistics
const uint64_t kDefaultBuckets = 32;
const uint64_t kDefaultBucketWidth = DEFAULT_QUEUE_SIZE / kDefaultBuckets;
bool track_occupancy_;
uint64_t occupancy_buckets_;
Histogram<uint64_t> occupancy_hist_;
mcslock lock_;
};

#endif // BESS_MODULES_QUEUE_H_
8 changes: 7 additions & 1 deletion protobuf/module_msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ message QueueCommandSetSizeArg {
* Modules that are queues or contain queues may contain functions
* `get_status()` that return QueueCommandGetStatusResponse.
*/
message QueueCommandGetStatusArg {}
message QueueCommandGetStatusArg {
bool clear_hist = 1; /// If true, the occupancy histogram is cleared after it has been read.
repeated double occupancy_percentiles = 2; /// Percentiles to report in the occupancy summary: an ascending list of real numbers in [0.0, 100.0].
}

/**
* Modules that are queues or contain queues may contain functions
Expand All @@ -346,6 +349,7 @@ message QueueCommandGetStatusResponse {
uint64 enqueued = 3; /// total enqueued
uint64 dequeued = 4; /// total dequeued
uint64 dropped = 5; /// total dropped
HistogramSummary occupancy_summary = 6; /// Valid only if queue created with track_occupancy
}

/**
Expand Down Expand Up @@ -810,6 +814,8 @@ message QueueArg {
uint64 size = 1; /// The maximum number of packets to store in the queue.
bool prefetch = 2; /// When prefetch is enabled, the module will perform CPU prefetch on the first 64B of each packet onto CPU L1 cache. Default value is false.
bool backpressure = 3; // When backpressure is enabled, the module will notify upstream if it is overloaded.
bool track_occupancy = 4; // When occupancy tracking is enabled, the module will keep a histogram of queue occupancies (observations recorded after each dequeue).
uint64 occupancy_hist_buckets = 5; // The number of buckets to use in the histogram when occupancy tracking is enabled.
}

/**
Expand Down

0 comments on commit 6abb18a

Please sign in to comment.