diff --git a/CMakeLists.txt b/CMakeLists.txt
index 22a3af6b41..9d3c473497 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -316,6 +316,7 @@ add_subdirectory("src/core")
 add_subdirectory("src/geojson")
 add_subdirectory("src/bmi")
 add_subdirectory("src/realizations/catchment")
+add_subdirectory("src/realizations/coastal")
 add_subdirectory("src/forcing")
 add_subdirectory("src/utilities/mdarray")
 add_subdirectory("src/utilities/mdframe")
diff --git a/include/bmi/Bmi_Fortran_Adapter.hpp b/include/bmi/Bmi_Fortran_Adapter.hpp
index 9f5bfe71bb..8abedef1b4 100644
--- a/include/bmi/Bmi_Fortran_Adapter.hpp
+++ b/include/bmi/Bmi_Fortran_Adapter.hpp
@@ -11,6 +11,10 @@
 #include "State_Exception.hpp"
 #include "utilities/ExternalIntegrationException.hpp"
 
+#if NGEN_WITH_MPI
+#include <mpi.h>
+#endif // NGEN_WITH_MPI
+
 // Forward declaration to provide access to protected items in testing
 class Bmi_Fortran_Adapter_Test;
 
@@ -76,6 +80,46 @@ namespace models {
             }
         }
 
+#if NGEN_WITH_MPI
+        // Special case constructor for formulation modules that
+        // implement the NGen BMI MPI protocol, in which the MPI
+        // Communicator is set through SetValue() between
+        // construction and Initialize()
+        Bmi_Fortran_Adapter(const std::string &type_name,
+                            std::string library_file_path,
+                            std::string bmi_init_config,
+                            bool has_fixed_time_step,
+                            std::string registration_func,
+                            MPI_Comm comm)
+            : AbstractCLibBmiAdapter(type_name,
+                                     library_file_path,
+                                     bmi_init_config,
+                                     has_fixed_time_step,
+                                     registration_func
+                                     )
+        {
+            try {
+                bmi_model = std::make_unique<Bmi_Fortran_Handle_Wrapper>(Bmi_Fortran_Handle_Wrapper());
+                dynamic_library_load();
+                execModuleRegistration();
+
+                MPI_Fint comm_fortran = MPI_Comm_c2f(comm);
+                inner_set_value_int("bmi_mpi_comm_handle", &comm_fortran);
+
+                int init_result = initialize(&bmi_model->handle, bmi_init_config.c_str());
+                if (init_result != BMI_SUCCESS) {
+                    init_exception_msg = "Failure when attempting to initialize " + model_name;
+                    throw models::external::State_Exception(init_exception_msg);
+                }
+                model_initialized = true;
+            }
+            catch (...)
+            {
+                model_initialized = true;
+                throw;
+            }
+        }
+#endif // NGEN_WITH_MPI
+
         Bmi_Fortran_Adapter(Bmi_Fortran_Adapter const&) = delete;
 
         Bmi_Fortran_Adapter(Bmi_Fortran_Adapter&&) = delete;
 
diff --git a/include/forcing/DataProvider.hpp b/include/forcing/DataProvider.hpp
index 32cbb36029..8d3da0ff5a 100644
--- a/include/forcing/DataProvider.hpp
+++ b/include/forcing/DataProvider.hpp
@@ -96,6 +96,8 @@ namespace data_access
          */
         virtual std::vector<data_type> get_values(const selection_type& selector, ReSampleMethod m=SUM) = 0;
 
+        virtual void get_values(const selection_type& selector, boost::span<data_type> data) { throw std::runtime_error("DP::get_values(span) Unimplemented"); }
+
         virtual bool is_property_sum_over_time_step(const std::string& name) const {return false; }
 
     private:
diff --git a/include/forcing/GenericDataProvider.hpp b/include/forcing/GenericDataProvider.hpp
index d4c6bd7ae9..ed8dda9191 100644
--- a/include/forcing/GenericDataProvider.hpp
+++ b/include/forcing/GenericDataProvider.hpp
@@ -3,10 +3,12 @@
 
 #include "DataProvider.hpp"
 #include "DataProviderSelectors.hpp"
+#include "MeshPointsSelectors.hpp"
 
 namespace data_access
 {
     using GenericDataProvider = DataProvider<double, CatchmentAggrDataSelector>;
+    using MeshPointsDataProvider = DataProvider<double, MeshPointsSelector>;
 }
 
 #endif
