Skip to content

Commit

Permalink
Add zstandard compression (NOAA-EMC#706)
Browse files Browse the repository at this point in the history
* Add zstandard compression

* Enable shuffling with zstandard compression

* Refactor io/module_write_netcdf.F90
  • Loading branch information
DusanJovic-NOAA authored Oct 31, 2023
1 parent 7f94132 commit 1250b41
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 83 deletions.
2 changes: 1 addition & 1 deletion io/module_fv3_io_def.F90
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module module_fv3_io_def
real,dimension(:),allocatable :: cen_lon, cen_lat
real,dimension(:),allocatable :: lon1, lat1, lon2, lat2, dlon, dlat
real,dimension(:),allocatable :: stdlat1, stdlat2, dx, dy
integer,dimension(:),allocatable :: ideflate, nbits
integer,dimension(:),allocatable :: ideflate, nbits, zstandard_level
integer,dimension(:),allocatable :: ichunk2d, jchunk2d, ichunk3d, jchunk3d, kchunk3d

end module module_fv3_io_def
137 changes: 57 additions & 80 deletions io/module_write_netcdf.F90
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

module module_write_netcdf

use mpi
use esmf
use netcdf
use module_fv3_io_def,only : ideflate, nbits, &
use module_fv3_io_def,only : ideflate, nbits, zstandard_level, &
ichunk2d,jchunk2d,ichunk3d,jchunk3d,kchunk3d, &
dx,dy,lon1,lat1,lon2,lat2, &
time_unlimited
use mpi

implicit none
private
Expand Down Expand Up @@ -83,8 +83,10 @@ subroutine write_netcdf(wrtfb, filename, &
integer :: oldMode
integer :: im_dimid, jm_dimid, tile_dimid, pfull_dimid, phalf_dimid, time_dimid, ch_dimid
integer :: im_varid, jm_varid, tile_varid, lon_varid, lat_varid, timeiso_varid
integer, dimension(:), allocatable :: dimids_2d, dimids_3d
integer, dimension(:), allocatable :: dimids_2d, dimids_3d, dimids, chunksizes
integer, dimension(:), allocatable :: varids
integer :: xtype
integer :: ishuffle
logical shuffle

logical :: is_cubed_sphere
Expand Down Expand Up @@ -315,86 +317,61 @@ subroutine write_netcdf(wrtfb, filename, &
call ESMF_FieldGet(fcstField(i), name=fldName, rank=rank, typekind=typekind, rc=rc); ESMF_ERR_RETURN(rc)

par_access = NF90_INDEPENDENT
! define variables

if (rank == 2) then
if (typekind == ESMF_TYPEKIND_R4) then
if (ideflate(grid_id) > 0) then
if (ichunk2d(grid_id) < 0 .or. jchunk2d(grid_id) < 0) then
! let netcdf lib choose chunksize
! shuffle filter on for 2d fields (lossless compression)
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_2d, varids(i), &
shuffle=.true.,deflate_level=ideflate(grid_id)); NC_ERR_STOP(ncerr)
else
if (is_cubed_sphere) then
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_2d, varids(i), &
shuffle=.true.,deflate_level=ideflate(grid_id),&
chunksizes=[ichunk2d(grid_id),jchunk2d(grid_id),tileCount,1]); NC_ERR_STOP(ncerr)
else
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_2d, varids(i), &
shuffle=.true.,deflate_level=ideflate(grid_id),&
chunksizes=[ichunk2d(grid_id),jchunk2d(grid_id), 1]); NC_ERR_STOP(ncerr)
end if
end if
! compression filters require collective access.
par_access = NF90_COLLECTIVE
else
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_2d, varids(i)); NC_ERR_STOP(ncerr)
end if
else if (typekind == ESMF_TYPEKIND_R8) then
ncerr = nf90_def_var(ncid, trim(fldName), NF90_DOUBLE, &
dimids_2d, varids(i)); NC_ERR_STOP(ncerr)
else
if (mype==0) write(0,*)'Unsupported typekind ', typekind
call ESMF_Finalize(endflag=ESMF_END_ABORT)
end if
dimids = dimids_2d
else if (rank == 3) then
if (typekind == ESMF_TYPEKIND_R4) then
if (ideflate(grid_id) > 0) then
! shuffle filter off for 3d fields using lossy compression
if (nbits(grid_id) > 0) then
shuffle=.false.
dimids = dimids_3d
else
if (mype==0) write(0,*)'Unsupported rank ', rank
call ESMF_Finalize(endflag=ESMF_END_ABORT)
end if

