diff --git a/include/nba/framework/elementgraph.hh b/include/nba/framework/elementgraph.hh
index 3bdae05..5642b66 100644
--- a/include/nba/framework/elementgraph.hh
+++ b/include/nba/framework/elementgraph.hh
@@ -55,12 +55,8 @@ public:
     bool check_postproc(OffloadableElement *oel, int dbid);
     bool check_postproc_all(OffloadableElement *oel);
 
-    /**
-     * Check if the given datablock (represented as a global ID) is reused
-     * after the given offloaded element in a same (linear?) path.
-     */
-    bool check_datablock_reuse(Element *offloaded_elem, int datablock_id);
     bool check_next_offloadable(Element *offloaded_elem);
+    Element *get_first_next(Element *elem);
 
     /**
      * Add a new element instance to the graph.
diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc
index e973063..223f6d6 100644
--- a/src/lib/elementgraph.cc
+++ b/src/lib/elementgraph.cc
@@ -29,12 +29,14 @@ ElementGraph::ElementGraph(comp_thread_context *ctx)
     : elements(128, ctx->loc.node_id), sched_elements(16, ctx->loc.node_id),
       queue(2048, ctx->loc.node_id)
 {
+    const size_t ready_task_qlen = 256;
     this->ctx = ctx;
     input_elem = nullptr;
     assert(0 == rte_malloc_validate(ctx, NULL));
     /* IMPORTANT: ready_tasks must be larger than task_pool. */
+    assert(ready_task_qlen <= ctx->num_taskpool_size);
     for (int i = 0; i < NBA_MAX_COPROCESSOR_TYPES; i++)
-        ready_tasks[i].init(256, ctx->loc.node_id);
+        ready_tasks[i].init(ready_task_qlen, ctx->loc.node_id);
 }
 
 void ElementGraph::flush_offloaded_tasks()
@@ -280,7 +282,7 @@ void ElementGraph::process_batch(PacketBatch *batch)
             batch->clean_drops(ctx->io_ctx->drop_queue);
 #endif
         }
-        if (current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT) {
+        if (unlikely(current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT)) {
             /* We are at the end leaf of the pipeline.
              * Inidicate free of the original batch. */
             if (ctx->inspector) {
@@ -606,25 +608,14 @@ bool ElementGraph::check_postproc_all(OffloadableElement *oel)
 #endif
 }
 
-bool ElementGraph::check_datablock_reuse(Element *offloaded_elem, int datablock_id)
+bool ElementGraph::check_next_offloadable(Element *offloaded_elem)
 {
-    //bool is_offloadable = ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
-    //int used_dbids[NBA_MAX_DATABLOCKS];
-    //if (is_offloadable) {
-    //    OffloadableElement *oelem = dynamic_cast<OffloadableElement *> (offloaded_elem);
-    //    assert(oelem != nullptr);
-    //    size_t n = oelem->get_used_datablocks(used_dbids);
-    //    for (unsigned i = 0; i < n; i++)
-    //        if (used_dbids[i] == datablock_id)
-    //            return true;
-    //}
-    return false;
+    return ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
 }
 
-bool ElementGraph::check_next_offloadable(Element *offloaded_elem)
+Element *ElementGraph::get_first_next(Element *elem)
 {
-    // FIXME: generalize for branched offloaded_elem
-    return ((offloaded_elem->next_elems[0]->get_type() & ELEMTYPE_OFFLOADABLE) != 0);
+    return elem->next_elems[0];
 }
 
 int ElementGraph::add_element(Element *new_elem)
diff --git a/src/lib/io.cc b/src/lib/io.cc
index d70d0b3..c81d170 100644
--- a/src/lib/io.cc
+++ b/src/lib/io.cc
@@ -142,10 +142,22 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_async *watcher, int revents)
     uint64_t total_batch_size = 0;
     for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++)
         total_batch_size += task->batches[b]->count;
-    for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
-        task->batches[b]->compute_time += (uint64_t) ((float) task_cycles / total_batch_size - ((float) task->batches[b]->delay_time / task->batches[b]->count));
-        // TODO: if next elem is offloadable, then use enqueue_offloadtask
-        task->elem->enqueue_batch(task->batches[b]);
-    }
+    if (ctx->elem_graph->check_next_offloadable(task->elem)) {
+        for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
+            task->batches[b]->compute_time += (uint64_t)
+                ((float) task_cycles / total_batch_size
+                 - ((float) task->batches[b]->delay_time / task->batches[b]->count));
+        }
+        ctx->elem_graph->enqueue_offload_task(task,
+                ctx->elem_graph->get_first_next(task->elem),
+                0);
+    } else {
+        for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
+            task->batches[b]->compute_time += (uint64_t)
+                ((float) task_cycles / total_batch_size
+                 - ((float) task->batches[b]->delay_time / task->batches[b]->count));
+            task->elem->enqueue_batch(task->batches[b]);
+        }
+    }
 
     /* Free the task object. */
diff --git a/src/main.cc b/src/main.cc
index 217c589..6f898f0 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -702,6 +702,14 @@ int main(int argc, char **argv)
         computation_threads[i].comp_ctx = ctx;
         ctx->thread_init_barrier = comp_init_barrier;
 
+        ctx->num_combatch_size = system_params["COMP_BATCH_SIZE"];
+        ctx->num_coproc_ppdepth = system_params["COPROC_PPDEPTH"];
+        ctx->num_batchpool_size = system_params["BATCHPOOL_SIZE"];
+        ctx->num_taskpool_size = system_params["TASKPOOL_SIZE"];
+        ctx->task_completion_queue_size = system_params["COPROC_COMPLETIONQ_LENGTH"];
+        ctx->num_tx_ports = num_ports;
+        ctx->num_nodes = num_nodes;
+
         ctx->io_ctx = NULL;
         ctx->coproc_ctx = NULL;
         ctx->ready_flag = &ready_flag;
@@ -712,14 +720,6 @@ int main(int argc, char **argv)
         new (ctx->elem_graph) ElementGraph(ctx);
         ctx->inspector = NULL;
 
-        ctx->num_combatch_size = system_params["COMP_BATCH_SIZE"];
-        ctx->num_coproc_ppdepth = system_params["COPROC_PPDEPTH"];
-        ctx->num_batchpool_size = system_params["BATCHPOOL_SIZE"];
-        ctx->num_taskpool_size = system_params["TASKPOOL_SIZE"];
-        ctx->task_completion_queue_size = system_params["COPROC_COMPLETIONQ_LENGTH"];
-        ctx->num_tx_ports = num_ports;
-        ctx->num_nodes = num_nodes;
-
         // TODO: extend to multiple devices
         ctx->named_offload_devices = (unordered_map<string, ComputeDevice *> *) rte_malloc_socket(NULL,
                 sizeof(unordered_map<string, ComputeDevice *>), CACHE_LINE_SIZE, node_id);