Skip to content

Pipemode #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Table of Contents
#. `Getting Started <#getting-started>`__
#. `Building your Image <#building-your-image>`__
#. `Running the tests <#running-the-tests>`__
#. `The sagemaker_tensorflow_containers Python package <#python package>`__

Getting Started
---------------
Expand Down Expand Up @@ -245,6 +246,15 @@ SageMaker <https://aws.amazon.com/sagemaker/>`__, then use:
--instance-type ml.m4.xlarge \
--tag 1.0

The sagemaker_tensorflow_containers Python package
--------------------------------------------------

Building
~~~~~~~~
This is a standard Python package with a Python C extension module, built using CMake, stored in the "cpp" subdirectory.
This module has a series of tests which are run during a setuptools build of hte package. The tests may be explicitly run by invoking
the executables built in each test directory.

Contributing
------------

Expand Down
25 changes: 25 additions & 0 deletions cpp/pipemode/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
cmake_minimum_required(VERSION 3.10)
enable_language(CXX)
set(CMAKE_CXX_STANDARD 11)
set(CURL_LIBRARY "-lcurl")
find_package(CURL REQUIRED)
project(TFPipeModeDataset)

enable_testing()

add_subdirectory(PipeReader)
add_subdirectory(RecordIOReader)
add_subdirectory(test)
add_executable(go go.cpp)




add_executable(in in.cpp)
target_include_directories(in PUBLIC
PipeReader
RecordIOReader
)
target_link_libraries(in
libPipeReader
libRecordIOReader)
9 changes: 9 additions & 0 deletions cpp/pipemode/PipeReader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
cmake_minimum_required(VERSION 3.10)
set (CMAKE_CXX_STANDARD 11)

file(GLOB_RECURSE sources ./*.cpp ./*.hpp)
add_library(libPipeReader STATIC ${sources})

target_include_directories(libPipeReader PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
)
76 changes: 76 additions & 0 deletions cpp/pipemode/PipeReader/PipeReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "PipeReader.hpp"
#include <system_error>
#include <fcntl.h>
#include <iostream>

using namespace sagemaker::tensorflow;

PipeReader::PipeReader(const std::string & channel_directory, const std::string & channel_name) :
channel_directory_(channel_directory),
channel_name_(channel_name),
current_pipe_index_(0),
current_pipe_(-1) {
Open();
}

PipeReader::PipeReader(PipeReader&& other) :
channel_directory_(other.channel_directory_),
channel_name_(other.channel_name_),
current_pipe_index_(other.current_pipe_index_) {
Open();
close(other.current_pipe_);
other.current_pipe_ = -1;
}

PipeReader& PipeReader::operator=(PipeReader&& other) {
Close();
channel_directory_ = std::move(other.channel_directory_);
channel_name_ = std::move(other.channel_name_);
current_pipe_index_ = std::move(other.current_pipe_index_);
Open();
close(other.current_pipe_);
other.current_pipe_ = -1;
return *this;
}

PipeReader::~PipeReader() {
Close();
}

void PipeReader::Open() {
current_pipe_ = open(BuildCurrentPipeName().c_str(), O_RDONLY);
if(-1 == current_pipe_) {
throw std::system_error(errno, std::system_category());
}
}

void PipeReader::Close() {
if(-1 != current_pipe_) {
close(current_pipe_);
current_pipe_ = -1;
}
}

void PipeReader::Reset() {
Close();
current_pipe_index_ ++;
Open();
}

std::size_t PipeReader::Read(void* buffer, std::size_t size) {
ssize_t nbytes = read(current_pipe_, buffer, size);
if(nbytes < 0) {
throw std::system_error(errno, std::system_category());
}
return static_cast<size_t>(nbytes);
}

std::string PipeReader::BuildCurrentPipeName() const {
std::string pipe_name = channel_name_ + "_" + std::to_string(current_pipe_index_);
std::string channel_path = channel_directory_;
if (channel_path[channel_path.length() - 1] != '/') {
channel_path += '/';
}
channel_path += pipe_name;
return channel_path;
}
90 changes: 90 additions & 0 deletions cpp/pipemode/PipeReader/PipeReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#ifndef SAGEMAKER_PIPE_READER_H
#define SAGEMAKER_PIPE_READER_H

#include <cstdio>
#include <string>
#include <unistd.h>

namespace sagemaker {
namespace tensorflow {

class PipeReader {
public:
/**
Constructs a new PipeReader.

The PipeReader reads data from the SageMaker Channel channelName, as a named pipe in
channelDirectory. This PipeReader may be immediately Read after construction.

Objects of this class implement the SageMaker PipeMode channel protocol. The entire channel data
may be read sequentially by invoking Read. The reader can be reset to read from the beginning of the
channel by invoking Reset. This can occur when the Reader has reached the end of the channel data or before.
At anytime Close may be called to release the underlying filesystem resources. Reset can be invoked after Close,
to begin reginning from the channel again.

@param[in] channelDirectory The Posix filesystem directory where SageMaker channel named pipes
are created.
@param[in] channelName The name of the channel to read.
*/
PipeReader(const std::string & channel_directory, const std::string & channel_name);

PipeReader(const PipeReader& other) = delete;
PipeReader& operator=(const PipeReader&) = delete;
PipeReader(PipeReader&& other);
PipeReader& operator=(PipeReader&& other);

/**
Destructs a PipeReader. Invokes Close.
*/
~PipeReader();

/**
Reads up-to size bytes from the current pipe into buffer. Returns the number of bytes read. Will
return 0 if there is no data left to read or size was 0.

Raises system_error with errno set on read error.

@param[out] buffer The buffer to read data into.
@param[in] size The desired number of bytes to be read.
@return The number of bytes read.

*/
std::size_t Read(void* buffer, std::size_t size);

/**
Advances the reader to the next channel pipe.
*/
void Reset();

/**
Closes the current pipe. Subsequent calls to Read will raise TODO:Exception, unless a call
to Reset is first made.
*/
void Close();

/**
Returns the directory where channel pipes are read.
*/
std::string GetChannelDirectory() const {
return channel_directory_;
}

/**
Returns the SageMaker Pipe Mode channel name that is being read by this PipeReader.
*/
std::string GetChannelName() const {
return channel_name_;
}

private:
std::string BuildCurrentPipeName() const;
void Open();
std::uint32_t current_pipe_index_;
int current_pipe_;
std::string channel_directory_;
std::string channel_name_;
};
} // tensorflow
} // sagemaker

