Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High-performance hub component for dealing with many sockets and high throughput #760

Merged
merged 96 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 89 commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
a1e7416
Adds support for compiling simplelink with select async callback.
balazsracz Feb 10, 2020
fff2ff4
Adds variety of code and documentation about routing.
balazsracz Feb 15, 2020
8ece440
more comments on design.
balazsracz Feb 15, 2020
80a5588
More documentation.
balazsracz Feb 16, 2020
4ef9caa
Moves documentation to a separate .md file.
balazsracz Feb 16, 2020
0ae6bf1
more docs.
balazsracz Feb 16, 2020
0178288
Merge branch 'master' of github.com:bakerstu/openmrn into bracz-direc…
balazsracz Feb 16, 2020
8a7d3c8
Starts adding the outline of directhub.
balazsracz Feb 16, 2020
d8a5615
Adds implementation of input and output.
balazsracz Feb 16, 2020
e659189
Adds the class owning the message payloads.
balazsracz Feb 16, 2020
22e2579
Adds DataBuffer with a respective pool to allocate buffers holding un…
balazsracz Feb 17, 2020
9c68404
Adds basic write flow to DirectHub.
balazsracz Feb 17, 2020
4f8e8db
Adds code to RingBuffer that allows direct writes and reads into the …
balazsracz Feb 17, 2020
78691b9
swithes DirectHub to use DataBuffer.
balazsracz Feb 17, 2020
dd1ed87
Merges LinkedDataBuffer and DataBuffer.
balazsracz Feb 17, 2020
0fc10f0
Adds implementation of the read flow.
balazsracz Feb 17, 2020
6de3725
Adds end to end test for direct hub.
balazsracz Feb 17, 2020
7132261
Adds more test cases to the end to end test.
balazsracz Feb 17, 2020
a98007f
Adds a TCP listener hub to the DirectHub classes.
balazsracz Feb 18, 2020
b2d2f49
Adds a block notifiable class that can be used as a semaphore of Barr…
balazsracz Feb 18, 2020
d6fce5e
Adds limiting the memory buffer in the hub.
balazsracz Feb 18, 2020
b1a3162
Fixes bugs in the direct hub implementation.
balazsracz Feb 18, 2020
6697d6e
Part of the code for shutdown is copied from HuBDeviceSelect.
balazsracz Feb 23, 2020
83314f8
Implements clean shutdown in the hub.
balazsracz Feb 23, 2020
4003263
Adds a unit test that tries to make the socket blocked.
balazsracz Feb 26, 2020
75cce4c
Test total bytes transmitted via blocked test.
balazsracz Feb 27, 2020
be7ffd5
Adds allocation of the limited pool barrier notifiable into the read …
balazsracz Feb 28, 2020
3caaf9e
Merge branch 'master' into bracz-direct-hub-router
balazsracz Mar 1, 2020
b057848
Adds a bunch more unittests including for race conditions etc.
balazsracz Mar 1, 2020
0b178cb
reformat
balazsracz Mar 1, 2020
412eb86
Adds test for skipping and chaining.
balazsracz Mar 1, 2020
bda595b
fix compilation error
balazsracz Mar 1, 2020
d384fcd
updates chaining test to also check for a buffer with limited size.
balazsracz Mar 1, 2020
51fe936
Adds segmenter API and gridconnect segmenter implementation.
balazsracz Mar 1, 2020
9e622c2
Adds trivial message segmenter.
balazsracz Mar 1, 2020
c4c9bff
Adds LinkedDataBufferPtr to hold the buf_ head tail skip and free ele…
balazsracz Mar 1, 2020
803f5b7
Rewrites DirectHub to use LinkedDataBufferPtr.
balazsracz Mar 1, 2020
6696891
Fix compilation errors.
balazsracz Mar 2, 2020
f0ee164
Fix broken tests due to leaked reference.
balazsracz Mar 2, 2020
024f96d
Fixes some bugs around segmentation.
balazsracz Mar 4, 2020
c666165
Adds support for calling tests with TESTARGS.
balazsracz Mar 4, 2020
f568068
Adds more expectations on correct segmenting:
balazsracz Mar 4, 2020
42c370e
Fixes a bug on calling the segmenter clear too many times.
balazsracz Mar 4, 2020
70ecb96
Adds (one direction) of a legacy conversion routine.
balazsracz Mar 6, 2020
9e03b5b
Moves the code to reassemble a linkeddatabuffer into a string as a he…
balazsracz Mar 6, 2020
e996774
Implements the reversed conversion routine.
balazsracz Mar 6, 2020
13ed147
starts adding a unit test for the legacy CAN converter.
balazsracz Mar 6, 2020
f9fe772
gc_format fixes:
balazsracz Mar 7, 2020
f56ac28
Adds tests for the new gridconnect hub to old can-hub connection.
balazsracz Mar 7, 2020
bad32e7
Ensures that notifiable blocks are reclaimed when being deleted.
balazsracz Mar 7, 2020
c96e108
Makes DirectHub delete-able.
balazsracz Mar 7, 2020
e3aba68
Fixes race conditions on exit and memory leaks.
balazsracz Mar 7, 2020
0465ed1
Fix whitespace.
balazsracz Mar 7, 2020
addec73
Reorder includes.
balazsracz Mar 7, 2020
b155643
Fixes destructor and unit tests on async notifiable block.
balazsracz Mar 7, 2020
0ce5214
Adds support for SO_RCVBUF.
balazsracz Mar 8, 2020
1a9f939
Adds better debug/logging.
balazsracz Mar 8, 2020
710a9bb
Fixes bugs in leaking buffers when the remote socket was unexpectedly…
balazsracz Mar 8, 2020
ff1f1c3
Adds support for inplace append to next data buffer.
balazsracz Mar 8, 2020
ebef72e
Adds support to requesting next() while having the lock acquired exte…
balazsracz Mar 17, 2020
0e5f960
Adds support for appending to the tail buffer instead of enqueueing a…
balazsracz Mar 17, 2020
0d1ea13
Ensures that inbounds CAN frames are sent at the topmost priority.
balazsracz Mar 17, 2020
f40c92c
Merge branch 'master' into bracz-direct-hub-router
balazsracz Mar 28, 2022
4422475
Fixes a template instantiation problem.
Apr 9, 2022
49bc1be
Merge branch 'master' of github.com:bakerstu/openmrn into bracz-direc…
balazsracz Dec 22, 2023
04c9aaf
Adds a facility to override the directhub buffer pool allocation size…
balazsracz Dec 30, 2023
c86c375
Updates documentation about incoming direct byte buffering, as the pa…
balazsracz Dec 30, 2023
19ceb97
Avoid too deep recursion in state flow.
balazsracz Dec 30, 2023
dac5d9a
Fix broken test.
balazsracz Dec 30, 2023
0475a1f
Fix comment
balazsracz Dec 30, 2023
d0890cb
Refactors blocked socket tests.
balazsracz Dec 30, 2023
28968ce
Makes main buffer pool contain only specifically allocated buffers by…
balazsracz Dec 30, 2023
ad6c8da
Adds an extra blocked socket test with the prod buffer sizes.
balazsracz Dec 30, 2023
b84727e
Fixes incorrectly set done notifiable.
balazsracz Dec 30, 2023
26ccb99
Removes expectation on total number of bytes transferred, because it …
balazsracz Dec 31, 2023
e68dbad
Improves readability in data buffer, adds additional tests on an inva…
balazsracz Dec 31, 2023
aa60f18
Adds a facility to change constant values in unit tests.
balazsracz Dec 31, 2023
7bf8b3b
Adds some verbose log messages.
balazsracz Dec 31, 2023
5ff2f5d
Adds more comments for the data and code flow.
balazsracz Dec 31, 2023
60779aa
Uses the constant override facility to restoare the blocked tests to …
balazsracz Dec 31, 2023
c2d2ac1
Makes the directhub buffer size configurable with a link option.
balazsracz Dec 31, 2023
e97d1b1
Fix whitespace.
balazsracz Dec 31, 2023
9864e0b
Merge branch 'master' into bracz-direct-hub-router
balazsracz Dec 31, 2023
cff3c6e
Fix spacing and duplicate lines in sources file after merge conflict.
balazsracz Dec 31, 2023
0f7960a
Fix compile error on FdUtils under freertos.
balazsracz Jan 1, 2024
2edb40e
Ensures that we keep the barrier pointer in a child when a CAN packet…
balazsracz Jan 1, 2024
67b73a1
Write a new overview / introduction to the directhub page.
balazsracz Jan 2, 2024
ed2c799
Makes the documentation up to date.
balazsracz Jan 2, 2024
2e58e61
Minor updates to the docs.
balazsracz Jan 2, 2024
d869a7b
Removes limitation on wait time.
balazsracz Jan 6, 2024
3b883c7
FIx comments.
balazsracz Jun 21, 2024
39d50d2
Fix comments and reduce unnecessary log level.
balazsracz Jun 21, 2024
4820101
Remove unnecessary log.
balazsracz Jun 21, 2024
523412d
Fixed comment and adds a todo.
balazsracz Jun 21, 2024
3b9647e
Merge branch 'master' of github.com:bakerstu/openmrn into bracz-direc…
balazsracz Jun 21, 2024
6b80335
Fix test build.
balazsracz Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/nmranet_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ DECLARE_CONST(gridconnect_buffer_delay_usec);
* two threads per client (multi-threaded) execution model. */
DECLARE_CONST(gridconnect_tcp_use_select);

