Skip to content

Commit

Permalink
Merge branch 'devel' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
cdbethune committed Aug 16, 2024
2 parents 982a822 + b975870 commit 4c21745
Show file tree
Hide file tree
Showing 68 changed files with 3,565 additions and 1,137 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ build_cdr:

# Target: build
# Description: Builds all components.
build: build_segmentation build_metadata build_points build_georef build_cdr
build: build_segmentation build_metadata build_points build_georef build_text build_cdr

# Target: tag_dev
# Description: Tags images with the dev tag.
Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

![example workflow](https://github.com/uncharted-lara/lara-models/actions/workflows/build_test.yml/badge.svg)

## LARA - Layered Atlas Reconstruction Analytics
This repository contains Uncharted's TA1 contributions for DARPA's CriticalMAAS program. The main goals are automated feature extraction and georeferencing of geologic maps.

This repository contains five pipelines:

* [Map Segmentation](pipelines/segmentation/README.md) - detects and extracts the main map area, polygon legend, point/line legend and geologic cross section from maps
* [Map Segmentation](pipelines/segmentation/README.md) - detects and extracts the main map area, polygon legend, point/line legend and geologic cross sections from maps
* [Metadata Extraction](pipelines/metadata_extraction/README.md) - extracts metadata values such as title, author, year and scale from an input map image
* [Point Extraction](pipelines/point_extraction/README.md) - detects and extracts geologic point symbols from an input map image
* [Point Extraction](pipelines/point_extraction/README.md) - detects and extracts the location and orientation of geologic point symbols from an input map image
* [Georeferencing](pipelines/geo_referencing/README.md) - computes an image space to geo space transform given an input map image
* [Text Extraction](pipelines/text_extraction/README.md) - extracts text as individual words, lines or paragraphs/blocks from an input image

The `tasks` directory contains the `pip` installable library of tasks and supporting utilities, with each pipeline found in the `pipelines` directory being composed of these tasks. Each pipeline is itself `pip` installable, and is accompanied by a wrapper to support command line execution (`run_pipeline.py`), and a server wrapper to support execution as a REST service (`run_sever.py`). Scripts to build the server wrapper into a Docker container are also included.


The `tasks` directory contains the `pip` installable library of tasks and supporting utilities, with each pipeline found in the `pipelines` directory being composed of these tasks. Each pipeline is itself `pip` installable, and is accompanied by a wrapper to support command line execution (`run_pipeline.py`), and a server wrapper to support execution as a REST service (`run_server.py`). Scripts to build the server wrapper into a Docker container are also included.

A [Makefile](./Makefile) is also available to handle building and deploying Docker containers for the various LARA pipelines.
Empty file added __init__.py
Empty file.
2 changes: 1 addition & 1 deletion cdr/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "lara-cdr"
version = "0.1.0"
description = "LARA CDR integration supporting both one-off processing and webhook event-driven processing"
readme = "README.md"
dependencies = ["jsons", "flask", "lara-tasks", "mypy-boto3-s3", "rasterio", "ngrok", "pyproj", "coloredlogs"]
dependencies = ["jsons", "flask", "lara-tasks", "mypy-boto3-s3", "ngrok", "pyproj"]

[project.optional-dependencies]
development = [
Expand Down
89 changes: 67 additions & 22 deletions cdr/result_subscriber.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import pprint
import threading
from time import sleep
from typing import List, Optional
Expand All @@ -16,6 +17,7 @@
from pika.exceptions import AMQPChannelError, AMQPConnectionError
import pika.spec as spec
from pydantic import BaseModel
from regex import P
from cdr.json_log import JSONLog
from cdr.request_publisher import LaraRequestPublisher
from schema.cdr_schemas.feature_results import FeatureResults
Expand All @@ -31,10 +33,10 @@
Request,
RequestResult,
)
from schema.mappers.cdr import get_mapper
from schema.mappers.cdr import GeoreferenceMapper, get_mapper
from tasks.geo_referencing.entities import GeoreferenceResult as LARAGeoreferenceResult
from tasks.metadata_extraction.entities import MetadataExtraction as LARAMetadata
from tasks.point_extraction.entities import MapImage as LARAPoints
from tasks.point_extraction.entities import PointLabels as LARAPoints
from tasks.segmentation.entities import MapSegmentation as LARASegmentation
import datetime

Expand Down Expand Up @@ -79,6 +81,22 @@ class LaraResultSubscriber:
GEOREFERENCE_PIPELINE,
]

