Skip to content

Commit

Permalink
Merge pull request #15 from ecmwf/feature/context
Browse files Browse the repository at this point in the history
Feature/context
  • Loading branch information
ChrisspyB authored Sep 18, 2024
2 parents e2dd4a8 + 1407e45 commit 7f702c8
Show file tree
Hide file tree
Showing 26 changed files with 349 additions and 104 deletions.
6 changes: 4 additions & 2 deletions pygribjump/gribjump_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ int gribjump_new_handle(gribjump_handle_t** gj);
int gribjump_delete_handle(gribjump_handle_t* gj);

int extract_single(gribjump_handle_t* handle, gribjump_extraction_request_t* request, gribjump_extraction_result_t*** results_array, unsigned long* nfields);
int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields);
int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests, unsigned long nrequests, gribjump_extraction_result_t**** results_array, unsigned long** nfields, const char* ctx);

int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr);
int gribjump_new_request(gribjump_extraction_request_t** request, const char* reqstr, const char* rangesstr, const char* gridhash);
int gribjump_delete_request(gribjump_extraction_request_t* request);
int gribjump_new_result(gribjump_extraction_result_t** result);
int gribjump_result_values(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues);
Expand All @@ -32,3 +32,5 @@ int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_
int gribjump_delete_axes(gj_axes_t* axes);

int gribjump_initialise();

const char* gribjump_error_string(int err);
41 changes: 33 additions & 8 deletions pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def wrapped_fn(*args, **kwargs):
# TODO Error string. See pyfdb
retval = fn(*args, **kwargs)
if retval != 0:
error_str = "Error in function {}".format(name)
error_str = f"Error in function {name}: {ffi.string(self.__lib.gribjump_error_string(retval)).decode()}"
raise GribJumpException(error_str)
return retval

Expand All @@ -97,7 +97,8 @@ def __init__(self):
# Set free function
self.__gribjump = ffi.gc(gribjump[0], lib.gribjump_delete_handle)

def extract(self, polyrequest, dump=True):
def extract(self, polyrequest, ctx=None, dump=True):
# TODO Add md5 hash to request
"""
Parameters
----------
Expand All @@ -112,14 +113,21 @@ def extract(self, polyrequest, dump=True):
stored in the original buffer, and will be garbage collected when the result object
is garbage collected.
"""
requests = [ExtractionRequest(req, ranges) for req, ranges in polyrequest]
# requests = [ExtractionRequest(req, ranges, hash) for req, ranges, hash in polyrequest]
requests = self._unpack_polyrequest(polyrequest)

# results_array contains values, for each field, for each request.
results_array = ffi.new('gribjump_extraction_result_t****')
nfields = ffi.new('unsigned long**')
nrequests = len(requests)
c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests])
lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields)
if (ctx):
logctx=str(ctx)
else:
logctx=""

logctx_c = ffi.new('const char[]', logctx.encode('ascii'))
lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c)

if dump:
res = [
Expand All @@ -132,6 +140,7 @@ def extract(self, polyrequest, dump=True):
]
return res

# @todo review if we still need this method
def extract_singles(self, polyrequest, dump=True):
"""
Carry out a series of single extractions, rather than a single polytope extraction.
Expand All @@ -149,7 +158,7 @@ def extract_singles(self, polyrequest, dump=True):
stored in the original buffer, and will be garbage collected when the result object
is garbage collected.
"""
requests = [ExtractionRequest(reqstr, ranges) for reqstr, ranges in polyrequest]
requests = self._unpack_polyrequest(polyrequest)

if dump:
# Copy values, allow original buffer to be garbage collected.
Expand All @@ -171,6 +180,20 @@ def extract_singles(self, polyrequest, dump=True):

def extract_str(self, reqstr, rangestr):
return self.extract_single(ExtractionRequest(reqstr, rangestr))

def _unpack_polyrequest(self, polyrequest):
requests = []
for item in polyrequest:
if len(item) == 2:
reqstr, ranges = item
hash = None
elif len(item) == 3:
reqstr, ranges, hash = item
else:
raise ValueError("Polyrequest should be a list of tuples of length 2 or 3")
requests.append(ExtractionRequest(reqstr, ranges, hash))
return requests


