From ce88cce8e17d53da0bfa5f1c84149d86d9a1fe76 Mon Sep 17 00:00:00 2001 From: josuemtzmo Date: Wed, 2 Oct 2024 18:53:37 +0200 Subject: [PATCH 1/4] improve concatenation of netcdf files with simulation pickup --- src/OutputWriters/netcdf_output_writer.jl | 6 +++--- src/OutputWriters/output_writer_utils.jl | 17 +++++++++++++++++ src/Utils/schedules.jl | 7 +++---- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/OutputWriters/netcdf_output_writer.jl b/src/OutputWriters/netcdf_output_writer.jl index 6c3ff9eafe..ac170acb2d 100644 --- a/src/OutputWriters/netcdf_output_writer.jl +++ b/src/OutputWriters/netcdf_output_writer.jl @@ -444,16 +444,16 @@ function NetCDFOutputWriter(model, outputs; dimensions = Dict(), overwrite_existing = nothing, deflatelevel = 0, - part = 1, file_splitting = NoFileSplitting(), verbose = false) mkpath(dir) filename = auto_extension(filename, ".nc") filepath = joinpath(dir, filename) - initialize!(file_splitting, model) update_file_splitting_schedule!(file_splitting, filepath) + part, filepath = is_output_splitted!(file_splitting, filepath, overwrite_existing) + if isnothing(overwrite_existing) if isfile(filepath) overwrite_existing = false @@ -739,7 +739,7 @@ function initialize_nc_file!(filepath, grid, model) - mode = overwrite_existing ? "c" : "a" + mode = overwrite_existing || !isfile(filepath) ? "c" : "a" # Add useful metadata global_attributes["date"] = "This file was generated on $(now())." diff --git a/src/OutputWriters/output_writer_utils.jl b/src/OutputWriters/output_writer_utils.jl index af6c6f6834..6e0248efdb 100644 --- a/src/OutputWriters/output_writer_utils.jl +++ b/src/OutputWriters/output_writer_utils.jl @@ -52,6 +52,23 @@ function update_file_splitting_schedule!(schedule::FileSizeLimit, filepath) return nothing end +# Update schedule based on user input +is_output_splitted!(schedule, filepath) = nothing + +function is_output_splitted!(schedule::TimeInterval, filepath, overwrite_existing) + folder = dirname(filepath) + filename = first(split(basename(filepath),"."))*"_part" + existing_files = filter(startswith(filename), readdir(folder)) + if existing_files |> length > 0 && !overwrite_existing + @warn "Split files found, Mode will be set to append to existing file:"* + joinpath(folder, last(existing_files)) + schedule.actuations = length(existing_files) - 1 + return Int(length(existing_files)), joinpath(folder, last(existing_files)) + end + return 1, filepath +end + + """ ext(ow) diff --git a/src/Utils/schedules.jl b/src/Utils/schedules.jl index 49302cfe7b..a18bd5ad22 100644 --- a/src/Utils/schedules.jl +++ b/src/Utils/schedules.jl @@ -55,17 +55,16 @@ end initialize!(schedule::TimeInterval, model) = initialize!(schedule, model.clock.time) -function next_actuation_time(schedule::TimeInterval) +function next_actuation_time(schedule::TimeInterval, t::Float64) t₀ = schedule.first_actuation_time - N = schedule.actuations T = schedule.interval + N = schedule.actuations return t₀ + (N + 1) * T end function (schedule::TimeInterval)(model) t = model.clock.time - t★ = next_actuation_time(schedule) - + t★ = next_actuation_time(schedule, t) if t >= t★ if schedule.actuations < typemax(Int) schedule.actuations += 1 From 056da9132cf787febcee644db41a40a508014c3b Mon Sep 17 00:00:00 2001 From: josuemtzmo Date: Thu, 3 Oct 2024 13:51:13 +0200 Subject: [PATCH 2/4] revert to previous version, since no changes were needed --- src/Utils/schedules.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Utils/schedules.jl b/src/Utils/schedules.jl index a18bd5ad22..22e1bff165 100644 --- a/src/Utils/schedules.jl +++ b/src/Utils/schedules.jl @@ -55,16 +55,16 @@ end initialize!(schedule::TimeInterval, model) = initialize!(schedule, model.clock.time) -function next_actuation_time(schedule::TimeInterval, t::Float64) +function next_actuation_time(schedule::TimeInterval) t₀ = schedule.first_actuation_time - T = schedule.interval N = schedule.actuations + T = schedule.interval return t₀ + (N + 1) * T end function (schedule::TimeInterval)(model) t = model.clock.time - t★ = next_actuation_time(schedule, t) + t★ = next_actuation_time(schedule) if t >= t★ if schedule.actuations < typemax(Int) schedule.actuations += 1 From 190bb07f196d340fbba6f0d43e7bf48712336aba Mon Sep 17 00:00:00 2001 From: josuemtzmo Date: Tue, 15 Oct 2024 16:03:57 +0200 Subject: [PATCH 3/4] include test of new automatic append to split files if simulation pickup --- src/OutputWriters/netcdf_output_writer.jl | 3 +- src/OutputWriters/output_writer_utils.jl | 17 ++++--- test/test_netcdf_output_writer.jl | 62 +++++++++++++++++++++-- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/OutputWriters/netcdf_output_writer.jl b/src/OutputWriters/netcdf_output_writer.jl index ac170acb2d..14eafe0452 100644 --- a/src/OutputWriters/netcdf_output_writer.jl +++ b/src/OutputWriters/netcdf_output_writer.jl @@ -444,6 +444,7 @@ function NetCDFOutputWriter(model, outputs; dimensions = Dict(), overwrite_existing = nothing, deflatelevel = 0, + part = nothing, file_splitting = NoFileSplitting(), verbose = false) mkpath(dir) @@ -452,7 +453,7 @@ function NetCDFOutputWriter(model, outputs; initialize!(file_splitting, model) update_file_splitting_schedule!(file_splitting, filepath) - part, filepath = is_output_splitted!(file_splitting, filepath, overwrite_existing) + part, filepath = find_existing_splitted_output!(file_splitting, filepath, overwrite_existing) if isnothing(overwrite_existing) if isfile(filepath) diff --git a/src/OutputWriters/output_writer_utils.jl b/src/OutputWriters/output_writer_utils.jl index 6e0248efdb..00abb54ae8 100644 --- a/src/OutputWriters/output_writer_utils.jl +++ b/src/OutputWriters/output_writer_utils.jl @@ -53,17 +53,18 @@ function update_file_splitting_schedule!(schedule::FileSizeLimit, filepath) end # Update schedule based on user input -is_output_splitted!(schedule, filepath) = nothing +find_existing_splitted_output!(schedule, filepath) = nothing -function is_output_splitted!(schedule::TimeInterval, filepath, overwrite_existing) +function find_existing_splitted_output!(schedule::TimeInterval, filepath, overwrite_existing) folder = dirname(filepath) - filename = first(split(basename(filepath),"."))*"_part" - existing_files = filter(startswith(filename), readdir(folder)) - if existing_files |> length > 0 && !overwrite_existing + filename = first(split(basename(filepath),".")) * "_part" + filelist = filter(startswith(filename), readdir(folder)) + existing_files = length(filelist) > 0 + if existing_files && !overwrite_existing @warn "Split files found, Mode will be set to append to existing file:"* - joinpath(folder, last(existing_files)) - schedule.actuations = length(existing_files) - 1 - return Int(length(existing_files)), joinpath(folder, last(existing_files)) + joinpath(folder, last(filelist)) + schedule.actuations = length(filelist) - 1 + return Int(length(filelist)), joinpath(folder, last(filelist)) end return 1, filepath end diff --git a/test/test_netcdf_output_writer.jl b/test/test_netcdf_output_writer.jl index 2958d55a4f..9287ae64c3 100644 --- a/test/test_netcdf_output_writer.jl +++ b/test/test_netcdf_output_writer.jl @@ -107,10 +107,18 @@ function test_netcdf_time_file_splitting(arch) global_attributes = fake_attributes, file_splitting = TimeInterval(4seconds), overwrite_existing = true) + + cp = Checkpointer(model; + dir = ".", + schedule=IterationInterval(12), + prefix="model_checkpoint", + overwrite_existing = false, + verbose = true + ) - push!(simulation.output_writers, ow) + push!(simulation.output_writers, ow, cp) - run!(simulation) + run!(simulation, pickup=false) for n in string.(1:3) filename = "test_part$n.nc" @@ -121,11 +129,58 @@ function test_netcdf_time_file_splitting(arch) # Test that all files contain the user defined attributes. @test ds.attrib["fake_attribute"] == "fake_attribute" + # Leave test directory clean. + close(ds) + # rm(filename) + end + # rm("test_part4.nc") + + return nothing +end + +function test_netcdf_pickup_from_time_file_splitting(arch) + grid = RectilinearGrid(arch, size=(16, 16, 16), extent=(1, 1, 1), halo=(1, 1, 1)) + model = NonhydrostaticModel(; grid, buoyancy=SeawaterBuoyancy(), tracers=(:T, :S)) + simulation = Simulation(model, Δt=1, stop_iteration=24seconds) + + fake_attributes = Dict("fake_attribute"=>"fake_attribute") + + ow = NetCDFOutputWriter(model, (; u=model.velocities.u); + dir = ".", + filename = "test.nc", + schedule = IterationInterval(2), + array_type = Array{Float64}, + with_halos = true, + global_attributes = fake_attributes, + file_splitting = TimeInterval(4seconds), + overwrite_existing = true) + + cp = Checkpointer(model; + dir = ".", + schedule=IterationInterval(12), + prefix="model_checkpoint", + overwrite_existing = false, + verbose = true + ) + + push!(simulation.output_writers, ow, cp) + + run!(simulation, pickup=true) + + for n in string.(1:6) + filename = "test_part$n.nc" + ds = NCDataset(filename,"r") + dimlength = length(ds["time"]) + # Test that all files contain the same dimensions. + @test dimlength == 2 + # Test that all files contain the user defined attributes. + @test ds.attrib["fake_attribute"] == "fake_attribute" + # Leave test directory clean. close(ds) rm(filename) end - rm("test_part4.nc") + rm("test_part7.nc") return nothing end @@ -924,6 +979,7 @@ for arch in archs test_DateTime_netcdf_output(arch) test_netcdf_size_file_splitting(arch) test_netcdf_time_file_splitting(arch) + test_netcdf_pickup_from_time_file_splitting(arch) test_TimeDate_netcdf_output(arch) test_thermal_bubble_netcdf_output(arch) test_thermal_bubble_netcdf_output_with_halos(arch) From 018b0ae0e45f787f375170f72be9af5a5e725b00 Mon Sep 17 00:00:00 2001 From: josuemtzmo Date: Mon, 4 Nov 2024 20:26:46 +0100 Subject: [PATCH 4/4] simplifying syntaxis --- src/OutputWriters/netcdf_output_writer.jl | 9 +++-- src/OutputWriters/output_writer_utils.jl | 42 ++++++++++++++++++----- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/OutputWriters/netcdf_output_writer.jl b/src/OutputWriters/netcdf_output_writer.jl index 14eafe0452..c234029b58 100644 --- a/src/OutputWriters/netcdf_output_writer.jl +++ b/src/OutputWriters/netcdf_output_writer.jl @@ -448,13 +448,16 @@ function NetCDFOutputWriter(model, outputs; file_splitting = NoFileSplitting(), verbose = false) mkpath(dir) + + part = number_of_split_files!(file_splitting, dir, filename, overwrite_existing) + filename = current_split_filename(dir, filename) + filename = auto_extension(filename, ".nc") filepath = joinpath(dir, filename) + initialize!(file_splitting, model) update_file_splitting_schedule!(file_splitting, filepath) - - part, filepath = find_existing_splitted_output!(file_splitting, filepath, overwrite_existing) - + if isnothing(overwrite_existing) if isfile(filepath) overwrite_existing = false diff --git a/src/OutputWriters/output_writer_utils.jl b/src/OutputWriters/output_writer_utils.jl index 00abb54ae8..79dbf2a0a1 100644 --- a/src/OutputWriters/output_writer_utils.jl +++ b/src/OutputWriters/output_writer_utils.jl @@ -52,23 +52,47 @@ function update_file_splitting_schedule!(schedule::FileSizeLimit, filepath) return nothing end +function current_split_filelist(dir, filepath) + filename = first(split(basename(filepath),".")) * "_part" + filelist = filter(startswith(filename), readdir(dir)) + return filelist +end + +function current_split_filename(dir, filepath) + filelist = current_split_filelist(dir, filepath) + existing_files = length(filelist) > 0 + if existing_files + return last(filelist) + end + return filepath +end + # Update schedule based on user input -find_existing_splitted_output!(schedule, filepath) = nothing +number_of_split_files!(schedule, filepath) = nothing -function find_existing_splitted_output!(schedule::TimeInterval, filepath, overwrite_existing) - folder = dirname(filepath) - filename = first(split(basename(filepath),".")) * "_part" - filelist = filter(startswith(filename), readdir(folder)) +function number_of_split_files!(schedule::TimeInterval, dir, filepath, overwrite_existing) + filelist = current_split_filelist(dir, filepath) existing_files = length(filelist) > 0 if existing_files && !overwrite_existing - @warn "Split files found, Mode will be set to append to existing file:"* - joinpath(folder, last(filelist)) + @warn "Split files found, Model will be set to append + to existing file:"* joinpath(dir, last(filelist)) schedule.actuations = length(filelist) - 1 - return Int(length(filelist)), joinpath(folder, last(filelist)) + return Int(length(filelist)) end - return 1, filepath + return 1 end +function number_of_split_files!(schedule::FileSizeLimit, dir, filepath, overwrite_existing) + filelist = current_split_filelist(dir, filepath) + existing_files = length(filelist) > 0 + if existing_files && !overwrite_existing + @warn "Split files found, Model will be set to append + to existing file:"* joinpath(dir, last(filelist)) + schedule.actuations = length(filelist) - 1 + return Int(length(filelist)) + end + return 1 +end """ ext(ow)