diff --git a/protobuf/CMakeLists.txt b/protobuf/CMakeLists.txt index e6fe5e065..c411214ec 100644 --- a/protobuf/CMakeLists.txt +++ b/protobuf/CMakeLists.txt @@ -3,3 +3,4 @@ add_subdirectory(echo) add_subdirectory(protoc_echo_plugin) add_subdirectory(protoc_echo_plugin_csharp) add_subdirectory(protoc_echo_plugin_java) +add_subdirectory(meta_services) diff --git a/protobuf/echo/Echo.cpp b/protobuf/echo/Echo.cpp index 1df97f818..0b4c0e372 100644 --- a/protobuf/echo/Echo.cpp +++ b/protobuf/echo/Echo.cpp @@ -2,6 +2,20 @@ namespace services { + ServiceId::ServiceId(uint32_t id) + : id(id) + {} + + Service::Service(Echo& echo, uint32_t serviceId) + : ServiceId(serviceId) + , infra::Observer(echo) + {} + + uint32_t ServiceIdAccess::GetId(const ServiceId& serviceId) + { + return serviceId.id; + } + void Service::MethodDone() { Rpc().ServiceDone(); diff --git a/protobuf/echo/Echo.hpp b/protobuf/echo/Echo.hpp index 5790515b5..b1331ae5b 100644 --- a/protobuf/echo/Echo.hpp +++ b/protobuf/echo/Echo.hpp @@ -10,20 +10,43 @@ #include "protobuf/echo/EchoErrorPolicy.hpp" #include "protobuf/echo/Proto.hpp" #include "protobuf/echo/Serialization.hpp" +#include namespace services { class Echo; + class ServiceId; class Service; class ServiceProxy; - class Service - : public infra::Observer + class ServiceIdAccess { public: - using infra::Observer::Observer; + static uint32_t GetId(const ServiceId& serviceId); + }; + + class ServiceId + { + friend class ServiceIdAccess; + + public: + ServiceId(uint32_t id); + + virtual bool AcceptsService(uint32_t id) const + { + return id == this->id; + } - virtual bool AcceptsService(uint32_t id) const = 0; + private: + const uint32_t id; + }; + + class Service + : public ServiceId + , public infra::Observer + { + public: + Service(Echo& echo, uint32_t serviceId); void MethodDone(); virtual infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const EchoErrorPolicy& errorPolicy) = 0; diff --git a/protobuf/echo/ServiceForwarder.cpp b/protobuf/echo/ServiceForwarder.cpp index 2dbb54a9c..003e86a32 100644 --- a/protobuf/echo/ServiceForwarder.cpp +++ b/protobuf/echo/ServiceForwarder.cpp @@ -1,9 +1,10 @@ #include "protobuf/echo/ServiceForwarder.hpp" +#include namespace services { - ServiceForwarderBase::ServiceForwarderBase(Echo& echo, Echo& forwardTo) - : Service(echo) + ServiceForwarderBase::ServiceForwarderBase(Echo& echo, uint32_t id, Echo& forwardTo) + : Service(echo, id) , ServiceProxy(forwardTo, 0) {} @@ -89,7 +90,7 @@ namespace services } ServiceForwarder::ServiceForwarder(Echo& echo, uint32_t id, Echo& forwardTo) - : ServiceForwarderBase(echo, forwardTo) + : ServiceForwarderBase(echo, id, forwardTo) , serviceId(id) {} diff --git a/protobuf/echo/ServiceForwarder.hpp b/protobuf/echo/ServiceForwarder.hpp index 0d7f1c2e4..b8714d8b6 100644 --- a/protobuf/echo/ServiceForwarder.hpp +++ b/protobuf/echo/ServiceForwarder.hpp @@ -2,6 +2,7 @@ #define PROTOBUF_ECHO_SERVICE_FORWARDER_HPP #include "protobuf/echo/Echo.hpp" +#include namespace services { @@ -12,7 +13,7 @@ namespace services , private MethodSerializer { public: - ServiceForwarderBase(Echo& echo, Echo& forwardTo); + ServiceForwarderBase(Echo& echo, uint32_t id, Echo& forwardTo); // Implementation of Service infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const EchoErrorPolicy& errorPolicy) override; diff --git a/protobuf/echo/test_doubles/ServiceStub.cpp b/protobuf/echo/test_doubles/ServiceStub.cpp index 5fc1b588d..02160a036 100644 --- a/protobuf/echo/test_doubles/ServiceStub.cpp +++ b/protobuf/echo/test_doubles/ServiceStub.cpp @@ -1,4 +1,5 @@ #include "protobuf/echo/test_doubles/ServiceStub.hpp" +#include namespace services { @@ -50,15 +51,10 @@ namespace services return value; } - ServiceStub::ServiceStub(Echo& echo) - : Service(echo) + ServiceStub::ServiceStub(Echo& echo, uint32_t serviceId) + : Service(echo, serviceId) {} - bool ServiceStub::AcceptsService(uint32_t id) const - { - return id == serviceId; - } - infra::SharedPtr ServiceStub::StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy) { switch (methodId) @@ -84,8 +80,9 @@ namespace services } } - ServiceStubProxy::ServiceStubProxy(services::Echo& echo) + ServiceStubProxy::ServiceStubProxy(services::Echo& echo, uint32_t serviceId) : services::ServiceProxy(echo, maxMessageSize) + , serviceId(serviceId) {} void ServiceStubProxy::Method(uint32_t value) diff --git a/protobuf/echo/test_doubles/ServiceStub.hpp b/protobuf/echo/test_doubles/ServiceStub.hpp index 4c9102898..d11711a19 100644 --- a/protobuf/echo/test_doubles/ServiceStub.hpp +++ b/protobuf/echo/test_doubles/ServiceStub.hpp @@ -3,6 +3,7 @@ #include "protobuf/echo/Echo.hpp" #include "gmock/gmock.h" +#include namespace services { @@ -64,9 +65,7 @@ namespace services : public services::Service { public: - ServiceStub(Echo& echo); - - bool AcceptsService(uint32_t id) const override; + ServiceStub(Echo& echo, uint32_t serviceId = defaultServiceId); MOCK_METHOD(void, Method, (uint32_t value)); MOCK_METHOD(void, MethodNoParameter, ()); @@ -76,7 +75,7 @@ namespace services infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy) override; public: - static const uint32_t serviceId = 1; + static constexpr uint32_t defaultServiceId = 1; static const uint32_t idMethod = 1; static const uint32_t idMethodNoParameter = 3; static const uint32_t idMethodBytes = 4; @@ -90,20 +89,23 @@ namespace services : public services::ServiceProxy { public: - ServiceStubProxy(services::Echo& echo); + ServiceStubProxy(services::Echo& echo, uint32_t serviceId = defaultServiceId); public: void Method(uint32_t value); void MethodNoParameter(); public: - static constexpr uint32_t serviceId = 1; + static constexpr uint32_t defaultServiceId = 1; static constexpr uint32_t idMethod = 1; static const uint32_t idMethodNoParameter = 3; static constexpr uint32_t maxMessageSize = 18; public: using MethodTypeList = infra::List; + + private: + const uint32_t serviceId; }; } diff --git a/protobuf/meta_services/CMakeLists.txt b/protobuf/meta_services/CMakeLists.txt new file mode 100644 index 000000000..2e9c45aa7 --- /dev/null +++ b/protobuf/meta_services/CMakeLists.txt @@ -0,0 +1,20 @@ +add_library(protobuf.meta_services ${EMIL_EXCLUDE_FROM_ALL} STATIC) + +target_include_directories(protobuf.meta_services PUBLIC + "$" + "$" +) + +protocol_buffer_csharp(protobuf.meta_services ServiceDiscovery.proto) +protocol_buffer_java(protobuf.meta_services ServiceDiscovery.proto) +protocol_buffer_echo_all(protobuf.meta_services ServiceDiscovery.proto) + +target_sources(protobuf.meta_services PRIVATE + PeerServiceDiscoverer.cpp + PeerServiceDiscoverer.hpp + ServiceDiscoveryEcho.cpp + ServiceDiscoveryEcho.hpp +) + +add_subdirectory(test) + diff --git a/protobuf/meta_services/PeerServiceDiscoverer.cpp b/protobuf/meta_services/PeerServiceDiscoverer.cpp new file mode 100644 index 000000000..2bba922bc --- /dev/null +++ b/protobuf/meta_services/PeerServiceDiscoverer.cpp @@ -0,0 +1,92 @@ +#include "protobuf/meta_services/PeerServiceDiscoverer.hpp" +#include "infra/event/EventDispatcher.hpp" +#include "infra/util/Function.hpp" +#include +#include + +namespace application +{ + PeerServiceDiscovererEcho::PeerServiceDiscovererEcho(services::Echo& echo) + : service_discovery::ServiceDiscoveryProxy(echo) + , service_discovery::ServiceDiscoveryResponse(echo) + { + DiscoverPeerServices(); + } + + void PeerServiceDiscovererEcho::NoServiceSupported() + { + NotifyObservers([this](auto& observer) + { + observer.ServicesDiscovered(services.range()); + }); + + MethodDone(); + } + + void PeerServiceDiscovererEcho::FirstServiceSupported(uint32_t id) + { + services.push_back(id); + + if (id == SearchRangeEnd()) + NotifyObservers([this](auto& observer) + { + observer.ServicesDiscovered(services.range()); + }); + else + RequestSend([this] + { + FindFirstServiceInRange(services.back() + 1, SearchRangeEnd()); + }); + + MethodDone(); + } + + void PeerServiceDiscovererEcho::ServicesChanged(uint32_t startServiceId, uint32_t endServiceId) + { + searchRange = ServiceRange{ startServiceId, endServiceId }; + + StartDiscovery(); + + MethodDone(); + } + + void PeerServiceDiscovererEcho::DiscoverPeerServices() + { + searchRange = serviceRangeMax; + + RequestSend([this] + { + NotifyServiceChanges(true); + StartDiscovery(); + }); + } + + void PeerServiceDiscovererEcho::ClearUpdatedServices() + { + services.erase(std::remove_if(services.begin(), services.end(), [this](uint32_t serviceId) + { + return serviceId >= SearchRangeBegin() && serviceId <= SearchRangeEnd(); + }), + services.end()); + } + + void PeerServiceDiscovererEcho::StartDiscovery() + { + ClearUpdatedServices(); + + RequestSend([this] + { + FindFirstServiceInRange(SearchRangeBegin(), SearchRangeEnd()); + }); + } + + uint32_t PeerServiceDiscovererEcho::SearchRangeBegin() const + { + return std::get<0>(searchRange); + } + + uint32_t PeerServiceDiscovererEcho::SearchRangeEnd() const + { + return std::get<1>(searchRange); + } +} diff --git a/protobuf/meta_services/PeerServiceDiscoverer.hpp b/protobuf/meta_services/PeerServiceDiscoverer.hpp new file mode 100644 index 000000000..ef7b24813 --- /dev/null +++ b/protobuf/meta_services/PeerServiceDiscoverer.hpp @@ -0,0 +1,54 @@ +#ifndef PROTOBUF_ECHO_PEER_SERVICE_DISCOVERER_HPP +#define PROTOBUF_ECHO_PEER_SERVICE_DISCOVERER_HPP + +#include "generated/echo/ServiceDiscovery.pb.hpp" +#include "infra/util/BoundedVector.hpp" +#include "infra/util/MemoryRange.hpp" +#include "infra/util/Observer.hpp" +#include "protobuf/echo/Echo.hpp" +#include +#include + +namespace application +{ + class PeerServiceDiscovererEcho; + + class PeerServiceDiscoveryObserver + : public infra::Observer + { + public: + using infra::Observer::Observer; + + virtual void ServicesDiscovered(infra::MemoryRange services) = 0; + }; + + class PeerServiceDiscovererEcho + : public service_discovery::ServiceDiscoveryProxy + , public service_discovery::ServiceDiscoveryResponse + , public infra::Subject + { + public: + explicit PeerServiceDiscovererEcho(services::Echo& echo); + + void NoServiceSupported() override; + void FirstServiceSupported(uint32_t id) override; + void ServicesChanged(uint32_t startServiceId, uint32_t endServiceId) override; + + private: + using ServiceRange = std::tuple; + static constexpr ServiceRange serviceRangeMax = std::make_tuple(0, std::numeric_limits::max()); + + private: + void DiscoverPeerServices(); + void StartDiscovery(); + void ClearUpdatedServices(); + uint32_t SearchRangeBegin() const; + uint32_t SearchRangeEnd() const; + + private: + infra::BoundedVector::WithMaxSize<100> services; + ServiceRange searchRange; + }; +} + +#endif diff --git a/protobuf/meta_services/ServiceDiscovery.proto b/protobuf/meta_services/ServiceDiscovery.proto new file mode 100644 index 000000000..505deee80 --- /dev/null +++ b/protobuf/meta_services/ServiceDiscovery.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +import "EchoAttributes.proto"; + +package service_discovery; +option java_package = "com.philips.emil.protobufEcho"; +option java_outer_classname = "ServiceDiscoveryProto"; + +message Uint32Value { + uint32 value = 1; +} + +message BoolValue +{ + bool value = 1; +} + +message ServiceRange +{ + uint32 startServiceId = 1; + uint32 endServiceId = 2; +} + +service ServiceDiscovery +{ + option (service_id) = 1000; + + rpc FindFirstServiceInRange(ServiceRange) returns (Nothing) { option (method_id) = 1; } + rpc NotifyServiceChanges(BoolValue) returns (Nothing) { option (method_id) = 2; } +} + +service ServiceDiscoveryResponse +{ + option (service_id) = 1001; + + rpc FirstServiceSupported(Uint32Value) returns (Nothing) { option (method_id) = 1; } + rpc NoServiceSupported(Nothing) returns (Nothing) { option (method_id) = 2; } + rpc ServicesChanged(ServiceRange) returns (Nothing) { option (method_id) = 3; } +} + diff --git a/protobuf/meta_services/ServiceDiscoveryEcho.cpp b/protobuf/meta_services/ServiceDiscoveryEcho.cpp new file mode 100644 index 000000000..b88d09a9f --- /dev/null +++ b/protobuf/meta_services/ServiceDiscoveryEcho.cpp @@ -0,0 +1,205 @@ +#include "protobuf/meta_services/ServiceDiscoveryEcho.hpp" +#include "echo/ServiceDiscovery.pb.hpp" +#include "infra/event/EventDispatcher.hpp" +#include "infra/util/Optional.hpp" +#include "infra/util/ReallyAssert.hpp" +#include "protobuf/echo/Echo.hpp" +#include "services/tracer/GlobalTracer.hpp" +#include +#include +#include + +namespace application +{ + void ServiceDiscoveryEcho::FindFirstServiceInRange(uint32_t startServiceId, uint32_t endServiceId) + { + if (auto service = FirstSupportedServiceId(startServiceId, endServiceId); service) + { + auto serviceId = *service; + + serviceSupportedClaimer.Claim([this, serviceId] + { + service_discovery::ServiceDiscoveryResponseProxy::RequestSend([this, serviceId] + { + FirstServiceSupported(serviceId); + serviceSupportedClaimer.Release(); + }); + }); + } + else + { + serviceSupportedClaimer.Claim([this] + { + service_discovery::ServiceDiscoveryResponseProxy::RequestSend([this] + { + NoServiceSupported(); + serviceSupportedClaimer.Release(); + }); + }); + } + + MethodDone(); + } + + uint32_t ServiceDiscoveryEcho::GetServiceId(infra::Observer& observer) const + { + return services::ServiceIdAccess::GetId(*static_cast(static_cast(&observer))); + } + + infra::Optional ServiceDiscoveryEcho::FirstSupportedServiceId(uint32_t startServiceId, uint32_t endServiceId) + { + struct FirstSupportedServiceQuery + { + void UpdateServiceId(uint32_t id) + { + if (id >= startServiceId && id <= endServiceId) + { + if (serviceId) + *serviceId = std::min(*serviceId, id); + else + serviceId = infra::MakeOptional(id); + } + } + + const uint32_t startServiceId; + const uint32_t endServiceId; + infra::Optional serviceId; + }; + + FirstSupportedServiceQuery query{ startServiceId, endServiceId }; + + services::Echo::NotifyObservers([&query, this](auto& observer) + { + query.UpdateServiceId(GetServiceId(observer)); + }); + + query.UpdateServiceId(GetServiceId(*this)); + + return query.serviceId; + } + + void ServiceDiscoveryEcho::NotifyServiceChanges(bool value) + { + notifyServiceChanges = value; + + if (!notifyServiceChanges) + changedServices = infra::none; + + MethodDone(); + } + + bool ServiceDiscoveryEcho::AcceptsService(uint32_t serviceId) const + { + return service_discovery::ServiceDiscovery::AcceptsService(serviceId) || IsProxyServiceSupported(serviceId); + } + + infra::SharedPtr ServiceDiscoveryEcho:: + StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy) + { + if (service_discovery::ServiceDiscovery::AcceptsService(serviceId)) + return service_discovery::ServiceDiscovery::StartMethod(serviceId, methodId, size, errorPolicy); + else + return StartProxyServiceMethod(serviceId, methodId, size, errorPolicy); + } + + infra::SharedPtr ServiceDiscoveryEcho:: + StartProxyServiceMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy) + { + infra::SharedPtr methodSerializer; + + auto startMethodArgs = std::tie(serviceId, methodId, size, errorPolicy); + NotifyObservers([&methodSerializer, &startMethodArgs](auto& obs) + { + if (obs.AcceptsService(std::get<0>(startMethodArgs))) + { + methodSerializer = obs.StartMethod(std::get<0>(startMethodArgs), std::get<1>(startMethodArgs), + std::get<2>(startMethodArgs), std::get<3>(startMethodArgs)); + return true; + } + + return false; + }); + + return methodSerializer; + } + + bool ServiceDiscoveryEcho::IsProxyServiceSupported(uint32_t serviceId) const + { + return services::Echo::NotifyObservers([serviceId](auto& observer) + { + return observer.AcceptsService(serviceId); + }); + } + + void ServiceDiscoveryEcho::CancelRequestSend(ServiceProxy& serviceProxy) + { + UpstreamRpc().CancelRequestSend(serviceProxy); + } + + void ServiceDiscoveryEcho::RequestSend(ServiceProxy& serviceProxy) + { + UpstreamRpc().RequestSend(serviceProxy); + } + + void ServiceDiscoveryEcho::ServiceDone() + { + UpstreamRpc().ServiceDone(); + } + + services::MethodSerializerFactory& ServiceDiscoveryEcho::SerializerFactory() + { + return UpstreamRpc().SerializerFactory(); + } + + services::Echo& ServiceDiscoveryEcho::UpstreamRpc() + { + return service_discovery::ServiceDiscovery::Rpc(); + } + + void ServiceDiscoveryEcho::RegisterObserver(infra::Observer* observer) + { + auto id = GetServiceId(*observer); + ServicesChangeNotification(id); + + services::Echo::RegisterObserver(observer); + } + + void ServiceDiscoveryEcho::UnregisterObserver(infra::Observer* observer) + { + auto id = services::ServiceIdAccess::GetId(*static_cast(static_cast(observer))); + ServicesChangeNotification(id); + + services::Echo::UnregisterObserver(observer); + } + + void ServiceDiscoveryEcho::UpdateChangedServices(uint32_t& serviceId) + { + if (!changedServices) + changedServices = std::make_pair(serviceId, serviceId); + else + changedServices = std::make_pair(std::min(changedServices->first, serviceId), std::max(changedServices->second, serviceId)); + } + + void ServiceDiscoveryEcho::ServicesChangeNotification(uint32_t serviceId) + { + if (notifyServiceChanges) + { + UpdateChangedServices(serviceId); + + if (!serviceChangeNotificationTimer.Armed()) + serviceChangeNotificationTimer.Start(std::chrono::milliseconds(500), [this] + { + if (!servicesChangedClaimer.IsQueued()) + servicesChangedClaimer.Claim([this] + { + service_discovery::ServiceDiscoveryResponseProxy::RequestSend([this] + { + ServicesChanged(changedServices->first, changedServices->second); + changedServices = infra::none; + servicesChangedClaimer.Release(); + }); + }); + }); + } + } +} diff --git a/protobuf/meta_services/ServiceDiscoveryEcho.hpp b/protobuf/meta_services/ServiceDiscoveryEcho.hpp new file mode 100644 index 000000000..69cd689b4 --- /dev/null +++ b/protobuf/meta_services/ServiceDiscoveryEcho.hpp @@ -0,0 +1,63 @@ +#ifndef PROTOBUF_ECHO_SERVICE_DISCOVERY_HPP +#define PROTOBUF_ECHO_SERVICE_DISCOVERY_HPP + +#include "generated/echo/ServiceDiscovery.pb.hpp" +#include "infra/event/ClaimableResource.hpp" +#include "infra/timer/Timer.hpp" +#include "infra/util/Optional.hpp" +#include "protobuf/echo/Echo.hpp" +#include +#include + +namespace application +{ + class ServiceDiscoveryEcho + : public service_discovery::ServiceDiscovery + , public service_discovery::ServiceDiscoveryResponseProxy + , public services::Echo + { + public: + explicit ServiceDiscoveryEcho(services::Echo& echo) + : service_discovery::ServiceDiscovery(echo) + , service_discovery::ServiceDiscoveryResponseProxy(echo) + {} + + virtual ~ServiceDiscoveryEcho() = default; + + // Implementation of services::ServiceDiscovery + void FindFirstServiceInRange(uint32_t startServiceId, uint32_t endServiceId) override; + void NotifyServiceChanges(bool value) override; + + // Implementation of services::Service + bool AcceptsService(uint32_t id) const override; + infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy) override; + + // Implementation of services::Echo + void RequestSend(ServiceProxy& serviceProxy) override; + void ServiceDone() override; + void CancelRequestSend(ServiceProxy& serviceProxy) override; + services::MethodSerializerFactory& SerializerFactory() override; + + private: + void RegisterObserver(infra::Observer* observer) override; + void UnregisterObserver(infra::Observer* observer) override; + + infra::Optional FirstSupportedServiceId(uint32_t startServiceId, uint32_t endServiceId); + bool IsProxyServiceSupported(uint32_t serviceId) const; + infra::SharedPtr StartProxyServiceMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const services::EchoErrorPolicy& errorPolicy); + services::Echo& UpstreamRpc(); + void UpdateChangedServices(uint32_t& serviceId); + void ServicesChangeNotification(uint32_t serviceId); + uint32_t GetServiceId(infra::Observer& observer) const; + + private: + infra::Optional> changedServices; + bool notifyServiceChanges = false; + infra::ClaimableResource serviceProxyResource; + infra::ClaimableResource::Claimer servicesChangedClaimer{ serviceProxyResource }; + infra::ClaimableResource::Claimer serviceSupportedClaimer{ serviceProxyResource }; + infra::TimerSingleShot serviceChangeNotificationTimer; + }; +} + +#endif diff --git a/protobuf/meta_services/test/CMakeLists.txt b/protobuf/meta_services/test/CMakeLists.txt new file mode 100644 index 000000000..e5fce8671 --- /dev/null +++ b/protobuf/meta_services/test/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(protobuf.meta_services_test) +emil_build_for(protobuf.meta_services_test BOOL EMIL_BUILD_TESTS) +emil_add_test(protobuf.meta_services_test) + +target_sources(protobuf.meta_services_test PRIVATE + TestPeerServiceDiscovererEcho.cpp + TestServiceDiscoveryEcho.cpp +) + +target_link_libraries(protobuf.meta_services_test PUBLIC + gmock_main + protobuf.test_doubles + protobuf.meta_services +) diff --git a/protobuf/meta_services/test/TestPeerServiceDiscovererEcho.cpp b/protobuf/meta_services/test/TestPeerServiceDiscovererEcho.cpp new file mode 100644 index 000000000..8cad42029 --- /dev/null +++ b/protobuf/meta_services/test/TestPeerServiceDiscovererEcho.cpp @@ -0,0 +1,244 @@ +#include "echo/ServiceDiscovery.pb.hpp" +#include "infra/event/test_helper/EventDispatcherFixture.hpp" +#include "infra/util/Function.hpp" +#include "infra/util/MemoryRange.hpp" +#include "infra/util/Observer.hpp" +#include "infra/util/test_helper/MemoryRangeMatcher.hpp" +#include "protobuf/echo/test_doubles/EchoSingleLoopback.hpp" +#include "protobuf/meta_services/PeerServiceDiscoverer.hpp" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include +#include +#include +#include +#include + +namespace +{ + class ServiceDiscoveryMock + : public service_discovery::ServiceDiscovery + { + public: + using service_discovery::ServiceDiscovery::ServiceDiscovery; + + void FindFirstServiceInRange(uint32_t startServiceId, uint32_t endServiceId) override + { + FindFirstServiceInRangeMock(startServiceId, endServiceId); + MethodDone(); + } + + void NotifyServiceChanges(bool value) override + { + NotifyServiceChangesMock(value); + MethodDone(); + } + + MOCK_METHOD(void, FindFirstServiceInRangeMock, (uint32_t, uint32_t)); + MOCK_METHOD(void, NotifyServiceChangesMock, (bool)); + }; + + class PeerServiceDiscoveryObserverMock + : public application::PeerServiceDiscoveryObserver + { + public: + using application::PeerServiceDiscoveryObserver::PeerServiceDiscoveryObserver; + + MOCK_METHOD(void, ServicesDiscovered, (infra::MemoryRange), (override)); + }; +} + +class PeerServiceDiscovererTest + : public testing::Test + , public infra::EventDispatcherFixture +{ +public: + services::MethodSerializerFactory::ForServices::AndProxies + serializerFactory; + + application::EchoSingleLoopback echo{ serializerFactory }; + + service_discovery::ServiceDiscoveryResponseProxy serviceDiscoveryResponse{ echo }; + testing::StrictMock serviceDiscovery{ echo }; + + infra::Execute execute{ [this]() + { + StartInitialServiceDiscovery(); + } }; + application::PeerServiceDiscovererEcho discoverer{ echo }; + testing::StrictMock observer{ discoverer }; + + void NoServiceSupported() + { + serviceDiscoveryResponse.RequestSend([this] + { + serviceDiscoveryResponse.NoServiceSupported(); + }); + } + + void ServicesDiscovered(const std::vector& services) + { + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(services))); + } + + void ServiceSupported(uint32_t id) + { + serviceDiscoveryResponse.RequestSend([this, id] + { + serviceDiscoveryResponse.FirstServiceSupported(id); + }); + } + + void ServicesChanged(uint32_t startServiceId, uint32_t endServiceId) + { + serviceDiscoveryResponse.RequestSend([this, startServiceId, endServiceId] + { + serviceDiscoveryResponse.ServicesChanged(startServiceId, endServiceId); + }); + } + + void StartInitialServiceDiscovery() + { + EXPECT_CALL(serviceDiscovery, NotifyServiceChangesMock(true)); + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(0, std::numeric_limits::max())); + } + + void QueryServices(const std::vector& services, uint32_t searchEnd = std::numeric_limits::max()) + { + auto orderedServices = services; + std::sort(orderedServices.begin(), orderedServices.end()); + + for (auto service : orderedServices) + { + if (service + 1 <= searchEnd) + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(service + 1, searchEnd)); + ServiceSupported(service); + } + + if (services.empty() || services.back() != searchEnd) + NoServiceSupported(); + } + + void OneCompleteRoundOfServiceDiscovery(const std::vector& services) + { + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(services))); + QueryServices(services); + } + + auto FindAddedAndRemovedServices(const std::vector& oldServiceList, const std::vector& newServiceList) + { + std::set oldSet(oldServiceList.begin(), oldServiceList.end()); + std::set newSet(newServiceList.begin(), newServiceList.end()); + + std::set addedServices; + std::set removedServices; + + std::set_difference(newSet.begin(), newSet.end(), oldSet.begin(), oldSet.end(), std::inserter(addedServices, addedServices.end())); + std::set_difference(oldSet.begin(), oldSet.end(), newSet.begin(), newSet.end(), std::inserter(removedServices, removedServices.end())); + + std::vector addedServicesVector(addedServices.begin(), addedServices.end()); + std::vector removedServicesVector(removedServices.begin(), removedServices.end()); + + return std::make_tuple(addedServicesVector, removedServicesVector); + } + + auto FindMaximalRange(const std::vector& range1, const std::vector& range2) + { + if (range1.empty()) + return std::make_tuple(*range2.begin(), *range2.rbegin()); + else if (range2.empty()) + return std::make_tuple(*range1.begin(), *range1.rbegin()); + else + return std::make_tuple(std::min(*range1.begin(), *range2.begin()), std::max(*range1.rbegin(), *range2.rbegin())); + } + + auto FindServicesInRange(const std::vector& newServiceList, const std::tuple& updatedRange) + { + const auto searchBegin = std::get<0>(updatedRange); + const auto searchEnd = std::get<1>(updatedRange); + + std::vector filteredServices; + for (const auto& service : newServiceList) + { + if (service >= searchBegin && service <= searchEnd) + { + filteredServices.push_back(service); + } + } + + std::sort(filteredServices.begin(), filteredServices.end()); + return filteredServices; + } + + void UpdateServices(const std::vector& oldServiceList, const std::vector& newServiceList) + { + auto [addedServices, removedServices] = FindAddedAndRemovedServices(oldServiceList, newServiceList); + + auto updatedRange = FindMaximalRange(addedServices, removedServices); + auto searchBegin = std::get<0>(updatedRange); + auto searchEnd = std::get<1>(updatedRange); + + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(searchBegin, searchEnd)); + ServicesChanged(searchBegin, searchEnd); + + auto filteredServices = FindServicesInRange(newServiceList, updatedRange); + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(newServiceList))); + QueryServices(filteredServices, searchEnd); + } +}; + +TEST_F(PeerServiceDiscovererTest, immediate_NoServiceSupported_ends_discovery_with_no_services) +{ + const std::vector services; + OneCompleteRoundOfServiceDiscovery(services); +} + +TEST_F(PeerServiceDiscovererTest, NoServiceSupported_ends_discovery_with_single_service) +{ + std::vector services{ 5 }; + OneCompleteRoundOfServiceDiscovery(services); +} + +TEST_F(PeerServiceDiscovererTest, NoServiceSupported_ends_discovery_with_two_services) +{ + std::vector services{ 5, 10 }; + OneCompleteRoundOfServiceDiscovery(services); +} + +TEST_F(PeerServiceDiscovererTest, ServicesChanged_triggers_rediscovery) +{ + std::vector services{ 5, 10 }; + OneCompleteRoundOfServiceDiscovery(services); + + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(0, 0)); + + ServicesChanged(0, 0); + + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(services))); + NoServiceSupported(); + + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(5, 5)); + ServicesChanged(5, 5); + + std::vector servicesUpdated{ 10 }; + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(servicesUpdated))); + NoServiceSupported(); + + EXPECT_CALL(serviceDiscovery, FindFirstServiceInRangeMock(10, 10)); + ServicesChanged(10, 10); + + std::vector servicesUpdated2; + EXPECT_CALL(observer, ServicesDiscovered(infra::ContentsEqual(servicesUpdated2))); + NoServiceSupported(); +} + +TEST_F(PeerServiceDiscovererTest, ServicesChanged_triggers_rediscovery_2) +{ + std::vector services{ 1, 3, 5 }; + OneCompleteRoundOfServiceDiscovery(services); + + std::vector newServices{ 3, 4, 5, 6 }; + UpdateServices(services, newServices); +} diff --git a/protobuf/meta_services/test/TestServiceDiscoveryEcho.cpp b/protobuf/meta_services/test/TestServiceDiscoveryEcho.cpp new file mode 100644 index 000000000..678a4e49a --- /dev/null +++ b/protobuf/meta_services/test/TestServiceDiscoveryEcho.cpp @@ -0,0 +1,261 @@ +#include "echo/ServiceDiscovery.pb.hpp" +#include "infra/event/test_helper/EventDispatcherFixture.hpp" +#include "infra/util/Optional.hpp" +#include "protobuf/echo/Echo.hpp" +#include "protobuf/echo/test_doubles/EchoSingleLoopback.hpp" +#include "protobuf/echo/test_doubles/ServiceStub.hpp" +#include "protobuf/meta_services/ServiceDiscoveryEcho.hpp" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include +#include +#include + +namespace +{ + class RequestBlockingEchoDecorator + : public services::Echo + { + public: + RequestBlockingEchoDecorator(services::Echo& echo) + : echo(echo) + {} + + void RegisterObserver(infra::Observer* observer) override + { + observer->Attach(echo); + } + + void BlockRequest(bool block) + { + blocked = block; + + if (!blocked && serviceProxy) + { + echo.RequestSend(**serviceProxy); + serviceProxy = infra::none; + } + } + + void RequestSend(services::ServiceProxy& serviceProxy) override + { + if (blocked) + this->serviceProxy = &serviceProxy; + else + echo.RequestSend(serviceProxy); + } + + void ServiceDone() override + { + echo.ServiceDone(); + } + + void CancelRequestSend(services::ServiceProxy& serviceProxy) override + { + echo.CancelRequestSend(serviceProxy); + } + + services::MethodSerializerFactory& SerializerFactory() override + { + return echo.SerializerFactory(); + } + + private: + services::Echo& echo; + infra::Optional serviceProxy; + bool blocked = false; + }; + + class ServiceDiscoveryResponseMock + : public service_discovery::ServiceDiscoveryResponse + { + public: + using service_discovery::ServiceDiscoveryResponse::ServiceDiscoveryResponse; + virtual ~ServiceDiscoveryResponseMock() = default; + + void FirstServiceSupported(uint32_t value) override + { + FirstServiceSupportedMock(value); + MethodDone(); + } + + void NoServiceSupported() override + { + NoServiceSupportedMock(); + MethodDone(); + } + + void ServicesChanged(uint32_t startServiceId, uint32_t endServiceId) override + { + ServicesChangedMock(startServiceId, endServiceId); + MethodDone(); + } + + MOCK_METHOD(void, FirstServiceSupportedMock, (uint32_t serviceId)); + MOCK_METHOD(void, NoServiceSupportedMock, ()); + MOCK_METHOD(void, ServicesChangedMock, (uint32_t startServiceId, uint32_t endServiceId)); + }; +}; + +class ServiceDiscoveryTest + : public testing::Test + , public infra::EventDispatcherFixture +{ +public: + services::MethodSerializerFactory::ForServices::AndProxies serializerFactory; + application::EchoSingleLoopback echo{ serializerFactory }; + service_discovery::ServiceDiscoveryProxy proxy{ echo }; + testing::StrictMock serviceDiscoveryResponse{ echo }; + + RequestBlockingEchoDecorator echoBlocking{ echo }; + application::ServiceDiscoveryEcho serviceDiscoveryEcho{ echoBlocking }; + + void FindFirstServiceInRange(uint32_t startServiceId, uint32_t endServiceId) + { + auto serviceRange = std::tie(startServiceId, endServiceId); + proxy.RequestSend([this, &serviceRange] + { + proxy.FindFirstServiceInRange(std::get<0>(serviceRange), std::get<1>(serviceRange)); + }); + } + + void NotifyServiceChanges(bool allow) + { + proxy.RequestSend([this, allow] + { + proxy.NotifyServiceChanges(allow); + }); + } +}; + +TEST_F(ServiceDiscoveryTest, return_no_service) +{ + services::ServiceStub service5{ serviceDiscoveryEcho, 5 }; + + EXPECT_CALL(serviceDiscoveryResponse, NoServiceSupportedMock); + + FindFirstServiceInRange(0, 4); +} + +TEST_F(ServiceDiscoveryTest, return_service) +{ + services::ServiceStub service5{ serviceDiscoveryEcho, 5 }; + + EXPECT_CALL(serviceDiscoveryResponse, FirstServiceSupportedMock(5)); + + FindFirstServiceInRange(0, 15); +} + +TEST_F(ServiceDiscoveryTest, return_service_with_lowest_id) +{ + services::ServiceStub service5{ serviceDiscoveryEcho, 5 }; + services::ServiceStub service6{ serviceDiscoveryEcho, 6 }; + + EXPECT_CALL(serviceDiscoveryResponse, FirstServiceSupportedMock(5)); + + FindFirstServiceInRange(0, 15); +} + +TEST_F(ServiceDiscoveryTest, notify_service_change) +{ + NotifyServiceChanges(true); + + EXPECT_CALL(serviceDiscoveryResponse, ServicesChangedMock(5, 5)); + infra::Optional service5(infra::inPlace, serviceDiscoveryEcho, 5); + + EXPECT_CALL(serviceDiscoveryResponse, ServicesChangedMock(5, 5)); + service5 = infra::none; + + NotifyServiceChanges(false); + + service5.Emplace(serviceDiscoveryEcho, 5); +} + +TEST_F(ServiceDiscoveryTest, notify_simultaneous_service_changes) +{ + NotifyServiceChanges(true); + + echoBlocking.BlockRequest(true); + services::ServiceStub service5(serviceDiscoveryEcho, 5); + services::ServiceStub service15(serviceDiscoveryEcho, 15); + + EXPECT_CALL(serviceDiscoveryResponse, ServicesChangedMock(5, 15)); + echoBlocking.BlockRequest(false); + + echoBlocking.BlockRequest(true); + services::ServiceStub service7(serviceDiscoveryEcho, 7); + services::ServiceStub service12(serviceDiscoveryEcho, 12); + + EXPECT_CALL(serviceDiscoveryResponse, ServicesChangedMock(7, 12)); + echoBlocking.BlockRequest(false); + + NotifyServiceChanges(false); +} + +TEST_F(ServiceDiscoveryTest, notify_service_changes_only_after_NotifyServiceChanges) +{ + echoBlocking.BlockRequest(true); + + NotifyServiceChanges(true); + services::ServiceStub service5(serviceDiscoveryEcho, 5); + + NotifyServiceChanges(false); + services::ServiceStub service15(serviceDiscoveryEcho, 15); + + NotifyServiceChanges(true); + services::ServiceStub service25(serviceDiscoveryEcho, 25); + + EXPECT_CALL(serviceDiscoveryResponse, ServicesChangedMock(25, 25)); + echoBlocking.BlockRequest(false); + + NotifyServiceChanges(false); +} + +TEST_F(ServiceDiscoveryTest, find_own_service_id) +{ + EXPECT_CALL(serviceDiscoveryResponse, FirstServiceSupportedMock(service_discovery::ServiceDiscovery::serviceId)); + + FindFirstServiceInRange(service_discovery::ServiceDiscovery::serviceId, service_discovery::ServiceDiscovery::serviceId); +} + +TEST_F(ServiceDiscoveryTest, start_proxy_service_method) +{ + services::ServiceStub service1{ serviceDiscoveryEcho, 1 }; + services::ServiceStubProxy service1Proxy{ echo, 1 }; + + services::ServiceStub service2{ serviceDiscoveryEcho, 2 }; + services::ServiceStubProxy service2Proxy{ echo, 2 }; + + EXPECT_CALL(service1, Method(11)).WillOnce(testing::InvokeWithoutArgs([&service1] + { + service1.MethodDone(); + })); + + service1Proxy.RequestSend([&service1Proxy] + { + service1Proxy.Method(11); + }); + + EXPECT_CALL(service2, Method(22)); + service2Proxy.RequestSend([&service2Proxy] + { + service2Proxy.Method(22); + }); +} + +TEST_F(ServiceDiscoveryTest, forward_methods_only_to_first_matching_proxy_service) +{ + services::ServiceStub service1{ serviceDiscoveryEcho, 1 }; + services::ServiceStub service1_{ serviceDiscoveryEcho, 1 }; + services::ServiceStubProxy service1Proxy{ echo, 1 }; + + EXPECT_CALL(service1, Method(11)).WillOnce(testing::InvokeWithoutArgs([&service1] + { + service1.MethodDone(); + })); + + service1Proxy.RequestSend([&service1Proxy] + { + service1Proxy.Method(11); + }); +} diff --git a/protobuf/protoc_echo_plugin/ProtoCEchoPlugin.cpp b/protobuf/protoc_echo_plugin/ProtoCEchoPlugin.cpp index 8690cf804..8c89e8e37 100644 --- a/protobuf/protoc_echo_plugin/ProtoCEchoPlugin.cpp +++ b/protobuf/protoc_echo_plugin/ProtoCEchoPlugin.cpp @@ -6,6 +6,7 @@ #include "google/protobuf/stubs/strutil.h" #include "infra/syntax/ProtoFormatter.hpp" #include +#include namespace application { @@ -1187,7 +1188,7 @@ namespace application auto constructors = std::make_shared("public"); auto constructor = std::make_shared(service->name, "", 0); constructor->Parameter("services::Echo& echo"); - constructor->Initializer("services::Service(echo)"); + constructor->Initializer(std::string("services::Service(echo, ").append(google::protobuf::SimpleItoa(service->serviceId)).append(")")); constructors->Add(constructor); serviceFormatter->Add(constructors); @@ -1226,7 +1227,7 @@ namespace application auto acceptsService = std::make_shared("AcceptsService", AcceptsServiceBody(), "bool", Function::fConst | Function::fOverride); acceptsService->Parameter("uint32_t id"); - functions->Add(acceptsService); + // functions->Add(acceptsService); auto startMethod = std::make_shared("StartMethod", StartMethodBody(), "infra::SharedPtr", Function::fOverride); startMethod->Parameter("uint32_t serviceId"); diff --git a/services/echo_console/CMakeLists.txt b/services/echo_console/CMakeLists.txt index c8745b3e9..cd697053f 100644 --- a/services/echo_console/CMakeLists.txt +++ b/services/echo_console/CMakeLists.txt @@ -9,11 +9,51 @@ if (EMIL_HOST_BUILD AND EMIL_INCLUDE_MBEDTLS) protobuf.echo services.network_instantiations services.util + protobuf.meta_services ) target_sources(services.echo_console PRIVATE Console.cpp Console.hpp Main.cpp + ) + + add_subdirectory(test) + + add_library(services.console_service ${EMIL_EXCLUDE_FROM_ALL} STATIC) + + target_include_directories(services.console_service PUBLIC + "$" + "$" + ) + + target_sources(services.console_service PRIVATE + ConsoleService.cpp + ConsoleService.hpp + ) + + add_executable(services.echo_test_tcp_server ${EMIL_EXCLUDE_FROM_ALL}) + emil_install(services.echo_test_tcp_server DESTINATION bin) + + target_link_libraries(services.echo_test_tcp_server PUBLIC + args + hal.generic + protobuf.protoc_echo_plugin_lib + protobuf.echo + services.network_instantiations + services.util + protobuf.meta_services + ) + + if (EMIL_BUILD_WIN) + target_link_libraries(services.echo_test_tcp_server PRIVATE hal.windows) + endif() + + if (EMIL_BUILD_UNIX OR EMIL_BUILD_DARWIN) + target_link_libraries(services.echo_test_tcp_server PRIVATE hal.unix) + endif() + + target_sources(services.echo_test_tcp_server PRIVATE + MainTestServer.cpp ) endif() diff --git a/services/echo_console/Console.cpp b/services/echo_console/Console.cpp index 92fcba122..0ab277229 100644 --- a/services/echo_console/Console.cpp +++ b/services/echo_console/Console.cpp @@ -358,6 +358,12 @@ namespace application }) {} + void Console::ServicesDiscovered(infra::MemoryRange services) + { + std::cout << "Services discovered"; + discoveredServices.assign(services.begin(), services.end()); + } + void Console::Run() { { @@ -638,13 +644,16 @@ namespace application { for (const auto& service : root.services) { - services::GlobalTracer().Trace() << service->name; - for (const auto& method : service->methods) + if (std::find(discoveredServices.begin(), discoveredServices.end(), service->serviceId) != discoveredServices.end()) { - services::GlobalTracer().Trace() << " " << method.name << "("; - if (method.parameter != nullptr) - ListFields(*method.parameter); - services::GlobalTracer().Continue() << ")"; + services::GlobalTracer().Trace() << service->name; + for (const auto& method : service->methods) + { + services::GlobalTracer().Trace() << " " << method.name << "("; + if (method.parameter != nullptr) + ListFields(*method.parameter); + services::GlobalTracer().Continue() << ")"; + } } } @@ -822,12 +831,10 @@ namespace application std::pair, const EchoMethod&> Console::SearchMethod(MethodInvocation& methodInvocation) const { for (auto service : root.services) - { if (methodInvocation.method.size() == 1 || methodInvocation.method.front() == service->name) for (auto& method : service->methods) - if (method.name == methodInvocation.method.back()) + if (methodInvocation.method.back() == method.name) return std::pair, const EchoMethod&>(service, method); - } throw ConsoleExceptions::MethodNotFound{ methodInvocation.method }; } diff --git a/services/echo_console/Console.hpp b/services/echo_console/Console.hpp index 9a6425deb..2d73cd24c 100644 --- a/services/echo_console/Console.hpp +++ b/services/echo_console/Console.hpp @@ -4,9 +4,12 @@ #include "hal/generic/TimerServiceGeneric.hpp" #include "infra/syntax/ProtoFormatter.hpp" #include "infra/syntax/ProtoParser.hpp" +#include "protobuf/meta_services/PeerServiceDiscoverer.hpp" #include "protobuf/protoc_echo_plugin/EchoObjects.hpp" #include "services/network_instantiations/NetworkAdapter.hpp" +#include #include +#include namespace application { @@ -192,6 +195,7 @@ namespace application services::ConnectionFactory& ConnectionFactory(); services::NameResolver& NameResolver(); void DataReceived(infra::StreamReader& reader); + void ServicesDiscovered(infra::MemoryRange services); private: struct Empty @@ -253,6 +257,7 @@ namespace application std::condition_variable condition; bool processDone = false; std::string receivedData; + std::vector discoveredServices; }; namespace ConsoleExceptions diff --git a/services/echo_console/ConsoleService.cpp b/services/echo_console/ConsoleService.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/services/echo_console/ConsoleService.hpp b/services/echo_console/ConsoleService.hpp new file mode 100644 index 000000000..baa451d22 --- /dev/null +++ b/services/echo_console/ConsoleService.hpp @@ -0,0 +1,113 @@ +#ifndef SERVICES_CONSOLE_SERVICE_HPP +#define SERVICES_CONSOLE_SERVICE_HPP + +#include "infra/stream/StringInputStream.hpp" +#include "infra/util/SharedOptional.hpp" +#include "infra/util/SharedPtr.hpp" +#include "protobuf/echo/Echo.hpp" +#include "services/echo_console/Console.hpp" +#include +#include + +namespace services +{ + class GenericMethodDeserializer + : public MethodDeserializer + { + public: + void MethodContents(infra::SharedPtr&& reader) override + { + int a = 3; + a++; + } + + void ExecuteMethod() override + { + int a = 3; + a++; + } + + bool Failed() const override + { + return false; + } + }; + + class ConsoleServiceMethodExecute + { + public: + ConsoleServiceMethodExecute(application::Console& console) + : console(console) + {} + + infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const EchoErrorPolicy& errorPolicy) + { + return methodDeserializer.Emplace(); + } + + private: + application::Console& console; + infra::SharedOptional methodDeserializer; + }; + + class ConsoleService + : public Service + { + public: + ConsoleService(Echo& echo, uint32_t serviceId, ConsoleServiceMethodExecute& methodExecute) + : Service(echo, serviceId) + , methodExecute(methodExecute) + {} + + infra::SharedPtr StartMethod(uint32_t serviceId, uint32_t methodId, uint32_t size, const EchoErrorPolicy& errorPolicy) override + { + return methodExecute.StartMethod(serviceId, methodId, size, errorPolicy); + } + + private: + ConsoleServiceMethodExecute& methodExecute; + }; + + class GenericMethodSerializer + : public MethodSerializer + { + public: + GenericMethodSerializer(infra::SharedPtr& inputStream) + : inputStream(inputStream) + {} + + bool Serialize(infra::SharedPtr&& writer) override + { + infra::DataOutputStream::WithErrorPolicy stream(*writer, infra::softFail); + + stream << inputStream->ContiguousRange(stream.Available()); + + auto partlySent = stream.Failed() || inputStream->Available(); + writer = nullptr; + + return partlySent; + } + + private: + infra::SharedPtr inputStream; + }; + + class ConsoleServiceProxy + : public ServiceProxy + { + public: + ConsoleServiceProxy(Echo& echo, uint32_t maxMessageSize = 1000) + : ServiceProxy(echo, maxMessageSize) + {} + + void SendMessage(infra::SharedPtr& inputStream) + { + auto serializer = methodSerializer.Emplace(inputStream); + SetSerializer(serializer); + } + + private: + infra::SharedOptional methodSerializer; + }; +} +#endif // CONSOLESERVICE_HPP diff --git a/services/echo_console/Main.cpp b/services/echo_console/Main.cpp index 69084eb2d..d9d719c7c 100644 --- a/services/echo_console/Main.cpp +++ b/services/echo_console/Main.cpp @@ -1,5 +1,25 @@ #include "args.hxx" +#include "echo/TracingServiceDiscovery.pb.hpp" #include "hal/generic/SynchronousRandomDataGeneratorGeneric.hpp" +#include "infra/stream/StdStringInputStream.hpp" +#include "infra/stream/StreamErrorPolicy.hpp" +#include "infra/stream/StringInputStream.hpp" +#include "infra/util/Optional.hpp" +#include "infra/util/SharedOptional.hpp" +#include "protobuf/echo/Serialization.hpp" +#include "protobuf/meta_services/PeerServiceDiscoverer.hpp" +#include "services/echo_console/ConsoleService.hpp" +#include "services/network/EchoOnConnection.hpp" +#include "services/network/TracingEchoOnConnection.hpp" +#include "services/tracer/Tracer.hpp" +#include +#include +#include +#ifdef EMIL_HAL_WINDOWS +#include "hal/windows/UartWindows.hpp" +#else +#include "hal/unix/UartUnix.hpp" +#endif #include "hal/generic/UartGeneric.hpp" #include "infra/stream/IoOutputStream.hpp" #include "infra/syntax/Json.hpp" @@ -12,6 +32,7 @@ #include "services/util/SesameCobs.hpp" #include "services/util/SesameWindowed.hpp" #include +#include #include #include @@ -85,70 +106,139 @@ void ConsoleClientUart::CheckDataToBeSent() } class ConsoleClientConnection - : public services::ConnectionObserver + : private services::MethodSerializerFactory::OnHeap + , public services::TracingEchoOnConnection , public application::ConsoleObserver { public: - explicit ConsoleClientConnection(application::Console& console); + explicit ConsoleClientConnection(application::Console& console, services::Tracer& tracer); - // Implementation of ConnectionObserver - void SendStreamAvailable(infra::SharedPtr&& writer) override; - void DataReceived() override; + ~ConsoleClientConnection(); + + // ConnectionObserver void Attached() override; // Implementation of ConsoleObserver void Send(const std::string& message) override; +private: + class PeerServiceDiscoveryObserverTracer + : public application::PeerServiceDiscoveryObserver + { + public: + PeerServiceDiscoveryObserverTracer(application::PeerServiceDiscovererEcho& subject, services::Tracer& tracer) + : PeerServiceDiscoveryObserver(subject) + , tracer(tracer) + {} + + // Implementation of PeerServiceDiscoveryObserver + void ServicesDiscovered(infra::MemoryRange services) override + { + tracer.Trace() << "Services discovered: "; + for (auto service : services) + tracer.Continue() << service << " "; + } + + private: + services::Tracer& tracer; + }; + + class PeerServiceDiscoveryConsoleInteractor + : public application::PeerServiceDiscoveryObserver + { + public: + PeerServiceDiscoveryConsoleInteractor(application::PeerServiceDiscovererEcho& subject, application::Console& console) + : PeerServiceDiscoveryObserver(subject) + , console(console) + {} + + // Implementation of PeerServiceDiscoveryObserver + void ServicesDiscovered(infra::MemoryRange services) override + { + console.ServicesDiscovered(services); + } + + private: + application::Console& console; + }; + private: void CheckDataToBeSent(); + void SendAsEcho(); private: - std::string dataToBeSent; + services::Tracer& tracer; + std::queue dataQueue; + infra::NotifyingSharedOptional reader; infra::SharedPtr writer; + service_discovery::ServiceDiscoveryTracer serviceDiscoveryTracer; + service_discovery::ServiceDiscoveryResponseTracer serviceDiscoveryResponseTracer; + infra::Optional peerServiceDiscoverer; + infra::Optional peerServiceDiscoveryObserverTracer; + infra::Optional peerServiceDiscoveryConsoleInteractor; + infra::Optional consoleServiceProxy; }; -ConsoleClientConnection::ConsoleClientConnection(application::Console& console) +ConsoleClientConnection::ConsoleClientConnection(application::Console& console, services::Tracer& tracer) : application::ConsoleObserver(console) -{} - -void ConsoleClientConnection::SendStreamAvailable(infra::SharedPtr&& writer) + , services::TracingEchoOnConnection(*static_cast(this), services::echoErrorPolicyAbortOnMessageFormatError, tracer) + , serviceDiscoveryTracer(*this) + , serviceDiscoveryResponseTracer(*this) + , tracer(tracer) { - this->writer = writer; - writer = nullptr; - - CheckDataToBeSent(); + tracer.Trace() << "ConsoleClientConnection"; } -void ConsoleClientConnection::DataReceived() +ConsoleClientConnection::~ConsoleClientConnection() { - auto stream = services::ConnectionObserver::Subject().ReceiveStream(); - ConsoleObserver::Subject().DataReceived(*stream); - services::ConnectionObserver::Subject().AckReceived(); + tracer.Trace() << "~ConsoleClientConnection"; } void ConsoleClientConnection::Attached() { - services::ConnectionObserver::Subject().RequestSendStream(services::ConnectionObserver::Subject().MaxSendStreamSize()); + peerServiceDiscoverer.Emplace(*this); + peerServiceDiscoveryObserverTracer.Emplace(*peerServiceDiscoverer, tracer); + peerServiceDiscoveryConsoleInteractor.Emplace(*peerServiceDiscoverer, application::ConsoleObserver::Subject()); + consoleServiceProxy.Emplace(*this); } void ConsoleClientConnection::Send(const std::string& message) { - dataToBeSent += message; - CheckDataToBeSent(); + // Do we need to know the serviceid, methodid and treat them differently or do we just send strings back to back? + dataQueue.push(message); + SendAsEcho(); } void ConsoleClientConnection::CheckDataToBeSent() { - if (writer != nullptr && !dataToBeSent.empty()) - { - infra::TextOutputStream::WithErrorPolicy stream(*writer); - std::size_t amount = std::min(stream.Available(), dataToBeSent.size()); - stream << dataToBeSent.substr(0, amount); - dataToBeSent.erase(0, amount); + // if (writer != nullptr && !dataToBeSent.empty()) + // { + // infra::TextOutputStream::WithErrorPolicy stream(*writer); + // std::size_t amount = std::min(stream.Available(), dataToBeSent.size()); + // stream << dataToBeSent.substr(0, amount); + // dataToBeSent.erase(0, amount); + + // writer = nullptr; + // services::ConnectionObserver::Subject().RequestSendStream(services::ConnectionObserver::Subject().MaxSendStreamSize()); + // } +} - writer = nullptr; - services::ConnectionObserver::Subject().RequestSendStream(services::ConnectionObserver::Subject().MaxSendStreamSize()); - } +void ConsoleClientConnection::SendAsEcho() +{ + if (!consoleServiceProxy) + throw std::runtime_error("No console service proxy available"); + + if (!dataQueue.empty() && reader.Allocatable()) + consoleServiceProxy->RequestSend([this]() + { + auto inputReader = reader.Emplace(dataQueue.front(), infra::softFail); + reader.OnAllocatable([this] + { + dataQueue.pop(); + SendAsEcho(); + }); + consoleServiceProxy->SendMessage(inputReader); + }); } class ConsoleClientTcp @@ -185,6 +275,8 @@ ConsoleClientTcp::ConsoleClientTcp(services::ConnectionFactoryWithNameResolver& ConsoleClientTcp::~ConsoleClientTcp() { + tracer.Trace() << "~ConsoleClientTcp"; + if (!!consoleClientConnection) consoleClientConnection->services::ConnectionObserver::Subject().AbortAndDestroy(); } @@ -203,7 +295,7 @@ void ConsoleClientTcp::ConnectionEstablished(infra::AutoResetFunction pbFiles; + + for (const fs::path path : paths) + { + if (fs::is_directory(path)) + for (const fs::directory_entry& entry : fs::recursive_directory_iterator(path)) + { + auto path = entry.path(); + + if (path.extension() == ".pb" && path.parent_path().string().rfind("/echo") != std::string::npos) + pbFiles.push_back(path); + } + else if (path.extension() == ".pb") + pbFiles.push_back(path); + } + + for (const auto& path : pbFiles) { std::ifstream stream(path, std::ios::binary); if (!stream) diff --git a/services/echo_console/MainTestServer.cpp b/services/echo_console/MainTestServer.cpp new file mode 100644 index 000000000..daab5272d --- /dev/null +++ b/services/echo_console/MainTestServer.cpp @@ -0,0 +1,73 @@ +#ifndef MAIN_ECHO_TEST_TCP_SERVER_HPP +#define MAIN_ECHO_TEST_TCP_SERVER_HPP + +#include "echo/TracingServiceDiscovery.pb.hpp" +#include "infra/stream/IoOutputStream.hpp" +#include "infra/util/SharedPtr.hpp" +#include "protobuf/echo/Serialization.hpp" +#include "protobuf/meta_services/ServiceDiscoveryEcho.hpp" +#include "services/echo_console/Console.hpp" +#include "services/network/Connection.hpp" +#include "services/network/TracingEchoOnConnection.hpp" +#include "services/tracer/GlobalTracer.hpp" + +class EchoServerConnectionObserver + : private services::MethodSerializerFactory::OnHeap + , public services::TracingEchoOnConnection +{ +public: + EchoServerConnectionObserver(services::Tracer& tracer); + +private: + service_discovery::ServiceDiscoveryTracer serviceDiscoveryTracer; + service_discovery::ServiceDiscoveryResponseTracer serviceDiscoveryResponseTracer; + application::ServiceDiscoveryEcho serviceDiscoveryEcho; +}; + +EchoServerConnectionObserver::EchoServerConnectionObserver(services::Tracer& tracer) + : services::TracingEchoOnConnection(tracer, *this) + , serviceDiscoveryTracer(*this) + , serviceDiscoveryResponseTracer(*this) + , serviceDiscoveryEcho(*this){}; + +class EchoServerConnection + : public services::ServerConnectionObserverFactory +{ +public: + EchoServerConnection(services::ConnectionFactory& connectionFactory, services::Tracer& tracer); + + void ConnectionAccepted(infra::AutoResetFunction connectionObserver)>&& createdObserver, services::IPAddress address) override; + +private: + infra::SharedPtr listenPort; + infra::SharedOptional observer; + services::Tracer& tracer; +}; + +EchoServerConnection::EchoServerConnection(services::ConnectionFactory& connectionFactory, services::Tracer& tracer) + : listenPort(connectionFactory.Listen(1234, *this)) + , tracer(tracer) +{} + +void EchoServerConnection::ConnectionAccepted(infra::AutoResetFunction connectionObserver)>&& createdObserver, services::IPAddress address) +{ + tracer.Trace() << "Connection accepted"; + tracer.Trace(); + createdObserver(observer.Emplace(tracer)); +} + +int main(int argc, char* argv[], const char* env[]) +{ + infra::IoOutputStream ioOutputStream; + services::Tracer tracer(ioOutputStream); + services::SetGlobalTracerInstance(tracer); + + tracer.Trace() << "Starting echo server"; + + main_::NetworkAdapter network; + EchoServerConnection echoServerConnection(network.ConnectionFactory(), tracer); + + network.Run(); +} + +#endif // MAIN_ECHO_TEST_TCP_SERVER_HPP diff --git a/services/echo_console/test/CMakeLists.txt b/services/echo_console/test/CMakeLists.txt new file mode 100644 index 000000000..c557f070c --- /dev/null +++ b/services/echo_console/test/CMakeLists.txt @@ -0,0 +1,17 @@ +add_executable(services.echo_console_test) +emil_build_for(services.echo_console_test BOOL EMIL_BUILD_TESTS) +emil_add_test(services.echo_console_test) + +target_sources(services.echo_console_test PRIVATE + TestConsoleService.cpp +) + +target_link_libraries(services.echo_console_test PUBLIC + gmock_main + services.console_service + protobuf.protoc_echo_plugin_lib + protobuf.echo + services.network_instantiations + services.util + protobuf.meta_services +) diff --git a/services/echo_console/test/TestConsoleService.cpp b/services/echo_console/test/TestConsoleService.cpp new file mode 100644 index 000000000..26e9ff7b6 --- /dev/null +++ b/services/echo_console/test/TestConsoleService.cpp @@ -0,0 +1,15 @@ +#include "protobuf/echo/test_doubles/EchoSingleLoopback.hpp" +#include "services/echo_console/ConsoleService.hpp" +#include "gtest/gtest.h" + +class ConsoleServiceTest + : public testing::Test +{ +public: + services::MethodSerializerFactory::ForServices::AndProxies + serializerFactory; + + application::EchoSingleLoopback echo{}; +};