def extract_single(self, request):
"""
Expand Down Expand Up @@ -227,13 +250,14 @@ class ExtractionRequest:
ranges : [(lo, hi), (lo, hi), ...]
The ranges to extract.
"""
def __init__(self, req, ranges):
def __init__(self, req, ranges, gridHash=None):
reqstr = "retrieve,"+dic_to_request(req)
rangestr = list_to_rangestr(ranges)
request = ffi.new('gribjump_extraction_request_t**')
c_reqstr = ffi.new("char[]", reqstr.encode())
c_rangestr = ffi.new("char[]", rangestr.encode())
lib.gribjump_new_request(request, c_reqstr, c_rangestr)
c_hash = ffi.NULL if gridHash is None else ffi.new("char[]", gridHash.encode())
lib.gribjump_new_request(request, c_reqstr, c_rangestr, c_hash)
self.__request = ffi.gc(request[0], lib.gribjump_delete_request)

@property
Expand Down Expand Up @@ -327,4 +351,5 @@ def list_to_rangestr(ranges):

def dic_to_request(dic):
# e.g. {"class":"od", "expver":"0001", "levtype":"pl"} -> "class=od,expver=0001,levtype=pl"
return ','.join(['='.join([k, v]) for k, v in dic.items()])
return ','.join(['='.join([k, v]) for k, v in dic.items()])

1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ list( APPEND gribjump_srcs
GribJumpException.h
ExtractionData.cc
ExtractionData.h
Metrics.h
Types.h
)

