Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Dec 3, 2020
1 parent 92d3144 commit d969ed0
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 52 deletions.
52 changes: 13 additions & 39 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
Expand All @@ -267,7 +267,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
Expand All @@ -277,7 +277,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
Expand All @@ -287,7 +287,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
Expand All @@ -297,7 +297,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -306,7 +306,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
Expand All @@ -330,46 +330,20 @@ class FairMQChannel
auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); };

template<typename... Args>
FairMQMessagePtr NewMessage(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}

FairMQMessagePtr NewMessage(Args&&... args) { return Transport()->CreateMessage(std::forward<Args>(args)...); }
template<typename... Args>
FairMQMessagePtr NewBuffer(Args&&... args)
{
return Transport()->CreateBuffer(std::forward<Args>(args)...);
}

fair::mq::Buffer NewBuffer(Args&&... args) { return Transport()->NewBuffer(std::forward<Args>(args)...); }
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

FairMQMessagePtr NewSimpleMessage(const T& data) { return Transport()->NewSimpleMessage(data); }
template<typename T>
FairMQMessagePtr NewSimpleBuffer(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

fair::mq::Buffer NewSimpleBuffer(const T& data) { return Transport()->NewSimpleBuffer(data); }
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}

FairMQMessagePtr NewStaticMessage(const T& data) { return Transport()->NewStaticMessage(data); }
template<typename T>
FairMQMessagePtr NewStaticBuffer(const T& data)
{
return Transport()->NewStaticMessage(data);
}
fair::mq::Buffer NewStaticBuffer(const T& data) { return Transport()->NewStaticBuffer(data); }

template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
{
return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) { return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...); }

static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
static constexpr const char* DefaultTransportName = "default";
Expand Down
35 changes: 28 additions & 7 deletions fairmq/FairMQDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,7 @@ class FairMQDevice
}

/// @brief Getter for default transport factory
auto Transport() const -> FairMQTransportFactory*
{
return fTransportFactory.get();
}
auto Transport() const -> FairMQTransportFactory* { return fTransportFactory.get(); }

// creates message with the default device transport
template<typename... Args>
Expand All @@ -168,14 +165,14 @@ class FairMQDevice

// creates buffer with the default device transport
template<typename... Args>
FairMQMessagePtr NewBuffer(Args&&... args)
fair::mq::Buffer NewBuffer(Args&&... args)
{
return Transport()->CreateBuffer(std::forward<Args>(args)...);
return Transport()->NewBuffer(std::forward<Args>(args)...);
}

// creates buffer with the transport of the specified channel
template<typename... Args>
FairMQMessagePtr NewBufferFor(const std::string& channel, int index, Args&&... args)
fair::mq::Buffer NewBufferFor(const std::string& channel, int index, Args&&... args)
{
return GetChannel(channel, index).NewBuffer(std::forward<Args>(args)...);
}
Expand All @@ -194,6 +191,18 @@ class FairMQDevice
return GetChannel(channel, index).NewStaticMessage(data);
}

template<typename T>
fair::mq::Buffer NewStaticBuffer(const T& data)
{
return Transport()->NewStaticBuffer(data);
}

template<typename T>
fair::mq::Buffer NewStaticBufferFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewStaticBuffer(data);
}

// creates a message with a copy of the provided data, with the default device transport
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
Expand All @@ -208,6 +217,18 @@ class FairMQDevice
return GetChannel(channel, index).NewSimpleMessage(data);
}

template<typename T>
fair::mq::Buffer NewSimpleBuffer(const T& data)
{
return Transport()->NewSimpleBuffer(data);
}

template<typename T>
fair::mq::Buffer NewSimpleBufferFor(const std::string& channel, int index, const T& data)
{
return GetChannel(channel, index).NewSimpleBuffer(data);
}

// creates unamanaged region with the default device transport
template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
Expand Down
18 changes: 12 additions & 6 deletions fairmq/FairMQTransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class FairMQTransportFactory
operator fair::mq::ChannelResource*() { return &fMemoryResource; }

template<typename... Args>
fair::mq::Buffer CreateBuffer(Args&&... args)
fair::mq::Buffer NewBuffer(Args&&... args)
{
return CreateMessage(std::forward<Args>(args)...);
}
Expand Down Expand Up @@ -133,9 +133,7 @@ class FairMQTransportFactory

static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const fair::mq::ProgOptions* config = nullptr) -> std::shared_ptr<FairMQTransportFactory>;

static void FairMQNoCleanup(void* /*data*/, void* /*obj*/)
{
}
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) {}

template<typename T>
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj)
Expand All @@ -144,7 +142,10 @@ class FairMQTransportFactory
}

template<typename... Args>
fair::mq::Buffer NewSimpleBuffer()
fair::mq::Buffer NewSimpleBuffer(Args&&... args)
{
return NewSimpleMessage(std::forward<Args>(args)...);
}

template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
Expand All @@ -164,11 +165,16 @@ class FairMQTransportFactory

FairMQMessagePtr NewSimpleMessage(const std::string& str)
{

std::string* msgStr = new std::string(str);
return CreateMessage(const_cast<char*>(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup<std::string>, msgStr);
}

template<typename... Args>
fair::mq::Buffer NewStaticBuffer(Args&&... args)
{
return NewStaticMessage(std::forward<Args>(args)...);
}

template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
Expand Down

0 comments on commit d969ed0

Please sign in to comment.