refs #6: Backport datablock reuse optimization and refactor I/O loops.
 * Merry Christmas!

 * Adds an "io_base" concept to pipeline multiple offload tasks in each
   worker thread and to allow reuse of datablocks across subsequent
   offloadable elements.

   - Unlike the historical initial implementation, we now reuse
     offload task objects WITHOUT re-aggregating batches between
     subsequent offloadable elements.

   - For this, elementgraph->tasks now holds both PacketBatch and
     OffloadTask pointers, distinguished by a bitmask type specifier on
     the void* pointers stored in the ring (see the illustrative sketch
     after this list).  Depending on the task type, ElementGraph chooses
     whether to run the normal pipeline loop or to feed offloadable
     elements so that they begin the offloading process immediately.

 * Preserves the generalization of batching schemes.
   (Yes, it was a huge manual merge job.)

 * TODO: GPU versions do not work yet (as expected at this stage). Let's debug!
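
A minimal sketch of the tagged-pointer idea described above. The bit position and helper names are illustrative assumptions, not NBA's actual encoding:

    #include <cassert>
    #include <cstdint>

    struct PacketBatch;   // opaque for this sketch
    struct OffloadTask;   // opaque for this sketch

    // Assumes both object types are at least 2-byte aligned, so bit 0 of the
    // pointer value is free to act as the type specifier inside the ring.
    constexpr uintptr_t TASK_TYPE_MASK = 0x1;

    inline void *encode_batch(PacketBatch *b)
    { return b; }                                     // tag bit stays 0
    inline void *encode_offload_task(OffloadTask *t)
    { return reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(t) | TASK_TYPE_MASK); }
    inline bool is_offload_task(void *entry)
    { return (reinterpret_cast<uintptr_t>(entry) & TASK_TYPE_MASK) != 0; }
    inline PacketBatch *decode_batch(void *entry)
    { assert(!is_offload_task(entry)); return static_cast<PacketBatch *>(entry); }
    inline OffloadTask *decode_offload_task(void *entry)
    { return reinterpret_cast<OffloadTask *>(reinterpret_cast<uintptr_t>(entry) & ~TASK_TYPE_MASK); }

After popping a void* from the task ring, ElementGraph can branch on is_offload_task(): run the normal pipeline loop on a decoded PacketBatch, or hand a decoded OffloadTask straight back to the offloadable element.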
achimnol committed Dec 24, 2015
1 parent 55cde55 commit cb43048
Showing 24 changed files with 1,087 additions and 821 deletions.
22 changes: 11 additions & 11 deletions include/nba/core/mempool.hh
@@ -16,7 +16,7 @@ namespace nba
class MemoryPool
{
public:
- MemoryPool() : max_size_(0), curpos_(0)
+ MemoryPool() : max_size(0), cur_pos(0)
{}

virtual ~MemoryPool() {}
@@ -25,33 +25,33 @@ public:

int _alloc(size_t size, size_t *start_offset)
{
- if (curpos_ + size > max_size_)
+ if (cur_pos + size > max_size)
return -ENOMEM;
/* IMPORTANT: We need to return the position before adding the new size. */
if (start_offset != nullptr)
- *start_offset = curpos_;
- curpos_ += size;
- curpos_ = ALIGN_CEIL(curpos_, CACHE_LINE_SIZE);
+ *start_offset = cur_pos;
+ cur_pos += size;
+ cur_pos = ALIGN_CEIL(cur_pos, CACHE_LINE_SIZE);
return 0;
}

// The device implementer's should provide his own alloc() method.

void reset()
{
- curpos_ = 0;
+ cur_pos = 0;
}

- size_t get_alloc_size()
+ size_t get_alloc_size() const
{
- return curpos_;
+ return cur_pos;
}

- virtual void *get_base_ptr() = 0;
+ virtual void *get_base_ptr() const = 0;

protected:
- size_t max_size_;
- size_t curpos_;
+ size_t max_size;
+ size_t cur_pos;
};

}
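
The renamed MemoryPool members above implement a simple bump allocator whose offsets are rounded up to cache-line boundaries; reset() rewinds it so the same backing region can be reused for the next offload task. A standalone sketch of that behavior (CACHE_LINE_SIZE = 64 and the helper names are assumptions; NBA defines its own constant and ALIGN_CEIL macro):

    #include <cassert>
    #include <cerrno>
    #include <cstddef>

    static constexpr size_t CACHE_LINE_SIZE = 64;   // assumed value for illustration

    // Equivalent of ALIGN_CEIL(x, a) for a power-of-two alignment a.
    static inline size_t align_ceil(size_t x, size_t a) { return (x + a - 1) & ~(a - 1); }

    struct BumpPool {
        size_t max_size = 4096;   // capacity would be set by a device-specific init()
        size_t cur_pos  = 0;

        int _alloc(size_t size, size_t *start_offset) {
            if (cur_pos + size > max_size)
                return -ENOMEM;
            if (start_offset != nullptr)
                *start_offset = cur_pos;     // offset is handed out BEFORE growing
            cur_pos += size;
            cur_pos = align_ceil(cur_pos, CACHE_LINE_SIZE);
            return 0;
        }
        void reset() { cur_pos = 0; }        // reuse the region for the next task
    };

    int main() {
        BumpPool pool;
        size_t off;
        pool._alloc(100, &off);  assert(off == 0);
        pool._alloc(100, &off);  assert(off == 128);   // 100 rounded up to two cache lines
        pool.reset();                                   // next offload task starts at offset 0
        return 0;
    }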
8 changes: 5 additions & 3 deletions include/nba/element/element.hh
@@ -224,16 +224,18 @@ public:
};
offload_compute_handlers.insert({{"dummy", ch},});
}
+ for (int i = 0; i < NBA_MAX_COPROCESSOR_TYPES; i++)
+ tasks[i] = nullptr;
finished_batches.init(MAX_FINBATCH_QLEN, -1, finished_batches_arrbuf);
}
virtual ~OffloadableElement() {}
int get_type() const { return ELEMTYPE_OFFLOADABLE | ELEMTYPE_SCHEDULABLE; }

- /** Begins offloading of the given batch. */
+ /** Enqueues the given batch for offloading and begins offloading when
+ * it has sufficient amount of work. */
int offload(ElementGraph *mother, PacketBatch *in_batch, int input_port);

+ /** Immediately begins offloading of the given (reused) offload-task. */
+ int offload(ElementGraph *mother, OffloadTask *reused_offl_task, int input_port);

/** Stores the batches that are returned from offloading. */
int enqueue_batch(PacketBatch *batch);

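
The two offload() overloads above separate "queue a fresh batch until there is enough work" from "start a reused task immediately." A rough, self-contained illustration of the first behavior only; the threshold policy and names are assumptions, while NBA's real logic lives in OffloadableElement and ElementGraph:

    #include <cstddef>
    #include <memory>
    #include <vector>

    struct PacketBatch { std::size_t count; };
    struct OffloadTask { std::vector<PacketBatch *> batches; };

    class OffloadQueue {
    public:
        explicit OffloadQueue(std::size_t threshold) : threshold_(threshold) {}

        // Returns a ready task once enough packets are queued, otherwise nullptr.
        std::unique_ptr<OffloadTask> enqueue(PacketBatch *b) {
            pending_.push_back(b);
            queued_pkts_ += b->count;
            if (queued_pkts_ < threshold_)
                return nullptr;                       // keep accumulating
            auto task = std::make_unique<OffloadTask>();
            task->batches = std::move(pending_);
            pending_.clear();
            queued_pkts_ = 0;
            return task;                              // caller begins offloading now
        }

    private:
        std::vector<PacketBatch *> pending_;
        std::size_t queued_pkts_ = 0;
        std::size_t threshold_;
    };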
7 changes: 3 additions & 4 deletions include/nba/element/packetbatch.hh
@@ -3,6 +3,7 @@
#include <nba/core/intrinsic.hh>
#include <nba/framework/config.hh>
#include <nba/framework/datablock.hh>
+ #include <nba/framework/task.hh>
#include <nba/element/annotation.hh>
#include <cstdint>
#include <cstring>
@@ -414,7 +415,7 @@ public:
first_idx(-1), last_idx(-1), slot_count(0),
#endif
datablock_states(nullptr), recv_timestamp(0),
- generation(0), batch_id(0), element(nullptr), input_port(0), has_results(false),
+ generation(0), batch_id(0),
#if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS
has_dropped(false),
#endif
@@ -474,9 +475,7 @@ public:
uint64_t recv_timestamp;
uint64_t generation;
uint64_t batch_id;
- Element* element;
- int input_port;
- bool has_results;
+ struct task_tracker tracker;
#if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS
bool has_dropped;
#endif
12 changes: 8 additions & 4 deletions include/nba/engines/cuda/compat.hh
@@ -16,10 +16,14 @@ struct datablock_kernel_arg {
void *buffer_bases_out[NBA_MAX_COPROC_PPDEPTH];
uint32_t item_count_in[NBA_MAX_COPROC_PPDEPTH];
uint32_t item_count_out[NBA_MAX_COPROC_PPDEPTH];
- uint16_t item_size_in;
- uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH];
- uint16_t item_size_out;
- uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH];
+ union {
+ uint16_t item_size_in;
+ uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH];
+ };
+ union {
+ uint16_t item_size_out;
+ uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH];
+ };
uint16_t *item_offsets_in[NBA_MAX_COPROC_PPDEPTH];
uint16_t *item_offsets_out[NBA_MAX_COPROC_PPDEPTH];
};
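
The new unions above let one kernel-argument slot carry either a single fixed item size or per-item size arrays, depending on the datablock. A hedged sketch of how a reader might pick the right member; the fixed_size flag and the struct/constant names are assumptions for illustration, since NBA derives this from the datablock definition:

    #include <cstdint>

    #define NBA_MAX_COPROC_PPDEPTH 64   // assumed for the sketch; the real value is in NBA's config

    struct datablock_kernel_arg_sketch {
        union {
            uint16_t item_size_in;                            // all items share one size
            uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH];  // per-item sizes, one array per task slot
        };
    };

    inline uint32_t input_item_size(const datablock_kernel_arg_sketch &arg,
                                    bool fixed_size, int task_slot, int item_idx)
    {
        return fixed_size ? arg.item_size_in
                          : arg.item_sizes_in[task_slot][item_idx];
    }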
29 changes: 18 additions & 11 deletions include/nba/engines/cuda/computecontext.hh
@@ -3,9 +3,11 @@

#include <deque>

+ #include <nba/core/queue.hh>
+ #include <nba/core/mempool.hh>
#include <nba/framework/config.hh>
#include <nba/framework/computedevice.hh>
#include <nba/framework/computecontext.hh>
- #include <nba/core/mempool.hh>
#include <cuda.h>
#include <nba/engines/cuda/mempool.hh>
#include <nba/engines/cuda/utils.hh>
@@ -25,12 +27,14 @@ private:
public:
virtual ~CUDAComputeContext();

- int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem);
- int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem);
- void clear_io_buffers();
- void *get_host_input_buffer_base();
- memory_t get_device_input_buffer_base();
- size_t get_total_input_buffer_size();
+ io_base_t alloc_io_base();
+ int alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem);
+ int alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem);
+ void get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const;
+ void get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const;
+ size_t get_input_size(io_base_t io_base) const;
+ size_t get_output_size(io_base_t io_base) const;
+ void clear_io_buffers(io_base_t io_base);

void clear_kernel_args();
void push_kernel_arg(struct kernel_arg &arg);
@@ -83,13 +87,16 @@ private:
uint8_t *checkbits_d;
uint8_t *checkbits_h;
cudaStream_t _stream;
- CUDAMemoryPool _cuda_mempool_in;
- CUDAMemoryPool _cuda_mempool_out;
- CPUMemoryPool _cpu_mempool_in;
- CPUMemoryPool _cpu_mempool_out;
+ CUDAMemoryPool _cuda_mempool_in[NBA_MAX_IO_BASES];
+ CUDAMemoryPool _cuda_mempool_out[NBA_MAX_IO_BASES];
+ CPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES];
+ CPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES];

size_t num_kernel_args;
struct kernel_arg kernel_args[CUDA_MAX_KERNEL_ARGS];

+ FixedRing<unsigned, 0> io_base_ring;
+ unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
};

}
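
The per-io_base mempool arrays plus the FixedRing of free indices suggest this usage pattern: each offload task takes an io_base slot, bump-allocates its I/O buffers within that slot, and releases the slot when finished, so several tasks can be in flight per worker thread. A simplified sketch under those assumptions, with standard containers standing in for FixedRing and the real mempools:

    #include <cstddef>
    #include <deque>

    constexpr unsigned NBA_MAX_IO_BASES = 16;   // assumed; NBA's config defines the real value
    using io_base_t = unsigned;

    struct BumpPool {                           // stand-in for CUDAMemoryPool/CPUMemoryPool
        std::size_t cur_pos = 0;
        std::size_t alloc(std::size_t size) { std::size_t off = cur_pos; cur_pos += size; return off; }
        void reset() { cur_pos = 0; }
    };

    class IoBaseContext {
    public:
        IoBaseContext() {
            for (io_base_t i = 0; i < NBA_MAX_IO_BASES; i++)
                free_bases_.push_back(i);       // plays the role of io_base_ring
        }
        io_base_t alloc_io_base() {             // one slot per in-flight offload task
            io_base_t b = free_bases_.front();  // assumes a slot is free; the caller bounds in-flight tasks
            free_bases_.pop_front();
            return b;
        }
        std::size_t alloc_input_buffer(io_base_t b, std::size_t size) {
            return in_pool_[b].alloc(size);     // per-slot bump allocation
        }
        void clear_io_buffers(io_base_t b) {    // task done: rewind and recycle the slot
            in_pool_[b].reset();
            free_bases_.push_back(b);
        }
    private:
        std::deque<io_base_t> free_bases_;
        BumpPool in_pool_[NBA_MAX_IO_BASES];
    };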
48 changes: 28 additions & 20 deletions include/nba/engines/cuda/mempool.hh
@@ -12,7 +12,7 @@ namespace nba {
class CUDAMemoryPool : public MemoryPool
{
public:
- CUDAMemoryPool() : MemoryPool(), base_(NULL)
+ CUDAMemoryPool() : MemoryPool(), base(NULL)
{
}

@@ -23,8 +23,8 @@ public:

virtual bool init(size_t max_size)
{
- max_size_ = max_size;
- cutilSafeCall(cudaMalloc((void **) &base_, max_size));
+ max_size = max_size;
+ cutilSafeCall(cudaMalloc((void **) &base, max_size));
return true;
}

@@ -33,29 +33,29 @@ public:
size_t offset;
int ret = _alloc(size, &offset);
if (ret == 0)
- return (void *) ((uint8_t *) base_ + (uintptr_t) offset);
+ return (void *) ((uint8_t *) base + (uintptr_t) offset);
return NULL;
}

void destroy()
{
- if (base_ != NULL)
- cudaFree(base_);
+ if (base != NULL)
+ cudaFree(base);
}

- void *get_base_ptr()
+ void *get_base_ptr() const
{
- return base_;
+ return base;
}

private:
- void *base_;
+ void *base;
};

class CPUMemoryPool : public MemoryPool
{
public:
- CPUMemoryPool(int cuda_flags) : MemoryPool(), base_(NULL), flags_(cuda_flags)
+ CPUMemoryPool(int cuda_flags = 0) : MemoryPool(), base(NULL), flags(cuda_flags)
{
}

@@ -66,9 +66,17 @@ public:

virtual bool init(unsigned long size)
{
- max_size_ = size;
- cutilSafeCall(cudaHostAlloc((void **) &base_, size,
- flags_));
+ max_size = size;
+ cutilSafeCall(cudaHostAlloc((void **) &base, size,
+ this->flags));
return true;
}

+ bool init_with_flags(unsigned long size, int flags)
+ {
+ max_size = size;
+ cutilSafeCall(cudaHostAlloc((void **) &base, size,
+ flags));
+ return true;
+ }

@@ -77,24 +85,24 @@ public:
size_t offset;
int ret = _alloc(size, &offset);
if (ret == 0)
- return (void *) ((uint8_t *) base_ + (uintptr_t) offset);
+ return (void *) ((uint8_t *) base + (uintptr_t) offset);
return NULL;
}

void destroy()
{
- if (base_ != NULL)
- cudaFreeHost(base_);
+ if (base != NULL)
+ cudaFreeHost(base);
}

- void *get_base_ptr()
+ void *get_base_ptr() const
{
- return base_;
+ return base;
}

protected:
- void *base_;
- int flags_;
+ void *base;
+ int flags;
};

}
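
CPUMemoryPool backs its host side with cudaHostAlloc (pinned memory), and the new init_with_flags() lets the caller pick the pinning mode per pool. A standalone example of those flags using plain CUDA runtime calls (cutilSafeCall replaced by an explicit check); the particular flag choice is a suggestion, not necessarily what NBA uses:

    #include <cuda_runtime.h>
    #include <cstdio>

    int main() {
        void *in_buf = nullptr, *out_buf = nullptr;
        size_t size = 1 << 20;

        // Write-combined memory is fast for host->device (input) buffers but slow
        // to read back on the host; plain portable pinned memory suits outputs.
        cudaError_t err = cudaHostAlloc(&in_buf, size,
                                        cudaHostAllocPortable | cudaHostAllocWriteCombined);
        if (err != cudaSuccess) { std::fprintf(stderr, "%s\n", cudaGetErrorString(err)); return 1; }

        err = cudaHostAlloc(&out_buf, size, cudaHostAllocPortable);
        if (err != cudaSuccess) { std::fprintf(stderr, "%s\n", cudaGetErrorString(err)); return 1; }

        cudaFreeHost(in_buf);
        cudaFreeHost(out_buf);
        return 0;
    }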
29 changes: 18 additions & 11 deletions include/nba/engines/dummy/computecontext.hh
@@ -3,9 +3,11 @@

#include <deque>

+ #include <nba/core/queue.hh>
+ #include <nba/core/mempool.hh>
#include <nba/framework/config.hh>
#include <nba/framework/computedevice.hh>
#include <nba/framework/computecontext.hh>
- #include <nba/core/mempool.hh>
#include <nba/engines/dummy/mempool.hh>

namespace nba
@@ -21,12 +23,14 @@ private:
public:
virtual ~DummyComputeContext();

- int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem);
- int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem);
- void clear_io_buffers();
- void *get_host_input_buffer_base();
- memory_t get_device_input_buffer_base();
- size_t get_total_input_buffer_size();
+ io_base_t alloc_io_base();
+ int alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem);
+ int alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem);
+ void get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const;
+ void get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const;
+ size_t get_input_size(io_base_t io_base) const;
+ size_t get_output_size(io_base_t io_base) const;
+ void clear_io_buffers(io_base_t io_base);

void clear_kernel_args() { }
void push_kernel_arg(struct kernel_arg &arg) { }
@@ -62,10 +66,13 @@ public:
}

private:
- DummyCPUMemoryPool _dev_mempool_in;
- DummyCPUMemoryPool _dev_mempool_out;
- DummyCPUMemoryPool _cpu_mempool_in;
- DummyCPUMemoryPool _cpu_mempool_out;
+ DummyCPUMemoryPool _dev_mempool_in[NBA_MAX_IO_BASES];
+ DummyCPUMemoryPool _dev_mempool_out[NBA_MAX_IO_BASES];
+ DummyCPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES];
+ DummyCPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES];

+ FixedRing<unsigned, 0> io_base_ring;
+ unsigned io_base_ring_buf[NBA_MAX_IO_BASES];
};

}
20 changes: 10 additions & 10 deletions include/nba/engines/dummy/mempool.hh
@@ -10,7 +10,7 @@ namespace nba {
class DummyCPUMemoryPool : public MemoryPool
{
public:
- DummyCPUMemoryPool() : MemoryPool(), base_(NULL)
+ DummyCPUMemoryPool() : MemoryPool(), base(NULL)
{
}

@@ -21,8 +21,8 @@ public:

virtual bool init(unsigned long size)
{
- max_size_ = size;
- base_ = malloc(size);
+ max_size = size;
+ base = malloc(size);
return true;
}

@@ -31,25 +31,25 @@ public:
size_t offset;
int ret = _alloc(size, &offset);
if (ret == 0)
- return (void *) ((uint8_t *) base_ + offset);
+ return (void *) ((uint8_t *) base + offset);
return NULL;
}

void destroy()
{
- if (base_ != NULL) {
- free(base_);
- base_ = NULL;
+ if (base != NULL) {
+ free(base);
+ base = NULL;
}
}

- void *get_base_ptr()
+ void *get_base_ptr() const
{
- return base_;
+ return base;
}

protected:
- void *base_;
+ void *base;
};

}