Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve concatenation of netCDF files with simulation pickup #3818

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/OutputWriters/netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,21 @@ function NetCDFOutputWriter(model, outputs;
dimensions = Dict(),
overwrite_existing = nothing,
deflatelevel = 0,
part = 1,
josuemtzmo marked this conversation as resolved.
Show resolved Hide resolved
part = nothing,
file_splitting = NoFileSplitting(),
verbose = false)
mkpath(dir)

part = number_of_split_files!(file_splitting, dir, filename, overwrite_existing)
filename = current_split_filename(dir, filename)

filename = auto_extension(filename, ".nc")

filepath = abspath(joinpath(dir, filename))

initialize!(file_splitting, model)
update_file_splitting_schedule!(file_splitting, filepath)

if isnothing(overwrite_existing)
if isfile(filepath)
overwrite_existing = false
Expand Down Expand Up @@ -666,7 +671,7 @@ function initialize_nc_file!(filepath,
grid,
model)

mode = overwrite_existing ? "c" : "a"
mode = overwrite_existing || !isfile(filepath) ? "c" : "a"

# Add useful metadata
global_attributes["date"] = "This file was generated on $(now())."
Expand Down
42 changes: 42 additions & 0 deletions src/OutputWriters/output_writer_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,48 @@ function update_file_splitting_schedule!(schedule::FileSizeLimit, filepath)
return nothing
end

"""
    current_split_filelist(dir, filepath)

Return the names of existing split-output files in `dir` whose names start with
`filepath`'s stem followed by `"_part"` (e.g. `"test_part1.nc"` for
`filepath = "test.nc"`), sorted by their numeric part index.

Two fixes over the naive implementation:

  * `splitext` strips only the trailing extension, so a stem containing dots
    (e.g. `"my.run.nc"` → `"my.run"`) is matched correctly, whereas splitting
    on the first `"."` would truncate it.
  * Sorting numerically (rather than relying on `readdir`'s lexicographic
    order) ensures `"test_part10.nc"` sorts after `"test_part9.nc"`, so
    `last(filelist)` is always the most recent part.
"""
function current_split_filelist(dir, filepath)
    prefix = first(splitext(basename(filepath))) * "_part"

    # Extract the numeric part index from a name like "test_part12.nc".
    # `sizeof(prefix) + 1` is a valid index because every candidate starts
    # with `prefix`. Fall back to 0 for non-numeric suffixes so sorting
    # never throws.
    part_number(name) = something(tryparse(Int, first(splitext(name))[sizeof(prefix)+1:end]), 0)

    filelist = filter(startswith(prefix), readdir(dir))
    return sort!(filelist, by=part_number)
end

"""
    current_split_filename(dir, filepath)

Return the name of the most recent split-output file in `dir` that matches
`filepath`, or `filepath` itself when no split files exist yet.
"""
function current_split_filename(dir, filepath)
    filelist = current_split_filelist(dir, filepath)
    return isempty(filelist) ? filepath : last(filelist)
end

# Fallback for schedules with no split-file pickup support (e.g. the default
# NoFileSplitting): there is only ever one file, so the run starts at part 1.
#
# NOTE(review): the original fallback accepted only `(schedule, filepath)`,
# but the call site in NetCDFOutputWriter passes
# `(schedule, dir, filename, overwrite_existing)` — four arguments — so the
# fallback was unreachable and the default schedule raised a MethodError.
# A matching 4-argument method (returning 1, consistent with the specialized
# methods) is added; the original 2-argument method is kept for compatibility.
number_of_split_files!(schedule, filepath) = nothing
number_of_split_files!(schedule, dir, filepath, overwrite_existing) = 1

"""
    number_of_split_files!(schedule::TimeInterval, dir, filepath, overwrite_existing)

Inspect `dir` for existing split files matching `filepath`. When split files
are found and `overwrite_existing` is false, fast-forward
`schedule.actuations` so output resumes where the previous run left off, and
return the index of the current (last) part. Otherwise return 1, the index of
the first part.
"""
function number_of_split_files!(schedule::TimeInterval, dir, filepath, overwrite_existing)
    filelist = current_split_filelist(dir, filepath)

    if !isempty(filelist) && !overwrite_existing
        # Single-line message: the original multi-line literal embedded source
        # indentation and a raw newline in the warning text.
        @warn "Split files found; output will append to existing file: " *
              joinpath(dir, last(filelist))

        # Fast-forward the schedule so its next actuation lines up with the
        # number of parts already written by the picked-up run.
        schedule.actuations = length(filelist) - 1

        return length(filelist)
    end

    return 1
end