/// Maximum number of packets to parse from a single DirectHubPort before we
/// wait for data to drain from the system.
DECLARE_CONST(directhub_port_max_incoming_packets);

/// Number of bytes that we will be reading in one go from an incoming port. We
/// will allocate at least this many bytes dedicated for each input port.
DECLARE_CONST(directhub_port_incoming_buffer_size);

/** Number of entries in the remote alias cache */
DECLARE_CONST(remote_alias_cache_size);

Expand Down
71 changes: 71 additions & 0 deletions src/executor/AsyncNotifiableBlock.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/** \copyright
* Copyright (c) 2013, Balazs Racz
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check date

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file AsyncNotifiableBlock.cxx
*
* An advanced notifiable construct that acts as a fixed pool of
* BarrierNotifiables. A stateflow can pend on acquiring one of them, use that
* barrier, with it automatically returning to the next caller when the Barrier
* goes out of counts.
*
* @author Balazs Racz
* @date 18 Feb 2020
*/

#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif

#include "AsyncNotifiableBlock.hxx"

#include <unistd.h>

AsyncNotifiableBlock::~AsyncNotifiableBlock()
{
unsigned max = 10;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to rename this from max to something else to prevent confusion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this constant 10 chosen? Should it somehow be related to count_? Some commentary would help.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the constant is removed.
Originally I set this code to run as a timeout. Now there is no timeout, the code runs forever (until the condition is reached).

// Recollects all notifiable instances, including waiting a bit if
// there are some that have not finished yet. Limits the total amount
// of wait.
for (unsigned i = 0; i < count_; ++i)
{
while (true)
{
QMember *m = next().item;
if (!m)
{
LOG(VERBOSE,
"shutdown async notifiable block: waiting for returns");
usleep(100);
HASSERT(--max);
}
else
{
HASSERT(initialize(m)->abort_if_almost_done());
break;
}
}
}
}
110 changes: 110 additions & 0 deletions src/executor/AsyncNotifiableBlock.cxxtest
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "executor/AsyncNotifiableBlock.hxx"