if (typekind == ESMF_TYPEKIND_R4) then
xtype = NF90_FLOAT
else if (typekind == ESMF_TYPEKIND_R8) then
xtype = NF90_DOUBLE
else
if (mype==0) write(0,*)'Unsupported typekind ', typekind
call ESMF_Finalize(endflag=ESMF_END_ABORT)
end if

! define variable
ncerr = nf90_def_var(ncid, trim(fldName), xtype, dimids, varids(i)) ; NC_ERR_STOP(ncerr)

! compression, shuffling and chunking
if (ideflate(grid_id) > 0 .or. zstandard_level(grid_id) > 0) then
par_access = NF90_COLLECTIVE
if (rank == 2 .and. ichunk2d(grid_id) > 0 .and. jchunk2d(grid_id) > 0) then
if (is_cubed_sphere) then
chunksizes = [im, jm, tileCount, 1]
else
shuffle=.true.
chunksizes = [ichunk2d(grid_id), jchunk2d(grid_id), 1]
end if
if (ichunk3d(grid_id) < 0 .or. jchunk3d(grid_id) < 0 .or. kchunk3d(grid_id) < 0) then
! let netcdf lib choose chunksize
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_3d, varids(i), &
shuffle=shuffle,deflate_level=ideflate(grid_id)); NC_ERR_STOP(ncerr)
ncerr = nf90_def_var_chunking(ncid, varids(i), NF90_CHUNKED, chunksizes) ; NC_ERR_STOP(ncerr)
else if (rank == 3 .and. ichunk3d(grid_id) > 0 .and. jchunk3d(grid_id) > 0 .and. kchunk3d(grid_id) > 0) then
if (is_cubed_sphere) then
chunksizes = [im, jm, lm, tileCount, 1]
else
if (is_cubed_sphere) then
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_3d, varids(i), &
shuffle=shuffle,deflate_level=ideflate(grid_id),&
chunksizes=[ichunk3d(grid_id),jchunk3d(grid_id),kchunk3d(grid_id),tileCount,1]); NC_ERR_STOP(ncerr)
else
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_3d, varids(i), &
shuffle=shuffle,deflate_level=ideflate(grid_id),&
chunksizes=[ichunk3d(grid_id),jchunk3d(grid_id),kchunk3d(grid_id), 1]); NC_ERR_STOP(ncerr)
end if
chunksizes = [ichunk3d(grid_id), jchunk3d(grid_id), kchunk3d(grid_id), 1]
end if
! compression filters require collective access.
par_access = NF90_COLLECTIVE
else
ncerr = nf90_def_var(ncid, trim(fldName), NF90_FLOAT, &
dimids_3d, varids(i)); NC_ERR_STOP(ncerr)
end if
else if (typekind == ESMF_TYPEKIND_R8) then
ncerr = nf90_def_var(ncid, trim(fldName), NF90_DOUBLE, &
dimids_3d, varids(i)); NC_ERR_STOP(ncerr)
else
if (mype==0) write(0,*)'Unsupported typekind ', typekind
call ESMF_Finalize(endflag=ESMF_END_ABORT)
end if
else
if (mype==0) write(0,*)'Unsupported rank ', rank
call ESMF_Finalize(endflag=ESMF_END_ABORT)
ncerr = nf90_def_var_chunking(ncid, varids(i), NF90_CHUNKED, chunksizes) ; NC_ERR_STOP(ncerr)
end if

