Skip to content

Commit

Permalink
Merge branch 'master' into bakerstu-ble-basics
Browse files Browse the repository at this point in the history
* master:
  Fixes detecting EOF in the memory config protocol handler. (#789)
  Adds a new hub application using DirectHub (#761)
  High-performance hub component for dealing with many sockets and high throughput (#760)
  Fix build of esp8266 train implementation.
  • Loading branch information
balazsracz committed Jul 4, 2024
2 parents 633cdd4 + 0b853c6 commit a8d7880
Show file tree
Hide file tree
Showing 38 changed files with 5,255 additions and 46 deletions.
3 changes: 3 additions & 0 deletions applications/direct_hub/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SUBDIRS = targets
-include config.mk
include $(OPENMRNPATH)/etc/recurse.mk
1 change: 1 addition & 0 deletions applications/direct_hub/config.mk
240 changes: 240 additions & 0 deletions applications/direct_hub/main.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/** \copyright
* Copyright (c) 2013, Balazs Racz
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file main.cxx
*
* An application which acts as an openlcb hub with the GC protocol, using the
* DirectHub infrastructure.
*
* @author Balazs Racz
* @date 31 Dec 2023
*/

#include <fcntl.h>
#include <getopt.h>
#include <stdio.h>
#include <unistd.h>

#include <memory>

#include "executor/Executor.hxx"
#include "executor/Service.hxx"
#include "os/os.h"
#include "utils/ClientConnection.hxx"
#include "utils/DirectHub.hxx"
#include "utils/HubDeviceSelect.hxx"
#include "utils/SocketCan.hxx"
#include "utils/constants.hxx"

Executor<1> g_executor("g_executor", 0, 1024);
Service g_service(&g_executor);

std::unique_ptr<ByteDirectHubInterface> g_direct_hub {create_hub(&g_executor)};

CanHubFlow can_hub0(&g_service);

OVERRIDE_CONST(gc_generate_newlines, 1);
OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192);
OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192);
OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1024);

int port = 12021;
const char *device_path = nullptr;
const char *socket_can_path = nullptr;
int upstream_port = 12021;
const char *upstream_host = nullptr;
bool timestamped = false;
bool export_mdns = false;
const char *mdns_name = "openmrn_hub";
bool printpackets = false;

void usage(const char *e)
{
fprintf(stderr,
"Usage: %s [-p port] [-d device_path] [-u upstream_host] "
"[-q upstream_port] [-m] [-n mdns_name] "
#if defined(__linux__)
"[-s socketcan_interface] "
#endif
"[-t] [-l]\n\n",
e);
fprintf(stderr,
"GridConnect CAN HUB.\nListens to a specific TCP port, "
"reads CAN packets from the incoming connections using "
"the GridConnect protocol, and forwards all incoming "
"packets to all other participants.\n\nArguments:\n");
fprintf(stderr,
"\t-p port specifies the port number to listen on, "
"default is 12021.\n");
fprintf(stderr,
"\t-d device is a path to a physical device doing "
"serial-CAN or USB-CAN. If specified, opens device and "
"adds it to the hub.\n");
#if defined(__linux__)
fprintf(stderr,
"\t-s socketcan_interface is a socketcan device (e.g. 'can0'). "
"If specified, opens device and adds it to the hub.\n");
#endif
fprintf(stderr,
"\t-u upstream_host is the host name for an upstream "
"hub. If specified, this hub will connect to an upstream "
"hub.\n");
fprintf(stderr,
"\t-q upstream_port is the port number for the upstream hub.\n");
fprintf(stderr, "\t-t prints timestamps for each packet.\n");
fprintf(stderr, "\t-l print all packets.\n");
#ifdef HAVE_AVAHI_CLIENT
fprintf(stderr, "\t-m exports the current service on mDNS.\n");
fprintf(
stderr, "\t-n mdns_name sets the exported mDNS name. Implies -m.\n");
#endif
exit(1);
}

void parse_args(int argc, char *argv[])
{
int opt;
while ((opt = getopt(argc, argv, "hp:d:s:u:q:tlmn:")) >= 0)
{
switch (opt)
{
case 'h':
usage(argv[0]);
break;
case 'd':
device_path = optarg;
break;
#if defined(__linux__)
case 's':
socket_can_path = optarg;
break;
#endif
case 'p':
port = atoi(optarg);
break;
case 'u':
upstream_host = optarg;
break;
case 'q':
upstream_port = atoi(optarg);
break;
case 't':
timestamped = true;
break;
case 'm':
export_mdns = true;
break;
case 'n':
mdns_name = optarg;
export_mdns = true;
break;
case 'l':
printpackets = true;
break;
default:
fprintf(stderr, "Unknown option %c\n", opt);
usage(argv[0]);
}
}
}

void create_legacy_bridge() {
static bool is_created = false;
if (!is_created) {
is_created = true;
create_gc_to_legacy_can_bridge(g_direct_hub.get(), &can_hub0);
}
}