#include "utils/test_main.hxx"

class AsyncNotifiableBlockTest : public ::testing::Test
{
protected:
AsyncNotifiableBlock b_ {2};
};

TEST_F(AsyncNotifiableBlockTest, create)
{
}

TEST_F(AsyncNotifiableBlockTest, count_request_release)
{
EXPECT_EQ(2u, b_.pending());
QMember *e = b_.next(0);
EXPECT_NE(nullptr, e);
EXPECT_EQ(1u, b_.pending());

QMember *f = b_.next(0);
EXPECT_NE(nullptr, f);
EXPECT_EQ(0u, b_.pending());

QMember *g = b_.next(0);
EXPECT_EQ(nullptr, g);
EXPECT_EQ(0u, b_.pending());

b_.initialize(e)->notify();
EXPECT_EQ(1u, b_.pending());

QMember *h = b_.next(0);
EXPECT_EQ(e, h);

EXPECT_EQ(0u, b_.pending());

b_.initialize(f)->notify();
b_.initialize(h)->notify();
}

TEST_F(AsyncNotifiableBlockTest, barrier_semantics)
{
EXPECT_EQ(2u, b_.pending());
QMember *e = b_.next(0);
BarrierNotifiable *bn = b_.initialize(e);
EXPECT_EQ(1u, b_.pending());

bn->new_child();
bn->notify();
EXPECT_EQ(1u, b_.pending());
bn->notify();
EXPECT_EQ(2u, b_.pending());
}

