From 98cafe1918c0fe5f94383831833403348e35f702 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Thu, 29 Feb 2024 12:56:12 -0700 Subject: [PATCH] Refactor `do_partition` to reduce the statement count --- partitionmanager/cli.py | 111 +++++++++++++++++++++------------------- pyproject.toml | 2 +- 2 files changed, 58 insertions(+), 55 deletions(-) diff --git a/partitionmanager/cli.py b/partitionmanager/cli.py index 723b650..fad38b9 100644 --- a/partitionmanager/cli.py +++ b/partitionmanager/cli.py @@ -273,6 +273,56 @@ def migrate_cmd(args): ) MIGRATE_PARSER.set_defaults(func=migrate_cmd) +def _partition_table(conf, log, table, metrics): + table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table) + if table_problems: + log.error(f"Cannot proceed: {table} {table_problems}") + return None + + map_data = pm_tap.get_partition_map(conf.dbcmd, table) + + duration = conf.partition_period + if table.partition_period: + duration = table.partition_period + + log.info(f"Evaluating {table} (duration={duration})") + cur_pos = partitionmanager.database_helpers.get_position_of_table( + conf.dbcmd, table, map_data + ) + + sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands( + database=conf.dbcmd, + table=table, + partition_list=map_data["partitions"], + current_position=cur_pos, + allowed_lifespan=duration, + num_empty_partitions=conf.num_empty, + evaluation_time=conf.curtime, + ) + + if not sql_cmds: + log.debug(f"{table} has no pending SQL updates.") + return None + + composite_sql_command = "\n".join(sql_cmds) + + if conf.noop: + log.info(f"{table} planned SQL: {composite_sql_command}") + return {"sql": composite_sql_command, "noop": True} + + log.info(f"{table} running SQL: {composite_sql_command}") + + time_start = datetime.utcnow() + output = conf.dbcmd.run(composite_sql_command) + time_end = datetime.utcnow() + metrics.add( + "alter_time_seconds", + table.name, + (time_end - time_start).total_seconds(), + ) + + log.info(f"{table} results: {output}") + return {"sql": composite_sql_command, "output": output} def do_partition(conf): """Produces SQL statements to manage partitions per the supplied configuration. @@ -306,72 +356,25 @@ def do_partition(conf): all_results = dict() for table in conf.tables: - time_start = None try: - table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table) - if table_problems: - log.error(f"Cannot proceed: {table} {table_problems}") - continue - - map_data = pm_tap.get_partition_map(conf.dbcmd, table) - - duration = conf.partition_period - if table.partition_period: - duration = table.partition_period - - log.info(f"Evaluating {table} (duration={duration})") - cur_pos = partitionmanager.database_helpers.get_position_of_table( - conf.dbcmd, table, map_data - ) - - sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands( - database=conf.dbcmd, - table=table, - partition_list=map_data["partitions"], - current_position=cur_pos, - allowed_lifespan=duration, - num_empty_partitions=conf.num_empty, - evaluation_time=conf.curtime, - ) - - if not sql_cmds: - log.debug(f"{table} has no pending SQL updates.") - continue - - composite_sql_command = "\n".join(sql_cmds) - - if conf.noop: - all_results[table.name] = {"sql": composite_sql_command, "noop": True} - log.info(f"{table} planned SQL: {composite_sql_command}") - continue - - log.info(f"{table} running SQL: {composite_sql_command}") - time_start = datetime.utcnow() - output = conf.dbcmd.run(composite_sql_command) - - all_results[table.name] = {"sql": composite_sql_command, "output": output} - log.info(f"{table} results: {output}") + results = _partition_table(conf, log, table, metrics) + if results: + all_results[table.name] = results except partitionmanager.types.NoEmptyPartitionsAvailableException: log.warning( - f"Unable to automatically handle {table}: No empty " - "partition is available." + "Unable to automatically handle %s: No empty " + "partition is available.", table ) except partitionmanager.types.DatabaseCommandException as e: log.warning("Failed to automatically handle %s: %s", table, e) metrics.add("alter_errors", table.name, 1) + except partitionmanager.types.TableEmptyException: + log.warning("Table %s appears to be empty. Skipping.", table) except (ValueError, Exception) as e: log.warning("Failed to handle %s: %s", table, e) metrics.add("alter_errors", table.name, 1) - time_end = datetime.utcnow() - if time_start: - metrics.add( - "alter_time_seconds", - table.name, - (time_end - time_start).total_seconds(), - ) - if conf.prometheus_stats_path: do_stats(conf, metrics=metrics) return all_results diff --git a/pyproject.toml b/pyproject.toml index ebb928b..8837f47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,7 +106,7 @@ ignore = ["S101"] # Allow assert statements max-complexity = 16 # default is 10 [tool.ruff.lint.per-file-ignores] -"partitionmanager/cli.py" = ["B008"] # TODO: Fix me +"partitionmanager/cli.py" = ["B008", "PERF203"] # TODO: Fix B008, upgrade to Py3.11 for PERF203 "partitionmanager/cli_test.py" = ["S608", "SIM115", "SIM117"] # TODO: Fix SIMs "partitionmanager/sql.py" = ["B904", "S603"] # TODO: Fix S603 "partitionmanager/table_append_partition.py" = ["S608", "SIM102"] # TODO: Fix S608