Use SQLite for parts database
The intent here is to use this with
https://github.com/phiresky/sql.js-httpvfs. There are major missing
pieces:

- the Python sqlite3 on my system doesn't have the fts5 extension
available, so while sql.js-httpvfs theoretically supports full-text
search, it is not available
- the sql.js-httpvfs VFS needs to be modified to download the index
file, map between the index & the gzipped data, and use the
Compression Streams API to decompress each chunk (a sketch of that
mapping follows this list)
- no work was done on the frontend portion of jlcparts
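
The .gzindex files written by pagedCompressFile below are just a 16-byte magic
header followed by little-endian (uncompressedOffset, compressedOffset) uint32
pairs, one per page plus a trailing pair holding the totals (the zstd variant
additionally embeds the trained dictionary right after the magic). As a rough
illustration of the index-to-chunk mapping the modified VFS would need, here is
a minimal Python sketch that reads the gzip format from local files; in the
browser the seek/read would become an HTTP Range request fed through
DecompressionStream('gzip'). The helper names and paths are placeholders, not
part of this commit:

```python
import bisect
import gzip
import struct

def load_gzindex(index_path):
    # .gzindex layout: 16-byte magic, then (dataPos, compPos) little-endian uint32
    # pairs; the final pair holds the total uncompressed and compressed sizes.
    raw = open(index_path, "rb").read()
    return [struct.unpack_from("<LL", raw, off) for off in range(16, len(raw), 8)]

def read_range(part_path, entries, offset, length):
    # Return `length` uncompressed bytes starting at `offset`, touching only the
    # compressed chunks that cover that range. sql.js-httpvfs would do the same
    # with HTTP Range requests and DecompressionStream("gzip") instead of seek/read.
    data_starts = [d for d, _ in entries]
    out = bytearray()
    i = bisect.bisect_right(data_starts, offset) - 1
    with open(part_path, "rb") as part:
        while len(out) < length and i < len(entries) - 1:
            data_start, comp_start = entries[i]
            _, comp_end = entries[i + 1]
            part.seek(comp_start)
            chunk = gzip.decompress(part.read(comp_end - comp_start))
            out += chunk[max(0, offset - data_start):]
            i += 1
    return bytes(out[:length])

# e.g. SQLite page 3 of a database built with page_size=1024:
# entries = load_gzindex("index.sqlite3.gzindex")
# page = read_range("index.sqlite3.gzpart", entries, 3 * 1024, 1024)
```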

Quick benchmarks:

- gzip, page_size=1024:  (sqlite3=537M, gzindex= 4.2M, gzparts=172M)
- gzip, page_size=4096:  (sqlite3=634M, gzindex= 1.3M, gzparts=117M)
- gzip, page_size=8192:  (sqlite3=545M, gzindex= 257K, gzparts= 76M)
- gzip, page_size=16384: (sqlite3=513M, gzindex= 257K, gzparts= 56M)
- zstd, page_size=1024:  (sqlite3=537M, zstindex=4.3M, zstparts=81M)
- zstd, page_size=2048:  (sqlite3=566M, zstindex=4.5M, zstparts=85M)
- zstd, page_size=4096:  (sqlite3=634M, zstindex=5.1M, zstparts=84M)
- zstd, page_size=8192:  (sqlite3=545M, zstindex=4.4M, zstparts=86M)
- zstd, page_size=16384: (sqlite3=513M, zstindex=4.1M, zstparts=82M)
flaviut committed Oct 1, 2023
1 parent eb5336a commit 64f1d8a
292 changes: 236 additions & 56 deletions jlcparts/datatables.py
@@ -1,17 +1,25 @@
import dataclasses
import re
import os
import shutil
import json
import datetime
import gzip
import itertools
import json
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import os
import random
import shutil
import sqlite3
import struct
import textwrap
from pathlib import Path
from typing import Dict, Generator, Optional

import click
from jlcparts.partLib import PartLibraryDb
from jlcparts.common import sha256file
import pyzstd

from jlcparts import attributes, descriptionAttributes
from jlcparts.common import sha256file
from jlcparts.partLib import PartLibraryDb


def saveJson(object, filename, hash=False, pretty=False, compress=False):
openFn = gzip.open if compress else open
@@ -202,7 +210,7 @@ def trimLcscUrl(url, lcsc):

def extractComponent(component, schema):
try:
propertyList = []
result: Dict[str, Optional[str]] = {}
for schItem in schema:
if schItem == "attributes":
# The cache might be in the old format
@@ -233,31 +241,31 @@ def extractComponent(component, schema):
attr["Manufacturer"] = component.get("manufacturer", None)

attr = dict([normalizeAttribute(key, val) for key, val in attr.items()])
propertyList.append(attr)
result[schItem] = json.dumps(attr)
elif schItem == "img":
images = component.get("extra", {}).get("images", None)
propertyList.append(crushImages(images))
result[schItem] = crushImages(images)
elif schItem == "url":
url = component.get("extra", {}).get("url", None)
propertyList.append(trimLcscUrl(url, component["lcsc"]))
result[schItem] = trimLcscUrl(url, component["lcsc"])
elif schItem in component:
item = component[schItem]
if isinstance(item, str):
item = item.strip()
propertyList.append(item)
else:
item = json.dumps(item)
result[schItem] = item
else:
propertyList.append(None)
return propertyList
result[schItem] = None
return result
except Exception as e:
raise RuntimeError(f"Cannot extract {component['lcsc']}").with_traceback(e.__traceback__)

def buildDatatable(components):

def buildDatatable(components) -> Generator[Dict[str, str], None, None]:
schema = ["lcsc", "mfr", "joints", "description",
"datasheet", "price", "img", "url", "attributes"]
return {
"schema": schema,
"components": [extractComponent(x, schema) for x in components]
}
"datasheet", "price", "img", "url", "attributes", "stock"]
return (extractComponent(component, schema) for component in components)

def buildStocktable(components):
return {component["lcsc"]: component["stock"] for component in components }
@@ -274,6 +282,39 @@ def clearDir(directory):
shutil.rmtree(file_path)


def _sqlite_fast_cur(path: str) -> sqlite3.Cursor:
conn = sqlite3.connect(path, isolation_level=None)
cursor = conn.cursor()
# very dangerous, but we don't care about data integrity here. Recovery is to
# delete the whole database and rebuild.
cursor.execute("PRAGMA journal_mode=OFF;")
cursor.execute("PRAGMA synchronous=OFF;")
return cursor


INIT_TABLES_SQL = textwrap.dedent("""\
CREATE TABLE categories
(
id INTEGER PRIMARY KEY NOT NULL,
category TEXT NOT NULL,
subcategory TEXT NOT NULL,
UNIQUE (category, subcategory)
);
CREATE TABLE component
(
lcsc TEXT PRIMARY KEY NOT NULL,
category_id INTEGER NOT NULL REFERENCES categories (id),
mfr TEXT NOT NULL,
joints INTEGER NOT NULL,
description TEXT NOT NULL,
datasheet TEXT NOT NULL,
price TEXT NOT NULL, -- JSON
img TEXT, -- JSON
url TEXT,
attributes TEXT NOT NULL, -- JSON
stock INTEGER NOT NULL
);""")

@dataclasses.dataclass
class MapCategoryParams:
libraryPath: str
@@ -284,6 +325,7 @@ class MapCategoryParams:
subcatName: str


_map_category_cur = None
def _map_category(val: MapCategoryParams):
# Sometimes, JLC PCB doesn't fill in the category names. Ignore such
# components.
@@ -292,29 +334,133 @@ def _map_category(val: MapCategoryParams):
if val.subcatName.strip() == "":
return None

filebase = val.catName + val.subcatName
filebase = filebase.replace("&", "and").replace("/", "aka")
filebase = re.sub('[^A-Za-z0-9]', '_', filebase)
# each process gets its own database, then we merge them at the end and add indexes
global _map_category_cur
if _map_category_cur is None:
_map_category_cur = _sqlite_fast_cur(os.path.join(val.outdir, f"part_{os.getpid()}.sqlite3"))
_map_category_cur.executescript(INIT_TABLES_SQL)
cur = _map_category_cur

lib = PartLibraryDb(val.libraryPath)
components = lib.getCategoryComponents(val.catName, val.subcatName, stockNewerThan=val.ignoreoldstock)

dataTable = buildDatatable(components)
dataTable.update({"category": val.catName, "subcategory": val.subcatName})
dataHash = saveJson(dataTable, os.path.join(val.outdir, f"{filebase}.json.gz"),
hash=True, compress=True)

stockTable = buildStocktable(components)
stockHash = saveJson(stockTable, os.path.join(val.outdir, f"{filebase}.stock.json"), hash=True)
components = lib.getCategoryComponents(val.catName, val.subcatName,
stockNewerThan=val.ignoreoldstock)

categoryId = random.randint(0, 2**31 - 1)
insertCategory = textwrap.dedent("""\
INSERT INTO categories (id, category, subcategory)
VALUES ($id, $category, $subcategory)""")
cur.execute(
insertCategory,
{
"id": categoryId,
"category": val.catName,
"subcategory": val.subcatName
})

insertComponent = textwrap.dedent("""\
INSERT INTO component (lcsc, category_id, mfr, joints, description, datasheet, price, img, url,
attributes, stock)
VALUES ($lcsc, $category_id, $mfr, $joints, $description, $datasheet, $price, $img, $url,
$attributes, $stock)""")
dataIter = ({
**component,
"category_id": categoryId,
} for component in buildDatatable(components))

while batch := list(itertools.islice(dataIter, 200)):
cur.executemany(insertComponent, batch)
cur.connection.commit()

return {
"catName": val.catName,
"subcatName": val.subcatName,
"sourcename": filebase,
"datahash": dataHash,
"stockhash": stockHash
}


def pagedCompressFile(inputPath, pageSize=1024):
with open(inputPath, 'rb') as infile, \
open(f"{inputPath}.gzpart", 'wb') as outfile, \
open(f"{inputPath}.gzindex", 'wb') as indexfile:
dataPos = 0
compPos = 0

# magic number to indicate index format.
indexfile.write(struct.pack('QQ', 0xb6d881b0d2f0408e, 0x9c7649381fe30e8c))

while True:
chunk = infile.read(pageSize)
if chunk:
compressedChunk = gzip.compress(chunk)
# if chunk is empty, we're at EOF. Write a last index entry to simplify
# index reading logic.
outfile.write(compressedChunk)
indexfile.write(struct.pack('<LL', dataPos, compPos))
dataPos += len(chunk)
compPos += len(compressedChunk)
if not chunk:
break

def compressChunkZstd(args):
c, sharedDictName = args
try:
sharedDict = SharedMemory(name=sharedDictName)
zstd_dict = pyzstd.ZstdDict(sharedDict.buf)
return (c, pyzstd.compress(c, 6, zstd_dict=zstd_dict))
finally:
sharedDict.close()

def pagedCompressFileZstd(inputPath, pageSize=1024, jobs=1):
def trainDict():
with open(inputPath, 'rb') as infile:
dictSize = 64 * 1024
# sample 256k of data every 16MiB
samples = []
while chunk := infile.read(256 * 1024):
samples.append(chunk)
infile.seek(16 * 1024 * 1024 - len(chunk), 1)

zstdDict = pyzstd.train_dict(samples, dict_size=dictSize)
zstdDict = pyzstd.finalize_dict(zstdDict, samples, dict_size=dictSize, level=6)
return zstdDict


with open(inputPath, 'rb') as infile, \
open(f"{inputPath}.zstpart", 'wb') as outfile, \
open(f"{inputPath}.zstindex", 'wb') as indexfile:
dataPos = 0
compPos = 0

print("Training dictionary...")
zstdDict = trainDict()

# magic number to indicate index format.
indexfile.write(struct.pack('QQ', 0x0f4b462afc1e47fc, 0xb6ee9b384955469b))
indexfile.write(zstdDict.dict_content)

try:
sharedDict = SharedMemory(create=True, size=len(zstdDict.dict_content))
sharedDict.buf[:] = zstdDict.dict_content
sharedDictName = sharedDict.name

totalChunks = os.path.getsize(inputPath) // pageSize
with multiprocessing.Pool(jobs or multiprocessing.cpu_count()) as pool:

args = ((chunk, sharedDictName) for chunk in iter(lambda: infile.read(pageSize), b''))
for i, (chunk, compressedChunk) in enumerate(pool.imap(compressChunkZstd, args, chunksize=100)):
if i % (16*1024) == 0:
print(f"{i / totalChunks * 100:.0f} % compressed")
outfile.write(compressedChunk)
indexfile.write(struct.pack('<LL', dataPos, compPos))
dataPos += len(chunk)
compPos += len(compressedChunk)

# write last index entry to simplify index reading logic.
indexfile.write(struct.pack('<LL', dataPos, compPos))
finally:
sharedDict.close()
sharedDict.unlink()


@click.command()
@click.argument("library", type=click.Path(dir_okay=False))
@click.argument("outdir", type=click.Path(file_okay=False))
@@ -330,9 +476,10 @@ def buildtables(library, outdir, ignoreoldstock, jobs):
Path(outdir).mkdir(parents=True, exist_ok=True)
clearDir(outdir)

outputDb = _sqlite_fast_cur(os.path.join(outdir, "index.sqlite3"))
# fast writes

total = lib.countCategories()
categoryIndex = {}
outputDb.executescript(INIT_TABLES_SQL)

params = []
for (catName, subcategories) in lib.categories().items():
@@ -341,22 +488,55 @@ def buildtables(library, outdir, ignoreoldstock, jobs):
libraryPath=library, outdir=outdir, ignoreoldstock=ignoreoldstock,
catName=catName, subcatName=subcatName))

with multiprocessing.Pool(jobs or multiprocessing.cpu_count()) as pool:
for i, result in enumerate(pool.imap_unordered(_map_category, params)):
if result is None:
continue
catName, subcatName = result["catName"], result["subcatName"]
print(f"{((i) / total * 100):.2f} % {catName}: {subcatName}")
if catName not in categoryIndex:
categoryIndex[catName] = {}
assert subcatName not in categoryIndex[catName]
categoryIndex[catName][subcatName] = {
"sourcename": result["sourcename"],
"datahash": result["datahash"],
"stockhash": result["stockhash"]
}
index = {
"categories": categoryIndex,
"created": datetime.datetime.now().astimezone().replace(microsecond=0).isoformat()
}
saveJson(index, os.path.join(outdir, "index.json"), hash=True)
total = lib.countCategories()

def report_progress(i, result):
if result is None:
return
catName, subcatName = result["catName"], result["subcatName"]
print(f"{((i) / total * 100):.1f} % {catName}: {subcatName}")

if jobs == 1:
# do everything in the main thread in this case to make debugging easier
for i, param in enumerate(params):
report_progress(i, _map_category(param))
else:
with multiprocessing.Pool(jobs or multiprocessing.cpu_count()) as pool:
for i, result in enumerate(pool.imap_unordered(_map_category, params)):
report_progress(i, result)


pageSize = 2048
outputDb.execute(f"pragma page_size = {pageSize};")

# merge all databases
print("Merging databases...")
for i, filename in enumerate(os.listdir(outdir)):
if not filename.startswith("part_"):
continue
filepath = os.path.join(outdir, filename)
outputDb.execute(f"ATTACH DATABASE '{filepath}' AS db{i};")
outputDb.execute(f"INSERT INTO main.categories SELECT * FROM db{i}.categories;")
outputDb.execute(f"INSERT INTO main.component SELECT * FROM db{i}.component;")
outputDb.execute(f"DETACH DATABASE db{i};")
os.unlink(filepath)

print("Building indexes...")
# create indexes after all data is inserted to speed up the process
outputDb.executescript("""\
CREATE INDEX component_mfr ON component (mfr);
CREATE INDEX categories_category ON categories (category);
CREATE INDEX categories_subcategory ON categories (subcategory);
""")

# optimize db: https://github.com/phiresky/sql.js-httpvfs#usage
print("Optimizing database...")
outputDb.execute("pragma journal_mode = delete;")
outputDb.execute("vacuum;")
outputDb.connection.close()

print("Compressing database...")
pagedCompressFileZstd(
os.path.join(outdir, "index.sqlite3"),
pageSize=1024,
jobs=jobs)
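
For reference, the merged index.sqlite3 produced above can be queried directly
using the schema from INIT_TABLES_SQL; the intended consumer is sql.js-httpvfs
issuing the same SQL from the browser, but as the commit message notes, the
frontend side has not been done. A minimal sketch, with hypothetical category
and subcategory values:

```python
import json
import sqlite3

conn = sqlite3.connect("index.sqlite3")
rows = conn.execute(
    """
    SELECT c.lcsc, c.mfr, c.description, c.price, c.attributes, c.stock
    FROM component AS c
    JOIN categories AS cat ON cat.id = c.category_id
    WHERE cat.category = ? AND cat.subcategory = ? AND c.stock > 0
    ORDER BY c.stock DESC
    LIMIT 20
    """,
    ("Resistors", "Chip Resistor - Surface Mount"),  # hypothetical example values
)
for lcsc, mfr, description, price, attributes, stock in rows:
    # price and attributes are stored as JSON text (see extractComponent above)
    print(lcsc, mfr, description, stock, json.loads(price), sorted(json.loads(attributes)))
conn.close()
```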
