Skip to content

Commit

Permalink
Merge pull request #1316 from cal-itp/dask-from-map-switch
Browse files Browse the repository at this point in the history
Add `dask.from_map` in `dask_utils/time_series_utils`
  • Loading branch information
tiffanychu90 authored Dec 3, 2024
2 parents 63d793b + f632501 commit dc5f6d7
Show file tree
Hide file tree
Showing 6 changed files with 551 additions and 125 deletions.
51 changes: 51 additions & 0 deletions _shared_utils/shared_utils/dask_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from calitp_data_analysis import utils
from dask import compute, delayed
from dask.delayed import Delayed # type hint
from shared_utils import time_helpers

fs = gcsfs.GCSFileSystem()

Expand Down Expand Up @@ -106,3 +107,53 @@ def concatenate_list_of_files(
full_df = pd.concat(results, axis=0).reset_index(drop=True)

return full_df


def import_df_func(
    path: str,
    one_date: str,
    data_type: Literal["df", "gdf"] = "df",
    add_date: bool = False,
    **kwargs,
):
    """
    Read a single date's parquet file as a pandas or geopandas df.

    Lightly adapted from the dask docs so it can be mapped over
    (path, date) pairs with dd.from_map.
    https://docs.dask.org/en/latest/generated/dask.dataframe.from_map.html
    https://blog.dask.org/2023/04/12/from-map
    """
    # Pick the reader once; the two branches are otherwise identical.
    read_parquet = gpd.read_parquet if data_type == "gdf" else pd.read_parquet

    df = read_parquet(
        f"{path}_{one_date}.parquet",
        **kwargs,
    ).drop_duplicates()

    # Optionally stamp each row with the date it was read for.
    if add_date:
        df = time_helpers.add_service_date(df, one_date)

    return df


def get_ddf(paths, date_list, data_type, get_pandas: bool = False, **kwargs):
    """
    Build a dask dataframe by mapping import_df_func over paths and dates.

    Lightly adapted from the dask docs; the modification is reading in
    a list of dates.
    https://docs.dask.org/en/latest/generated/dask.dataframe.from_map.html
    https://blog.dask.org/2023/04/12/from-map
    """
    result = dd.from_map(
        import_df_func,
        paths,
        date_list,
        data_type=data_type,
        **kwargs,
    ).drop_duplicates()

    # Optionally materialize the lazy dask df into an in-memory pandas df.
    return result.compute() if get_pandas else result
4 changes: 3 additions & 1 deletion _shared_utils/shared_utils/gtfs_analytics_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ stop_segments:
shape_stop_cols: ["shape_array_key", "shape_id", "stop_sequence"]
stop_pair_cols: ["stop_pair", "stop_pair_name"]
route_dir_cols: ["route_id", "direction_id"]
segment_cols: ["schedule_gtfs_dataset_key", "route_id", "direction_id", "stop_pair", "geometry"]
segment_cols: ["route_id", "direction_id", "stop_pair", "geometry"]
shape_stop_single_segment: "rollup_singleday/speeds_shape_stop_segments" #-- stop after Oct 2024
route_dir_single_segment: "rollup_singleday/speeds_route_dir_segments"
route_dir_single_segment_detail: "rollup_singleday/speeds_route_dir_segments_detail" # interim for speedmaps
route_dir_multi_segment: "rollup_multiday/speeds_route_dir_segments"
segments_file: "segment_options/shape_stop_segments"
max_speed: ${speed_vars.max_speed}
route_dir_quarter_segment: "rollup_multiday/quarter_speeds_route_dir_segments"
route_dir_year_segment: "rollup_multiday/year_speeds_route_dir_segments"

rt_stop_times:
dir: ${gcs_paths.SEGMENT_GCS}
Expand Down
24 changes: 24 additions & 0 deletions _shared_utils/shared_utils/time_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,27 @@ def add_time_span_columns(df: pd.DataFrame, time_span_num: str) -> pd.DataFrame:
)

return df


def add_service_date(df: pd.DataFrame, date: str) -> pd.DataFrame:
    """
    Return a copy of df with a `service_date` datetime column for GTFS data.
    Pipe this function when we want to use dask_utils.
    """
    # Work on a copy so the caller's frame is left untouched,
    # matching df.assign semantics.
    out = df.copy()
    out["service_date"] = pd.to_datetime(date)
    return out


def add_quarter(df: pd.DataFrame, date_col: str = "service_date") -> pd.DataFrame:
    """
    Derive year, quarter, and year_quarter columns from a datetime column.
    Pipe this function when we want to use dask_utils.
    """
    dt_accessor = df[date_col].dt

    # assign returns a new frame, so the caller's df is not mutated.
    out = df.assign(
        year=dt_accessor.year,
        quarter=dt_accessor.quarter,
    )
    out["year_quarter"] = (
        out["year"].astype(str) + "_Q" + out["quarter"].astype(str)
    )

    return out
Loading

0 comments on commit dc5f6d7

Please sign in to comment.