Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large tidy up #11

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions src/gribjump/Bitmap.h

This file was deleted.

1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ list( APPEND gribjump_srcs
GribJumpException.h
ExtractionData.cc
ExtractionData.h
Types.h
)

if( HAVE_GRIBJUMP_LOCAL_EXTRACT )
Expand Down
60 changes: 38 additions & 22 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

#include "metkit/mars/MarsExpension.h"


#include "gribjump/Engine.h"
#include "gribjump/Lister.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/remote/WorkQueue.h"
#include "gribjump/jumper/JumperFactory.h"
Expand Down Expand Up @@ -129,9 +127,8 @@ Engine::Engine() {}

Engine::~Engine() {}

Results Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) {
typedef std::map<std::string, ExtractionItem*> keyToExItem_t;
keyToExItem_t keyToExtractionItem;
ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){
ExItemMap keyToExtractionItem;

eckit::Timer timer;

Expand All @@ -151,42 +148,61 @@ Results Engine::extract(const MarsRequests& requests, const RangesList& ranges,

LOG_DEBUG_LIB(LibGribJump) << "Built keyToExtractionItem" << std::endl;

const metkit::mars::MarsRequest req = unionRequest(requests);
return keyToExtractionItem;
}

// Builds the file map used by extraction: forms the union of `requests`, then asks
// FDBLister to build the map from that union request and `keyToExtractionItem`.
// Returns the resulting filemap_t. Each stage is timed via eckit::Timer::reset.
// NOTE(review): this text is a rendered diff without +/- markers. The enqueue loop
// below (counter / for / if / else) looks like lines removed from the previous
// Engine::extract rather than part of this function -- confirm against the repository.
filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) {
// Map files to ExtractionItem
eckit::Timer timer;

// Single request covering all of `requests`, used for the listing call below.
const metkit::mars::MarsRequest req = unionRequest(requests);
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

// Map files to ExtractionItem
filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
timer.reset("Gribjump Engine: Called fdb.list and constructed file map");

size_t counter = 0;
for (auto& [fname, extractionItems] : filemap) {
if (isRemote(extractionItems[0]->URI())) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
else {
// Reaching here is an error on the databridge, as it means we think the file is local...
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
return filemap;
}



// Runs an extraction for `requests` / `ranges` and returns the items grouped by request.
// Steps visible here: build the key -> ExtractionItem map, build the file map, enqueue one
// task per file on taskGroup_, wait for all tasks to finish, then repackage into a ResultsMap.
// NOTE(review): this text is a rendered diff without +/- markers, so lines from the previous
// implementation (the reqToExtractionItems ones) appear next to their replacements.
ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) {

ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
eckit::Timer timer;

// Config key "remoteExtraction" (default false): the enabled path is not implemented.
bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false);
if (remoteExtraction) {
NOTIMP;
}
else {
// counter supplies the id argument passed to each enqueued task.
size_t counter = 0;
for (auto& [fname, extractionItems] : filemap) {
// The URI of the first item decides which task type is used for the whole file.
if (isRemote(extractionItems[0]->URI())) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
else {
// Reaching here is an error on the databridge, as it means we think the file is local...
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
}
}
}

timer.reset("Gribjump Engine: Enqueued " + std::to_string(filemap.size()) + " file tasks");

// Blocks until the tasks enqueued above have completed.
taskGroup_.waitForTasks();

timer.reset("Gribjump Engine: All tasks finished");

// Create map of base request to vector of extraction items
std::map<metkit::mars::MarsRequest, std::vector<ExtractionItem*>> reqToExtractionItems;
// Create map of base request to vector of extraction items. Takes ownership of the ExtractionItems
ResultsMap results;

// Each item is moved out of keyToExtractionItem into the vector for its request.
for (auto& [key, ex] : keyToExtractionItem) {
reqToExtractionItems[ex->request()].push_back(ex);
results[ex->request()].push_back(std::move(ex));
}

timer.reset("Gribjump Engine: Repackaged results");

return reqToExtractionItems;

return results;
}