# map of pipeline name to system name
PIPELINE_SYSTEM_NAMES = {
SEGMENTATION_PIPELINE: "uncharted-area",
METADATA_PIPELINE: "uncharted-metadata",
POINTS_PIPELINE: "uncharted-points",
GEOREFERENCE_PIPELINE: "uncharted-georeference",
}

# map of pipeline name to system version
PIPELINE_SYSTEM_VERSIONS = {
SEGMENTATION_PIPELINE: "0.0.4",
METADATA_PIPELINE: "0.0.4",
POINTS_PIPELINE: "0.0.4",
GEOREFERENCE_PIPELINE: "0.0.5",
}

def __init__(
self,
request_publisher: Optional[LaraRequestPublisher],
Expand All @@ -87,8 +105,6 @@ def __init__(
cdr_token: str,
output: str,
workdir: str,
system_name: str,
system_version: str,
json_log: JSONLog,
host="localhost",
pipeline_sequence: List[str] = DEFAULT_PIPELINE_SEQUENCE,
Expand All @@ -101,8 +117,6 @@ def __init__(
self._cdr_token = cdr_token
self._workdir = workdir
self._output = output
self._system_name = system_name
self._system_version = system_version
self._json_log = json_log
self._host = host
self._pipeline_sequence = (
Expand Down Expand Up @@ -329,7 +343,11 @@ def _push_georeferencing(self, result: RequestResult):
files_ = []
try:
lara_result = LARAGeoreferenceResult.model_validate(georef_result_raw)
mapper = get_mapper(lara_result, self._system_name, self._system_version)
mapper = get_mapper(
lara_result,
self.PIPELINE_SYSTEM_NAMES[self.GEOREFERENCE_PIPELINE],
self.PIPELINE_SYSTEM_VERSIONS[self.GEOREFERENCE_PIPELINE],
)
cdr_result = mapper.map_to_cdr(lara_result) # type: ignore
assert cdr_result is not None
assert cdr_result.georeference_results is not None
Expand All @@ -342,27 +360,33 @@ def _push_georeferencing(self, result: RequestResult):
assert gcps is not None

logger.info(
f"projecting image {result.image_path} to {output_file_name_full} using crs {projection.crs}"
f"projecting image {result.image_path} to {output_file_name_full} using crs {GeoreferenceMapper.DEFAULT_OUTPUT_CRS}"
)
self._project_georeference(
result.image_path, output_file_name_full, projection.crs, gcps
result.image_path,
output_file_name_full,
GeoreferenceMapper.DEFAULT_OUTPUT_CRS,
gcps,
)

files_.append(
("files", (output_file_name, open(output_file_name_full, "rb")))
)
except:
logger.error(
"bad georeferencing result received so creating an empty result to send to cdr"
except Exception as e:
logger.exception(
"bad georeferencing result received so creating an empty result to send to cdr",
e,
)

# create an empty result to send to cdr
cdr_result = GeoreferenceResults(
cog_id=result.request.image_id,
georeference_results=[],
gcps=[],
system=self._system_name,
system_version=self._system_version,
system=self.PIPELINE_SYSTEM_NAMES[self.GEOREFERENCE_PIPELINE],
system_version=self.PIPELINE_SYSTEM_VERSIONS[
self.GEOREFERENCE_PIPELINE
],
)

assert cdr_result is not None
Expand Down Expand Up @@ -425,7 +449,11 @@ def _push_segmentation(self, result: RequestResult):
cdr_result: Optional[FeatureResults] = None
try:
lara_result = LARASegmentation.model_validate(segmentation_raw_result)
mapper = get_mapper(lara_result, self._system_name, self._system_version)
mapper = get_mapper(
lara_result,
self.PIPELINE_SYSTEM_NAMES[self.SEGMENTATION_PIPELINE],
self.PIPELINE_SYSTEM_VERSIONS[self.SEGMENTATION_PIPELINE],
)
cdr_result = mapper.map_to_cdr(lara_result) # type: ignore
except:
logger.error(
Expand All @@ -443,7 +471,11 @@ def _push_points(self, result: RequestResult):
cdr_result: Optional[FeatureResults] = None
try:
lara_result = LARAPoints.model_validate(points_raw_result)
mapper = get_mapper(lara_result, self._system_name, self._system_version)
mapper = get_mapper(
lara_result,
self.PIPELINE_SYSTEM_NAMES[self.POINTS_PIPELINE],
self.PIPELINE_SYSTEM_VERSIONS[self.POINTS_PIPELINE],
)
cdr_result = mapper.map_to_cdr(lara_result) # type: ignore
except:
logger.error("bad points result received so unable to send results to cdr")
Expand All @@ -462,7 +494,11 @@ def _push_metadata(self, result: RequestResult):
cdr_result: Optional[CogMetaData] = None
try:
lara_result = LARAMetadata.model_validate(metadata_result_raw)
mapper = get_mapper(lara_result, self._system_name, self._system_version)
mapper = get_mapper(
lara_result,
self.PIPELINE_SYSTEM_NAMES[self.METADATA_PIPELINE],
self.PIPELINE_SYSTEM_VERSIONS[self.METADATA_PIPELINE],
)
cdr_result = mapper.map_to_cdr(lara_result) # type: ignore
except Exception as e:
logger.exception(
Expand Down Expand Up @@ -553,10 +589,19 @@ def _cps_to_transform(
]
cps_p = []
for cp in cps:
proj = Transformer.from_crs(cp["crs"], to_crs, always_xy=True)
x_p, y_p = proj.transform(xx=cp["x"], yy=cp["y"])
cps_p.append(
riot.GroundControlPoint(row=cp["row"], col=cp["col"], x=x_p, y=y_p)
)
if cp["crs"] != to_crs:
proj = Transformer.from_crs(cp["crs"], to_crs, always_xy=True)
x_p, y_p = proj.transform(xx=cp["x"], yy=cp["y"])
cps_p.append(
riot.GroundControlPoint(row=cp["row"], col=cp["col"], x=x_p, y=y_p)
)
else:
cps_p.append(
riot.GroundControlPoint(
row=cp["row"], col=cp["col"], x=cp["x"], y=cp["y"]
)
)
print("cps_p:")
pprint.pprint(cps_p)

return riot.from_gcps(cps_p)
84 changes: 45 additions & 39 deletions cdr/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from schema.mappers.cdr import get_mapper
from schema.cdr_schemas.events import MapEventPayload

from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from util.logging import config_logger

Expand All @@ -34,8 +34,6 @@

CDR_API_TOKEN = os.environ["CDR_API_TOKEN"]
CDR_HOST = "https://api.cdr.land"
CDR_SYSTEM_NAME = "uncharted"
CDR_SYSTEM_VERSION = "0.0.4"
CDR_CALLBACK_SECRET = "maps rock"
APP_PORT = 5001
CDR_EVENT_LOG = "events.log"
Expand All @@ -54,15 +52,13 @@ class Settings:
workdir: str
imagedir: str
output: str
system_name: str
system_version: str
callback_secret: str
callback_url: str
registration_id: str
registration_id: Dict[str, str] = {}
rabbitmq_host: str
json_log: JSONLog
serial: bool
sequence: List[str]
sequence: List[str] = []


def prefetch_image(working_dir: Path, image_id: str, image_url: str) -> None:
Expand Down Expand Up @@ -212,31 +208,42 @@ def process_image(image_id: str, request_publisher: LaraRequestPublisher):


def register_cdr_system():
logger.info(f"registering system {settings.system_name} with cdr")
headers = {"Authorization": f"Bearer {settings.cdr_api_token}"}

registration = {
"name": settings.system_name,
"version": settings.system_version,
"callback_url": settings.callback_url,
"webhook_secret": settings.callback_secret,
# Leave blank if callback url has no auth requirement
# "auth_header": "",
# "auth_token": "",
# Registers for ALL events
"events": [],
}

client = httpx.Client(follow_redirects=True)

r = client.post(
f"{settings.cdr_host}/user/me/register", json=registration, headers=headers
)
for i, pipeline in enumerate(settings.sequence):
system_name = LaraResultSubscriber.PIPELINE_SYSTEM_NAMES[pipeline]
system_version = LaraResultSubscriber.PIPELINE_SYSTEM_VERSIONS[pipeline]
logger.info(f"registering system {system_name} with cdr")
headers = {"Authorization": f"Bearer {settings.cdr_api_token}"}

# register for all events on the first pipeline only; the others register for just "ping"
events: Optional[List[str]] = [] if i == 0 else ["ping"]

registration = {
"name": system_name,
"version": system_version,
"callback_url": settings.callback_url,
"webhook_secret": settings.callback_secret,
# Leave blank if callback url has no auth requirement
# "auth_header": "",
# "auth_token": "",
"events": events,
}

client = httpx.Client(follow_redirects=True)

r = client.post(
f"{settings.cdr_host}/user/me/register", json=registration, headers=headers
)
# check if the request was successful
if r.status_code != 200:
logger.error(f"failed to register system {system_name} with cdr")
logger.error(f"response: {r.text}")
exit(1)

# Log our registration_id such we can delete it when we close the program.
response_raw = r.json()
settings.registration_id = response_raw["id"]
logger.info(f"system {settings.system_name} registered with cdr")
# Log our registration_id so that we can delete it when we close the program.
response_raw = r.json()
settings.registration_id[pipeline] = response_raw["id"]
logger.info(f"system {system_name} registered with cdr")


def get_cdr_registrations() -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -266,17 +273,21 @@ def cdr_unregister(registration_id: str):
def cdr_clean_up():
logger.info(f"unregistering system {settings.registration_id} with cdr")
# delete our registered system at CDR on program end
cdr_unregister(settings.registration_id)
logger.info(f"system {settings.registration_id} no longer registered with cdr")
for pipeline in settings.sequence:
cdr_unregister(settings.registration_id[pipeline])
logger.info(f"system {settings.registration_id} no longer registered with cdr")


def cdr_startup(host: str):
# check if already registered and delete existing registrations for this name and token combination
registrations = get_cdr_registrations()
if len(registrations) > 0:
for r in registrations:
if r["name"] == settings.system_name:
cdr_unregister(r["id"])
for pipeline in settings.sequence:
if r["name"] == LaraResultSubscriber.PIPELINE_SYSTEM_NAMES[pipeline]:
logger.info(f"unregistering system {r['name']} with cdr")
cdr_unregister(r["id"])
break

# make it accessible from the outside
settings.callback_url = f"{host}/process_event"
Expand All @@ -302,7 +313,6 @@ def main():

parser = argparse.ArgumentParser()
parser.add_argument("--mode", choices=("process", "host"), required=True)
parser.add_argument("--system", type=str, default=CDR_SYSTEM_NAME)
parser.add_argument("--workdir", type=str, required=True)
parser.add_argument("--imagedir", type=str, required=True)
parser.add_argument("--cog_id", type=str, required=False)
Expand All @@ -322,8 +332,6 @@ def main():
settings.workdir = p.workdir
settings.imagedir = p.imagedir
settings.output = p.output
settings.system_name = p.system
settings.system_version = CDR_SYSTEM_VERSION
settings.callback_secret = CDR_CALLBACK_SECRET
settings.serial = True
settings.sequence = p.sequence
Expand Down Expand Up @@ -362,8 +370,6 @@ def main():
settings.cdr_api_token,
settings.output,
settings.workdir,
settings.system_name,
settings.system_version,
settings.json_log,
host=p.host,
pipeline_sequence=settings.sequence,
Expand Down
10 changes: 7 additions & 3 deletions deploy/vars_example.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
"georef_workdir": "",
"point_extract_workdir": "",
"segmentation_workdir": "",
"image_workdir": "",
"metadata_workdir": "",
"image_dir": "",
"ngrok_authtoken": "",
"openai_api_key": "",
"google_application_credentials": ""
}
"google_application_credentials": "",
"segmentation_model_weights": "",
"point_model_weights": "",
"tag": ""
}
Loading

0 comments on commit 4c21745

Please sign in to comment.