From 09a7848a9f94d11a6f7279b704365aad09f63a78 Mon Sep 17 00:00:00 2001
From: Farid Zakaria
Date: Sun, 4 Feb 2024 23:21:33 +0000
Subject: [PATCH] Introduce new merge CLI

---
 pyproject.toml           |   1 +
 sqlelf/tools/__init__.py |   0
 sqlelf/tools/merge.py    | 116 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 117 insertions(+)
 create mode 100644 sqlelf/tools/__init__.py
 create mode 100644 sqlelf/tools/merge.py

diff --git a/pyproject.toml b/pyproject.toml
index c109a31..fb30d8d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,6 +56,7 @@ packages = ["sqlelf"]
 
 [project.scripts]
 sqlelf = "sqlelf.cli:start"
+sqlelf-merge = "sqlelf.tools.merge:start"
 
 [tool.isort]
 skip = [".git", "result"]
diff --git a/sqlelf/tools/__init__.py b/sqlelf/tools/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/sqlelf/tools/merge.py b/sqlelf/tools/merge.py
new file mode 100644
index 0000000..0831dfe
--- /dev/null
+++ b/sqlelf/tools/merge.py
@@ -0,0 +1,116 @@
+import argparse
+import os
+import sys
+from contextlib import closing
+from dataclasses import dataclass, field
+from typing import Optional, TextIO
+
+import apsw
+
+
+@dataclass
+class ProgramArguments:
+    """Namespace that argparse populates with the merge CLI arguments."""
+
+    # SQLite databases to merge, in the order given on the command line
+    filenames: list[str] = field(default_factory=list)
+    # Where the merged database is written when -o/--output is not supplied
+    output: str = "output.sqlite"
+
+
+def is_sqlite_file(file: str) -> bool:
+    """Test whether the given file can be read as a SQLite database.
+
+    SQLite opens files lazily, so a trivial pragma is executed to force the
+    header to be read; that read is what raises apsw.NotADBError.
+
+    Args:
+        file: path of the file to probe
+
+    Returns:
+        True if SQLite accepts the file, False if it is not a database.
+    """
+    try:
+        # closing() because an apsw connection used as a context manager only
+        # scopes a transaction; it does not close the underlying handle.
+        with closing(apsw.Connection(file)) as conn:
+            conn.execute("PRAGMA schema_version").fetchall()
+    except apsw.NotADBError:
+        return False
+    return True
+
+
+def start(args: Optional[list[str]] = None, stdin: TextIO = sys.stdin) -> None:
+    """
+    Start the merge CLI
+
+    Every elf_*/dwarf_* table found in the first input database is created in
+    the output database as the UNION ALL of that table across all inputs.
+
+    Args:
+        args: the command line arguments to parse; when None, argparse reads
+              sys.argv[1:] at call time
+        stdin: currently unused by the merge tool
+    """
+    parser = argparse.ArgumentParser(
+        prog="sqlelf-merge",
+        description="Merge multiple sqlelf SQLITE databases into a single one.",
+        epilog="Brought to you with ♥ by Farid Zakaria",
+    )
+    parser.add_argument(
+        "filenames", nargs="+", metavar="FILE", help="The sqlites file to merge"
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        help="Output file to write the sqlite merged database to.",
+    )
+
+    # --output needs no argparse default: the ProgramArguments namespace already
+    # carries one, and argparse does not overwrite attributes that are present.
+    program_args: ProgramArguments = parser.parse_args(
+        args, namespace=ProgramArguments()
+    )
+    filenames = program_args.filenames
+
+    if not all(os.path.isfile(f) for f in filenames):
+        sys.exit("A provided file does not exist.")
+
+    if not all(is_sqlite_file(f) for f in filenames):
+        sys.exit("A provided file is not a valid SQLite file.")
+
+    # Take the first file to be the "canonical" database to fetch the list of tables
+    # TODO(fzakaria): Consider listing from elf.py instead
+    with closing(apsw.Connection(filenames[0])) as conn:
+        tables = [
+            row[0]
+            for row in conn.execute(
+                """SELECT name
+                FROM sqlite_schema
+                WHERE (name LIKE 'elf_%' OR name LIKE 'dwarf_%')
+                AND type = 'table'"""
+            )
+        ]
+
+    # The inner "conn" context wraps the merge in a transaction; closing()
+    # releases the handle afterwards.
+    with closing(apsw.Connection(program_args.output)) as conn, conn:
+        # Attach all the databases. The path is a bound parameter so that a
+        # quote character in a filename cannot break the statement.
+        # NOTE: SQLite caps attached databases (10 by default).
+        schemas = []
+        for idx, file in enumerate(filenames):
+            conn.execute(f"ATTACH DATABASE ? AS DB{idx}", (file,))
+            schemas.append(f"DB{idx}")
+
+        for table in tables:
+            # Quote the table name as an identifier instead of splicing it raw
+            quoted = '"' + table.replace('"', '""') + '"'
+            sql_union = " UNION ALL ".join(
+                f"SELECT * FROM {schema}.{quoted}" for schema in schemas
+            )
+            conn.execute(f"CREATE TABLE {quoted} AS {sql_union}")
+
+
+if __name__ == "__main__":
+    start()