Skip to content

Commit

Permalink
Refactor do_partition to reduce the statement count
Browse files Browse the repository at this point in the history
  • Loading branch information
jcjones committed Feb 29, 2024
1 parent f03e3c1 commit 98cafe1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 55 deletions.
111 changes: 57 additions & 54 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,56 @@ def migrate_cmd(args):
)
MIGRATE_PARSER.set_defaults(func=migrate_cmd)

def _partition_table(conf, log, table, metrics):
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.error(f"Cannot proceed: {table} {table_problems}")
return None

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = conf.partition_period
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
return None

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
log.info(f"{table} planned SQL: {composite_sql_command}")
return {"sql": composite_sql_command, "noop": True}

log.info(f"{table} running SQL: {composite_sql_command}")

time_start = datetime.utcnow()
output = conf.dbcmd.run(composite_sql_command)
time_end = datetime.utcnow()
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

log.info(f"{table} results: {output}")
return {"sql": composite_sql_command, "output": output}

def do_partition(conf):
"""Produces SQL statements to manage partitions per the supplied configuration.
Expand Down Expand Up @@ -306,72 +356,25 @@ def do_partition(conf):

all_results = dict()
for table in conf.tables:
time_start = None
try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.error(f"Cannot proceed: {table} {table_problems}")
continue

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = conf.partition_period
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
continue

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
all_results[table.name] = {"sql": composite_sql_command, "noop": True}
log.info(f"{table} planned SQL: {composite_sql_command}")
continue

log.info(f"{table} running SQL: {composite_sql_command}")
time_start = datetime.utcnow()
output = conf.dbcmd.run(composite_sql_command)

all_results[table.name] = {"sql": composite_sql_command, "output": output}
log.info(f"{table} results: {output}")
results = _partition_table(conf, log, table, metrics)
if results:
all_results[table.name] = results

except partitionmanager.types.NoEmptyPartitionsAvailableException:
log.warning(
f"Unable to automatically handle {table}: No empty "
"partition is available."
"Unable to automatically handle %s: No empty "
"partition is available.", table
)
except partitionmanager.types.DatabaseCommandException as e:
log.warning("Failed to automatically handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)
except partitionmanager.types.TableEmptyException:
log.warning("Table %s appears to be empty. Skipping.", table)
except (ValueError, Exception) as e:
log.warning("Failed to handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)

time_end = datetime.utcnow()
if time_start:
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

if conf.prometheus_stats_path:
do_stats(conf, metrics=metrics)
return all_results
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ignore = ["S101"] # Allow assert statements
max-complexity = 16 # default is 10

[tool.ruff.lint.per-file-ignores]
"partitionmanager/cli.py" = ["B008"] # TODO: Fix me
"partitionmanager/cli.py" = ["B008", "PERF203"] # TODO: Fix B008, upgrade to Py3.11 for PERF203
"partitionmanager/cli_test.py" = ["S608", "SIM115", "SIM117"] # TODO: Fix SIMs
"partitionmanager/sql.py" = ["B904", "S603"] # TODO: Fix S603
"partitionmanager/table_append_partition.py" = ["S608", "SIM102"] # TODO: Fix S608
Expand Down

0 comments on commit 98cafe1

Please sign in to comment.