diff --git a/.docker_env b/.docker_env
index 2141daa..f5c29e5 100644
--- a/.docker_env
+++ b/.docker_env
@@ -21,8 +21,8 @@ POSTGRES_PASS=admin

 # If you need a SSL certificate to be respected on the gunicorn level
 # Paths are local to the container (/app/ssl is mapped in docker-compose)
-SSL_CERT='/app/data/ssl/l0t2511a.crt'
-SSL_KEY='/app/data/ssl/l0t2511a.key.fixed'
+SSL_CERT=''
+SSL_KEY=''

 # Valhalla image exclusive env vars
 # How many threads to build tiles? Should be two less than available threads
diff --git a/.env b/.env
index 7c95fc3..e867a75 100644
--- a/.env
+++ b/.env
@@ -9,6 +9,7 @@ ADMIN_PASS=admin

 # you can change the directory to wherever you want the data to reside on the host
 # MUST be an absolute path
 DATA_DIR=/data/transfer/osm_v2
+TMP_DATA_DIR=/tmp_data/transfer/osm_v2

 VALHALLA_URL="http://localhost"
diff --git a/.gitignore b/.gitignore
index afb241c..0942c25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,4 +15,6 @@ scan*.txt

 # temp
 .env_local
-.env
\ No newline at end of file
+.env
+
+.vscode/
\ No newline at end of file
diff --git a/README.md b/README.md
index e3311d3..a1b6c39 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ curl --location -XPOST 'http://localhost:5000/api/v1/jobs' \
 --header 'Content-Type: application/json' \
 --data-raw '{
     "name": "test", # name needs to be unique for a specific router & provider
-    "description": "test descr", 
+    "description": "test descr",
     "bbox": "1.531906,42.559908,1.6325,42.577608", # the bbox as minx,miny,maxx,maxy
     "provider": "osm", # the dataset provider, needs to be registered in ENABLED_PROVIDERS
     "update": "true" # whether this package should be updated on every planet build
@@ -51,6 +51,7 @@
 After a minute you should have the graph package available in `./data/output/osm_test/`. If not, check the logs of the worker process or the Flask app.

 The `routing-packager-app` container running the HTTP API has a `supervisor` process running in a loop, which:
+
 - downloads a planet PBF (if it doesn't exist) or updates the planet PBF (if it does exist)
 - builds a planet Valhalla graph
 - then updates all graph extracts with a fresh copy
@@ -61,9 +62,9 @@
 By default, also a fake SMTP server is started, and you can see incoming messages

 ### Graph & OSM updates

-Under the hood we're running a `supervisor` instance to control the graph builds. 
+Under the hood we're running a `supervisor` instance to control the graph builds.

-Two instances of the [Valhalla docker image](https://github.com/gis-ops/docker-valhalla) take turns building a new graph from an updated OSM file. Those two graphs are physically separated from each other in subdirectories `$DATA_DIR/osm/8002` & `$DATA_DIR/osm/8003`.
+Two instances of the [Valhalla docker image](https://github.com/gis-ops/docker-valhalla) take turns building a new graph from an updated OSM file. Those two graphs are physically separated from each other in subdirectories `$TMP_DATA_DIR/osm/8002` & `$TMP_DATA_DIR/osm/8003`.

 After each graph build finished, the OSM file is updated for the next graph build.
@@ -80,9 +81,9 @@ The app is listening on `/api/v1/jobs` for new `POST` requests to generate some
 1. Request is parsed, inserted into the Postgres database and the new entry is immediately returned with a few job details as blank fields.
 2. Before returning the response, the graph generation function is queued with `ARQ` in a Redis database to dispatch to a worker.
 3. If the worker is currently
-    - **idle**, the queue will immediately start the graph generation:
-        - Pull the job entry from the Postgres database
-        - Update the job's `status` database field along the processing to indicate the current stage
-        - Zip graph tiles from disk according to the request's bounding box and put the package to `$DATA_DIR/output/`, along with a metadata JSON
-    - **busy**, the current job will be put in the queue and will be processed once it reaches the queue's head
+   - **idle**, the queue will immediately start the graph generation:
+     - Pull the job entry from the Postgres database
+     - Update the job's `status` database field along the processing to indicate the current stage
+     - Zip graph tiles from disk according to the request's bounding box and put the package to `$DATA_DIR/output/`, along with a metadata JSON
+   - **busy**, the current job will be put in the queue and will be processed once it reaches the queue's head
 4. Send an email to the requesting user with success or failure notice (including the error message)
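A note on the README example touched above: the `#` annotations inside `--data-raw` are documentation only — JSON has no comment syntax, so the snippet fails if pasted verbatim. A minimal copy-pasteable sketch of the same request; the admin credentials are the sample defaults from the repo's env/tests and an assumption here, not part of the diff:

```python
# Sketch of the README's job request without the inline "#" comments,
# which are not valid JSON.
import requests

resp = requests.post(
    "http://localhost:5000/api/v1/jobs",
    json={
        "name": "test",  # unique per router & provider
        "description": "test descr",
        "bbox": "1.531906,42.559908,1.6325,42.577608",  # minx,miny,maxx,maxy
        "provider": "osm",  # must be in ENABLED_PROVIDERS
        "update": "true",  # refresh on every planet build
    },
    auth=("admin@example.org", "admin"),  # assumed sample credentials
)
resp.raise_for_status()
print(resp.json())  # new job entry, initially with status "Queued"
```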
diff --git a/conf/valhalla.conf b/conf/valhalla.conf
index c0f35a2..04fa27d 100644
--- a/conf/valhalla.conf
+++ b/conf/valhalla.conf
@@ -16,4 +16,4 @@ redirect_stderr=true
 # stdout_logfile_maxbytes=1MB
 stdout_logfile=/proc/1/fd/1
 stdout_logfile_maxbytes=0
-environment=CONCURRENCY="4",DATA_DIR="/app/data"
+environment=CONCURRENCY="4",DATA_DIR="/app/data",TMP_DATA_DIR="/app/tmp_data"
diff --git a/docker-compose.yml b/docker-compose.yml
index e450741..9d2edbf 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,17 +1,22 @@
 volumes:
   postgis-data:
-  packages:  # do not change any detail of this volume
+  packages: # do not change any detail of this volume
     driver: local
     driver_opts:
       type: none
-      device: $DATA_DIR  # DATA_DIR is the host directory for the data. It has to be in the environment, e.g. in .env file
+      device: $DATA_DIR # DATA_DIR is the host directory for the data. It has to be in the environment, e.g. in .env file
+      o: bind
+  tmp_packages: # do not change any detail of this volume
+    driver: local
+    driver_opts:
+      type: none
+      device: $TMP_DATA_DIR # TMP_DATA_DIR is the host directory for the temporary data. It has to be in the environment, e.g. in .env file
       o: bind

 # It's important it runs in its own private network, also more secure
 networks:
   routing-packager:

-version: '3.2'
 services:
   postgis:
     image: kartoza/postgis:12.1
@@ -31,7 +36,8 @@ services:
     restart: always
   redis:
     image: redis:6.2
-    container_name: routing-packager-redis
+    container_name:
+      routing-packager-redis
     # mostly needed to define the database hosts
     env_file:
       - .docker_env
@@ -50,7 +56,8 @@ services:
       - routing-packager
     volumes:
       - packages:/app/data
-      - $PWD/.docker_env:/app/.env  # Worker needs access to .env file
+      - tmp_packages:/app/tmp_data
+      - $PWD/.docker_env:/app/.env # Worker needs access to .env file
     depends_on:
       - postgis
      - redis
@@ -67,7 +74,8 @@ services:
       - .docker_env
     volumes:
       - packages:/app/data
-      - $PWD/.docker_env:/app/.env  # CLI needs access to .env file
+      - tmp_packages:/app/tmp_data
+      - $PWD/.docker_env:/app/.env # CLI needs access to .env file
       - $PWD/static:/app/static # static file for frontend
     networks:
       - routing-packager
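Since both `packages` and the new `tmp_packages` are `local` volumes with `type: none`/`o: bind`, the host paths behind `$DATA_DIR` and `$TMP_DATA_DIR` must exist before `docker compose up`, or volume creation fails. A small pre-flight sketch (a hypothetical helper, not part of this PR):

```python
# Pre-flight check for the bind-mounted volumes: "type: none" local volumes
# require the host directory to exist already.
import os
from pathlib import Path

for var in ("DATA_DIR", "TMP_DATA_DIR"):
    value = os.environ.get(var)
    if not value or not Path(value).is_absolute():
        raise SystemExit(f"{var} must be set to an absolute path (see .env)")
    Path(value).mkdir(parents=True, exist_ok=True)
    print(f"{var} -> {value} ok")
```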
diff --git a/main.py b/main.py
index 3d60c2b..3ff2706 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,4 @@
+from contextlib import asynccontextmanager
 import uvicorn as uvicorn
 from arq import create_pool
 from arq.connections import RedisSettings
@@ -10,21 +11,22 @@ from routing_packager_app.config import SETTINGS
 from routing_packager_app.api_v1.models import User


-app: FastAPI = create_app()
-
-
-@app.on_event("startup")
-async def startup_event():
+@asynccontextmanager
+async def lifespan(app: FastAPI):
     SQLModel.metadata.create_all(engine, checkfirst=True)
     app.state.redis_pool = await create_pool(RedisSettings.from_dsn(SETTINGS.REDIS_URL))
     User.add_admin_user(next(get_db()))
     # create the directories
     for provider in Providers:
-        p = SETTINGS.get_data_dir().joinpath(provider.lower())
+        p = SETTINGS.get_tmp_data_dir().joinpath(provider.lower())
         p.mkdir(exist_ok=True)
     SETTINGS.get_output_path().mkdir(exist_ok=True)
+    yield
+    await app.state.redis_pool.close()
+
+
+app: FastAPI = create_app(lifespan=lifespan)

 if __name__ == "__main__":
     uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
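The `main.py` change above replaces the deprecated `@app.on_event("startup")` hook with FastAPI's lifespan protocol. A minimal sketch of the pattern, independent of this repo's models:

```python
# Minimal FastAPI lifespan sketch: code before "yield" runs at startup,
# code after it runs at shutdown, replacing the on_event hooks.
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: create tables, pools, directories, seed users ...
    app.state.ready = True
    yield  # the app handles requests while suspended here
    # shutdown: release async resources; note that redis-py pool
    # cleanup methods are coroutines and need to be awaited
    app.state.ready = False


app = FastAPI(lifespan=lifespan)
```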
""" @@ -15,7 +17,7 @@ def create_app(): with open(SETTINGS.DESCRIPTION_PATH) as fh: description = fh.read() - app = FastAPI(title="Routing Graph Packager App", description=description) + app = FastAPI(title="Routing Graph Packager App", description=description, lifespan=lifespan) app.mount("/static", StaticFiles(directory="static"), name="static") register_middlewares(app) register_router(app) diff --git a/routing_packager_app/config.py b/routing_packager_app/config.py index 9d76576..737407a 100644 --- a/routing_packager_app/config.py +++ b/routing_packager_app/config.py @@ -25,6 +25,7 @@ class BaseSettings(_BaseSettings): CORS_ORIGINS: List[str] = ["*"] DATA_DIR: Path = BASE_DIR.joinpath("data") + TMP_DATA_DIR: Path = BASE_DIR.joinpath("tmp_data") VALHALLA_URL: str = "http://localhost" ENABLED_PROVIDERS: list[str] = list(CommaSeparatedStrings("osm")) @@ -52,7 +53,7 @@ def get_valhalla_path(self, port: int) -> Path: # pragma: no cover Return the path to the OSM Valhalla instances. """ if port in (8002, 8003): - p = self.get_data_dir().joinpath(Providers.OSM.lower(), str(port)) + p = self.get_tmp_data_dir().joinpath(Providers.OSM.lower(), str(port)) p.mkdir(exist_ok=True, parents=True) return p raise ValueError(f"{port} is not a valid port for Valhalla.") @@ -68,6 +69,15 @@ def get_data_dir(self) -> Path: data_dir = Path("/app/data") return data_dir + + def get_tmp_data_dir(self) -> Path: + tmp_data_dir = self.TMP_DATA_DIR + # if we're inside a docker container, we need to reference the fixed directory instead + # Watch out for CI, also runs within docker + if os.path.isdir("/app") and not os.getenv("CI", None): # pragma: no cover + tmp_data_dir = Path("/app/tmp_data") + + return tmp_data_dir class ProdSettings(BaseSettings): @@ -87,6 +97,7 @@ class TestSettings(BaseSettings): POSTGRES_PASS: str = "admin" DATA_DIR: Path = BASE_DIR.joinpath("tests", "data") + TMP_DATA_DIR: Path = BASE_DIR.joinpath("tests", "tmp_data") ADMIN_EMAIL: str = "admin@example.org" ADMIN_PASS: str = "admin" diff --git a/routing_packager_app/worker.py b/routing_packager_app/worker.py index 6b29265..028406f 100644 --- a/routing_packager_app/worker.py +++ b/routing_packager_app/worker.py @@ -1,7 +1,7 @@ import json import logging import os -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from arq.connections import RedisSettings @@ -9,8 +9,8 @@ import requests from requests.exceptions import ConnectionError import shutil -from sqlmodel import Session -from starlette.status import HTTP_200_OK, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY +from sqlmodel import Session, select +from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY from .api_v1.dependencies import split_bbox from .config import SETTINGS @@ -38,16 +38,30 @@ async def create_package( # Set up the logger where we have access to the user email # and only if there hasn't been one before - user_email = session.query(User).get(user_id).email + statement = select(User).where(User.id == user_id) + results = session.exec(statement).first() + + if results is None: + raise HTTPException( + HTTP_404_NOT_FOUND, + "No user with specified ID found.", + ) + user_email = results.email if not LOGGER.handlers and update is False: handler = AppSmtpHandler(**get_smtp_details([user_email])) handler.setLevel(logging.INFO) LOGGER.addHandler(handler) log_extra = {"user": user_email, "job_id": job_id} - job: Job = session.query(Job).get(job_id) + statement 
diff --git a/routing_packager_app/worker.py b/routing_packager_app/worker.py
index 6b29265..028406f 100644
--- a/routing_packager_app/worker.py
+++ b/routing_packager_app/worker.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import os
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import Path

 from arq.connections import RedisSettings
@@ -9,8 +9,8 @@
 import requests
 from requests.exceptions import ConnectionError
 import shutil
-from sqlmodel import Session
-from starlette.status import HTTP_200_OK, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY
+from sqlmodel import Session, select
+from starlette.status import HTTP_200_OK, HTTP_404_NOT_FOUND, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_301_MOVED_PERMANENTLY

 from .api_v1.dependencies import split_bbox
 from .config import SETTINGS
@@ -38,16 +38,30 @@ async def create_package(

     # Set up the logger where we have access to the user email
     # and only if there hasn't been one before
-    user_email = session.query(User).get(user_id).email
+    statement = select(User).where(User.id == user_id)
+    results = session.exec(statement).first()
+
+    if results is None:
+        raise HTTPException(
+            HTTP_404_NOT_FOUND,
+            "No user with specified ID found.",
+        )
+    user_email = results.email
     if not LOGGER.handlers and update is False:
         handler = AppSmtpHandler(**get_smtp_details([user_email]))
         handler.setLevel(logging.INFO)
         LOGGER.addHandler(handler)
     log_extra = {"user": user_email, "job_id": job_id}

-    job: Job = session.query(Job).get(job_id)
+    statement = select(Job).where(Job.id == job_id)
+    job = session.exec(statement).first()
+    if job is None:
+        raise HTTPException(
+            HTTP_404_NOT_FOUND,
+            "No job with specified ID found.",
+        )
     job.status = Statuses.COMPRESSING
-    job.last_started = datetime.utcnow()
+    job.last_started = datetime.now(timezone.utc)
     session.commit()

     succeeded = False
@@ -85,7 +99,8 @@ async def create_package(
         raise HTTPException(404, f"No Valhalla tiles in bbox {bbox}")

     # zip up the tiles after locking the directory to not be updated right now
-    lock = current_valhalla_dir.joinpath(".lock")
+    out_dir = SETTINGS.get_output_path()
+    lock = out_dir.joinpath(".lock")
     lock.touch(exist_ok=False)
     make_zip(tile_paths, current_valhalla_dir, zip_path)
     lock.unlink(missing_ok=False)
@@ -98,7 +113,7 @@ async def create_package(
         "name": job_name,
         "description": description,
         "extent": bbox,
-        "last_modified": str(datetime.utcnow()),
+        "last_modified": str(datetime.now(timezone.utc)),
     }
     dirname = os.path.dirname(zip_path)
     fname_sanitized = fname.split(os.extsep, 1)[0]
@@ -126,7 +141,7 @@ async def create_package(
         final_status = Statuses.FAILED

     # always write the "last_finished" column
-    job.last_finished = datetime.utcnow()
+    job.last_finished = datetime.now(timezone.utc)
     job.status = final_status
     session.commit()
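The worker changes follow the same migration applied across this PR: newer SQLModel versions drop the legacy `Session.query(...).get(...)` API in favor of `select()`/`exec()`. A condensed sketch of the pattern; `session.get(Model, pk)` is the equivalent primary-key shortcut:

```python
# Condensed form of the lookup pattern used in worker.py.
from typing import Optional

from sqlmodel import Session, select

from routing_packager_app.api_v1.models import Job


def load_job(session: Session, job_id: int) -> Optional[Job]:
    job = session.exec(select(Job).where(Job.id == job_id)).first()
    # equivalent primary-key shortcut:
    # job = session.get(Job, job_id)
    return job
```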
diff --git a/scripts/run_valhalla.sh b/scripts/run_valhalla.sh
index dec0d6a..08d8a35 100755
--- a/scripts/run_valhalla.sh
+++ b/scripts/run_valhalla.sh
@@ -45,12 +45,12 @@ reset_config() {
 PORT_8002="8002"
 PORT_8003="8003"

-# $DATA_DIR needs to be defined, either by supervisor or the current shell
-ELEVATION_DIR="$DATA_DIR/elevation"
-VALHALLA_DIR_8002="$DATA_DIR/osm/$PORT_8002"
-VALHALLA_DIR_8003="$DATA_DIR/osm/$PORT_8003"
+# $TMP_DATA_DIR needs to be defined, either by supervisor or the current shell
+ELEVATION_DIR="$TMP_DATA_DIR/elevation"
+VALHALLA_DIR_8002="$TMP_DATA_DIR/osm/$PORT_8002"
+VALHALLA_DIR_8003="$TMP_DATA_DIR/osm/$PORT_8003"
 # TODO: change PBF
-PBF="/app/data/osm/planet-latest.osm.pbf"
+PBF="/app/tmp_data/osm/planet-latest.osm.pbf"

 # activate the virtual env so the CLI can do its job in the supervisor env
 . /app/app_venv/bin/activate
@@ -94,7 +94,7 @@ while true; do
     wget -nv https://ftp5.gwdg.de/pub/misc/openstreetmap/planet.openstreetmap.org/pbf/planet-latest.osm.pbf -O "$PBF" || exit 1
     # wget -nv https://ftp5.gwdg.de/pub/misc/openstreetmap/download.geofabrik.de/germany-latest.osm.pbf -O "$PBF" || exit 1
     # wget -nv https://download.geofabrik.de/europe/iceland-latest.osm.pbf -O "$PBF" || exit 1
-    # wget -nv https://download.geofabrik.de/europe/andorra-latest.osm.pbf -O "$PBF" || exit 1
+    # wget https://download.geofabrik.de/europe/andorra-latest.osm.pbf -O "$PBF" || exit 1
     UPDATE_OSM="False"
   fi
@@ -137,6 +137,8 @@ while true; do

     echo "INFO: Downloading elevation to $ELEVATION_DIR..."
     valhalla_build_elevation --from-tiles --decompress -c ${valhalla_config} -v || exit 1
+    # debugging with andorra only:
+    # valhalla_build_elevation --decompress -c ${valhalla_config} -v -b 1,42,2,43 || exit 1

     echo "INFO: Enhancing initial tiles with elevation..."
     valhalla_build_tiles -c "${valhalla_config}" -s enhance -e cleanup "$PBF" || exit 1
diff --git a/scripts/update_osm.sh b/scripts/update_osm.sh
index 0d0d362..6b4e71c 100755
--- a/scripts/update_osm.sh
+++ b/scripts/update_osm.sh
@@ -20,7 +20,7 @@ usage()
     echo "usage: update_osm.sh --pbf/-p /app/data/osm/planet-latest.osm.pbf"
 }

-pbf=/app/data/osm/planet-latest.osm.pbf
+pbf=/app/tmp_data/osm/planet-latest.osm.pbf

 # Get the arguments
 while [ "$1" != "" ]; do
@@ -38,7 +38,6 @@ while [ "$1" != "" ]; do
 done

 echo "$(date "+%Y-%m-%d %H:%M:%S") Updating ${pbf} with the proxy settings: http_proxy: $http_proxy, https_proxy: $https_proxy"
-
 fn=$(basename "${pbf}")
 pbf_dir=$(dirname "$pbf")
 pbf_name_updated="updated_${fn}"
diff --git a/tests/env b/tests/env
index 85e31ba..fbd7cec 100644
--- a/tests/env
+++ b/tests/env
@@ -4,6 +4,7 @@ ADMIN_PASS=admin

 # you can change the directory to wherever you want the data to reside on the host
 # MUST be an absolute path
 DATA_DIR=${PWD}/tests/data
+TMP_DATA_DIR=${PWD}/tests/tmp_data

 VALHALLA_SERVER_IP="http://localhost"
diff --git a/tests/jobs/test_jobs.py b/tests/jobs/test_jobs.py
index 2383ca1..586ee10 100644
--- a/tests/jobs/test_jobs.py
+++ b/tests/jobs/test_jobs.py
@@ -1,7 +1,7 @@
 from base64 import b64encode

 import pytest
-from sqlmodel import Session
+from sqlmodel import Session, select

 from routing_packager_app import SETTINGS
 from routing_packager_app.api_v1.models import Job
@@ -28,9 +28,10 @@ def test_post_job(provider, get_client, basic_auth_header, get_session: Session):
             "provider": provider,
         },
     )
-
-    job_inst: Job = get_session.query(Job).get(res.json()["id"])
-
+    statement = select(Job).where(Job.id == res.json()["id"])
+    job_inst: Job = get_session.exec(statement).first()
+
+    assert job_inst is not None
     assert job_inst.provider == provider
     assert job_inst.status == "Queued"
     assert job_inst.description == DEFAULT_ARGS_POST["description"]
diff --git a/tests/users/test_users.py b/tests/users/test_users.py
index 7d5e436..88a38c6 100644
--- a/tests/users/test_users.py
+++ b/tests/users/test_users.py
@@ -114,7 +114,9 @@ def test_get_all_users_not_empty(get_client, basic_auth_header, get_session: Session):
     search_user = r"^user[3|4|5]@email\.com$"

     for i in user_ids:
-        user = get_session.query(User).get(i)
+        statement = select(User).where(User.id == i)
+        user = get_session.exec(statement).first()
+        assert user is not None
         assert re.search(search_user, user.email)
@@ -190,5 +192,7 @@ def test_admin_user_created(get_client, get_session: Session):
     response = get_client.get("/api/v1/users")
     assert response.json()[0]["email"] == expected_email

-    admin_user = get_session.query(User).get(1)
+    statement = select(User).where(User.id == 1)
+    admin_user = get_session.exec(statement).first()
+    assert admin_user is not None
     assert admin_user.email == expected_email
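The same select-then-guard pattern can back any further tests touching the ORM layer. A hypothetical companion example in the style of the updated suites; the `get_session` fixture name is reused from the existing tests:

```python
# Hypothetical companion test mirroring the PR's select()/exec() +
# None-guard pattern; get_session is the existing pytest fixture.
from sqlmodel import Session, select

from routing_packager_app.api_v1.models import User


def test_admin_email_lookup(get_session: Session):
    statement = select(User).where(User.email == "admin@example.org")
    user = get_session.exec(statement).first()
    assert user is not None
    assert user.email == "admin@example.org"
```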