Skip to content

Commit

Permalink
Mostly implemented NetCDF unstructured mesh reader for coastal formul…
Browse files Browse the repository at this point in the history
…ation use
  • Loading branch information
PhilMiller committed Sep 17, 2024
1 parent ebaa00d commit eaac987
Show file tree
Hide file tree
Showing 3 changed files with 381 additions and 1 deletion.
156 changes: 156 additions & 0 deletions include/forcing/NetCDFMeshPointsDataProvider.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#pragma once

#include <NGenConfig.h>

#if NGEN_WITH_NETCDF

#include "GenericDataProvider.hpp"
#include "DataProviderSelectors.hpp"

#include <string>
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <sstream>
#include <exception>
#include <mutex>
#include "assert.h"
#include <iomanip>
#include <boost/compute/detail/lru_cache.hpp>

#include <UnitsHelper.hpp>
#include <StreamHandler.hpp>

#include "AorcForcing.hpp"

namespace netCDF {
class NcVar;
class NcFile;
}

namespace data_access
{
class NetCDFMeshPointsDataProvider : public MeshPointsDataProvider
{
public:

enum TimeUnit
{
TIME_HOURS,
TIME_MINUTES,
TIME_SECONDS,
TIME_MILLISECONDS,
TIME_MICROSECONDS,
TIME_NANOSECONDS
};

using time_point_type = std::chrono::time_point<std::chrono::system_clock>;

/**
* @brief Factory method that creates or returns an existing provider for the provided path.
* @param input_path The path to a NetCDF file with lumped catchment forcing values.
* @param log_s An output log stream for messages from the underlying library. If a provider object for
* the given path already exists, this argument will be ignored.
*/
static std::shared_ptr<NetCDFMeshPointsDataProvider> get_shared_provider(std::string input_path, time_point_type sim_start, time_point_type sim_end);

/**
* @brief Cleanup the shared providers cache, ensuring that the files get closed.
*/
static void cleanup_shared_providers();

NetCDFMeshPointsDataProvider(std::string input_path,
time_point_type sim_start,
time_point_type sim_end);

// Default implementation defined in the .cpp file so that
// client code doesn't need to have the full definition of
// NcFile visible for the compiler to implicitly generate
// ~NetCDFMeshPointsDataProvider() = default;
// for every file that uses this class
~NetCDFMeshPointsDataProvider();

void finalize() override;

/** Return the variables that are accessable by this data provider */
boost::span<const std::string> get_available_variable_names() const override;

/** Return the first valid time for which data from the request variable can be requested */
long get_data_start_time() const override;

/** Return the last valid time for which data from the requested variable can be requested */
long get_data_stop_time() const override;

long record_duration() const override;

/**
* Get the index of the data time step that contains the given point in time.
*
* An @ref std::out_of_range exception should be thrown if the time is not in any time step.
*
* @param epoch_time The point in time, as a seconds-based epoch time.
* @return The index of the forcing time step that contains the given point in time.
* @throws std::out_of_range If the given point is not in any time step.
*/
size_t get_ts_index_for_time(const time_t &epoch_time) const override;

/**
* Get the value of a forcing property for an arbitrary time period, converting units if needed.
*
* An @ref std::out_of_range exception should be thrown if the data for the time period is not available.
*
* @param selector Data required to establish what subset of the stored data should be accessed
* @param m How data is to be resampled if there is a mismatch in data alignment or repeat rate
* @return The value of the forcing property for the described time period, with units converted if needed.
* @throws std::out_of_range If data for the time period is not available.
*/
double get_value(const selection_type& selector, ReSampleMethod m) override;

// The interface that DataProvider really should have
virtual void get_values(const selection_type& selector, boost::span<double> data);

// And an implementation of the usual version using it
std::vector<data_type> get_values(const selection_type& selector, data_access::ReSampleMethod) override
{
std::vector<data_type> output(selected_points_count(selector));
get_values(selector, output);
return output;
}

private:

size_t mesh_size(std::string const& variable_name);

size_t selected_points_count(const selection_type& selector)
{
auto* points = boost::get<std::vector<int>>(&selector.points);
size_t size = points ? points->size() : this->mesh_size(selector.variable_name);
return size;
}

time_point_type sim_start_date_time_epoch;
time_point_type sim_end_date_time_epoch;
std::chrono::seconds sim_to_data_time_offset; // Deliberately signed--sim should never start before data, yes?

static std::mutex shared_providers_mutex;
static std::map<std::string, std::shared_ptr<NetCDFMeshPointsDataProvider>> shared_providers;

std::vector<std::string> variable_names;
std::vector<time_point_type> time_vals;
TimeUnit time_unit; // the unit that time was stored as in the file
std::chrono::seconds time_stride; // the amount of time between stored time values

std::shared_ptr<netCDF::NcFile> nc_file;

struct metadata_cache_entry;
std::map<std::string, metadata_cache_entry> ncvar_cache;

boost::compute::detail::lru_cache<std::string, std::shared_ptr<std::vector<double>>> value_cache;
size_t cache_slice_t_size = 1;
size_t cache_slice_c_size = 1;
};
}


