Skip to content

Commit

Permalink
Support bulk chunk size per counter ID
Browse files Browse the repository at this point in the history
format: counter_prefix:size{;counter_prefix:size}

Signed-off-by: Stephen Sun <[email protected]>
  • Loading branch information
stephenxs committed Dec 13, 2024
1 parent 6d35990 commit 6c1086a
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 11 deletions.
1 change: 1 addition & 0 deletions lib/RedisRemoteSaiInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ sai_status_t RedisRemoteSaiInterface::notifyCounterGroupOperations(

emplaceStrings(POLL_INTERVAL_FIELD, flexCounterGroupParam->poll_interval, entries);
emplaceStrings(BULK_CHUNK_SIZE_FIELD, flexCounterGroupParam->bulk_chunk_size, entries);
emplaceStrings(BULK_CHUNK_SIZE_PER_PREFIX_FIELD, flexCounterGroupParam->bulk_chunk_size_per_prefix, entries);
emplaceStrings(STATS_MODE_FIELD, flexCounterGroupParam->stats_mode, entries);
emplaceStrings(flexCounterGroupParam->plugin_name, flexCounterGroupParam->plugins, entries);
emplaceStrings(FLEX_COUNTER_STATUS_FIELD, flexCounterGroupParam->operation, entries);
Expand Down
7 changes: 7 additions & 0 deletions lib/sairedis.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ typedef struct _sai_redis_flex_counter_group_parameter_t
*/
sai_s8_list_t bulk_chunk_size;

/**
* @brief The bulk counter prefix map the counter group
*
* It should be a string representing bulk chunk size of each sub counter group.
*/
sai_s8_list_t bulk_chunk_size_per_prefix;

} sai_redis_flex_counter_group_parameter_t;

typedef struct _sai_redis_flex_counter_parameter_t
Expand Down
248 changes: 239 additions & 9 deletions syncd/FlexCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ void BaseCounterContext::setBulkChunkSize(
default_bulk_chunk_size = bulkChunkSize;
}

void BaseCounterContext::setBulkChunkSizePerPrefix(
_In_ const std::string& bulkChunkSizePerPrefix)
{
SWSS_LOG_ENTER();
m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix;
}

template <typename StatType,
typename Enable = void>
struct CounterIds
Expand Down Expand Up @@ -151,6 +158,8 @@ struct BulkStatsContext
std::vector<StatType> counter_ids;
std::vector<sai_status_t> object_statuses;
std::vector<uint64_t> counters;
std::string name;
uint32_t default_bulk_chunk_size;
};

// TODO: use if const expression when cpp17 is supported
Expand Down Expand Up @@ -425,6 +434,8 @@ void deserializeAttr(
template <typename StatType>
class CounterContext : public BaseCounterContext
{
std::map<std::string, uint32_t> m_counterChunkSizeMapFromPrefix;

public:
typedef CounterIds<StatType> CounterIdsType;
typedef BulkStatsContext<StatType> BulkContextType;
Expand Down Expand Up @@ -533,12 +544,209 @@ class CounterContext : public BaseCounterContext
}
m_objectIdsMap.emplace(vid, counter_data);
}
else
else if (m_counterChunkSizeMapFromPrefix.empty())
{
std::sort(supportedIds.begin(), supportedIds.end());
auto bulkContext = getBulkStatsContext(supportedIds);
auto bulkContext = getBulkStatsContext(supportedIds, "default", default_bulk_chunk_size);
addBulkStatsContext(vid, rid, supportedIds, *bulkContext.get());
}
else
{
std::map<std::string, vector<StatType>> counter_prefix_map;
std::vector<StatType> default_partition;
mapCountersByPrefix(supportedIds, counter_prefix_map, default_partition);

for (auto &counterPrefix : counter_prefix_map)
{
std::sort(counterPrefix.second.begin(), counterPrefix.second.end());
auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]);
addBulkStatsContext(vid, rid, counterPrefix.second, *bulkContext.get());
}

std::sort(default_partition.begin(), default_partition.end());
auto bulkContext = getBulkStatsContext(default_partition, "default", default_bulk_chunk_size);
addBulkStatsContext(vid, rid, default_partition, *bulkContext.get());
}
}