#endif
36 changes: 36 additions & 0 deletions cpp/pipemode/PipeReader/RecordReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <RecordReader.hpp>
#include <stdexcept>

using namespace sagemaker::tensorflow;

RecordReader::RecordReader(PipeReader& reader, const std::size_t read_size,
const std::size_t buffer_capacity) : read_size_(read_size), pipe_reader_(reader),
buffer_(buffer_capacity), buffer_capacity_(buffer_capacity) {
if (buffer_capacity < 4) {
throw std::invalid_argument("buffer_capacity must be at least 4");
}
if (buffer_capacity < read_size) {
throw std::invalid_argument("buffer_capacity must be larger than read_size");
}
if (0 == read_size) {
throw std::invalid_argument("read_size must be greater-than zero");
}
}

std::size_t RecordReader::FillBuffer(std::size_t desired_size) {
buffer_.resize(buffer_capacity_);
std::size_t fill_read_size = std::min(buffer_capacity_, desired_size);
std::size_t bytes_read = 0;
while(bytes_read < fill_read_size) {

std::size_t next_read_size = pipe_reader_.Read(buffer_.data() + bytes_read,
std::min(fill_read_size, desired_size - bytes_read));

if(! next_read_size) {
break;
}
bytes_read += next_read_size;
}
buffer_.resize(bytes_read);
return bytes_read;
}
64 changes: 64 additions & 0 deletions cpp/pipemode/PipeReader/RecordReader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#ifndef RECORD_READER_H
#define RECORD_READER_H

#include <vector>
#include <string>
#include <PipeReader.hpp>

namespace sagemaker {
namespace tensorflow {

/**
An abstract record reader that reads records from a PipeReader.
*/
class RecordReader {

public:

/**
Constructs a new RecordReader.

Records are read from the specified PipeReader reader. Each call to read on reader reads
read_size bytes. The RecordReader buffers up to buffer_capacity bytes from the underlying
PipeReader
*/
RecordReader(PipeReader& reader, const std::size_t read_size, const std::size_t buffer_capacity);

/**
Reads a record from the underlying PipeReader and stores the record data in the
specified string pointer. The specified string is resized to accomodate the record.

param [out] storage The string where the record is written to.
return true if a record could be read, false otherwise.
*/
virtual bool ReadRecord(std::string* storage) = 0;

protected:

/**
Reads up-to desired size bytes from this RecordReader's PipeReader. Returns the
number of bytes read.

param[in] desired_size The number of bytes to read.
return The number of bytes actually read.
*/
std::size_t FillBuffer(std::size_t desired_size);

/**
The current pipe reader of this RecordReader
*/
PipeReader& pipe_reader_;

/**
The buffer of characters read from pipe_reader_
*/
std::vector<char> buffer_;

private:
std::size_t read_size_;
std::size_t buffer_capacity_;
};
} // tensorflow
} // sagemaker

#endif
11 changes: 11 additions & 0 deletions cpp/pipemode/RecordIOReader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
cmake_minimum_required(VERSION 3.10)
set (CMAKE_CXX_STANDARD 17)

file(GLOB_RECURSE sources ./*.cpp ./*.hpp)
add_library(libRecordIOReader STATIC ${sources})

target_link_libraries(libRecordIOReader libPipeReader)

target_include_directories(libRecordIOReader PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
)
Loading