diff --git a/include/forcing/MeshPointsSelectors.hpp b/include/forcing/MeshPointsSelectors.hpp
new file mode 100644
index 0000000000..55e9c639d2
--- /dev/null
+++ b/include/forcing/MeshPointsSelectors.hpp
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <string>
+#include <chrono>
+#include <boost/variant.hpp>
+
+struct AllPoints {};
+static AllPoints all_points;
+
+struct MeshPointsSelector
+{
+    std::string variable_name;
+    std::chrono::time_point<std::chrono::system_clock> init_time;
+    std::chrono::seconds duration;
+    std::string output_units;
+    boost::variant<AllPoints, std::vector<int>> points;
+};
diff --git a/include/forcing/NetCDFMeshPointsDataProvider.hpp b/include/forcing/NetCDFMeshPointsDataProvider.hpp
new file mode 100644
index 0000000000..fe4530c07c
--- /dev/null
+++ b/include/forcing/NetCDFMeshPointsDataProvider.hpp
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <NGenConfig.h>
+
+#if NGEN_WITH_NETCDF
+
+#include "GenericDataProvider.hpp"
+#include "DataProviderSelectors.hpp"
+
+#include <string>
+#include <vector>
+#include <map>
+#include <memory>
+#include <chrono>
+#include <boost/core/span.hpp>
+
+namespace netCDF {
+    class NcVar;
+    class NcFile;
+}
+
+namespace data_access
+{
+    class NetCDFMeshPointsDataProvider : public MeshPointsDataProvider
+    {
+    public:
+
+        using time_point_type = std::chrono::time_point<std::chrono::system_clock>;
+
+        NetCDFMeshPointsDataProvider(std::string input_path,
+                                     time_point_type sim_start,
+                                     time_point_type sim_end);
+
+        // Default implementation defined in the .cpp file so that
+        // client code doesn't need to have the full definition of
+        // NcFile visible for the compiler to implicitly generate
+        // ~NetCDFMeshPointsDataProvider() = default;
+        // for every file that uses this class
+        ~NetCDFMeshPointsDataProvider();
+
+        void finalize() override;
+
+        /** Return the variables that are accessible by this data provider */
+        boost::span<const std::string> get_available_variable_names() const override;
+
+        /** Return the first valid time for which data from the requested variable can be requested */
+        long get_data_start_time() const override;
+
+        /** Return the last valid time for which data from the requested variable can be requested */
+        long get_data_stop_time() const override;
+
+        long record_duration() const override;
+
+        /**
+         * Get the index of the data time step that contains the given point in time.
+         *
+         * An @ref std::out_of_range exception should be thrown if the time is not in any time step.
+         *
+         * @param epoch_time The point in time, as a seconds-based epoch time.
+         * @return The index of the forcing time step that contains the given point in time.
+         * @throws std::out_of_range If the given point is not in any time step.
+         */
+        size_t get_ts_index_for_time(const time_t &epoch_time) const override;
+
+        /**
+         * Get the value of a forcing property for an arbitrary time period, converting units if needed.
+         *
+         * An @ref std::out_of_range exception should be thrown if the data for the time period is not available.
+         *
+         * @param selector Data required to establish what subset of the stored data should be accessed
+         * @param m How data is to be resampled if there is a mismatch in data alignment or repeat rate
+         * @return The value of the forcing property for the described time period, with units converted if needed.
+         * @throws std::out_of_range If data for the time period is not available.
+         */
+        double get_value(const selection_type& selector, ReSampleMethod m) override;
+
+        void get_values(const selection_type& selector, boost::span<double> data) override;
+
+        // And an implementation of the usual version using it
+        std::vector<double> get_values(const selection_type& selector, data_access::ReSampleMethod) override
+        {
+            throw std::runtime_error("Unimplemented");
+        }
+
+    private:
+
+        void cache_variable(std::string const& var_name);
+
+        time_point_type sim_start_date_time_epoch;
+        time_point_type sim_end_date_time_epoch;
+
+        std::vector<std::string> variable_names;
+        std::vector<time_point_type> time_vals;
+        std::chrono::seconds time_stride; // the amount of time between stored time values
+
+        std::shared_ptr<netCDF::NcFile> nc_file;
+
+        struct metadata_cache_entry;
+        std::map<std::string, metadata_cache_entry> ncvar_cache;
+    };
+}
+
+
+#endif // NGEN_WITH_NETCDF
diff --git a/include/realizations/coastal/CoastalFormulation.hpp b/include/realizations/coastal/CoastalFormulation.hpp
new file mode 100644
index 0000000000..8f2e4369dd
--- /dev/null
+++ b/include/realizations/coastal/CoastalFormulation.hpp
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include <DataProvider.hpp>
+#include <MeshPointsSelectors.hpp>
+
+class CoastalFormulation : public data_access::DataProvider<double, MeshPointsSelector>
+{
+public:
+    CoastalFormulation(std::string id)
+        : id_(std::move(id))
+    { }
+
+    virtual ~CoastalFormulation() = default;
+
+    std::string get_id() const {
+        return this->id_;
+    }
+
+    virtual void initialize() = 0;
+    void finalize() override = 0;
+    virtual void update() = 0;
+
+    void get_values(const selection_type& selector, boost::span<double> data) override = 0;
+
+    // And an implementation of the usual version using it
+    std::vector<double> get_values(const selection_type& selector, data_access::ReSampleMethod) override
+    {
+        std::vector<double> output(selected_points_count(selector));
+        get_values(selector, output);
+        return output;
+    }
+
+protected:
+    using selection_type = MeshPointsSelector;
+    using data_type = double;
+
+    virtual size_t mesh_size(std::string const& variable_name) = 0;
+
+    size_t selected_points_count(const selection_type& selector)
+    {
+        auto* points = boost::get<std::vector<int>>(&selector.points);
+        size_t size = points ?
points->size() : this->mesh_size(selector.variable_name); + return size; + } + +private: + const std::string id_; +}; diff --git a/include/realizations/coastal/SchismFormulation.hpp b/include/realizations/coastal/SchismFormulation.hpp new file mode 100644 index 0000000000..04f1df3c15 --- /dev/null +++ b/include/realizations/coastal/SchismFormulation.hpp @@ -0,0 +1,82 @@ +#pragma once + +#include + +#if NGEN_WITH_BMI_FORTRAN && NGEN_WITH_MPI + +#include +#include +#include +#include +#include + +class SchismFormulation final : public CoastalFormulation +{ +public: + using MeshPointsDataProvider = data_access::DataProvider; + SchismFormulation( + std::string const& id + , std::string const& library_path + , std::string const& init_config_path + , MPI_Comm mpi_comm + , std::shared_ptr met_forcings + , std::shared_ptr offshore_boundary + , std::shared_ptr inflow_boundary + ); + + ~SchismFormulation(); + + // Implementation of DataProvider + boost::span get_available_variable_names() const override; + + long get_data_start_time() const override; + long get_data_stop_time() const override; + long record_duration() const override; + size_t get_ts_index_for_time(const time_t &epoch_time) const override; + + data_type get_value(const selection_type& selector, data_access::ReSampleMethod m) override; + + // Implementation of CoastalFormulation + void initialize() override; + void finalize() override; + void update() override; + + void get_values(const selection_type& selector, boost::span data) override; + +protected: + size_t mesh_size(std::string const& variable_name) override; + +private: + std::unique_ptr bmi_; + + enum ForcingSelector { METEO, OFFSHORE, INFLOW }; + struct InputMapping { ForcingSelector selector; std::string name; }; + static std::map expected_input_variables_; + std::map input_variable_units_; + std::map input_variable_type_; + std::map input_variable_count_; + + static std::vector exported_output_variable_names_; + std::map output_variable_units_; + std::map output_variable_type_; + std::map output_variable_count_; + + std::chrono::time_point current_time_; + std::chrono::seconds time_step_length_; + + // TODO: Some of these maybe should be members of + // CoastalFormulation + + // TODO: We really want something that we can ask for + // area-averaged RAINRATE over elements, but we're going to make + // do with point values at the element centroids + + using ProviderType = data_access::DataProvider; + std::shared_ptr meteorological_forcings_provider_; + std::shared_ptr offshore_boundary_provider_; + std::shared_ptr inflows_boundary_provider_; + + void set_inputs(); +}; + +#endif // NGEN_WITH_BMI_FORTRAN && NGEN_WITH_MPI diff --git a/include/utilities/parallel_utils.h b/include/utilities/parallel_utils.h index 3e3a70a85d..509649e0dd 100644 --- a/include/utilities/parallel_utils.h +++ b/include/utilities/parallel_utils.h @@ -2,6 +2,10 @@ #define NGEN_PARALLEL_UTILS_H #include + +// Define in the non-MPI case so that we don't need to conditionally compile `if (mpi_rank == 0)` +extern int mpi_rank; + #if NGEN_WITH_MPI #ifndef MPI_HF_SUB_CODE_GOOD @@ -29,6 +33,8 @@ #include "python/HydrofabricSubsetter.hpp" #endif // NGEN_WITH_PYTHON +extern int mpi_num_procs; + namespace parallel { /** @@ -50,37 +56,7 @@ namespace parallel { * is not in the success/ready state. * @return Whether all ranks coordinating status had a success/ready status value. 
*/ - bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs, const std::string &taskDesc) { - - // Expect 0 is good and 1 is no good for goodCode - // TODO: assert this in constructor or somewhere, or maybe just in a unit test - unsigned short codeBuffer; - bool printMessage = !taskDesc.empty(); - // For the other ranks, start by properly setting the status code value in the buffer and send to rank 0 - if (mpi_rank != 0) { - codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD; - MPI_Send(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, 0, MPI_COMM_WORLD); - } - // In rank 0, the first step is to receive and process codes from the other ranks into unified global status - else { - for (int i = 1; i < mpi_num_procs; ++i) { - MPI_Recv(&codeBuffer, 1, MPI_UNSIGNED_SHORT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - // If any is ever "not good", overwrite status to be "false" - if (codeBuffer != MPI_HF_SUB_CODE_GOOD) { - if (printMessage) { - std::cout << "Rank " << i << " not successful/ready after " << taskDesc << std::endl; - } - status = false; - } - } - // Rank 0 must also now prepare the codeBuffer value for broadcasting the global status - codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD; - } - - // Execute broadcast of global status rooted at rank 0 - MPI_Bcast(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD); - return codeBuffer == MPI_HF_SUB_CODE_GOOD; - } + bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs, const std::string &taskDesc); /** * Convenience method for overloaded function when no message is needed, and thus no description param provided. @@ -91,7 +67,7 @@ namespace parallel { * @return Whether all ranks coordinating status had a success/ready status value. * @see mpiSyncStatusAnd(bool, const std::string&) */ - bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs) { + inline bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs) { return mpiSyncStatusAnd(status, mpi_rank, mpi_num_procs, ""); } @@ -113,34 +89,7 @@ namespace parallel { * @return Whether proprocessing has already been performed to divide the main hydrofabric into existing, individual * sub-hydrofabric files for each partition/process. */ - bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, bool printMsg) { - std::string name = catchmentDataFile + "." + std::to_string(mpi_rank); - // Initialize isGood based on local state. Here, local file is "good" when it already exists. - // TODO: this isn't actually checking whether the files are right (just that they are present) so do we need to? - bool isGood = utils::FileChecker::file_is_readable(name); - - if (mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs)) { - if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric already subdivided in " << mpi_num_procs << " files." << std::endl; } - return true; - } - else { - if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric has not yet been subdivided." << std::endl; } - return false; - } - } - - /** - * Convenience overloaded method for when no supplemental output message is required. - * - * @param mpi_rank The rank of the current process. - * @param mpi_num_procs The total number of MPI processes. - * @return Whether proprocessing has already been performed to divide the main hydrofabric into existing, individual - * sub-hydrofabric files for each partition/process. 
- * @see is_hydrofabric_subdivided(bool) - */ - bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs) { - return is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, false); - } + bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, std::string const& catchmentDataFile, bool printMsg); /** * Set each rank's host "id" value in a provided host array. @@ -157,78 +106,7 @@ namespace parallel { * @param mpi_num_procs The number of ranks. * @param host_array A pointer to an allocated array of size ``mpi_num_procs``. */ - void get_hosts_array(int mpi_rank, int mpi_num_procs, int *host_array) { - const int ROOT_RANK = 0; - // These are the lengths of the (trimmed) C-string representations of the hostname for each rank - std::vector actualHostnameCStrLength(mpi_num_procs); - // Initialize to -1 to represent unknown - for (int i = 0; i < mpi_num_procs; ++i) { - actualHostnameCStrLength[i] = -1; - } - - // Get this rank's hostname (things should never be longer than 256) - char myhostname[256]; - gethostname(myhostname, 256); - - // Set the one for this rank - actualHostnameCStrLength[mpi_rank] = std::strlen(myhostname); - - // First, gather the hostname string lengths - MPI_Gather(&actualHostnameCStrLength[mpi_rank], 1, MPI_INT, actualHostnameCStrLength.data(), 1, MPI_INT, ROOT_RANK, - MPI_COMM_WORLD); - // Per-rank start offsets/displacements in our hostname strings gather buffer. - std::vector recvDisplacements(mpi_num_procs); - int totalLength = 0; - for (int i = 0; i < mpi_num_procs; ++i) { - // Displace each rank's string by the sum of the length of the previous rank strings - recvDisplacements[i] = totalLength; - // Adding the extra to make space for the \0 - actualHostnameCStrLength[i] += 1; - // Then update the total length - totalLength += actualHostnameCStrLength[i]; - - } - // Now we can create our buffer array and gather the hostname strings into it - char hostnames[totalLength]; - MPI_Gatherv(myhostname, actualHostnameCStrLength[mpi_rank], MPI_CHAR, hostnames, actualHostnameCStrLength.data(), - recvDisplacements.data(), MPI_CHAR, ROOT_RANK, MPI_COMM_WORLD); - - if (mpi_rank == ROOT_RANK) { - host_array[0] = 0; - int next_host_id = 1; - - int rank_with_matching_hostname; - char *checked_rank_hostname, *known_rank_hostname; - - for (int rank_being_check = 0; rank_being_check < mpi_num_procs; ++rank_being_check) { - // Set this as negative initially for each rank check to indicate no match found (at least yet) - rank_with_matching_hostname = -1; - // Get a C-string pointer for this rank's hostname, offset by the appropriate displacement - checked_rank_hostname = &hostnames[recvDisplacements[rank_being_check]]; - - // Assume that hostnames for any ranks less than the current rank being check are already known - for (int known_rank = 0; known_rank < rank_being_check; ++known_rank) { - // Get the right C-string pointer for the current known rank's hostname also - known_rank_hostname = &hostnames[recvDisplacements[known_rank]]; - // Compare the hostnames, setting and breaking if a match is found - if (std::strcmp(known_rank_hostname, checked_rank_hostname) == 0) { - rank_with_matching_hostname = known_rank; - break; - } - } - // This indicates this rank had no earlier rank with a matching hostname. 
- if (rank_with_matching_hostname < 0) { - // Assign new host id, then increment what the next id will be - host_array[rank_being_check] = next_host_id++; - } - else { - host_array[rank_being_check] = host_array[rank_with_matching_hostname]; - } - } - } - // Now, broadcast the results out - MPI_Bcast(host_array, mpi_num_procs, MPI_INT, 0, MPI_COMM_WORLD); - } + void get_hosts_array(int mpi_rank, int mpi_num_procs, int *host_array); /** * Send the contents of a text file to another MPI rank. @@ -240,58 +118,7 @@ namespace parallel { * @param destRank The MPI rank to which the file data should be sent. * @return Whether sending was successful. */ - bool mpi_send_text_file(const char *fileName, const int mpi_rank, const int destRank) { - int bufSize = 4096; - std::vector buf(bufSize); - int code; - // How much has been transferred so far - int totalNumTransferred = 0; - - FILE *file = fopen(fileName, "r"); - - // Transmit error code instead of expected size and return false if file can't be opened - if (file == NULL) { - // TODO: output error message - code = -1; - MPI_Send(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - return false; - } - - // Send expected size to start - MPI_Send(&bufSize, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - - // Then get back expected size to infer other side is good to go - MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - if (code != bufSize) { - // TODO: output error message - fclose(file); - return false; - } - int continueCode = 1; - // Then while there is more of the file to read and send, read the next batch and ... - while (fgets(buf.data(), bufSize, file) != NULL) { - // Indicate we are ready to continue sending data - MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - - // Send this batch - MPI_Send(buf.data(), bufSize, MPI_CHAR, destRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD); - - // Then get back a code, which will be -1 if bad and need to exit and otherwise good - MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - if (code < 0) { - // TODO: output error message - fclose(file); - return false; - } - } - // Once there is no more file to read and send, we should stop continuing - continueCode = 0; - MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - // Expect to get back a code of 0 - MPI_Recv(&code, 1, MPI_INT, destRank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - fclose(file); - return code == 0; - } + bool mpi_send_text_file(const char *fileName, const int mpi_rank, const int destRank); /** * Receive text data from another MPI rank and write the contents to a text file. @@ -305,58 +132,7 @@ namespace parallel { * @param destRank The MPI rank to which the file data should be sent. * @return Whether sending was successful. */ - bool mpi_recv_text_file(const char *fileName, const int mpi_rank, const int srcRank) { - int bufSize, writeCode; - // Receive expected buffer size to start - MPI_Recv(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - // If the sending side couldn't open the file, then immediately return false - if (bufSize == -1) { - // TODO: output error - return false; - } - - // Try to open recv file ... - FILE *file = fopen(fileName, "w"); - // ... and let sending size know whether this was successful by sending error code if not ... 
- if (file == NULL) { - // TODO: output error message - bufSize = -1; - MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - return false; - } - - // Send back the received buffer it if file opened, confirming things are good to go for transfer - MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - - // How much has been transferred so far - int totalNumTransferred = 0; - std::vector buf(bufSize); - - int continueCode; - - while (true) { - // Make sure the other side wants to continue sending data - MPI_Recv(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - if (continueCode <= 0) - break; - - MPI_Recv(buf.data(), bufSize, MPI_CHAR, srcRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - writeCode = fputs(buf.data(), file); - MPI_Send(&writeCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - - if (writeCode < 0) { - fclose(file); - return false; - } - } - - fclose(file); - MPI_Send(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); - return true; - } - + bool mpi_recv_text_file(const char *fileName, const int mpi_rank, const int srcRank); /** * Distribute subdivided hydrofabric files to ranks on other hosts as needed. @@ -409,51 +185,7 @@ namespace parallel { */ bool distribute_subdivided_hydrofabric_files(const std::string &baseCatchmentFile, const std::string &baseNexusFile, const int sendingRank, const int mpi_rank, const int mpi_num_procs, - const int *hostIdForRank, bool syncReturnStatus, bool blockAll) - { - // Start with status as good - bool isGood = true; - // FIXME: For now, just have rank 0 send everything, but optimize with multiple procs or threads later - // Only need to process this if sending rank or a receiving ranks (i.e., not on same host as sending rank) - if (mpi_rank == sendingRank || hostIdForRank[mpi_rank] != hostIdForRank[sendingRank]) { - // Have the sending rank send out all files - if (mpi_rank == sendingRank) { - // In rank 0, for all the other ranks ... - for (int otherRank = 0; otherRank < mpi_num_procs; ++otherRank) { - // If another rank is on a different host (note that this covers otherRank == sendingRank case) ... - if (hostIdForRank[otherRank] != hostIdForRank[mpi_rank]) { - // ... then send that rank its rank-specific catchment and nexus files - std::string catFileToSend = baseCatchmentFile + "." + std::to_string(otherRank); - std::string nexFileToSend = baseNexusFile + "." + std::to_string(otherRank); - // Note that checking previous isGood is necessary here because of loop - isGood = isGood && mpi_send_text_file(catFileToSend.c_str(), mpi_rank, otherRank); - isGood = isGood && mpi_send_text_file(nexFileToSend.c_str(), mpi_rank, otherRank); - } - } - } - else { - // For a rank not on the same host as the sending rank, receive the transmitted file - std::string catFileToReceive = baseCatchmentFile + "." + std::to_string(mpi_rank); - std::string nexFileToReceive = baseNexusFile + "." 
+ std::to_string(mpi_rank); - // Note that, unlike a bit earlier, don't need to check prior isGood in 1st receive, because not in loop - isGood = mpi_recv_text_file(catFileToReceive.c_str(), mpi_rank, sendingRank); - isGood = isGood && mpi_recv_text_file(nexFileToReceive.c_str(), mpi_rank, sendingRank); - } - } - - // Wait when appropriate - if (blockAll) { MPI_Barrier(MPI_COMM_WORLD); } - - // Sync status among the ranks also, if appropriate - if (syncReturnStatus) { - return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "distributing subdivided hydrofabric files"); - } - // Otherwise, just return the local status value - else { - return isGood; - } - } - + const int *hostIdForRank, bool syncReturnStatus, bool blockAll); /** * Attempt to subdivide the passed hydrofabric files into a series of per-partition files. @@ -470,71 +202,8 @@ namespace parallel { * @return Whether subdividing was successful. */ bool subdivide_hydrofabric(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile, - const std::string &nexusDataFile, const std::string &partitionConfigFile) - { - // Track whether things are good, meaning ok to continue and, at the end, whether successful - // Start with a value of true - bool isGood = true; - - #if !NGEN_WITH_PYTHON - // We can't be good to proceed with this, because Python is not active - isGood = false; - std::cerr << "Driver is unable to perform required hydrofabric subdividing when Python integration is not active." << std::endl; - - - // Sync with the rest of the ranks and bail if any aren't ready to proceed for any reason - if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) { - return false; - } - #else // i.e., #if NGEN_WITH_PYTHON - // Have rank 0 handle the generation task for all files/partitions - std::unique_ptr subdivider; - // Have rank 0 handle the generation task for all files/partitions - if (mpi_rank == 0) { - try { - subdivider = std::make_unique(catchmentDataFile, nexusDataFile, - partitionConfigFile); - } - catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - // Set not good if the subdivider object couldn't be instantiated - isGood = false; - } - } - // Sync ranks and bail if any aren't ready to proceed for any reason - if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) { - return false; - } - - if (mpi_rank == 0) { - // Try to perform the subdividing - try { - isGood = subdivider->execSubdivision(); - } - catch (const std::exception &e) { - std::cerr << e.what() << std::endl; - // Set not good if the subdivider object couldn't be instantiated - isGood = false; - } - } - // Sync ranks again here on whether subdividing was successful, having them all exit at this point if not - if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "executing hydrofabric subdivision")) { - return false; - } - // But if the subdividing went fine ... - else { - // ... figure out what ranks are on hosts with each other by getting an id for host of each rank - std::vector hostIdForRank(mpi_num_procs); - get_hosts_array(mpi_rank, mpi_num_procs, hostIdForRank.data()); - - // ... 
then (when necessary) transferring files around - return distribute_subdivided_hydrofabric_files(catchmentDataFile, nexusDataFile, 0, mpi_rank, - mpi_num_procs, hostIdForRank.data(), true, true); - - } - #endif // NGEN_WITH_PYTHON - } -} + const std::string &nexusDataFile, const std::string &partitionConfigFile); +} // namespace parallel #endif // NGEN_WITH_MPI diff --git a/src/NGen.cpp b/src/NGen.cpp index 1c0cf2bf28..d97145c7fc 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -38,8 +38,7 @@ std::string nexusDataFile = ""; std::string REALIZATION_CONFIG_PATH = ""; bool is_subdivided_hydrofabric_wanted = false; -// Define in the non-MPI case so that we don't need to conditionally compile `if (mpi_rank == 0)` -int mpi_rank = 0; +#include "parallel_utils.h" #if NGEN_WITH_MPI @@ -47,15 +46,12 @@ int mpi_rank = 0; #define MPI_HF_SUB_CLI_FLAG "--subdivided-hydrofabric" #endif -#include -#include "parallel_utils.h" #include "core/Partition_Parser.hpp" #include #include "core/Partition_One.hpp" std::string PARTITION_PATH = ""; -int mpi_num_procs; #endif // NGEN_WITH_MPI #include @@ -305,7 +301,7 @@ int main(int argc, char *argv[]) { // Do some extra steps if we expect to load a subdivided hydrofabric if (is_subdivided_hydrofabric_wanted) { // Ensure the hydrofabric is subdivided (either already or by doing it now), and then adjust these paths - if (parallel::is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, true) || + if (parallel::is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, catchmentDataFile, true) || parallel::subdivide_hydrofabric(mpi_rank, mpi_num_procs, catchmentDataFile, nexusDataFile, PARTITION_PATH)) { diff --git a/src/core/parallel_utils.cpp b/src/core/parallel_utils.cpp new file mode 100644 index 0000000000..d0d0f87a5a --- /dev/null +++ b/src/core/parallel_utils.cpp @@ -0,0 +1,352 @@ +#include + +#include +#include +#include + +int mpi_rank = 0; + +#if NGEN_WITH_MPI + +int mpi_num_procs = 0; + +namespace parallel { + + bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs, const std::string &taskDesc) { + // Expect 0 is good and 1 is no good for goodCode + // TODO: assert this in constructor or somewhere, or maybe just in a unit test + unsigned short codeBuffer; + bool printMessage = !taskDesc.empty(); + // For the other ranks, start by properly setting the status code value in the buffer and send to rank 0 + if (mpi_rank != 0) { + codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD; + MPI_Send(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, 0, MPI_COMM_WORLD); + } + // In rank 0, the first step is to receive and process codes from the other ranks into unified global status + else { + for (int i = 1; i < mpi_num_procs; ++i) { + MPI_Recv(&codeBuffer, 1, MPI_UNSIGNED_SHORT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // If any is ever "not good", overwrite status to be "false" + if (codeBuffer != MPI_HF_SUB_CODE_GOOD) { + if (printMessage) { + std::cout << "Rank " << i << " not successful/ready after " << taskDesc << std::endl; + } + status = false; + } + } + // Rank 0 must also now prepare the codeBuffer value for broadcasting the global status + codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD; + } + + // Execute broadcast of global status rooted at rank 0 + MPI_Bcast(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD); + return codeBuffer == MPI_HF_SUB_CODE_GOOD; + } + + bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, std::string const& catchmentDataFile, bool printMsg) { + std::string name = catchmentDataFile + "." 
+ std::to_string(mpi_rank); + // Initialize isGood based on local state. Here, local file is "good" when it already exists. + // TODO: this isn't actually checking whether the files are right (just that they are present) so do we need to? + bool isGood = utils::FileChecker::file_is_readable(name); + + if (mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs)) { + if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric already subdivided in " << mpi_num_procs << " files." << std::endl; } + return true; + } + else { + if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric has not yet been subdivided." << std::endl; } + return false; + } + } + + void get_hosts_array(int mpi_rank, int mpi_num_procs, int *host_array) { + const int ROOT_RANK = 0; + // These are the lengths of the (trimmed) C-string representations of the hostname for each rank + std::vector actualHostnameCStrLength(mpi_num_procs); + // Initialize to -1 to represent unknown + for (int i = 0; i < mpi_num_procs; ++i) { + actualHostnameCStrLength[i] = -1; + } + + // Get this rank's hostname (things should never be longer than 256) + char myhostname[256]; + gethostname(myhostname, 256); + + // Set the one for this rank + actualHostnameCStrLength[mpi_rank] = std::strlen(myhostname); + + // First, gather the hostname string lengths + MPI_Gather(&actualHostnameCStrLength[mpi_rank], 1, MPI_INT, actualHostnameCStrLength.data(), 1, MPI_INT, ROOT_RANK, + MPI_COMM_WORLD); + // Per-rank start offsets/displacements in our hostname strings gather buffer. + std::vector recvDisplacements(mpi_num_procs); + int totalLength = 0; + for (int i = 0; i < mpi_num_procs; ++i) { + // Displace each rank's string by the sum of the length of the previous rank strings + recvDisplacements[i] = totalLength; + // Adding the extra to make space for the \0 + actualHostnameCStrLength[i] += 1; + // Then update the total length + totalLength += actualHostnameCStrLength[i]; + + } + // Now we can create our buffer array and gather the hostname strings into it + char hostnames[totalLength]; + MPI_Gatherv(myhostname, actualHostnameCStrLength[mpi_rank], MPI_CHAR, hostnames, actualHostnameCStrLength.data(), + recvDisplacements.data(), MPI_CHAR, ROOT_RANK, MPI_COMM_WORLD); + + if (mpi_rank == ROOT_RANK) { + host_array[0] = 0; + int next_host_id = 1; + + int rank_with_matching_hostname; + char *checked_rank_hostname, *known_rank_hostname; + + for (int rank_being_check = 0; rank_being_check < mpi_num_procs; ++rank_being_check) { + // Set this as negative initially for each rank check to indicate no match found (at least yet) + rank_with_matching_hostname = -1; + // Get a C-string pointer for this rank's hostname, offset by the appropriate displacement + checked_rank_hostname = &hostnames[recvDisplacements[rank_being_check]]; + + // Assume that hostnames for any ranks less than the current rank being check are already known + for (int known_rank = 0; known_rank < rank_being_check; ++known_rank) { + // Get the right C-string pointer for the current known rank's hostname also + known_rank_hostname = &hostnames[recvDisplacements[known_rank]]; + // Compare the hostnames, setting and breaking if a match is found + if (std::strcmp(known_rank_hostname, checked_rank_hostname) == 0) { + rank_with_matching_hostname = known_rank; + break; + } + } + // This indicates this rank had no earlier rank with a matching hostname. 
+ if (rank_with_matching_hostname < 0) { + // Assign new host id, then increment what the next id will be + host_array[rank_being_check] = next_host_id++; + } + else { + host_array[rank_being_check] = host_array[rank_with_matching_hostname]; + } + } + } + // Now, broadcast the results out + MPI_Bcast(host_array, mpi_num_procs, MPI_INT, 0, MPI_COMM_WORLD); + } + + bool mpi_send_text_file(const char *fileName, const int mpi_rank, const int destRank) { + int bufSize = 4096; + std::vector buf(bufSize); + int code; + // How much has been transferred so far + int totalNumTransferred = 0; + + FILE *file = fopen(fileName, "r"); + + // Transmit error code instead of expected size and return false if file can't be opened + if (file == NULL) { + // TODO: output error message + code = -1; + MPI_Send(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + return false; + } + + // Send expected size to start + MPI_Send(&bufSize, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + + // Then get back expected size to infer other side is good to go + MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + if (code != bufSize) { + // TODO: output error message + fclose(file); + return false; + } + int continueCode = 1; + // Then while there is more of the file to read and send, read the next batch and ... + while (fgets(buf.data(), bufSize, file) != NULL) { + // Indicate we are ready to continue sending data + MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + + // Send this batch + MPI_Send(buf.data(), bufSize, MPI_CHAR, destRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD); + + // Then get back a code, which will be -1 if bad and need to exit and otherwise good + MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + if (code < 0) { + // TODO: output error message + fclose(file); + return false; + } + } + // Once there is no more file to read and send, we should stop continuing + continueCode = 0; + MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + // Expect to get back a code of 0 + MPI_Recv(&code, 1, MPI_INT, destRank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + fclose(file); + return code == 0; + } + + bool mpi_recv_text_file(const char *fileName, const int mpi_rank, const int srcRank) { + int bufSize, writeCode; + // Receive expected buffer size to start + MPI_Recv(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // If the sending side couldn't open the file, then immediately return false + if (bufSize == -1) { + // TODO: output error + return false; + } + + // Try to open recv file ... + FILE *file = fopen(fileName, "w"); + // ... and let sending size know whether this was successful by sending error code if not ... 
+ if (file == NULL) { + // TODO: output error message + bufSize = -1; + MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + return false; + } + + // Send back the received buffer it if file opened, confirming things are good to go for transfer + MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + + // How much has been transferred so far + int totalNumTransferred = 0; + std::vector buf(bufSize); + + int continueCode; + + while (true) { + // Make sure the other side wants to continue sending data + MPI_Recv(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + if (continueCode <= 0) + break; + + MPI_Recv(buf.data(), bufSize, MPI_CHAR, srcRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + writeCode = fputs(buf.data(), file); + MPI_Send(&writeCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + + if (writeCode < 0) { + fclose(file); + return false; + } + } + + fclose(file); + MPI_Send(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD); + return true; + } + + bool distribute_subdivided_hydrofabric_files(const std::string &baseCatchmentFile, const std::string &baseNexusFile, + const int sendingRank, const int mpi_rank, const int mpi_num_procs, + const int *hostIdForRank, bool syncReturnStatus, bool blockAll) + { + // Start with status as good + bool isGood = true; + // FIXME: For now, just have rank 0 send everything, but optimize with multiple procs or threads later + // Only need to process this if sending rank or a receiving ranks (i.e., not on same host as sending rank) + if (mpi_rank == sendingRank || hostIdForRank[mpi_rank] != hostIdForRank[sendingRank]) { + // Have the sending rank send out all files + if (mpi_rank == sendingRank) { + // In rank 0, for all the other ranks ... + for (int otherRank = 0; otherRank < mpi_num_procs; ++otherRank) { + // If another rank is on a different host (note that this covers otherRank == sendingRank case) ... + if (hostIdForRank[otherRank] != hostIdForRank[mpi_rank]) { + // ... then send that rank its rank-specific catchment and nexus files + std::string catFileToSend = baseCatchmentFile + "." + std::to_string(otherRank); + std::string nexFileToSend = baseNexusFile + "." + std::to_string(otherRank); + // Note that checking previous isGood is necessary here because of loop + isGood = isGood && mpi_send_text_file(catFileToSend.c_str(), mpi_rank, otherRank); + isGood = isGood && mpi_send_text_file(nexFileToSend.c_str(), mpi_rank, otherRank); + } + } + } + else { + // For a rank not on the same host as the sending rank, receive the transmitted file + std::string catFileToReceive = baseCatchmentFile + "." + std::to_string(mpi_rank); + std::string nexFileToReceive = baseNexusFile + "." 
+ std::to_string(mpi_rank); + // Note that, unlike a bit earlier, don't need to check prior isGood in 1st receive, because not in loop + isGood = mpi_recv_text_file(catFileToReceive.c_str(), mpi_rank, sendingRank); + isGood = isGood && mpi_recv_text_file(nexFileToReceive.c_str(), mpi_rank, sendingRank); + } + } + + // Wait when appropriate + if (blockAll) { MPI_Barrier(MPI_COMM_WORLD); } + + // Sync status among the ranks also, if appropriate + if (syncReturnStatus) { + return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "distributing subdivided hydrofabric files"); + } + // Otherwise, just return the local status value + else { + return isGood; + } + } + + bool subdivide_hydrofabric(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile, + const std::string &nexusDataFile, const std::string &partitionConfigFile) + { + // Track whether things are good, meaning ok to continue and, at the end, whether successful + // Start with a value of true + bool isGood = true; + + #if !NGEN_WITH_PYTHON + // We can't be good to proceed with this, because Python is not active + isGood = false; + std::cerr << "Driver is unable to perform required hydrofabric subdividing when Python integration is not active." << std::endl; + + // Sync with the rest of the ranks and bail if any aren't ready to proceed for any reason + return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider"); + + #else // i.e., #if NGEN_WITH_PYTHON + // Have rank 0 handle the generation task for all files/partitions + std::unique_ptr subdivider; + // Have rank 0 handle the generation task for all files/partitions + if (mpi_rank == 0) { + try { + subdivider = std::make_unique(catchmentDataFile, nexusDataFile, + partitionConfigFile); + } + catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + // Set not good if the subdivider object couldn't be instantiated + isGood = false; + } + } + // Sync ranks and bail if any aren't ready to proceed for any reason + if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) { + return false; + } + + if (mpi_rank == 0) { + // Try to perform the subdividing + try { + isGood = subdivider->execSubdivision(); + } + catch (const std::exception &e) { + std::cerr << e.what() << std::endl; + // Set not good if the subdivider object couldn't be instantiated + isGood = false; + } + } + // Sync ranks again here on whether subdividing was successful, having them all exit at this point if not + if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "executing hydrofabric subdivision")) { + return false; + } + // But if the subdividing went fine ... + else { + // ... figure out what ranks are on hosts with each other by getting an id for host of each rank + std::vector hostIdForRank(mpi_num_procs); + get_hosts_array(mpi_rank, mpi_num_procs, hostIdForRank.data()); + + // ... 
then (when necessary) transferring files around + return distribute_subdivided_hydrofabric_files(catchmentDataFile, nexusDataFile, 0, mpi_rank, + mpi_num_procs, hostIdForRank.data(), true, true); + + } + #endif // NGEN_WITH_PYTHON + } +} // namespace parallel + +#endif diff --git a/src/forcing/CMakeLists.txt b/src/forcing/CMakeLists.txt index d374032b2e..8cf40e198f 100644 --- a/src/forcing/CMakeLists.txt +++ b/src/forcing/CMakeLists.txt @@ -23,7 +23,10 @@ target_link_libraries(forcing PUBLIC target_sources(forcing PRIVATE "${CMAKE_CURRENT_LIST_DIR}/NullForcingProvider.cpp") if(NGEN_WITH_NETCDF) - target_sources(forcing PRIVATE "${CMAKE_CURRENT_LIST_DIR}/NetCDFPerFeatureDataProvider.cpp") + target_sources(forcing PRIVATE + "${CMAKE_CURRENT_LIST_DIR}/NetCDFPerFeatureDataProvider.cpp" + "${CMAKE_CURRENT_LIST_DIR}/NetCDFMeshPointsDataProvider.cpp" + ) target_link_libraries(forcing PUBLIC NetCDF) endif() diff --git a/src/forcing/NetCDFMeshPointsDataProvider.cpp b/src/forcing/NetCDFMeshPointsDataProvider.cpp new file mode 100644 index 0000000000..61c8ad653f --- /dev/null +++ b/src/forcing/NetCDFMeshPointsDataProvider.cpp @@ -0,0 +1,210 @@ +#include + +#if NGEN_WITH_NETCDF +#include "NetCDFMeshPointsDataProvider.hpp" +#include + +#include + +#include +#include + +namespace data_access { + +// Out-of-line class definition after forward-declaration so that the +// header doesn't need #include for NcVar to be a complete +// type there +struct NetCDFMeshPointsDataProvider::metadata_cache_entry { + netCDF::NcVar ncVar; + std::string units; + double scale_factor; + double offset; +}; + +NetCDFMeshPointsDataProvider::NetCDFMeshPointsDataProvider(std::string input_path, time_point_type sim_start, time_point_type sim_end) + : sim_start_date_time_epoch(sim_start) + , sim_end_date_time_epoch(sim_end) +{ + nc_file = std::make_shared(input_path, netCDF::NcFile::read); + + auto num_times = nc_file->getDim("time").getSize(); + auto time_var = nc_file->getVar("Time"); + + if (time_var.getDimCount() != 1) { + throw std::runtime_error("'Time' variable has dimension other than 1"); + } + + auto time_unit_att = time_var.getAtt("units"); + std::string time_unit_str; + + time_unit_att.getValues(time_unit_str); + if (time_unit_str != "minutes since 1970-01-01 00:00:00 UTC") { + throw std::runtime_error("Time units not exactly as expected"); + } + + std::vector>> raw_time(num_times); + time_var.getVar(raw_time.data()); + + time_vals.reserve(num_times); + for (int i = 0; i < num_times; ++i) { + // Assume that the system clock's epoch matches Unix time. 
+ // This is guaranteed from C++20 onwards + time_vals.push_back(time_point_type(std::chrono::duration_cast(raw_time[i]))); + } + + time_stride = std::chrono::duration_cast(time_vals[1] - time_vals[0]); + + // verify the time stride + for( size_t i = 1; i < time_vals.size() -1; ++i) + { + auto tinterval = time_vals[i+1] - time_vals[i]; + + if ( tinterval - time_stride > std::chrono::microseconds(1) || + time_stride - tinterval > std::chrono::microseconds(1) ) + { + throw std::runtime_error("Time intervals in forcing file are not constant"); + } + } +} + +NetCDFMeshPointsDataProvider::~NetCDFMeshPointsDataProvider() = default; + +void NetCDFMeshPointsDataProvider::finalize() +{ + ncvar_cache.clear(); + + if (nc_file != nullptr) { + nc_file->close(); + } + nc_file = nullptr; +} + +boost::span NetCDFMeshPointsDataProvider::get_available_variable_names() const +{ + return variable_names; +} + +long NetCDFMeshPointsDataProvider::get_data_start_time() const +{ + return std::chrono::system_clock::to_time_t(time_vals[0]); +#if 0 + //return start_time; + //FIXME: Matching behavior from CsvMeshPointsForcingProvider, but both are probably wrong! + return sim_start_date_time_epoch.time_since_epoch().count(); // return start_time + sim_to_data_time_offset; +#endif +} + +long NetCDFMeshPointsDataProvider::get_data_stop_time() const +{ + return std::chrono::system_clock::to_time_t(time_vals.back() + time_stride); +#if 0 + //return stop_time; + //FIXME: Matching behavior from CsvMeshPointsForcingProvider, but both are probably wrong! + return sim_end_date_time_epoch.time_since_epoch().count(); // return end_time + sim_to_data_time_offset; +#endif +} + +long NetCDFMeshPointsDataProvider::record_duration() const +{ + return std::chrono::duration_cast(time_stride).count(); +} + +size_t NetCDFMeshPointsDataProvider::get_ts_index_for_time(const time_t &epoch_time_in) const +{ + // time_t in simulation engine's time frame - i.e. 
seconds, starting at 0 + auto epoch_time = sim_start_date_time_epoch + std::chrono::seconds(epoch_time_in); + + auto start_time = time_vals.front(); + auto stop_time = time_vals.back() + time_stride; + + if (start_time <= epoch_time && epoch_time < stop_time) + { + auto offset = epoch_time - start_time; + auto index = offset / time_stride; + return index; + } + else + { + std::stringstream ss; + ss << "The value " << (int)epoch_time_in << " was not in the range [" + << std::chrono::system_clock::to_time_t(start_time) << ", " + << std::chrono::system_clock::to_time_t(stop_time) << ")\n" + << SOURCE_LOC; + throw std::out_of_range(ss.str().c_str()); + } +} + +void NetCDFMeshPointsDataProvider::get_values(const selection_type& selector, boost::span data) +{ + if (!boost::get(&selector.points)) throw std::runtime_error("Not implemented - only all_points"); + + cache_variable(selector.variable_name); + + auto const& metadata = ncvar_cache[selector.variable_name]; + std::string const& source_units = metadata.units; + + size_t time_index = get_ts_index_for_time(std::chrono::system_clock::to_time_t(selector.init_time)); + + metadata.ncVar.getVar({time_index, 0}, {1, data.size()}, data.data()); + + for (double& value : data) { + value = value * metadata.scale_factor + metadata.offset; + } + + // These mass and and volume flux density units are very close to + // numerically identical for liquid water at atmospheric + // conditions, and so we currently treat them as interchangeable + bool RAINRATE_equivalence = + selector.variable_name == "RAINRATE" && + source_units == "mm s^-1" && + selector.output_units == "kg m-2 s-1"; + + if (!RAINRATE_equivalence) { + UnitsHelper::convert_values(source_units, data.data(), selector.output_units, data.data(), data.size()); + } +} + +double NetCDFMeshPointsDataProvider::get_value(const selection_type& selector, ReSampleMethod m) +{ + throw std::runtime_error("Not implemented - access chunks of the mesh"); + + return 0.0; +} + +void NetCDFMeshPointsDataProvider::cache_variable(std::string const& var_name) +{ + if (ncvar_cache.find(var_name) != ncvar_cache.end()) return; + + auto ncvar = nc_file->getVar(var_name); + variable_names.push_back(var_name); + + std::string native_units; + auto units_att = ncvar.getAtt("units"); + units_att.getValues(native_units); + + double scale_factor = 1.0; + try { + auto scale_var = ncvar.getAtt("scale_factor"); + if (!scale_var.isNull()) { + scale_var.getValues(&scale_factor); + } + } catch (...) { + // Assume it's just not present, and so keeps the default value + } + + double offset = 0.0; + try { + auto offset_var = ncvar.getAtt("add_offset"); + if (!offset_var.isNull()) { + offset_var.getValues(&offset); + } + } catch (...) 
{ + // Assume it's just not present, and so keeps the default value + } + + ncvar_cache[var_name] = {ncvar, native_units, scale_factor, offset}; +} + +} + +#endif diff --git a/src/realizations/coastal/CMakeLists.txt b/src/realizations/coastal/CMakeLists.txt new file mode 100644 index 0000000000..e60ba8baee --- /dev/null +++ b/src/realizations/coastal/CMakeLists.txt @@ -0,0 +1,22 @@ +include(${PROJECT_SOURCE_DIR}/cmake/dynamic_sourced_library.cmake) +dynamic_sourced_cxx_library(realizations_coastal "${CMAKE_CURRENT_SOURCE_DIR}") + +add_library(NGen::realizations_coastal ALIAS realizations_coastal) + +target_include_directories(realizations_coastal PUBLIC + ${PROJECT_SOURCE_DIR}/include/core + ${PROJECT_SOURCE_DIR}/include/realizations/coastal + ${PROJECT_SOURCE_DIR}/include/forcing + ${PROJECT_SOURCE_DIR}/include/simulation_time + ${PROJECT_SOURCE_DIR}/include/utilities + ${PROJECT_SOURCE_DIR}/include/bmi + ) + +target_link_libraries(realizations_coastal PUBLIC + ${CMAKE_DL_LIBS} + NGen::config_header + NGen::geojson + NGen::logging + NGen::ngen_bmi + ) + diff --git a/src/realizations/coastal/SchismFormulation.cpp b/src/realizations/coastal/SchismFormulation.cpp new file mode 100644 index 0000000000..064ce728d2 --- /dev/null +++ b/src/realizations/coastal/SchismFormulation.cpp @@ -0,0 +1,188 @@ +#include + +#if NGEN_WITH_BMI_FORTRAN && NGEN_WITH_MPI + +#include +#include + +const static auto s_schism_registration_function = "register_bmi"; + +std::map SchismFormulation::expected_input_variables_ = + { + /* Meteorological Forcings */ + // RAINRATE - precipitation + {"RAINRATE", { SchismFormulation::METEO, "RAINRATE"}}, + // SFCPRS - surface atmospheric pressure + {"SFCPRS", { SchismFormulation::METEO, "PSFC"}}, + // SPFH2m - specific humidity at 2m + {"SPFH2m", { SchismFormulation::METEO, "Q2D"}}, + // TMP2m - temperature at 2m + {"TMP2m", { SchismFormulation::METEO, "T2D"}}, + // UU10m, VV10m - wind velocity components at 10m + {"UU10m", { SchismFormulation::METEO, "U2D"}}, + {"VV10m", { SchismFormulation::METEO, "V2D"}}, + + /* Input Boundary Conditions */ + // ETA2_bnd - water surface elevation at the boundaries + {"ETA2_bnd", { SchismFormulation::OFFSHORE, "ETA2_bnd"}}, + // Q_bnd - flows at boundaries + {"Q_bnd", { SchismFormulation::INFLOW, "Q_bnd"}}, + }; + +std::vector SchismFormulation::exported_output_variable_names_ = + { + "ETA2", + "VX", + "VY", + "BEDLEVEL" + }; + +SchismFormulation::SchismFormulation( + std::string const& id + , std::string const& library_path + , std::string const& init_config_path + , MPI_Comm mpi_comm + , std::shared_ptr met_forcings + , std::shared_ptr offshore_boundary + , std::shared_ptr inflow_boundary + ) + : CoastalFormulation(id) + , meteorological_forcings_provider_(met_forcings) + , offshore_boundary_provider_(offshore_boundary) + , inflows_boundary_provider_(inflow_boundary) +{ + bmi_ = std::make_unique + ( + "schism_coastal_formulation" + , library_path + , init_config_path + , /* model_time_step_fixed = */ true + , s_schism_registration_function + , mpi_comm + ); +} + +SchismFormulation::~SchismFormulation() = default; + +void SchismFormulation::initialize() +{ + auto const& input_vars = bmi_->GetInputVarNames(); + + for (auto const& name : input_vars) { + if (expected_input_variables_.find(name) == expected_input_variables_.end()) { + throw std::runtime_error("SCHISM instance requests unexpected input variable '" + name + "'"); + } + + input_variable_units_[name] = bmi_->GetVarUnits(name); + input_variable_type_[name] = bmi_->GetVarType(name); 
+ input_variable_count_[name] = mesh_size(name); + } + + auto const& output_vars = bmi_->GetOutputVarNames(); + + for (auto const& name : output_vars) { + //if (expected_output_variables_.find(name) == expected_output_variables_.end()) { + // throw std::runtime_error("SCHISM instance requests unexpected output variable '" + name + "'"); + //} + + output_variable_units_[name] = bmi_->GetVarUnits(name); + output_variable_type_[name] = bmi_->GetVarType(name); + output_variable_count_[name] = mesh_size(name); + } + + time_step_length_ = std::chrono::seconds((long long)bmi_->GetTimeStep()); + + set_inputs(); +} + +void SchismFormulation::finalize() +{ + meteorological_forcings_provider_->finalize(); + offshore_boundary_provider_->finalize(); + inflows_boundary_provider_->finalize(); + + bmi_->Finalize(); +} + +void SchismFormulation::set_inputs() +{ + for (auto var : expected_input_variables_) { + auto const& name = var.first; + auto const& mapping = var.second; + auto selector = mapping.selector; + auto const& source_name = mapping.name; + auto points = MeshPointsSelector{source_name, current_time_, time_step_length_, input_variable_units_[name], all_points}; + + ProviderType* provider = [this, selector](){ + switch(selector) { + case METEO: return meteorological_forcings_provider_.get(); + case OFFSHORE: return offshore_boundary_provider_.get(); + case INFLOW: return inflows_boundary_provider_.get(); + default: throw std::runtime_error("Unknown SCHISM provider selector type"); + } + }(); + std::vector buffer(mesh_size(name)); + provider->get_values(points, buffer); + bmi_->SetValue(name, buffer.data()); + } +} + +void SchismFormulation::update() +{ + current_time_ += time_step_length_; + set_inputs(); + bmi_->Update(); +} + +boost::span SchismFormulation::get_available_variable_names() const +{ + return exported_output_variable_names_; +} + +long SchismFormulation::get_data_start_time() const +{ + throw std::runtime_error(__func__); + return 0; +} + +long SchismFormulation::get_data_stop_time() const +{ + throw std::runtime_error(__func__); + return 0; +} + +long SchismFormulation::record_duration() const +{ + throw std::runtime_error(__func__); + return 0; +} + +size_t SchismFormulation::get_ts_index_for_time(const time_t &epoch_time) const +{ + throw std::runtime_error(__func__); + return 0; +} + +SchismFormulation::data_type SchismFormulation::get_value(const selection_type& selector, data_access::ReSampleMethod m) +{ + throw std::runtime_error(__func__); + return 0.0; +} + +void SchismFormulation::get_values(const selection_type& selector, boost::span data) +{ + bmi_->GetValue(selector.variable_name, data.data()); +} + +size_t SchismFormulation::mesh_size(std::string const& variable_name) +{ + auto nbytes = bmi_->GetVarNbytes(variable_name); + auto itemsize = bmi_->GetVarItemsize(variable_name); + if (nbytes % itemsize != 0) { + throw std::runtime_error("For SCHISM variable '" + variable_name + "', itemsize " + std::to_string(itemsize) + + " does not evenly divide nbytes " + std::to_string(nbytes)); + } + return nbytes / itemsize; +} + +#endif // NGEN_WITH_BMI_FORTRAN && NGEN_WITH_MPI diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 79fd3897cc..e82cb096b5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -134,6 +134,25 @@ endfunction() ########################## Individual tests ########################## +########################## Coastal Formulation Tests +# ngen_add_test( +# test_coastal +# OBJECTS +# coastal/SchismFormulation_Test.cpp +# LIBRARIES +# 
NGen::core +# NGen::realizations_catchment +# NGen::core_mediator +# NGen::forcing +# NGen::ngen_bmi +# REQUIRES +# NGEN_WITH_BMI_FORTRAN +# NGEN_WITH_MPI +# ) + +add_executable(test_coastal coastal/SchismFormulation_Test.cpp) +target_link_libraries(test_coastal NGen::core NGen::realizations_catchment NGen::realizations_coastal NGen::core_mediator NGen::forcing NGen::ngen_bmi) + ########################## GeoJSON Unit Tests ngen_add_test( test_geojson diff --git a/test/coastal/SchismFormulation_Test.cpp b/test/coastal/SchismFormulation_Test.cpp new file mode 100644 index 0000000000..921a285883 --- /dev/null +++ b/test/coastal/SchismFormulation_Test.cpp @@ -0,0 +1,153 @@ +//#include "gtest/gtest.h" +#include + +#include "realizations/coastal/SchismFormulation.hpp" +#include +#include +#include + +const static std::string library_path = "/Users/phil/Code/noaa/BUILD/bmischism_2024-08-19-ngen/libtestbmifortranmodel.dylib"; +const static std::string init_config_path = "/Users/phil/Code/noaa/SCHISM_Lake_Champlain_BMI_test/namelist.input"; +const static std::string met_forcing_netcdf_path = "/Users/phil/Code/noaa/NextGen_Forcings_Engine_SCHISM_Lake_Champlain_2015120100.nc"; + +#if 0 +struct Schism_Formulation_IT : public ::testing::Test +{ + +}; +#endif + +std::map input_variables_defaults = + { + /* Meteorological Forcings */ + // RAINRATE - precipitation + {"RAINRATE", 0.01}, + // SFCPRS - surface atmospheric pressure + {"SFCPRS", 101325.0}, + // SPFH2m - specific humidity at 2m + {"SPFH2m", 0.01}, + // TMP2m - temperature at 2m + {"TMP2m", 293}, + // UU10m, VV10m - wind velocity components at 10m + {"UU10m", 1.0}, + {"VV10m", 1.0}, + + /* Input Boundary Conditions */ + // ETA2_bnd - water surface elevation at the boundaries + {"ETA2_bnd", 30}, + // Q_bnd - flows at boundaries + {"Q_bnd", 0.1}, + }; + +struct MockProvider : data_access::DataProvider +{ + std::vector data; + + MockProvider() + : data(552697, 0.0) + {} + ~MockProvider() = default; + + // Implementation of DataProvider + std::vector variables; + boost::span get_available_variable_names() const override { return variables; } + + long get_data_start_time() const override { return 0; } + long get_data_stop_time() const override { return 0; } + long record_duration() const override { return 3600; } + size_t get_ts_index_for_time(const time_t &epoch_time) const override { return 1; } + + data_type get_value(const selection_type& selector, data_access::ReSampleMethod m) override { return data[0]; } + std::vector get_values(const selection_type& selector, data_access::ReSampleMethod m) override { throw ""; return data; } + void get_values(const selection_type& selector, boost::span out_data) override + { + auto default_value = input_variables_defaults[selector.variable_name]; + for (auto& val : out_data) { + val = default_value; + } + } +}; + +int main(int argc, char **argv) +{ + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + auto provider = std::make_shared(); + + std::tm start_time_tm{}; + start_time_tm.tm_year = 2015 - 1900; + start_time_tm.tm_mon = 12 - 1; + start_time_tm.tm_mday = 1; + auto start_time_t = std::mktime(&start_time_tm); + + std::tm stop_time_tm{}; + stop_time_tm.tm_year = 2015 - 1900; + stop_time_tm.tm_mon = 12 - 1; + stop_time_tm.tm_mday = 2; + auto stop_time_t = std::mktime(&stop_time_tm); + + auto netcdf_met_provider = std::make_shared(met_forcing_netcdf_path, + std::chrono::system_clock::from_time_t(start_time_t), + std::chrono::system_clock::from_time_t(stop_time_t)); + auto schism = 
std::make_unique(/*id=*/ "test_schism_formulation", + library_path, + init_config_path, + MPI_COMM_SELF, + netcdf_met_provider, + provider, + provider + ); + + schism->initialize(); + + for (int i = 0; i < 3; ++i) + schism->update(); + + using namespace std::chrono_literals; + + auto report = [](std::vector const& data, std::string name) { + double min = 10e6, max = -10e6; + for (int i = 0; i < data.size(); ++i) { + double val = data[i]; + if (std::isnan(val)) { + std::cout << "Nan found at " << i << std::endl; + break; + } + + min = std::min(val, min); + max = std::max(val, max); + } + std::cout << name << " ranges from " << min << " to " << max << std::endl; + }; + + std::vector bedlevel(278784, std::numeric_limits::quiet_NaN()); + MeshPointsSelector bedlevel_selector{"BEDLEVEL", std::chrono::system_clock::now(), 3600s, "m", all_points}; + schism->get_values(bedlevel_selector, bedlevel); + report(bedlevel, "BEDLEVEL"); + + std::vector eta2(278784, std::numeric_limits::quiet_NaN()); + MeshPointsSelector eta2_selector{"ETA2", std::chrono::system_clock::now(), 3600s, "m", all_points}; + schism->get_values(eta2_selector, eta2); + report(eta2, "ETA2"); + + std::vector vx(278784, std::numeric_limits::quiet_NaN()); + MeshPointsSelector vx_selector{"VX", std::chrono::system_clock::now(), 3600s, "m s-1", all_points}; + schism->get_values(vx_selector, vx); + report(vx, "VX"); + + std::vector vy(278784, std::numeric_limits::quiet_NaN()); + MeshPointsSelector vy_selector{"VY", std::chrono::system_clock::now(), 3600s, "m s-1", all_points}; + schism->get_values(vy_selector, vy); + report(vy, "VY"); + + schism->update(); + schism->get_values(vx_selector, vx); + schism->get_values(vy_selector, vy); + report(vx, "VX"); + report(vy, "VY"); + + schism->finalize(); + MPI_Finalize(); + return 0; +}