#endif // NGEN_WITH_NETCDF
5 changes: 4 additions & 1 deletion src/forcing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ target_link_libraries(forcing PUBLIC
target_sources(forcing PRIVATE "${CMAKE_CURRENT_LIST_DIR}/NullForcingProvider.cpp")

if(NGEN_WITH_NETCDF)
target_sources(forcing PRIVATE "${CMAKE_CURRENT_LIST_DIR}/NetCDFPerFeatureDataProvider.cpp")
target_sources(forcing PRIVATE
"${CMAKE_CURRENT_LIST_DIR}/NetCDFPerFeatureDataProvider.cpp"
"${CMAKE_CURRENT_LIST_DIR}/NetCDFMeshPointsDataProvider.cpp"
)
target_link_libraries(forcing PUBLIC NetCDF)
endif()

Expand Down
221 changes: 221 additions & 0 deletions src/forcing/NetCDFMeshPointsDataProvider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
#include <NGenConfig.h>

#if NGEN_WITH_NETCDF
#include "NetCDFMeshPointsDataProvider.hpp"
#include <chrono>
#include <netcdf>

std::mutex data_access::NetCDFMeshPointsDataProvider::shared_providers_mutex;
std::map<std::string, std::shared_ptr<data_access::NetCDFMeshPointsDataProvider>> data_access::NetCDFMeshPointsDataProvider::shared_providers;

namespace data_access {

// Out-of-line class definition after forward-declaration so that the
// header doesn't need #include <netcdf> for NcVar to be a complete
// type there
struct NetCDFMeshPointsDataProvider::metadata_cache_entry {
netCDF::NcVar variable;
std::string units;
double scale_factor;
double offset;
};

std::shared_ptr<NetCDFMeshPointsDataProvider> NetCDFMeshPointsDataProvider::get_shared_provider(std::string input_path, time_point_type sim_start, time_point_type sim_end)
{
const std::lock_guard<std::mutex> lock(shared_providers_mutex);
std::shared_ptr<NetCDFMeshPointsDataProvider> p;
if(shared_providers.count(input_path) > 0){
p = shared_providers[input_path];
} else {
p = std::make_shared<data_access::NetCDFMeshPointsDataProvider>(input_path, sim_start, sim_end);
shared_providers[input_path] = p;
}
return p;
}

void NetCDFMeshPointsDataProvider::cleanup_shared_providers()
{
const std::lock_guard<std::mutex> lock(shared_providers_mutex);
// First lets try just emptying the map... if all goes well, everything will destruct properly on its own...
shared_providers.clear();
}

NetCDFMeshPointsDataProvider::NetCDFMeshPointsDataProvider(std::string input_path, time_point_type sim_start, time_point_type sim_end)
: value_cache(20),
sim_start_date_time_epoch(sim_start),
sim_end_date_time_epoch(sim_end)
{
nc_file = std::make_shared<netCDF::NcFile>(input_path, netCDF::NcFile::read);

auto var_set = nc_file->getVars();

// Populate the variable attributes cache
for (const auto& element : var_set)
{
std::string var_name = element.first;
auto ncvar = nc_file->getVar(var_name);
variable_names.push_back(var_name);

std::string native_units;
auto units_att = ncvar.getAtt("units");
units_att.getValues(native_units);

double scale_factor = 1.0;
auto scale_var = ncvar.getAtt("scale_factor");
if (!scale_var.isNull()) {
scale_var.getValues(&scale_factor);
}

double offset = 0.0;
auto offset_var = ncvar.getAtt("add_offset");
if (!offset_var.isNull()) {
offset_var.getValues(&offset);
}

ncvar_cache[var_name] = {ncvar, native_units, scale_factor, offset};
}

auto num_times = nc_file->getDim("time").getSize();

std::vector<std::chrono::duration<double, std::ratio<60>>> raw_time(num_times);

auto time_var = nc_file->getVar("Time");

if (time_var.getDimCount() != 1) {
throw "'Time' variable has dimension other than 1";
}

time_var.getVar(raw_time.data());

auto time_unit_att = time_var.getAtt("units");
std::string time_unit_str;

time_unit_att.getValues(time_unit_str);
if (time_unit_str != "minutes since 1970-01-01 00:00:00 UTC") {
throw "Time units not exactly as expected";
}

time_vals.reserve(num_times);

for (int i = 0; i < num_times; ++i) {
// Assume that the system clock's epoch matches Unix time.
// This is guaranteed from C++20 onwards
time_vals.push_back(time_point_type(std::chrono::duration_cast<time_point_type::duration>(raw_time[i])));
}

time_stride = std::chrono::duration_cast<std::chrono::seconds>(time_vals[1] - time_vals[0]);

#if 0
// verify the time stride
for( size_t i = 1; i < time_vals.size() -1; ++i)
{
auto tinterval = time_vals[i+1] - time_vals[i];

if ( tinterval - time_stride > 1us)
{
throw std::runtime_error("Time intervals in forcing file are not constant");
}
}

// determine start_time and stop_time;
start_time = epoch_start_time + time_vals[0];
stop_time = epoch_start_time + time_vals.back() + time_stride;

sim_to_data_time_offset = std::chrono::duration_cast<std::chrono::seconds>(sim_start_date_time_epoch - start_time);
#endif
}

NetCDFMeshPointsDataProvider::~NetCDFMeshPointsDataProvider() = default;

void NetCDFMeshPointsDataProvider::finalize()
{
ncvar_cache.clear();

if (nc_file != nullptr) {
nc_file->close();
}
nc_file = nullptr;
}

boost::span<const std::string> NetCDFMeshPointsDataProvider::get_available_variable_names() const
{
return variable_names;
}

/** Return the first valid time for which data from the request variable can be requested */
long NetCDFMeshPointsDataProvider::get_data_start_time() const
{
return std::chrono::system_clock::to_time_t(time_vals[0]);
#if 0
//return start_time;
//FIXME: Matching behavior from CsvMeshPointsForcingProvider, but both are probably wrong!
return sim_start_date_time_epoch.time_since_epoch().count(); // return start_time + sim_to_data_time_offset;
#endif
}

/** Return the last valid time for which data from the requested variable can be requested */
long NetCDFMeshPointsDataProvider::get_data_stop_time() const
{
return std::chrono::system_clock::to_time_t(time_vals.back()) + std::chrono::duration_cast<std::chrono::seconds>(time_stride).count();
#if 0
//return stop_time;
//FIXME: Matching behavior from CsvMeshPointsForcingProvider, but both are probably wrong!
return sim_end_date_time_epoch.time_since_epoch().count(); // return end_time + sim_to_data_time_offset;
#endif
}

long NetCDFMeshPointsDataProvider::record_duration() const
{
return std::chrono::duration_cast<std::chrono::seconds>(time_stride).count();
}

size_t NetCDFMeshPointsDataProvider::get_ts_index_for_time(const time_t &epoch_time_in) const
{
auto start_time = time_vals.front();
auto stop_time = time_vals.back() + time_stride;

auto epoch_time = std::chrono::system_clock::from_time_t(epoch_time_in);

if (start_time <= epoch_time && epoch_time < stop_time)
{
auto offset = epoch_time - start_time;
auto index = offset / time_stride;
return index;
}
else
{
//std::stringstream ss;
//ss << "The value " << (int)epoch_time << " was not in the range [" << (int)start_time << "," << (int)stop_time << ")\n" << SOURCE_LOC;
//throw std::out_of_range(ss.str().c_str());
throw std::out_of_range("");
}
}

void NetCDFMeshPointsDataProvider::get_values(const selection_type& selector, boost::span<double> data)
{
if (!boost::get<AllPoints>(&selector.points)) throw "Not implemented - only all_points";

auto const& metadata = ncvar_cache[selector.variable_name];
std::string const& source_units = metadata.units;

size_t time_index = get_ts_index_for_time(std::chrono::system_clock::to_time_t(selector.init_time));

metadata.variable.getVar({time_index, 0}, {1, data.size()}, data.data());

for (double& value : data) {
value = value * metadata.scale_factor + metadata.offset;
}

UnitsHelper::convert_values(source_units, data.data(), selector.output_units, data.data(), data.size());
}

double NetCDFMeshPointsDataProvider::get_value(const selection_type& selector, ReSampleMethod m)
{
throw std::runtime_error("Not implemented - access chunks of the mesh");

return 0.0;
}

}

#endif

0 comments on commit eaac987

Please sign in to comment.