class FakeExecutable : public Executable
{
public:
void run() override
{
DIE("unexpected.");
}

void alloc_result(QMember *m) override
{
ASSERT_TRUE(m);
m_ = m;
}

QMember *m_ {nullptr};
};

TEST_F(AsyncNotifiableBlockTest, async_allocation)
{
EXPECT_EQ(2u, b_.pending());
QMember *e = b_.next(0);
EXPECT_NE(nullptr, e);
EXPECT_EQ(1u, b_.pending());

FakeExecutable cli1, cli2, cli3;
EXPECT_EQ(nullptr, cli1.m_);
EXPECT_EQ(nullptr, cli2.m_);
EXPECT_EQ(nullptr, cli3.m_);

b_.next_async(&cli1);
EXPECT_EQ(0u, b_.pending());
EXPECT_NE(nullptr, cli1.m_);
EXPECT_NE(e, cli1.m_);

b_.next_async(&cli2);
b_.next_async(&cli3);
EXPECT_EQ(nullptr, cli2.m_);
EXPECT_EQ(nullptr, cli3.m_);
EXPECT_EQ(0u, b_.pending());

b_.initialize(e)->notify(); // will be handed out to cli2

EXPECT_EQ(0u, b_.pending());
EXPECT_EQ(e, cli2.m_);

b_.initialize(cli1.m_)->notify(); // will be handed out to cli3
EXPECT_EQ(cli1.m_, cli3.m_);
EXPECT_EQ(0u, b_.pending());

b_.initialize(cli3.m_)->notify(); // will be handed back
EXPECT_EQ(1u, b_.pending());

b_.initialize(cli2.m_)->notify(); // will be handed back
EXPECT_EQ(2u, b_.pending());
}
139 changes: 139 additions & 0 deletions src/executor/AsyncNotifiableBlock.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/** \copyright
* Copyright (c) 2013, Balazs Racz
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file AsyncNotifiableBlock.hxx
*
* An advanced notifiable construct that acts as a fixed pool of
* BarrierNotifiables. A stateflow can pend on acquiring one of them, use that
* barrier, with it automatically returning to the next caller when the Barrier
* goes out of counts.
*
* @author Balazs Racz
* @date 18 Feb 2020
*/

#ifndef _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_
#define _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_

#include <memory>

