Skip to content

Commit

Permalink
Pre-interpolate data loaders
Browse files Browse the repository at this point in the history
So that reprojection doesn't occur during model running
  • Loading branch information
ctessum committed Feb 28, 2025
1 parent 1141d8d commit a7c9609
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "EarthSciData"
uuid = "a293c155-435f-439d-9c11-a083b6b47337"
authors = ["EarthSciML Authors"]
version = "0.12.3"
version = "0.12.4"

[deps]
DataInterpolations = "82cc6244-b520-54b8-b5a6-8a565e85f1d0"
Expand Down
80 changes: 67 additions & 13 deletions src/load.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct MetaData
"The index number of the z-dimension (e.g. vertical level)"
zdim::Int
"Grid staggering for each dimension. (true=edge-aligned, false=center-aligned)"
staggering::NTuple{3, Bool}
staggering::NTuple{3,Bool}
end

"""
Expand Down Expand Up @@ -176,8 +176,8 @@ mutable struct DataSetInterpolator{To,N,N2,FT,ITPT,DomT}
N = ndims(data)
N2 = N - 1
times = [DateTime(0, 1, 1) + Hour(i) for i 1:cache_size]
_, itp2 = create_interpolator!(To, interp_cache, data,
repeat([0:1], length(metadata.varsize)), times)
_, itp2 = create_interpolator!(interp_cache, data,
repeat([0:0.1:0.1], length(metadata.varsize)), times)
ITPT = typeof(itp2)

if domain.spatial_ref == metadata.native_sr
Expand All @@ -203,6 +203,14 @@ end
function replace_in_tuple(t::NTuple{N,T1}, index1::Int, v1::T2, index2::Int, v2::T2) where {T1,T2,N}
ntuple(i -> i == index1 ? T1(v1) : i == index2 ? T1(v2) : t[i], N)
end
function tuple_from_vals(index1::Int, v1::T, index2::Int, v2::T) where {T}
ntuple(i -> i == index1 ? v1 : i == index2 ? v2 :
throw(ArgumentError("missing index")), 2)
end
function tuple_from_vals(index1::Int, v1::T, index2::Int, v2::T, index3::Int, v3::T) where {T}
ntuple(i -> i == index1 ? v1 : i == index2 ? v2 : i == index3 ? v3 :
throw(ArgumentError("missing index")), 3)
end

function Base.show(io::IO, itp::DataSetInterpolator)
print(io, "DataSetInterpolator{$(typeof(itp.fs)), $(itp.varname)}")
Expand All @@ -227,8 +235,8 @@ function knots2range(knots, reltol=0.05)
end

"""Create a new interpolator, overwriting `interp_cache`."""
function create_interpolator!(To, interp_cache, data, coords, times)
grid = Tuple(knots2range.([coords..., datetime2unix.(times)]))
function create_interpolator!(interp_cache, data, coords, times)
grid = tuple(coords..., knots2range(datetime2unix.(times)))
copyto!(interp_cache, data)
itp = interpolate!(interp_cache, BSpline(Linear()))
itp = scale(itp, grid)
Expand All @@ -239,7 +247,8 @@ function update_interpolator!(itp::DataSetInterpolator{To}) where {To}
if size(itp.interp_cache) != size(itp.data)
itp.interp_cache = similar(itp.data)
end
grid, itp2 = create_interpolator!(To, itp.interp_cache, itp.data, itp.metadata.coords, itp.times)
coords = _model_grid(itp)
grid, itp2 = create_interpolator!(itp.interp_cache, itp.data, coords, itp.times)
@assert all([length(g) for g in grid] .== size(itp.data)) "invalid data size: $([length(g) for g in grid]) != $(size(itp.data))"
itp.itp = itp2
end
Expand Down Expand Up @@ -312,9 +321,25 @@ function async_loader(itp::DataSetInterpolator)
end
end

# Get the model grid for this interpolator.
function _model_grid(itp::DataSetInterpolator)
grid = EarthSciMLBase.grid(itp.domain, itp.metadata.staggering)
if length(itp.metadata.varsize) == 2 && itp.metadata.zdim <= 0
grid_size = tuple_from_vals(itp.metadata.xdim, grid[1],
itp.metadata.ydim, grid[2])
elseif length(itp.metadata.varsize) == 3
grid_size = tuple_from_vals(itp.metadata.xdim, grid[1],
itp.metadata.ydim, grid[2], itp.metadata.zdim, grid[3])
else
error("Invalid data size")
end
end

function initialize!(itp::DataSetInterpolator, t::DateTime)
itp.load_cache = zeros(eltype(itp.load_cache), itp.metadata.varsize...)
itp.data = zeros(eltype(itp.data), itp.metadata.varsize..., size(itp.data, length(size(itp.data)))) # Add a dimension for time.
grid_size = length.(_model_grid(itp))
itp.data = zeros(eltype(itp.data), grid_size...,
size(itp.data, length(size(itp.data)))) # Add a dimension for time.
Threads.@spawn async_loader(itp)
itp.initialized = true
end
Expand Down Expand Up @@ -343,6 +368,7 @@ function update!(itp::DataSetInterpolator, t::DateTime)
error("Unexpected time ordering, can't reorder indexes $(idxs_in_times) to $(idxs_in_cache)")
end

model_grid = EarthSciMLBase.grid(itp.domain, itp.metadata.staggering)
# Load the additional times we need
for idx in idxs_not_in_times
d = selectdim(itp.data, N, idx)
Expand All @@ -351,7 +377,7 @@ function update!(itp::DataSetInterpolator, t::DateTime)
if r != 0
throw(r)
end
interpolate_from!(itp, d, itp.load_cache) # Copy results to correct location
interpolate_from!(itp, d, itp.load_cache, model_grid) # Copy results to correct location
put!(itp.copyfinish, 0) # Let the loader know we've finished copying
end
itp.times = times
Expand All @@ -360,12 +386,40 @@ function update!(itp::DataSetInterpolator, t::DateTime)
update_interpolator!(itp)
end

function interpolate_from!(dsi::DataSetInterpolator, dst, src)
function interpolate_from!(dsi::DataSetInterpolator, dst::AbstractArray{T,N}, src::AbstractArray{T,N},
model_grid, extrapolate_type=Flat()) where {T,N}
data_grid = Tuple(knots2range.(dsi.metadata.coords))
model_grid = EarthSciMLBase.grid(dsi.domain, dsi.metadata.staggering)
dsi.metadata.xdim, dsi.metadata.ydim
itp = interpolate!(src, BSpline(Linear()))
itp = scale(itp, data_grid)
dst .= src
itp = extrapolate(scale(itp, data_grid), extrapolate_type)
if N == 3
for (i, x) in enumerate(model_grid[1])
for (j, y) in enumerate(model_grid[2])
for (k, z) in enumerate(model_grid[3])
idx = tuple_from_vals(dsi.metadata.xdim, i,
dsi.metadata.ydim, j, dsi.metadata.zdim, k)
locs = tuple_from_vals(dsi.metadata.xdim, x,
dsi.metadata.ydim, y, dsi.metadata.zdim, z)
locs = dsi.coord_trans(locs)
dst[idx...] = itp(locs...)
end
end
end
elseif N == 2 && dsi.metadata.zdim <= 0
for (i, x) in enumerate(model_grid[1])
for (j, y) in enumerate(model_grid[2])
idx = tuple_from_vals(dsi.metadata.xdim, i,
dsi.metadata.ydim, j)
locs = tuple_from_vals(dsi.metadata.xdim, x,
dsi.metadata.ydim, y)
locs = dsi.coord_trans(locs)
dst[idx...] = itp(locs...)
end
end
else
error("Invalid dimension configuration")
end
dst
end

function lazyload!(itp::DataSetInterpolator, t::DateTime)
Expand Down Expand Up @@ -423,7 +477,7 @@ Interpolate without checking if the data has been correctly loaded for the given
@generated function interp_unsafe(itp::DataSetInterpolator{T1,N,N2}, t::DateTime, locs::Vararg{T2,N2}) where {T1,T2,N,N2}
if N2 == N - 1 # Number of locs has to be one less than the number of data dimensions so we can add the time in.
quote
locs = itp.coord_trans(locs)
#locs = itp.coord_trans(locs)
try
itp.itp(locs..., datetime2unix(t))
catch err
Expand Down
41 changes: 29 additions & 12 deletions test/load_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ te = DateTime(2022, 5, 3)
fs = EarthSciData.GEOSFPFileSet("4x5", "A3dyn", t, te)

domain = DomainInfo(t, te;
latrange=deg2rad(-85.0f0):deg2rad(2):deg2rad(85.0f0),
lonrange=deg2rad(-180.0f0):deg2rad(2.5):deg2rad(175.0f0),
lonrange=deg2rad(-175.0):deg2rad(2.5):deg2rad(175.0),
latrange=deg2rad(-85.0):deg2rad(2):deg2rad(85.0),
levrange=1:10, dtype=Float64)

@test EarthSciData.url(fs, t) == "https://geos-chem.s3-us-west-2.amazonaws.com/GEOS_4x5/GEOS_FP/2022/05/GEOSFP.20220501.A3dyn.4x5.nc"
Expand All @@ -39,6 +39,14 @@ itp = EarthSciData.DataSetInterpolator{Float32}(fs, "U", t, te, domain)
@test EarthSciData.dimnames(itp) == ["lon", "lat", "lev"]
@test issetequal(EarthSciData.varnames(fs), ["U", "OMEGA", "RH", "DTRAIN", "V"])

@testset "grid" begin
grd = EarthSciData._model_grid(itp)
length.(grd) == (142, 86, 10)
grd[1] deg2rad(-175.0 - 1.25):deg2rad(2.5):deg2rad(175.0 + 1.25)
grd[2] deg2rad(-85.0):deg2rad(2):deg2rad(85.0)
grd[3] 1:1.0:10
end

@testset "interpolation" begin
uvals = []
times = DateTime(2022, 5, 1):Hour(1):DateTime(2022, 5, 3)
Expand All @@ -48,8 +56,8 @@ itp = EarthSciData.DataSetInterpolator{Float32}(fs, "U", t, te, domain)
for i 4:3:length(uvals)-1
@test uvals[i] (uvals[i-1] + uvals[i+1]) / 2 atol = 1e-2
end
want_uvals = [-0.047425747f0, 0.064035736f0, 0.11166346f0, 0.09545743f0, 0.07925139f0,
-0.011301707f0, -0.17620188f0, -0.34110206f0, -0.501398f0, -0.65708977f0]
want_uvals = [-0.07933916f0, 0.03189625f0, 0.079422876f0, 0.06324073f0, 0.047058582f0,
-0.044040862f0, -0.21005762f0, -0.37607434f0, -0.53645617f0, -0.6912031f0]
@test uvals[1:10] want_uvals

# Test that shuffling the times doesn't change the results.
Expand Down Expand Up @@ -79,11 +87,13 @@ end
dfi = EarthSciData.DataFrequencyInfo(fs)
tt = dfi.centerpoints[EarthSciData.centerpoint_index(dfi, t)]
v = tv(fs, tt)
cache .= [v, v * 0.5, v * 2.0]
cache[:, 1] .= [v, v * 0.5, v * 2.0]
cache[:, 2] .= [v, v * 0.5, v * 2.0]
end
function EarthSciData.loadmetadata(fs::DummyFileSet, varname)
return EarthSciData.MetaData([[0.0, 0.5, 1.0]], u"m", "description", ["x"], [3],
"+proj=longlat +datum=WGS84 +no_defs", 1, 1, -1, (false, false, false))
return EarthSciData.MetaData([[0.0, 0.5, 1.0], [0.0, 1.0]], u"m",
"description", ["x"], [3, 2],
"+proj=longlat +datum=WGS84 +no_defs", 1, 2, -1, (false, false, false))
end

fs = DummyFileSet(DateTime(2022, 4, 30), DateTime(2022, 5, 4))
Expand All @@ -99,7 +109,8 @@ end

answerdata = [tv(fs, t) * v for t dfi.centerpoints, v [1.0, 0.5, 2.0]]

grid = Tuple(EarthSciData.knots2range.([datetime2unix.(dfi.centerpoints), [0.0, 0.5, 1.0]]))
grid = Tuple(EarthSciData.knots2range.([datetime2unix.(dfi.centerpoints),
[0.0, 0.5, 1.0]]))
answer_itp = scale(interpolate(answerdata, BSpline(Linear())), grid)

times = DateTime(2022, 5, 1):Hour(1):DateTime(2022, 5, 3)
Expand All @@ -109,14 +120,14 @@ end
answers = zeros(Float32, length(times), length(xs))
for (i, tt) enumerate(times)
for (j, x) enumerate(xs)
uvals[i, j] = interp!(itp, tt, x)
uvals[i, j] = interp!(itp, tt, (x, x)...)
answers[i, j] = answer_itp(datetime2unix(tt), x)
end
end

@test uvals answers

interp!(itp, times[end], xs[end])
interp!(itp, times[end], xs[end], xs[end])
@test length(itp.times) == 2
@test itp.times == [DateTime("2022-05-02T22:30:00"), DateTime("2022-05-03T01:30:00")]

Expand All @@ -126,7 +137,7 @@ end
tt = times[i]
for j randperm(length(xs))
x = xs[j]
uvals[i, j] = interp!(itp, tt, x)
uvals[i, j] = interp!(itp, tt, x, x)
answers[i, j] = answer_itp(datetime2unix(tt), x)
end
end
Expand All @@ -140,7 +151,7 @@ end
answers = zeros(Float32, length(times), length(xs))
for (i, tt) enumerate(times)
for (j, x) enumerate(xs)
uvals[i, j] = interp!(itp, tt, x)
uvals[i, j] = interp!(itp, tt, x, x)
answers[i, j] = answer_itp(datetime2unix(tt), x)
end
end
Expand Down Expand Up @@ -170,3 +181,9 @@ end
true
end
end

@testset "tuple_from_vals" begin
@test EarthSciData.tuple_from_vals(1, 1, 2, 2, 3, 3) == (1, 2, 3)
@test EarthSciData.tuple_from_vals(2, 2, 1, 1, 3, 3) == (1, 2, 3)
@test EarthSciData.tuple_from_vals(3, 3, 2, 2, 1, 1) == (1, 2, 3)
end
4 changes: 2 additions & 2 deletions test/nei2016monthly_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ sample_time = DateTime(2016, 5, 1)

@testset "correct projection" begin
itp = EarthSciData.DataSetInterpolator{Float32}(fileset, "NOX", ts, te, domain)
@test interp!(itp, sample_time, deg2rad(-97.0f0), deg2rad(40.0f0)) 9.211331f-10
@test interp!(itp, sample_time, deg2rad(-97.0f0), deg2rad(40.0f0)) 1.256768f-9
end

@testset "incorrect projection" begin
Expand All @@ -42,7 +42,7 @@ end

@testset "Out of domain" begin
itp = EarthSciData.DataSetInterpolator{Float32}(fileset, "NOX", ts, te, domain)
@test_throws BoundsError interp!(itp, sample_time, deg2rad(0.0f0), deg2rad(40.0f0))
@test interp!(itp, sample_time, deg2rad(0.0f0), deg2rad(40.0f0)) 0.0
end


Expand Down
2 changes: 1 addition & 1 deletion test/solve_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ end
DateTime("2016-04-16T00:00:00"), DateTime("2016-05-16T12:00:00")]
prob = ODEProblem(sys2, [], get_tspan(domain), [])
sol = solve(prob)
@test only(sol.u[end]) 6.3645540358326396e-6
@test only(sol.u[end]) 5.322912896619149e-6
end

emis = NEI2016MonthlyEmis("mrggrid_withbeis_withrwc", domain)
Expand Down

2 comments on commit a7c9609

@ctessum
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/126054

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.12.4 -m "<description of version>" a7c960977d086a33086d3f233bfffd5f1c40c5b9
git push origin v0.12.4

Please sign in to comment.