size_t Engine::scan(const MarsRequests& requests, bool byfiles) {
Expand Down
17 changes: 8 additions & 9 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,18 @@
#include "metkit/mars/MarsRequest.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/Task.h"
#include "gribjump/Lister.h"
#include "gribjump/Types.h"

namespace gribjump {

typedef std::vector<metkit::mars::MarsRequest> MarsRequests;
typedef std::pair<size_t, size_t> Range;
typedef std::vector<std::vector<Range>> RangesList;

// typedef std::vector<ExtractionResult> Results;
typedef std::map<metkit::mars::MarsRequest, std::vector<ExtractionItem*>> Results;


class Engine {
public:

Engine();
~Engine();

Results extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false);
ResultsMap extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
Expand All @@ -42,6 +36,11 @@ class Engine {

void reportErrors(eckit::Stream& client_);

private:

filemap_t buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten);

private:

TaskGroup taskGroup_;
Expand Down
3 changes: 1 addition & 2 deletions src/gribjump/ExtractionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

#include "eckit/serialisation/Stream.h"
#include "metkit/mars/MarsRequest.h"

using Range = std::pair<size_t, size_t>;
#include "gribjump/Types.h"

namespace gribjump {

Expand Down
9 changes: 1 addition & 8 deletions src/gribjump/ExtractionItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,15 @@
#pragma once

#include <bitset>

#include "eckit/filesystem/URI.h"
#include "metkit/mars/MarsRequest.h"

#include "gribjump/LibGribJump.h"

#include "gribjump/Types.h"
namespace gribjump {

using Ranges = std::vector<std::pair<size_t, size_t>>;
using ExValues = std::vector<std::vector<double>>;
using ExMask = std::vector<std::vector<std::bitset<64>>>;

// An object for grouping request, URI and result information together.
// NOTE(review): an earlier note here said request and result map one-to-one (i.e. the request
// has cardinality 1); a later note disputes that and says the request is the base request -- confirm.

class ExtractionItem : public eckit::NonCopyable {

public:
Expand Down
8 changes: 0 additions & 8 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,4 @@ void GribJump::stats() {
impl_->stats();
}

// Forwards (key, location) unchanged to the implementation object impl_.
void GribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) {
impl_->aggregate(key, location);
}

// Forwards (key, msg) unchanged to the implementation object impl_.
void GribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {
impl_->aggregate(key, msg);
}

} // namespace gribjump
9 changes: 3 additions & 6 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gribjump/Config.h"
#include "gribjump/Stats.h"
#include "gribjump/LibGribJump.h"
#include "gribjump/Types.h"

namespace fdb5 {
class Key;
Expand All @@ -32,7 +33,8 @@ namespace fdb5 {

namespace gribjump {

typedef std::pair<size_t, size_t> Range;
using ResultsMap = std::map<metkit::mars::MarsRequest, std::vector<std::unique_ptr<ExtractionItem>>>;

class GribJumpBase : public eckit::NonCopyable {
public:

Expand All @@ -47,14 +49,9 @@ class GribJumpBase : public eckit::NonCopyable {
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;


virtual void stats();

// Note: Only implemented if FDB is enabled
virtual void aggregate(const fdb5::Key& key, const eckit::URI& location) {NOTIMP;}
virtual void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {NOTIMP;}

protected: // members

Stats stats_;
Expand Down
17 changes: 0 additions & 17 deletions src/gribjump/Interval.h

This file was deleted.

5 changes: 2 additions & 3 deletions src/gribjump/Lister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ std::string fdbkeyToStr(const fdb5::Key& key) {
}

// Builds the file map for a union request: each listed element's URI is set on its ExtractionItem, which is then added to the map.
filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToExtractionItem) {

filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToExtractionItem) {
eckit::AutoLock<FDBLister> lock(this);
filemap_t filemap;

Expand All @@ -93,7 +92,7 @@ filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, cons
// Set the URI in the ExtractionItem
eckit::URI uri = elem.location().fullUri();

ExtractionItem* extractionItem = reqToExtractionItem.at(key);
ExtractionItem* extractionItem = reqToExtractionItem.at(key).get();
extractionItem->URI(uri);

// Add to filemap
Expand Down
7 changes: 3 additions & 4 deletions src/gribjump/Lister.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ class Lister {
};

// ------------------------------------------------------------------
using reqToXRR_t = std::map<std::string, ExtractionItem*>;
// We explicitly want this map to be randomly sorted.
using filemap_t = std::unordered_map<std::string, std::vector<ExtractionItem*>>; // String is filepath, eckit::PathName is not hashable?
// filemap holds non-owning pointers to ExtractionItems
using filemap_t = std::map<std::string, ExtractionItems>;

class FDBLister : public Lister {
public:
Expand All @@ -61,7 +60,7 @@ class FDBLister : public Lister {
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request);

filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToXRR); // Used during extraction
filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction

std::map< eckit::PathName, eckit::OffsetList > filesOffsets(std::vector<metkit::mars::MarsRequest> requests); // Used during scan

Expand Down
17 changes: 3 additions & 14 deletions src/gribjump/LocalGribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ std::vector<std::vector<ExtractionResult*>> LocalGribJump::extract(std::vector<E
ranges.push_back(req.getRanges());
}

std::map<MarsRequest, std::vector<ExtractionItem*>> results = extract(requests, ranges, flatten);
ResultsMap results = extract(requests, ranges, flatten);

std::vector<std::vector<ExtractionResult*>> extractionResults;
for (auto& req : polyRequest) {
Expand All @@ -117,10 +117,9 @@ std::vector<std::vector<ExtractionResult*>> LocalGribJump::extract(std::vector<E
return extractionResults;
}

// Request-based extraction: constructs a local Engine and returns the result of
// Engine::extract(requests, ranges, flatten).
// NOTE(review): this text is a rendered diff without +/- markers -- both the previous and the
// current signature and return lines appear below; confirm which are live in the repository.
std::map<MarsRequest, std::vector<ExtractionItem*>> LocalGribJump::extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten) {
ResultsMap LocalGribJump::extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten) {
Engine engine;
std::map<MarsRequest, std::vector<ExtractionItem*>> results = engine.extract(requests, ranges, flatten);
return results;
return engine.extract(requests, ranges, flatten);
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request) {
Expand All @@ -132,16 +131,6 @@ std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const
return engine.axes(request);
}

// TODO: remove these, plugin should use aggregator directly (which has its own config).
// Not implemented for LocalGribJump: the body is only NOTIMP and neither argument is used.
void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) {
NOTIMP;
};

// Not implemented for LocalGribJump: the body is only NOTIMP and neither argument is used.
void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {
NOTIMP;
};


static GribJumpBuilder<LocalGribJump> builder("local");

} // namespace gribjump
5 changes: 1 addition & 4 deletions src/gribjump/LocalGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ class LocalGribJump : public GribJumpBase {
size_t scan(const std::vector<MarsRequest> requests, bool byfiles) override;

// new API!
std::map<MarsRequest, std::vector<ExtractionItem*>> extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten);

void aggregate(const fdb5::Key& key, const eckit::URI& location) override;
void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) override;
ResultsMap extract(const std::vector<MarsRequest>& requests, const std::vector<std::vector<Range>>& ranges, bool flatten);

// old API
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) override;
Expand Down
7 changes: 5 additions & 2 deletions src/gribjump/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void TaskGroup::reportErrors(eckit::Stream& client_) {

//----------------------------------------------------------------------------------------------------------------------

FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector<ExtractionItem*>& extractionItems) :
FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems) :
Task(taskgroup, id),
fname_(fname),
extractionItems_(extractionItems) {
Expand Down Expand Up @@ -165,7 +165,10 @@ void FileExtractionTask::extract() {
}

//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector<ExtractionItem*>& extractionItems):


//----------------------------------------------------------------------------------------------------------------------
// Constructs the task by passing every argument straight to the FileExtractionTask
// constructor; the constructor body itself is empty.
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems):
FileExtractionTask(taskgroup, id, fname, extractionItems) {
}

Expand Down
Loading
Loading