#include "executor/Notifiable.hxx"
#include "utils/Queue.hxx"
#include "utils/logging.h"

#include "utils/Buffer.hxx"

/// A block of BarrierNotifiable objects, with an asynchronous allocation
/// call. Caller StateFlows can block on allocating a new entry, and then get
/// back a fresh BarrierNotifiable, which, upon being released will
/// automatically be reallocated to a waiting flow, if any.
class AsyncNotifiableBlock : private Notifiable, public QAsync
{
private:
/// Notifiable class that can act as a BarrierNotifiable but also be
/// enlisted in a queue.
class QueuedBarrier : public BarrierNotifiable, public QMember
{
public:
/// Notification implementation.
///
/// Theory of operation: If this was the last notification (count goes
/// from 1 to 0), we take the done_ pointer, cast it to the owning
/// AsyncNotifiableBlock, and release outselves into the queue
/// there. We keep the count at 1 at all times, which ensures that the
/// done_ pointer remains pointing to the owner AsyncNotifiableBlock.
void notify() override
{
AtomicHolder h(this);
if (count_ == 1)
{
LOG(VERBOSE, "block notifiable %p returned pool size %u",
(BarrierNotifiable *)this,
(unsigned)mainBufferPool->total_size());
auto *tgt = static_cast<AsyncNotifiableBlock *>(done_);
tgt->insert(this);
}
else
{
--count_;
}
}

/// Checks that there is exactly one count in here.
void check_one_count()
{
HASSERT(count_ == 1);
}
};

public:
/// Constructor. @param num_parallelism tells how many BarrierNotifiables
/// we should have and hand out to callers requesting them.
AsyncNotifiableBlock(unsigned num_parallelism)
: count_(num_parallelism)
, barriers_(new QueuedBarrier[num_parallelism])
{
for (unsigned i = 0; i < num_parallelism; ++i)
{
barriers_[i].reset(this);
this->insert(&barriers_[i]);
}
}

/// Destructor.
~AsyncNotifiableBlock();

/// Turns an allocated entry from the QAsync into a usable
/// BarrierNotifiable.
/// @param entry a QMember that was allocated from *this.
/// @return an initialized BarrierNotifiable with exactly one count, and
/// done_ set up to be returned for further use.
BarrierNotifiable *initialize(QMember *entry)
{
QueuedBarrier *b = static_cast<QueuedBarrier *>(entry);
// We must be owning this entry.
HASSERT(barriers_.get() <= b);
HASSERT(b <= (barriers_.get() + count_));
b->check_one_count();
return b;
}

/// Notification implementation -- should never be called.
void notify() override
{
DIE("Should not receive this notification");
}

private:
/// How many barriers do we have.
unsigned count_;
/// The pointer to the block of barriernotifiables.
std::unique_ptr<QueuedBarrier[]> barriers_;

DISALLOW_COPY_AND_ASSIGN(AsyncNotifiableBlock);
};

#endif // _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_
4 changes: 2 additions & 2 deletions src/executor/Notifiable.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private:
/// A BarrierNotifiable allows to create a number of child Notifiable and wait
/// for all of them to finish. When the last one is finished, the parent done
/// callback is called.
class BarrierNotifiable : public Notifiable, private Atomic
class BarrierNotifiable : public Notifiable, protected Atomic
{
public:
/** Constructs a barrier notifiable that is done. Users should call reset()
Expand Down Expand Up @@ -240,7 +240,7 @@ public:
}
}

private:
protected:
/// How many outstanding notifications we are still waiting for. When 0,
/// the barrier is not live; when reaches zero, done_ will be called.
unsigned count_;
Expand Down
1 change: 1 addition & 0 deletions src/executor/sources
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ VPATH := $(SRCDIR)
CSRCS +=

CXXSRCS += \
AsyncNotifiableBlock.cxx \
Executor.cxx \
Notifiable.cxx \
Service.cxx \
Expand Down
Loading