Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/optimise #29

Merged
merged 6 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pygribjump/src/pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class ExtractionRequest:
The ranges to extract.
"""
def __init__(self, req, ranges, gridHash=None):
reqstr = "retrieve,"+dic_to_request(req)
reqstr = dic_to_request(req)
rangestr = list_to_rangestr(ranges)
request = ffi.new('gribjump_extraction_request_t**')
c_reqstr = ffi.new("char[]", reqstr.encode())
Expand Down
197 changes: 73 additions & 124 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
/// @author Christopher Bradley

#include "eckit/log/Plural.h"
#include "eckit/utils/StringTools.h"

#include "metkit/mars/MarsExpension.h"
#include "metkit/mars/MarsParser.h"

#include "gribjump/Engine.h"
#include "gribjump/ExtractionItem.h"
Expand All @@ -26,95 +28,8 @@ namespace gribjump {
//----------------------------------------------------------------------------------------------------------------------

// Stringify requests and keys alphabetically
namespace
{
/// Serialise a MARS request as comma-separated "key=value" pairs.
/// Keys are emitted in alphabetical order so that the same request always yields the same string.
/// @param request  request whose parameters are stringified
/// @return the joined string; empty when the request has no parameters
std::string requestToStr(const metkit::mars::MarsRequest& request) {
    std::vector<std::string> sortedKeys = request.params();
    std::sort(sortedKeys.begin(), sortedKeys.end());

    std::stringstream out;
    bool first = true;
    for (const auto& name : sortedKeys) {
        // A comma goes between pairs, never before the first one.
        if (!first) {
            out << ",";
        }
        out << name << "=" << request[name];
        first = false;
    }
    return out.str();
}

//----------------------------------------------------------------------------------------------------------------------


/// Flatten callback that appends every request it is invoked with to a caller-owned vector.
/// Only a reference to the vector is held, so the vector must outlive this object.
class CollectFlattenedRequests : public metkit::mars::FlattenCallback {
public:
    /// @param flattenedRequests  destination vector; requests are appended to it
    CollectFlattenedRequests(std::vector<metkit::mars::MarsRequest>& flattenedRequests) :
        flattenedRequests_(flattenedRequests) {}

    /// Append a copy of @p req to the destination vector.
    // `override` makes the compiler check that this really matches the base-class callback signature.
    void operator()(const metkit::mars::MarsRequest& req) override {
        flattenedRequests_.push_back(req);
    }

    std::vector<metkit::mars::MarsRequest>& flattenedRequests_;
};

/// Flatten a MARS request with metkit's MarsExpension and return the collected requests.
/// The base request and every flattened request are written to the LibGribJump debug log.
/// @param request  the base request to flatten
/// @return the requests gathered by the flatten callback, in the order they were delivered
std::vector<metkit::mars::MarsRequest> flattenRequest(const metkit::mars::MarsRequest& request) {

    metkit::mars::MarsExpension expander(false);
    metkit::mars::DummyContext context;

    // The collector appends into `collected` whenever it is invoked.
    std::vector<metkit::mars::MarsRequest> collected;
    CollectFlattenedRequests collector(collected);
    expander.flatten(context, request, collector);

    LOG_DEBUG_LIB(LibGribJump) << "Base request: " << request << std::endl;

    for (size_t idx = 0; idx < collected.size(); ++idx) {
        LOG_DEBUG_LIB(LibGribJump) << " Flattened request: " << collected[idx] << std::endl;
    }

    return collected;
}

// Stringify requests, and flatten if necessary

typedef std::map<metkit::mars::MarsRequest, std::vector<std::string>> flattenedKeys_t;

flattenedKeys_t buildFlatKeys(const ExtractionRequests& requests, bool flatten) {

flattenedKeys_t keymap;

for (const auto& req : requests) {
const metkit::mars::MarsRequest& baseRequest = req.request();
keymap[baseRequest] = std::vector<std::string>();

// Assume baseRequest has cardinality >= 1 and may need to be flattened
if (flatten) {
std::vector<metkit::mars::MarsRequest> flat = flattenRequest(baseRequest);
for (const auto& r : flat) {
keymap[baseRequest].push_back(requestToStr(r));
}
}

// Assume baseRequest has cardinality 1
else {
keymap[baseRequest].push_back(requestToStr(baseRequest));
}

eckit::Log::debug<LibGribJump>() << "Flattened keys for request " << baseRequest << ": " << keymap[baseRequest] << std::endl;
}

return keymap;
}

/// Combine all given requests into one by merging each subsequent request into a copy of the first.
/// The resulting request is written to the info log.
/// @param requests  requests to merge; front() is taken unconditionally, so it must not be empty
/// @return the merged request
metkit::mars::MarsRequest unionRequest(const MarsRequests& requests) {

    /// @todo: we should do some check not to merge on keys like class and stream
    metkit::mars::MarsRequest merged = requests.front();

    const size_t count = requests.size();
    size_t idx = 1;
    while (idx < count) {
        merged.merge(requests[idx]);
        ++idx;
    }

    eckit::Log::info() << "Gribjump: Union request is " << merged << std::endl;

    return merged;
}
namespace {
// ----------------------------------------------------------------------------------------------------------------------

bool isRemote(eckit::URI uri) {
return uri.scheme() == "fdb";
Expand All @@ -127,43 +42,73 @@ Engine::Engine() {}

Engine::~Engine() {}

ExItemMap Engine::buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten){
ExItemMap keyToExtractionItem;

flattenedKeys_t flatKeys = buildFlatKeys(requests, flatten); // Map from base request to {flattened keys}

LOG_DEBUG_LIB(LibGribJump) << "Built flat keys" << std::endl;
metkit::mars::MarsRequest Engine::buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem ){
// Split strings into one unified map
// We also canonicalise the requests such that their keys are in alphabetical order
/// @todo: Note that it is not in general possible to merge arbitrary requests into a single request. In future, we should look into
/// merging into the minimum number of requests.

std::map<std::string, std::set<std::string>> keyValues;
for (auto& r : requests) {
const std::string& s = r.requestString();
std::vector<std::string> kvs = eckit::StringTools::split(",", s); /// @todo might be faster to use tokenizer directly.
for (auto& kv : kvs) {
std::vector<std::string> kv_s = eckit::StringTools::split("=", kv);
if (kv_s.size() != 2) continue; // ignore verb
keyValues[kv_s[0]].insert(kv_s[1]);
}

// Create the 1-to-1 map
for (size_t i = 0; i < requests.size(); i++) {
const metkit::mars::MarsRequest& basereq = requests[i].request();
const std::vector<std::string> keys = flatKeys[basereq];
for (const auto& key : keys) {
ASSERT(keyToExtractionItem.find(key) == keyToExtractionItem.end()); /// @todo support duplicated requests?
auto extractionItem = std::make_unique<ExtractionItem>(basereq, requests[i].ranges());
extractionItem->gridHash(requests[i].gridHash());
keyToExtractionItem.emplace(key, std::move(extractionItem)); // 1-to-1-map
// Canonicalise string by sorting keys
std::sort(kvs.begin(), kvs.end());
std::string canonicalised = "";
for (auto& kv : kvs) {
canonicalised += kv;
if (kv != kvs.back()) {
canonicalised += ",";
}
}
ASSERT(keyToExtractionItem.find(canonicalised) == keyToExtractionItem.end()); // no repeats
r.requestString(canonicalised);
auto extractionItem = std::make_unique<ExtractionItem>(canonicalised, r.ranges());
extractionItem->gridHash(r.gridHash());
keyToExtractionItem.emplace(canonicalised, std::move(extractionItem)); // 1-to-1-map
}

return keyToExtractionItem;
}

filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem) {
// Map files to ExtractionItem
eckit::Timer timer("Gribjump Engine: Building file map");

std::vector<metkit::mars::MarsRequest> marsrequests;
for (const auto& req : requests) {
marsrequests.push_back(req.request());
// Construct the union request

std::string result = "retrieve,";
size_t i = 0;
for (auto& [key, values] : keyValues) {
result += key + "=";
if (values.size() == 1) {
result += *values.begin();
} else {
size_t j = 0;
for (auto& value : values) {
result += value;
if (j != values.size() - 1) {
result += "/";
}
j++;
}
}
if (i != keyValues.size() - 1) {
result += ",";
}
i++;
}

const metkit::mars::MarsRequest req = unionRequest(marsrequests);
MetricsManager::instance().set("union_request", req.asString());
timer.reset("Gribjump Engine: Flattened requests and constructed union request");
std::istringstream istream(result);
metkit::mars::MarsParser parser(istream);
std::vector<metkit::mars::MarsParsedRequest> unionRequests = parser.parse();
ASSERT(unionRequests.size() == 1);

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
return unionRequests[0];
}

/// Build the file map for an already-constructed union request.
/// Delegates to FDBLister::fileMap, handing it the key -> ExtractionItem map.
/// @param unionrequest         the request passed to the lister
/// @param keyToExtractionItem  map from request key to ExtractionItem, passed through to the lister
/// @return the file map produced by the lister
filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem) {
    // Map files to ExtractionItem
    return FDBLister::instance().fileMap(unionrequest, keyToExtractionItem);
}

Expand All @@ -174,10 +119,11 @@ void Engine::forwardRemoteExtraction(filemap_t& filemap) {
const std::map<std::string, std::string>& servermap_str = LibGribJump::instance().config().serverMap();
ASSERT(!servermap_str.empty());

for (auto& [fdb, gj] : servermap_str) {
LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl;
if (LibGribJump::instance().debug()) {
for (auto& [fdb, gj] : servermap_str) {
LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl;
}
}

std::unordered_map<eckit::net::Endpoint, eckit::net::Endpoint> servermap;
for (auto& [fdb, gj] : servermap_str) {
eckit::net::Endpoint fdbEndpoint(fdb);
Expand Down Expand Up @@ -247,11 +193,14 @@ void Engine::scheduleTasks(filemap_t& filemap){
taskGroup_.waitForTasks();
}

ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {
ResultsMap Engine::extract(ExtractionRequests& requests) {

eckit::Timer timer("Engine::extract");
ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);

ExItemMap keyToExtractionItem;
metkit::mars::MarsRequest unionreq = buildRequestMap(requests, keyToExtractionItem);

filemap_t filemap = buildFileMap(unionreq, keyToExtractionItem);
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Engine {
Engine();
~Engine();

ResultsMap extract(const ExtractionRequests& requests, bool flattenRequests = false);
ResultsMap extract(ExtractionRequests& requests);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
Expand All @@ -43,10 +43,10 @@ class Engine {

private:

filemap_t buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten);
filemap_t buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem);
ResultsMap collectResults(ExItemMap& keyToExtractionItem);
void forwardRemoteExtraction(filemap_t& filemap);
metkit::mars::MarsRequest buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem );

private:

Expand Down
31 changes: 3 additions & 28 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ std::vector<double> decodeVector(eckit::Stream& s) {
return std::vector<double>(data, data + size);
}

// todo: encodeVectorVector ?

} // namespace

/// Default constructor; performs no explicit member initialisation.
ExtractionResult::ExtractionResult() = default;
Expand Down Expand Up @@ -120,15 +118,16 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionResult& o) {

//---------------------------------------------------------------------------------------------------------------------

ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector<Range>& ranges, std::string gridHash):
/// Construct an extraction request from a request string, the ranges to extract and a grid hash.
/// @param request   request string, stored as given
/// @param ranges    ranges to extract (copied)
/// @param gridHash  grid hash associated with the request (copied)
ExtractionRequest::ExtractionRequest(const std::string& request,
                                     const std::vector<Range>& ranges,
                                     std::string gridHash) :
    ranges_(ranges), request_(request), gridHash_(gridHash) {
}

/// Default constructor; performs no explicit member initialisation.
ExtractionRequest::ExtractionRequest() = default;

ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
request_ = metkit::mars::MarsRequest(s);
s >> request_;
s >> gridHash_;
size_t numRanges;
s >> numRanges;
Expand All @@ -139,30 +138,6 @@ ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
}
}

std::vector<ExtractionRequest> ExtractionRequest::split(const std::string& key) const {

std::vector<metkit::mars::MarsRequest> reqs = request_.split(key);

std::vector<ExtractionRequest> requests;
requests.reserve(reqs.size());
for (auto& r : reqs) {
requests.push_back(ExtractionRequest(r, ranges_));
}
return requests;
}

std::vector<ExtractionRequest> ExtractionRequest::split(const std::vector<std::string>& keys) const {

std::vector<metkit::mars::MarsRequest> reqs = request_.split(keys);

std::vector<ExtractionRequest> requests;
requests.reserve(reqs.size());
for (auto& r : reqs) {
requests.push_back(ExtractionRequest(r, ranges_));
}
return requests;
}

eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
o.encode(s);
return s;
Expand Down
12 changes: 4 additions & 8 deletions src/gribjump/ExtractionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ namespace gribjump {

//----------------------------------------------------------------------------------------------------------------------


/// @todo This class is now redundant thanks to ExtractionItem.

class ExtractionResult {
public: // methods

Expand Down Expand Up @@ -77,13 +74,12 @@ class ExtractionRequest {
public: // methods

ExtractionRequest();
ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector<Range>&, std::string gridHash="");
ExtractionRequest(const std::string&, const std::vector<Range>&, std::string gridHash="");
explicit ExtractionRequest(eckit::Stream& s);

std::vector<ExtractionRequest> split(const std::vector<std::string>& keys) const;
std::vector<ExtractionRequest> split(const std::string& key) const;
const std::vector<Range>& ranges() const {return ranges_;}
const metkit::mars::MarsRequest& request() const {return request_;}
const std::string& requestString() const {return request_;}
void requestString(const std::string& s) {request_ = s;}
const std::string& gridHash() const {return gridHash_;}

private: // methods
Expand All @@ -94,7 +90,7 @@ class ExtractionRequest {

private: // members
std::vector<Range> ranges_;
metkit::mars::MarsRequest request_;
std::string request_;
std::string gridHash_;
};

Expand Down
Loading
Loading