Skip to content

Commit

Permalink
fix: convert strings to boost::flyweight
Browse files Browse the repository at this point in the history
Problem: string comparison is immensely inefficient, taking 6-10%
of resources (reported by Tom) for a trace.
Solution: start with the root of the issue in the scoring
module and work backwards to convert string types to
boost::flyweight. I tried to have a minimal footprint here, but it
spread really quickly.

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Apr 12, 2024
1 parent c8e03f8 commit ae71d07
Show file tree
Hide file tree
Showing 21 changed files with 368 additions and 202 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"terminal.integrated.defaultProfile.linux": "bash"
},
"extensions": [
"ms-vscode.cmake-tools"
"ms-vscode.cmake-tools",
"ms-vscode.cpptools-extension-pack"
]
}
},
Expand Down
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ pkg_check_modules(HWLOC REQUIRED IMPORTED_TARGET hwloc>=1.11.1)
pkg_check_modules(JANSSON REQUIRED IMPORTED_TARGET jansson>=2.10)
pkg_check_modules(UUID REQUIRED IMPORTED_TARGET uuid)

# threads needed for boost flyweight
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)

set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
Expand Down Expand Up @@ -209,6 +214,10 @@ add_custom_target(dist
COMMAND rm flux-sched.ver
COMMENT "Generated flux-sched-${FLUX_SCHED_VER}.tar.gz"
)

add_compile_options(-fsanitize=address)
add_link_options(-fsanitize=address)