ishuffle = NF90_SHUFFLE
! shuffle filter off for 3d fields using lossy compression
if (rank == 3 .and. nbits(grid_id) > 0) then
ishuffle = NF90_NOSHUFFLE
end if
if (ideflate(grid_id) > 0) then
ncerr = nf90_def_var_deflate(ncid, varids(i), ishuffle, 1, ideflate(grid_id)) ; NC_ERR_STOP(ncerr)
else if (zstandard_level(grid_id) > 0) then
ncerr = nf90_def_var_deflate(ncid, varids(i), ishuffle, 0, 0) ; NC_ERR_STOP(ncerr)
ncerr = nf90_def_var_zstandard(ncid, varids(i), zstandard_level(grid_id)) ; NC_ERR_STOP(ncerr)
end if

end if

if (par) then
ncerr = nf90_var_par_access(ncid, varids(i), par_access); NC_ERR_STOP(ncerr)
end if
Expand Down Expand Up @@ -649,7 +626,7 @@ subroutine write_netcdf(wrtfb, filename, &
if (typekind == ESMF_TYPEKIND_R4) then
if (par) then
call ESMF_FieldGet(fcstField(i), localDe=0, farrayPtr=array_r4_3d, rc=rc); ESMF_ERR_RETURN(rc)
if (ideflate(grid_id) > 0 .and. nbits(grid_id) > 0) then
if ((ideflate(grid_id) > 0 .or. zstandard_level(grid_id) > 0) .and. nbits(grid_id) > 0) then
dataMax = maxval(array_r4_3d)
dataMin = minval(array_r4_3d)
call mpi_allreduce(mpi_in_place,dataMax,1,mpi_real4,mpi_max,mpi_comm,ierr)
Expand All @@ -665,15 +642,15 @@ subroutine write_netcdf(wrtfb, filename, &
call ESMF_ArrayGather(array, array_r4_3d_cube(:,:,:,t), rootPet=0, tile=t, rc=rc); ESMF_ERR_RETURN(rc)
end do
if (mype==0) then
if (ideflate(grid_id) > 0 .and. nbits(grid_id) > 0) then
if ((ideflate(grid_id) > 0 .or. zstandard_level(grid_id) > 0) .and. nbits(grid_id) > 0) then
call quantize_array(array_r4_3d_cube, minval(array_r4_3d_cube), maxval(array_r4_3d_cube), nbits(grid_id), compress_err(i))
end if
ncerr = nf90_put_var(ncid, varids(i), values=array_r4_3d_cube, start=start_idx); NC_ERR_STOP(ncerr)
end if
else
call ESMF_FieldGather(fcstField(i), array_r4_3d, rootPet=0, rc=rc); ESMF_ERR_RETURN(rc)
if (mype==0) then
if (ideflate(grid_id) > 0 .and. nbits(grid_id) > 0) then
if ((ideflate(grid_id) > 0 .or. zstandard_level(grid_id) > 0) .and. nbits(grid_id) > 0) then
call quantize_array(array_r4_3d, minval(array_r4_3d), maxval(array_r4_3d), nbits(grid_id), compress_err(i))
end if
ncerr = nf90_put_var(ncid, varids(i), values=array_r4_3d, start=start_idx); NC_ERR_STOP(ncerr)
Expand Down Expand Up @@ -711,7 +688,7 @@ subroutine write_netcdf(wrtfb, filename, &

end do ! end fieldCount

if (ideflate(grid_id) > 0 .and. nbits(grid_id) > 0 .and. do_io) then
if ((ideflate(grid_id) > 0 .or. zstandard_level(grid_id) > 0) .and. nbits(grid_id) > 0 .and. do_io) then
ncerr = nf90_redef(ncid=ncid); NC_ERR_STOP(ncerr)
do i=1, fieldCount
if (compress_err(i) > 0) then
Expand Down
14 changes: 13 additions & 1 deletion io/module_write_restart_netcdf.F90
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

module module_write_restart_netcdf

use mpi
use esmf
use fms
use mpp_mod, only : mpp_chksum ! needed for fms 2023.02
use netcdf
use mpi
use module_fv3_io_def,only : zstandard_level

implicit none
private
Expand Down Expand Up @@ -372,6 +374,16 @@ subroutine write_restart_netcdf(wrtfb, filename, &
ncerr = nf90_var_par_access(ncid, varids(i), par_access); NC_ERR_STOP(ncerr)
end if

if (zstandard_level(1) > 0) then
ncerr = nf90_def_var_zstandard(ncid, varids(i), zstandard_level(1))
if (ncerr /= nf90_noerr) then
if (ncerr == nf90_enofilter) then
if (mype==0) write(0,*) 'Zstandard filter not found.'
end if
NC_ERR_STOP(ncerr)
end if
end if

end do ! i=1,fieldCount

ncerr = nf90_put_att(ncid, NF90_GLOBAL, "NumFilesInSet", 1); NC_ERR_STOP(ncerr)
Expand Down
18 changes: 17 additions & 1 deletion io/module_wrt_grid_comp.F90
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module module_wrt_grid_comp
use mpi
use esmf
use fms
use mpp_mod, only : mpp_init ! needed for fms 2023.02

use write_internal_state
use module_fv3_io_def, only : num_pes_fcst, &
Expand All @@ -40,7 +41,7 @@ module module_wrt_grid_comp
cen_lon, cen_lat, &
lon1, lat1, lon2, lat2, dlon, dlat, &
stdlat1, stdlat2, dx, dy, iau_offset, &
ideflate, lflname_fulltime
ideflate, zstandard_level, lflname_fulltime
use module_write_netcdf, only : write_netcdf
use module_write_restart_netcdf
use physcons, only : pi => con_pi
Expand Down Expand Up @@ -361,6 +362,7 @@ subroutine wrt_initialize_p1(wrt_comp, imp_state_write, exp_state_write, clock,
allocate(kchunk3d(ngrids))
allocate(ideflate(ngrids))
allocate(nbits(ngrids))
allocate(zstandard_level(ngrids))

allocate(wrt_int_state%out_grid_info(ngrids))

Expand Down Expand Up @@ -466,13 +468,27 @@ subroutine wrt_initialize_p1(wrt_comp, imp_state_write, exp_state_write, clock,
call ESMF_ConfigGetAttribute(config=CF,value=jchunk3d(n),default=0,label ='jchunk3d:',rc=rc)
call ESMF_ConfigGetAttribute(config=CF,value=kchunk3d(n),default=0,label ='kchunk3d:',rc=rc)

! zstandard compression flag
call ESMF_ConfigGetAttribute(config=CF,value=zstandard_level(n),default=0,label ='zstandard_level:',rc=rc)
if (zstandard_level(n) < 0) zstandard_level(n)=0

call ESMF_ConfigGetAttribute(config=CF,value=nbits(n),default=0,label ='nbits:',rc=rc)

! zlib compression flag
call ESMF_ConfigGetAttribute(config=CF,value=ideflate(n),default=0,label ='ideflate:',rc=rc)
if (ideflate(n) < 0) ideflate(n)=0

call ESMF_ConfigGetAttribute(config=CF,value=nbits(n),default=0,label ='nbits:',rc=rc)

if (ideflate(n) > 0 .and. zstandard_level(n) > 0) then
write(0,*)"wrt_initialize_p1: zlib and zstd compression cannot be both enabled at the same time"
call ESMF_LogWrite("wrt_initialize_p1: zlib and zstd compression cannot be both enabled at the same time",ESMF_LOGMSG_ERROR,rc=RC)
call ESMF_Finalize(endflag=ESMF_END_ABORT)
end if

if (lprnt) then
print *,'ideflate=',ideflate(n),' nbits=',nbits(n)
print *,'zstandard_level=',zstandard_level(n)
end if
! nbits quantization level for lossy compression (must be between 1 and 31)
! 1 is most compression, 31 is least. If outside this range, set to zero
Expand Down

0 comments on commit 1250b41

Please sign in to comment.