diff --git a/.azure/templates/build-test.yml b/.azure/templates/build-test.yml index 84bd7973..18530cb6 100644 --- a/.azure/templates/build-test.yml +++ b/.azure/templates/build-test.yml @@ -27,27 +27,21 @@ steps: echo "###vso[task.setvariable variable=build_type;]Debug" name: setup - bash: | - echo "###vso[task.setvariable variable=pip_cache;]${HOME}/.cache/pip" - echo "###vso[task.setvariable variable=conan_cache;]${HOME}/.conan/data" echo "###vso[task.setvariable variable=PATH;]$(python3 -m site --user-base)/bin:${PATH}" echo "###vso[task.setvariable variable=build_tool_options;]-j 4" - sudo apt-get install -y clang clang-tools clang-tidy + sudo apt install -y clang clang-tools clang-tidy libboost-dev condition: eq(variables['Agent.OS'], 'Linux') name: setup_linux - bash: | - echo "###vso[task.setvariable variable=pip_cache;]${HOME}/Library/Caches/pip" - echo "###vso[task.setvariable variable=conan_cache;]${HOME}/.conan/data" echo "###vso[task.setvariable variable=PATH;]$(python3 -m site --user-base)/bin:${PATH}" echo "###vso[task.setvariable variable=build_tool_options;]-j 4" sudo /usr/libexec/ApplicationFirewall/socketfilterfw --setglobalstate off - brew install bison + brew install googletest condition: eq(variables['Agent.OS'], 'Darwin') name: setup_macos # Use PowerShell rather than Bash to ensure Windows-style paths - pwsh: | $python_bin = python -m site --user-base - Write-Host "###vso[task.setvariable variable=pip_cache;]${env:LOCALAPPDATA}\\pip\\Cache" - Write-Host "###vso[task.setvariable variable=conan_cache;]${env:USERPROFILE}\\.conan\\data" Write-Host "###vso[task.setvariable variable=PATH;]$python_bin\\bin;${env:PATH}" # Visual Studio is most likely used on Windows agents if (${env:GENERATOR} -match "2019" -and -not ${env:PLATFORM}) { @@ -61,29 +55,27 @@ steps: } } Write-Host "###vso[task.setvariable variable=build_tool_options;]-nologo -verbosity:minimal -maxcpucount:4 -p:CL_MPCount=4" - choco install winflexbison3 condition: eq(variables['Agent.OS'], 'Windows_NT') name: setup_windows - - task: Cache@2 - inputs: - key: pip | 2 | $(Agent.OS) - path: $(pip_cache) - name: cache_pip - - task: Cache@2 - inputs: - key: conan | 2 | $(Agent.OS) | $(arch) | $(build_type) - path: $(conan_cache) - name: cache_conan - - bash: | - set -e -x - pip install "conan==1.59.0" --user - conan profile new default --detect - name: install_conan + - pwsh: | + # boost-msvc-14.1 for VS 2017 + # boost-msvc-14.2 for VS 2019 + # boost-msvc-14.3 for VS 2022 + choco install -y boost-msvc-14.2 + condition: and(eq(variables['Agent.OS'], 'Windows_NT'), eq(variables['legacy'], 'on')) + name: setup_windows_boost + # Google Test doesn't seem to be available in Chocolatey + # Google Test 1.12.1 is the latest one to support C++11 (only relevant if legacy=on) + # and "apt install googletest" doesn't seem to do the trick on Linux - bash: | - set -e -x - conan profile update settings.compiler.libcxx=libstdc++11 default - condition: eq(variables['Agent.OS'], 'Linux') - name: update_conan_cxx_library + git clone --depth 1 --branch release-1.12.1 https://github.com/google/googletest.git + mkdir googletest/build + cd googletest/build + cmake -DCMAKE_INSTALL_PREFIX=install -DBUILD_SHARED_LIBS=on \ + ${GENERATOR:+-G} "${GENERATOR}" -A "${PLATFORM}" -T "${TOOLSET}" .. + cmake --build . --config ${BUILD_TYPE} --target install -- ${BUILD_TOOL_OPTIONS} + condition: or(eq(variables['Agent.OS'], 'Linux'), eq(variables['Agent.OS'], 'Windows_NT')) + name: setup_googletest - bash: | set -e -x sudo apt-get install libacl1-dev libncurses5-dev pkg-config @@ -109,7 +101,6 @@ steps: cyclonedds mkdir cyclonedds/build cd cyclonedds/build - conan install -b missing -s arch=${ARCH} -s build_type=${BUILD_TYPE} .. cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} \ -DCMAKE_INSTALL_PREFIX=install \ -DCMAKE_PREFIX_PATH="${BUILD_SOURCESDIRECTORY}/iceoryx/build/install" \ @@ -122,10 +113,9 @@ steps: set -e -x mkdir build cd build - conan install -b missing -s arch=${ARCH} -s build_type=${BUILD_TYPE} .. cmake -DCMAKE_BUILD_TYPE=${BUILD_TYPE} \ -DCMAKE_INSTALL_PREFIX=install \ - -DCMAKE_PREFIX_PATH="${BUILD_SOURCESDIRECTORY}/cyclonedds/build/install;${BUILD_SOURCESDIRECTORY}/iceoryx/build/install" \ + -DCMAKE_PREFIX_PATH="${BUILD_SOURCESDIRECTORY}/cyclonedds/build/install;${BUILD_SOURCESDIRECTORY}/iceoryx/build/install;${BUILD_SOURCESDIRECTORY}/googletest/build/install" \ -DANALYZER=${ANALYZER:-off} \ -DSANITIZER=${SANITIZER:-none} \ -DENABLE_SHM=${ICEORYX:-off} \ diff --git a/CMakeLists.txt b/CMakeLists.txt index 1320987b..b42e8e4d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,37 +10,10 @@ # SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause # cmake_minimum_required(VERSION 3.16) -project(CycloneDDS-CXX VERSION 0.10.3 LANGUAGES C CXX) +project(CycloneDDS-CXX VERSION 0.10.4 LANGUAGES C CXX) set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/Modules") -# Conan -if(EXISTS "${CMAKE_BINARY_DIR}/conanbuildinfo.cmake" AND NOT CONAN_DEPENDENCIES) - include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) - if(APPLE) - # By default Conan strips all RPATHs (see conanbuildinfo.cmake), which - # causes tests to fail as the executables cannot find the library target. - # By setting KEEP_RPATHS, Conan does not set CMAKE_SKIP_RPATH and the - # resulting binaries still have the RPATH information. This is fine because - # CMake will strip the build RPATH information in the install step. - # - # NOTE: - # Conan's default approach is to use the "imports" feature, which copies - # all the dependencies into the bin directory. Of course, this doesn't work - # quite that well for libraries generated in this Project (see Conan - # documentation). - # - # See the links below for more information. - # https://github.com/conan-io/conan/issues/337 - # https://docs.conan.io/en/latest/howtos/manage_shared_libraries/rpaths.html - # https://gitlab.kitware.com/cmake/community/wikis/doc/cmake/RPATH-handling - conan_basic_setup(KEEP_RPATHS) - else() - conan_basic_setup() - endif() - conan_define_targets() -endif() - # By default don't treat warnings as errors, else anyone building it with a # different compiler that just happens to generate a warning, as well as # anyone adding or modifying something and making a small mistake would run @@ -165,6 +138,9 @@ if(SANITIZER) add_compile_options("-fno-omit-frame-pointer") add_link_options("-fno-omit-frame-pointer") endif() + if(san STREQUAL "undefined") + add_compile_options("-fno-sanitize-recover=all") + endif() if(san AND NOT san STREQUAL "none") message(STATUS "Enabling sanitizer: ${san}") add_compile_options("-fsanitize=${san}") diff --git a/README.md b/README.md index 191c3fbf..1c53f28f 100644 --- a/README.md +++ b/README.md @@ -126,22 +126,10 @@ variable `BUILD_TESTING` to on when configuring, e.g.: $ cmake --build . $ ctest -Such a build requires the presence of [Google Test][7]. You can install this -yourself, or you can choose to instead rely on the [Conan][8] package manager -that the CI build infrastructure also uses. In that case, install Conan and do: - - $ conan install .. --build missing - -in the build directory prior to running `cmake`. This will automatically -download and/or build Google Test. - -For Windows, depending on the generator, you might also need to add switches -to select the architecture and build type, e.g., - - $ conan install -s arch=x86_64 -s build_type=Debug .. +Such a build requires the presence of [Google Test][7]. You need to install this +yourself. [7]: https://github.com/google/googletest -[8]: https://conan.io/ ## Documentation diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 38d1007b..83439c06 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -42,10 +42,33 @@ strategy: 'Ubuntu 20.04 LTS with Clang 10 (Release, x86_64)': image: ubuntu-20.04 build_type: Release - cc: clang-10 - cxx: clang++-10 - 'macOS 10.15 with Clang 12 (Debug, x86_64)': - image: macOS-10.15 + sanitizer: address,undefined + cc: clang-12 + cxx: clang++-12 + 'Ubuntu 22.04 LTS with CLang 12 (Release, x86_64, Iceoryx)': + image: ubuntu-22.04 + build_type: Release + iceoryx: on + sanitizer: address,undefined + cc: clang-12 + cxx: clang++-12 + # C++11 requires boost, installing boost on Windows takes forever, so use Linux + # Google test requires C++14 at minimum, so we can't build tests ... + 'Ubuntu 22.04 LTS with CLang 12 (Debug, x86_64, C++11)': + image: ubuntu-22.04 + build_type: Debug + cc: clang-12 + cxx: clang++-12 + legacy: on + 'Ubuntu 22.04 LTS with GCC 12 (Debug, x86_64, no type discovery)': + image: ubuntu-22.04 + sanitizer: address + cc: gcc-12 + cxx: g++-12 + type_discovery: off + topic_discovery: off + 'macOS 11 with Clang 12 (Debug, x86_64)': + image: macOS-11 sanitizer: address cc: clang cxx: clang++ @@ -58,7 +81,6 @@ strategy: #'Windows 2019 with Visual Studio 2019 (Visual Studio 2017, Debug, x86)': # arch: x86 # image: windows-2019 - # conanfile: conanfile102.txt # generator: 'Visual Studio 16 2019' # toolkit: v141 'Windows 2019 with Visual Studio 2019 (Debug, x86_64)': @@ -69,11 +91,12 @@ strategy: image: windows-2019 build_type: Release generator: 'Visual Studio 16 2019' - 'Windows 2019 with Visual Studio 2019 (Release, x86_64, c++11)': - image: windows-2019 - build_type: Release - generator: 'Visual Studio 16 2019' - legacy: on + #C++11 needs boost, installing boost takes forever on Windows + #'Windows 2019 with Visual Studio 2019 (Release, x86_64, c++11)': + # image: windows-2019 + # build_type: Release + # generator: 'Visual Studio 16 2019' + # legacy: on pool: vmImage: $(image) diff --git a/conanfile.txt b/conanfile.txt deleted file mode 100644 index fb211d9d..00000000 --- a/conanfile.txt +++ /dev/null @@ -1,9 +0,0 @@ -[requires] -gtest/1.10.0 -boost/1.78.0 - -[generators] -cmake - -[options] -gtest:shared=True diff --git a/src/ddscxx/include/dds/core/policy/CorePolicy.hpp b/src/ddscxx/include/dds/core/policy/CorePolicy.hpp index 78935380..7d073032 100644 --- a/src/ddscxx/include/dds/core/policy/CorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/CorePolicy.hpp @@ -152,6 +152,9 @@ UserData; typedef dds::core::policy::detail::WriterDataLifecycle WriterDataLifecycle; +typedef dds::core::policy::detail::WriterBatching +WriterBatching; + #ifdef OMG_DDS_PERSISTENCE_SUPPORT typedef ::dds::core::policy::detail::DurabilityService DurabilityService; @@ -192,6 +195,7 @@ OMG_DDS_POLICY_TRAITS(DurabilityService, 22) OMG_DDS_POLICY_TRAITS(DataRepresentation, 23) OMG_DDS_POLICY_TRAITS(TypeConsistencyEnforcement, 24) #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +OMG_DDS_POLICY_TRAITS(WriterBatching, 25) } } diff --git a/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp b/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp index 4faefc07..9644fe2c 100644 --- a/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp @@ -842,6 +842,66 @@ class TWriterDataLifecycle : public dds::core::Value */ static TWriterDataLifecycle ManuallyDisposeUnregisteredInstances(); +}; +//============================================================================== + +template +class TWriterBatching : public dds::core::Value +{ +public: + /** + * Creates a WriterBatching QoS instance + * + * @param batch_updates a boolean indicating if updates should be batched + * before being explicitly flushed + */ + explicit TWriterBatching(bool batch_updates = false); + + /** + * Copies a WriterBatching QoS instance + * + * @param other the WriterBatching QoS instance to copy + */ + TWriterBatching(const TWriterBatching& other); + + /** + * Copies a WriterBatching QoS instance + * + * @param other the WriterBatching QoS instance to copy + * + * @return reference to the WriterBatching QoS instance that was copied to + */ + TWriterBatching& operator=(const TWriterBatching& other) = default; + +public: + /** + * Gets a boolean indicating if updates should be batched + * + * @return a boolean indicating if updates should be batched + */ + bool batch_updates() const; + + /** + * Sets a boolean indicating if updates should be batched + * + * @param batch_updates a boolean indicating if updates should be batched + */ + TWriterBatching& batch_updates( + bool batch_updates); + +public: + /** + * @return a WriterBatching QoS instance with batch_updates + * set to true + */ + static TWriterBatching BatchUpdates(); + + /** + * @return a WriterBatching QoS instance with batch_updates + * set to false + */ + static TWriterBatching DoNotBatchUpdates(); + }; //============================================================================== diff --git a/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp b/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp index ba3eefe2..4b38c0ee 100644 --- a/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp @@ -105,6 +105,9 @@ namespace dds { namespace core { namespace policy { namespace detail { typedef dds::core::policy::TWriterDataLifecycle WriterDataLifecycle; + + typedef dds::core::policy::TWriterBatching + WriterBatching; } } } } // namespace dds::core::policy::detail diff --git a/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp b/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp index 00c67aa6..ab298732 100644 --- a/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp +++ b/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp @@ -501,6 +501,45 @@ TWriterDataLifecycle TWriterDataLifecycle::ManuallyDisposeUnregisteredInst return TWriterDataLifecycle(false); } +//TWriterBatching +template +TWriterBatching::TWriterBatching(bool batch_updates): dds::core::Value(batch_updates) +{ +} + +template +TWriterBatching::TWriterBatching(const TWriterBatching& other): dds::core::Value(other.delegate()) +{ +} + +template +bool TWriterBatching::batch_updates() const +{ + return this->delegate().batch_updates(); +} + +template +TWriterBatching& TWriterBatching::batch_updates( + bool batch_updates) +{ + this->delegate().batch_updates(batch_updates); + return *this; +} + +template +TWriterBatching TWriterBatching::BatchUpdates() +{ + return TWriterBatching(true); +} + + +template +TWriterBatching TWriterBatching::DoNotBatchUpdates() +{ + return TWriterBatching(false); +} + + //TReaderDataLifecycle template TReaderDataLifecycle::TReaderDataLifecycle(const dds::core::Duration& autopurge_nowriter_samples_delay, const dds::core::Duration& autopurge_disposed_samples_delay) diff --git a/src/ddscxx/include/dds/pub/detail/DataWriter.hpp b/src/ddscxx/include/dds/pub/detail/DataWriter.hpp index 1f329e67..3b1e1211 100644 --- a/src/ddscxx/include/dds/pub/detail/DataWriter.hpp +++ b/src/ddscxx/include/dds/pub/detail/DataWriter.hpp @@ -61,9 +61,7 @@ class dds::pub::detail::DataWriter : public ::org::eclipse::cyclonedds::pub::Any DataWriter(const dds::pub::Publisher& pub, const ::dds::topic::Topic& topic, - const dds::pub::qos::DataWriterQos& qos, - dds::pub::DataWriterListener* listener, - const dds::core::status::StatusMask& mask); + const dds::pub::qos::DataWriterQos& qos); virtual ~DataWriter(); diff --git a/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp b/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp index 1689d5ff..af48e057 100644 --- a/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp +++ b/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp @@ -44,8 +44,9 @@ DataWriter::DataWriter( const dds::pub::Publisher& pub, const dds::topic::Topic& topic) : dds::core::Reference< DELEGATE >( - new DELEGATE(pub, topic, pub.default_datawriter_qos(), NULL, dds::core::status::StatusMask::none())) + new DELEGATE(pub, topic, pub.default_datawriter_qos())) { + this->delegate()->listener(NULL, dds::core::status::StatusMask::none()); this->delegate()->init(this->impl_); } @@ -56,8 +57,9 @@ DataWriter::DataWriter(const dds::pub::Publisher& pub, dds::pub::DataWriterListener* listener, const dds::core::status::StatusMask& mask) : dds::core::Reference< DELEGATE >( - new DELEGATE(pub, topic, qos, listener, mask)) + new DELEGATE(pub, topic, qos)) { + this->delegate()->listener(listener, mask); this->delegate()->init(this->impl_); } @@ -367,9 +369,7 @@ template dds::pub::detail::DataWriter::DataWriter( const dds::pub::Publisher& pub, const ::dds::topic::Topic& topic, - const dds::pub::qos::DataWriterQos& qos, - dds::pub::DataWriterListener* listener, - const dds::core::status::StatusMask& mask) + const dds::pub::qos::DataWriterQos& qos) : ::org::eclipse::cyclonedds::pub::AnyDataWriterDelegate(qos, topic), pub_(pub), topic_(topic) { DDSCXX_WARNING_MSVC_OFF(6326) @@ -389,7 +389,7 @@ dds::pub::detail::DataWriter::DataWriter( std::string name = topic.name() + "_datawriter"; - this->listener(listener, mask); + this->listener_set(nullptr, dds::core::status::StatusMask::all(), false); dds_entity_t ddsc_writer = dds_create_writer (ddsc_pub, ddsc_topic, ddsc_qos, this->listener_callbacks); dds_delete_qos(ddsc_qos); ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_writer, "Could not create DataWriter."); @@ -421,38 +421,6 @@ dds::pub::detail::DataWriter::init(ObjectDelegate::weak_ref_type weak_ref) /* Register writer at publisher. */ this->pub_.delegate()->add_datawriter(*this); - // Because listeners are added after writer is created (which is in enabled state, because - // disabled state is not yet supported), events could have occured before listeners were - // registered. Therefore the event handlers for those events are called here. - if (this->listener_get()) { - dds::core::status::StatusMask writerStatus = status_changes(); - - if (listener_mask.to_ulong() & dds::core::status::StatusMask::liveliness_lost().to_ulong() - && writerStatus.test(DDS_LIVELINESS_LOST_STATUS_ID)) - { - dds::core::status::LivelinessLostStatus status = liveliness_lost_status(); - on_liveliness_lost(this->ddsc_entity, status); - } - if (listener_mask.to_ulong() & dds::core::status::StatusMask::offered_deadline_missed().to_ulong() - && writerStatus.test(DDS_OFFERED_DEADLINE_MISSED_STATUS_ID)) - { - dds::core::status::OfferedDeadlineMissedStatus status = offered_deadline_missed_status(); - on_offered_deadline_missed(this->ddsc_entity, status); - } - if (listener_mask.to_ulong() & dds::core::status::StatusMask::offered_incompatible_qos().to_ulong() - && writerStatus.test(DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID)) - { - dds::core::status::OfferedIncompatibleQosStatus status = offered_incompatible_qos_status(); - on_offered_incompatible_qos(this->ddsc_entity, status); - } - if (listener_mask.to_ulong() & dds::core::status::StatusMask::publication_matched().to_ulong() - && writerStatus.test(DDS_PUBLICATION_MATCHED_STATUS_ID)) - { - dds::core::status::PublicationMatchedStatus status = publication_matched_status(); - on_publication_matched(this->ddsc_entity, status); - } - } - /* Enable when needed. */ if (this->pub_.delegate()->is_auto_enable()) { this->enable(); @@ -882,7 +850,7 @@ dds::pub::detail::DataWriter::listener(DataWriterListener* listener, { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); this->check(); - this->listener_set(listener, mask); + this->listener_set(listener, mask, true); } template @@ -900,7 +868,7 @@ dds::pub::detail::DataWriter::close() { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); topic_.delegate()->decrNrDependents(); diff --git a/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp b/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp index 5034a423..537a2676 100644 --- a/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp +++ b/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp @@ -53,6 +53,7 @@ * dds::core::policy::WriterDataLifecycle | Dispose with unregister or not (@ref DCPS_QoS_WriterDataLifecycle "info") | WriterDataLifecycle::AutoDisposeUnregisteredInstances() * dds::core::policy::DataRepresentation | Supported data representation kinds (@ref DCPS_QoS_DataRepresentation "info") | DataRepresentation::DataRepresentation(dds::core::policy::DataRepresentationId::XCDR1) * dds::core::policy::TypeConsistencyEnforcement | Type consistency enforcement policies (@ref DCPS_QoS_TypeConsistencyEnforcement "info") | dds::core::policy::TypeConsistencyKind::DISALLOW_TYPE_COERCION + * dds::core::policy::WriterBatching | Writer data batching | dds::core::policy::WriterBatching::DoNotBatchUpdates() * * A QosPolicy can be set when the DataWriter is created or modified with the set * qos operation. diff --git a/src/ddscxx/include/dds/sub/detail/DataReader.hpp b/src/ddscxx/include/dds/sub/detail/DataReader.hpp index 59556a4f..e4b8b481 100644 --- a/src/ddscxx/include/dds/sub/detail/DataReader.hpp +++ b/src/ddscxx/include/dds/sub/detail/DataReader.hpp @@ -46,18 +46,13 @@ class dds::sub::detail::DataReader : public ::org::eclipse::cyclonedds::sub::Any DataReader(const dds::sub::Subscriber& sub, const dds::topic::Topic& topic, - const dds::sub::qos::DataReaderQos& qos, - dds::sub::DataReaderListener* listener = NULL, - const dds::core::status::StatusMask& mask = ::dds::core::status::StatusMask::none()); + const dds::sub::qos::DataReaderQos& qos); DataReader(const dds::sub::Subscriber& sub, const dds::topic::ContentFilteredTopic& topic, - const dds::sub::qos::DataReaderQos& qos, - dds::sub::DataReaderListener* listener = NULL, - const dds::core::status::StatusMask& mask = ::dds::core::status::StatusMask::none()); + const dds::sub::qos::DataReaderQos& qos); - void common_constructor(dds::sub::DataReaderListener* listener, - const dds::core::status::StatusMask& mask); + void common_constructor(); virtual ~DataReader(); diff --git a/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp b/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp index 30a4ebe7..420f8129 100644 --- a/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp +++ b/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp @@ -229,6 +229,7 @@ DataReader::DataReader( const dds::topic::Topic& topic): ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, sub->default_datareader_qos())) { + this->delegate()->listener(NULL, dds::core::status::StatusMask::none()); this->delegate()->init(this->impl_); } @@ -239,8 +240,9 @@ DataReader::DataReader( const dds::sub::qos::DataReaderQos& qos, dds::sub::DataReaderListener* listener, const dds::core::status::StatusMask& mask) : - ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos, listener, mask)) + ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos)) { + this->delegate()->listener(listener, mask); this->delegate()->init(this->impl_); } @@ -251,6 +253,7 @@ DataReader::DataReader( const dds::topic::ContentFilteredTopic& topic) : ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, sub.default_datareader_qos())) { + this->delegate()->listener(NULL, dds::core::status::StatusMask::none()); this->delegate()->init(this->impl_); } @@ -261,8 +264,9 @@ DataReader::DataReader( const dds::sub::qos::DataReaderQos& qos, dds::sub::DataReaderListener* listener, const dds::core::status::StatusMask& mask) : - ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos, listener, mask)) + ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos)) { + this->delegate()->listener(listener, mask); this->delegate()->init(this->impl_); } #endif /* OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT */ @@ -274,6 +278,7 @@ DataReader::DataReader( const dds::topic::MultiTopic& topic) : ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic)) { + this->delegate()->listener(NULL, dds::core::status::StatusMask::none()); this->delegate()->init(this->impl_); } @@ -284,8 +289,9 @@ DataReader::DataReader( const dds::sub::qos::DataReaderQos& qos, dds::sub::DataReaderListener* listener, const dds::core::status::StatusMask& mask) : - ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos, listener, mask)) + ::dds::core::Reference< DELEGATE >(new DELEGATE(sub, topic, qos)) { + this->delegate()->listener(listener, mask); this->delegate()->init(this->impl_); } #endif /* OMG_DDS_MULTI_TOPIC_SUPPORT */ @@ -449,33 +455,27 @@ DataReader::listener() const template dds::sub::detail::DataReader::DataReader(const dds::sub::Subscriber& sub, const dds::topic::Topic& topic, - const dds::sub::qos::DataReaderQos& qos, - dds::sub::DataReaderListener* listener, - const dds::core::status::StatusMask& mask) + const dds::sub::qos::DataReaderQos& qos) : ::org::eclipse::cyclonedds::sub::AnyDataReaderDelegate(qos, topic), sub_(sub), typed_sample_() { - common_constructor(listener, mask); + common_constructor(); } template dds::sub::detail::DataReader::DataReader(const dds::sub::Subscriber& sub, const dds::topic::ContentFilteredTopic& topic, - const dds::sub::qos::DataReaderQos& qos, - dds::sub::DataReaderListener* listener, - const dds::core::status::StatusMask& mask) + const dds::sub::qos::DataReaderQos& qos) : ::org::eclipse::cyclonedds::sub::AnyDataReaderDelegate(qos, topic), sub_(sub), typed_sample_() { - common_constructor(listener, mask); + common_constructor(); } template void -dds::sub::detail::DataReader::common_constructor( - dds::sub::DataReaderListener* listener, - const dds::core::status::StatusMask& mask) +dds::sub::detail::DataReader::common_constructor() { DDSCXX_WARNING_MSVC_OFF(4127) DDSCXX_WARNING_MSVC_OFF(6326) @@ -499,7 +499,7 @@ dds::sub::detail::DataReader::common_constructor( c_value *params = this->AnyDataReaderDelegate::td_.delegate()->reader_parameters(); #endif - this->listener(listener, mask); + this->listener_set(nullptr, dds::core::status::StatusMask::all(), false); dds_entity_t ddsc_reader = dds_create_reader(ddsc_sub, ddsc_top, ddsc_qos, this->listener_callbacks); dds_delete_qos(ddsc_qos); ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_reader, "Could not create DataReader."); @@ -758,7 +758,7 @@ dds::sub::detail::DataReader::close() this->prevent_callbacks(); org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); this->sub_.delegate()->remove_datareader(*this); @@ -791,7 +791,7 @@ dds::sub::detail::DataReader::listener( const dds::core::status::StatusMask& event_mask) { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set( l, event_mask ) ; + this->listener_set( l, event_mask, true ) ; scopedLock.unlock(); } diff --git a/src/ddscxx/include/dds/topic/detail/TTopicImpl.hpp b/src/ddscxx/include/dds/topic/detail/TTopicImpl.hpp index 37f1df24..6075f62e 100644 --- a/src/ddscxx/include/dds/topic/detail/TTopicImpl.hpp +++ b/src/ddscxx/include/dds/topic/detail/TTopicImpl.hpp @@ -230,7 +230,7 @@ dds::topic::detail::Topic::close() ISOCPP_THROW_EXCEPTION(ISOCPP_PRECONDITION_NOT_MET_ERROR, "Topic still has unclosed dependencies (e.g. Readers/Writers/ContentFilteredTopics)"); } - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); this->myParticipant.delegate()->remove_topic(*this); @@ -261,7 +261,7 @@ dds::topic::detail::Topic::listener(TopicListener* listener, const ::dds::core::status::StatusMask& mask) { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(listener, mask); + this->listener_set(listener, mask, true); scopedLock.unlock(); } diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/core/EntityDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/core/EntityDelegate.hpp index c883ff94..446ca2c0 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/core/EntityDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/core/EntityDelegate.hpp @@ -33,6 +33,14 @@ namespace cyclonedds { namespace core { +class OMG_DDS_API EntityDelegate; + +struct ListenerArg { + EntityDelegate *cpp_ref; + bool reset_on_invoke; + + ListenerArg(EntityDelegate *cpp_ref_, bool reset_on_invoke_); +}; class OMG_DDS_API EntityDelegate : public virtual ::org::eclipse::cyclonedds::core::DDScObjectDelegate @@ -65,7 +73,10 @@ class OMG_DDS_API EntityDelegate : protected: void listener_set(void *listener, - const dds::core::status::StatusMask& mask); + const dds::core::status::StatusMask& mask, + bool reset_on_invoke); + + void prevent_callbacks(); public: const dds::core::status::StatusMask get_listener_mask() const ; @@ -121,7 +132,6 @@ class OMG_DDS_API EntityDelegate : static volatile unsigned int entityID_; bool enabled_; dds::core::status::StatusMask listener_mask; - void prevent_callbacks(); long callback_count; dds_listener_t *listener_callbacks; diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp index 2e0756ec..38a5b3db 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp @@ -898,6 +898,29 @@ class OMG_DDS_API WriterDataLifecycleDelegate bool autodispose_; }; +//============================================================================== + +class OMG_DDS_API WriterBatchingDelegate +{ +public: + WriterBatchingDelegate(const WriterBatchingDelegate& other); + explicit WriterBatchingDelegate(bool batch_updates); + + bool batch_updates() const; + void batch_updates(bool b); + + bool operator ==(const WriterBatchingDelegate& other) const; + + WriterBatchingDelegate& operator =(const WriterBatchingDelegate& other) = default; + + void check() const; + + void set_iso_policy(const dds_qos_t* qos); + void set_c_policy(dds_qos_t* qos) const; + +private: + bool batch_updates_; +}; #ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp index dc3d9183..62f1fa7a 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp @@ -60,6 +60,7 @@ class OMG_DDS_API DataWriterQosDelegate void policy(const dds::core::policy::DataRepresentation& datarepresentation); void policy(const dds::core::policy::TypeConsistencyEnforcement& typeconsistencyenforcement); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + void policy(const dds::core::policy::WriterBatching& writerbatching); template const POLICY& policy() const; template POLICY& policy(); @@ -96,6 +97,7 @@ class OMG_DDS_API DataWriterQosDelegate dds::core::policy::DataRepresentation datarepresentation_; dds::core::policy::TypeConsistencyEnforcement typeconsistencyenforcement_; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + dds::core::policy::WriterBatching writerbatching_; }; @@ -296,6 +298,18 @@ DataWriterQosDelegate::policy() } #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +template<> inline const dds::core::policy::WriterBatching& +DataWriterQosDelegate::policy() const +{ + return writerbatching_; +} + +template<> inline dds::core::policy::WriterBatching& +DataWriterQosDelegate::policy() +{ + return writerbatching_; +} + } } } diff --git a/src/ddscxx/src/dds/core/policy/CorePolicy.cpp b/src/ddscxx/src/dds/core/policy/CorePolicy.cpp index a6acb07e..dfb050e5 100644 --- a/src/ddscxx/src/dds/core/policy/CorePolicy.cpp +++ b/src/ddscxx/src/dds/core/policy/CorePolicy.cpp @@ -50,3 +50,4 @@ OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::TypeConsistencyEnforcement, Type #ifdef OMG_DDS_PERSISTENCE_SUPPORT OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::DurabilityService, DurabilityService) #endif // OMG_DDS_PERSISTENCE_SUPPORT +OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::WriterBatching, WriterBatching) diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp index 26be0d31..9e379170 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp @@ -30,6 +30,10 @@ #include +org::eclipse::cyclonedds::core::ListenerArg::ListenerArg(EntityDelegate *cpp_ref_, bool reset_on_invoke_) : + cpp_ref(cpp_ref_), reset_on_invoke(reset_on_invoke_) +{ +} org::eclipse::cyclonedds::core::EntityDelegate::EntityDelegate() : enabled_(false), @@ -50,7 +54,10 @@ org::eclipse::cyclonedds::core::EntityDelegate::~EntityDelegate() { if (this->listener_callbacks != NULL) { + void *arg; + dds_lget_data_available_arg(this->listener_callbacks, nullptr, &arg, nullptr); dds_delete_listener(this->listener_callbacks); + delete reinterpret_cast(arg); } this->listener_callbacks = NULL; @@ -145,72 +152,74 @@ org::eclipse::cyclonedds::core::EntityDelegate::retain() void org::eclipse::cyclonedds::core::EntityDelegate::listener_set( void *_listener, - const dds::core::status::StatusMask& mask) + const dds::core::status::StatusMask& mask, + bool reset_on_invoke) { dds_listener_t *callbacks; this->listener = _listener; this->listener_mask = mask; - callbacks = dds_create_listener(this); + org::eclipse::cyclonedds::core::ListenerArg *arg = new org::eclipse::cyclonedds::core::ListenerArg(this, reset_on_invoke); + callbacks = dds_create_listener(arg); // Set topic callbacks if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::inconsistent_topic())) { - dds_lset_inconsistent_topic(callbacks, callback_on_inconsistent_topic); + dds_lset_inconsistent_topic_arg(callbacks, callback_on_inconsistent_topic, arg, reset_on_invoke); } // Set writer callbacks if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::offered_deadline_missed())) { - dds_lset_offered_deadline_missed(callbacks, callback_on_offered_deadline_missed); + dds_lset_offered_deadline_missed_arg(callbacks, callback_on_offered_deadline_missed, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::offered_incompatible_qos())) { - dds_lset_offered_incompatible_qos(callbacks, callback_on_offered_incompatible_qos); + dds_lset_offered_incompatible_qos_arg(callbacks, callback_on_offered_incompatible_qos, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::liveliness_lost())) { - dds_lset_liveliness_lost(callbacks, callback_on_liveliness_lost); + dds_lset_liveliness_lost_arg(callbacks, callback_on_liveliness_lost, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::publication_matched())) { - dds_lset_publication_matched(callbacks, callback_on_publication_matched); + dds_lset_publication_matched_arg(callbacks, callback_on_publication_matched, arg, reset_on_invoke); } // Set reader callbacks if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::requested_deadline_missed())) { - dds_lset_requested_deadline_missed(callbacks, callback_on_requested_deadline_missed); + dds_lset_requested_deadline_missed_arg(callbacks, callback_on_requested_deadline_missed, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::requested_incompatible_qos())) { - dds_lset_requested_incompatible_qos(callbacks, callback_on_requested_incompatible_qos); + dds_lset_requested_incompatible_qos_arg(callbacks, callback_on_requested_incompatible_qos, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::sample_rejected())) { - dds_lset_sample_rejected(callbacks, callback_on_sample_rejected); + dds_lset_sample_rejected_arg(callbacks, callback_on_sample_rejected, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::liveliness_changed())) { - dds_lset_liveliness_changed(callbacks, callback_on_liveliness_changed); + dds_lset_liveliness_changed_arg(callbacks, callback_on_liveliness_changed, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::data_available())) { - dds_lset_data_available(callbacks, callback_on_data_available); + dds_lset_data_available_arg(callbacks, callback_on_data_available, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::subscription_matched())) { - dds_lset_subscription_matched(callbacks, callback_on_subscription_matched); + dds_lset_subscription_matched_arg(callbacks, callback_on_subscription_matched, arg, reset_on_invoke); } if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::sample_lost())) { - dds_lset_sample_lost(callbacks, callback_on_sample_lost); + dds_lset_sample_lost_arg(callbacks, callback_on_sample_lost, arg, reset_on_invoke); } // Set subscriber callbacks if (STATUS_MASK_CONTAINS(mask, dds::core::status::StatusMask::data_on_readers())) { - dds_lset_data_on_readers(callbacks, callback_on_data_readers); + dds_lset_data_on_readers_arg(callbacks, callback_on_data_readers, arg, reset_on_invoke); } // If entity enabled: set listener on ddsc entity @@ -224,7 +233,10 @@ org::eclipse::cyclonedds::core::EntityDelegate::listener_set( // Delete previous ddsc listener callbacks object if (this->listener_callbacks != NULL) { + void *prev_arg; + dds_lget_data_available_arg(this->listener_callbacks, nullptr, &prev_arg, nullptr); dds_delete_listener(this->listener_callbacks); + delete reinterpret_cast(prev_arg); } // Store new listener diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/core/ListenerDispatcher.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/core/ListenerDispatcher.cpp index 7efd157d..5f24c3a8 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/core/ListenerDispatcher.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/core/ListenerDispatcher.cpp @@ -36,15 +36,15 @@ extern "C" DDS_FN_EXPORT void callback_on_inconsistent_topic (dds_entity_t topic, dds_inconsistent_topic_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::InconsistentTopicStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_inconsistent_topic(topic, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::InconsistentTopicStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_inconsistent_topic(topic, sd); + la->cpp_ref->release_callback_lock(); } } @@ -52,60 +52,60 @@ extern "C" DDS_FN_EXPORT void callback_on_offered_deadline_missed (dds_entity_t writer, dds_offered_deadline_missed_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::OfferedDeadlineMissedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_offered_deadline_missed(writer, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::OfferedDeadlineMissedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_offered_deadline_missed(writer, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_offered_incompatible_qos (dds_entity_t writer, dds_offered_incompatible_qos_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::OfferedIncompatibleQosStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_offered_incompatible_qos(writer, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::OfferedIncompatibleQosStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_offered_incompatible_qos(writer, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_liveliness_lost (dds_entity_t writer, dds_liveliness_lost_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::LivelinessLostStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_liveliness_lost(writer, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::LivelinessLostStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_liveliness_lost(writer, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_publication_matched (dds_entity_t writer, dds_publication_matched_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::PublicationMatchedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_publication_matched(writer, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::PublicationMatchedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_publication_matched(writer, sd); + la->cpp_ref->release_callback_lock(); } } @@ -113,113 +113,115 @@ extern "C" DDS_FN_EXPORT void callback_on_requested_deadline_missed (dds_entity_t reader, dds_requested_deadline_missed_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::RequestedDeadlineMissedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_requested_deadline_missed(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::RequestedDeadlineMissedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_requested_deadline_missed(reader, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_requested_incompatible_qos (dds_entity_t reader, dds_requested_incompatible_qos_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::RequestedIncompatibleQosStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_requested_incompatible_qos(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::RequestedIncompatibleQosStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_requested_incompatible_qos(reader, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_sample_rejected (dds_entity_t reader, dds_sample_rejected_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::SampleRejectedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_sample_rejected(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::SampleRejectedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_sample_rejected(reader, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_liveliness_changed (dds_entity_t reader, dds_liveliness_changed_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::LivelinessChangedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_liveliness_changed(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::LivelinessChangedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_liveliness_changed(reader, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_data_available (dds_entity_t reader, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); + + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - ed->on_data_available(reader); - ed->release_callback_lock(); + la->cpp_ref->on_data_available(reader); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_subscription_matched (dds_entity_t reader, dds_subscription_matched_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::SubscriptionMatchedStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_subscription_matched(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::SubscriptionMatchedStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_subscription_matched(reader, sd); + la->cpp_ref->release_callback_lock(); } } DDS_FN_EXPORT void callback_on_sample_lost (dds_entity_t reader, dds_sample_lost_status_t status, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - org::eclipse::cyclonedds::core::SampleLostStatusDelegate sd; - sd.ddsc_status(&status); - ed->on_sample_lost(reader, sd); - ed->release_callback_lock(); + org::eclipse::cyclonedds::core::SampleLostStatusDelegate sd; + sd.ddsc_status(&status); + la->cpp_ref->on_sample_lost(reader, sd); + la->cpp_ref->release_callback_lock(); } } // Subscriber callback DDS_FN_EXPORT void callback_on_data_readers (dds_entity_t subscriber, void* arg) { - org::eclipse::cyclonedds::core::EntityDelegate *ed = - reinterpret_cast(arg); - if (ed->obtain_callback_lock()) + org::eclipse::cyclonedds::core::ListenerArg *la = + reinterpret_cast(arg); + + if (la->reset_on_invoke && la->cpp_ref->obtain_callback_lock()) { - ed->on_data_readers(subscriber); - ed->release_callback_lock(); + la->cpp_ref->on_data_readers(subscriber); + la->cpp_ref->release_callback_lock(); } } } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp index 4b919374..6c9f660a 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp @@ -1624,6 +1624,48 @@ void WriterDataLifecycleDelegate::set_c_policy(dds_qos_t* qos) const } +//============================================================================== + +WriterBatchingDelegate::WriterBatchingDelegate(const WriterBatchingDelegate& other) + : batch_updates_(other.batch_updates_) +{ +} + +WriterBatchingDelegate::WriterBatchingDelegate(bool batch_updates): batch_updates_(batch_updates) +{ +} + +bool WriterBatchingDelegate::batch_updates() const +{ + return batch_updates_; +} + +void WriterBatchingDelegate::batch_updates(bool b) +{ + batch_updates_ = b; +} + +bool WriterBatchingDelegate::operator ==(const WriterBatchingDelegate& other) const +{ + return other.batch_updates() == batch_updates_; +} + +void WriterBatchingDelegate::check() const +{ + /* The batch_updates is just a boolean: nothing to check. */ +} + +void WriterBatchingDelegate::set_iso_policy(const dds_qos_t* qos) +{ + (void)dds_qget_writer_batching(qos, &batch_updates_); +} + +void WriterBatchingDelegate::set_c_policy(dds_qos_t* qos) const +{ + dds_qset_writer_batching(qos, batch_updates_); +} + + //============================================================================== #ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/domain/DomainParticipantDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/domain/DomainParticipantDelegate.cpp index 35fe1f02..bd751160 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/domain/DomainParticipantDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/domain/DomainParticipantDelegate.cpp @@ -277,7 +277,7 @@ org::eclipse::cyclonedds::domain::DomainParticipantDelegate::close() this->topics.all_close(); /* Stop listener. */ - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); org::eclipse::cyclonedds::domain::DomainParticipantRegistry::remove(this); @@ -587,7 +587,7 @@ org::eclipse::cyclonedds::domain::DomainParticipantDelegate::listener( const ::dds::core::status::StatusMask& mask) { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(listener, mask); + this->listener_set(listener, mask, true); scopedLock.unlock(); } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp index 7c5b13ab..3219059e 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp @@ -501,7 +501,9 @@ AnyDataWriterDelegate::write_flush() void AnyDataWriterDelegate::set_batch(bool enable) { +DDSRT_WARNING_DEPRECATED_OFF dds_write_set_batch (enable); +DDSRT_WARNING_DEPRECATED_ON } } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/PublisherDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/PublisherDelegate.cpp index 0dacdd39..7ad30dcc 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/PublisherDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/PublisherDelegate.cpp @@ -101,7 +101,7 @@ PublisherDelegate::close() this->writers.all_close(); /* Stop listener. */ - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); /* Unregister Publisher from Participant. */ this->dp_.delegate()->remove_publisher(*this); @@ -190,7 +190,7 @@ PublisherDelegate::listener(dds::pub::PublisherListener* listener, const ::dds::core::status::StatusMask& mask) { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(listener, mask); + this->listener_set(listener, mask, true); scopedLock.unlock(); } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp index da2d258f..387b2709 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp @@ -160,6 +160,13 @@ DataWriterQosDelegate::policy(const dds::core::policy::TypeConsistencyEnforcemen } #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +void +DataWriterQosDelegate::policy(const dds::core::policy::WriterBatching& writerbatching) +{ + writerbatching.delegate().check(); + writerbatching_ = writerbatching; +} + dds_qos_t* DataWriterQosDelegate::ddsc_qos() const { @@ -184,6 +191,7 @@ DataWriterQosDelegate::ddsc_qos() const datarepresentation_.delegate().set_c_policy(qos); typeconsistencyenforcement_.delegate().set_c_policy(qos); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().set_c_policy(qos); return qos; } @@ -211,6 +219,7 @@ DataWriterQosDelegate::ddsc_qos(const dds_qos_t* qos) datarepresentation_.delegate().set_iso_policy(qos); typeconsistencyenforcement_.delegate().set_iso_policy(qos); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().set_iso_policy(qos); } void @@ -242,6 +251,7 @@ DataWriterQosDelegate::named_qos(const struct _DDS_NamedDataWriterQos &qos) datarepresentation_.delegate().v_policy((v_writerDataRepresentationPolicy&)(q->writer_datarepresentation)); typeconsistencyenforcement_.delegate().v_policy((v_writerTypeConsistencyEnforcementPolicy&)(q->writer_typeconsistencyenforcement)); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().v_policy((v_writerbatchingPolicy&)(q->writer_batching) ); #endif } @@ -276,6 +286,7 @@ DataWriterQosDelegate::operator ==(const DataWriterQosDelegate& other) const && other.datarepresentation_ == datarepresentation_ && other.typeconsistencyenforcement_ == typeconsistencyenforcement_ #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + && other.writerbatching_ == writerbatching_ ; } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/sub/SubscriberDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/sub/SubscriberDelegate.cpp index e418b800..c8ce4bf8 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/sub/SubscriberDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/sub/SubscriberDelegate.cpp @@ -100,7 +100,7 @@ SubscriberDelegate::close() this->readers.all_close(); /* Stop listener. */ - this->listener_set(NULL, dds::core::status::StatusMask::none()); + this->listener_set(NULL, dds::core::status::StatusMask::none(), true); /* Unregister Subscriber from Participant. */ this->dp_.delegate()->remove_subscriber(*this); @@ -175,7 +175,7 @@ SubscriberDelegate::listener(dds::sub::SubscriberListener* listener, const ::dds::core::status::StatusMask& mask) { org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this); - this->listener_set(listener, mask); + this->listener_set(listener, mask, true); scopedLock.unlock(); } diff --git a/src/ddscxx/tests/Listener.cpp b/src/ddscxx/tests/Listener.cpp index 1cb9473b..d912982f 100644 --- a/src/ddscxx/tests/Listener.cpp +++ b/src/ddscxx/tests/Listener.cpp @@ -1016,7 +1016,7 @@ TEST_F(Listener, liveliness_changed) ASSERT_EQ(status.last_publication_handle(), instance_handle); } -TEST_F(Listener, DISABLED_incompatible_qos) +TEST_F(Listener, incompatible_qos) { DataReaderListener readerListener; DataWriterListener writerListener; @@ -1422,7 +1422,7 @@ TEST_F(Listener, data_available_participant) // TODO: // this test does not work because listener on publisher is triggered when writer is created // but before ddsc handle for writer is stored in the writer entity. -TEST_F(Listener, DISABLED_propagation) +TEST_F(Listener, propagation) { DomainParticipantListener participantListener; SubscriberListener subscriberListener; diff --git a/src/ddscxx/tests/Qos.cpp b/src/ddscxx/tests/Qos.cpp index 3ac1c47d..0ac7e7c2 100644 --- a/src/ddscxx/tests/Qos.cpp +++ b/src/ddscxx/tests/Qos.cpp @@ -71,6 +71,7 @@ DataRepresentation nonDefaultRepresentation({dds::core::policy::DataRepresen dds::core::policy::DataRepresentationId::XCDR2}); TypeConsistencyEnforcement nonDefaultTypeConsistencyEnforcement(dds::core::policy::TypeConsistencyKind::ALLOW_TYPE_COERCION, true, true, true, true, true); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +WriterBatching nonDefaultWriterBatching(true); @@ -107,6 +108,7 @@ ReaderDataLifecycle tmpRdLifecycle; DataRepresentation tmpRepresentation; TypeConsistencyEnforcement tmpEnforcement; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +WriterBatching tmpWriterBatching; TEST(Qos, DomainParticipant) { @@ -354,6 +356,7 @@ TEST(Qos, DataWriter) << nonDefaultRepresentation << nonDefaultTypeConsistencyEnforcement #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + << nonDefaultWriterBatching ; DataWriterQos dwQosWConstructed(dwQosShifted); DataWriterQos dwQosWAssigned1 = dwQosShifted; /* Actually calls copy constructor. */ @@ -363,6 +366,9 @@ TEST(Qos, DataWriter) DataWriterQos dwQosTAssigned2; dwQosWAssigned2 = dwQosShifted; dwQosTAssigned2 = tQosShifted; + dwQosTConstructed << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ + dwQosTAssigned1 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ + dwQosTAssigned2 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ /* Compare the QoSses. */ ASSERT_NE(dwQosDefault, dwQosWConstructed); @@ -395,6 +401,7 @@ TEST(Qos, DataWriter) dwQosShifted >> tmpRepresentation; dwQosShifted >> tmpEnforcement; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + dwQosShifted >> tmpWriterBatching; ASSERT_EQ(nonDefaultUserData, tmpUserData); ASSERT_EQ(nonDefaultDurability, tmpDurability); ASSERT_EQ(nonDefaultDeadline, tmpDeadline); @@ -412,6 +419,7 @@ TEST(Qos, DataWriter) ASSERT_EQ(nonDefaultRepresentation, tmpRepresentation); ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, tmpEnforcement); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(nonDefaultWriterBatching, tmpWriterBatching); ASSERT_EQ(nonDefaultUserData, dwQosWConstructed.policy()); ASSERT_EQ(nonDefaultDurability, dwQosWConstructed.policy()); @@ -430,6 +438,7 @@ TEST(Qos, DataWriter) ASSERT_EQ(nonDefaultRepresentation, dwQosWConstructed.policy()); ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, dwQosWConstructed.policy()); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(nonDefaultWriterBatching, dwQosWConstructed.policy()); #ifdef OMG_DDS_OWNERSHIP_SUPPORT dwQosShifted >> tmpStrength; @@ -646,4 +655,5 @@ TEST(Qos, policy_name) ASSERT_EQ(dds::core::policy::policy_name::name(), "DataRepresentation"); ASSERT_EQ(dds::core::policy::policy_name::name(), "TypeConsistencyEnforcement"); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(dds::core::policy::policy_name::name(), "WriterBatching"); }