/** Entry point to application.
* @param argc number of command line arguments
* @param argv array of command line arguments
* @return 0, should never return
*/
int appl_main(int argc, char *argv[])
{
parse_args(argc, argv);
// GcPacketPrinter packet_printer(&can_hub0, timestamped);
GcPacketPrinter *packet_printer = NULL;
if (printpackets)
{
create_legacy_bridge();
packet_printer = new GcPacketPrinter(&can_hub0, timestamped);
}
fprintf(stderr, "packet_printer points to %p\n", packet_printer);
create_direct_gc_tcp_hub(g_direct_hub.get(), port);
vector<std::unique_ptr<ConnectionClient>> connections;

#ifdef HAVE_AVAHI_CLIENT
void mdns_client_start();
void mdns_publish(const char *name, uint16_t port);

if (export_mdns)
{
mdns_client_start();
mdns_publish(mdns_name, port);
}
#endif

#if defined(__linux__)
if (socket_can_path)
{
int s = socketcan_open(socket_can_path, 1);
if (s >= 0)
{
create_legacy_bridge();
new HubDeviceSelect<CanHubFlow>(&can_hub0, s);
fprintf(stderr, "Opened SocketCan %s: fd %d\n", socket_can_path, s);
}
else
{
fprintf(stderr, "Failed to open SocketCan %s.\n", socket_can_path);
}
}
#endif

if (upstream_host)
{
connections.emplace_back(new UpstreamConnectionClient(
"upstream", g_direct_hub.get(), upstream_host, upstream_port));
}

if (device_path)
{
connections.emplace_back(new DeviceConnectionClient(
"device", g_direct_hub.get(), device_path));
}

while (1)
{
for (const auto &p : connections)
{
p->ping();
}
sleep(1);
}
return 0;
}
2 changes: 2 additions & 0 deletions applications/direct_hub/subdirs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SUBDIRS = \

3 changes: 3 additions & 0 deletions applications/direct_hub/targets/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SUBDIRS = linux.x86 \

include $(OPENMRNPATH)/etc/recurse.mk
2 changes: 2 additions & 0 deletions applications/direct_hub/targets/linux.x86/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
direct_hub
*_test
1 change: 1 addition & 0 deletions applications/direct_hub/targets/linux.x86/AvaHiMDNS.cxx
5 changes: 5 additions & 0 deletions applications/direct_hub/targets/linux.x86/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-include ../../config.mk
include $(OPENMRNPATH)/etc/prog.mk

SYSLIBRARIES += -lavahi-client -lavahi-common
CXXFLAGS += -DHAVE_AVAHI_CLIENT
1 change: 1 addition & 0 deletions applications/direct_hub/targets/linux.x86/lib/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include $(OPENMRNPATH)/etc/app_target_lib.mk
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class ESPHuzzahTrain : public openlcb::TrainImpl
void set_speed(openlcb::SpeedType speed) override
{
lastSpeed_ = speed;
estop = false;
g_speed_controller.call_speed(speed);
if (f0)
{
Expand All @@ -257,10 +258,16 @@ class ESPHuzzahTrain : public openlcb::TrainImpl
/** Sets the train to emergency stop. */
void set_emergencystop() override
{
// g_speed_controller.call_estop();
g_speed_controller.call_estop();
lastSpeed_.set_mph(0); // keeps direction
estop = true;
}

bool get_emergencystop() override
{
return estop;
}

/** Sets the value of a function.
* @param address is a 24-bit address of the function to set. For legacy DCC
* locomotives, see @ref TractionDefs for the address definitions (0=light,
Expand Down Expand Up @@ -331,6 +338,7 @@ class ESPHuzzahTrain : public openlcb::TrainImpl
openlcb::SpeedType lastSpeed_ = 0.0;
bool f0 = false;
bool f1 = false;
bool estop = false;
};

const char kFdiXml[] =
Expand Down
8 changes: 8 additions & 0 deletions include/nmranet_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ DECLARE_CONST(gridconnect_buffer_delay_usec);
* two threads per client (multi-threaded) execution model. */
DECLARE_CONST(gridconnect_tcp_use_select);

/// Maximum number of packets to parse from a single DirectHubPort before we
/// wait for data to drain from the system.
DECLARE_CONST(directhub_port_max_incoming_packets);

/// Number of bytes that we will be reading in one go from an incoming port. We
/// will allocate at least this many bytes dedicated for each input port.
DECLARE_CONST(directhub_port_incoming_buffer_size);

/** Number of entries in the remote alias cache */
DECLARE_CONST(remote_alias_cache_size);

Expand Down
68 changes: 68 additions & 0 deletions src/executor/AsyncNotifiableBlock.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/** \copyright
* Copyright (c) 2020, Balazs Racz
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file AsyncNotifiableBlock.cxx
*
* An advanced notifiable construct that acts as a fixed pool of
* BarrierNotifiables. A stateflow can pend on acquiring one of them, use that
* barrier, with it automatically returning to the next caller when the Barrier
* goes out of counts.
*
* @author Balazs Racz
* @date 18 Feb 2020
*/

#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif

#include "AsyncNotifiableBlock.hxx"

#include "os/sleep.h"

AsyncNotifiableBlock::~AsyncNotifiableBlock()
{
// Recollects all notifiable instances, including waiting as long as needed
// if there are some that have not finished yet.
for (unsigned i = 0; i < count_; ++i)
{
while (true)
{
QMember *m = next().item;
if (!m)
{
LOG(VERBOSE,
"shutdown async notifiable block: waiting for returns");
microsleep(100);
}
else
{
HASSERT(initialize(m)->abort_if_almost_done());
break;
}
}
}
}
Loading

0 comments on commit a8d7880

Please sign in to comment.