Rework metrics db connection management.
This removes the use of a process global, and adds support for readonly
connections to help reduce contention on the metrics db with
multithreaded access.

Adding readonly mode, together with the initial PR feedback, led to a
reworking of the connection management. Rather than re-use
`lib.database.get_connection()`, we now implement our own version. This
gives us control, letting us ensure the appropriate setup efficiently
and handle readonly connections cleanly.

One small change with this: we cannot use an in-memory db in testing
for the metrics db, as `mode=memory` is not compatible with setting
`mode=ro`. Each test still gets its own db, but it's on disk.
bloodearnest committed Dec 15, 2023
1 parent e9ea46e commit f7577d1
Showing 4 changed files with 116 additions and 24 deletions.
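
For reference (not part of the commit), a minimal sketch of the SQLite URI behaviour the message relies on: the mode parameter is single-valued (ro, rw, rwc or memory), so an in-memory database cannot also be opened readonly, and a readonly open of a missing file fails rather than creating it. Paths below are illustrative.

import pathlib
import sqlite3
import tempfile

tmp = pathlib.Path(tempfile.mkdtemp())
db_file = tmp / "metrics.db"

# mode=rwc creates the file on first use, as the writer connection does.
rw = sqlite3.connect(f"file:{db_file}?mode=rwc", uri=True)
rw.execute("PRAGMA journal_mode = WAL")

# mode=ro opens the same on-disk file for reading only.
ro = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True)

# A readonly open of a file that does not exist cannot create it, so it
# raises sqlite3.OperationalError ("unable to open database file").
missing = tmp / "missing.db"
try:
    sqlite3.connect(f"file:{missing}?mode=ro", uri=True)
except sqlite3.OperationalError as exc:
    print("readonly open failed as expected:", exc)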
70 changes: 54 additions & 16 deletions jobrunner/record_stats.py
@@ -3,8 +3,10 @@
 """
 import json
 import logging
+import sqlite3
 import subprocess
 import sys
+import threading
 import time
 from collections import defaultdict
 
@@ -28,24 +30,63 @@
 )
 """
 
-_conn = None
+CONNECTION_CACHE = threading.local()
 
 
-def ensure_metrics_db():
-    global _conn
-    _conn = database.get_connection(config.METRICS_FILE)
-    _conn.execute("PRAGMA journal_mode = WAL")
-    _conn.execute(DDL)
+def get_connection(readonly=True):
+    db_file = config.METRICS_FILE
+
+    # developer check against using memory dbs, which cannot be used with this
+    # function, as we need to set mode ourselves
+    assert not db_file.startswith(
+        "file:"
+    ), "urls not supported for metrics db - must be path"
+
+    if readonly:
+        db = f"file:{db_file}?mode=ro"
+    else:
+        db = f"file:{db_file}?mode=rwc"
+
+    cache = CONNECTION_CACHE.__dict__
+    if db not in cache:
+        try:
+            conn = sqlite3.connect(db, uri=True)
+        except sqlite3.OperationalError as exc:
+            # if it's readonly, we cannot create the file, so fail gracefully.
+            # Caller should check for conn being None.
+            if readonly and "unable to open" in str(exc).lower():
+                return None
+            raise
+
+        # manual transactions
+        conn.isolation_level = None
+        # Support dict-like access to rows
+        conn.row_factory = sqlite3.Row
+
+        if not readonly:
+            conn.execute("PRAGMA journal_mode = WAL")
+            conn.execute(DDL)
+
+        cache[db] = conn
+
+    return cache[db]
 
 
 def read_job_metrics(job_id, **metrics):
-    if _conn is None:
-        ensure_metrics_db()
+    conn = get_connection(readonly=True)
+
+    raw_metrics = None
+
+    if conn is not None:
+        try:
+            raw_metrics = conn.execute(
+                "SELECT metrics FROM jobs WHERE id = ?",
+                (job_id,),
+            ).fetchone()
+        except sqlite3.OperationalError as exc:
+            if "no such table" not in str(exc).lower():
+                raise
 
-    raw_metrics = _conn.execute(
-        "SELECT metrics FROM jobs WHERE id = ?",
-        (job_id,),
-    ).fetchone()
     if raw_metrics is None:
         metrics = {}
     else:
Expand All @@ -54,11 +95,8 @@ def read_job_metrics(job_id, **metrics):


def write_job_metrics(job_id, metrics):
if _conn is None:
ensure_metrics_db()

raw_metrics = json.dumps(metrics)
_conn.execute(
get_connection(readonly=False).execute(
"""
INSERT INTO jobs (id, metrics) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE set metrics = ?
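
As an aside (not part of the commit), a minimal sketch of how the per-thread cache behaves under concurrent reads. It assumes config.METRICS_FILE points at a writable on-disk path, as the test fixture below arranges, and the metric name is made up.

import threading

from jobrunner import record_stats


def reader(job_id, results):
    # read_job_metrics() opens (and caches) a per-thread readonly connection,
    # so concurrent readers never share a connection with the writer.
    results.append(record_stats.read_job_metrics(job_id))


# Write via the single read-write connection; this creates the db and table.
record_stats.write_job_metrics("job-1", {"cpu_sample": 50.0})

results = []
threads = [threading.Thread(target=reader, args=("job-1", results)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert results == [{"cpu_sample": 50.0}] * 4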
15 changes: 8 additions & 7 deletions tests/conftest.py
@@ -196,14 +196,15 @@ def db(monkeypatch, request):
 
 
 @pytest.fixture(autouse=True)
-def metrics_db(monkeypatch, request):
-    """Create a throwaway metrics db."""
-    record_stats._conn = None
-    database_file = f"file:metrics-{request.node.name}?mode=memory&cache=shared"
-    monkeypatch.setattr(config, "METRICS_FILE", database_file)
+def metrics_db(monkeypatch, tmp_path, request):
+    """Create a throwaway metrics db.
+
+    It must be a file, not memory, because we use readonly connections.
+    """
+    db_path = tmp_path / "metrics.db"
+    monkeypatch.setattr(config, "METRICS_FILE", str(db_path))
     yield
-    database.CONNECTION_CACHE.__dict__.pop(database_file, None)
-    record_stats._conn = None
+    record_stats.CONNECTION_CACHE.__dict__.clear()
 
 
 @dataclass
7 changes: 7 additions & 0 deletions tests/lib/test_database.py
@@ -9,6 +9,7 @@
     ensure_db,
     ensure_valid_db,
     find_one,
+    get_connection,
     insert,
     migrate_db,
     select_values,
@@ -17,6 +18,12 @@
 from jobrunner.models import Job, State
 
 
+def test_get_connection():
+    db = "file:test_get_connection?mode=memory&cache=shared"
+    conn = get_connection(db)
+    assert conn is get_connection(db)
+
+
 def test_basic_roundtrip(tmp_work_dir):
     job = Job(
         id="foo123",
48 changes: 47 additions & 1 deletion tests/test_record_stats.py
@@ -1,12 +1,58 @@
+import sqlite3
 import subprocess
 import time
 
-from jobrunner import record_stats
+from jobrunner import config, record_stats
 from jobrunner.models import State, StatusCode
 from tests.conftest import get_trace
 from tests.factories import job_factory, metrics_factory
 
 
+def test_get_connection_readonly():
+    conn = record_stats.get_connection(readonly=True)
+    assert conn is None
+
+    conn = record_stats.get_connection(readonly=False)
+    assert conn is record_stats.get_connection(readonly=False)  # cached
+    assert conn.isolation_level is None
+    assert conn.row_factory is sqlite3.Row
+    assert conn.execute("PRAGMA journal_mode").fetchone()["journal_mode"] == "wal"
+    assert (
+        conn.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("jobs",)
+        ).fetchone()["name"]
+        == "jobs"
+    )
+
+    ro_conn = record_stats.get_connection(readonly=True)
+    assert ro_conn is record_stats.get_connection(readonly=True)  # cached
+    assert ro_conn is not conn
+    assert ro_conn.isolation_level is None
+    assert ro_conn.row_factory is sqlite3.Row
+    assert ro_conn.execute("PRAGMA journal_mode").fetchone()["journal_mode"] == "wal"
+    assert (
+        ro_conn.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?", ("jobs",)
+        ).fetchone()["name"]
+        == "jobs"
+    )
+
+
+def test_read_write_job_metrics():
+
+    assert record_stats.read_job_metrics("id") == {}
+
+    # create db file
+    sqlite3.connect(config.METRICS_FILE)
+
+    # possible race condition, no table yet, should still report no metrics
+    assert record_stats.read_job_metrics("id") == {}
+
+    record_stats.write_job_metrics("id", {"test": 1.0})
+
+    assert record_stats.read_job_metrics("id") == {"test": 1.0}
+
+
 def test_record_tick_trace(db, freezer, monkeypatch):
     jobs = []
     jobs.append(job_factory(status_code=StatusCode.CREATED))
