diff --git a/.azure-pipelines/build-sairedis-template.yml b/.azure-pipelines/build-sairedis-template.yml index b37d2fa4..65e7c536 100644 --- a/.azure-pipelines/build-sairedis-template.yml +++ b/.azure-pipelines/build-sairedis-template.yml @@ -66,7 +66,6 @@ jobs: set -ex sudo apt-get update sudo apt-get install -qq -y \ - qtbase5-dev \ libdbus-glib-1-dev \ libpcsclite-dev \ docbook-to-man \ @@ -90,7 +89,7 @@ jobs: sudo mkdir -m 755 /var/run/sswsyncd sudo apt-get install -y rsyslog - sudo service rsyslog start + sudo rsyslogd displayName: "Install dependencies" - task: DownloadPipelineArtifact@2 @@ -137,7 +136,8 @@ jobs: displayName: "Compile sonic sairedis" - script: | sudo cp azsyslog.conf /etc/rsyslog.conf - sudo service rsyslog restart + sudo killall rsyslogd + sudo rsyslogd displayName: "Update rsyslog.conf" - ${{ if eq(parameters.run_unit_test, true) }}: - script: | diff --git a/.azure-pipelines/build-template.yml b/.azure-pipelines/build-template.yml index d1813c23..a4b607ef 100644 --- a/.azure-pipelines/build-template.yml +++ b/.azure-pipelines/build-template.yml @@ -98,16 +98,6 @@ jobs: mv ../*.deb . displayName: "Compile sonic swss common with coverage enabled" - ${{ if eq(parameters.run_unit_test, true) }}: - - script: | - set -ex - git clone https://github.com/gcovr/gcovr.git - cd gcovr/ - git checkout 5.2 - sudo pip3 install setuptools - sudo python3 setup.py install - cd .. - sudo rm -rf gcovr - displayName: "Install gcovr 5.2 (for --exclude-throw-branches support)" - script: | set -ex sudo pip install Pympler==0.8 pytest @@ -142,9 +132,9 @@ jobs: set -ex # Install .NET CORE curl -sSL https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add - - sudo apt-add-repository https://packages.microsoft.com/debian/11/prod + sudo apt-add-repository https://packages.microsoft.com/debian/12/prod sudo apt-get update - sudo apt-get install -y dotnet-sdk-6.0 + sudo apt-get install -y dotnet-sdk-8.0 displayName: "Install .NET CORE" - task: PublishCodeCoverageResults@1 inputs: diff --git a/.azure-pipelines/docker-sonic-vs/Dockerfile b/.azure-pipelines/docker-sonic-vs/Dockerfile index 3d219805..f598ea6c 100644 --- a/.azure-pipelines/docker-sonic-vs/Dockerfile +++ b/.azure-pipelines/docker-sonic-vs/Dockerfile @@ -1,11 +1,31 @@ FROM docker-sonic-vs ARG docker_container_name +ARG need_dbg COPY ["debs", "/debs"] # Remove existing packages first before installing the new/current packages. This is to overcome limitations with # Docker's diff detection mechanism, where only the file size and the modification timestamp (which will remain the # same, even though contents have changed) are checked between the previous and current layer. 
-RUN dpkg --purge libswsscommon python3-swsscommon sonic-db-cli libsaimetadata libsairedis libsaivs syncd-vs swss sonic-eventd libdashapi -RUN dpkg -i /debs/libdashapi_1.0.0_amd64.deb /debs/libswsscommon_1.0.0_amd64.deb /debs/python3-swsscommon_1.0.0_amd64.deb /debs/sonic-db-cli_1.0.0_amd64.deb /debs/libsaimetadata_1.0.0_amd64.deb /debs/libsairedis_1.0.0_amd64.deb /debs/libsaivs_1.0.0_amd64.deb /debs/syncd-vs_1.0.0_amd64.deb /debs/swss_1.0.0_amd64.deb +RUN dpkg --purge libswsscommon python3-swsscommon sonic-db-cli libsaimetadata libsairedis libsaivs syncd-vs swss sonic-eventd libdashapi + +RUN apt-get update + +RUN apt install -y /debs/libdashapi_1.0.0_amd64.deb \ + /debs/libswsscommon_1.0.0_amd64.deb \ + /debs/python3-swsscommon_1.0.0_amd64.deb \ + /debs/sonic-db-cli_1.0.0_amd64.deb \ + /debs/libsaimetadata_1.0.0_amd64.deb \ + /debs/libsairedis_1.0.0_amd64.deb \ + /debs/libsaivs_1.0.0_amd64.deb \ + /debs/syncd-vs_1.0.0_amd64.deb \ + /debs/swss_1.0.0_amd64.deb + +RUN if [ "$need_dbg" = "y" ] ; then dpkg -i /debs/libswsscommon-dbgsym_1.0.0_amd64.deb ; fi + +COPY ["start.sh", "/usr/bin/"] + +RUN pip3 install scapy==2.5.0 + +RUN apt-get -y install software-properties-common libdatetime-perl libcapture-tiny-perl build-essential libcpanel-json-xs-perl git python3-protobuf diff --git a/.azure-pipelines/docker-sonic-vs/start.sh b/.azure-pipelines/docker-sonic-vs/start.sh new file mode 100755 index 00000000..f7dbde8d --- /dev/null +++ b/.azure-pipelines/docker-sonic-vs/start.sh @@ -0,0 +1,187 @@ +#!/bin/bash -e + +# Generate configuration + +# NOTE: 'PLATFORM' and 'HWSKU' environment variables are set +# in the Dockerfile so that they persist for the life of the container + +ln -sf /usr/share/sonic/device/$PLATFORM /usr/share/sonic/platform +ln -sf /usr/share/sonic/device/$PLATFORM/$HWSKU /usr/share/sonic/hwsku + +SWITCH_TYPE=switch +PLATFORM_CONF=platform.json +if [[ $HWSKU == "DPU-2P" ]]; then + SWITCH_TYPE=dpu + PLATFORM_CONF=platform-dpu-2p.json +fi + +pushd /usr/share/sonic/hwsku + +# filter available front panel ports in lanemap.ini +[ -f lanemap.ini.orig ] || cp lanemap.ini lanemap.ini.orig +for p in $(ip link show | grep -oE "eth[0-9]+" | grep -v eth0); do + grep ^$p: lanemap.ini.orig +done > lanemap.ini + +# filter available sonic front panel ports in port_config.ini +[ -f port_config.ini.orig ] || cp port_config.ini port_config.ini.orig +grep ^# port_config.ini.orig > port_config.ini +for lanes in $(awk -F ':' '{print $2}' lanemap.ini); do + grep -E "\s$lanes\s" port_config.ini.orig +done >> port_config.ini + +popd + +[ -d /etc/sonic ] || mkdir -p /etc/sonic + +# Note: libswsscommon requires a database_config file in /var/run/redis/sonic-db/ +# Prepare this file before any dependent application, such as sonic-cfggen +mkdir -p /var/run/redis/sonic-db +cp /etc/default/sonic-db/database_config.json /var/run/redis/sonic-db/ + +SYSTEM_MAC_ADDRESS=$(ip link show eth0 | grep ether | awk '{print $2}') +sonic-cfggen -t /usr/share/sonic/templates/init_cfg.json.j2 -a "{\"system_mac\": \"$SYSTEM_MAC_ADDRESS\", \"switch_type\": \"$SWITCH_TYPE\"}" > /etc/sonic/init_cfg.json + +if [[ -f /usr/share/sonic/virtual_chassis/default_config.json ]]; then + sonic-cfggen -j /etc/sonic/init_cfg.json -j /usr/share/sonic/virtual_chassis/default_config.json --print-data > /tmp/init_cfg.json + mv /tmp/init_cfg.json /etc/sonic/init_cfg.json +fi + +if [ -f /etc/sonic/config_db.json ]; then + sonic-cfggen -j /etc/sonic/init_cfg.json -j /etc/sonic/config_db.json --print-data > /tmp/config_db.json + mv /tmp/config_db.json 
/etc/sonic/config_db.json +else + # generate and merge buffers configuration into config file + if [ -f /usr/share/sonic/hwsku/buffers.json.j2 ]; then + sonic-cfggen -k $HWSKU -p /usr/share/sonic/device/$PLATFORM/$PLATFORM_CONF -t /usr/share/sonic/hwsku/buffers.json.j2 > /tmp/buffers.json + buffers_cmd="-j /tmp/buffers.json" + fi + if [ -f /usr/share/sonic/hwsku/qos.json.j2 ]; then + sonic-cfggen -j /etc/sonic/init_cfg.json -t /usr/share/sonic/hwsku/qos.json.j2 > /tmp/qos.json + qos_cmd="-j /tmp/qos.json" + fi + + sonic-cfggen -p /usr/share/sonic/device/$PLATFORM/$PLATFORM_CONF -k $HWSKU --print-data > /tmp/ports.json + # change admin_status from up to down; Test cases dependent + sed -i "s/up/down/g" /tmp/ports.json + sonic-cfggen -j /etc/sonic/init_cfg.json $buffers_cmd $qos_cmd -j /tmp/ports.json --print-data > /etc/sonic/config_db.json +fi + +sonic-cfggen -t /usr/share/sonic/templates/copp_cfg.j2 > /etc/sonic/copp_cfg.json + +if [ "$HWSKU" == "Mellanox-SN2700" ]; then + cp /usr/share/sonic/hwsku/sai_mlnx.profile /usr/share/sonic/hwsku/sai.profile +elif [ "$HWSKU" == "DPU-2P" ]; then + cp /usr/share/sonic/hwsku/sai_dpu_2p.profile /usr/share/sonic/hwsku/sai.profile +fi + +mkdir -p /etc/swss/config.d/ + +rm -f /var/run/rsyslogd.pid + +supervisorctl start rsyslogd + +supervisord_cfg="/etc/supervisor/conf.d/supervisord.conf" +chassisdb_cfg_file="/usr/share/sonic/virtual_chassis/default_config.json" +chassisdb_cfg_file_default="/etc/default/sonic-db/default_chassis_cfg.json" +host_template="/usr/share/sonic/templates/hostname.j2" +db_cfg_file="/var/run/redis/sonic-db/database_config.json" +db_cfg_file_tmp="/var/run/redis/sonic-db/database_config.json.tmp" + +if [ -r "$chassisdb_cfg_file" ]; then + echo $(sonic-cfggen -j $chassisdb_cfg_file -t $host_template) >> /etc/hosts +else + chassisdb_cfg_file="$chassisdb_cfg_file_default" + echo "10.8.1.200 redis_chassis.server" >> /etc/hosts +fi + +supervisorctl start redis-server + +start_chassis_db=`sonic-cfggen -v DEVICE_METADATA.localhost.start_chassis_db -y $chassisdb_cfg_file` +if [[ "$HOSTNAME" == *"supervisor"* ]] || [ "$start_chassis_db" == "1" ]; then + supervisorctl start redis-chassis +fi + +conn_chassis_db=`sonic-cfggen -v DEVICE_METADATA.localhost.connect_to_chassis_db -y $chassisdb_cfg_file` +if [ "$start_chassis_db" != "1" ] && [ "$conn_chassis_db" != "1" ]; then + cp $db_cfg_file $db_cfg_file_tmp + update_chassisdb_config -j $db_cfg_file_tmp -d + cp $db_cfg_file_tmp $db_cfg_file +fi + +if [ "$conn_chassis_db" == "1" ]; then + if [ -f /usr/share/sonic/virtual_chassis/coreportindexmap.ini ]; then + cp /usr/share/sonic/virtual_chassis/coreportindexmap.ini /usr/share/sonic/hwsku/ + + pushd /usr/share/sonic/hwsku + + # filter available front panel ports in coreportindexmap.ini + [ -f coreportindexmap.ini.orig ] || cp coreportindexmap.ini coreportindexmap.ini.orig + for p in $(ip link show | grep -oE "eth[0-9]+" | grep -v eth0); do + grep ^$p: coreportindexmap.ini.orig + done > coreportindexmap.ini + + popd + fi +fi + +/usr/bin/configdb-load.sh + +if [ "$HWSKU" = "brcm_gearbox_vs" ]; then + supervisorctl start gbsyncd + supervisorctl start gearsyncd +fi + +supervisorctl start syncd + +supervisorctl start portsyncd + +supervisorctl start orchagent + +supervisorctl start coppmgrd + +supervisorctl start neighsyncd + +supervisorctl start fdbsyncd + +supervisorctl start teamsyncd + +supervisorctl start fpmsyncd + +supervisorctl start teammgrd + +supervisorctl start vrfmgrd + +supervisorctl start portmgrd + +supervisorctl start intfmgrd + 
+supervisorctl start vlanmgrd + +supervisorctl start zebra + +supervisorctl start mgmtd + +supervisorctl start staticd + +supervisorctl start buffermgrd + +supervisorctl start nbrmgrd + +supervisorctl start vxlanmgrd + +supervisorctl start sflowmgrd + +supervisorctl start natmgrd + +supervisorctl start natsyncd + +supervisorctl start tunnelmgrd + +supervisorctl start fabricmgrd + +# Start arp_update when VLAN exists +VLAN=`sonic-cfggen -d -v 'VLAN.keys() | join(" ") if VLAN'` +if [ "$VLAN" != "" ]; then + supervisorctl start arp_update +fi diff --git a/.azure-pipelines/test-docker-sonic-vs-template.yml b/.azure-pipelines/test-docker-sonic-vs-template.yml index 81af9bd8..0a2a7018 100644 --- a/.azure-pipelines/test-docker-sonic-vs-template.yml +++ b/.azure-pipelines/test-docker-sonic-vs-template.yml @@ -15,7 +15,7 @@ jobs: displayName: vstest timeoutInMinutes: ${{ parameters.timeout }} - pool: sonic-common + pool: sonictest steps: - checkout: self @@ -52,11 +52,10 @@ jobs: - script: | set -ex - ls -l sudo sonic-swss-common/.azure-pipelines/build_and_install_module.sh sudo apt-get install -y libhiredis0.14 libyang0.16 - sudo dpkg -i $(Build.ArtifactStagingDirectory)/download/libprotobuf*_amd64.deb $(Build.ArtifactStagingDirectory)/download/libprotobuf-lite*_amd64.deb $(Build.ArtifactStagingDirectory)/download/python3-protobuf*_amd64.deb + sudo dpkg -i $(Build.ArtifactStagingDirectory)/download/libprotobuf*_amd64.deb $(Build.ArtifactStagingDirectory)/download/libprotobuf-lite*_amd64.deb $(Build.ArtifactStagingDirectory)/download/python3-protobuf*_amd64.deb sudo dpkg -i $(Build.ArtifactStagingDirectory)/download/libdashapi*.deb sudo dpkg -i --force-confask,confnew $(Build.ArtifactStagingDirectory)/download/libswsscommon_1.0.0_amd64.deb || apt-get install -f sudo dpkg -i $(Build.ArtifactStagingDirectory)/download/python3-swsscommon_1.0.0_amd64.deb @@ -72,34 +71,17 @@ jobs: sudo docker load -i $(Build.ArtifactStagingDirectory)/download/docker-sonic-vs.gz docker ps ip netns list + sudo /sbin/ip link add Vrf1 type vrf table 1001 || { echo 'vrf command failed' ; exit 1; } + sudo /sbin/ip link del Vrf1 type vrf table 1001 pushd sonic-swss/tests - # run pytests in sets of 20 - all_tests=$(ls test_*.py) + all_tests=$(ls test_*.py | xargs) all_tests="${all_tests} p4rt dash" - test_set=() - for test in ${all_tests}; do - test_set+=("${test}") - if [ ${#test_set[@]} -ge 20 ]; then - test_name=$(echo "${test_set[0]}" | cut -d "." -f 1) - echo "${test_set[*]}" | xargs sudo py.test -v --force-flaky --junitxml="${test_name}_tr.xml" --keeptb --imgname=docker-sonic-vs:$(Build.DefinitionName).$(Build.BuildNumber) - container_count=$(docker ps -q -a | wc -l) - if [ ${container_count} -gt 0 ]; then - docker stop $(docker ps -q -a) - docker rm $(docker ps -q -a) - fi - test_set=() - fi - done - if [ ${#test_set[@]} -gt 0 ]; then - test_name=$(echo "${test_set[0]}" | cut -d "." 
-f 1) - echo "${test_set[*]}" | xargs sudo py.test -v --force-flaky --junitxml="${test_name}_tr.xml" --keeptb --imgname=docker-sonic-vs:$(Build.DefinitionName).$(Build.BuildNumber) - container_count=$(docker ps -q -a | wc -l) - if [ ${container_count} -gt 0 ]; then - docker stop $(docker ps -q -a) - docker rm $(docker ps -q -a) - fi - fi + + # Run the tests in parallel and retry + retry=3 + IMAGE_NAME=docker-sonic-vs:$(Build.DefinitionName).$(Build.BuildNumber) + echo $all_tests | xargs -n 1 | xargs -P 8 -I TEST_MODULE sudo ./run-tests.sh "$IMAGE_NAME" "--force-recreate-dvs" "TEST_MODULE" 3 rm -rf $(Build.ArtifactStagingDirectory)/download displayName: "Run vs tests" diff --git a/azure-pipelines.yml b/azure-pipelines.yml index f5f23326..eb974388 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -40,7 +40,7 @@ resources: parameters: - name: debian_version type: string - default: bullseye + default: bookworm variables: - name: BUILD_BRANCH ${{ if eq(variables['Build.Reason'], 'PullRequest') }}: @@ -62,7 +62,7 @@ stages: sudo apt-get update sudo apt-get install -y make libtool m4 autoconf dh-exec debhelper cmake pkg-config nlohmann-json3-dev \ libhiredis-dev libnl-3-dev libnl-genl-3-dev libnl-route-3-dev libnl-nf-3-dev swig3.0 \ - libpython2.7-dev libboost-dev libboost-serialization-dev uuid-dev libzmq5 libzmq3-dev + libpython2.7-dev libboost-dev libboost-serialization-dev uuid-dev libzmq3-dev sudo apt-get install -y sudo sudo apt-get install -y redis-server redis-tools sudo apt-get install -y python3-pip @@ -88,11 +88,47 @@ stages: artifact: sonic-swss-common.amd64.ubuntu20_04 displayName: "Archive swss common debian packages" + - job: + displayName: "amd64/ubuntu-22.04" + pool: + vmImage: 'ubuntu-22.04' + + steps: + - script: | + sudo apt-get update + sudo apt-get install -y make libtool m4 autoconf dh-exec debhelper cmake pkg-config nlohmann-json3-dev \ + libhiredis-dev libnl-3-dev libnl-genl-3-dev libnl-route-3-dev libnl-nf-3-dev swig4.0 \ + libpython3-dev libboost-dev libboost-serialization-dev uuid-dev libzmq3-dev + sudo apt-get install -y sudo + sudo apt-get install -y redis-server redis-tools + sudo apt-get install -y python3-pip + sudo pip3 install pytest + sudo apt-get install -y python + sudo apt-get install cmake libgtest-dev libgmock-dev libyang-dev + cd /usr/src/gtest && sudo cmake . && sudo make + ARCH=$(dpkg --print-architecture) + set -x + sudo curl -fsSL -o /usr/local/bin/bazel \ + https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-${ARCH} + sudo chmod 755 /usr/local/bin/bazel + displayName: "Install dependencies" + - script: | + ./autogen.sh + dpkg-buildpackage -rfakeroot -us -uc -Pnopython2 -b -j$(nproc) && cp ../*.deb . + displayName: "Compile sonic swss common" + - script: | + bazel build //... + bazel test //... 
+ displayName: "Compile and test all Bazel targets" + - publish: $(System.DefaultWorkingDirectory)/ + artifact: sonic-swss-common.amd64.ubuntu22_04 + displayName: "Archive swss common debian packages" + - template: .azure-pipelines/build-template.yml parameters: arch: amd64 sonic_slave: sonic-slave-${{ parameters.debian_version }}:$(BUILD_BRANCH) - artifact_name: sonic-swss-common + artifact_name: sonic-swss-common-${{ parameters.debian_version }} run_unit_test: true archive_gcov: true debian_version: ${{ parameters.debian_version }} @@ -107,7 +143,7 @@ stages: timeout: 180 pool: sonicbld-armhf sonic_slave: sonic-slave-${{ parameters.debian_version }}-armhf:$(BUILD_BRANCH) - artifact_name: sonic-swss-common.armhf + artifact_name: sonic-swss-common-${{ parameters.debian_version }}.armhf debian_version: ${{ parameters.debian_version }} - template: .azure-pipelines/build-template.yml @@ -116,36 +152,7 @@ stages: timeout: 180 pool: sonicbld-arm64 sonic_slave: sonic-slave-${{ parameters.debian_version }}-arm64:$(BUILD_BRANCH) - artifact_name: sonic-swss-common.arm64 - debian_version: ${{ parameters.debian_version }} - -- stage: BuildBookworm - dependsOn: BuildArm - condition: succeeded('BuildArm') - jobs: - - template: .azure-pipelines/build-template.yml - parameters: - arch: amd64 - sonic_slave: sonic-slave-bookworm:$(BUILD_BRANCH) - artifact_name: sonic-swss-common-bookworm - debian_version: ${{ parameters.debian_version }} - - - template: .azure-pipelines/build-template.yml - parameters: - arch: armhf - timeout: 180 - pool: sonicbld-armhf - sonic_slave: sonic-slave-bookworm-armhf:$(BUILD_BRANCH) - artifact_name: sonic-swss-common-bookworm.armhf - debian_version: ${{ parameters.debian_version }} - - - template: .azure-pipelines/build-template.yml - parameters: - arch: arm64 - timeout: 180 - pool: sonicbld-arm64 - sonic_slave: sonic-slave-bookworm-arm64:$(BUILD_BRANCH) - artifact_name: sonic-swss-common-bookworm.arm64 + artifact_name: sonic-swss-common-${{ parameters.debian_version }}.arm64 debian_version: ${{ parameters.debian_version }} - stage: BuildSairedis @@ -156,8 +163,8 @@ stages: parameters: arch: amd64 sonic_slave: sonic-slave-${{ parameters.debian_version }}:$(BUILD_BRANCH) - swss_common_artifact_name: sonic-swss-common - artifact_name: sonic-sairedis + swss_common_artifact_name: sonic-swss-common-${{ parameters.debian_version }} + artifact_name: sonic-sairedis-${{ parameters.debian_version }} syslog_artifact_name: sonic-sairedis.syslog debian_version: ${{ parameters.debian_version }} @@ -169,9 +176,9 @@ stages: parameters: arch: amd64 sonic_slave: sonic-slave-${{ parameters.debian_version }}:$(BUILD_BRANCH) - swss_common_artifact_name: sonic-swss-common - sairedis_artifact_name: sonic-sairedis - artifact_name: sonic-swss + swss_common_artifact_name: sonic-swss-common-${{ parameters.debian_version }} + sairedis_artifact_name: sonic-sairedis-${{ parameters.debian_version }} + artifact_name: sonic-swss-${{ parameters.debian_version }} debian_version: ${{ parameters.debian_version }} - stage: BuildDocker @@ -180,9 +187,9 @@ stages: jobs: - template: .azure-pipelines/build-docker-sonic-vs-template.yml parameters: - swss_common_artifact_name: sonic-swss-common - sairedis_artifact_name: sonic-sairedis - swss_artifact_name: sonic-swss + swss_common_artifact_name: sonic-swss-common-${{ parameters.debian_version }} + sairedis_artifact_name: sonic-sairedis-${{ parameters.debian_version }} + swss_artifact_name: sonic-swss-${{ parameters.debian_version }} artifact_name: docker-sonic-vs - stage: 
Test diff --git a/common/redispipeline.h b/common/redispipeline.h index be7561b6..96f97ab8 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -164,7 +164,7 @@ class RedisPipeline { return; m_channels.insert(channel); - m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');"; + m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G')\n"; m_shaPub = loadRedisScript(m_luaPub); } diff --git a/common/schema.h b/common/schema.h index e99f2ad8..461ea4d8 100644 --- a/common/schema.h +++ b/common/schema.h @@ -475,11 +475,12 @@ namespace swss { #define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT" +#define CFG_MEMORY_STATISTICS_TABLE_NAME "MEMORY_STATISTICS" + #define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE" #define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE" #define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE" - /***** STATE DATABASE *****/ #define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY" diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 8d61d1cd..6dff045e 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -18,6 +18,7 @@ namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) :ZmqClient(endpoint, "") { +// initialize(endpoint); } ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) @@ -25,6 +26,13 @@ ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) initialize(endpoint, vrf); } +ZmqClient::ZmqClient(const std::string& endpoint, uint32_t waitTimeMs) : + m_waitTimeMs(waitTimeMs) +{ +// m_waitTimeMs = waitTimeMs; + initialize(endpoint); +} + ZmqClient::~ZmqClient() { std::lock_guard<std::mutex> lock(m_socketMutex); @@ -55,6 +63,17 @@ void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) connect(); } + +void ZmqClient::initialize(const std::string& endpoint) +{ + m_connected = false; + m_endpoint = endpoint; + m_context = nullptr; + m_socket = nullptr; + m_sendbuffer.resize(MQ_RESPONSE_MAX_COUNT); + + connect(); +} bool ZmqClient::isConnected() { @@ -63,6 +82,7 @@ bool ZmqClient::isConnected() void ZmqClient::connect() { +SWSS_LOG_ERROR("DIV:: Inside function client connect"); if (m_connected) { SWSS_LOG_DEBUG("Already connected to endpoint: %s", m_endpoint.c_str()); @@ -88,6 +108,7 @@ void ZmqClient::connect() m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUSH); + SWSS_LOG_DEBUG("m_socket in client connect() is: %p\n", m_socket); // timeout all pending send package, so zmq will not block in dtor of this class: http://api.zeromq.org/master:zmq-setsockopt int linger = 0; zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger)); @@ -119,6 +140,7 @@ void ZmqClient::sendMsg( const std::string& tableName, const std::vector<KeyOpFieldsValuesTuple>& kcos) { +SWSS_LOG_ERROR("DIV:: Inside function client sendMsg"); int serializedlen = (int)BinarySerializer::serializeBuffer( m_sendbuffer.data(), m_sendbuffer.size(), @@ -137,14 +159,14 @@ void ZmqClient::sendMsg( int zmq_err = 0; int retry_delay = 10; int rc = 0; - for (int i = 0; i <= MQ_MAX_RETRY; ++i) + for (int i = 0; i <= MQ_MAX_RETRY; ++i) { { // ZMQ socket is not thread safe: http://api.zeromq.org/2-1:zmq std::lock_guard<std::mutex> lock(m_socketMutex); // Use none block mode to use all bandwidth: http://api.zeromq.org/2-1%3Azmq-send - rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); + rc = zmq_send(m_socket, m_sendbuffer.data(), serializedlen, ZMQ_NOBLOCK); } if (rc >= 0) @@ -164,7 +186,7 @@ void ZmqClient::sendMsg( // For example when ZMQ socket 
still not receive reply message from last sended package. // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this error will happen. // for more detail, please check: http://api.zeromq.org/2-1:zmq-send - SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + SWSS_LOG_WARN("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); retry_delay = 0; } @@ -183,7 +205,7 @@ void ZmqClient::sendMsg( else { // for other error, send failed immediately. - auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } @@ -192,68 +214,80 @@ void ZmqClient::sendMsg( } // failed after retry - auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + auto message = "cli: zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); SWSS_LOG_ERROR("%s", message.c_str()); throw system_error(make_error_code(errc::io_error), message); } bool ZmqClient::wait(std::string& dbName, - std::string& tableName, - std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos) { - +SWSS_LOG_ERROR("DIV:: Inside function wait"); SWSS_LOG_ENTER(); - int rc; + return false; - for (int i = 0; true ; ++i) + zmq_pollitem_t items [1] = { }; + items[0].socket = m_socket; + items[0].events = ZMQ_POLLIN; - { +/* zmq_pollitem_t poll_item; + poll_item.fd = 0; + poll_item.socket = m_socket; + poll_item.events = ZMQ_POLLIN; + poll_item.revents = 0;*/ - rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0); + int rc; + for (int i = 0; true; ++i) + { +// rc = zmq_poll(&poll_item, 1, 1000); + rc = zmq_poll(items, 1, 1000); + SWSS_LOG_DEBUG("cli: rc value is : %d", rc); + if (rc == 0) + { + SWSS_LOG_ERROR("zmq_poll timed out: zmqclient wait"); + return false; +// continue; + } + if (rc > 0) + { + break; + } + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + { + SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq poll"); + continue; + } + SWSS_LOG_ERROR("zmqclient wait : zmq_poll failed, zmqerrno: %d", zmq_errno()); + } + for (int i = 0; true; ++i) + { + rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), ZMQ_DONTWAIT); if (rc < 0) - { - if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) - { - + SWSS_LOG_DEBUG("Checking the 2nd if condition in zmq receive"); continue; - } - - SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); - + SWSS_LOG_ERROR("zmqclient wait : zmq_recv failed, zmqerrno: %d", zmq_errno()); + return false; } - if (rc >= (int)m_sendbuffer.size()) - { - - SWSS_LOG_THROW( - + SWSS_LOG_ERROR( "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - (int)m_sendbuffer.size(), rc); - +// return false; } - break; - } - m_sendbuffer.at(rc) = 0; // make sure that we end string with zero before parse - kcos.clear(); - BinarySerializer::deserializeBuffer(m_sendbuffer.data(), m_sendbuffer.size(), dbName, tableName, kcos); - return true; - } } diff --git a/common/zmqclient.h b/common/zmqclient.h index 54f10bac..c645a1f7 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -15,6 +15,7 @@ class ZmqClient ZmqClient(const std::string& endpoint); ZmqClient(const std::string& 
endpoint, const std::string& vrf); + ZmqClient(const std::string& endpoint, uint32_t waitTimeMs); ~ZmqClient(); bool isConnected(); @@ -26,13 +27,12 @@ class ZmqClient const std::vector<KeyOpFieldsValuesTuple>& kcos); bool wait(std::string& dbName, - std::string& tableName, - std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos); private: void initialize(const std::string& endpoint, const std::string& vrf); + void initialize(const std::string& endpoint); std::string m_endpoint; @@ -44,9 +44,11 @@ class ZmqClient bool m_connected; + uint32_t m_waitTimeMs; + std::mutex m_socketMutex; - - std::vector<char> m_sendbuffer; + + std::vector<char> m_sendbuffer; }; } diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index bc32a765..8904f92b 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -148,6 +148,7 @@ void ZmqProducerStateTable::del(const std::vector<std::string> &keys) void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos) { +SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::send"); m_zmqClient.sendMsg( m_dbName, m_tableNameStr, @@ -171,7 +172,7 @@ bool ZmqProducerStateTable::wait(std::string& dbName, std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos) { - +SWSS_LOG_DEBUG("DIV:: Inside function ZmqProducerStateTable::wait"); return m_zmqClient.wait(dbName, tableName, kcos); } diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index ab7cc43e..d46a85c4 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -16,11 +17,24 @@ ZmqServer::ZmqServer(const std::string& endpoint) { } +ZmqServer::ZmqServer(const std::string& endpoint, bool zmr_test) +// : ZmqServer(endpoint, "") +: m_endpoint(endpoint), +m_allowZmqPoll(true) +{ + connect(); + m_buffer.resize(MQ_RESPONSE_MAX_COUNT); + m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this); + m_runThread = true; + + SWSS_LOG_DEBUG("DIV: ZmqServer ctor endpoint: %s", endpoint.c_str()); +} + ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), - m_vrf(vrf), - m_allowZmqPoll(true) + m_vrf(vrf) { + connect(); m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; m_mqPollThread = std::make_shared<std::thread>(&ZmqServer::mqPollThread, this); @@ -33,6 +47,35 @@ ZmqServer::~ZmqServer() m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); + + zmq_close(m_socket); + zmq_ctx_destroy(m_context); +} + +void ZmqServer::connect() +{ +SWSS_LOG_ERROR("DIV:: Inside function server connect"); + SWSS_LOG_ENTER(); + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, ZMQ_PULL); + + SWSS_LOG_DEBUG("m_socket in server connect() is: %p\n", m_socket); + // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt + int high_watermark = MQ_WATERMARK; + zmq_setsockopt(m_socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + + if (!m_vrf.empty()) + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + + int rc = zmq_bind(m_socket, m_endpoint.c_str()); + if (rc != 0) + { + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", + m_endpoint.c_str(), + zmq_errno()); + } } void ZmqServer::registerMessageHandler( @@ -40,6 +83,7 @@ void ZmqServer::registerMessageHandler( const std::string tableName, ZmqMessageHandler* handler) { +SWSS_LOG_ERROR("DIV:: Inside function registerMessageHandler"); auto dbResult = m_HandlerMap.insert(pair<string, map<string, ZmqMessageHandler*>>(dbName, map<string, ZmqMessageHandler*>())); if (dbResult.second) { SWSS_LOG_DEBUG("ZmqServer add handler mapping for db: %s", dbName.c_str()); @@ -55,6 +99,7 @@ ZmqMessageHandler* 
ZmqServer::findMessageHandler( const std::string dbName, const std::string tableName) { +SWSS_LOG_ERROR("DIV:: Inside function findMessageHandler"); auto dbMappingIter = m_HandlerMap.find(dbName); if (dbMappingIter == m_HandlerMap.end()) { SWSS_LOG_DEBUG("ZmqServer can't find any handler for db: %s", dbName.c_str()); @@ -72,6 +117,7 @@ ZmqMessageHandler* ZmqServer::findMessageHandler( void ZmqServer::handleReceivedData(const char* buffer, const size_t size) { +SWSS_LOG_ERROR("DIV:: Inside function handleReceivedData"); std::string dbName; std::string tableName; std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos; @@ -89,13 +135,20 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size) void ZmqServer::mqPollThread() { +SWSS_LOG_ERROR("DIV:: Inside function mqPollThread"); SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("mqPollThread begin"); - // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket - void* context = zmq_ctx_new();; +/* // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket + void* context = zmq_ctx_new(); void* socket = zmq_socket(context, ZMQ_PULL); +//divya + + int ret_code = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("mqPollThread:: ret_code value is : %d", ret_code); +//divya + // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); @@ -106,28 +159,31 @@ void ZmqServer::mqPollThread() } int rc = zmq_bind(socket, m_endpoint.c_str()); + SWSS_LOG_DEBUG("115: mqPollThread:: rc value is : %d", rc); if (rc != 0) { SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", m_endpoint.c_str(), zmq_errno(), strerror(zmq_errno())); - } + }*/ // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); + SWSS_LOG_DEBUG("m_runThread: %d", m_runThread); while (m_runThread) { m_allowZmqPoll = false; // receive message - rc = zmq_poll(&poll_item, 1, 1000); + auto rc = zmq_poll(&poll_item, 1, 1000); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq poll: rc value is : %d", rc); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -136,7 +192,10 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("m_socket in mqPollThread() server is: %p\n", m_socket); + + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + SWSS_LOG_DEBUG("ZmqServer::mqPollThread: zmq recv rc value is : %d", rc); if (rc < 0) { int zmq_err = zmq_errno(); @@ -165,10 +224,88 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(socket); - zmq_ctx_destroy(context); +// zmq_close(socket); +// zmq_ctx_destroy(context); SWSS_LOG_NOTICE("mqPollThread end"); } +void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector<KeyOpFieldsValuesTuple>& values) +{ + + return; +SWSS_LOG_ERROR("DIV:: Inside function server sendMsg"); + int serializedlen = (int)BinarySerializer::serializeBuffer( + m_buffer.data(), + m_buffer.size(), + dbName, + tableName, + values); + SWSS_LOG_DEBUG("sending: %d", serializedlen); + int zmq_err = 0; + int 
retry_delay = 10; + int rc = 0; + for (int i = 0; i <= MQ_MAX_RETRY; ++i) + { + SWSS_LOG_DEBUG("1. m_socket in server sendmsg() is: %p\n", m_socket); + rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0); + SWSS_LOG_DEBUG("ser: rc value is : %d", rc); + if (rc >= 0) + { + m_allowZmqPoll = true; + SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen); + return; + } + zmq_err = zmq_errno(); + // sleep (2 ^ retry time) * 10 ms + retry_delay *= 2; + SWSS_LOG_DEBUG("2. m_socket in server sendmsg() is: %p\n", m_socket); + SWSS_LOG_DEBUG("zmq_err is : %d", zmq_err); + + if (zmq_err == EINTR + || zmq_err == EFSM) + { + // EINTR: interrupted by signal + // EFSM: socket state not ready + // For example when ZMQ socket still not receive reply message from last sended package. + // There was state machine inside ZMQ socket, when the socket is not in ready to send state, this + // error will happen. + // for more detail, please check: http://api.zeromq.org/2-1:zmq-send + SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err); + + retry_delay = 0; + } + else if (zmq_err == EAGAIN) + { + // EAGAIN: ZMQ is full to need try again + SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err); + } + else if (zmq_err == ETERM) + { + auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + throw system_error(make_error_code(errc::connection_reset), message); + } + else + { + // for other error, send failed immediately. + auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc); + SWSS_LOG_ERROR("%s", message.c_str()); + SWSS_LOG_DEBUG("3. m_socket in server sendmsg() is: %p\n", m_socket); +// throw system_error(make_error_code(errc::io_error), message); +// SWSS_LOG_THROW("Else case message is: %s", message.c_str()); + return; + } + usleep(retry_delay * 1000); + } + + // failed after retry + auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen); + SWSS_LOG_ERROR("%s", message.c_str()); +// throw system_error(make_error_code(errc::io_error), message); +// SWSS_LOG_THROW("Last Error message is %s", message.c_str()); +// return; +} + } diff --git a/common/zmqserver.h b/common/zmqserver.h index b8b14c1e..169a8c48 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,6 +31,7 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); + ZmqServer(const std::string& endpoint, bool zmr_test); ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); @@ -39,8 +40,13 @@ class ZmqServer const std::string tableName, ZmqMessageHandler* handler); + void sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector<KeyOpFieldsValuesTuple>& values); + private: + void connect(); + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -55,14 +61,16 @@ class ZmqServer std::string m_endpoint; - std::string m_vrf; - void* m_context; + std::string m_vrf; + void* m_socket; bool m_allowZmqPoll; +// std::vector<char> m_sendbuffer; + std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap; }; diff --git a/core b/core new file mode 100644 index 00000000..6600072f Binary files /dev/null and b/core differ diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 56a8299f..3af09364 100644 --- a/tests/zmq_state_ut.cpp +++ 
b/tests/zmq_state_ut.cpp @@ -56,8 +56,11 @@ static bool allDataReceived = false; static void producerWorker(string tableName, string endpoint, bool dbPersistence) { +SWSS_LOG_DEBUG("Inside producerWorker"); DBConnector db(TEST_DB, 0, true); +SWSS_LOG_DEBUG("producerWorker: After DBConnector"); ZmqClient client(endpoint); +SWSS_LOG_DEBUG("producerWorker: After zmqclient"); ZmqProducerStateTable p(&db, tableName, client, dbPersistence); cout << "Producer thread started: " << tableName << endl; @@ -257,6 +260,10 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence } } + // Wait for some time to write into the DB. + + sleep(3); + allDataReceived = true; if (dbPersistence) @@ -288,6 +295,9 @@ static void testMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -351,6 +361,9 @@ static void testBatchMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -465,3 +478,131 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test) table.getKeys(keys); EXPECT_EQ(keys.front(), testKey); } + +static bool zmq_done = false; + +static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence) +{ + cout << "DIV:: Function zmqConsumerWorker 473" << endl; + cout << "Consumer thread started: " << tableName << endl; + DBConnector db(TEST_DB, 0, true); + cout << "DIV:: Function zmqConsumerWorker 476" << endl; + ZmqServer server(endpoint, true); + cout << "DIV:: Function zmqConsumerWorker 478" << endl; + ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence); + cout << "DIV:: Function zmqConsumerWorker 480" << endl; + Select cs; + cs.addSelectable(&c); + //validate received data + Selectable *selectcs; + std::deque<KeyOpFieldsValuesTuple> vkco; + cout << "DIV:: Function zmqConsumerWorker 486" << endl; + int ret = 0; + while (!zmq_done) + { + cout << "DIV:: Function zmqConsumerWorker 490" << endl; + ret = cs.select(&selectcs, 10, true); + cout << "DIV:: Function zmqConsumerWorker 492" << endl; + if (ret == Select::OBJECT) + { + cout << "DIV:: Function zmqConsumerWorker 494" << endl; + c.pops(vkco); + cout << "DIV:: Function zmqConsumerWorker 496" << endl; + std::vector<KeyOpFieldsValuesTuple> values; + cout << "DIV:: Function zmqConsumerWorker 498" << endl; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}}); + cout << "DIV:: Function zmqConsumerWorker 500" << endl; + server.sendMsg(TEST_DB, tableName, values); + cout << "DIV:: Function zmqConsumerWorker 502" << endl; + } + } + + allDataReceived = true; + if (dbPersistence) + { + cout << "DIV:: Function zmqConsumerWorker 509" << endl; + // wait all persist data write to redis + while (c.dbUpdaterQueueSize() > 0) + { + sleep(1); + } + } + + cout << "Consumer thread ended: " << tableName << endl; +} + +static void ZmqWithResponse(bool producerPersistence) +{ + std::string testTableName = 
"ZMQ_PROD_CONS_UT"; +// std::string db_Name = "TEST_DB"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + // start consumer first, SHM can only have 1 consumer per table. + thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence); + + cout << "DIV:: Function ZmqWithResponse ut 1 529" << endl; + // Wait for the consumer to be ready. + sleep(1); + DBConnector db(TEST_DB, 0, true); + cout << "DIV:: Function ZmqWithResponse ut 1 533" << endl; + ZmqClient client(pushEndpoint, 3000); + cout << "DIV:: Function ZmqWithResponse ut 1 535" << endl; + ZmqProducerStateTable p(&db, testTableName, client, true); + cout << "DIV:: Function ZmqWithResponse ut 1 537" << endl; + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + std::vector> kcos_p; + cout << "DIV:: Function ZmqWithResponse ut 1 541" << endl; + std::string dbName, tableName; + for (int i = 0; i < 3; ++i) + { + cout << "DIV:: Function ZmqWithResponse ut 1 545" << endl; + p.send(kcos); + ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); + cout << "DIV:: Function ZmqWithResponse ut 1 548" << endl; + EXPECT_EQ(dbName, TEST_DB); + EXPECT_EQ(tableName, testTableName); + ASSERT_EQ(kcos_p.size(), 1); + EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); + EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); + std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; + EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); + } + + cout << "DIV:: Function ZmqWithResponse ut 1 558" << endl; + zmq_done = true; + consumerThread->join(); + delete consumerThread; +} + +TEST(ZmqWithResponse, test) +{ + // test with persist by consumer + ZmqWithResponse(false); +} + +TEST(ZmqWithResponseClientError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; +// std::string new_dbName = "TEST_DB"; + cout << "DIV:: Function ZmqWithResponse ut 2 575" << endl; + DBConnector db(TEST_DB, 0, true); + cout << "DIV:: Function ZmqWithResponse ut 2 577" << endl; + ZmqClient client(pushEndpoint, 3000); +// ZmqClient client(pushEndpoint); + cout << "DIV:: Function ZmqWithResponse ut 2 580" << endl; + ZmqProducerStateTable p(&db, testTableName, client, true); + cout << "DIV:: Function ZmqWithResponse ut 2 582" << endl; + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); + std::vector> kcos_p; + std::string dbName, tableName; + cout << "DIV:: Function ZmqWithResponse ut 2 587" << endl; + p.send(kcos); + // Wait will timeout without server reply. + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); + cout << "DIV:: Function ZmqWithResponse ut 2 591" << endl; +// EXPECT_FALSE(p.wait(new_dbName, testTableName, kcos_p)); +} +