diff --git a/lib/RedisRemoteSaiInterface.cpp b/lib/RedisRemoteSaiInterface.cpp index 63961b74b..08ffb1939 100644 --- a/lib/RedisRemoteSaiInterface.cpp +++ b/lib/RedisRemoteSaiInterface.cpp @@ -569,6 +569,7 @@ sai_status_t RedisRemoteSaiInterface::notifyCounterGroupOperations( emplaceStrings(POLL_INTERVAL_FIELD, flexCounterGroupParam->poll_interval, entries); emplaceStrings(BULK_CHUNK_SIZE_FIELD, flexCounterGroupParam->bulk_chunk_size, entries); + emplaceStrings(BULK_CHUNK_SIZE_PER_PREFIX_FIELD, flexCounterGroupParam->bulk_chunk_size_per_prefix, entries); emplaceStrings(STATS_MODE_FIELD, flexCounterGroupParam->stats_mode, entries); emplaceStrings(flexCounterGroupParam->plugin_name, flexCounterGroupParam->plugins, entries); emplaceStrings(FLEX_COUNTER_STATUS_FIELD, flexCounterGroupParam->operation, entries); diff --git a/lib/sairedis.h b/lib/sairedis.h index 5e334694f..3481de3df 100644 --- a/lib/sairedis.h +++ b/lib/sairedis.h @@ -147,6 +147,13 @@ typedef struct _sai_redis_flex_counter_group_parameter_t */ sai_s8_list_t bulk_chunk_size; + /** + * @brief The bulk counter prefix map the counter group + * + * It should be a string representing bulk chunk size of each sub counter group. + */ + sai_s8_list_t bulk_chunk_size_per_prefix; + } sai_redis_flex_counter_group_parameter_t; typedef struct _sai_redis_flex_counter_parameter_t diff --git a/syncd/FlexCounter.cpp b/syncd/FlexCounter.cpp index 03a29abd4..1720ee4fc 100644 --- a/syncd/FlexCounter.cpp +++ b/syncd/FlexCounter.cpp @@ -81,6 +81,13 @@ void BaseCounterContext::setBulkChunkSize( default_bulk_chunk_size = bulkChunkSize; } +void BaseCounterContext::setBulkChunkSizePerPrefix( + _In_ const std::string& bulkChunkSizePerPrefix) +{ + SWSS_LOG_ENTER(); + m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix; +} + template struct CounterIds @@ -151,6 +158,8 @@ struct BulkStatsContext std::vector counter_ids; std::vector object_statuses; std::vector counters; + std::string name; + uint32_t default_bulk_chunk_size; }; // TODO: use if const expression when cpp17 is supported @@ -425,6 +434,8 @@ void deserializeAttr( template class CounterContext : public BaseCounterContext { + std::map m_counterChunkSizeMapFromPrefix; + public: typedef CounterIds CounterIdsType; typedef BulkStatsContext BulkContextType; @@ -533,12 +544,209 @@ class CounterContext : public BaseCounterContext } m_objectIdsMap.emplace(vid, counter_data); } - else + else if (m_counterChunkSizeMapFromPrefix.empty()) { std::sort(supportedIds.begin(), supportedIds.end()); - auto bulkContext = getBulkStatsContext(supportedIds); + auto bulkContext = getBulkStatsContext(supportedIds, "default", default_bulk_chunk_size); addBulkStatsContext(vid, rid, supportedIds, *bulkContext.get()); } + else + { + std::map> counter_prefix_map; + std::vector default_partition; + mapCountersByPrefix(supportedIds, counter_prefix_map, default_partition); + + for (auto &counterPrefix : counter_prefix_map) + { + std::sort(counterPrefix.second.begin(), counterPrefix.second.end()); + auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]); + addBulkStatsContext(vid, rid, counterPrefix.second, *bulkContext.get()); + } + + std::sort(default_partition.begin(), default_partition.end()); + auto bulkContext = getBulkStatsContext(default_partition, "default", default_bulk_chunk_size); + addBulkStatsContext(vid, rid, default_partition, *bulkContext.get()); + } + } + + void parseCounterPrefixConfigString( + _In_ const std::string& prefixConfigString) + { + SWSS_LOG_ENTER(); + + m_counterChunkSizeMapFromPrefix.clear(); + + if (!prefixConfigString.empty()) + { + auto tokens = swss::tokenize(prefixConfigString, ';'); + + for (auto &token: tokens) + { + auto counter_name_bulk_size = swss::tokenize(token, ':'); + SWSS_LOG_INFO("New partition %s bulk chunk size %s", counter_name_bulk_size[0].c_str(), counter_name_bulk_size[1].c_str()); + m_counterChunkSizeMapFromPrefix[counter_name_bulk_size[0]] = stoi(counter_name_bulk_size[1]); + } + } + } + + void mapCountersByPrefix( + _In_ const std::vector& supportedIds, + _Out_ std::map> &counter_prefix_map, + _Out_ std::vector &default_partition, + _In_ bool log=false) + { + SWSS_LOG_ENTER(); + + default_partition.clear(); + for (auto &counter : supportedIds) + { + std::string counterStr = serializeStat(counter); + bool found = false; + for (auto searchRef: m_counterChunkSizeMapFromPrefix) + { + if (!searchRef.first.empty() && counterStr.find(searchRef.first) != std::string::npos) + { + found = true; + counter_prefix_map[searchRef.first].push_back(counter); + if (log) + { + SWSS_LOG_INFO("Put counter %s to partition %s", counterStr.c_str(), searchRef.first.c_str()); + } + break; + } + } + if (!found) + { + default_partition.push_back(counter); + + if (log) + { + SWSS_LOG_INFO("Put counter %s to the default partition", counterStr.c_str()); + } + } + } + } + + void setBulkChunkSize( + _In_ uint32_t bulkChunkSize) override + { + SWSS_LOG_ENTER(); + default_bulk_chunk_size = bulkChunkSize; + SWSS_LOG_INFO("Bulk chunk size updatd to %u", bulkChunkSize); + + for (auto &bulkStatsContext : m_bulkContexts) + { + auto const &name = (*bulkStatsContext.second.get()).name; + if (name == "default") + { + SWSS_LOG_INFO("Bulk chunk size of default updated to %u", bulkChunkSize); + (*bulkStatsContext.second.get()).default_bulk_chunk_size = default_bulk_chunk_size; + break; + } + } + } + + void setBulkChunkSizePerPrefix( + _In_ const std::string& bulkChunkSizePerPrefix) override + { + SWSS_LOG_ENTER(); + + m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix; + + parseCounterPrefixConfigString(bulkChunkSizePerPrefix); + if (m_bulkContexts.empty()) + { + return; + } + + if (m_bulkContexts.size() == 1) + { + // hasn't split + SWSS_LOG_NOTICE("Split counter IDs set by prefix for the first time %s", bulkChunkSizePerPrefix.c_str()); + auto it = m_bulkContexts.begin(); + BulkContextType &singleBulkContext = *it->second.get(); + const std::vector &allCounterIds = singleBulkContext.counter_ids; + std::map> counterChunkSizePerPrefix; + std::vector defaultPartition; + + if (m_counterChunkSizeMapFromPrefix.empty()) + { + singleBulkContext.default_bulk_chunk_size = default_bulk_chunk_size; + } + else + { + mapCountersByPrefix(allCounterIds, counterChunkSizePerPrefix, defaultPartition, true); + + for (auto &counterPrefix : counterChunkSizePerPrefix) + { + std::sort(counterPrefix.second.begin(), counterPrefix.second.end()); + auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]); + + bulkContext.get()->counter_ids = move(counterPrefix.second); + bulkContext.get()->object_statuses.resize(singleBulkContext.object_statuses.size()); + bulkContext.get()->object_vids = singleBulkContext.object_vids; + bulkContext.get()->object_keys = singleBulkContext.object_keys; + bulkContext.get()->counters.resize(bulkContext.get()->counter_ids.size() * bulkContext.get()->object_vids.size()); + + SWSS_LOG_INFO("Re-initializing counter partition %s", counterPrefix.first.c_str()); + } + + std::sort(defaultPartition.begin(), defaultPartition.end()); + auto defaultBulkContext = getBulkStatsContext(defaultPartition, "default", default_bulk_chunk_size); + defaultBulkContext.get()->counter_ids = move(defaultPartition); + defaultBulkContext.get()->object_statuses = move(singleBulkContext.object_statuses); + defaultBulkContext.get()->object_vids = move(singleBulkContext.object_vids); + defaultBulkContext.get()->object_keys = move(singleBulkContext.object_keys); + defaultBulkContext.get()->counters.resize(defaultBulkContext.get()->counter_ids.size() * defaultBulkContext.get()->object_vids.size()); + m_bulkContexts.erase(it); + SWSS_LOG_INFO("Removed the previous default counter partition"); + } + } + else + { + SWSS_LOG_NOTICE("Update bulk chunk size only %s", bulkChunkSizePerPrefix.c_str()); + + auto counterChunkSizeMapFromPrefix = m_counterChunkSizeMapFromPrefix; + for (auto &bulkStatsContext : m_bulkContexts) + { + auto const &name = (*bulkStatsContext.second.get()).name; + + if (name == "default") + { + continue; + } + + auto const &searchRef = counterChunkSizeMapFromPrefix.find(name); + if (searchRef != counterChunkSizeMapFromPrefix.end()) + { + auto const &chunkSize = searchRef->second; + + SWSS_LOG_INFO("Reset counter prefix %s chunk size %d", name.c_str(), chunkSize); + (*bulkStatsContext.second.get()).default_bulk_chunk_size = chunkSize; + counterChunkSizeMapFromPrefix.erase(searchRef); + } + else + { + SWSS_LOG_WARN("Update bulk chunk size: bulk chunk size for prefix %s is not provided", name.c_str()); + } + } + + for (auto &it : counterChunkSizeMapFromPrefix) + { + SWSS_LOG_WARN("Update bulk chunk size: prefix %s does not exist", it.first.c_str()); + } + } + + for (auto context : m_bulkContexts) + { + SWSS_LOG_INFO("%s %s partition %s number of OIDs %d number of counter IDs %d number of counters %d", + m_name.c_str(), + m_instanceId.c_str(), + context.second.get()->name.c_str(), + context.second.get()->object_keys.size(), + context.second.get()->counter_ids.size(), + context.second.get()->counters.size()); + } } void removeObject( @@ -744,7 +952,7 @@ class CounterContext : public BaseCounterContext { SWSS_LOG_ENTER(); auto statsMode = m_groupStatsMode == SAI_STATS_MODE_READ ? SAI_STATS_MODE_BULK_READ : SAI_STATS_MODE_BULK_READ_AND_CLEAR; - uint32_t bulk_chunk_size = default_bulk_chunk_size; + uint32_t bulk_chunk_size = ctx.default_bulk_chunk_size; uint32_t size = static_cast(ctx.object_keys.size()); if (bulk_chunk_size > size || bulk_chunk_size == 0) { @@ -752,7 +960,7 @@ class CounterContext : public BaseCounterContext } uint32_t current = 0; - SWSS_LOG_INFO("Before getting bulk %s %s size %u bulk chunk size %u current %u", m_instanceId.c_str(), m_name.c_str(), size, bulk_chunk_size, current); + SWSS_LOG_INFO("Before getting bulk %s %s %s size %u bulk chunk size %u current %u", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str(), size, bulk_chunk_size, current); while (current < size) { @@ -772,7 +980,7 @@ class CounterContext : public BaseCounterContext } current += bulk_chunk_size; - SWSS_LOG_INFO("After getting bulk %s %s index %u(advanced to %u) bulk chunk size %u", m_instanceId.c_str(), m_name.c_str(), current - bulk_chunk_size, current, bulk_chunk_size); + SWSS_LOG_INFO("After getting bulk %s %s %s index %u(advanced to %u) bulk chunk size %u", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str(), current - bulk_chunk_size, current, bulk_chunk_size); if (size - current < bulk_chunk_size) { @@ -801,11 +1009,13 @@ class CounterContext : public BaseCounterContext values.clear(); } - SWSS_LOG_INFO("After pushing db %s %s", m_instanceId.c_str(), m_name.c_str()); + SWSS_LOG_INFO("After pushing db %s %s %s", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str()); } auto getBulkStatsContext( - _In_ const std::vector& counterIds) + _In_ const std::vector& counterIds, + _In_ const std::string& name, + _In_ uint32_t bulk_chunk_size=0) { SWSS_LOG_ENTER(); auto iter = m_bulkContexts.find(counterIds); @@ -814,7 +1024,10 @@ class CounterContext : public BaseCounterContext return iter->second; } + SWSS_LOG_NOTICE("Create bulk stat context %s %s %s", m_instanceId.c_str(), m_name.c_str(), name.c_str()); auto ret = m_bulkContexts.emplace(counterIds, std::make_shared()); + (*ret.first->second.get()).name = name; + (*ret.first->second.get()).default_bulk_chunk_size = bulk_chunk_size; return ret.first->second; } @@ -1096,9 +1309,10 @@ class DashMeterCounterContext : public BaseCounterContext public: DashMeterCounterContext( _In_ const std::string &name, + _In_ const std::string &instance, _In_ sairedis::SaiInterface *vendor_sai, _In_ std::string dbCounters): - BaseCounterContext(name), m_dbCounters(dbCounters), m_vendorSai(vendor_sai) + BaseCounterContext(name, instance), m_dbCounters(dbCounters), m_vendorSai(vendor_sai) { SWSS_LOG_ENTER(); } @@ -1558,6 +1772,7 @@ void FlexCounter::addCounterPlugin( m_isDiscarded = false; uint32_t bulkChunkSize = 0; + std::string bulkChunkSizePerPrefix; for (auto& fvt: values) { @@ -1579,6 +1794,15 @@ void FlexCounter::addCounterPlugin( SWSS_LOG_NOTICE("Set counter context %s %s bulk size %u", m_instanceId.c_str(), COUNTER_TYPE_PORT.c_str(), bulkChunkSize); } } + else if (field == BULK_CHUNK_SIZE_PER_PREFIX_FIELD) + { + bulkChunkSizePerPrefix = value; + for (auto &context : m_counterContext) + { + context.second->setBulkChunkSizePerPrefix(bulkChunkSizePerPrefix); + SWSS_LOG_NOTICE("Set counter context %s %s bulk chunk prefix map %s", m_instanceId.c_str(), COUNTER_TYPE_PORT.c_str(), bulkChunkSizePerPrefix.c_str()); + } + } else if (field == FLEX_COUNTER_STATUS_FIELD) { setStatus(value); @@ -1607,6 +1831,12 @@ void FlexCounter::addCounterPlugin( getCounterContext(counterTypeRef->second)->setBulkChunkSize(bulkChunkSize); SWSS_LOG_NOTICE("Create counter context %s %s with bulk size %u", m_instanceId.c_str(), counterTypeRef->second.c_str(), bulkChunkSize); } + + if (!bulkChunkSizePerPrefix.empty()) + { + getCounterContext(counterTypeRef->second)->setBulkChunkSizePerPrefix(bulkChunkSizePerPrefix); + SWSS_LOG_NOTICE("Create counter context %s %s with bulk prefix map %s", m_instanceId.c_str(), counterTypeRef->second.c_str(), bulkChunkSizePerPrefix.c_str()); + } } else { @@ -1752,7 +1982,7 @@ std::shared_ptr FlexCounter::createCounterContext( } else if (context_name == COUNTER_TYPE_METER_BUCKET) { - return std::make_shared(context_name, m_vendorSai.get(), m_dbCounters); + return std::make_shared(context_name, instance, m_vendorSai.get(), m_dbCounters); } else if (context_name == ATTR_TYPE_QUEUE) { diff --git a/syncd/FlexCounter.h b/syncd/FlexCounter.h index 7c2671966..211dc7580 100644 --- a/syncd/FlexCounter.h +++ b/syncd/FlexCounter.h @@ -27,9 +27,12 @@ namespace syncd void setNoDoubleCheckBulkCapability( _In_ bool); - void setBulkChunkSize( + virtual void setBulkChunkSize( _In_ uint32_t bulkChunkSize); + virtual void setBulkChunkSizePerPrefix( + _In_ const std::string& bulkChunkSizePerPrefix); + bool hasPlugin() const {return !m_plugins.empty();} void removePlugins() {m_plugins.clear();} @@ -56,6 +59,7 @@ namespace syncd std::string m_name; std::string m_instanceId; std::set m_plugins; + std::string m_bulkChunkSizePerPrefix; public: bool always_check_supported_counters = false; @@ -63,7 +67,7 @@ namespace syncd bool use_sai_stats_ext = false; bool double_confirm_supported_counters = false; bool no_double_check_bulk_capability = false; - uint32_t default_bulk_chunk_size = 0; + uint32_t default_bulk_chunk_size; }; class FlexCounter {