void parseCounterPrefixConfigString(
_In_ const std::string& prefixConfigString)
{
SWSS_LOG_ENTER();

m_counterChunkSizeMapFromPrefix.clear();

if (!prefixConfigString.empty())
{
auto tokens = swss::tokenize(prefixConfigString, ';');

for (auto &token: tokens)
{
auto counter_name_bulk_size = swss::tokenize(token, ':');
SWSS_LOG_INFO("New partition %s bulk chunk size %s", counter_name_bulk_size[0].c_str(), counter_name_bulk_size[1].c_str());
m_counterChunkSizeMapFromPrefix[counter_name_bulk_size[0]] = stoi(counter_name_bulk_size[1]);
}
}
}

void mapCountersByPrefix(
_In_ const std::vector<StatType>& supportedIds,
_Out_ std::map<std::string, std::vector<StatType>> &counter_prefix_map,
_Out_ std::vector<StatType> &default_partition,
_In_ bool log=false)
{
SWSS_LOG_ENTER();

default_partition.clear();
for (auto &counter : supportedIds)
{
std::string counterStr = serializeStat(counter);
bool found = false;
for (auto searchRef: m_counterChunkSizeMapFromPrefix)
{
if (!searchRef.first.empty() && counterStr.find(searchRef.first) != std::string::npos)
{
found = true;
counter_prefix_map[searchRef.first].push_back(counter);
if (log)
{
SWSS_LOG_INFO("Put counter %s to partition %s", counterStr.c_str(), searchRef.first.c_str());
}
break;
}
}
if (!found)
{
default_partition.push_back(counter);

if (log)
{
SWSS_LOG_INFO("Put counter %s to the default partition", counterStr.c_str());
}
}
}
}

void setBulkChunkSize(
_In_ uint32_t bulkChunkSize) override
{
SWSS_LOG_ENTER();
default_bulk_chunk_size = bulkChunkSize;
SWSS_LOG_INFO("Bulk chunk size updatd to %u", bulkChunkSize);

for (auto &bulkStatsContext : m_bulkContexts)
{
auto const &name = (*bulkStatsContext.second.get()).name;
if (name == "default")
{
SWSS_LOG_INFO("Bulk chunk size of default updated to %u", bulkChunkSize);
(*bulkStatsContext.second.get()).default_bulk_chunk_size = default_bulk_chunk_size;
break;
}
}
}