# run installcheck, if it passes then write out version information and pack up
# a tarball with the result
add_custom_target(distcheck
Expand Down
4 changes: 3 additions & 1 deletion resource/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ add_library(resource STATIC
policies/dfu_match_policy_factory.cpp
jobinfo/jobinfo.cpp
schema/resource_data.cpp
schema/data_std.cpp
schema/infra_data.cpp
schema/sched_data.cpp
schema/color.cpp
Expand Down Expand Up @@ -86,8 +87,9 @@ target_link_libraries(resource PUBLIC
Boost::regex
jobspec_conv
Boost::filesystem
Threads::Threads
)

add_subdirectory(modules)
add_subdirectory(reapi)
add_subdirectory(evaluators)
Expand Down
14 changes: 9 additions & 5 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ static int grow_resource_db_hwloc (std::shared_ptr<resource_ctx_t> &ctx,
flux_log (ctx->h, LOG_ERR, "%s", future_strerror (f, errno));
goto done;
}
if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
if (db.metadata.roots.find (flux_subsystem_containment) == db.metadata.roots.end ()) {
if (rank != IDSET_INVALID_ID) {
if (!(hwloc_xml = get_array_string (xml_array, rank)))
goto done;
Expand All @@ -895,14 +895,14 @@ static int grow_resource_db_hwloc (std::shared_ptr<resource_ctx_t> &ctx,

// If the above grow() does not grow resources in the "containment"
// subsystem, this condition can still be false
if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
if (db.metadata.roots.find (flux_subsystem_containment) == db.metadata.roots.end ()) {
rc = -1;
errno = EINVAL;
flux_log (ctx->h, LOG_ERR, "%s: cluster vertex is unavailable",
__FUNCTION__);
goto done;
}
v = db.metadata.roots.at ("containment");
v = db.metadata.roots.at (flux_subsystem_containment);

rank = idset_next (ids, rank);
while (rank != IDSET_INVALID_ID) {
Expand Down Expand Up @@ -930,7 +930,7 @@ static int grow_resource_db_rv1exec (std::shared_ptr<resource_ctx_t> &ctx,
resource_graph_db_t &db = *(ctx->db);
char *rv1_str = nullptr;

if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
if (db.metadata.roots.find (flux_subsystem_containment) == db.metadata.roots.end ()) {
if ( (rv1_str = json_dumps (resobj, JSON_INDENT (0))) == NULL) {
errno = ENOMEM;
goto done;
Expand Down Expand Up @@ -1049,7 +1049,7 @@ static int grow_resource_db_jgf (std::shared_ptr<resource_ctx_t> &ctx,
__FUNCTION__);
goto done;
}
if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
if (db.metadata.roots.find (flux_subsystem_containment) == db.metadata.roots.end ()) {
if ( p_r_lite
&& (rc = remap_jgf_namespace (ctx, r_lite, p_r_lite)) < 0) {
flux_log_error (ctx->h, "%s: remap_jgf_namespace", __FUNCTION__);
Expand Down Expand Up @@ -2727,6 +2727,10 @@ extern "C" int mod_main (flux_t *h, int argc, char **argv)

flux_log (h, LOG_INFO, "version %s", PACKAGE_VERSION);

// Init flyweight, which is used across the resource module
// see resource/schema/data_std.cpp.
init_flyweight();

try {
std::shared_ptr<resource_ctx_t> ctx = nullptr;
uint32_t rank = 1;
Expand Down
51 changes: 33 additions & 18 deletions resource/policies/base/matcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ matcher_data_t::~matcher_data_t ()
int matcher_data_t::add_subsystem (const subsystem_t s, const std::string tf)
{
if (m_subsystems_map.find (s) == m_subsystems_map.end ()) {
boost::flyweight<std::string> fly_tf(tf);
m_subsystems.push_back (s);
m_subsystems_map[s].insert (tf);
m_subsystems_map[s].insert (fly_tf);
return 0;
}
return -1;
Expand Down Expand Up @@ -222,43 +223,51 @@ void matcher_util_api_t::set_pruning_type (const std::string &subsystem,
const std::string &anchor_type,
const std::string &prune_type)
{
static boost::flyweight<std::string> fly_subsystem(subsystem);
static boost::flyweight<std::string> fly_anchor_type(anchor_type);
static boost::flyweight<std::string> fly_prune_type(prune_type);

// Use operator[] to create an entry if subsystem key doesn't exist
auto &s = m_pruning_types[subsystem];
auto &s = m_pruning_types[fly_subsystem];
if (anchor_type == ANY_RESOURCE_TYPE) {
// Check whether you have already installed prune_type.
// If so, remove it as you want to install it against ANY_RESOURCE_TYPE.
for (auto &kv : s)
kv.second.erase (prune_type);
kv.second.erase (fly_prune_type);
// final container is "set" so it will only allow unique prune_types
s[anchor_type].insert (prune_type);
s[fly_anchor_type].insert (fly_prune_type);
} else {
if ((s.find (ANY_RESOURCE_TYPE) != s.end ())) {
auto &prune_set = s[ANY_RESOURCE_TYPE];
if (prune_set.find (prune_type) == prune_set.end ()) {
if (prune_set.find (fly_prune_type) == prune_set.end ()) {
// If prune_type does not exist against ANY_RESOURCE_TYPE
// Install it against anchor_type, an individual resource type
s[anchor_type].insert (prune_type);
s[fly_anchor_type].insert (fly_prune_type);
} // orelse NOOP
} else {
s[anchor_type].insert (prune_type);
s[fly_anchor_type].insert (fly_prune_type);
}
}
m_total_set[subsystem].insert (prune_type);
m_total_set[fly_subsystem].insert (fly_prune_type);
}

bool matcher_util_api_t::is_my_pruning_type (const std::string &subsystem,
const std::string &anchor_type,
const std::string &prune_type)
{
static boost::flyweight<std::string> fly_subsystem(subsystem);
static boost::flyweight<std::string> fly_anchor_type(anchor_type);
static boost::flyweight<std::string> fly_prune_type(prune_type);

bool rc = false;
try {
auto &s = m_pruning_types.at (subsystem);
if (s.find (anchor_type) != s.end ()) {
auto &m = s.at (anchor_type);
rc = (m.find (prune_type) != m.end ());
auto &s = m_pruning_types.at (fly_subsystem);
if (s.find (fly_anchor_type) != s.end ()) {
auto &m = s.at (fly_anchor_type);
rc = (m.find (fly_prune_type) != m.end ());
} else if (s.find (ANY_RESOURCE_TYPE) != s.end ()) {
auto &m = s.at (ANY_RESOURCE_TYPE);
rc = (m.find (prune_type) != m.end ());
rc = (m.find (fly_prune_type) != m.end ());
}
} catch (std::out_of_range &e) {
rc = false;
Expand All @@ -269,10 +278,13 @@ bool matcher_util_api_t::is_my_pruning_type (const std::string &subsystem,
bool matcher_util_api_t::is_pruning_type (const std::string &subsystem,
const std::string &prune_type)
{
static boost::flyweight<std::string> fly_subsystem(subsystem);
static boost::flyweight<std::string> fly_prune_type(prune_type);

bool rc = true;
try {
auto &s = m_total_set.at (subsystem);
rc = (s.find (prune_type) != s.end ());
auto &s = m_total_set.at (fly_subsystem);
rc = (s.find (fly_prune_type) != s.end ());
} catch (std::out_of_range &e) {
rc = false;
}
Expand All @@ -283,14 +295,17 @@ bool matcher_util_api_t::get_my_pruning_types (const std::string &subsystem,
const std::string &anchor_type,
std::vector<std::string> &out)
{
static boost::flyweight<std::string> fly_subsystem(subsystem);
static boost::flyweight<std::string> fly_anchor_type(anchor_type);

bool rc = true;
try {
// Get the value of the subsystem, which is a map
// of <string, set> type.
auto &s = m_pruning_types.at (subsystem);
if (s.find (anchor_type) != s.end ()) {
auto &s = m_pruning_types.at (fly_subsystem);
if (s.find (fly_anchor_type) != s.end ()) {
// Get the value of the anchor map, which is a set
auto &m = s.at (anchor_type);
auto &m = s.at (fly_anchor_type);
for (auto &k : m)
out.push_back (k);
}
Expand Down
9 changes: 5 additions & 4 deletions resource/policies/base/matcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <vector>
#include <set>
#include <map>
#include <boost/flyweight.hpp>
#include "resource/libjobspec/jobspec.hpp"
#include "resource/schema/data_std.hpp"
#include "resource/planner/c/planner.h"
Expand All @@ -24,7 +25,7 @@
namespace Flux {
namespace resource_model {

const std::string ANY_RESOURCE_TYPE = "*";
const boost::flyweight<std::string> ANY_RESOURCE_TYPE = flux_match_any;

enum match_score_t { MATCH_UNMET = 0, MATCH_MET = 1 };

Expand Down Expand Up @@ -77,7 +78,7 @@ class matcher_data_t

private:
std::string m_name;
subsystem_t m_err_subsystem = "error";
subsystem_t m_err_subsystem = flux_error;
std::vector<subsystem_t> m_subsystems;
multi_subsystemsS m_subsystems_map;
};
Expand Down Expand Up @@ -170,8 +171,8 @@ class matcher_util_api_t
// in the containment subsystem at any level, please
// maintain an aggregate on the available cores under it.
std::map<subsystem_t,
std::map<std::string, std::set<std::string>>> m_pruning_types;
std::map<subsystem_t, std::set<std::string>> m_total_set;
std::map<boost::flyweight<std::string>, std::set<boost::flyweight<std::string>>>> m_pruning_types;
std::map<subsystem_t, std::set<boost::flyweight<std::string>>> m_total_set;
};

} // namespace resource_model
Expand Down
4 changes: 2 additions & 2 deletions resource/readers/resource_reader_grug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ vtx_t dfs_emitter_t::emit_vertex (ggv_t u, gge_t e, const gg_t &recipe,

vtx_t v = add_vertex (g);;
std::string pref = "";
std::string ssys = recipe[u].subsystem;
auto ssys = recipe[u].subsystem;
int id = 0;

if (src_v == boost::graph_traits<resource_graph_t>::null_vertex ()) {
Expand All @@ -232,7 +232,7 @@ vtx_t dfs_emitter_t::emit_vertex (ggv_t u, gge_t e, const gg_t &recipe,
g[v].id = id;
g[v].name = recipe[u].basename + istr;
g[v].paths[ssys] = pref + "/" + g[v].name;
g[v].idata.member_of[ssys] = "*";
g[v].idata.member_of[ssys] = flux_match_any;
g[v].uniq_id = v;
g[v].rank = m_rank;

Expand Down
18 changes: 9 additions & 9 deletions resource/readers/resource_reader_hwloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#include <limits>
#include "resource/readers/resource_reader_hwloc.hpp"
#include "resource/store/resource_graph_store.hpp"
#include <boost/flyweight.hpp>

using namespace Flux;
using namespace resource_model;
Expand Down Expand Up @@ -73,6 +74,8 @@ vtx_t resource_reader_hwloc_t::add_new_vertex (resource_graph_t &g,
{
vtx_t v = boost::add_vertex (g);

boost::flyweight<std::string> fly_subsystem(subsys);

// Set properties of the new vertex
bool is_root = false;
if (parent == boost::graph_traits<resource_graph_t>::null_vertex ())
Expand All @@ -91,7 +94,7 @@ vtx_t resource_reader_hwloc_t::add_new_vertex (resource_graph_t &g,
g[v].id = id;
g[v].name = (name != "")? name : basename + istr;
g[v].paths[subsys] = prefix + "/" + g[v].name;
g[v].idata.member_of[subsys] = "*";
g[v].idata.member_of[fly_subsystem] = flux_match_any;
g[v].status = resource_pool_t::status_t::UP;
g[v].properties = properties;

Expand Down Expand Up @@ -357,13 +360,10 @@ int resource_reader_hwloc_t::walk_hwloc (resource_graph_t &g,
if (!supported_resource) {
valid_ancestor = parent;
} else {
const std::string subsys = "containment";
vtx_t v = add_new_vertex (g, m, parent,
id, subsys, type, basename,
id, flux_subsystem_containment, type, basename,
name, properties, size, rank);
valid_ancestor = v;
std::string relation = "contains";
std::string rev_relation = "in";
edg_t e;
bool inserted; // set to false when we try and insert a parallel edge

Expand All @@ -374,8 +374,8 @@ int resource_reader_hwloc_t::walk_hwloc (resource_graph_t &g,
+ g[parent].name + " -> " + g[v].name + "; ";
return -1;
}
g[e].idata.member_of[subsys] = relation;
g[e].name[subsys] = relation;
g[e].idata.member_of[flux_subsystem_containment] = flux_relation_contains;
g[e].name[flux_subsystem_containment] = flux_relation_contains;
if (add_metadata (m, e, parent, v, g) < 0)
return -1;

Expand All @@ -386,8 +386,8 @@ int resource_reader_hwloc_t::walk_hwloc (resource_graph_t &g,
+ g[v].name + " -> " + g[parent].name + "; ";
return -1;
}
g[e].idata.member_of[subsys] = rev_relation;
g[e].name[subsys] = rev_relation;
g[e].idata.member_of[flux_subsystem_containment] = flux_relation_in;
g[e].name[flux_subsystem_containment] = flux_relation_in;
if (add_metadata (m, e, v, parent, g) < 0)
return -1;
}
Expand Down
22 changes: 14 additions & 8 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern "C" {
#include "resource/readers/resource_reader_jgf.hpp"
#include "resource/store/resource_graph_store.hpp"
#include "resource/planner/c/planner.h"
#include "boost/flyweight.hpp"

using namespace Flux;
using namespace Flux::resource_model;
Expand Down Expand Up @@ -456,8 +457,10 @@ vtx_t resource_reader_jgf_t::create_vtx (resource_graph_t &g,
g[v].paths = fetcher.paths;
g[v].schedule.plans = plans;
g[v].idata.x_checker = x_checker;
for (auto kv : g[v].paths)
g[v].idata.member_of[kv.first] = "*";
for (auto kv : g[v].paths) {
boost::flyweight<std::string> fly_first(kv.first);
g[v].idata.member_of[fly_first] = flux_match_any;
}

done:
return v;
Expand Down Expand Up @@ -511,11 +514,13 @@ int resource_reader_jgf_t::add_graph_metadata (vtx_t v,
resource_graph_metadata_t &m)
{
int rc = -1;
std::pair<std::map<std::string, vtx_t>::iterator, bool> ptr;
std::pair<std::map<boost::flyweight<std::string>, vtx_t>::iterator, bool> ptr;

for (auto kv : g[v].paths) {
boost::flyweight<std::string> fly_second(kv.second);
boost::flyweight<std::string> fly_first(kv.first);
if (is_root (kv.second)) {
ptr = m.roots.emplace (kv.first, v);
ptr = m.roots.emplace (fly_first, v);
if (!ptr.second) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
Expand Down Expand Up @@ -993,7 +998,7 @@ int resource_reader_jgf_t::unpack_edges (resource_graph_t &g,
json_object_foreach (name, key, value) {
g[e].name[std::string (key)]
= std::string (json_string_value (value));
g[e].idata.member_of[std::string (key)]
g[e].idata.member_of[boost::flyweight<std::string> (key)]
= std::string (json_string_value (value));
}
// add this edge to by_outedges metadata
Expand Down Expand Up @@ -1044,11 +1049,12 @@ int resource_reader_jgf_t::update_src_edge (resource_graph_t &g,
if (vmap[source].is_roots.empty ())
return 0;

for (auto &kv : vmap[source].is_roots)
m.v_rt_edges[kv.first].set_for_trav_update (vmap[source].needs,
for (auto &kv : vmap[source].is_roots) {
boost::flyweight<std::string> fly_first(kv.first);
m.v_rt_edges[fly_first].set_for_trav_update (vmap[source].needs,
vmap[source].exclusive,
token);

}
// This way, when a root vertex appears in multiple JGF edges
// we only update the virtual in-edge into the root only once.
vmap[source].is_roots.clear ();
Expand Down
Loading

0 comments on commit ae71d07

Please sign in to comment.