"""
    number_of_split_files!(schedule::FileSizeLimit, dir, filepath, overwrite_existing)

Inspect `dir` for existing split files matching `filepath`. When split files
are found and `overwrite_existing` is false, fast-forward
`schedule.actuations` so output resumes where the previous run left off, and
return the index of the current (last) part. Otherwise return 1, the index of
the first part.
"""
function number_of_split_files!(schedule::FileSizeLimit, dir, filepath, overwrite_existing)
    filelist = current_split_filelist(dir, filepath)

    if !isempty(filelist) && !overwrite_existing
        # Single-line message: the original multi-line literal embedded source
        # indentation and a raw newline in the warning text.
        @warn "Split files found; output will append to existing file: " *
              joinpath(dir, last(filelist))

        # Fast-forward the schedule so its next actuation lines up with the
        # number of parts already written by the picked-up run.
        schedule.actuations = length(filelist) - 1

        return length(filelist)
    end

    return 1
end

"""
ext(ow)

Expand Down
1 change: 0 additions & 1 deletion src/Utils/schedules.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ end
function (schedule::TimeInterval)(model)
t = model.clock.time
t★ = next_actuation_time(schedule)

if t >= t★
if schedule.actuations < typemax(Int)
schedule.actuations += 1
Expand Down
62 changes: 59 additions & 3 deletions test/test_netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,18 @@ function test_netcdf_time_file_splitting(arch)
global_attributes = fake_attributes,
file_splitting = TimeInterval(4seconds),
overwrite_existing = true)

cp = Checkpointer(model;
dir = ".",
schedule=IterationInterval(12),
prefix="model_checkpoint",
overwrite_existing = false,
verbose = true
)

push!(simulation.output_writers, ow)
push!(simulation.output_writers, ow, cp)

run!(simulation)
run!(simulation, pickup=false)

for n in string.(1:3)
filename = "test_part$n.nc"
Expand All @@ -121,11 +129,58 @@ function test_netcdf_time_file_splitting(arch)
# Test that all files contain the user defined attributes.
@test ds.attrib["fake_attribute"] == "fake_attribute"

# Leave test directory clean.
close(ds)
# rm(filename)
end
# rm("test_part4.nc")

return nothing
end

# Pick up from the checkpoint written by the preceding splitting test and
# verify that output continues into correctly numbered split files.
function test_netcdf_pickup_from_time_file_splitting(arch)
    grid = RectilinearGrid(arch, size=(16, 16, 16), extent=(1, 1, 1), halo=(1, 1, 1))
    model = NonhydrostaticModel(; grid, buoyancy=SeawaterBuoyancy(), tracers=(:T, :S))

    # `stop_iteration` counts iterations, not time, so it takes a plain
    # integer (the original `24seconds` conflated the two units).
    simulation = Simulation(model, Δt=1, stop_iteration=24)

    fake_attributes = Dict("fake_attribute"=>"fake_attribute")

    ow = NetCDFOutputWriter(model, (; u=model.velocities.u);
                            dir = ".",
                            filename = "test.nc",
                            schedule = IterationInterval(2),
                            array_type = Array{Float64},
                            with_halos = true,
                            global_attributes = fake_attributes,
                            file_splitting = TimeInterval(4seconds),
                            overwrite_existing = true)

    cp = Checkpointer(model;
                      dir = ".",
                      schedule = IterationInterval(12),
                      prefix = "model_checkpoint",
                      overwrite_existing = false,
                      verbose = true)

    push!(simulation.output_writers, ow, cp)

    run!(simulation, pickup=true)

    for n in string.(1:6)
        filename = "test_part$n.nc"
        ds = NCDataset(filename, "r")

        # Test that all files contain the same dimensions.
        @test length(ds["time"]) == 2

        # Test that all files contain the user defined attributes.
        @test ds.attrib["fake_attribute"] == "fake_attribute"

        # Leave test directory clean.
        close(ds)
        rm(filename)
    end

    # Parts 1-6 were removed in the loop above; the original code's second
    # `rm("test_part4.nc")` here threw because the file was already gone.
    # Only the trailing part and the checkpoint files remain.
    rm("test_part7.nc")
    for checkpoint in filter(startswith("model_checkpoint"), readdir())
        rm(checkpoint)
    end

    return nothing
end
Expand Down Expand Up @@ -929,6 +984,7 @@ for arch in archs
test_DateTime_netcdf_output(arch)
test_netcdf_size_file_splitting(arch)
test_netcdf_time_file_splitting(arch)
test_netcdf_pickup_from_time_file_splitting(arch)
test_TimeDate_netcdf_output(arch)
test_thermal_bubble_netcdf_output(arch)
test_thermal_bubble_netcdf_output_with_halos(arch)
Expand Down