Skip to content

Commit

Permalink
Merge pull request #11 from ecmwf/feature/cleanup
Browse files Browse the repository at this point in the history
Large tidy up
  • Loading branch information
ChrisspyB authored Sep 11, 2024
2 parents 7c5a3fe + 9b0d930 commit 4868527
Show file tree
Hide file tree
Showing 23 changed files with 132 additions and 138 deletions.
15 changes: 0 additions & 15 deletions src/gribjump/Bitmap.h

This file was deleted.

1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ list( APPEND gribjump_srcs
GribJumpException.h
ExtractionData.cc
ExtractionData.h
Types.h
)

if( HAVE_GRIBJUMP_LOCAL_EXTRACT )
Expand Down
60 changes: 38 additions & 22 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

#include "metkit/mars/MarsExpension.h"


#include "gribjump/Engine.h"
#include "gribjump/Lister.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/remote/WorkQueue.h"
#include "gribjump/jumper/JumperFactory.h"
Expand Down Expand Up @@ -129,9 +127,8 @@ Engine::Engine() {}

Engine::~Engine() {}

Results Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) {
typedef std::map<std::string, ExtractionItem*> keyToExItem_t;
keyToExItem_t keyToExtractionItem;
ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){
ExItemMap keyToExtractionItem;

eckit::Timer timer;

Expand All @@ -151,42 +148,61 @@ Results Engine::extract(const MarsRequests& requests, const RangesList& ranges,

LOG_DEBUG_LIB(LibGribJump) << "Built keyToExtractionItem" << std::endl;

const metkit::mars::MarsRequest req = unionRequest(requests);
return keyToExtractionItem;
}

filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) {
// Map files to ExtractionItem
eckit::Timer timer;

const metkit::mars::MarsRequest req = unionRequest(requests);
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

// Map files to ExtractionItem
filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
timer.reset("Gribjump Engine: Called fdb.list and constructed file map");

size_t counter = 0;
for (auto& [fname, extractionItems] : filemap) {
if (isRemote(extractionItems[0]->URI())) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
else {
// Reaching here is an error on the databridge, as it means we think the file is local...
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
return filemap;
}



ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) {

ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
eckit::Timer timer;

bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false);
if (remoteExtraction) {
NOTIMP;
}
else {
size_t counter = 0;
for (auto& [fname, extractionItems] : filemap) {
if (isRemote(extractionItems[0]->URI())) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
else {
// Reaching here is an error on the databridge, as it means we think the file is local...
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
}
}

timer.reset("Gribjump Engine: Enqueued " + std::to_string(filemap.size()) + " file tasks");

taskGroup_.waitForTasks();

timer.reset("Gribjump Engine: All tasks finished");

// Create map of base request to vector of extraction items
std::map<metkit::mars::MarsRequest, std::vector<ExtractionItem*>> reqToExtractionItems;
// Create map of base request to vector of extraction items. Takes ownership of the ExtractionItems
ResultsMap results;

for (auto& [key, ex] : keyToExtractionItem) {
reqToExtractionItems[ex->request()].push_back(ex);
results[ex->request()].push_back(std::move(ex));
}

timer.reset("Gribjump Engine: Repackaged results");

return reqToExtractionItems;

return results;
}

size_t Engine::scan(const MarsRequests& requests, bool byfiles) {
Expand Down
17 changes: 8 additions & 9 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,18 @@
#include "metkit/mars/MarsRequest.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/Task.h"
#include "gribjump/Lister.h"
#include "gribjump/Types.h"

namespace gribjump {

typedef std::vector<metkit::mars::MarsRequest> MarsRequests;
typedef std::pair<size_t, size_t> Range;
typedef std::vector<std::vector<Range>> RangesList;

// typedef std::vector<ExtractionResult> Results;
typedef std::map<metkit::mars::MarsRequest, std::vector<ExtractionItem*>> Results;


class Engine {
public:

Engine();
~Engine();

Results extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false);
ResultsMap extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
Expand All @@ -42,6 +36,11 @@ class Engine {

void reportErrors(eckit::Stream& client_);

private:

filemap_t buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten);

private:

TaskGroup taskGroup_;
Expand Down
3 changes: 1 addition & 2 deletions src/gribjump/ExtractionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

#include "eckit/serialisation/Stream.h"
#include "metkit/mars/MarsRequest.h"

using Range = std::pair<size_t, size_t>;
#include "gribjump/Types.h"

namespace gribjump {

Expand Down
9 changes: 1 addition & 8 deletions src/gribjump/ExtractionItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@
#pragma once

#include <bitset>

#include "eckit/filesystem/URI.h"
#include "metkit/mars/MarsRequest.h"

#include "gribjump/LibGribJump.h"

#include "gribjump/Types.h"
namespace gribjump {

using Ranges = std::vector<std::pair<size_t, size_t>>;
using ExValues = std::vector<std::vector<double>>;
using ExMask = std::vector<std::vector<std::bitset<64>>>;

// An object for grouping request, uri and result information together.
// Note: the request held here is the base request, not a request of cardinality 1,
// so several ExtractionItems may refer to the same request.

class ExtractionItem : public eckit::NonCopyable {

public:
Expand Down
8 changes: 0 additions & 8 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,4 @@ void GribJump::stats() {
impl_->stats();
}

// Forwards aggregation of a (key, location) pair to the implementation object impl_.
void GribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) {
impl_->aggregate(key, location);
}

// Overload taking a message instead of a location; likewise forwarded to impl_.
void GribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {
impl_->aggregate(key, msg);
}

} // namespace gribjump
9 changes: 3 additions & 6 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gribjump/Config.h"
#include "gribjump/Stats.h"
#include "gribjump/LibGribJump.h"
#include "gribjump/Types.h"

namespace fdb5 {
class Key;
Expand All @@ -32,7 +33,8 @@ namespace fdb5 {

namespace gribjump {

typedef std::pair<size_t, size_t> Range;
using ResultsMap = std::map<metkit::mars::MarsRequest, std::vector<std::unique_ptr<ExtractionItem>>>;

class GribJumpBase : public eckit::NonCopyable {
public:

Expand All @@ -47,14 +49,9 @@ class GribJumpBase : public eckit::NonCopyable {
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;


virtual void stats();

// Note: Only implemented if FDB is enabled
virtual void aggregate(const fdb5::Key& key, const eckit::URI& location) {NOTIMP;}
virtual void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {NOTIMP;}

protected: // members

Stats stats_;
Expand Down
17 changes: 0 additions & 17 deletions src/gribjump/Interval.h

This file was deleted.

5 changes: 2 additions & 3 deletions src/gribjump/Lister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ std::string fdbkeyToStr(const fdb5::Key& key) {
}

// i.e. do all of the listing work I want...
filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToExtractionItem) {

filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToExtractionItem) {
eckit::AutoLock<FDBLister> lock(this);
filemap_t filemap;

Expand All @@ -93,7 +92,7 @@ filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, cons
// Set the URI in the ExtractionItem
eckit::URI uri = elem.location().fullUri();

ExtractionItem* extractionItem = reqToExtractionItem.at(key);
ExtractionItem* extractionItem = reqToExtractionItem.at(key).get();
extractionItem->URI(uri);

// Add to filemap
Expand Down
7 changes: 3 additions & 4 deletions src/gribjump/Lister.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ class Lister {
};

// ------------------------------------------------------------------
using reqToXRR_t = std::map<std::string, ExtractionItem*>;
// We explicitly want this map to be randomly sorted.
using filemap_t = std::unordered_map<std::string, std::vector<ExtractionItem*>>; // String is filepath, eckit::PathName is not hashable?
// filemap holds non-owning pointers to ExtractionItems
using filemap_t = std::map<std::string, ExtractionItems>;

class FDBLister : public Lister {
public:
Expand All @@ -61,7 +60,7 @@ class FDBLister : public Lister {
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request);

filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToXRR); // Used during extraction
filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction

std::map< eckit::PathName, eckit::OffsetList > filesOffsets(std::vector<metkit::mars::MarsRequest> requests); // Used during scan

Expand Down
17 changes: 3 additions & 14 deletions src/gribjump/LocalGribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ std::vector<std::vector<ExtractionResult*>> LocalGribJump::extract(std::vector<E
ranges.push_back(req.getRanges());
}

std::map<MarsRequest, std::vector<ExtractionItem*>> results = extract(requests, ranges, flatten);
ResultsMap results = extract(requests, ranges, flatten);

std::vector<std::vector<ExtractionResult*>> extractionResults;
for (auto& req : polyRequest) {
Expand All @@ -117,10 +117,9 @@ std::vector<std::vector<ExtractionResult*>> LocalGribJump::extract(std::vector<E
return extractionResults;
}

std::map<MarsRequest, std::vector<ExtractionItem*>> LocalGribJump::extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten) {
ResultsMap LocalGribJump::extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten) {
Engine engine;
std::map<MarsRequest, std::vector<ExtractionItem*>> results = engine.extract(requests, ranges, flatten);
return results;
return engine.extract(requests, ranges, flatten);
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request) {
Expand All @@ -132,16 +131,6 @@ std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const
return engine.axes(request);
}

// TODO: remove these, plugin should use aggregator directly (which has its own config).

// Aggregation from a (key, location) pair is not provided by the local backend:
// the body only invokes NOTIMP (presumably eckit's not-implemented macro — confirm).
void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) {
    NOTIMP;
}

// Aggregation from a (key, message) pair is likewise not provided here.
void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {
    NOTIMP;
}


static GribJumpBuilder<LocalGribJump> builder("local");

} // namespace gribjump
5 changes: 1 addition & 4 deletions src/gribjump/LocalGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ class LocalGribJump : public GribJumpBase {
size_t scan(const std::vector<MarsRequest> requests, bool byfiles) override;

// new API!
std::map<MarsRequest, std::vector<ExtractionItem*>> extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten);

void aggregate(const fdb5::Key& key, const eckit::URI& location) override;
void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) override;
ResultsMap extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten);

// old API
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) override;
Expand Down
7 changes: 5 additions & 2 deletions src/gribjump/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void TaskGroup::reportErrors(eckit::Stream& client_) {

//----------------------------------------------------------------------------------------------------------------------

FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector<ExtractionItem*>& extractionItems) :
FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems) :
Task(taskgroup, id),
fname_(fname),
extractionItems_(extractionItems) {
Expand Down Expand Up @@ -165,7 +165,10 @@ void FileExtractionTask::extract() {
}

//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector<ExtractionItem*>& extractionItems):


//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems):
FileExtractionTask(taskgroup, id, fname, extractionItems) {
}

Expand Down
Loading

0 comments on commit 4868527

Please sign in to comment.