Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new merge CLI #19

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ packages = ["sqlelf"]

[project.scripts]
sqlelf = "sqlelf.cli:start"
sqlelf-merge = "sqlelf.tools.merge:start"

[tool.isort]
skip = [".git", "result"]
Expand Down
Empty file added sqlelf/tools/__init__.py
Empty file.
89 changes: 89 additions & 0 deletions sqlelf/tools/merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import argparse
import os
import sys
from dataclasses import dataclass, field
from typing import TextIO

import apsw


@dataclass
class ProgramArguments:
    """Typed namespace populated by argparse for the ``sqlelf-merge`` CLI.

    An instance is handed to ``ArgumentParser.parse_args(namespace=...)`` so
    the parsed options come back as typed attributes.
    """

    # Paths of the input SQLite databases that should be merged.
    filenames: list[str] = field(default_factory=list)
    # Path of the SQLite database the merged tables are written to.
    output: str = "output.sqlite"


def is_sqlite_file(file: str) -> bool:
    """Tests if the given file is a valid SQLite file

    The file is opened read-only so that probing never creates or modifies
    anything on disk, and a trivial statement is executed because SQLite may
    defer reading the file header until the first statement runs.

    Args:
        file: path of the file to probe

    Returns:
        True if SQLite can open the file and read its header, False if the
        file cannot be opened or is not a SQLite database.
    """
    try:
        conn = apsw.Connection(file, flags=apsw.SQLITE_OPEN_READONLY)
    except (apsw.CantOpenError, apsw.NotADBError):
        return False

    try:
        # Force SQLite to actually read and validate the database header.
        conn.execute("PRAGMA schema_version").fetchall()
    except apsw.NotADBError:
        return False
    finally:
        # The connection's context manager only scopes a transaction; the
        # handle itself has to be closed explicitly.
        conn.close()
    return True


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


def _list_sqlelf_tables(filename: str) -> list[str]:
    """Return the names of the ``elf_*`` / ``dwarf_*`` tables in *filename*."""
    conn = apsw.Connection(filename)
    try:
        return [
            row[0]
            for row in conn.execute(
                """SELECT name
                   FROM sqlite_schema
                   WHERE (name LIKE 'elf_%' OR name LIKE 'dwarf_%')
                   AND type = 'table'"""
            )
        ]
    finally:
        conn.close()


def _merge_tables(output: str, filenames: list[str], tables: list[str]) -> None:
    """Create each of *tables* in *output* as the UNION ALL over *filenames*."""
    conn = apsw.Connection(output)
    try:
        # The context manager wraps the whole merge in a single transaction
        # so a failure part-way through does not leave partial tables behind.
        with conn:
            # Attach all the databases. The path is bound as a parameter so
            # that quotes in a filename cannot break (or inject into) the SQL.
            # NOTE: SQLite limits the number of attached databases (10 by
            # default), which caps how many files can be merged in one run.
            for idx, file in enumerate(filenames):
                conn.execute(f"ATTACH DATABASE ? AS DB{idx}", (file,))

            for table in tables:
                quoted = _quote_identifier(table)
                union = " UNION ALL ".join(
                    f"SELECT * FROM DB{idx}.{quoted}"
                    for idx in range(len(filenames))
                )
                conn.execute(f"CREATE TABLE {quoted} AS {union}")
    finally:
        conn.close()


def start(args: list[str] = sys.argv[1:], stdin: TextIO = sys.stdin) -> None:
    """
    Start the merge CLI

    Validates the input files, takes the list of ``elf_*`` / ``dwarf_*``
    tables from the first input, and creates each of those tables in the
    output database as the concatenation of that table from every input.

    Args:
        args: the command line arguments to parse
        stdin: the stdin to use if invoking the shell (not read by this
            function)

    Raises:
        SystemExit: if an input file does not exist or is not a valid SQLite
            file (argparse also exits on invalid usage).
    """
    parser = argparse.ArgumentParser(
        prog="sqlelf-merge",
        description="Merge multiple sqlelf SQLITE databases into a single one.",
        epilog="Brought to you with ♥ by Farid Zakaria",
    )
    parser.add_argument(
        "filenames", nargs="+", metavar="FILE", help="The sqlites file to merge"
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Output file to write the sqlite merged database to.",
    )

    program_args: ProgramArguments = parser.parse_args(
        args, namespace=ProgramArguments()
    )

    if not all(os.path.isfile(f) for f in program_args.filenames):
        sys.exit("A provided file does not exist.")

    if not all(is_sqlite_file(f) for f in program_args.filenames):
        sys.exit("A provided file is not a valid SQLite file.")

    # Take the first file to be the "canonical" database to fetch the list of tables
    # TODO(fzakaria): Consider listing from elf.py instead
    tables = _list_sqlelf_tables(program_args.filenames[0])

    _merge_tables(program_args.output, program_args.filenames, tables)


# Allow running the module directly as a script.
if __name__ == "__main__":
    start()
Loading