Update calitp data analysis - add gtfs-segments, pyairtable (#3267)

* (calitp_data_analysis): update geography_utils and utils

* add poetry files
tiffanychu90 authored Feb 7, 2024
1 parent 7794933 commit 472d9ea
Showing 4 changed files with 941 additions and 59 deletions.
@@ -98,6 +98,19 @@ def create_segments(
    Input a geometry column, such as gdf.geometry.
    Double check: segment_distance must be given in the same units as the CRS!
+
+    Use case:
+    gdf['segment_geometry'] = gdf.apply(
+        lambda x:
+        create_segments(x.geometry, int(segment_length)),
+        axis=1,
+    )
+
+    gdf2 = explode_segments(
+        gdf,
+        group_cols = ['route_key'],
+        segment_col = 'segment_geometry'
+    )
    """
    lines = []

@@ -113,64 +126,54 @@ def create_segments(
    return lines


-def cut_segments(
-    gdf: gpd.GeoDataFrame,
-    group_cols: list = ["calitp_itp_id", "calitp_url_number", "route_id"],
-    segment_distance: int = 1_000,
-) -> gpd.GeoDataFrame:
-    """
-    Cut segments from linestrings at defined segment lengths.
-    Make sure segment distance is defined in the same CRS as the gdf.
-
-    group_cols: list of columns.
-        The set of columns that represents how segments should be cut.
-        Ex: for transit route, it's calitp_itp_id-calitp_url_number-route_id
-        Ex: for highways, it's Route-RouteType-County-District.
-
-    Returns a gpd.GeoDataFrame where each linestring row is now multiple
-    rows (each at the pre-defined segment_distance). A new column called
-    `segment_sequence` is also created, which differentiates each
-    new row created (since they share the same group_cols).
-    """
-    EPSG_CODE = gdf.crs.to_epsg()
-
-    segmented = gpd.GeoDataFrame()
-
-    gdf = gdf[group_cols + ["geometry"]].drop_duplicates().reset_index(drop=True)
-
-    for row in gdf.itertuples():
-        row_geom = getattr(row, "geometry")
-        segment = create_segments(row_geom, int(segment_distance))
-
-        to_append = pd.DataFrame()
-        to_append["geometry"] = segment
-        for c in group_cols:
-            to_append[c] = getattr(row, c)
-
-        segmented = pd.concat([segmented, to_append], axis=0, ignore_index=True)
-
-    segmented = segmented.assign(
-        temp_index=segmented.sort_values(group_cols).reset_index(drop=True).index
-    )
-
-    # Why would there be NaNs?
-    # could this be coming from group_cols...one of the cols has a NaN in some rows?
-    segmented = segmented[segmented.temp_index.notna()]
-
-    segmented = (
-        segmented.assign(
-            segment_sequence=(
-                segmented.groupby(group_cols)["temp_index"].transform("rank") - 1
-            ).astype("int16")
-        )
-        .sort_values(group_cols)
-        .reset_index(drop=True)
-        .drop(columns="temp_index")
-    )
-
-    segmented2 = gpd.GeoDataFrame(segmented, crs=f"EPSG:{EPSG_CODE}")  # noqa: E231
-
-    return segmented2
+def explode_segments(
+    gdf: gpd.GeoDataFrame, group_cols: list, segment_col: str = "segment_geometry"
+) -> gpd.GeoDataFrame:
+    """
+    Explode the column that is used to store segments, which is a list.
+    Take the list and create a row for each element in the list.
+    We'll do a rough rank so we can order the segments.
+
+    Use case:
+    gdf['segment_geometry'] = gdf.apply(
+        lambda x:
+        create_segments(x.geometry, int(segment_length)),
+        axis=1,
+    )
+
+    gdf2 = explode_segments(
+        gdf,
+        group_cols = ['route_key'],
+        segment_col = 'segment_geometry'
+    )
+    """
+    gdf_exploded = gdf.explode(segment_col).reset_index(drop=True)
+
+    gdf_exploded["temp_index"] = gdf_exploded.index
+
+    gdf_exploded = gdf_exploded.assign(
+        segment_sequence=(
+            gdf_exploded.groupby(
+                group_cols, observed=True, group_keys=False
+            ).temp_index.transform("rank")
+            - 1
+            # there are NaNs, but since they're a single segment, just use 0
+        )
+        .fillna(0)
+        .astype("int16")
+    )
+
+    # Drop the original line geometry, use the segment geometry only
+    gdf_exploded2 = (
+        gdf_exploded.drop(columns=["geometry", "temp_index"])
+        .rename(columns={segment_col: "geometry"})
+        .set_geometry("geometry")
+        .set_crs(gdf_exploded.crs)
+        .sort_values(group_cols + ["segment_sequence"])
+        .reset_index(drop=True)
+    )
+
+    return gdf_exploded2


return_options = Literal[
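For context, here is a minimal sketch of how the new pair of functions is meant to be chained, following the use case in the docstring above. It is not part of the commit: the toy GeoDataFrame, the route_key value, and the EPSG:3310 CRS are made up for illustration, and the import path is assumed from the commit message ("update geography_utils and utils").

# Illustrative only: toy data, CRS, and import path are assumptions, not from the commit.
import geopandas as gpd
from shapely.geometry import LineString

# Assumed module path, based on the commit message
from calitp_data_analysis.geography_utils import create_segments, explode_segments

# Hypothetical single route in a meters-based CRS, so segment_length is in meters
gdf = gpd.GeoDataFrame(
    {"route_key": ["route_a"]},
    geometry=[LineString([(0, 0), (2_500, 0)])],
    crs="EPSG:3310",
)

segment_length = 1_000

# Each row gets a list of shapely LineStrings in `segment_geometry`
gdf["segment_geometry"] = gdf.apply(
    lambda x: create_segments(x.geometry, int(segment_length)),
    axis=1,
)

# One row per segment; `segment_sequence` orders the pieces within each route_key
segments = explode_segments(
    gdf,
    group_cols=["route_key"],
    segment_col="segment_geometry",
)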
packages/calitp-data-analysis/calitp_data_analysis/utils.py (26 additions, 8 deletions)
@@ -23,6 +23,19 @@ def sanitize_file_path(file_name: str) -> str:
    return str(Path(file_name).stem)


+def parse_file_directory(file_name: str) -> str:
+    """
+    Grab the directory of the filename.
+    For GCS bucket, we do not want '.' as the parent
+    directory, we want to parse and put together the
+    GCS filepath correctly.
+    """
+    if str(Path(file_name).parent) != ".":
+        return str(Path(file_name).parent)
+    else:
+        return ""


def geoparquet_gcs_export(
    gdf: Union[gpd.GeoDataFrame, dg.GeoDataFrame],
    gcs_file_path: str,
@@ -39,20 +52,25 @@ def geoparquet_gcs_export(
    file_name: str
        Filename, with or without .parquet.
    """
-    file_name_sanitized = sanitize_file_path(file_name)
+    # Parse out file_name into stem (file_name_sanitized)
+    # and parent (file_directory_sanitized)
+    file_name_sanitized = Path(sanitize_file_path(file_name))
+    file_directory_sanitized = parse_file_directory(file_name)
+
+    # Make sure GCS path includes the directory we want the file to go to
+    expanded_gcs = f"{Path(gcs_file_path).joinpath(file_directory_sanitized)}/"
+    expanded_gcs = str(expanded_gcs).replace("gs:/", "gs://")

    if isinstance(gdf, dg.GeoDataFrame):
-        gdf.to_parquet(
-            f"{gcs_file_path}{file_name_sanitized}", overwrite=True, **kwargs
-        )
+        gdf.to_parquet(f"{expanded_gcs}{file_name_sanitized}", overwrite=True, **kwargs)

    else:
-        gdf.to_parquet(f"./{file_name_sanitized}.parquet", **kwargs)
+        gdf.to_parquet(f"{file_name_sanitized}.parquet", **kwargs)
        fs.put(
-            f"./{file_name_sanitized}.parquet",
-            f"{gcs_file_path}{file_name_sanitized}.parquet",
+            f"{file_name_sanitized}.parquet",
+            f"{str(expanded_gcs)}{file_name_sanitized}.parquet",
        )
-        os.remove(f"./{file_name_sanitized}.parquet", **kwargs)
+        os.remove(f"{file_name_sanitized}.parquet", **kwargs)


def geojson_gcs_export(
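To make the new path handling concrete, here is a short sketch (not part of the commit) of what parse_file_directory returns and how geoparquet_gcs_export now assembles the GCS destination from it. The bucket and file names are hypothetical, and the helper is restated inline so the snippet runs on its own.

# Illustrative only: bucket and file names are made up.
from pathlib import Path


def parse_file_directory(file_name: str) -> str:
    # Same logic as the helper added above
    if str(Path(file_name).parent) != ".":
        return str(Path(file_name).parent)
    else:
        return ""


gcs_file_path = "gs://example-bucket/traffic_ops/"  # hypothetical bucket

# A bare filename has no parent directory to carry over
print(parse_file_directory("stops.parquet"))              # ""
# A filename with subfolders keeps them so they can be re-attached to the GCS path
print(parse_file_directory("export/2024/stops.parquet"))  # "export/2024"

# Mirrors the new lines in geoparquet_gcs_export
file_name = "export/2024/stops.parquet"
file_name_sanitized = Path(file_name).stem  # "stops", what sanitize_file_path yields
file_directory_sanitized = parse_file_directory(file_name)

expanded_gcs = f"{Path(gcs_file_path).joinpath(file_directory_sanitized)}/"
expanded_gcs = str(expanded_gcs).replace("gs:/", "gs://")
print(expanded_gcs)  # gs://example-bucket/traffic_ops/export/2024/
print(f"{expanded_gcs}{file_name_sanitized}.parquet")
# gs://example-bucket/traffic_ops/export/2024/stops.parquet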