From 309ab57aed5a1bda9653a16bb3801d34be567f63 Mon Sep 17 00:00:00 2001
From: Joongi Kim
Date: Mon, 11 Jan 2016 01:54:56 +0900
Subject: [PATCH] refs #6: Fix memory corruption bug.

* IPsec works well with the task/datablock reuse optimization.
  - However, this refactored architecture has high libev/syscall overheads
    (> 10% of worker threads' CPU cycles).
  - TODO: optimize it...

* When reusing tasks,
  - we should keep the task itself (do not free it!) and
  - we should update task->elem as well as task->tracker.element
    (see the recycling sketch appended after the diff).

* There was a serious bug: reused GPU input buffers for outputs (for the
  cases where the read/write ROI is WHOLE_PACKET) were not actually included
  in the device-to-host copies, resulting in NO take-back of computation
  results (see the copy-back sketch appended after the diff).
  - Currently we allocate an output buffer explicitly, without such a
    buffer-reuse optimization.
  - TODO: reuse the input buffer and include its offsets/lengths in the
    coalescing of d2h copies.
---
 elements/ipsec/IPsecAES.hh                 |  4 +-
 elements/ipsec/IPsecAES_kernel.cu          |  4 +-
 elements/ipsec/IPsecDatablocks.cc          | 16 ++---
 include/nba/engines/cuda/computecontext.hh |  3 +
 include/nba/engines/cuda/mempool.hh        |  4 +-
 include/nba/framework/offloadtask.hh       |  4 +-
 src/engines/cuda/computecontext.cc         |  9 +++
 src/lib/coprocessor.cc                     |  3 +-
 src/lib/element.cc                         |  4 +-
 src/lib/elementgraph.cc                    |  3 +-
 src/lib/io.cc                              | 11 +--
 src/lib/offloadtask.cc                     | 83 ++++++++++++++++------
 12 files changed, 103 insertions(+), 45 deletions(-)

diff --git a/elements/ipsec/IPsecAES.hh b/elements/ipsec/IPsecAES.hh
index ed9b81a..87b48fa 100644
--- a/elements/ipsec/IPsecAES.hh
+++ b/elements/ipsec/IPsecAES.hh
@@ -49,8 +49,8 @@ public:
     size_t get_used_datablocks(int *datablock_ids)
     {
         datablock_ids[0] = dbid_enc_payloads;
-        datablock_ids[1] = dbid_iv;
-        datablock_ids[2] = dbid_flow_ids;
+        datablock_ids[1] = dbid_flow_ids;
+        datablock_ids[2] = dbid_iv;
         datablock_ids[3] = dbid_aes_block_info;
         return 4;
     }
diff --git a/elements/ipsec/IPsecAES_kernel.cu b/elements/ipsec/IPsecAES_kernel.cu
index a07c521..9be6f10 100644
--- a/elements/ipsec/IPsecAES_kernel.cu
+++ b/elements/ipsec/IPsecAES_kernel.cu
@@ -16,8 +16,8 @@

 /* The index is given by the order in get_used_datablocks(). */
 #define dbid_enc_payloads_d (0)
-#define dbid_iv_d (1)
-#define dbid_flow_ids_d (2)
+#define dbid_flow_ids_d (1)
+#define dbid_iv_d (2)
 #define dbid_aes_block_info_d (3)

 #ifndef __AES_CORE__ /*same constants are defined in ssl/aes/aes_core.h */
diff --git a/elements/ipsec/IPsecDatablocks.cc b/elements/ipsec/IPsecDatablocks.cc
index 788c67b..6fb8f34 100644
--- a/elements/ipsec/IPsecDatablocks.cc
+++ b/elements/ipsec/IPsecDatablocks.cc
@@ -4,8 +4,8 @@
 namespace nba {

 int dbid_enc_payloads;
-int dbid_iv;
 int dbid_flow_ids;
+int dbid_iv;
 int dbid_aes_block_info;

 static DataBlock* db_enc_payloads_ctor (void) {
@@ -14,18 +14,18 @@ static DataBlock* db_enc_payloads_ctor (void) {
     new (ptr) IPsecEncryptedPayloadDataBlock();
     return ptr;
 };
-static DataBlock* db_iv_ctor (void) {
-    DataBlock *ptr = (DataBlock *) rte_malloc("datablock", sizeof(IPsecIVDataBlock), CACHE_LINE_SIZE);
-    assert(ptr != nullptr);
-    new (ptr) IPsecIVDataBlock();
-    return ptr;
-};
 static DataBlock* db_flow_ids_ctor (void) {
     DataBlock *ptr = (DataBlock *) rte_malloc("datablock", sizeof(IPsecFlowIDsDataBlock), CACHE_LINE_SIZE);
     assert(ptr != nullptr);
     new (ptr) IPsecFlowIDsDataBlock();
     return ptr;
 };
+static DataBlock* db_iv_ctor (void) {
+    DataBlock *ptr = (DataBlock *) rte_malloc("datablock", sizeof(IPsecIVDataBlock), CACHE_LINE_SIZE);
+    assert(ptr != nullptr);
+    new (ptr) IPsecIVDataBlock();
+    return ptr;
+};
 static DataBlock* db_aes_block_info_ctor (void) {
     DataBlock *ptr = (DataBlock *) rte_malloc("datablock", sizeof(IPsecAESBlockInfoDataBlock), CACHE_LINE_SIZE);
     assert(ptr != nullptr);
@@ -34,8 +34,8 @@ static DataBlock* db_aes_block_info_ctor (void) {
 };

 declare_datablock("ipsec.enc_payloads", db_enc_payloads_ctor, dbid_enc_payloads);
-declare_datablock("ipsec.iv", db_iv_ctor, dbid_iv);
 declare_datablock("ipsec.flow_ids", db_flow_ids_ctor, dbid_flow_ids);
+declare_datablock("ipsec.iv", db_iv_ctor, dbid_iv);
 declare_datablock("ipsec.aes_block_info", db_aes_block_info_ctor, dbid_aes_block_info);

 }
diff --git a/include/nba/engines/cuda/computecontext.hh b/include/nba/engines/cuda/computecontext.hh
index 1206c42..22a2aa6 100644
--- a/include/nba/engines/cuda/computecontext.hh
+++ b/include/nba/engines/cuda/computecontext.hh
@@ -92,6 +92,9 @@ private:
     CPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES];
     CPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES];

+    void *dummy_host_buf;
+    memory_t dummy_dev_buf;
+
     size_t num_kernel_args;
     struct kernel_arg kernel_args[CUDA_MAX_KERNEL_ARGS];
diff --git a/include/nba/engines/cuda/mempool.hh b/include/nba/engines/cuda/mempool.hh
index 46f5801..a62406d 100644
--- a/include/nba/engines/cuda/mempool.hh
+++ b/include/nba/engines/cuda/mempool.hh
@@ -33,7 +33,7 @@ public:
         size_t offset;
         int ret = _alloc(size, &offset);
         if (ret == 0)
-            return (void *) ((uint8_t *) base + (uintptr_t) offset);
+            return (void *) ((uintptr_t) base + offset);
         return NULL;
     }

@@ -85,7 +85,7 @@ public:
         size_t offset;
         int ret = _alloc(size, &offset);
         if (ret == 0)
-            return (void *) ((uint8_t *) base + (uintptr_t) offset);
+            return (void *) ((uintptr_t) base + offset);
         return NULL;
     }

diff --git a/include/nba/framework/offloadtask.hh b/include/nba/framework/offloadtask.hh
index b9b0e9c..6a646b9 100644
--- a/include/nba/framework/offloadtask.hh
+++ b/include/nba/framework/offloadtask.hh
@@ -86,8 +86,8 @@ public:
     struct ev_async *completion_watcher __cache_aligned;
     struct rte_ring *completion_queue __cache_aligned;

-private:
     uint64_t task_id; // for deubgging
+private:
     friend class OffloadableElement;

     void *host_write_begin;
@@ -97,6 +97,8 @@ private:

     size_t input_alloc_size_begin;
     size_t output_alloc_size_begin;
+    size_t last_input_size;
+    size_t last_output_size;
 };

 }
diff --git a/src/engines/cuda/computecontext.cc b/src/engines/cuda/computecontext.cc
index 8e56401..38eb0e9 100644
--- a/src/engines/cuda/computecontext.cc
+++ b/src/engines/cuda/computecontext.cc
@@ -26,6 +26,13 @@ CUDAComputeContext::CUDAComputeContext(unsigned ctx_id, ComputeDevice *mother_de
         _cpu_mempool_in[i].init_with_flags(io_base_size, cudaHostAllocPortable);
         _cpu_mempool_out[i].init_with_flags(io_base_size, cudaHostAllocPortable);
     }
+    {
+        void *t;
+        cutilSafeCall(cudaMalloc((void **) &t, 64));
+        dummy_dev_buf.ptr = t;
+        cutilSafeCall(cudaHostAlloc((void **) &t, 64, cudaHostAllocPortable));
+        dummy_host_buf = t;
+    }
     cutilSafeCall(cudaHostAlloc((void **) &checkbits_h, MAX_BLOCKS, cudaHostAllocMapped));
     cutilSafeCall(cudaHostGetDevicePointer((void **) &checkbits_d, checkbits_h, 0));
     assert(checkbits_h != NULL);
@@ -111,12 +118,14 @@ void CUDAComputeContext::clear_io_buffers(io_base_t io_base)

 int CUDAComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
 {
+    //cutilSafeCall(cudaMemcpyAsync(dummy_dev_buf.ptr, dummy_host_buf, 64, cudaMemcpyHostToDevice, _stream));
     cutilSafeCall(cudaMemcpyAsync(dev_buf.ptr, host_buf, size, cudaMemcpyHostToDevice, _stream));
     return 0;
 }

 int CUDAComputeContext::enqueue_memread_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size)
 {
+    //cutilSafeCall(cudaMemcpyAsync(dummy_host_buf, dummy_dev_buf.ptr, 64, cudaMemcpyDeviceToHost, _stream));
     cutilSafeCall(cudaMemcpyAsync(host_buf, dev_buf.ptr, size, cudaMemcpyDeviceToHost, _stream));
     return 0;
 }
diff --git a/src/lib/coprocessor.cc b/src/lib/coprocessor.cc
index 01599bf..f10c189 100644
--- a/src/lib/coprocessor.cc
+++ b/src/lib/coprocessor.cc
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -56,7 +57,6 @@ static void coproc_task_input_cb(struct ev_loop *loop, struct ev_async *watcher,
      * and libev will call them first and then call earlier steps again. */
     ret = rte_ring_dequeue(ctx->task_input_queue, (void **) &task);
     if (task != nullptr) {
-        assert(task->cctx != nullptr);
         task->coproc_ctx = ctx;
         task->copy_h2d();
         task->execute();
@@ -87,7 +87,6 @@ static void coproc_task_d2h_cb(struct ev_loop *loop, struct ev_async *watcher, i
     if (ctx->d2h_pending_queue->size() > 0) {
         OffloadTask *task = ctx->d2h_pending_queue->front();
         ctx->d2h_pending_queue->pop_front();
-        assert(task != nullptr);
         if (task->poll_kernel_finished()) {
             //task->cctx->sync();
             task->copy_d2h();
diff --git a/src/lib/element.cc b/src/lib/element.cc
index 37b5c64..127b7b5 100644
--- a/src/lib/element.cc
+++ b/src/lib/element.cc
@@ -181,9 +181,9 @@ int OffloadableElement::offload(ElementGraph *mother, OffloadTask *otask, int in
     int dev_idx = 0;
     uint64_t now = rte_rdtsc();
     otask->state = TASK_INITIALIZING;
-    otask->task_id = task_id ++;
+    otask->task_id += 100000; // for debugging
     otask->offload_start = now;
-    otask->state = TASK_INITIALIZED;
+    otask->state = TASK_PREPARED;
     mother->ready_tasks[dev_idx].push_back(otask);
     /* This should always succeed. */
     return 0;
 }
diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc
index 223f6d6..2577f75 100644
--- a/src/lib/elementgraph.cc
+++ b/src/lib/elementgraph.cc
@@ -70,6 +70,7 @@ void ElementGraph::flush_offloaded_tasks()
         task->cctx = cctx;

         if (task->state < TASK_PREPARED) {
+            bool had_io_base = (task->io_base != INVALID_IO_BASE);
             bool has_io_base = false;
             if (task->io_base == INVALID_IO_BASE) {
                 task->io_base = cctx->alloc_io_base();
@@ -174,6 +175,7 @@ void ElementGraph::enqueue_batch(PacketBatch *batch, Element *start_elem, int in
 void ElementGraph::enqueue_offload_task(OffloadTask *otask, Element *start_elem, int input_port)
 {
     assert(start_elem != nullptr);
+    otask->elem = dynamic_cast<OffloadableElement *>(start_elem);
     otask->tracker.element = start_elem;
     otask->tracker.input_port = input_port;
     queue.push_back(Task::to_task(otask));
@@ -533,7 +535,6 @@ void ElementGraph::process_offload_task(OffloadTask *otask)
 {
     Element *current_elem = otask->tracker.element;
     OffloadableElement *offloadable = dynamic_cast<OffloadableElement *>(current_elem);
-    assert(offloadable != nullptr);
     assert(offloadable->offload(this, otask, otask->tracker.input_port) == 0);
 }
diff --git a/src/lib/io.cc b/src/lib/io.cc
index c81d170..3a7be78 100644
--- a/src/lib/io.cc
+++ b/src/lib/io.cc
@@ -151,6 +151,7 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_asyn
         ctx->elem_graph->enqueue_offload_task(task,
                 ctx->elem_graph->get_first_next(task->elem),
                 0);
+        /* This task is reused. We keep it intact. */
     } else {
         for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
             task->batches[b]->compute_time += (uint64_t)
@@ -158,12 +159,12 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_asyn
                     - ((float) task->batches[b]->delay_time / task->batches[b]->count));
             task->elem->enqueue_batch(task->batches[b]);
         }
-    }
-    /* Free the task object. */
-    task->cctx = nullptr;
-    task->~OffloadTask();
-    rte_mempool_put(ctx->task_pool, (void *) task);
+        /* Free the task object. */
+        task->cctx = nullptr;
+        task->~OffloadTask();
+        rte_mempool_put(ctx->task_pool, (void *) task);
+    }

     /* Free the resources used for this offload task. */
     cctx->currently_running_task = nullptr;
 }
diff --git a/src/lib/offloadtask.cc b/src/lib/offloadtask.cc
index ee90a8d..d7e6897 100644
--- a/src/lib/offloadtask.cc
+++ b/src/lib/offloadtask.cc
@@ -40,21 +40,53 @@ OffloadTask::OffloadTask()
     offload_start = 0;
     num_pkts = 0;
     num_bytes = 0;
+    // for debugging
+    last_input_size = 0;
+    last_output_size = 0;
 }

 OffloadTask::~OffloadTask()
 {
 }

+#if DEBUG
+#define _debug_print_inb(tag, batch, dbid) { \
+    void *begin_h; \
+    memory_t begin_d; \
+    cctx->get_input_current_pos(io_base, &begin_h, &begin_d); \
+    size_t len = cctx->get_input_size(io_base) - last_input_size; \
+    last_input_size = cctx->get_input_size(io_base); \
+    printf("task[%lu, %p:%u] alloc_input_buffer (" tag ") %p:%d -> start:%p, end:%p, len:%'lu(0x%lx)\n", \
+           task_id, cctx, (unsigned) io_base, batch, dbid, \
+           (void *)((uintptr_t)begin_h - len), begin_h, len, len); \
+}
+#define _debug_print_outb(tag, batch, dbid) { \
+    void *begin_h; \
+    memory_t begin_d; \
+    cctx->get_output_current_pos(io_base, &begin_h, &begin_d); \
+    size_t len = cctx->get_output_size(io_base) - last_output_size; \
+    last_output_size = cctx->get_output_size(io_base); \
+    printf("task[%lu, %p:%u] alloc_output_buffer (" tag ") %p:%d -> start:%p, end:%p, len:%'lu(0x%lx)\n", \
+           task_id, cctx, (unsigned) io_base, batch, dbid, \
+           (void *)((uintptr_t)begin_h - len), begin_h, len, len); \
+}
+#else
+#define _debug_print_inb(tag, batch, dbid)
+#define _debug_print_outb(tag, batch, dbid)
+#endif
+
 void OffloadTask::prepare_read_buffer()
 {
-    assert(io_base != INVALID_IO_BASE);
 #ifdef COALESCED_COPY
+    // write: host-to-device input
+    // read: device-to-host output
     cctx->get_input_current_pos(io_base, &host_write_begin, &dev_write_begin);
     cctx->get_output_current_pos(io_base, &host_read_begin, &dev_read_begin);
     input_alloc_size_begin = cctx->get_input_size(io_base);
     output_alloc_size_begin = cctx->get_output_size(io_base);
 #endif
+    _debug_print_inb("at-beginning", nullptr, 0);
+    _debug_print_outb("at-beginning", nullptr, 0);

     for (int dbid : datablocks) {
         if (elemgraph->check_preproc(elem, dbid)) {
@@ -68,6 +100,7 @@ void OffloadTask::prepare_read_buffer()
                                          (void **) &t->aligned_item_sizes_h,
                                          &t->aligned_item_sizes_d);
                 }
+                _debug_print_inb("prepare_read_buffer.WHOLE", nullptr, dbid);
             } else if (rri.type == READ_PARTIAL_PACKET) {
                 for (PacketBatch *batch : batches) {
                     struct datablock_tracker *t = &batch->datablock_states[dbid];
@@ -75,6 +108,7 @@ void OffloadTask::prepare_read_buffer()
                                          (void **) &t->aligned_item_sizes_h,
                                          &t->aligned_item_sizes_d);
                 }
+                _debug_print_inb("prepare_read_buffer.PARTIAL", nullptr, dbid);
             } else {
                 for (PacketBatch *batch : batches) {
                     struct datablock_tracker *t = &batch->datablock_states[dbid];
@@ -106,6 +140,7 @@ void OffloadTask::prepare_read_buffer()
                         db->preprocess(batch, t->host_in_ptr);
                     }
                 }
+                _debug_print_inb("prepare_read_buffer.preproc", nullptr, dbid);
             } /* endcase(rri.type) */
         } /* endif(check_preproc) */
     } /* endfor(dbid) */
@@ -129,17 +164,20 @@ void OffloadTask::prepare_write_buffer()
                     t->dev_out_ptr.ptr = nullptr;
                 }
             } else {
-                for (PacketBatch *batch : batches) {
-                    struct datablock_tracker *t = &batch->datablock_states[dbid];
-                    t->host_out_ptr = nullptr;
-                    t->dev_out_ptr.ptr = nullptr;
-                    if (rri.type == READ_WHOLE_PACKET && wri.type == WRITE_WHOLE_PACKET) {
-                        /* Reuse read_roi currently. Do NOT update size & count here! */
-                        t->out_size = t->in_size;
-                        t->out_count = t->in_count;
-                        t->host_out_ptr = t->host_in_ptr;
-                        t->dev_out_ptr = t->dev_in_ptr;
-                    } else {
+                //if (rri.type == READ_WHOLE_PACKET && wri.type == WRITE_WHOLE_PACKET) {
+                //    for (PacketBatch *batch : batches) {
+                //        struct datablock_tracker *t = &batch->datablock_states[dbid];
+                //        /* Reuse read_roi currently. Do NOT update size & count here! */
+                //        t->out_size = t->in_size;
+                //        t->out_count = t->in_count;
+                //        t->host_out_ptr = t->host_in_ptr;
+                //        t->dev_out_ptr = t->dev_in_ptr;
+                //    }
+                //} else {
+                for (PacketBatch *batch : batches) {
+                    struct datablock_tracker *t = &batch->datablock_states[dbid];
+                    t->host_out_ptr = nullptr;
+                    t->dev_out_ptr.ptr = nullptr;
                     tie(t->out_size, t->out_count) = db->calc_write_buffer_size(batch);
                     if (t->out_size > 0 && t->out_count > 0) {
                         cctx->alloc_output_buffer(io_base, t->out_size,
@@ -147,8 +185,9 @@ void OffloadTask::prepare_write_buffer()
                                                   &t->dev_out_ptr);
                     }
                 }
-            }
-        }
+                _debug_print_outb("prepare_write_buffer", nullptr, dbid);
+                //} /* endif(rri.type, wri.type) */
+            } /* endif(wri.type) */
         } /* endif(check_preproc) */
     } /* endfor(dbid) */
 }
@@ -161,6 +200,7 @@ bool OffloadTask::copy_h2d()
     /* Copy the datablock information for the first kernel argument. */
     size_t dbarray_size = ALIGN_CEIL(sizeof(struct datablock_kernel_arg) * datablocks.size(), CACHE_LINE_SIZE);
     cctx->alloc_input_buffer(io_base, dbarray_size, (void **) &dbarray_h, &dbarray_d);
+    _debug_print_inb("copy_h2d.dbarray", nullptr, 0);
     assert(dbarray_h != nullptr);

 #ifndef COALESCED_COPY
@@ -183,6 +223,7 @@ bool OffloadTask::copy_h2d()

         int b = 0;
         for (PacketBatch *batch : batches) {
+            assert(batch->datablock_states != nullptr);
             struct datablock_tracker *t = &batch->datablock_states[dbid];

             if (rri.type == READ_WHOLE_PACKET && t->in_count > 0) {
@@ -194,7 +235,7 @@ bool OffloadTask::copy_h2d()
                     item_info_h = t->aligned_item_sizes_h;
                     item_info_d = t->aligned_item_sizes_d;
                 }
-                item_info_size += ALIGN(sizeof(struct item_size_info), CACHE_LINE_SIZE);
+                item_info_size += ALIGN_CEIL(sizeof(struct item_size_info), CACHE_LINE_SIZE);
 #endif
                 dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr
                                                                    + (uintptr_t) offsetof(struct item_size_info, sizes));
@@ -269,9 +310,9 @@ bool OffloadTask::copy_h2d()
                     cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr, 0, total_size);
                     first_host_in_ptr = t->host_in_ptr;
-                    total_size = ALIGN(t->in_size, CACHE_LINE_SIZE);
+                    total_size = ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
                 } else {
-                    total_size += ALIGN(t->in_size, CACHE_LINE_SIZE);
+                    total_size += ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
                 }
             }
         } /* endif(check_preproc) */
@@ -315,9 +356,11 @@ void OffloadTask::execute()
     cctx->alloc_input_buffer(io_base, sizeof(uint16_t) * all_item_count,
                              (void **) &batch_ids_h, &batch_ids_d);
+    _debug_print_inb("execute.batch_ids", nullptr, 0);
     assert(batch_ids_h != nullptr);
     cctx->alloc_input_buffer(io_base, sizeof(uint16_t) * all_item_count,
                              (void **) &item_ids_h, &item_ids_d);
+    _debug_print_inb("execute.item_ids", nullptr, 0);
     assert(item_ids_h != nullptr);
     res.num_workitems = all_item_count;
     res.num_threads_per_workgroup = 256;
@@ -338,7 +381,7 @@ void OffloadTask::execute()
         size_t last_alloc_size = cctx->get_input_size(io_base);
         cctx->enqueue_memwrite_op(host_write_begin, dev_write_begin, 0, last_alloc_size - input_alloc_size_begin);
 #else
-        cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2);
+        cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN_CEIL(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2);
 #endif

         cctx->clear_checkbits();
@@ -417,9 +460,9 @@ bool OffloadTask::copy_d2h()
                 {
                     cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size);
                     first_host_out_ptr = t->host_out_ptr;
-                    total_size = ALIGN(t->out_size, CACHE_LINE_SIZE);
+                    total_size = ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
                 } else {
-                    total_size += ALIGN(t->out_size, CACHE_LINE_SIZE);
+                    total_size += ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE);
                 }
             }
         } /* endif(check_postproc) */
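
Recycling sketch (appended): the task-reuse rule from the commit message, in
minimal form. It condenses what comp_offload_task_completion_cb() in
src/lib/io.cc does after this patch; reuse_task() is a hypothetical predicate
standing in for the actual reuse condition, which the hunks above do not
show, and the surrounding types come from the NBA framework.

    /* Sketch: recycle the OffloadTask instead of freeing it. */
    static void on_offload_completion(comp_thread_context *ctx, OffloadTask *task)
    {
        if (reuse_task(task)) {  /* hypothetical predicate */
            /* Keep the object alive and re-point BOTH element references
             * before re-enqueueing; they must stay in sync, hence the new
             * task->elem assignment in enqueue_offload_task(). */
            Element *next = ctx->elem_graph->get_first_next(task->elem);
            task->elem = dynamic_cast<OffloadableElement *>(next);
            task->tracker.element = next;
            ctx->elem_graph->enqueue_offload_task(task, next, 0);
        } else {
            /* Destruction path: placement-destruct, then return the memory
             * to the DPDK mempool it was carved from. */
            task->cctx = nullptr;
            task->~OffloadTask();
            rte_mempool_put(ctx->task_pool, (void *) task);
        }
    }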
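
Copy-back sketch (appended): why the WHOLE_PACKET output reuse lost results.
Under COALESCED_COPY, the device-to-host take-back is a single contiguous
read of the per-task *output* pool (an assumption here: that the d2h path
mirrors the coalesced h2d path visible in execute() above). An output buffer
aliased to the input buffer lives in the *input* pool, outside that range,
so it was silently skipped.

    /* Sketch: coalesced device-to-host copy-back of the output pool only. */
    size_t last_alloc_size = cctx->get_output_size(io_base);
    cctx->enqueue_memread_op(host_read_begin, dev_read_begin,
                             0, last_alloc_size - output_alloc_size_begin);
    /* A tracker with t->host_out_ptr == t->host_in_ptr points into the
     * input pool, outside [host_read_begin, host_read_begin + size), so
     * its results never reached the host.  Hence the explicit
     * alloc_output_buffer() in prepare_write_buffer(); folding input-pool
     * offsets/lengths into the d2h coalescing remains the TODO in the
     * commit message. */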