diff --git a/src/vin/__init__.py b/src/vin/__init__.py index ac58770..fe085b9 100644 --- a/src/vin/__init__.py +++ b/src/vin/__init__.py @@ -20,7 +20,7 @@ from vin.constants import VIN_MODEL_YEAR_CHARACTERS from vin.constants import VIN_POSITION_WEIGHTS from vin.database import DecodedVehicle -from vin.database import VehicleDatabase +from vin.database import lookup_vehicle class DecodingError(Exception): @@ -130,15 +130,13 @@ def _decode_vin(self) -> None: DecodingError: Unable to decode VIN using NHTSA vPIC. """ vehicle: DecodedVehicle = None - db_path = files("vin").joinpath("vehicle.db") - with VehicleDatabase(path=db_path) as db: - model_year = self._decode_model_year() - if model_year > 0: - vehicle = db.lookup_vehicle(self.wmi, self.descriptor, model_year) - else: - vehicle = db.lookup_vehicle(self.wmi, self.descriptor, abs(model_year)) - if not vehicle: - vehicle = db.lookup_vehicle(self.wmi, self.descriptor, abs(model_year) - 30) + model_year = self._decode_model_year() + if model_year > 0: + vehicle = lookup_vehicle(self.wmi, self.descriptor, model_year) + else: + vehicle = lookup_vehicle(self.wmi, self.descriptor, abs(model_year)) + if not vehicle: + vehicle = lookup_vehicle(self.wmi, self.descriptor, abs(model_year) - 30) if vehicle is None: raise DecodingError() diff --git a/src/vin/database.py b/src/vin/database.py index 80654d6..9dc8979 100644 --- a/src/vin/database.py +++ b/src/vin/database.py @@ -2,10 +2,26 @@ import re import sqlite3 from dataclasses import dataclass +from importlib.resources import files log = logging.getLogger(__name__) +DATABASE_PATH = files("vin").joinpath("vehicle.db") + + +def regex(value, pattern) -> bool: + """REGEXP shim for SQLite versions bundled with Python 3.11 and earlier""" + return re.match(pattern, value) is not None + # found = re.match(pattern, value) is not None + # print(f"{value=} {pattern=} {'found' if found else '---'}") + # return found + + +connection = sqlite3.connect(DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES) +connection.row_factory = sqlite3.Row +connection.create_function("REGEXP", 2, regex) + @dataclass class DecodedVehicle: @@ -20,88 +36,57 @@ class DecodedVehicle: truck_type: str -def regex(value, pattern): - """REGEXP shim for SQLite versions bundled with Python 3.11 and earlier""" - found = re.match(pattern, value) is not None - print(f"{value=} {pattern=} {'found' if found else '---'}") - return found - - -class VehicleDatabase: - def __init__(self, path): - """return a SQLite3 database connection""" - assert path.exists() - self._path = path - - def __enter__(self) -> "VehicleDatabase": - """connect to the database - - Build the database and schema if requested. - """ - log.debug(f"Opening database {self._path.absolute()}") - connection = sqlite3.connect(self._path, detect_types=sqlite3.PARSE_DECLTYPES) - connection.row_factory = sqlite3.Row - # version = sqlite3.sqlite_version_info - connection.create_function("REGEXP", 2, regex) - self._connection = connection - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._connection.in_transaction: - log.debug("Auto commit") - self._connection.commit() - self._connection.close() - - def query(self, sql: str, args: tuple = ()) -> list[sqlite3.Row]: - """insert rows and return rowcount""" - cursor = self._connection.cursor() - results = cursor.execute(sql, args).fetchall() - cursor.close() - - # print(sql) - print(args) - for result in results: - print(dict(result)) - - return results - - def lookup_vehicle(self, wmi: str, vds: str, model_year: int) -> DecodedVehicle | None: - """get vehicle details - - Args: - vin: The 17-digit Vehicle Identification Number. - - Returns: - Vehicle: the vehicle details - """ - if results := self.query(sql=LOOKUP_VEHICLE_SQL, args=(wmi, model_year, vds)): - details = {"series": None, "trim": None, "model_year": model_year} - for row in results: - if row["model"] is not None: - details.update( - { - k: row[k] - for k in [ - "manufacturer", - "make", - "model", - "vehicle_type", - "truck_type", - "country", - ] - } - ) - elif row["series"] is not None: - details["series"] = row["series"] - elif row["trim"] is not None: - details["trim"] = row["trim"] - else: - raise Exception( - f"expected model and series WMI {wmi} VDS {vds} " - f"model year {model_year}, but got {row}" - ) - return DecodedVehicle(**details) - return None +def query(sql: str, args: tuple = ()) -> list[sqlite3.Row]: + """insert rows and return rowcount""" + cursor = connection.cursor() + results = cursor.execute(sql, args).fetchall() + cursor.close() + + # print(sql) + print(args) + for result in results: + print(dict(result)) + + return results + + +def lookup_vehicle(wmi: str, vds: str, model_year: int) -> DecodedVehicle | None: + """get vehicle details + + Args: + vin: The 17-digit Vehicle Identification Number. + + Returns: + Vehicle: the vehicle details + """ + if results := query(sql=LOOKUP_VEHICLE_SQL, args=(wmi, model_year, vds)): + details = {"series": None, "trim": None, "model_year": model_year} + for row in results: + if row["model"] is not None: + details.update( + { + k: row[k] + for k in [ + "manufacturer", + "make", + "model", + "vehicle_type", + "truck_type", + "country", + ] + } + ) + elif row["series"] is not None: + details["series"] = row["series"] + elif row["trim"] is not None: + details["trim"] = row["trim"] + else: + raise Exception( + f"expected model and series WMI {wmi} VDS {vds} " + f"model year {model_year}, but got {row}" + ) + return DecodedVehicle(**details) + return None LOOKUP_VEHICLE_SQL = """