diff --git a/pygribjump/gribjump_c.h b/pygribjump/gribjump_c.h index 3336a92..5b7a627 100644 --- a/pygribjump/gribjump_c.h +++ b/pygribjump/gribjump_c.h @@ -17,9 +17,9 @@ int gribjump_new_handle(gribjump_handle_t** gj); int gribjump_delete_handle(gribjump_handle_t* gj); int extract_single(gribjump_handle_t* handle, gribjump_extraction_request_t* request, gribjump_extraction_result_t*** results_array, unsigned long* nfields); -int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields); +int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields, const char* ctx); -int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr); +int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr, const char* gridhash); int gribjump_delete_request(gribjump_extraction_request_t* request); int gribjump_new_result(gribjump_extraction_result_t** result); int gribjump_result_values(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues); @@ -32,3 +32,5 @@ int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_ int gribjump_delete_axes(gj_axes_t* axes); int gribjump_initialise(); + +const char* gribjump_error_string(int err); \ No newline at end of file diff --git a/pygribjump/pygribjump.py b/pygribjump/pygribjump.py index 1951edd..32b4b3c 100644 --- a/pygribjump/pygribjump.py +++ b/pygribjump/pygribjump.py @@ -77,7 +77,7 @@ def wrapped_fn(*args, **kwargs): # TODO Error string. See pyfdb retval = fn(*args, **kwargs) if retval != 0: - error_str = "Error in function {}".format(name) + error_str = f"Error in function {name}: {ffi.string(self.__lib.gribjump_error_string(retval)).decode()}" raise GribJumpException(error_str) return retval @@ -97,7 +97,8 @@ def __init__(self): # Set free function self.__gribjump = ffi.gc(gribjump[0], lib.gribjump_delete_handle) - def extract(self, polyrequest, dump=True): + def extract(self, polyrequest, ctx=None, dump=True): + # TODO Add md5 hash to request """ Parameters ---------- @@ -112,14 +113,21 @@ def extract(self, polyrequest, dump=True): stored in the original buffer, and will be garbage collected when the result object is garbage collected. """ - requests = [ExtractionRequest(req, ranges) for req, ranges in polyrequest] + # requests = [ExtractionRequest(req, ranges, hash) for req, ranges, hash in polyrequest] + requests = self._unpack_polyrequest(polyrequest) # results_array contains values, for each field, for each request. results_array = ffi.new('gribjump_extraction_result_t****') nfields = ffi.new('unsigned long**') nrequests = len(requests) c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests]) - lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields) + if (ctx): + logctx=str(ctx) + else: + logctx="" + + logctx_c = ffi.new('const char[]', logctx.encode('ascii')) + lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c) if dump: res = [ @@ -132,6 +140,7 @@ def extract(self, polyrequest, dump=True): ] return res + # @todo review if we still need this method def extract_singles(self, polyrequest, dump=True): """ Carry out a series of single extractions, rather than a single polytope extraction. @@ -149,7 +158,7 @@ def extract_singles(self, polyrequest, dump=True): stored in the original buffer, and will be garbage collected when the result object is garbage collected. """ - requests = [ExtractionRequest(reqstr, ranges) for reqstr, ranges in polyrequest] + requests = self._unpack_polyrequest(polyrequest) if dump: # Copy values, allow original buffer to be garbage collected. @@ -171,6 +180,20 @@ def extract_singles(self, polyrequest, dump=True): def extract_str(self, reqstr, rangestr): return self.extract_single(ExtractionRequest(reqstr, rangestr)) + + def _unpack_polyrequest(self, polyrequest): + requests = [] + for item in polyrequest: + if len(item) == 2: + reqstr, ranges = item + hash = None + elif len(item) == 3: + reqstr, ranges, hash = item + else: + raise ValueError("Polyrequest should be a list of tuples of length 2 or 3") + requests.append(ExtractionRequest(reqstr, ranges, hash)) + return requests + def extract_single(self, request): """ @@ -227,13 +250,14 @@ class ExtractionRequest: ranges : [(lo, hi), (lo, hi), ...] The ranges to extract. """ - def __init__(self, req, ranges): + def __init__(self, req, ranges, gridHash=None): reqstr = "retrieve,"+dic_to_request(req) rangestr = list_to_rangestr(ranges) request = ffi.new('gribjump_extraction_request_t**') c_reqstr = ffi.new("char[]", reqstr.encode()) c_rangestr = ffi.new("char[]", rangestr.encode()) - lib.gribjump_new_request(request, c_reqstr, c_rangestr) + c_hash = ffi.NULL if gridHash is None else ffi.new("char[]", gridHash.encode()) + lib.gribjump_new_request(request, c_reqstr, c_rangestr, c_hash) self.__request = ffi.gc(request[0], lib.gribjump_delete_request) @property @@ -327,4 +351,5 @@ def list_to_rangestr(ranges): def dic_to_request(dic): # e.g. {"class":"od", "expver":"0001", "levtype":"pl"} -> "class=od,expver=0001,levtype=pl" - return ','.join(['='.join([k, v]) for k, v in dic.items()]) \ No newline at end of file + return ','.join(['='.join([k, v]) for k, v in dic.items()]) + diff --git a/src/gribjump/CMakeLists.txt b/src/gribjump/CMakeLists.txt index f48bafb..ca2af78 100644 --- a/src/gribjump/CMakeLists.txt +++ b/src/gribjump/CMakeLists.txt @@ -44,6 +44,7 @@ list( APPEND gribjump_srcs GribJumpException.h ExtractionData.cc ExtractionData.h + Metrics.h Types.h ) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 734ecaa..da319c9 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -76,12 +76,12 @@ std::vector flattenRequest(const metkit::mars::MarsRe typedef std::map> flattenedKeys_t; -flattenedKeys_t buildFlatKeys(const std::vector& requests, bool flatten) { +flattenedKeys_t buildFlatKeys(const ExtractionRequests& requests, bool flatten) { flattenedKeys_t keymap; - for (const auto& baseRequest : requests) { - + for (const auto& req : requests) { + const metkit::mars::MarsRequest& baseRequest = req.request(); keymap[baseRequest] = std::vector(); // Assume baseRequest has cardinality >= 1 and may need to be flattened @@ -127,7 +127,7 @@ Engine::Engine() {} Engine::~Engine() {} -ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){ +ExItemMap Engine::buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten){ ExItemMap keyToExtractionItem; flattenedKeys_t flatKeys = buildFlatKeys(requests, flatten); // Map from base request to {flattened keys} @@ -136,21 +136,30 @@ ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const R // Create the 1-to-1 map for (size_t i = 0; i < requests.size(); i++) { - const metkit::mars::MarsRequest& basereq = requests[i]; + const metkit::mars::MarsRequest& basereq = requests[i].request(); const std::vector keys = flatKeys[basereq]; for (const auto& key : keys) { ASSERT(keyToExtractionItem.find(key) == keyToExtractionItem.end()); /// @todo support duplicated requests? - keyToExtractionItem.emplace(key, new ExtractionItem(basereq, ranges[i])); // 1-to-1-map + auto extractionItem = std::make_unique(basereq, requests[i].ranges()); + extractionItem->gridHash(requests[i].gridHash()); + keyToExtractionItem.emplace(key, std::move(extractionItem)); // 1-to-1-map } } return keyToExtractionItem; } -filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) { +filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem) { // Map files to ExtractionItem + eckit::Timer timer("Gribjump Engine: Building file map"); + + std::vector marsrequests; + for (const auto& req : requests) { + marsrequests.push_back(req.request()); + } - const metkit::mars::MarsRequest req = unionRequest(requests); + const metkit::mars::MarsRequest req = unionRequest(marsrequests); + timer.reset("Gribjump Engine: Flattened requests and constructed union request"); filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem); @@ -159,13 +168,10 @@ filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExt -ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) { - - eckit::Timer timer("Gribjump Engine: extract"); - - ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems - timer.reset("Gribjump Engine: Key to ExtractionItem map built"); +ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) { + eckit::Timer timer("Gribjump Engine: Extracting"); + ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems filemap_t filemap = buildFileMap(requests, keyToExtractionItem); timer.reset("Gribjump Engine: File map built"); @@ -232,6 +238,11 @@ std::map > Engine::axes(const std:: void Engine::reportErrors(eckit::Stream& client) { taskGroup_.reportErrors(client); } + +void Engine::updateMetrics(Metrics& metrics) { + metrics.nTasks = taskGroup_.nTasks(); + metrics.nFailedTasks = taskGroup_.nErrors(); +} //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 2cea651..00b99f7 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -18,6 +18,7 @@ #include "gribjump/Task.h" #include "gribjump/Lister.h" #include "gribjump/Types.h" +#include "gribjump/Metrics.h" namespace gribjump { @@ -27,7 +28,7 @@ class Engine { Engine(); ~Engine(); - ResultsMap extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false); + ResultsMap extract(const ExtractionRequests& requests, bool flattenRequests = false); // byfiles: scan entire file, not just fields matching request size_t scan(const MarsRequests& requests, bool byfiles = false); @@ -36,14 +37,16 @@ class Engine { void reportErrors(eckit::Stream& client_); + void updateMetrics(Metrics& metrics); + private: - filemap_t buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem); - ExItemMap buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten); + filemap_t buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem); + ExItemMap buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten); private: - TaskGroup taskGroup_; + TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. }; diff --git a/src/gribjump/ExtractionData.cc b/src/gribjump/ExtractionData.cc index 8b8a599..2dc249e 100644 --- a/src/gribjump/ExtractionData.cc +++ b/src/gribjump/ExtractionData.cc @@ -35,6 +35,8 @@ std::vector decodeVector(eckit::Stream& s) { return std::vector(data, data + size); } +// todo: encodeVectorVector ? + } // namespace ExtractionResult::ExtractionResult() {} @@ -118,14 +120,16 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionResult& o) { //--------------------------------------------------------------------------------------------------------------------- -ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector& ranges): +ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector& ranges, std::string gridHash): ranges_(ranges), - request_(request) + request_(request), + gridHash_(gridHash) {} ExtractionRequest::ExtractionRequest() {} ExtractionRequest::ExtractionRequest(eckit::Stream& s) { request_ = metkit::mars::MarsRequest(s); + s >> gridHash_; size_t numRanges; s >> numRanges; for (size_t j = 0; j < numRanges; j++) { @@ -166,6 +170,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) { void ExtractionRequest::encode(eckit::Stream& s) const { s << request_; + s << gridHash_; s << ranges_.size(); for (auto& [start, end] : ranges_) { s << start << end; diff --git a/src/gribjump/ExtractionData.h b/src/gribjump/ExtractionData.h index 98e4b6f..4425e25 100644 --- a/src/gribjump/ExtractionData.h +++ b/src/gribjump/ExtractionData.h @@ -65,20 +65,19 @@ class ExtractionResult { //---------------------------------------------------------------------------------------------------------------------- -/// @todo This class is now redundant thanks to ExtractionItem. - class ExtractionRequest { public: // methods ExtractionRequest(); - ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector&); + ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector&, std::string gridHash=""); explicit ExtractionRequest(eckit::Stream& s); std::vector split(const std::vector& keys) const; std::vector split(const std::string& key) const; - const std::vector& getRanges() const {return ranges_;} - const metkit::mars::MarsRequest& getRequest() const {return request_;} + const std::vector& ranges() const {return ranges_;} + const metkit::mars::MarsRequest& request() const {return request_;} + const std::string& gridHash() const {return gridHash_;} private: // methods void print(std::ostream&) const; @@ -89,6 +88,7 @@ class ExtractionRequest { private: // members std::vector ranges_; metkit::mars::MarsRequest request_; + std::string gridHash_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/ExtractionItem.h b/src/gribjump/ExtractionItem.h index 80d9805..56c3edc 100644 --- a/src/gribjump/ExtractionItem.h +++ b/src/gribjump/ExtractionItem.h @@ -58,6 +58,9 @@ class ExtractionItem : public eckit::NonCopyable { return offset; } + void gridHash(const std::string& hash) { gridHash_ = hash; } + const std::string& gridHash() const { return gridHash_; } + void URI(const eckit::URI& uri) { uri_ = uri; } void values(ExValues values) { values_ = std::move(values); } @@ -105,6 +108,9 @@ class ExtractionItem : public eckit::NonCopyable { // Set on Extraction ExValues values_; ExMask mask_; + + // Optional extras + std::string gridHash_=""; //< if supplied, check hash matches the jumpinfo }; // ------------------------------------------------------------------ diff --git a/src/gribjump/GribJump.cc b/src/gribjump/GribJump.cc index c70f263..b77772e 100644 --- a/src/gribjump/GribJump.cc +++ b/src/gribjump/GribJump.cc @@ -38,8 +38,8 @@ size_t GribJump::scan(const std::vector requests, boo } -std::vector> GribJump::extract(const std::vector& requests) { - std::vector> out = impl_->extract(requests); +std::vector> GribJump::extract(const std::vector& requests, LogContext ctx) { + std::vector> out = impl_->extract(requests, ctx); return out; } diff --git a/src/gribjump/GribJump.h b/src/gribjump/GribJump.h index 186027b..7882cb3 100644 --- a/src/gribjump/GribJump.h +++ b/src/gribjump/GribJump.h @@ -46,7 +46,7 @@ class GribJump { size_t scan(const eckit::PathName& path); size_t scan(std::vector requests, bool byfiles = false); - std::vector> extract(const std::vector& requests); + std::vector> extract(const std::vector& requests, LogContext ctx=LogContext("none")); std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges); std::map> axes(const std::string& request); diff --git a/src/gribjump/GribJumpBase.h b/src/gribjump/GribJumpBase.h index 9d77bd0..ccecd59 100644 --- a/src/gribjump/GribJumpBase.h +++ b/src/gribjump/GribJumpBase.h @@ -18,12 +18,12 @@ #include "eckit/memory/NonCopyable.h" #include "eckit/filesystem/URI.h" - #include "gribjump/ExtractionData.h" #include "gribjump/ExtractionItem.h" #include "gribjump/Config.h" #include "gribjump/Stats.h" #include "gribjump/LibGribJump.h" +#include "gribjump/Metrics.h" #include "gribjump/Types.h" namespace fdb5 { @@ -45,7 +45,7 @@ class GribJumpBase : public eckit::NonCopyable { size_t virtual scan(const eckit::PathName& path) = 0; virtual size_t scan(const std::vector requests, bool byfiles) = 0; - virtual std::vector> extract(std::vector) = 0; + virtual std::vector> extract(std::vector, LogContext ctx=LogContext("none")) = 0; virtual std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) = 0; virtual std::map> axes(const std::string& request) = 0; diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index d05ce86..b83461c 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -87,23 +87,15 @@ std::vector> LocalGribJump::extract(const eckit: } /// @todo, change API, remove extraction request -std::vector> LocalGribJump::extract(std::vector polyRequest) { +std::vector> LocalGribJump::extract(ExtractionRequests requests, LogContext ctx) { - std::vector requests; - std::vector> ranges; - bool flatten = true; - - for (auto& req : polyRequest) { - requests.push_back(req.getRequest()); - ranges.push_back(req.getRanges()); - } - - ResultsMap results = extract(requests, ranges, flatten); + Engine engine; + ResultsMap results = engine.extract(requests, flatten); std::vector> extractionResults; - for (auto& req : polyRequest) { - auto it = results.find(req.getRequest()); + for (auto& req : requests) { + auto it = results.find(req.request()); ASSERT(it != results.end()); std::vector res; for (auto& item : it->second) { @@ -119,7 +111,13 @@ std::vector> LocalGribJump::extract(std::vector& requests, const std::vector>& ranges, bool flatten) { Engine engine; - return engine.extract(requests, ranges, flatten); + ExtractionRequests extractionRequests; + + for (size_t i = 0; i < requests.size(); i++) { + extractionRequests.push_back(ExtractionRequest(requests[i], ranges[i])); + } + + return engine.extract(extractionRequests, flatten); } std::map> LocalGribJump::axes(const std::string& request) { diff --git a/src/gribjump/LocalGribJump.h b/src/gribjump/LocalGribJump.h index bb0172a..57afbc2 100644 --- a/src/gribjump/LocalGribJump.h +++ b/src/gribjump/LocalGribJump.h @@ -39,7 +39,7 @@ class LocalGribJump : public GribJumpBase { // old API std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; - std::vector> extract(std::vector) override; + std::vector> extract(std::vector, LogContext ctx=LogContext("none")) override; std::map> axes(const std::string& request) override; diff --git a/src/gribjump/Metrics.h b/src/gribjump/Metrics.h new file mode 100644 index 0000000..c576f17 --- /dev/null +++ b/src/gribjump/Metrics.h @@ -0,0 +1,88 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley + +#pragma once + +#include "eckit/serialisation/Stream.h" +#include "eckit/log/Timer.h" +#include "eckit/log/JSON.h" + +namespace gribjump { + +class LogContext { +public: + LogContext(std::string s="none") : context_(s) {} + + explicit LogContext(eckit::Stream& s) { + s >> context_; + } + + ~LogContext() {} +private: + void encode(eckit::Stream& s) const { + s << context_; + } + friend eckit::Stream& operator<<(eckit::Stream& s, const LogContext& o){ + o.encode(s); + return s; + } + void print(std::ostream& s) const { + s << context_; + } + friend std::ostream& operator<<(std::ostream& s, const LogContext& o){ + o.print(s); + return s; + } + +private: + std::string context_; +}; + + +// -------------------------------------------------------------------------------------------------------------------------------- + +class Metrics { + +public: // methods + + Metrics(LogContext ctx) : context_(ctx) { + } + + ~Metrics() {} + + void report() { + eckit::Log::metrics() << "{nRequests:" << nRequests + << ",nTasks:" << nTasks + << ",nFailedTasks:" << nFailedTasks + << ",timeReceive:" << timeReceive + << ",timeExecute:" << timeExecute + << ",timeReply:" << timeReply + << ",timeElapsed:" << timer_.elapsed() + << ",Context:" << context_ + << "}" << std::endl; + } + +public: // members + + int nRequests = -1; + int nTasks = -1; + int nFailedTasks = -1; + size_t timeReceive = 0; + size_t timeExecute = 0; + size_t timeReply = 0; + LogContext context_; + + eckit::Timer timer_; + +}; + +} // namespace gribjump diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 5b61aa2..6b67d7a 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -15,6 +15,7 @@ #include "eckit/thread/AutoLock.h" #include "eckit/io/MemoryHandle.h" #include "eckit/io/Length.h" +#include "eckit/io/AutoCloser.h" #include "fdb5/api/FDB.h" @@ -108,10 +109,10 @@ void TaskGroup::waitForTasks() { LOG_DEBUG_LIB(LibGribJump) << "All tasks complete" << std::endl; } -void TaskGroup::reportErrors(eckit::Stream& client_) { - client_ << errors_.size(); +void TaskGroup::reportErrors(eckit::Stream& client) { + client << errors_.size(); for (const auto& s : errors_) { - client_ << s; + client << s; } } @@ -152,16 +153,23 @@ void FileExtractionTask::extract() { eckit::FileHandle fh(fname_); fh.openForRead(); + eckit::AutoCloser closer(fh); for (size_t i = 0; i < extractionItems_.size(); i++) { ExtractionItem* extractionItem = extractionItems_[i]; const JumpInfo& info = *infos[i]; + const std::string& expectedHash = extractionItem->gridHash(); + if (expectedHash.size() && (expectedHash != info.md5GridSection())) { + std::stringstream ss; + ss << "Grid hash mismatch for extraction item " << i << " in file " << fname_; + ss << ". Expected: " << expectedHash << ", got: " << info.md5GridSection(); + throw eckit::BadValue(ss.str()); + } + std::unique_ptr jumper(JumperFactory::instance().build(info)); // todo, dont build a new jumper for each info. jumper->extract(fh, offsets[i], info, *extractionItem); } - - fh.close(); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index c9308c3..15657bd 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -78,9 +78,18 @@ class TaskGroup { /// Wait for all queued tasks to be executed void waitForTasks(); - void reportErrors(eckit::Stream& client_); + void reportErrors(eckit::Stream& client); std::mutex debugMutex_; + + size_t nTasks() const { + std::lock_guard lock(m_); + return tasks_.size(); + } + size_t nErrors() const { + std::lock_guard lock(m_); + return errors_.size(); + } private: @@ -90,7 +99,7 @@ class TaskGroup { bool waiting_ = false; - std::mutex m_; + mutable std::mutex m_; std::condition_variable cv_; std::vector tasks_; //< stores tasks status, must be initialised by derived class diff --git a/src/gribjump/Types.h b/src/gribjump/Types.h index 0384b28..de2d1a6 100644 --- a/src/gribjump/Types.h +++ b/src/gribjump/Types.h @@ -18,11 +18,14 @@ namespace gribjump { class ExtractionItem; +class ExtractionRequest; using MarsRequests = std::vector; using Range = std::pair; using Interval = std::pair; using RangesList = std::vector>; +using GridHashes = std::vector; +using ExtractionRequests = std::vector; using Bitmap = std::vector; using Ranges = std::vector; diff --git a/src/gribjump/gribjump_c.cc b/src/gribjump/gribjump_c.cc index 9d5d2e6..3e468f2 100644 --- a/src/gribjump/gribjump_c.cc +++ b/src/gribjump/gribjump_c.cc @@ -22,6 +22,21 @@ using namespace gribjump; extern "C" { +// -------------------------------------------------------------------------------------------- +// Error handling +static std::string LAST_ERROR_STR; + +const char* gribjump_error_string(int err) { + switch (err) { + case 1: + return LAST_ERROR_STR.c_str(); + default: + return "Unknown error"; + }; +} + +// -------------------------------------------------------------------------------------------- + struct gribjump_handle_t : public GribJump { using GribJump::GribJump; }; @@ -51,7 +66,7 @@ int gribjump_delete_handle(gribjump_handle_t* handle) { return 0; } -int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr) { +int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr, const char* gridhash) { // reqstr is a string representation of a metkit::mars::MarsRequest // rangesstr is a comma-separated list of ranges, e.g. "0-10,20-30" @@ -62,7 +77,6 @@ int gribjump_new_request(gribjump_extraction_request_t** request, const char* re ASSERT(requests.size() == 1); metkit::mars::MarsRequest mreq(requests[0]); - // Parse the ranges string std::vector ranges = eckit::StringTools::split(",", rangesstr); std::vector rangevec; @@ -72,7 +86,8 @@ int gribjump_new_request(gribjump_extraction_request_t** request, const char* re rangevec.push_back(std::make_pair(std::stoi(kv[0]), std::stoi(kv[1]))); } - *request = new gribjump_extraction_request_t(mreq, rangevec); + std::string gridhash_str = gridhash ? std::string(gridhash) : ""; + *request = new gribjump_extraction_request_t(mreq, rangevec, gridhash_str); return 0; } @@ -136,6 +151,7 @@ int gribjump_delete_result(gribjump_extraction_result_t* result) { return 0; } +/// @todo review why this extract_single exists. int extract_single(gribjump_handle_t* handle, gribjump_extraction_request_t* request, gribjump_extraction_result_t*** results_array, unsigned long* nfields) { ExtractionRequest req = *request; std::vector results = handle->extract(std::vector{req})[0]; @@ -149,12 +165,29 @@ int extract_single(gribjump_handle_t* handle, gribjump_extraction_request_t* req return 0; } -int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields) { +int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields, const char* ctx){ std::vector reqs; for (size_t i = 0; i < nrequests; i++) { reqs.push_back(*requests[i]); } - std::vector> results = handle->extract(reqs); + LogContext logctx; + if (ctx) { + logctx = LogContext(ctx); + } + + std::vector> results; + try { + results = handle->extract(reqs, logctx); + } catch (std::exception& e) { + eckit::Log::error() << "Caught exception on C-C++ API boundary (gribjump.extract): " << e.what() << std::endl; + LAST_ERROR_STR = e.what(); + return 1; + } + catch (...) { + eckit::Log::error() << "Caught unknown exception on C-C++ API boundary (gribjump.extract)" << std::endl; + LAST_ERROR_STR = "Unrecognised and unknown exception"; + return 1; + } *nfields = new unsigned long[nrequests]; *results_array = new gribjump_extraction_result_t**[nrequests]; @@ -212,7 +245,19 @@ struct gj_axes_t { int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj) { ASSERT(gj); std::string reqstr_str(reqstr); - std::map> values = gj->axes(reqstr_str); + std::map> values; + try { + values = gj->axes(reqstr_str); + } catch (std::exception& e) { + eckit::Log::error() << "Caught exception on C-C++ API boundary (gribjump.axes): " << e.what() << std::endl; + LAST_ERROR_STR = e.what(); + return 1; + } + catch (...) { + eckit::Log::error() << "Caught unknown exception on C-C++ API boundary (gribjump.axes)" << std::endl; + LAST_ERROR_STR = "Unrecognised and unknown exception"; + return 1; + } *axes = new gj_axes_t(values); return 0; } diff --git a/src/gribjump/gribjump_c.h b/src/gribjump/gribjump_c.h index adb209b..9c45da1 100644 --- a/src/gribjump/gribjump_c.h +++ b/src/gribjump/gribjump_c.h @@ -32,9 +32,9 @@ int gribjump_new_handle(gribjump_handle_t** gj); int gribjump_delete_handle(gribjump_handle_t* gj); int extract_single(gribjump_handle_t* handle, gribjump_extraction_request_t* request, gribjump_extraction_result_t*** results_array, unsigned long* nfields); -int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields); +int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields, const char* ctx); -int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr); +int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr, const char* gridhash); int gribjump_delete_request(gribjump_extraction_request_t* request); int gribjump_new_result(gribjump_extraction_result_t** result); @@ -51,6 +51,7 @@ int gribjump_delete_axes(gj_axes_t* axes); int gribjump_initialise(); +const char* gribjump_error_string(int err); #ifdef __cplusplus } // extern "C" diff --git a/src/gribjump/remote/GribJumpUser.cc b/src/gribjump/remote/GribJumpUser.cc index 60d34bb..2becd3c 100644 --- a/src/gribjump/remote/GribJumpUser.cc +++ b/src/gribjump/remote/GribJumpUser.cc @@ -116,8 +116,10 @@ void GribJumpUser::extract(eckit::Stream& s, eckit::Timer& timer) { /// @todo, check if this is still working. timer.reset(); + + LogContext ctx(s); - ExtractRequest request(s); + ExtractRequest request(s, ctx); timer.reset("EXTRACT requests received"); @@ -128,6 +130,7 @@ void GribJumpUser::extract(eckit::Stream& s, eckit::Timer& timer) { request.replyToClient(); // s << size_t(0); + request.reportMetrics(); timer.reset("EXTRACT results sent"); } diff --git a/src/gribjump/remote/RemoteGribJump.cc b/src/gribjump/remote/RemoteGribJump.cc index 2b9f73b..e60813f 100644 --- a/src/gribjump/remote/RemoteGribJump.cc +++ b/src/gribjump/remote/RemoteGribJump.cc @@ -83,7 +83,7 @@ size_t RemoteGribJump::scan(const std::vector request return count; } -std::vector> RemoteGribJump::extract(std::vector requests) { +std::vector> RemoteGribJump::extract(std::vector requests, LogContext ctx) { eckit::Timer timer("RemoteGribJump::extract()"); std::vector> result; @@ -94,6 +94,8 @@ std::vector> RemoteGribJump::extract(std::vector< stream << "EXTRACT"; + stream << ctx; + size_t nRequests = requests.size(); stream << nRequests; for (auto& req : requests) { @@ -162,17 +164,27 @@ std::map> RemoteGribJump::axes(cons return result; } -bool RemoteGribJump::receiveErrors(eckit::Stream& stream) { +bool RemoteGribJump::receiveErrors(eckit::Stream& stream, bool raise) { size_t nErrors; stream >> nErrors; + if (nErrors == 0) { + return false; + } + + std::stringstream ss; for (size_t i = 0; i < nErrors; i++) { std::string error; stream >> error; - eckit::Log::error() << error << std::endl; + ss << error << std::endl; + } + if (raise) { + throw eckit::RemoteException(ss.str(), Here()); + } else { + eckit::Log::error() << ss.str() << std::endl; } - return nErrors > 0; + return true; } static GribJumpBuilder builder("remote"); -} // namespace gribjump \ No newline at end of file +} // namespace gribjump diff --git a/src/gribjump/remote/RemoteGribJump.h b/src/gribjump/remote/RemoteGribJump.h index fbc6136..ea901bc 100644 --- a/src/gribjump/remote/RemoteGribJump.h +++ b/src/gribjump/remote/RemoteGribJump.h @@ -28,14 +28,14 @@ class RemoteGribJump : public GribJumpBase { size_t scan(const eckit::PathName& path) override; size_t scan(const std::vector requests, bool byfiles) override; - std::vector> extract(std::vector polyRequest) override; + std::vector> extract(std::vector polyRequest, LogContext ctx) override; std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; std::map> axes(const std::string& request) override; protected: // methods - bool receiveErrors(eckit::Stream& stream); + bool receiveErrors(eckit::Stream& stream, bool raise=true); private: // members int port_; diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index 890083b..b8c64fb 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -18,7 +18,7 @@ namespace gribjump { //---------------------------------------------------------------------------------------------------------------------- -Request::Request(eckit::Stream& stream) : client_(stream) { +Request::Request(eckit::Stream& stream, LogContext ctx) : client_(stream), metrics_(ctx) { } Request::~Request() {} @@ -59,35 +59,35 @@ void ScanRequest::replyToClient() { //---------------------------------------------------------------------------------------------------------------------- -ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { + +ExtractRequest::ExtractRequest(eckit::Stream& stream, LogContext ctx) : Request(stream, ctx) { // Receive the requests // Temp, repackage the requests from old format into format the engine expects + eckit::Timer timer; size_t nRequests; client_ >> nRequests; - std::vector requests; for (size_t i = 0; i < nRequests; i++) { ExtractionRequest req(client_); - requests.push_back(req); - } - - for (auto& req : requests) { - marsRequests_.push_back(req.getRequest()); - ranges_.push_back(req.getRanges()); + requests_.push_back(req); } - flatten_ = false; // xxx hard coded for now + + metrics_.nRequests = nRequests; + metrics_.timeReceive = timer.elapsed(); } ExtractRequest::~ExtractRequest() { } void ExtractRequest::execute() { + eckit::Timer timer; - results_ = engine_.extract(marsRequests_, ranges_, flatten_); + results_ = engine_.extract(requests_, flatten_); + engine_.updateMetrics(metrics_); if (LibGribJump::instance().debug()) { for (auto& pair : results_) { @@ -97,21 +97,23 @@ void ExtractRequest::execute() { } } } + metrics_.timeExecute = timer.elapsed(); } void ExtractRequest::replyToClient() { + eckit::Timer timer; reportErrors(); // Send the results, again repackage. - size_t nRequests = marsRequests_.size(); + size_t nRequests = requests_.size(); LOG_DEBUG_LIB(LibGribJump) << "Sending " << nRequests << " results to client" << std::endl; for (size_t i = 0; i < nRequests; i++) { LOG_DEBUG_LIB(LibGribJump) << "Sending result " << i << " to client" << std::endl; - auto it = results_.find(marsRequests_[i]); + auto it = results_.find(requests_[i].request()); ASSERT(it != results_.end()); std::vector>& items = it->second; // ExtractionItems items = it->second; @@ -124,6 +126,8 @@ void ExtractRequest::replyToClient() { } LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl; + + metrics_.timeReply = timer.elapsed(); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 9c158cd..a84c860 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -19,6 +19,7 @@ #include "gribjump/GribJump.h" #include "gribjump/ExtractionItem.h" +#include "gribjump/Metrics.h" #include "gribjump/remote/WorkQueue.h" #include "gribjump/Engine.h" @@ -29,7 +30,7 @@ namespace gribjump { class Request { public: // methods - Request(eckit::Stream& stream); + Request(eckit::Stream& stream, LogContext ctx = LogContext("none")); virtual ~Request(); @@ -39,6 +40,8 @@ class Request { /// Reply to the client with the results of the request virtual void replyToClient() = 0; + void reportMetrics() {metrics_.report();} + protected: // methods void reportErrors(); @@ -46,8 +49,9 @@ class Request { protected: // members eckit::Stream& client_; - Engine engine_; //< Engine and schedule tasks based on request + + Metrics metrics_; }; //---------------------------------------------------------------------------------------------------------------------- @@ -77,7 +81,7 @@ class ScanRequest : public Request { class ExtractRequest : public Request { public: - ExtractRequest(eckit::Stream& stream); + ExtractRequest(eckit::Stream& stream, LogContext ctx); ~ExtractRequest(); @@ -86,9 +90,7 @@ class ExtractRequest : public Request { void replyToClient() override; private: - - std::vector> ranges_; - std::vector marsRequests_; + std::vector requests_; bool flatten_; ResultsMap results_; diff --git a/tests/test_api.cc b/tests/test_api.cc index c776fdb..b6592e0 100644 --- a/tests/test_api.cc +++ b/tests/test_api.cc @@ -143,7 +143,7 @@ CASE( "test_gribjump_api_extract" ) { // Eccodes expected values std::vector>>> expectedValues; for (auto req : polyRequest1) { - expectedValues.push_back(eccodesExtract(req.getRequest(), req.getRanges())); + expectedValues.push_back(eccodesExtract(req.request(), req.ranges())); } compareValues(expectedValues, output1); @@ -162,10 +162,10 @@ CASE( "test_gribjump_api_extract" ) { } std::vector ranges = allIntervals[0]; - PolyRequest polyRequest; - polyRequest.push_back(ExtractionRequest(requests[0], ranges)); + PolyRequest polyRequest2; + polyRequest2.push_back(ExtractionRequest(requests[0], ranges)); - std::vector> output2 = gj.extract(polyRequest); + std::vector> output2 = gj.extract(polyRequest2); EXPECT(output2.size() == 1); EXPECT(output2[0].size() == 3); @@ -175,6 +175,17 @@ CASE( "test_gribjump_api_extract" ) { // -------------------------------------------------------------------------------------------- + // Test 2.b: Extract but with an md5 hash + /// @todo, the task will raise an error, but it is caught. However, only remoteGribjump propagates the error to the user. + std::vector> output2b = gj.extract({ExtractionRequest(requests[0], ranges, "wronghash")}); + EXPECT_EQUAL(output2b[0][0]->total_values(), 0); + + // correct hash + std::vector> output2c = gj.extract({ExtractionRequest(requests[0], ranges, "33c7d6025995e1b4913811e77d38ec50")}); + EXPECT_EQUAL(output2c[0][0]->total_values(), 15); + + // -------------------------------------------------------------------------------------------- + // Test 3: Extract function using path and offsets std::vector uris; diff --git a/tests/test_engine.cc b/tests/test_engine.cc index f233a64..1acf694 100644 --- a/tests/test_engine.cc +++ b/tests/test_engine.cc @@ -104,8 +104,11 @@ CASE ("test_engine_basic") { }; Engine engine; - ResultsMap results = engine.extract(requests, allIntervals, false); - + ExtractionRequests exRequests; + for (size_t i = 0; i < requests.size(); i++) { + exRequests.push_back(ExtractionRequest(requests[i], allIntervals[i])); + } + ResultsMap results = engine.extract(exRequests, false); // print contents of map for (auto& [req, exs] : results) { @@ -168,7 +171,12 @@ CASE ("test_engine_basic") { requests = expand.expand(parsedRequests); } - results = engine.extract(requests, allIntervals, true); + exRequests.clear(); + for (size_t i = 0; i < requests.size(); i++) { + exRequests.push_back(ExtractionRequest(requests[i], allIntervals[0])); + } + + results = engine.extract(exRequests, true); // print contents of map for (auto& [req, exs] : results) {