void setBulkChunkSizePerPrefix(
_In_ const std::string& bulkChunkSizePerPrefix) override
{
SWSS_LOG_ENTER();

m_bulkChunkSizePerPrefix = bulkChunkSizePerPrefix;

parseCounterPrefixConfigString(bulkChunkSizePerPrefix);
if (m_bulkContexts.empty())
{
return;
}

if (m_bulkContexts.size() == 1)
{
// hasn't split
SWSS_LOG_NOTICE("Split counter IDs set by prefix for the first time %s", bulkChunkSizePerPrefix.c_str());
auto it = m_bulkContexts.begin();
BulkContextType &singleBulkContext = *it->second.get();
const std::vector<StatType> &allCounterIds = singleBulkContext.counter_ids;
std::map<std::string, vector<StatType>> counterChunkSizePerPrefix;
std::vector<StatType> defaultPartition;

if (m_counterChunkSizeMapFromPrefix.empty())
{
singleBulkContext.default_bulk_chunk_size = default_bulk_chunk_size;
}
else
{
mapCountersByPrefix(allCounterIds, counterChunkSizePerPrefix, defaultPartition, true);

for (auto &counterPrefix : counterChunkSizePerPrefix)
{
std::sort(counterPrefix.second.begin(), counterPrefix.second.end());
auto bulkContext = getBulkStatsContext(counterPrefix.second, counterPrefix.first, m_counterChunkSizeMapFromPrefix[counterPrefix.first]);

bulkContext.get()->counter_ids = move(counterPrefix.second);
bulkContext.get()->object_statuses.resize(singleBulkContext.object_statuses.size());
bulkContext.get()->object_vids = singleBulkContext.object_vids;
bulkContext.get()->object_keys = singleBulkContext.object_keys;
bulkContext.get()->counters.resize(bulkContext.get()->counter_ids.size() * bulkContext.get()->object_vids.size());

SWSS_LOG_INFO("Re-initializing counter partition %s", counterPrefix.first.c_str());
}

std::sort(defaultPartition.begin(), defaultPartition.end());
auto defaultBulkContext = getBulkStatsContext(defaultPartition, "default", default_bulk_chunk_size);
defaultBulkContext.get()->counter_ids = move(defaultPartition);
defaultBulkContext.get()->object_statuses = move(singleBulkContext.object_statuses);
defaultBulkContext.get()->object_vids = move(singleBulkContext.object_vids);
defaultBulkContext.get()->object_keys = move(singleBulkContext.object_keys);
defaultBulkContext.get()->counters.resize(defaultBulkContext.get()->counter_ids.size() * defaultBulkContext.get()->object_vids.size());
m_bulkContexts.erase(it);
SWSS_LOG_INFO("Removed the previous default counter partition");
}
}
else
{
SWSS_LOG_NOTICE("Update bulk chunk size only %s", bulkChunkSizePerPrefix.c_str());

auto counterChunkSizeMapFromPrefix = m_counterChunkSizeMapFromPrefix;
for (auto &bulkStatsContext : m_bulkContexts)
{
auto const &name = (*bulkStatsContext.second.get()).name;

if (name == "default")
{
continue;
}

auto const &searchRef = counterChunkSizeMapFromPrefix.find(name);
if (searchRef != counterChunkSizeMapFromPrefix.end())
{
auto const &chunkSize = searchRef->second;

SWSS_LOG_INFO("Reset counter prefix %s chunk size %d", name.c_str(), chunkSize);
(*bulkStatsContext.second.get()).default_bulk_chunk_size = chunkSize;
counterChunkSizeMapFromPrefix.erase(searchRef);
}
else
{
SWSS_LOG_WARN("Update bulk chunk size: bulk chunk size for prefix %s is not provided", name.c_str());
}
}

for (auto &it : counterChunkSizeMapFromPrefix)
{
SWSS_LOG_WARN("Update bulk chunk size: prefix %s does not exist", it.first.c_str());
}
}

for (auto context : m_bulkContexts)
{
SWSS_LOG_INFO("%s %s partition %s number of OIDs %d number of counter IDs %d number of counters %d",
m_name.c_str(),
m_instanceId.c_str(),
context.second.get()->name.c_str(),
context.second.get()->object_keys.size(),
context.second.get()->counter_ids.size(),
context.second.get()->counters.size());
}
}

void removeObject(
Expand Down Expand Up @@ -744,15 +952,15 @@ class CounterContext : public BaseCounterContext
{
SWSS_LOG_ENTER();
auto statsMode = m_groupStatsMode == SAI_STATS_MODE_READ ? SAI_STATS_MODE_BULK_READ : SAI_STATS_MODE_BULK_READ_AND_CLEAR;
uint32_t bulk_chunk_size = default_bulk_chunk_size;
uint32_t bulk_chunk_size = ctx.default_bulk_chunk_size;
uint32_t size = static_cast<uint32_t>(ctx.object_keys.size());
if (bulk_chunk_size > size || bulk_chunk_size == 0)
{
bulk_chunk_size = size;
}
uint32_t current = 0;

SWSS_LOG_INFO("Before getting bulk %s %s size %u bulk chunk size %u current %u", m_instanceId.c_str(), m_name.c_str(), size, bulk_chunk_size, current);
SWSS_LOG_INFO("Before getting bulk %s %s %s size %u bulk chunk size %u current %u", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str(), size, bulk_chunk_size, current);

while (current < size)
{
Expand All @@ -772,7 +980,7 @@ class CounterContext : public BaseCounterContext
}
current += bulk_chunk_size;

SWSS_LOG_INFO("After getting bulk %s %s index %u(advanced to %u) bulk chunk size %u", m_instanceId.c_str(), m_name.c_str(), current - bulk_chunk_size, current, bulk_chunk_size);
SWSS_LOG_INFO("After getting bulk %s %s %s index %u(advanced to %u) bulk chunk size %u", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str(), current - bulk_chunk_size, current, bulk_chunk_size);

if (size - current < bulk_chunk_size)
{
Expand Down Expand Up @@ -801,11 +1009,13 @@ class CounterContext : public BaseCounterContext
values.clear();
}

SWSS_LOG_INFO("After pushing db %s %s", m_instanceId.c_str(), m_name.c_str());
SWSS_LOG_INFO("After pushing db %s %s %s", m_instanceId.c_str(), m_name.c_str(), ctx.name.c_str());
}

auto getBulkStatsContext(
_In_ const std::vector<StatType>& counterIds)
_In_ const std::vector<StatType>& counterIds,
_In_ const std::string& name,
_In_ uint32_t bulk_chunk_size=0)
{
SWSS_LOG_ENTER();
auto iter = m_bulkContexts.find(counterIds);
Expand All @@ -814,7 +1024,10 @@ class CounterContext : public BaseCounterContext
return iter->second;
}