Expand Down
39 changes: 25 additions & 14 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ std::vector<metkit::mars::MarsRequest> flattenRequest(const metkit::mars::MarsRe

typedef std::map<metkit::mars::MarsRequest, std::vector<std::string>> flattenedKeys_t;

flattenedKeys_t buildFlatKeys(const std::vector<metkit::mars::MarsRequest>& requests, bool flatten) {
flattenedKeys_t buildFlatKeys(const ExtractionRequests& requests, bool flatten) {

flattenedKeys_t keymap;

for (const auto& baseRequest : requests) {

for (const auto& req : requests) {
const metkit::mars::MarsRequest& baseRequest = req.request();
keymap[baseRequest] = std::vector<std::string>();

// Assume baseRequest has cardinality >= 1 and may need to be flattened
Expand Down Expand Up @@ -127,7 +127,7 @@ Engine::Engine() {}

Engine::~Engine() {}

ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){
ExItemMap Engine::buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten){
ExItemMap keyToExtractionItem;

flattenedKeys_t flatKeys = buildFlatKeys(requests, flatten); // Map from base request to {flattened keys}
Expand All @@ -136,21 +136,30 @@ ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const R

// Create the 1-to-1 map
for (size_t i = 0; i < requests.size(); i++) {
const metkit::mars::MarsRequest& basereq = requests[i];
const metkit::mars::MarsRequest& basereq = requests[i].request();
const std::vector<std::string> keys = flatKeys[basereq];
for (const auto& key : keys) {
ASSERT(keyToExtractionItem.find(key) == keyToExtractionItem.end()); /// @todo support duplicated requests?
keyToExtractionItem.emplace(key, new ExtractionItem(basereq, ranges[i])); // 1-to-1-map
auto extractionItem = std::make_unique<ExtractionItem>(basereq, requests[i].ranges());
extractionItem->gridHash(requests[i].gridHash());
keyToExtractionItem.emplace(key, std::move(extractionItem)); // 1-to-1-map
}
}

return keyToExtractionItem;
}

filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) {
filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem) {
// Map files to ExtractionItem
eckit::Timer timer("Gribjump Engine: Building file map");

std::vector<metkit::mars::MarsRequest> marsrequests;
for (const auto& req : requests) {
marsrequests.push_back(req.request());
}

const metkit::mars::MarsRequest req = unionRequest(requests);
const metkit::mars::MarsRequest req = unionRequest(marsrequests);
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);

Expand All @@ -159,13 +168,10 @@ filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExt



ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) {

eckit::Timer timer("Gribjump Engine: extract");

ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems
timer.reset("Gribjump Engine: Key to ExtractionItem map built");
ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {
eckit::Timer timer("Gribjump Engine: Extracting");

ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
timer.reset("Gribjump Engine: File map built");

Expand Down Expand Up @@ -232,6 +238,11 @@ std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::
void Engine::reportErrors(eckit::Stream& client) {
taskGroup_.reportErrors(client);
}

void Engine::updateMetrics(Metrics& metrics) {
metrics.nTasks = taskGroup_.nTasks();
metrics.nFailedTasks = taskGroup_.nErrors();
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
11 changes: 7 additions & 4 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "gribjump/Task.h"
#include "gribjump/Lister.h"
#include "gribjump/Types.h"
#include "gribjump/Metrics.h"

namespace gribjump {

Expand All @@ -27,7 +28,7 @@ class Engine {
Engine();
~Engine();

ResultsMap extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false);
ResultsMap extract(const ExtractionRequests& requests, bool flattenRequests = false);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
Expand All @@ -36,14 +37,16 @@ class Engine {

void reportErrors(eckit::Stream& client_);

void updateMetrics(Metrics& metrics);

private:

filemap_t buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten);
filemap_t buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten);

private:

TaskGroup taskGroup_;
TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here.

};

Expand Down
9 changes: 7 additions & 2 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ std::vector<double> decodeVector(eckit::Stream& s) {
return std::vector<double>(data, data + size);
}

// todo: encodeVectorVector ?

} // namespace

ExtractionResult::ExtractionResult() {}
Expand Down Expand Up @@ -118,14 +120,16 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionResult& o) {

//---------------------------------------------------------------------------------------------------------------------

ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector<Range>& ranges):
ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector<Range>& ranges, std::string gridHash):
ranges_(ranges),
request_(request)
request_(request),
gridHash_(gridHash)
{}
ExtractionRequest::ExtractionRequest() {}

ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
request_ = metkit::mars::MarsRequest(s);
s >> gridHash_;
size_t numRanges;
s >> numRanges;
for (size_t j = 0; j < numRanges; j++) {
Expand Down Expand Up @@ -166,6 +170,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {

void ExtractionRequest::encode(eckit::Stream& s) const {
s << request_;
s << gridHash_;
s << ranges_.size();
for (auto& [start, end] : ranges_) {
s << start << end;
Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/ExtractionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,19 @@ class ExtractionResult {

//----------------------------------------------------------------------------------------------------------------------

/// @todo This class is now redundant thanks to ExtractionItem.

class ExtractionRequest {

public: // methods

ExtractionRequest();
ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector<Range>&);
ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector<Range>&, std::string gridHash="");
explicit ExtractionRequest(eckit::Stream& s);

std::vector<ExtractionRequest> split(const std::vector<std::string>& keys) const;
std::vector<ExtractionRequest> split(const std::string& key) const;
const std::vector<Range>& getRanges() const {return ranges_;}
const metkit::mars::MarsRequest& getRequest() const {return request_;}
const std::vector<Range>& ranges() const {return ranges_;}
const metkit::mars::MarsRequest& request() const {return request_;}
const std::string& gridHash() const {return gridHash_;}

private: // methods
void print(std::ostream&) const;
Expand All @@ -89,6 +88,7 @@ class ExtractionRequest {
private: // members
std::vector<Range> ranges_;
metkit::mars::MarsRequest request_;
std::string gridHash_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions src/gribjump/ExtractionItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class ExtractionItem : public eckit::NonCopyable {
return offset;
}

void gridHash(const std::string& hash) { gridHash_ = hash; }
const std::string& gridHash() const { return gridHash_; }

void URI(const eckit::URI& uri) { uri_ = uri; }

void values(ExValues values) { values_ = std::move(values); }
Expand Down Expand Up @@ -105,6 +108,9 @@ class ExtractionItem : public eckit::NonCopyable {
// Set on Extraction
ExValues values_;
ExMask mask_;

// Optional extras
std::string gridHash_=""; //< if supplied, check hash matches the jumpinfo
};

// ------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, boo
}


std::vector<std::vector<ExtractionResult*>> GribJump::extract(const std::vector<ExtractionRequest>& requests) {
std::vector<std::vector<ExtractionResult*>> out = impl_->extract(requests);
std::vector<std::vector<ExtractionResult*>> GribJump::extract(const std::vector<ExtractionRequest>& requests, LogContext ctx) {
std::vector<std::vector<ExtractionResult*>> out = impl_->extract(requests, ctx);
return out;
}

Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/GribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class GribJump {
size_t scan(const eckit::PathName& path);
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false);

std::vector<std::vector<ExtractionResult*>> extract(const std::vector<ExtractionRequest>& requests);
std::vector<std::vector<ExtractionResult*>> extract(const std::vector<ExtractionRequest>& requests, LogContext ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges);

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request);
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include "eckit/memory/NonCopyable.h"
#include "eckit/filesystem/URI.h"


#include "gribjump/ExtractionData.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/Config.h"
#include "gribjump/Stats.h"
#include "gribjump/LibGribJump.h"
#include "gribjump/Metrics.h"
#include "gribjump/Types.h"

namespace fdb5 {
Expand All @@ -45,7 +45,7 @@ class GribJumpBase : public eckit::NonCopyable {
size_t virtual scan(const eckit::PathName& path) = 0;
virtual size_t scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) = 0;

virtual std::vector<std::vector<ExtractionResult*>> extract(std::vector<ExtractionRequest>) = 0;
virtual std::vector<std::vector<ExtractionResult*>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) = 0;
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;
Expand Down
Loading

0 comments on commit 7f702c8

Please sign in to comment.