update some deprecated functions, add configurable temp dir for storing graph data
chrstnbwnkl committed Jun 4, 2024
1 parent eb0eb8d commit 4b229cd
Showing 16 changed files with 102 additions and 50 deletions.
4 changes: 2 additions & 2 deletions .docker_env
@@ -21,8 +21,8 @@ POSTGRES_PASS=admin

# If you need a SSL certificate to be respected on the gunicorn level
# Paths are local to the container (/app/ssl is mapped in docker-compose)
-SSL_CERT='/app/data/ssl/l0t2511a.crt'
-SSL_KEY='/app/data/ssl/l0t2511a.key.fixed'
+SSL_CERT=''
+SSL_KEY=''

# Valhalla image exclusive env vars
# How many threads to build tiles? Should be two less than available threads
1 change: 1 addition & 0 deletions .env
@@ -9,6 +9,7 @@ ADMIN_PASS=admin
# you can change the directory to wherever you want the data to reside on the host
# MUST be an absolute path
DATA_DIR=/data/transfer/osm_v2
+TMP_DATA_DIR=/tmp_data/transfer/osm_v2

VALHALLA_URL="http://localhost"

4 changes: 3 additions & 1 deletion .gitignore
@@ -15,4 +15,6 @@ scan*.txt

# temp
.env_local
-.env
+.env
+
+.vscode/
17 changes: 9 additions & 8 deletions README.md
@@ -41,7 +41,7 @@ curl --location -XPOST 'http://localhost:5000/api/v1/jobs' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "test", # name needs to be unique for a specific router & provider
-    "description": "test descr",
+    "description": "test descr",
"bbox": "1.531906,42.559908,1.6325,42.577608", # the bbox as minx,miny,maxx,maxy
"provider": "osm", # the dataset provider, needs to be registered in ENABLED_PROVIDERS
"update": "true" # whether this package should be updated on every planet build
@@ -51,6 +51,7 @@ curl --location -XPOST 'http://localhost:5000/api/v1/jobs' \
After a minute you should have the graph package available in `./data/output/osm_test/`. If not, check the logs of the worker process or the Flask app.

The `routing-packager-app` container running the HTTP API has a `supervisor` process running in a loop, which:

- downloads a planet PBF (if it doesn't exist) or updates the planet PBF (if it does exist)
- builds a planet Valhalla graph
- then updates all graph extracts with a fresh copy
@@ -61,9 +62,9 @@ By default, also a fake SMTP server is started, and you can see incoming messages

### Graph & OSM updates

-Under the hood we're running a `supervisor` instance to control the graph builds.
+Under the hood we're running a `supervisor` instance to control the graph builds.

-Two instances of the [Valhalla docker image](https://github.com/gis-ops/docker-valhalla) take turns building a new graph from an updated OSM file. Those two graphs are physically separated from each other in subdirectories `$DATA_DIR/osm/8002` & `$DATA_DIR/osm/8003`.
+Two instances of the [Valhalla docker image](https://github.com/gis-ops/docker-valhalla) take turns building a new graph from an updated OSM file. Those two graphs are physically separated from each other in subdirectories `$TMP_DATA_DIR/osm/8002` & `$TMP_DATA_DIR/osm/8003`.

After each graph build finished, the OSM file is updated for the next graph build.

@@ -80,9 +81,9 @@ The app is listening on `/api/v1/jobs` for new `POST` requests to generate some
1. Request is parsed, inserted into the Postgres database and the new entry is immediately returned with a few job details as blank fields.
2. Before returning the response, the graph generation function is queued with `ARQ` in a Redis database to dispatch to a worker.
3. If the worker is currently
-   - **idle**, the queue will immediately start the graph generation:
-     - Pull the job entry from the Postgres database
-     - Update the job's `status` database field along the processing to indicate the current stage
-     - Zip graph tiles from disk according to the request's bounding box and put the package to `$DATA_DIR/output/<JOB_NAME>`, along with a metadata JSON
-   - **busy**, the current job will be put in the queue and will be processed once it reaches the queue's head
+   - **idle**, the queue will immediately start the graph generation:
+     - Pull the job entry from the Postgres database
+     - Update the job's `status` database field along the processing to indicate the current stage
+     - Zip graph tiles from disk according to the request's bounding box and put the package to `$DATA_DIR/output/<JOB_NAME>`, along with a metadata JSON
+   - **busy**, the current job will be put in the queue and will be processed once it reaches the queue's head
4. Send an email to the requesting user with success or failure notice (including the error message)
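The request flow above can be sketched from the client side. This is an illustrative snippet, not part of the commit; the endpoint and field values simply mirror the curl example earlier in this README:

```python
import json

# Payload mirroring the curl example; the values are illustrative.
payload = {
    "name": "test",  # must be unique per router & provider
    "description": "test descr",
    "bbox": "1.531906,42.559908,1.6325,42.577608",  # minx,miny,maxx,maxy
    "provider": "osm",  # must be registered in ENABLED_PROVIDERS
    "update": "true",  # rebuild this package on every planet build
}
body = json.dumps(payload)

# Actually sending it requires a running instance and credentials, e.g.:
# import urllib.request
# req = urllib.request.Request(
#     "http://localhost:5000/api/v1/jobs",
#     data=body.encode(),
#     headers={"Content-Type": "application/json"},
# )
print(json.loads(body)["provider"])  # -> osm
```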
2 changes: 1 addition & 1 deletion conf/valhalla.conf
@@ -16,4 +16,4 @@ redirect_stderr=true
# stdout_logfile_maxbytes=1MB
stdout_logfile=/proc/1/fd/1
stdout_logfile_maxbytes=0
-environment=CONCURRENCY="4",DATA_DIR="/app/data"
+environment=CONCURRENCY="4",DATA_DIR="/app/data",TMP_DATA_DIR="/app/tmp_data"
20 changes: 14 additions & 6 deletions docker-compose.yml
@@ -1,17 +1,22 @@
volumes:
  postgis-data:
-  packages: # do not change any detail of this volume
+  packages: # do not change any detail of this volume
    driver: local
    driver_opts:
      type: none
-      device: $DATA_DIR # DATA_DIR is the host directory for the data. It has to be in the environment, e.g. in .env file
+      device: $DATA_DIR # DATA_DIR is the host directory for the data. It has to be in the environment, e.g. in .env file
      o: bind
+  tmp_packages: # do not change any detail of this volume
+    driver: local
+    driver_opts:
+      type: none
+      device: $TMP_DATA_DIR # DATA_DIR is the host directory for the data. It has to be in the environment, e.g. in .env file
+      o: bind

# It's important it runs in its own private network, also more secure
networks:
  routing-packager:

-version: '3.2'
services:
  postgis:
    image: kartoza/postgis:12.1
@@ -31,7 +36,8 @@ services:
    restart: always
  redis:
    image: redis:6.2
-    container_name: routing-packager-redis
+    container_name:
+      routing-packager-redis
    # mostly needed to define the database hosts
    env_file:
      - .docker_env
@@ -50,7 +56,8 @@
      - routing-packager
    volumes:
      - packages:/app/data
-      - $PWD/.docker_env:/app/.env # Worker needs access to .env file
+      - tmp_packages:/app/tmp_data
+      - $PWD/.docker_env:/app/.env # Worker needs access to .env file
    depends_on:
      - postgis
      - redis
@@ -67,7 +74,8 @@
      - .docker_env
    volumes:
      - packages:/app/data
-      - $PWD/.docker_env:/app/.env # CLI needs access to .env file
+      - tmp_packages:/app/tmp_data
+      - $PWD/.docker_env:/app/.env # CLI needs access to .env file
      - $PWD/static:/app/static # static file for frontend
    networks:
      - routing-packager
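For context, the new `tmp_packages` volume uses Docker's pattern of backing a named volume with a host directory via a bind mount. A minimal standalone sketch (the service name and image are placeholders, not from this repo); `TMP_DATA_DIR` must be an absolute host path exported in the environment before `docker compose up`:

```yaml
volumes:
  tmp_packages:
    driver: local
    driver_opts:
      type: none            # use the host directory as-is, no new filesystem
      device: $TMP_DATA_DIR # absolute host path, e.g. from the .env file
      o: bind               # options handed to mount(8)

services:
  worker:                   # placeholder service for illustration
    image: alpine
    volumes:
      - tmp_packages:/app/tmp_data
```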
12 changes: 7 additions & 5 deletions main.py
@@ -1,3 +1,4 @@
+from contextlib import asynccontextmanager
import uvicorn as uvicorn
from arq import create_pool
from arq.connections import RedisSettings
@@ -10,21 +11,22 @@
from routing_packager_app.config import SETTINGS
from routing_packager_app.api_v1.models import User

-app: FastAPI = create_app()


-@app.on_event("startup")
-async def startup_event():
+@asynccontextmanager
+async def lifespan(app: FastAPI):
    SQLModel.metadata.create_all(engine, checkfirst=True)
    app.state.redis_pool = await create_pool(RedisSettings.from_dsn(SETTINGS.REDIS_URL))
    User.add_admin_user(next(get_db()))

    # create the directories
    for provider in Providers:
-        p = SETTINGS.get_data_dir().joinpath(provider.lower())
+        p = SETTINGS.get_tmp_data_dir().joinpath(provider.lower())
        p.mkdir(exist_ok=True)
    SETTINGS.get_output_path().mkdir(exist_ok=True)
+    yield
+    app.state.redis_pool.shutdown()
+
+app: FastAPI = create_app(lifespan=lifespan)

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
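The change above replaces the deprecated `@app.on_event("startup")` hook with FastAPI's `lifespan` argument: an async context manager whose code before `yield` runs at startup and code after `yield` runs at shutdown. A stdlib-only sketch of the pattern (no FastAPI involved; `events` is just a stand-in for real resources like the DB tables and Redis pool):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def lifespan(app):
    events.append("startup")    # e.g. create tables, open the Redis pool
    yield                       # the app serves requests while suspended here
    events.append("shutdown")   # e.g. close the Redis pool

async def main():
    async with lifespan(app=None):
        events.append("serving")

asyncio.run(main())
print(events)  # -> ['startup', 'serving', 'shutdown']
```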
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -2,7 +2,10 @@
name = "routing-graph-packager"
version = "1.0.0b"
description = "Backend to generate packages of routing graphs for FOSS routing engines in a job queue."
-authors = ["Nils Nolde <[email protected]>"]
+authors = [
+    "Nils Nolde <[email protected]>",
+    "Christian Beiwinkel <[email protected]>",
+]
license = "MIT"
packages = [{ include = "routing_packager_app" }]

6 changes: 4 additions & 2 deletions routing_packager_app/__init__.py
@@ -1,4 +1,6 @@
+from typing import Optional
from fastapi import FastAPI
+from starlette.types import Lifespan
from fastapi.staticfiles import StaticFiles
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware
@@ -7,15 +9,15 @@
from .config import SETTINGS


-def create_app():
+def create_app(lifespan: Optional[Lifespan[FastAPI]]):
    """
    Creates a FastAPI app dynamically.
    """

    with open(SETTINGS.DESCRIPTION_PATH) as fh:
        description = fh.read()

-    app = FastAPI(title="Routing Graph Packager App", description=description)
+    app = FastAPI(title="Routing Graph Packager App", description=description, lifespan=lifespan)
    app.mount("/static", StaticFiles(directory="static"), name="static")
    register_middlewares(app)
    register_router(app)
13 changes: 12 additions & 1 deletion routing_packager_app/config.py
@@ -25,6 +25,7 @@ class BaseSettings(_BaseSettings):
    CORS_ORIGINS: List[str] = ["*"]

    DATA_DIR: Path = BASE_DIR.joinpath("data")
+    TMP_DATA_DIR: Path = BASE_DIR.joinpath("tmp_data")
    VALHALLA_URL: str = "http://localhost"

    ENABLED_PROVIDERS: list[str] = list(CommaSeparatedStrings("osm"))
@@ -52,7 +53,7 @@ def get_valhalla_path(self, port: int) -> Path:  # pragma: no cover
        Return the path to the OSM Valhalla instances.
        """
        if port in (8002, 8003):
-            p = self.get_data_dir().joinpath(Providers.OSM.lower(), str(port))
+            p = self.get_tmp_data_dir().joinpath(Providers.OSM.lower(), str(port))
            p.mkdir(exist_ok=True, parents=True)
            return p
        raise ValueError(f"{port} is not a valid port for Valhalla.")
@@ -68,6 +69,15 @@ def get_data_dir(self) -> Path:
            data_dir = Path("/app/data")

        return data_dir
+
+    def get_tmp_data_dir(self) -> Path:
+        tmp_data_dir = self.TMP_DATA_DIR
+        # if we're inside a docker container, we need to reference the fixed directory instead
+        # Watch out for CI, also runs within docker
+        if os.path.isdir("/app") and not os.getenv("CI", None):  # pragma: no cover
+            tmp_data_dir = Path("/app/tmp_data")
+
+        return tmp_data_dir
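The new `get_tmp_data_dir` mirrors `get_data_dir`: the configured `TMP_DATA_DIR` is used on the host, but inside the Docker image the path is pinned to `/app/tmp_data`, with CI exempted because CI also runs inside Docker. The decision logic in isolation (a sketch; `resolve_tmp_data_dir` is a hypothetical standalone helper, not a function from this repo):

```python
import os
from pathlib import Path

def resolve_tmp_data_dir(configured: Path) -> Path:
    # Inside the container the directory is fixed, except in CI,
    # which runs inside Docker as well.
    if os.path.isdir("/app") and not os.getenv("CI", None):
        return Path("/app/tmp_data")
    return configured

os.environ["CI"] = "true"  # simulate the CI case so the demo is deterministic
print(resolve_tmp_data_dir(Path("/data/tmp")))  # -> /data/tmp
```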


class ProdSettings(BaseSettings):
@@ -87,6 +97,7 @@ class TestSettings(BaseSettings):
    POSTGRES_PASS: str = "admin"

    DATA_DIR: Path = BASE_DIR.joinpath("tests", "data")
+    TMP_DATA_DIR: Path = BASE_DIR.joinpath("tests", "tmp_data")

    ADMIN_EMAIL: str = "[email protected]"
    ADMIN_PASS: str = "admin"
33 changes: 24 additions & 9 deletions routing_packager_app/worker.py
@@ -1,16 +1,16 @@
import json
import logging
import os
-from datetime import datetime
+from datetime import datetime, timezone
from pathlib import Path

from arq.connections import RedisSettings
+from fastapi import HTTPException
import requests
from requests.exceptions import ConnectionError
import shutil
-from sqlmodel import Session
-from starlette.status import HTTP_200_OK, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY
+from sqlmodel import Session, select
+from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY

from .api_v1.dependencies import split_bbox
from .config import SETTINGS
@@ -38,16 +38,30 @@ async def create_package(

    # Set up the logger where we have access to the user email
    # and only if there hasn't been one before
-    user_email = session.query(User).get(user_id).email
+    statement = select(User).where(User.id == user_id)
+    results = session.exec(statement).first()
+
+    if results is None:
+        raise HTTPException(
+            HTTP_404_NOT_FOUND,
+            "No user with specified ID found.",
+        )
+    user_email = results.email
    if not LOGGER.handlers and update is False:
        handler = AppSmtpHandler(**get_smtp_details([user_email]))
        handler.setLevel(logging.INFO)
        LOGGER.addHandler(handler)
    log_extra = {"user": user_email, "job_id": job_id}

-    job: Job = session.query(Job).get(job_id)
+    statement = select(Job).where(Job.id == job_id)
+    job = session.exec(statement).first()
+    if job is None:
+        raise HTTPException(
+            HTTP_404_NOT_FOUND,
+            "No job with specified ID found.",
+        )
    job.status = Statuses.COMPRESSING
-    job.last_started = datetime.utcnow()
+    job.last_started = datetime.now(timezone.utc)
    session.commit()

    succeeded = False
@@ -85,7 +99,8 @@ async def create_package(
        raise HTTPException(404, f"No Valhalla tiles in bbox {bbox}")

    # zip up the tiles after locking the directory to not be updated right now
-    lock = current_valhalla_dir.joinpath(".lock")
+    out_dir = SETTINGS.get_output_path()
+    lock = out_dir.joinpath(".lock")
    lock.touch(exist_ok=False)
    make_zip(tile_paths, current_valhalla_dir, zip_path)
    lock.unlink(missing_ok=False)
@@ -98,7 +113,7 @@ async def create_package(
        "name": job_name,
        "description": description,
        "extent": bbox,
-        "last_modified": str(datetime.utcnow()),
+        "last_modified": str(datetime.now(timezone.utc)),
    }
    dirname = os.path.dirname(zip_path)
    fname_sanitized = fname.split(os.extsep, 1)[0]
@@ -126,7 +141,7 @@ async def create_package(
        final_status = Statuses.FAILED

    # always write the "last_finished" column
-    job.last_finished = datetime.utcnow()
+    job.last_finished = datetime.now(timezone.utc)
    job.status = final_status
    session.commit()
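`datetime.utcnow()` is deprecated as of Python 3.12 because it returns a naive datetime with no `tzinfo`; the commit switches to `datetime.now(timezone.utc)`, which yields an aware UTC timestamp:

```python
from datetime import datetime, timezone

naive = datetime.utcnow()           # deprecated; tzinfo is None
aware = datetime.now(timezone.utc)  # aware replacement used in this commit

print(naive.tzinfo)  # -> None
print(aware.tzinfo)  # -> UTC
print(aware.isoformat())  # e.g. 2024-06-04T10:15:30.123456+00:00
```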

14 changes: 8 additions & 6 deletions scripts/run_valhalla.sh
@@ -45,12 +45,12 @@ reset_config() {

PORT_8002="8002"
PORT_8003="8003"
-# $DATA_DIR needs to be defined, either by supervisor or the current shell
-ELEVATION_DIR="$DATA_DIR/elevation"
-VALHALLA_DIR_8002="$DATA_DIR/osm/$PORT_8002"
-VALHALLA_DIR_8003="$DATA_DIR/osm/$PORT_8003"
+# $TMP_DATA_DIR needs to be defined, either by supervisor or the current shell
+ELEVATION_DIR="$TMP_DATA_DIR/elevation"
+VALHALLA_DIR_8002="$TMP_DATA_DIR/osm/$PORT_8002"
+VALHALLA_DIR_8003="$TMP_DATA_DIR/osm/$PORT_8003"
# TODO: change PBF
-PBF="/app/data/osm/planet-latest.osm.pbf"
+PBF="/app/tmp_data/osm/planet-latest.osm.pbf"

# activate the virtual env so the CLI can do its job in the supervisor env
. /app/app_venv/bin/activate
@@ -94,7 +94,7 @@ while true; do
wget -nv https://ftp5.gwdg.de/pub/misc/openstreetmap/planet.openstreetmap.org/pbf/planet-latest.osm.pbf -O "$PBF" || exit 1
# wget -nv https://ftp5.gwdg.de/pub/misc/openstreetmap/download.geofabrik.de/germany-latest.osm.pbf -O "$PBF" || exit 1
# wget -nv https://download.geofabrik.de/europe/iceland-latest.osm.pbf -O "$PBF" || exit 1
-# wget -nv https://download.geofabrik.de/europe/andorra-latest.osm.pbf -O "$PBF" || exit 1
+# wget https://download.geofabrik.de/europe/andorra-latest.osm.pbf -O "$PBF" || exit 1
UPDATE_OSM="False"
fi

@@ -137,6 +137,8 @@ while true; do

echo "INFO: Downloading elevation to $ELEVATION_DIR..."
valhalla_build_elevation --from-tiles --decompress -c ${valhalla_config} -v || exit 1
+# debugging with andorra only:
+# valhalla_build_elevation --decompress -c ${valhalla_config} -v -b 1,42,2,43 || exit 1

echo "INFO: Enhancing initial tiles with elevation..."
valhalla_build_tiles -c "${valhalla_config}" -s enhance -e cleanup "$PBF" || exit 1
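Since the script now derives every path from `$TMP_DATA_DIR` (set by supervisor or the current shell), it fails in confusing ways when the variable is unset. A small guard sketch, not part of the commit; the demo value is an assumption:

```shell
#!/bin/sh
# Demo value; in production supervisor exports TMP_DATA_DIR instead.
TMP_DATA_DIR="/tmp/demo_data"
# Abort with a clear message if the variable is empty or unset.
: "${TMP_DATA_DIR:?TMP_DATA_DIR must be set by supervisor or the current shell}"

ELEVATION_DIR="$TMP_DATA_DIR/elevation"
VALHALLA_DIR_8002="$TMP_DATA_DIR/osm/8002"
echo "$ELEVATION_DIR"       # -> /tmp/demo_data/elevation
echo "$VALHALLA_DIR_8002"   # -> /tmp/demo_data/osm/8002
```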
3 changes: 1 addition & 2 deletions scripts/update_osm.sh
@@ -20,7 +20,7 @@ usage()
echo "usage: update_osm.sh --pbf/-p /app/data/osm/planet-latest.osm.pbf"
}

-pbf=/app/data/osm/planet-latest.osm.pbf
+pbf=/app/tmp_data/osm/planet-latest.osm.pbf

# Get the arguments
while [ "$1" != "" ]; do
@@ -38,7 +38,6 @@ while [ "$1" != "" ]; do
done

echo "$(date "+%Y-%m-%d %H:%M:%S") Updating ${pbf} with the proxy settings: http_proxy: $http_proxy, https_proxy: $https_proxy"
-
fn=$(basename "${pbf}")
pbf_dir=$(dirname "$pbf")
pbf_name_updated="updated_${fn}"
1 change: 1 addition & 0 deletions tests/env
@@ -4,6 +4,7 @@ ADMIN_PASS=admin
# you can change the directory to wherever you want the data to reside on the host
# MUST be an absolute path
DATA_DIR=${PWD}/tests/data
+TMP_DATA_DIR=${PWD}/tests/tmp_data

VALHALLA_SERVER_IP="http://localhost"
