fix: Conflate features from ODK with OSM
rsavoye committed Oct 28, 2023
1 parent 05634ce commit 0d6704b
231 changes: 116 additions & 115 deletions conflator/conflatePOI.py
@@ -21,18 +21,18 @@
import os
from sys import argv
from osm_fieldwork.osmfile import OsmFile
from geojson import Point, Feature, FeatureCollection, dump, Polygon
from geojson import Feature, FeatureCollection, dump
import geojson
import concurrent.futures
import psycopg2
from shapely.geometry import shape, Polygon, mapping
from shapely.geometry import shape, Point, Polygon, mapping
import shapely
from shapely import wkt
import xmltodict
from progress.bar import Bar, PixelBar
from progress.spinner import PixelSpinner
from osm_fieldwork.convert import escape
from osm_rawdata.postgres import uriParser, DatabaseAccess
from osm_rawdata.postgres import uriParser, DatabaseAccess, PostgresClient
from codetiming import Timer
import concurrent.futures
from cpuinfo import get_cpu_info
@@ -43,12 +43,13 @@


# Instantiate logger
log = logging.getLogger(__name__)
log = logging.getLogger("conflator")

# The number of threads is based on the CPU cores
info = get_cpu_info()
cores = info['count']
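
py-cpuinfo's 'count' field reports logical cores, so the thread pool below is sized to hardware threads rather than physical cores. A quick way to check the value on a given machine (output is machine-dependent):

    from cpuinfo import get_cpu_info
    print(get_cpu_info()["count"])  # e.g. 8 on a 4-core machine with hyperthreading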


class ConflatePOI(object):
def __init__(self,
dburi: str = None,
@@ -66,14 +67,18 @@ def __init__(self,
(ConflatePOI): An instance of this object
"""
self.data = dict()
self.db = list()
self.db = None
self.tolerance = 7 # Distance in meters for conflating with postgis
self.boundary = boundary
self.analyze = ('building', 'amenity')
if dburi:
for thread in range(0, cores + 1):
db = GeoSupport(dburi)
self.db.append(db)
if boundary:
db.clipDB(boundary, db)
# for thread in range(0, cores + 1):
self.db = GeoSupport(dburi)
# self.db.append(db)
# We only need to clip the database into a new table once
if boundary:
self.db.clipDB(boundary, self.db.db)
self.db.clipDB(boundary, self.db.db, "nodes_view", "nodes")

def overlaps(self,
feature: dict,
@@ -144,9 +149,65 @@ def overlaps(self,
return {'attrs': attrs, 'tags': tags}
return dict()

def conflateData(self,
data: dict,
):
"""
Conflate all the data. This is the primary interface for conflation.
Args:
data (dict): All the entries from the OSM XML input file
Returns:
(dict): The modified features
"""
timer = Timer(text="conflateData() took {seconds:.0f}s")
timer.start()
# Use fuzzy string matching to handle minor issues in the name column,
# which is often used to match an amenity.
if len(self.data) == 0:
self.db.queryDB("CREATE EXTENSION IF NOT EXISTS fuzzystrmatch")
log.debug(f"conflateData() called! {len(data)} features")

# A chunk is a group of threads
entries = len(data)
chunk = round(len(data) / cores)

if entries <= chunk:
result = conflateThread(data, self)
timer.stop()
return result

# Chop the data into a subset for each thread
newdata = list()
future = None
result = None
index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
i = 0
subset = dict()
futures = list()
for key, value in data.items():
subset[key] = value
if i == chunk:
i = 0
result = executor.submit(conflateThread, subset, self)
index += 1
# result.add_done_callback(callback)
futures.append(result)
subset = dict()
i += 1
if len(subset) > 0:
# Submit the leftover features that didn't fill a full chunk
futures.append(executor.submit(conflateThread, subset, self))
for future in concurrent.futures.as_completed(futures):
log.debug(f"Waiting for thread to complete..")
# print(f"YYEESS!! {future.result(timeout=10)}")
newdata.append(future.result(timeout=5))
timer.stop()
return newdata
# return alldata
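
Each worker is handed a consecutive slice of roughly len(data) / cores features. The same partitioning pattern in isolation, with a trivial worker standing in for conflateThread (partition is illustrative, not part of this module):

    import concurrent.futures

    def partition(data: dict, size: int):
        """Yield consecutive sub-dicts of at most size entries."""
        subset = dict()
        for key, value in data.items():
            subset[key] = value
            if len(subset) == size:
                yield subset
                subset = dict()
        if subset:
            yield subset  # keep the final partial chunk

    data = {i: f"feature {i}" for i in range(10)}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(len, chunk) for chunk in partition(data, 3)]
        print(sorted(f.result() for f in concurrent.futures.as_completed(futures)))
    # prints [1, 3, 3, 3]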

def conflateWay(self,
feature: dict,
dbindex: int,
db: GeoSupport = None,
):
"""
Conflate a POI against all the ways in a postgres view
@@ -169,12 +230,11 @@ def conflateWay(self,
cleanval = escape(value)
query = f"SELECT osm_id,tags,version,ST_AsText(ST_Centroid(geom)) FROM ways_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} AND levenshtein(tags->>'{key}', '{cleanval}') <= 1"
# log.debug(query)
self.postgres[dbindex].dbcursor.execute(query)
try:
result = self.postgres[dbindex].dbcursor.fetchall()
except:
result = list()
# log.warning(f"No results at all for {query}")
if db:
result = db.queryDB(query)
else:
result = self.db.queryDB(query)
if len(result) == 0:
log.warning(f"No results at all for {query}")
if len(result) > 0:
hits = True
break
@@ -199,7 +259,7 @@

def conflateNode(self,
feature: dict,
dbindex: int,
db: GeoSupport = None,
):
"""
Conflate a POI against all the nodes in the view
@@ -218,20 +278,20 @@
result = list()
ratio = 1
for key,value in feature['tags'].items():
if key in self.analyze:
if True: # key in self.analyze:
# print("%s = %s" % (key, value))
# Use a Geography data type to get the answer in meters, which
# is easier to deal with than degrees of the earth.
cleanval = escape(value)
query = f"SELECT osm_id,tags,version,ST_AsEWKT(geom) FROM nodes_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} AND levenshtein(tags->>'{key}', '{cleanval}') <= {ratio}"
# print(query)
# FIXME: this currently only works with a local database, not underpass yet
self.postgres[dbindex].dbcursor.execute(query)
try:
result = self.postgres[dbindex].dbcursor.fetchall()
except:
result = list()
# log.warning(f"No results at all for {query}")
# FIXME: this currently only works with a local database,
# not underpass yet
if db:
result = db.queryDB(query)
else:
result = self.db.queryDB(query)
# log.warning(f"No results at all for {query}")
if len(result) > 0:
hits = True
break
@@ -248,66 +308,15 @@
return {'attrs': attrs, 'tags': tags}
return dict()
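
To see the kind of SQL conflateNode builds, a sketch that renders the query for one hypothetical feature without opening a database connection (the coordinates, tag, and tolerance are made up):

    from shapely.geometry import Point

    tolerance = 7  # meters, matching self.tolerance
    geom = Point(-105.68, 40.25)  # stand-in for the collected geopoint
    key, value = "amenity", "restaurant"
    query = (f"SELECT osm_id,tags,version,ST_AsEWKT(geom) FROM nodes_view "
             f"WHERE ST_Distance(geom::geography, "
             f"ST_GeogFromText('SRID=4326;{geom.wkt}')) < {tolerance} "
             f"AND levenshtein(tags->>'{key}', '{value}') <= 1")
    print(query)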

def conflateData(self,
data: list,
):
"""
Conflate all the data. This the primary interfacte for conflation.
Args:
odkdata (list): A list of all the entries in the OSM XML input file
Returns:
(dict): The modified features
"""
timer = Timer(text="conflateData() took {seconds:.0f}s")
timer.start()
# Use fuzzy string matching to handle minor issues in the name column,
# which is often used to match an amenity.
if len(self.data) == 0:
self.postgres[0].dbcursor.execute("CREATE EXTENSION IF NOT EXISTS fuzzystrmatch")
log.debug(f"conflateData() called! {len(data)} features")

# A chunk is a group of threads
chunk = round(len(odkdata) / cores)

# Chop the data into a subset for each thread
newdata = list()
future = None
result = None
index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
i = 0
subset = dict()
futures = list()
for key, value in data.items():
subset[key] = value
if i == chunk:
i = 0
result = executor.submit(conflateThread, subset, self, index)
index += 1
# result.add_done_callback(callback)
futures.append(result)
subset = dict()
i += 1
for future in concurrent.futures.as_completed(futures):
log.debug(f"Waiting for thread to complete..")
# print(f"YYEESS!! {future.result(timeout=10)}")
newdata.append(future.result(timeout=5))
timer.stop()
return newdata
# return alldata

def conflateThread(features: dict,
source: str,
dbindex: int,
def conflateThread(features: dict,
cp: ConflatePOI,
):
"""
Conflate a subset of the data
Args:
features (dict): The subset of features to conflate
cp (ConflatePOI): The ConflatePOI instance and its database connection
source (str): The data source for conflation, file or database
dbindex (int): An index into the array of postgres connections
Returns:
@@ -322,24 +331,23 @@ def conflateThread(features: dict,
# and look for a possible match with existing data.
for key, value in features.items():
id = int(value['attrs']['id'])
geom = Point((float(value["attrs"]["lon"]), float(value["attrs"]["lat"])))
if not shapely.contains(shape(cp.boundary), geom):
# log.debug(f"Point is not in boundary!")
continue
# Each of the conflation methods take a single feature
# as a parameter, and returns a possible match or a zero
# length dictionary.
if id > 0:
# Any feature ID greater than zero is existing data.
if source.source[:3] != "PG:":
result = source.conflateFile(value)
else:
# Any feature ID less than zero is new data collected
# using geopoint in the XLSForm.
result = source.conflateById(value, dbindex)
# Any feature ID greater than zero is existing data, so
# conflate against OSM by the existing ID.
result = cp.conflateById(value)
elif id < 0:
if source.source[:3] != "PG:":
result = source.conflateFile(value)
else:
result = source.conflateNode(value, dbindex)
if len(result) == 0:
result = source.conflateWay(value, dbindex)
result = cp.conflateNode(value)
if len(result) == 0:
result = cp.conflateWay(value)
if result and len(result) > 0:
# Merge the tags and attributes together, the OSM data and ODK data.
# If no match is found, the ODK data is used to create a new feature.
@@ -370,20 +378,13 @@ def main():
""",
)
parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
parser.add_argument("-o", "--outfile", help="Output file from the conflation")
parser.add_argument("-i", "--infile", help="GeoJson or OSM XML file to conflate")
parser.add_argument("-b", "--boundary", help="Boundary polygon to limit the data size")

args, unknown = parser.parse_known_args()
osmdata = None
source = None
if len(unknown) < 2:
parser.print_help()
quit()
else:
osmdata = unknown[0]
source = unknown[1]
parser.add_argument("-o", "--osmuri", required=True, help="OSM Database URI")
parser.add_argument("-i", "--infile", required=True,
help="GeoJson or OSM XML file to conflate")
parser.add_argument("-b", "--boundary",
help="Boundary polygon to limit the data size")

args = parser.parse_args()
# if verbose, dump to the terminal.
if args.verbose:
log.setLevel(logging.DEBUG)
Expand All @@ -395,20 +396,20 @@ def main():
ch.setFormatter(formatter)
log.addHandler(ch)

if args.outfile:
outfile = args.outfile
else:
outfile = os.path.basename(osmdata.replace('.osm', '-foo.osm'))
outfile = os.path.basename(args.infile.replace('.osm', '-foo.osm'))

# This is the existing OSM data, a database or a file
if args.boundary:
extract = OdkMerge(source, args.boundary)
file = open(args.boundary, 'r')
boundary = geojson.load(file)
if 'features' in boundary:
poly = boundary['features'][0]['geometry']
else:
extract = OdkMerge(source)
poly = boundary['geometry']
extract = ConflatePOI(args.osmuri, poly)

if extract:
odkf = OsmFile(outfile) # output file
osm = odkf.loadFile(osmdata) # input file
odkf = OsmFile(outfile)
osm = odkf.loadFile(args.infile)
#odkf.dump()
else:
log.error("No ODK data source specified!")
@@ -425,9 +426,9 @@
for feature in entry:
if 'refs' in feature:
feature['refs'] = list()
out.append(odkf.createWay(feature, True))
out.append(odkf.createWay(entry, True))
else:
out.append(odkf.createNode(feature, True))
out.append(odkf.createNode(entry, True))

# out = ""
# for id, feature in osm.items():
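
With the reworked options, a typical invocation of this script would be something like python conflatePOI.py -v -i odk-data.osm -o localhost/colorado -b boundary.geojson, where the input file, database URI, and boundary file are placeholders.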
