Skip to content

Commit

Permalink
Feature: add postprocessing steps for masks and geojsons
Browse files Browse the repository at this point in the history
  • Loading branch information
itisacloud committed Dec 14, 2023
1 parent c4cb19b commit 12667e3
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 39 deletions.
17 changes: 12 additions & 5 deletions sketch_map_tool/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sketch_map_tool.oqt_analyses import get_report
from sketch_map_tool.upload_processing import (
clip,
create_marking_array,
georeference,
merge,
polygonize,
Expand Down Expand Up @@ -146,11 +147,17 @@ def process(
r: BytesIO = db_client_celery.select_file(sketch_map_id) # type: ignore
r: NDArray = to_array(r) # type: ignore
r: NDArray = clip(r, map_frames[uuid]) # type: ignore
r: NDArray = detect_markings(r, yolo_model, sam_predictor) # type: ignore
r: BytesIO = georeference(r, bbox, bgr=False) # type: ignore
r: FeatureCollection = polygonize(r, layer_name=name) # type: ignore
r: FeatureCollection = post_process(r, name) # type: ignore
return r
masks , colors = detect_markings(r, yolo_model, sam_predictor)
geojsons: list[FeatureCollection] = []
for mask,color in zip(masks,colors):
r_: NDArray = create_marking_array([mask], [color], r)
r_: BytesIO = georeference(r_, bbox, bgr=False) # type: ignore
r_: FeatureCollection = polygonize(r_, layer_name=name)
r_t = r_ # type: ignore
r_: FeatureCollection = post_process(r_, name)
geojsons.append(r_)
# type: ignore
return merge(geojsons)

return merge(
[
Expand Down
2 changes: 2 additions & 0 deletions sketch_map_tool/upload_processing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .clip import clip
from .detect_markings import detect_markings
from .create_marking_array import create_marking_array
from .georeference import georeference
from .merge import merge
from .polygonize import polygonize
Expand All @@ -8,6 +9,7 @@

__all__ = (
"clip",
"create_marking_array",
"detect_markings",
"georeference",
"merge",
Expand Down
26 changes: 26 additions & 0 deletions sketch_map_tool/upload_processing/create_marking_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import numpy as np
from numpy._typing import NDArray


def create_marking_array(
masks: list[NDArray],
colors: list[int],
image: NDArray,
) -> NDArray:
"""Create a single color marking array based on masks and colors.
Parameters:
- masks: List of masks representing markings.
- colors: List of colors corresponding to each mask.
- image: Original sketch map frame.
Returns:
NDArray: Single color marking array.
"""
single_color_marking = np.zeros(
(image.shape[0], image.shape[1]),
dtype=np.uint8,
)
for color, mask in zip(colors, masks):
single_color_marking[mask] = color
return single_color_marking
80 changes: 50 additions & 30 deletions sketch_map_tool/upload_processing/detect_markings.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-

import cv2
import numpy as np
from numpy.typing import NDArray
from PIL import Image
from segment_anything import SamPredictor
from ultralytics import YOLO

from sketch_map_tool.upload_processing.create_marking_array import create_marking_array


def detect_markings(
image: NDArray,
Expand All @@ -15,17 +17,17 @@ def detect_markings(
# Sam can only deal with RGB and not RGBA etc.
img = Image.fromarray(image[:, :, ::-1]).convert("RGB")
# masks represent markings
masks, colors = apply_ml_pipeline(img, yolo_model, sam_predictor)
masks, bboxes,colors = apply_ml_pipeline(img, yolo_model, sam_predictor)
colors = [int(c) + 1 for c in colors] # +1 because 0 is background
masks_processed = post_process(masks)
return create_marking_array(masks_processed, colors, image)
masks_processed = post_process(masks,bboxes)
return masks_processed, colors


def apply_ml_pipeline(
image: Image.Image,
yolo_model: YOLO,
sam_predictor: SamPredictor,
) -> tuple[list, list]:
) -> tuple[list, list, list]:
"""Apply the entire machine learning pipeline on an image.
Steps:
Expand All @@ -40,7 +42,7 @@ def apply_ml_pipeline(
"""
bounding_boxes, class_labels = apply_yolo(image, yolo_model)
masks, _ = apply_sam(image, bounding_boxes, sam_predictor)
return masks, class_labels
return masks, bounding_boxes, class_labels


def apply_yolo(
Expand Down Expand Up @@ -82,7 +84,7 @@ def apply_sam(
return masks, scores


def mask_from_bbox(bbox, sam_predictor: SamPredictor) -> tuple:
def mask_from_bbox(bbox:list, sam_predictor: SamPredictor) -> tuple:
"""Generate a mask using SAM (Segment Anything) predictor for a given bounding box.
Returns:
Expand All @@ -92,32 +94,50 @@ def mask_from_bbox(bbox, sam_predictor: SamPredictor) -> tuple:
return masks[0], scores[0]


def post_process(masks: list[NDArray]) -> list[NDArray]:
processed = []
for mask in masks:
processed.append(mask)
return processed
def post_process(masks: list[NDArray], bboxes: list[list[int]]) -> list[NDArray]:
"""
Post-processes masks and bounding boxes to clean up and fill contours.

def create_marking_array(
masks: list[NDArray],
colors: list[int],
image: NDArray,
) -> NDArray:
"""Create a single color marking array based on masks and colors.
This function takes a list of masks and their corresponding bounding boxes, applies
morphological operations to clean the masks, and then fills the contours. The result is
a list of tuples, each containing a cleaned mask and its contours.
Parameters:
- masks: List of masks representing markings.
- colors: List of colors corresponding to each mask.
- image: Original sketch map frame.
- masks (List[NDArray]): A list of masks, where each mask is a NumPy array.
- bboxes (List[List[int]]): A list of bounding boxes, where each bbox is defined as [x1, y1, x2, y2].
Returns:
NDArray: Single color marking array.
- List[Tuple[NDArray, List]]: A list of tuples, where each tuple contains a cleaned mask
and its contours.
"""
single_color_marking = np.zeros(
(image.shape[0], image.shape[1]),
dtype=np.uint8,
)
for color, mask in zip(colors, masks):
single_color_marking[mask] = color
return single_color_marking

# Convert and preprocess masks
preprocessed_masks = np.array([np.vstack(mask) for mask in masks], dtype=np.float32)
preprocessed_masks[preprocessed_masks == 0] = np.nan

# Calculate height and width for each bounding box
bbox_sizes = [np.array([bbox[2] - bbox[0], bbox[3] - bbox[1]]) for bbox in bboxes]

# Initialize the list for cleaned masks
cleaned_masks = []

for i, mask in enumerate(preprocessed_masks):
# Calculate kernel size as 5% of the bounding box dimensions
kernel_size = tuple((bbox_sizes[i] * 0.05).astype(int))
kernel = np.ones(kernel_size, np.uint8)

# Apply morphological closing operation
closed_mask = cv2.morphologyEx(mask.astype('uint8'), cv2.MORPH_CLOSE, kernel)

# Find contours
contours, _ = cv2.findContours(closed_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# Create a blank canvas for filled contours
filled_contours = np.zeros_like(closed_mask, dtype=np.uint8)
cv2.drawContours(filled_contours, contours, -1, 1, thickness=cv2.FILLED)
cleaned_masks.append(filled_contours.astype(bool))

return cleaned_masks



118 changes: 118 additions & 0 deletions sketch_map_tool/upload_processing/post_process.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import geojson
from geojson import FeatureCollection
from shapely.geometry import shape
from shapely import Polygon, MultiPolygon
from shapely.geometry import mapping
from shapely.ops import cascaded_union
from shapelysmooth import chaikin_smooth

from sketch_map_tool.definitions import COLORS

from typing import Union

def post_process(fc: FeatureCollection, name: str) -> FeatureCollection:
fc = clean(fc)
fc = enrich(fc, properties={"name": name})
fc = simplify(fc)
fc = smooth(fc)
return fc


Expand Down Expand Up @@ -35,3 +43,113 @@ def enrich(fc: FeatureCollection, properties):
if "color" in feature.properties.keys():
feature.properties["color"] = COLORS[feature.properties["color"]]
return fc


def simplify(fc: FeatureCollection) -> FeatureCollection:
"""
Simplifies the geometries in a GeoJSON FeatureCollection.
This function simplifies the geometries in a given FeatureCollection. It applies a buffer operation
to each geometry based on a percentage of the maximum width, dissolves the geometries based on a
'color' attribute, removes inner rings, and then re-applies a negative buffer to restore the original
size. The function assumes that the 'color' field exists in the properties of the features.
Parameters:
- fc (geojson.FeatureCollection): The FeatureCollection to be simplified.
Returns:
- geojson.FeatureCollection: The simplified FeatureCollection.
"""
features = fc["features"]
properties = features[0]["properties"]

geometries = [shape(feature["geometry"]) for feature in features]

# Buffer operation
buffer_distance_percentage = 0.1
max_width = max(
((geometry.bounds[2] - geometry.bounds[0]) ** 2 + (geometry.bounds[3] - geometry.bounds[1]) ** 2) ** 0.5 for
geometry in geometries) # check for webmercator
buffer_distance = buffer_distance_percentage * max_width
buffered_geometries = [geometry.buffer(buffer_distance) for geometry in geometries]

# Dissolve by color field (assuming there's a "color" field)
try:
dissolved_geometrie = [remove_inner_rings(geometry) for geometry in cascaded_union(buffered_geometries)]
except:
dissolved_geometrie = [remove_inner_rings(geometry) for geometry in [cascaded_union(buffered_geometries)]]

simplified_geometries = [geometry.buffer(-buffer_distance) for geometry in dissolved_geometrie]

# Create a single GeoJSON feature
features = [geojson.Feature(
geometry=mapping(geometry),
properties=properties
) for geometry in simplified_geometries]

# Create a GeoJSON feature collection with the single feature
fc = geojson.FeatureCollection(features)
return fc


def remove_inner_rings(geometry: Polygon | MultiPolygon) -> Polygon | MultiPolygon:
"""
Removes inner rings (holes) from a given Shapely geometry object.
This function checks the type of the geometry object. If it is a Polygon, it creates
a new Polygon with just the exterior ring. If it is a MultiPolygon, it does the same
for each Polygon in it. Other geometry types are not supported and will raise an error.
Parameters:
- geometry (Polygon, MultiPolygon): A Shapely geometry object, either a Polygon or MultiPolygon.
Returns:
- Polygon, MultiPolygon: A Shapely geometry object with inner rings removed.
"""
if geometry.is_empty:
return geometry
elif geometry.type == 'Polygon':
return Polygon(geometry.exterior)
elif geometry.type == 'MultiPolygon':
return MultiPolygon([Polygon(poly.exterior) for poly in geometry.geoms])
else:
raise ValueError("Unsupported geometry type")


def smooth(fc: FeatureCollection) -> FeatureCollection:
"""
Smoothens the polygon geometries in a GeoJSON FeatureCollection.
This function applies a Chaikin smoothing algorithm to each polygon geometry in the given
FeatureCollection. Non-polygon geometries are skipped. The function updates the geometries
while retaining the properties of each feature.
Parameters:
- fc (geojson.FeatureCollection): The FeatureCollection to be smoothed.
Returns:
- geojson.FeatureCollection: The FeatureCollection with smoothed polygon geometries.
"""
features = fc["features"]
updated_features = []

for feature in features:
geometry = feature["geometry"]
properties = feature["properties"]

if geometry["type"] == "Polygon":
geometry = Polygon(geometry["coordinates"][0])
else:
continue # Skip non-polygon geometries

corrected_geometry = chaikin_smooth(geometry)

updated_features.append(geojson.Feature(
geometry=mapping(corrected_geometry),
properties=properties
))

# Create a GeoJSON feature collection with the updated features
fc = geojson.FeatureCollection(updated_features)
return fc

8 changes: 4 additions & 4 deletions tests/integration/upload_processing/test_detect_markings.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def test_detect_markings(sam_predictor, yolo_model, map_frame_marked):


def test_apply_ml_pipeline(sam_predictor, yolo_model, map_frame_marked):
masks, colors = apply_ml_pipeline(map_frame_marked, yolo_model, sam_predictor)
masks, bboxes ,colors = apply_ml_pipeline(map_frame_marked, yolo_model, sam_predictor)
# TODO: Should the len not be 2? Only two markings are on the input image.
assert len(masks) == len(colors) == 6
assert len(masks) == len(colors)


@pytest.mark.skip("For manuel testing")
Expand All @@ -50,11 +50,11 @@ def test_apply_ml_pipeline_show_masks(
yolo_model,
map_frame_marked,
):
masks, _ = apply_ml_pipeline(map_frame_marked, yolo_model, sam_predictor)
masks, _ ,_ = apply_ml_pipeline(map_frame_marked, yolo_model, sam_predictor)
for mask in masks:
plt.imshow(mask, cmap="viridis", alpha=0.7)
plt.show()


def test_post_process():
assert post_process([]) is not None
assert post_process([],[] ) is not None

0 comments on commit 12667e3

Please sign in to comment.