Skip to content

Commit

Permalink
pipeline stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
kyle-woodward committed May 20, 2024
1 parent 15b4786 commit af6db19
Show file tree
Hide file tree
Showing 4 changed files with 948 additions and 0 deletions.
8 changes: 8 additions & 0 deletions fao_models/beam_pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sys
import os

# Get the parent directory path
parent_dir = os.path.dirname(os.path.abspath(__file__))

# Add the parent directory to the Python module search path
sys.path.append(parent_dir)
43 changes: 43 additions & 0 deletions fao_models/beam_pipelines/test_initialPCollection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#%%
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import unittest
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
import logging
import geopandas
logger = logging.getLogger(__name__)

# load gdf and compute centroid from geometry
gdf = geopandas.read_file('C:\\Users\\kyle\\Downloads\\ALL_centroids_completed_v1_\\ALL_centroids_completed_v1_.shp')
gdf.loc[:,'centroid'] = gdf.geometry.centroid
print(gdf.head())
#%%
# convert centroid (a GeoSeries geometry), to a native python list of lat,lon)
gdf.loc[:,'latlon'] = gdf.centroid.apply(lambda x: [x.y, x.x])
print(gdf.dtypes)
print(gdf.head())
#%%
# construct list of global_id, latlon tuples for the pipeline
features = gdf[['global_id', 'latlon']].values.tolist()
print(features[:5])


#%%
# https://beam.apache.org/documentation/pipelines/test-your-pipeline/#testing-transforms
expected_output = features[:5]
def test_pipe(argv=None, save_main_session=True):
"""Main entry point;"""
# read in a gdf and construct begnning PCollection from gdf in-memory

# do we need to convert each record in gdf to a list or dict?
with TestPipeline(runner=beam.runners.DirectRunner()) as p:

pipe_features = p | beam.Create(features[:5]) # if you change this to features[:6] the test will raise AssertionError
assert_that(pipe_features,equal_to(expected_output), label='check features')


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
test_pipe()
7 changes: 7 additions & 0 deletions fao_models/beam_pipelines/test_pyshp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# works but i don't think we'll be able ot make centroid lat lon with this package easily
import shapefile

input_file = 'C:\\Users\\kyle\\Downloads\\ALL_centroids_completed_v1_\\ALL_centroids_completed_v1_.shp'
sf = shapefile.Reader(input_file)
print(sf.fields)
print(sf.records()[0:10])
Loading

0 comments on commit af6db19

Please sign in to comment.