Skip to content

Commit

Permalink
refs #6: Implement offload-task reuse
Browse files Browse the repository at this point in the history
 * Minor optimization using unlikely from perf monitoring results.
   (about ~3%)

 * IPv4/IPv6 works well as before, but IPsec still hangs.

 * Removes dead code.
  • Loading branch information
achimnol committed Dec 28, 2015
1 parent daec371 commit f63dd07
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 34 deletions.
6 changes: 1 addition & 5 deletions include/nba/framework/elementgraph.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ public:
bool check_postproc(OffloadableElement *oel, int dbid);
bool check_postproc_all(OffloadableElement *oel);

/**
* Check if the given datablock (represented as a global ID) is reused
* after the given offloaded element in the same (linear?) path.
*/
bool check_datablock_reuse(Element *offloaded_elem, int datablock_id);
bool check_next_offloadable(Element *offloaded_elem);
Element *get_first_next(Element *elem);

/**
* Add a new element instance to the graph.
Expand Down
25 changes: 8 additions & 17 deletions src/lib/elementgraph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ ElementGraph::ElementGraph(comp_thread_context *ctx)
: elements(128, ctx->loc.node_id), sched_elements(16, ctx->loc.node_id),
queue(2048, ctx->loc.node_id)
{
const size_t ready_task_qlen = 256;
this->ctx = ctx;
input_elem = nullptr;
assert(0 == rte_malloc_validate(ctx, NULL));
/* IMPORTANT: the task pool must be at least as large as the ready_tasks queues. */
assert(ready_task_qlen <= ctx->num_taskpool_size);
for (int i = 0; i < NBA_MAX_COPROCESSOR_TYPES; i++)
ready_tasks[i].init(256, ctx->loc.node_id);
ready_tasks[i].init(ready_task_qlen, ctx->loc.node_id);
}

void ElementGraph::flush_offloaded_tasks()
Expand Down Expand Up @@ -280,7 +282,7 @@ void ElementGraph::process_batch(PacketBatch *batch)
batch->clean_drops(ctx->io_ctx->drop_queue);
#endif
}
if (current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT) {
if (unlikely(current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT)) {
/* We are at the end leaf of the pipeline.
* Indicate freeing of the original batch. */
if (ctx->inspector) {
Expand Down Expand Up @@ -606,25 +608,14 @@ bool ElementGraph::check_postproc_all(OffloadableElement *oel)
#endif
}

bool ElementGraph::check_datablock_reuse(Element *offloaded_elem, int datablock_id)
bool ElementGraph::check_next_offloadable(Element *offloaded_elem)
{
//bool is_offloadable = ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
//int used_dbids[NBA_MAX_DATABLOCKS];
//if (is_offloadable) {
// OffloadableElement *oelem = dynamic_cast<OffloadableElement*> (offloaded_elem);
// assert(oelem != nullptr);
// size_t n = oelem->get_used_datablocks(used_dbids);
// for (unsigned i = 0; i < n; i++)
// if (used_dbids[i] == datablock_id)
// return true;
//}
return false;
return ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
}

bool ElementGraph::check_next_offloadable(Element *offloaded_elem)
Element *ElementGraph::get_first_next(Element *elem)
{
// FIXME: generalize for branched offloaded_elem
return ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
return elem->next_elems[0];
}

int ElementGraph::add_element(Element *new_elem)
Expand Down
20 changes: 16 additions & 4 deletions src/lib/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,22 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_asyn
uint64_t total_batch_size = 0;
for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++)
total_batch_size += task->batches[b]->count;
for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
task->batches[b]->compute_time += (uint64_t) ((float) task_cycles / total_batch_size - ((float) task->batches[b]->delay_time / task->batches[b]->count));
// TODO: if next elem is offloadable, then use enqueue_offloadtask
task->elem->enqueue_batch(task->batches[b]);
if (ctx->elem_graph->check_next_offloadable(task->elem)) {
for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
task->batches[b]->compute_time += (uint64_t)
((float) task_cycles / total_batch_size
- ((float) task->batches[b]->delay_time / task->batches[b]->count));
}
ctx->elem_graph->enqueue_offload_task(task,
ctx->elem_graph->get_first_next(task->elem),
0);
} else {
for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
task->batches[b]->compute_time += (uint64_t)
((float) task_cycles / total_batch_size
- ((float) task->batches[b]->delay_time / task->batches[b]->count));
task->elem->enqueue_batch(task->batches[b]);
}
}

/* Free the task object. */
Expand Down
16 changes: 8 additions & 8 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,14 @@ int main(int argc, char **argv)
computation_threads[i].comp_ctx = ctx;
ctx->thread_init_barrier = comp_init_barrier;

ctx->num_combatch_size = system_params["COMP_BATCH_SIZE"];
ctx->num_coproc_ppdepth = system_params["COPROC_PPDEPTH"];
ctx->num_batchpool_size = system_params["BATCHPOOL_SIZE"];
ctx->num_taskpool_size = system_params["TASKPOOL_SIZE"];
ctx->task_completion_queue_size = system_params["COPROC_COMPLETIONQ_LENGTH"];
ctx->num_tx_ports = num_ports;
ctx->num_nodes = num_nodes;

ctx->io_ctx = NULL;
ctx->coproc_ctx = NULL;
ctx->ready_flag = &ready_flag;
Expand All @@ -712,14 +720,6 @@ int main(int argc, char **argv)
new (ctx->elem_graph) ElementGraph(ctx);
ctx->inspector = NULL;

ctx->num_combatch_size = system_params["COMP_BATCH_SIZE"];
ctx->num_coproc_ppdepth = system_params["COPROC_PPDEPTH"];
ctx->num_batchpool_size = system_params["BATCHPOOL_SIZE"];
ctx->num_taskpool_size = system_params["TASKPOOL_SIZE"];
ctx->task_completion_queue_size = system_params["COPROC_COMPLETIONQ_LENGTH"];
ctx->num_tx_ports = num_ports;
ctx->num_nodes = num_nodes;

// TODO: extend to multiple devices
ctx->named_offload_devices = (unordered_map<string, ComputeDevice *> *)
rte_malloc_socket(NULL, sizeof(unordered_map<string, ComputeDevice *>), CACHE_LINE_SIZE, node_id);
Expand Down

0 comments on commit f63dd07

Please sign in to comment.