Skip to content

Commit

Permalink
Added a consumer base class. Simple file consumer does the dequeue op…
Browse files Browse the repository at this point in the history
…eration and passes the data to the Formatter
  • Loading branch information
Giuseppe Corbelli committed Oct 12, 2017
1 parent 4224550 commit 474b21d
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions include/rtlog/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,82 @@

namespace rtlog {

/** Base consumer class */
template<typename LOGGER_TRAITS, typename QUEUE_TRAITS>
class CLogConsumerT
class CLogConsumerBaseT
{
protected:
moodycamel::ConcurrentQueue<rtlog::ArgumentArrayT<LOGGER_TRAITS>, QUEUE_TRAITS>& m_Queue;
std::atomic_bool m_Stop;

public:
typedef moodycamel::ConcurrentQueue<rtlog::ArgumentArrayT<LOGGER_TRAITS>, QUEUE_TRAITS> queue_type;

CLogConsumerBaseT(queue_type& queue) :
m_Queue(queue)
{
m_Stop.store(false);
}

virtual void consume() = 0;

void stop()
{
m_Stop.store(true);
}
};

/** Single file output consumer running in a new thread */
template<typename LOGGER_TRAITS, typename QUEUE_TRAITS>
class CLogConsumerSingleFileT : public CLogConsumerBaseT<LOGGER_TRAITS, QUEUE_TRAITS>
{
protected:
std::chrono::microseconds m_PollInterval;
rtlog::CFormatter m_Formatter;
rtlog::ArgumentArrayT<LOGGER_TRAITS> m_ArgumentArray;
std::thread m_ConsumerThread;
std::atomic_bool m_Stop;
std::string m_FileName;
std::ofstream m_Stream;

public:
typedef moodycamel::ConcurrentQueue<rtlog::ArgumentArrayT<LOGGER_TRAITS>, QUEUE_TRAITS> queue_type;
typedef CLogConsumerBaseT<LOGGER_TRAITS, QUEUE_TRAITS> base_type;
using queue_type = typename base_type::queue_type;

CLogConsumerT(const std::string& filename, queue_type& queue, uint32_t poll_interval_us) :
m_Queue(queue), m_PollInterval(poll_interval_us), m_FileName(filename),
CLogConsumerSingleFileT(const std::string& filename, queue_type& queue, uint32_t poll_interval_us) :
base_type(queue),
m_PollInterval(poll_interval_us), m_FileName(filename),
m_Stream(filename, std::ofstream::binary|std::ofstream::trunc|std::ofstream::out)
{
m_Stop.store(false);
m_ConsumerThread = std::thread(std::bind(&CLogConsumerT<LOGGER_TRAITS, QUEUE_TRAITS>::consume, this));
// Create and start thread
m_ConsumerThread = std::thread(std::bind(&CLogConsumerSingleFileT<LOGGER_TRAITS, QUEUE_TRAITS>::consume, this));
}

void consume()
virtual void consume()
{
const char* p;
while (!m_Stop.load(std::memory_order_acquire)) {
while ((p = m_Formatter.format(m_Queue)) != NULL)
m_Stream << p;

const typename LOGGER_TRAITS::CHAR_TYPE* p;
while (!this->m_Stop.load(std::memory_order_acquire)) {
while (this->m_Queue.try_dequeue(m_ArgumentArray)) {
// Dequeue a log message block
// It SHOULD be complete but it's not guaranteed
p = m_Formatter.format(m_ArgumentArray);
if (p)
m_Stream << p;
}
m_Stream.flush();
// TODO: sleep only for remaining poll interval
std::this_thread::sleep_for(m_PollInterval);
}

while ((p = m_Formatter.format(m_Queue)) != NULL)
m_Stream << p;
m_Stream.flush();
}

void stop()
{
m_Stop.store(true);
this->m_Stop.store(true);
m_ConsumerThread.join();
m_Stream.close();
}
};
using CLogConsumer = CLogConsumerT<rtlog::LoggerTraits, rtlog::ConcurrentQueueTraits>;
using CLogConsumerSingleFile = CLogConsumerSingleFileT<rtlog::LoggerTraits, rtlog::ConcurrentQueueTraits>;

} // namespace rtlog

0 comments on commit 474b21d

Please sign in to comment.