SWSS_LOG_NOTICE("Create bulk stat context %s %s %s", m_instanceId.c_str(), m_name.c_str(), name.c_str());
auto ret = m_bulkContexts.emplace(counterIds, std::make_shared<BulkContextType>());
(*ret.first->second.get()).name = name;
(*ret.first->second.get()).default_bulk_chunk_size = bulk_chunk_size;
return ret.first->second;
}

Expand Down Expand Up @@ -1096,9 +1309,10 @@ class DashMeterCounterContext : public BaseCounterContext
public:
DashMeterCounterContext(
_In_ const std::string &name,
_In_ const std::string &instance,
_In_ sairedis::SaiInterface *vendor_sai,
_In_ std::string dbCounters):
BaseCounterContext(name), m_dbCounters(dbCounters), m_vendorSai(vendor_sai)
BaseCounterContext(name, instance), m_dbCounters(dbCounters), m_vendorSai(vendor_sai)
{
SWSS_LOG_ENTER();
}
Expand Down Expand Up @@ -1558,6 +1772,7 @@ void FlexCounter::addCounterPlugin(

m_isDiscarded = false;
uint32_t bulkChunkSize = 0;
std::string bulkChunkSizePerPrefix;

for (auto& fvt: values)
{
Expand All @@ -1579,6 +1794,15 @@ void FlexCounter::addCounterPlugin(
SWSS_LOG_NOTICE("Set counter context %s %s bulk size %u", m_instanceId.c_str(), COUNTER_TYPE_PORT.c_str(), bulkChunkSize);
}
}
else if (field == BULK_CHUNK_SIZE_PER_PREFIX_FIELD)
{
bulkChunkSizePerPrefix = value;
for (auto &context : m_counterContext)
{
context.second->setBulkChunkSizePerPrefix(bulkChunkSizePerPrefix);
SWSS_LOG_NOTICE("Set counter context %s %s bulk chunk prefix map %s", m_instanceId.c_str(), COUNTER_TYPE_PORT.c_str(), bulkChunkSizePerPrefix.c_str());
}
}
else if (field == FLEX_COUNTER_STATUS_FIELD)
{
setStatus(value);
Expand Down Expand Up @@ -1607,6 +1831,12 @@ void FlexCounter::addCounterPlugin(
getCounterContext(counterTypeRef->second)->setBulkChunkSize(bulkChunkSize);
SWSS_LOG_NOTICE("Create counter context %s %s with bulk size %u", m_instanceId.c_str(), counterTypeRef->second.c_str(), bulkChunkSize);
}

if (!bulkChunkSizePerPrefix.empty())
{
getCounterContext(counterTypeRef->second)->setBulkChunkSizePerPrefix(bulkChunkSizePerPrefix);
SWSS_LOG_NOTICE("Create counter context %s %s with bulk prefix map %s", m_instanceId.c_str(), counterTypeRef->second.c_str(), bulkChunkSizePerPrefix.c_str());
}
}
else
{
Expand Down Expand Up @@ -1752,7 +1982,7 @@ std::shared_ptr<BaseCounterContext> FlexCounter::createCounterContext(
}
else if (context_name == COUNTER_TYPE_METER_BUCKET)
{
return std::make_shared<DashMeterCounterContext>(context_name, m_vendorSai.get(), m_dbCounters);
return std::make_shared<DashMeterCounterContext>(context_name, instance, m_vendorSai.get(), m_dbCounters);
}
else if (context_name == ATTR_TYPE_QUEUE)
{
Expand Down
Loading

0 comments on commit 6c1086a

Please sign in to comment.