refs #6, #8: Unify task queue for batches and offloading.
 * No significant performance changes.
achimnol committed Jan 13, 2016
1 parent f6b3a73 commit ae60d38
Showing 6 changed files with 73 additions and 117 deletions.
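The substance of the change: ElementGraph previously kept per-device ready_tasks rings for offload tasks alongside its generic batch queue; after this commit, OffloadTask objects travel through the same queue and flush path as PacketBatch objects. The sketch below shows the general shape of that kind of unified dispatch; the tagged wrapper, the helper names, and the std::deque container are illustrative assumptions, not the repository's actual FixedRing-based code.

#include <deque>

struct PacketBatch;   // project types, left opaque for this sketch
struct OffloadTask;

void process_batch(PacketBatch *batch);         // stand-ins for the ElementGraph methods
void process_offload_task(OffloadTask *task);

// Hypothetical tagged item: the real queue is a FixedRing<void *, nullptr> and may
// tell the two kinds apart differently (e.g., pointer tagging or a field on the item).
struct QueuedItem {
    enum Kind { BATCH, OFFLOAD_TASK } kind;
    void *ptr;
};

void flush_unified_queue(std::deque<QueuedItem> &queue)
{
    while (!queue.empty()) {
        QueuedItem item = queue.front();
        queue.pop_front();
        if (item.kind == QueuedItem::BATCH)
            process_batch(static_cast<PacketBatch *>(item.ptr));
        else
            process_offload_task(static_cast<OffloadTask *>(item.ptr));
    }
}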
3 changes: 0 additions & 3 deletions include/nba/element/element.hh
@@ -234,9 +234,6 @@ public:
* it has sufficient amount of work. */
int offload(ElementGraph *mother, PacketBatch *in_batch, int input_port);

/** Immediately begins offloading of the given (reused) offload-task. */
int offload(ElementGraph *mother, OffloadTask *reused_offl_task, int input_port);

/** Stores the batches that are returned from offloading. */
int enqueue_batch(PacketBatch *batch);

10 changes: 1 addition & 9 deletions include/nba/framework/elementgraph.hh
@@ -48,11 +48,6 @@ public:
void enqueue_batch(PacketBatch *batch, Element *start_elem, int input_port = 0);
void enqueue_offload_task(OffloadTask *otask, Element *start_elem, int input_port = 0);

// TODO: merge with flush_tasks()
/* Tries to execute all pending offloaded tasks.
* This method does not allocate/free any batches. */
void flush_offloaded_tasks();

/* Tries to run all pending computation tasks. */
void flush_tasks();

@@ -123,7 +118,6 @@ private:
comp_thread_context *ctx;

FixedRing<void *, nullptr> queue;
FixedRing<OffloadTask *, nullptr> ready_tasks[NBA_MAX_COPROCESSOR_TYPES];

/* Executes the element graph for the given batch and free it after
* processing. Internally it manages a queue to handle diverged paths
@@ -133,14 +127,12 @@
* the batch to the delayed_batches queue. */
void process_batch(PacketBatch *batch);
void process_offload_task(OffloadTask *otask);
void send_offload_task_to_device(OffloadTask *task);

struct rte_hash *offl_actions;

/* The entry point of packet processing pipeline (graph). */
SchedulableElement *input_elem;

friend int OffloadableElement::offload(ElementGraph *mother, OffloadTask *otask, int input_port);
friend int OffloadableElement::offload(ElementGraph *mother, PacketBatch *in_batch, int input_port);
};

}
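After the removals, the only queuing members left in the header are the single FixedRing<void *, nullptr> queue and the new send_offload_task_to_device() helper. FixedRing's interface can be inferred from the calls that appear in this commit (init(size, numa_node), push_back(), front(), pop_front(), empty()); the snippet below only illustrates that usage pattern under those assumptions and is not code from the repository (the include path is also a guess).

#include <nba/core/queue.hh>   // assumed location of FixedRing

using nba::FixedRing;

static void drain_example()
{
    FixedRing<void *, nullptr> ring;
    ring.init(256 /* capacity */, 0 /* NUMA node */);
    // ... PacketBatch* and OffloadTask* pointers are pushed elsewhere via push_back() ...
    while (!ring.empty()) {
        void *item = ring.front();
        ring.pop_front();
        (void) item;   // ElementGraph decides here whether it holds a batch or a task
    }
}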
2 changes: 1 addition & 1 deletion include/nba/framework/offloadtask.hh
@@ -17,7 +17,7 @@

namespace nba {

enum TaskStates {
enum TaskStates : int {
TASK_INITIALIZING = 0,
TASK_INITIALIZED = 1,
TASK_PREPARED = 2,
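The one change here gives TaskStates a fixed underlying type. The diff does not say why, but with ": int" the enum's size and representation are pinned down by C++11 rather than left implementation-defined, values round-trip cleanly through plain int storage, and the type can be forward-declared. A minimal illustration of the language rule, not project code:

enum ExampleStates : int { EX_A = 0, EX_B = 1 };           // fixed underlying type
static_assert(sizeof(ExampleStates) == sizeof(int),
              "size matches the declared underlying type");
enum DeclaredElsewhere : int;                               // opaque declaration: legal only with a fixed type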
16 changes: 1 addition & 15 deletions src/lib/element.cc
@@ -176,20 +176,6 @@ int Element::configure(comp_thread_context *ctx, vector<string> &args) {
return 0;
}

int OffloadableElement::offload(ElementGraph *mother, OffloadTask *otask, int input_port)
{
int dev_idx = 0;
uint64_t now = rte_rdtsc();
assert(otask != nullptr);
otask->state = TASK_INITIALIZING;
otask->task_id += 100000; // for debugging
otask->offload_start = now;
otask->state = TASK_PREPARED;
mother->ready_tasks[dev_idx].push_back(otask);
/* This should always succeed. */
return 0;
}

int OffloadableElement::offload(ElementGraph *mother, PacketBatch *batch, int input_port)
{
int dev_idx = 0;
@@ -260,7 +246,7 @@ int OffloadableElement::offload(ElementGraph *mother, PacketBatch *batch, int input_port)
otask->offload_start = now;

otask->state = TASK_INITIALIZED;
mother->ready_tasks[dev_idx].push_back(otask);
mother->enqueue_offload_task(otask, this, input_port);
#ifdef USE_NVPROF
nvtxRangePop();
#endif
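The deleted overload used to re-stamp a reused task and push it straight into ready_tasks; its remaining duties now live in ElementGraph::process_offload_task() (see the next file), and the surviving batch-building overload simply hands the finished task to the graph via enqueue_offload_task(). Below is a hypothetical caller-side helper showing how a reused task would be resubmitted after this change; the function name and include paths are assumptions, and the two statements mirror what io.cc does further down.

#include <nba/element/element.hh>          // assumed include paths
#include <nba/framework/elementgraph.hh>
#include <nba/framework/offloadtask.hh>

namespace nba {

// Hypothetical helper, not part of this commit.
static void resubmit_offload_task(ElementGraph *graph, OffloadTask *task,
                                  Element *resume_elem, int resume_port)
{
    task->state = TASK_INITIALIZED;   // rewind so the graph drives it through offloading again
    graph->enqueue_offload_task(task, resume_elem, resume_port);
}

} // namespace nba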
156 changes: 68 additions & 88 deletions src/lib/elementgraph.cc
@@ -38,10 +38,6 @@ ElementGraph::ElementGraph(comp_thread_context *ctx)
this->ctx = ctx;
input_elem = nullptr;
assert(0 == rte_malloc_validate(ctx, NULL));
/* IMPORTANT: ready_tasks must be larger than task_pool. */
assert(ready_task_qlen <= ctx->num_taskpool_size);
for (int i = 0; i < NBA_MAX_COPROCESSOR_TYPES; i++)
ready_tasks[i].init(ready_task_qlen, ctx->loc.node_id);

struct rte_hash_parameters hparams;
char namebuf[RTE_HASH_NAMESIZE];
@@ -56,96 +52,78 @@ ElementGraph::ElementGraph(comp_thread_context *ctx)
assert(offl_actions != nullptr);
}

void ElementGraph::flush_offloaded_tasks()
void ElementGraph::send_offload_task_to_device(OffloadTask *task)
{
if (unlikely(ctx->io_ctx->loop_broken))
return;

for (int dev_idx = 0; dev_idx < NBA_MAX_COPROCESSOR_TYPES; dev_idx++) {

//uint64_t len_ready_tasks = ready_tasks[dev_idx].size();
//print_ratelimit("# ready tasks", len_ready_tasks, 10000);
// TODO: now it's possible to merge multiple tasks to increase batch size!

while (!ready_tasks[dev_idx].empty()) {
OffloadTask *task = ready_tasks[dev_idx].front();
assert(task != nullptr);

/* Start offloading! */
// TODO: create multiple cctx_list and access them via dev_idx for hetero-device systems.
ComputeContext *cctx = ctx->cctx_list.front();
assert(cctx != nullptr);
#ifdef USE_NVPROF
nvtxRangePush("offl_prepare");
#endif
task->cctx = cctx;

/* Prepare to offload. */
if (task->state < TASK_PREPARED) {
bool had_io_base = (task->io_base != INVALID_IO_BASE);
bool has_io_base = false;
if (task->io_base == INVALID_IO_BASE) {
task->io_base = cctx->alloc_io_base();
has_io_base = (task->io_base != INVALID_IO_BASE);
}
if (has_io_base) {
/* In the GPU side, datablocks argument has only used
* datablocks in the beginning of the array (not sparsely). */
int datablock_ids[NBA_MAX_DATABLOCKS];
size_t num_db_used = task->elem->get_used_datablocks(datablock_ids);
for (unsigned k = 0; k < num_db_used; k++) {
int dbid = datablock_ids[k];
task->datablocks.push_back(dbid);
task->dbid_h2d[dbid] = k;
}
/* Start offloading! */
// TODO: create multiple cctx_list and access them via dev_idx for hetero-device systems.
const int dev_idx = 0;
ComputeContext *cctx = ctx->cctx_list.front();
assert(cctx != nullptr);
#ifdef USE_NVPROF
nvtxRangePush("offl_prepare");
#endif
task->cctx = cctx;

/* Prepare to offload. */
if (task->state < TASK_PREPARED) {
//bool had_io_base = (task->io_base != INVALID_IO_BASE);
bool has_io_base = false;
if (task->io_base == INVALID_IO_BASE) {
task->io_base = cctx->alloc_io_base();
has_io_base = (task->io_base != INVALID_IO_BASE);
}
if (has_io_base) {
/* In the GPU side, datablocks argument has only used
* datablocks in the beginning of the array (not sparsely). */
int datablock_ids[NBA_MAX_DATABLOCKS];
size_t num_db_used = task->elem->get_used_datablocks(datablock_ids);
for (unsigned k = 0; k < num_db_used; k++) {
int dbid = datablock_ids[k];
task->datablocks.push_back(dbid);
task->dbid_h2d[dbid] = k;
}

size_t num_batches = task->batches.size();
/* As we reuse tasks between subsequent offloadables
* and only do so in linear groups of elements,
* it is okay to check only the first batch. */
if (task->batches[0]->datablock_states == nullptr) {
void *dbstates[num_batches];
int bidx = 0;
assert(0 == rte_mempool_get_bulk(ctx->dbstate_pool, (void **) &dbstates,
num_batches));
for (PacketBatch *batch : task->batches) {
batch->datablock_states = (struct datablock_tracker *) dbstates[bidx];
bidx ++;
}
}
task->offload_start = 0;

/* Calculate required buffer sizes, allocate them, and initialize them.
* The mother buffer is statically allocated on start-up and here we
* reserve regions inside it. */
task->prepare_read_buffer();
task->prepare_write_buffer();
task->state = TASK_PREPARED;
} /* endif(has_io_base) */
} /* endif(!task.prepared) */

if (task->state == TASK_PREPARED) {
/* Enqueue the offload task. */
int ret = rte_ring_enqueue(ctx->offload_input_queues[dev_idx], (void*) task);
if (ret == -EDQUOT) {
ready_tasks[dev_idx].pop_front();
break;
} else if (ret == -ENOBUFS) {
break;
} else {
ready_tasks[dev_idx].pop_front();
ev_async_send(ctx->coproc_ctx->loop, ctx->offload_devices->at(dev_idx)->input_watcher);
if (ctx->inspector) ctx->inspector->dev_sent_batch_count[0] += task->batches.size();
size_t num_batches = task->batches.size();
/* As we reuse tasks between subsequent offloadables
* and only do so in linear groups of elements,
* it is okay to check only the first batch. */
if (task->batches[0]->datablock_states == nullptr) {
void *dbstates[num_batches];
int bidx = 0;
assert(0 == rte_mempool_get_bulk(ctx->dbstate_pool, (void **) &dbstates,
num_batches));
for (PacketBatch *batch : task->batches) {
batch->datablock_states = (struct datablock_tracker *) dbstates[bidx];
bidx ++;
}
#ifdef USE_NVPROF
nvtxRangePop();
#endif
} else {
/* Delay the current offloading task and break. */
break;
}
} /* endwhile(ready_tasks) */
task->offload_start = 0;

/* Calculate required buffer sizes, allocate them, and initialize them.
* The mother buffer is statically allocated on start-up and here we
* reserve regions inside it. */
task->prepare_read_buffer();
task->prepare_write_buffer();
} /* endif(has_io_base) */
task->state = TASK_PREPARED;
} /* endif(!task.prepared) */

/* Send the offload task to device thread. */
assert(task->state == TASK_PREPARED);
int ret = rte_ring_enqueue(ctx->offload_input_queues[dev_idx], (void*) task);
if (ret == -ENOBUFS) {
enqueue_offload_task(task, task->tracker.element, task->tracker.input_port);
} else {
ev_async_send(ctx->coproc_ctx->loop, ctx->offload_devices->at(dev_idx)->input_watcher);
if (ctx->inspector) ctx->inspector->dev_sent_batch_count[0] += task->batches.size();
}
#ifdef USE_NVPROF
nvtxRangePop();
#endif
return;
}

void ElementGraph::free_batch(PacketBatch *batch, bool free_pkts)
@@ -593,8 +571,10 @@ void ElementGraph::process_batch(PacketBatch *batch)

void ElementGraph::process_offload_task(OffloadTask *otask)
{
OffloadableElement *offloadable = otask->elem;
assert(offloadable->offload(this, otask, otask->tracker.input_port) == 0);
uint64_t now = rte_rdtsc();
otask->task_id += 100000; // for debugging
otask->offload_start = now;
send_offload_task_to_device(otask);
}

void ElementGraph::flush_tasks()
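send_offload_task_to_device() now submits the prepared task to the device's rte_ring and, on -ENOBUFS, bounces it back into the unified queue through enqueue_offload_task() instead of leaving it parked in a per-device ready ring; the old -EDQUOT branch disappears along with that ring (DPDK rings of this period could return -EDQUOT to mean "enqueued, but the high-water mark was exceeded", which is why the old loop still popped the task before breaking). A hypothetical standalone helper with the same return-value handling, assuming the DPDK headers of that era; the requeue callback stands in for enqueue_offload_task():

#include <rte_ring.h>
#include <cerrno>

template <typename Task, typename RequeueFn>
static bool submit_or_requeue(struct rte_ring *device_ring, Task *task, RequeueFn requeue)
{
    int ret = rte_ring_enqueue(device_ring, (void *) task);
    if (ret == -ENOBUFS) {
        /* No room at all: nothing was enqueued, so retry later via the unified queue. */
        requeue(task);
        return false;
    }
    /* 0 (enqueued) or -EDQUOT on old DPDK (enqueued, but above the high-water mark):
     * either way the device thread now owns the task. */
    return true;
}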
3 changes: 2 additions & 1 deletion src/lib/io.cc
@@ -105,7 +105,6 @@ static void comp_prepare_cb(struct ev_loop *loop, struct ev_prepare *watcher, int revents)
io_thread_context *io_ctx = (io_thread_context *) ev_userdata(loop);
comp_thread_context *ctx = io_ctx->comp_ctx;
ctx->elem_graph->flush_tasks();
ctx->elem_graph->flush_offloaded_tasks();
ctx->elem_graph->scan_offloadable_elements(0);
}

@@ -154,6 +153,8 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_async *watcher, int revents)
((float) task_cycles / total_batch_size
- ((float) task->batches[b]->delay_time / task->batches[b]->count));
}
/* We need to rewind the state so that it gets executed by ElemGraph. */
task->state = TASK_INITIALIZED;
ctx->elem_graph->enqueue_offload_task(task,
ctx->elem_graph->get_first_next(task->elem),
0);
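The completion callback is the second producer for the unified queue: rewinding the state to TASK_INITIALIZED lets the graph treat the returned task like any freshly enqueued one when processing resumes at the next element. The round trip of one task after this commit, pieced together from the hunks above (a best-effort reading of the diff, not authoritative documentation):

// element.cc       OffloadableElement::offload(batch):  build the task, state = TASK_INITIALIZED,
//                                                       mother->enqueue_offload_task(otask, this, input_port)
// elementgraph.cc  ElementGraph::process_offload_task(): stamp task_id and offload_start, then
//                  ElementGraph::send_offload_task_to_device(): reserve buffers, state = TASK_PREPARED,
//                                                       rte_ring_enqueue() + ev_async_send() to the device thread
// io.cc            comp_offload_task_completion_cb():   gather timing stats, rewind to TASK_INITIALIZED,
//                                                       enqueue_offload_task(task, get_first_next(task->elem), 0)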
