diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 62e57df18c..3e9387d88c 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -193,34 +193,30 @@ def execute(self, **kwargs): df = pd.json_normalize(api_content) if self.product == "state_highway_network": - # Select and rename columns - columns = { - "properties.Route": "Route", - "properties.County": "County", - "properties.District": "District", - "properties.RouteType": "RouteType", - "properties.Direction": "Direction", - "geometry.type": "type", - "geometry.coordinates": "coordinates", - } - df = df[list(columns.keys())].rename(columns=columns) + # Select columns to keep + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) # Create new column with WKT format df["wkt_coordinates"] = df.apply( lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 ) - # Select final columns for output - final_columns = [ - "Route", - "County", - "District", - "RouteType", - "Direction", - "wkt_coordinates", - ] - df = df[final_columns] - # Compress the DataFrame content and save it self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode()