
Commit

Merge pull request #24 from lindsayplatt/pull_missing_data_fetch
Welcome, KS & FL 🍍
lindsayplatt authored Jul 7, 2021
2 parents 6529845 + ac67a06 commit ebd5d87
Showing 12 changed files with 268 additions and 148 deletions.
13 changes: 13 additions & 0 deletions 0_config.yml
@@ -31,6 +31,8 @@ targets:
command: c(I('gw-conditions/historic_gw_data_unfiltered.csv'))
historic_filtered_data_s3_fn:
command: c(I('gw-conditions/historic_gw_data_filtered.csv'))
historic_filtered_site_info_s3_fn:
command: c(I('gw-conditions/historic_gw_site_info_filtered.rds'))
historic_quantiles_s3_fn:
command: c(I('gw-conditions/historic_gw_quantiles.csv'))

@@ -46,9 +48,20 @@ targets:
# Start with CONUS only
viz_bounding_box:
command: c(I(-124.409591), I(24.523096), I(-66.949895), I(49.384358))
dv_fetch_size_limit:
command: I(200)
uv_fetch_size_limit:
command: I(50)

# DO NOT CHANGE UNLESS YOU WANT TO REBUILD ALL HISTORIC DATA, TOO!
gw_param_cd:
command: c(I('72019'))

# Additional fetch params for the states that don't report 72019 pcode
addl_gw_param_cd:
command: c(I('62610'))
addl_states:
command: c(I('FL'), I('KS'))
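
For context: `72019` is the NWIS parameter code for depth to water level below land surface, while `62610` is a groundwater-level elevation above NGVD 1929, which is what the FL and KS sites report instead. A quick sketch for confirming the two codes with dataRetrieval (assumes the package is installed; column names may vary slightly by version):

# Sketch: look up the two parameter codes configured above
library(dataRetrieval)
pcode_info <- readNWISpCode(c("72019", "62610"))
pcode_info[, c("parameter_cd", "parameter_nm")]
# 72019 ~ "Depth to water level, feet below land surface"
# 62610 ~ "Groundwater level above NGVD 1929, feet"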

##-- Process configs --##

59 changes: 52 additions & 7 deletions 0_historic.yml
@@ -20,6 +20,7 @@ targets:
- gw-conditions/historic_gw_site_info_unfiltered.rds.ind
- gw-conditions/historic_gw_data_unfiltered.csv.ind
- gw-conditions/historic_gw_data_filtered.csv.ind
- gw-conditions/historic_gw_site_info_filtered.rds.ind
- gw-conditions/historic_gw_quantiles.csv.ind

##-- All GW sites with continuous data for all time --##
@@ -31,16 +32,13 @@
historic_gw_sites_uv:
command: pull_sites_by_service(historic_gw_sites_all, I("uv"))

historic_gw_site_info:
command: fetch_gw_site_info(historic_gw_sites_all)
0_historic/out/historic_gw_site_info.rds:
command: saveRDS(file = target_name, historic_gw_site_info)

0_historic/out/historic_gw_data_dv.csv:
command: do_historic_gw_fetch(
final_target = target_name,
task_makefile = I('0_historic_fetch_tasks.yml'),
gw_site_nums = historic_gw_sites_dv,
gw_site_nums_obj_nm = I('historic_gw_sites_dv'),
param_cd_obj_nm = I('gw_param_cd'),
service_cd = I('dv'),
request_limit = historic_dv_fetch_size_limit,
'0_historic/src/do_historic_gw_fetch.R',
@@ -51,12 +49,42 @@
final_target = target_name,
task_makefile = I('0_historic_fetch_tasks.yml'),
gw_site_nums = historic_gw_sites_uv,
gw_site_nums_obj_nm = I('historic_gw_sites_uv'),
param_cd_obj_nm = I('gw_param_cd'),
service_cd = I('uv'),
request_limit = historic_uv_fetch_size_limit,
'0_historic/src/do_historic_gw_fetch.R',
'0_historic/src/fetch_nwis_historic.R')

# Special pull for just KS & just FL using `62610`.
# Read all about why on GitHub: https://github.com/USGS-VIZLAB/gw-conditions/issues/9
addl_site_nums:
command: fetch_addl_uv_sites(addl_states, addl_gw_param_cd, historic_start_date, historic_end_date)
0_historic/out/historic_gw_data_uv_addl.csv:
command: do_historic_gw_fetch(
final_target = target_name,
task_makefile = I('0_historic_fetch_tasks.yml'),
gw_site_nums = addl_site_nums,
gw_site_nums_obj_nm = I('addl_site_nums'),
param_cd_obj_nm = I('addl_gw_param_cd'),
service_cd = I('uv'),
request_limit = historic_uv_fetch_size_limit,
'0_historic/src/do_historic_gw_fetch.R',
'0_historic/src/fetch_nwis_historic.R')

0_historic/out/historic_gw_data.csv:
command: combine_gw_uv_and_dv(target_name, '0_historic/out/historic_gw_data_dv.csv', '0_historic/out/historic_gw_data_uv.csv')
command: combine_gw_fetches(
target_name,
'0_historic/out/historic_gw_data_dv.csv',
'0_historic/out/historic_gw_data_uv.csv',
'0_historic/out/historic_gw_data_uv_addl.csv')

historic_gw_sites:
command: combine_gw_sites(historic_gw_sites_all, addl_site_nums, gw_param_cd, addl_gw_param_cd)
historic_gw_site_info:
command: fetch_gw_site_info('0_historic/out/historic_gw_data.csv')
0_historic/out/historic_gw_site_info.rds:
command: saveRDS(file = target_name, historic_gw_site_info)

##-- Use historic data, filter to sites that meet a min years requirement, and calculate quantiles --##

@@ -66,11 +94,25 @@
historic_gw_data_fn = "0_historic/out/historic_gw_data.csv",
min_years = historic_min_yrs_per_site_for_quantile_calc,
allowed_site_types = historic_site_types_for_quantile_calc)
0_historic/out/filtered_historic_gw_site_info.rds:
command: gather_metadata_of_sites_used(
target_name,
site_data_filtered_fn = "0_historic/out/filtered_historic_gw_data.csv",
site_data_service_to_pull = historic_gw_sites,
site_metadata = historic_gw_site_info)

quantiles_to_calc:
command: seq(0, 1, by = 0.05)
# Sites that use "depth below" as their gw level need to be inverted. In the
# current implementation, this means any site that used pcode == '72019'
depth_below_sites:
command: historic_gw_sites_all[[I('site_no')]]
0_historic/out/historic_gw_quantiles.csv:
command: calculate_historic_quantiles(target_name, "0_historic/out/filtered_historic_gw_data.csv", quantiles_to_calc)
command: calculate_historic_quantiles(
target_name,
"0_historic/out/filtered_historic_gw_data.csv",
quantiles_to_calc,
inverse_sites = depth_below_sites)

##-- Now push historic data and quantiles to S3 to be used in the rest of the pipeline --##

@@ -83,5 +125,8 @@
gw-conditions/historic_gw_data_filtered.csv.ind:
command: s3_put(target_name, local_source = '0_historic/out/filtered_historic_gw_data.csv')

gw-conditions/historic_gw_site_info_filtered.rds.ind:
command: s3_put(remote_ind = target_name, local_source = '0_historic/out/filtered_historic_gw_site_info.rds')

gw-conditions/historic_gw_quantiles.csv.ind:
command: s3_put(target_name, local_source = '0_historic/out/historic_gw_quantiles.csv')
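
`s3_put()` is a project helper defined outside this diff. The pattern these `.ind` targets depend on is: upload the local file to S3, then write a small indicator file whose contents change whenever the upload does. A hypothetical sketch of that pattern using the aws.s3 package — the bucket name, key handling, and indicator contents here are assumptions, not the repo's actual implementation:

# Hypothetical sketch only; the real s3_put() in this repo may differ.
library(aws.s3)

s3_put_sketch <- function(remote_ind, local_source, bucket = "example-viz-bucket") {
  s3_key <- gsub("\\.ind$", "", remote_ind)           # e.g., 'gw-conditions/historic_gw_quantiles.csv'
  put_object(file = local_source, object = s3_key, bucket = bucket)   # upload local file to S3
  writeLines(unname(tools::md5sum(local_source)), remote_ind)         # indicator = hash of uploaded file
}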
12 changes: 8 additions & 4 deletions 0_historic/src/calculate_historic_quantiles.R
@@ -4,16 +4,20 @@ combine_gw_uv_and_dv <- function(out_file, dv_fn, uv_fn) {
}

# Per gage, calc quantiles using historic groundwater level data (no seasonality)
calculate_historic_quantiles <- function(target_name, historic_gw_data_fn, quantiles_to_calc) {

# If the gage reports "depth below", then we need to negate the values so that
# low percentiles line up with less water and high percentiles with more water.
# See this comment on GitHub for more detail/background:
# https://github.com/USGS-VIZLAB/gw-conditions/issues/9#issuecomment-854115170

calculate_historic_quantiles <- function(target_name, historic_gw_data_fn, quantiles_to_calc, inverse_sites) {
read_csv(historic_gw_data_fn, col_types = 'cDn') %>%
mutate(is_inverse = site_no %in% inverse_sites) %>%
split(.$site_no) %>%
map_dfr(., function(.x) {
quants <- quantile(.x$GWL, quantiles_to_calc, na.rm = TRUE)
quants <- quantile(.x$GWL*ifelse(.x$is_inverse, -1, 1), quantiles_to_calc, na.rm = TRUE)
names(quants) <- NULL # Otherwise, carries unnecessary metadata
tibble(quantile_nm = quantiles_to_calc*100, quantile_va = quants)
}, .id = "site_no") %>%
bind_rows() %>%
write_csv(target_name)

}
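
A toy illustration of why the depth-below sites are negated before `quantile()` (made-up values, not project data): for a depth-to-water record, a small value means the water table is near the surface (wet) and a large value means it is far below (dry), so multiplying by -1 puts those sites on the same "higher value = more water" scale as the elevation-based sites.

# Made-up depth-to-water values, in feet below land surface
depth_below <- c(2, 5, 10, 20, 40)              # small = wet, large = dry
gwl_inverted <- depth_below * -1                # now large = wet, like elevation-style sites
quantile(gwl_inverted, probs = c(0.1, 0.5, 0.9), na.rm = TRUE)
# Low percentiles now land on the deepest (driest) observations and
# high percentiles on the shallowest (wettest), which is what the viz expects.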
6 changes: 3 additions & 3 deletions 0_historic/src/do_historic_gw_fetch.R
@@ -1,4 +1,4 @@
do_historic_gw_fetch <- function(final_target, task_makefile, gw_site_nums, service_cd, request_limit, ...) {
do_historic_gw_fetch <- function(final_target, task_makefile, gw_site_nums, gw_site_nums_obj_nm, param_cd_obj_nm, service_cd, request_limit, ...) {

# Number indicating how many sites to include per dataRetrieval request to prevent
# errors from requesting too much at once. More relevant for surface water requests.
@@ -34,7 +34,7 @@ do_historic_gw_fetch <- function(final_target, task_makefile, gw_site_nums, serv
sprintf('gw_sites_%s_%s', service_cd, task_name)
},
command = function(..., task_name, steps) {
sprintf("historic_gw_sites_%s[%s]", service_cd, steps[["site_sequence"]]$target_name)
sprintf("%s[%s]", gw_site_nums_obj_nm, steps[["site_sequence"]]$target_name)
}
)

@@ -49,7 +49,7 @@
"gw_sites = %s," = steps[["subset_sites"]]$target_name,
"start_date = historic_start_date,",
"end_date = historic_end_date,",
"param_cd = gw_param_cd)")
"param_cd = %s)" = param_cd_obj_nm)
}
)
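
Two things are going on in this wrapper. First, `request_limit` caps how many sites go into a single dataRetrieval request. Second, the new `gw_site_nums_obj_nm` / `param_cd_obj_nm` arguments replace the hard-coded `historic_gw_sites_<service>` and `gw_param_cd` names in the templated task commands, so the same task makefile can also drive the FL/KS pull. A small sketch with illustrative values:

# (1) Site chunking: split a site vector into batches of at most request_limit sites
gw_sites <- sprintf("%08d", 1:7)                 # made-up site numbers
request_limit <- 3
split(gw_sites, ceiling(seq_along(gw_sites) / request_limit))
# -> batches of 3, 3, and 1 sites

# (2) Command templating: the object name passed in is spliced into each task's command
gw_site_nums_obj_nm <- "addl_site_nums"          # e.g., the FL/KS site vector
steps_target <- "site_sequence_task_01"          # hypothetical per-task target name
sprintf("%s[%s]", gw_site_nums_obj_nm, steps_target)
# -> "addl_site_nums[site_sequence_task_01]"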

33 changes: 29 additions & 4 deletions 0_historic/src/fetch_nwis_historic.R
@@ -51,11 +51,26 @@ pull_sites_by_service <- function(site_df, service) {
site_df %>% filter(data_type_cd == service) %>% pull(site_no)
}

fetch_gw_site_info <- function(site_df) {
readNWISsite(site_df$site_no) %>%
fetch_gw_site_info <- function(data_fn) {
sites <- read_csv(data_fn, col_types = 'cDn') %>%
pull(site_no) %>%
unique()
readNWISsite(sites) %>%
select(site_no, station_nm, state_cd, dec_lat_va, dec_long_va)
}

fetch_addl_uv_sites <- function(addl_states, param_cd, start_date, end_date) {
site_nums <- c()
for(i in 1:length(addl_states)) {
site_nums_i <- whatNWISdata(
stateCd = addl_states[i], parameterCd = param_cd, service = 'uv',
startDate = start_date, endDate = end_date) %>%
pull(site_no) %>% unique()
site_nums <- c(site_nums, site_nums_i)
}
return(site_nums)
}

fetch_gw_historic_dv <- function(target_name, gw_sites, start_date, end_date, param_cd) {
readNWISdv(
siteNumbers = gw_sites,
@@ -86,6 +101,16 @@ convert_uv_to_dv <- function(target_name, gw_uv_data_fn) {
write_feather(target_name)
}

combine_gw_data <- function(dv_fn, uv_fn) {
bind_rows(read_csv(dv_fn, col_types = 'cDn'), read_csv(uv_fn, col_types = 'cDn'))
combine_gw_fetches <- function(target_name, dv_fn, uv_fn, uv_addl_fn) {
read_csv(dv_fn, col_types = 'cDn') %>%
bind_rows(read_csv(uv_fn, col_types = 'cDn')) %>%
bind_rows(read_csv(uv_addl_fn, col_types = 'cDn')) %>%
write_csv(target_name)
}

combine_gw_sites <- function(gw_site_df, uv_addl_sites, param_cd, addl_param_cd) {
mutate(gw_site_df, param_cd = param_cd) %>%
bind_rows(tibble(site_no = uv_addl_sites,
data_type_cd = "uv",
param_cd = addl_param_cd))
}
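
A hedged usage sketch of the two new helpers above. The dates and the stand-in site table are placeholders; in the pipeline, `historic_start_date`/`historic_end_date` come from config and the real site table is `historic_gw_sites_all`.

library(dataRetrieval)
library(dplyr)
library(tibble)

# Pull the extra FL/KS uv sites that report pcode 62610 (makes a live NWIS query)
addl_sites <- fetch_addl_uv_sites(
  addl_states = c("FL", "KS"),
  param_cd = "62610",
  start_date = "2005-01-01",   # placeholder for historic_start_date
  end_date = "2021-07-01")     # placeholder for historic_end_date

# Stand-in for historic_gw_sites_all, just to show the shape combine_gw_sites() expects
example_site_df <- tibble(site_no = c("05346050", "414240072033201"),
                          data_type_cd = c("dv", "uv"))
combine_gw_sites(example_site_df, addl_sites, param_cd = "72019", addl_param_cd = "62610")
# -> one row per site, tagged with data_type_cd and the param_cd it was pulled with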
18 changes: 12 additions & 6 deletions 0_historic/src/filter_historic_fetched_data.R
@@ -9,12 +9,8 @@ filter_historic_fetched_data <- function(target_name, historic_gw_data_fn, min_y
filter(n_days >= 365*min_years) %>%
pull(site_no)

# 6/4/2021: One of the sites had its leading zero dropped along the way. I
# know that it is a LK site, which we are not keeping, so going to manually
# filter it out for now and return to this the next time we go to rebuild.
site_nums <- unique(historic_gw_data$site_no)
i_bad <- which(site_nums %in% c("5346050")) # It should be "05346050"
sites_meet_site_tp <- readNWISsite(site_nums[-i_bad]) %>%
sites_meet_site_tp <- readNWISsite(site_nums) %>%
select(site_no, site_tp_cd) %>%
filter(site_tp_cd %in% allowed_site_types) %>%
pull(site_no)
@@ -23,5 +19,15 @@
historic_gw_data %>%
filter(site_no %in% sites_meet_min_yrs) %>%
filter(site_no %in% sites_meet_site_tp) %>%
write_csv(target_name)
write_csv(file=target_name)
}

gather_metadata_of_sites_used <- function(target_name, site_data_filtered_fn, site_data_service_to_pull, site_metadata) {

sites_actually_used <- read_csv(site_data_filtered_fn) %>% pull(site_no) %>% unique()

site_data_service_to_pull %>%
left_join(site_metadata, by = "site_no") %>%
filter(site_no %in% sites_actually_used) %>%
saveRDS(target_name)
}
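
The `n_days >= 365*min_years` screen above relies on a per-site day count computed a few lines earlier (collapsed out of this view). A minimal sketch of that kind of screen, assuming the file has `site_no`, `Date`, and `GWL` columns as elsewhere in the pipeline; the repo's actual counting logic may differ:

library(dplyr)
library(readr)

historic_gw_data <- read_csv("0_historic/out/historic_gw_data.csv", col_types = "cDn")

min_years <- 3    # placeholder for historic_min_yrs_per_site_for_quantile_calc
sites_meet_min_yrs <- historic_gw_data %>%
  group_by(site_no) %>%
  summarize(n_days = n_distinct(Date), .groups = "drop") %>%
  filter(n_days >= 365 * min_years) %>%
  pull(site_no)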
66 changes: 54 additions & 12 deletions 1_fetch.yml
@@ -30,26 +30,68 @@ targets:
1_fetch/out/historic_gw_data_filtered.csv:
command: fetch_s3(target_name, historic_filtered_data_s3_fn)

# Only this one is absolutely necessary to run the pipeline in its current
# state (2_process/out/gw_daily_quantiles.csv depends on it). The others
# will not build unless you manually build the targets.
# Only these are absolutely necessary to run the pipeline in its current
# state. The others will not build unless you manually build the targets.
1_fetch/out/historic_gw_site_info_filtered.rds:
command: fetch_s3(target_name, historic_filtered_site_info_s3_fn)
1_fetch/out/historic_gw_quantiles.csv:
command: fetch_s3(target_name, historic_quantiles_s3_fn)

##-- GW sites and data for current viz time period --##

1_fetch/out/gw_sites.rds:
command: fetch_gw_sites(target_name, viz_start_date, viz_end_date, viz_bounding_box, gw_param_cd)
1_fetch/out/gw_site_info.csv:
command: fetch_gw_site_info(target_name, "1_fetch/out/gw_sites.rds")
# Only fetch data for sites that are in the quantiles data
gw_quantile_site_info:
command: readRDS("1_fetch/out/historic_gw_site_info_filtered.rds")

gw_sites:
command: readRDS('1_fetch/out/gw_sites.rds')
1_fetch/out/gw_data.csv:
gw_sites_dv:
command: pull_sites_by_query(gw_quantile_site_info, I("dv"), gw_param_cd)
gw_sites_uv:
command: pull_sites_by_query(gw_quantile_site_info, I("uv"), gw_param_cd)
gw_sites_uv_addl:
command: pull_sites_by_query(gw_quantile_site_info, I("uv"), addl_gw_param_cd)

1_fetch/out/gw_data_dv.csv:
command: do_gw_fetch(
final_target = target_name,
task_makefile = I('1_fetch_tasks.yml'),
gw_site_nums = gw_sites_dv,
gw_site_nums_obj_nm = I('gw_sites_dv'),
param_cd_obj_nm = I('gw_param_cd'),
service_cd = I('dv'),
request_limit = dv_fetch_size_limit,
'1_fetch/src/do_gw_fetch.R',
'1_fetch/src/fetch_nwis.R')
# Task table for `uv` includes a step to average the data to daily values.
1_fetch/out/gw_data_uv.csv:
command: do_gw_fetch(
final_target = target_name,
task_makefile = I('1_fetch_tasks.yml'),
gw_site_nums = gw_sites,
request_limit = 200,
gw_site_nums = gw_sites_uv,
gw_site_nums_obj_nm = I('gw_sites_uv'),
param_cd_obj_nm = I('gw_param_cd'),
service_cd = I('uv'),
request_limit = uv_fetch_size_limit,
'1_fetch/src/do_gw_fetch.R',
'1_fetch/src/fetch_nwis.R')

# Special pull for just KS & just FL using `62610`.
# Read all about why on GitHub: https://github.com/USGS-VIZLAB/gw-conditions/issues/9
1_fetch/out/gw_data_uv_addl.csv:
command: do_gw_fetch(
final_target = target_name,
task_makefile = I('1_fetch_tasks.yml'),
gw_site_nums = gw_sites_uv_addl,
gw_site_nums_obj_nm = I('gw_sites_uv_addl'),
param_cd_obj_nm = I('addl_gw_param_cd'),
service_cd = I('uv'),
request_limit = uv_fetch_size_limit,
'1_fetch/src/do_gw_fetch.R',
'1_fetch/src/fetch_nwis.R')

# Combine all data
1_fetch/out/gw_data.csv:
command: combine_gw_fetches(
target_name,
'1_fetch/out/gw_data_dv.csv',
'1_fetch/out/gw_data_uv.csv',
'1_fetch/out/gw_data_uv_addl.csv')
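
`pull_sites_by_query()` and `do_gw_fetch()` are defined in the 1_fetch sources, which did not render in this view. Because `combine_gw_sites()` tags each site with `data_type_cd` and `param_cd`, a plausible sketch of the site-selection helper is just a filter on those two columns — this is an assumption about the implementation, not the repo's code:

# Hypothetical sketch of pull_sites_by_query(); the real helper in 1_fetch/src may differ.
library(dplyr)

pull_sites_by_query <- function(site_info, service, param_cd_in) {
  site_info %>%
    filter(data_type_cd == service, param_cd == param_cd_in) %>%
    pull(site_no)
}

# e.g., the FL/KS sites pulled with pcode 62610:
# pull_sites_by_query(gw_quantile_site_info, "uv", "62610")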