Skip to content

Commit

Permalink
Set WAIT 2 for all ALTER TABLE REORGANIZE PARTITION calls (#57)
Browse files Browse the repository at this point in the history
* Set WAIT 2 for all ALTER TABLE REORGANIZE PARTITION calls

* Ensure alter_time_seconds gets updated even on nonzero SQL exits. Thanks @mcpherrinm

* Add a metric for alter_errors and reorder logs to be clearer on large tables
  • Loading branch information
jcjones authored Sep 13, 2022
1 parent 840c62d commit adbe13d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 36 deletions.
29 changes: 22 additions & 7 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,15 @@ def do_partition(conf):
help_text="Time in seconds to complete the ALTER command",
type_name="gauge",
)
metrics.describe(
"alter_errors",
help_text="Number of errors observed during ALTER commands",
type_name="counter",
)

all_results = dict()
for table in conf.tables:
time_start = None
try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
Expand All @@ -311,11 +317,13 @@ def do_partition(conf):
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")

positions = pm_tap.get_current_positions(
conf.dbcmd, table, map_data["range_cols"]
)

log.info(f"Evaluating {table} (duration={duration}) (pos={positions})")
log.info(f"{table} (pos={positions})")

cur_pos = partitionmanager.types.Position()
cur_pos.set_position([positions[col] for col in map_data["range_cols"]])
Expand Down Expand Up @@ -344,22 +352,29 @@ def do_partition(conf):
log.info(f"{table} running SQL: {composite_sql_command}")
time_start = datetime.utcnow()
output = conf.dbcmd.run(composite_sql_command)
time_end = datetime.utcnow()

all_results[table.name] = {"sql": composite_sql_command, "output": output}
log.info(f"{table} results: {output}")
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

except partitionmanager.types.NoEmptyPartitionsAvailableException:
log.warning(
f"Unable to automatically handle {table}: No empty "
"partition is available."
)
except partitionmanager.types.DatabaseCommandException as e:
log.warning("Failed to automatically handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)
except (ValueError, Exception) as e:
log.warning("Failed to handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)

time_end = datetime.utcnow()
if time_start:
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

if conf.prometheus_stats_path:
do_stats(conf, metrics=metrics)
Expand Down
16 changes: 9 additions & 7 deletions partitionmanager/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_partition_cmd_noop(self):
{
"testtable_noop": {
"sql": (
"ALTER TABLE `testtable_noop` REORGANIZE PARTITION "
"ALTER TABLE `testtable_noop` WAIT 2 REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
Expand All @@ -102,7 +102,7 @@ def test_partition_cmd_final(self):
"testtable_commit": {
"output": [],
"sql": (
"ALTER TABLE `testtable_commit` REORGANIZE PARTITION "
"ALTER TABLE `testtable_commit` WAIT 2 REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
Expand Down Expand Up @@ -223,10 +223,12 @@ def test_partition_period_seven_days(self):
set(
[
"INFO:partition:Evaluating Table partitioned_last_week "
"(duration=7 days, 0:00:00) (pos={'id': 150})",
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_last_week (pos={'id': 150})",
"DEBUG:partition:Table partitioned_last_week has no pending SQL updates.",
"INFO:partition:Evaluating Table partitioned_yesterday "
"(duration=7 days, 0:00:00) (pos={'id': 150})",
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_yesterday (pos={'id': 150})",
"DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.",
]
),
Expand Down Expand Up @@ -500,7 +502,7 @@ def test_migrate_cmd_in(self):
+ "PARTITION BY RANGE (id) (",
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `partitioned_yesterday_new_20210421` "
"ALTER TABLE `partitioned_yesterday_new_20210421` WAIT 2 "
+ "REORGANIZE PARTITION `p_assumed` INTO (PARTITION "
+ "`p_20210421` VALUES LESS THAN (150), PARTITION "
+ "`p_20210521` VALUES LESS THAN (300), PARTITION "
Expand All @@ -525,7 +527,7 @@ def test_migrate_cmd_in(self):
"ALTER TABLE two_new_20210421 PARTITION BY RANGE (id) (",
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `two_new_20210421` REORGANIZE PARTITION "
"ALTER TABLE `two_new_20210421` WAIT 2 REORGANIZE PARTITION "
+ "`p_assumed` INTO (PARTITION `p_20210421` VALUES "
+ "LESS THAN (150), PARTITION `p_20210521` VALUES LESS "
+ "THAN (375), PARTITION `p_20210620` VALUES LESS THAN "
Expand Down Expand Up @@ -585,7 +587,7 @@ def test_migrate_cmd_in_unpartitioned_with_override(self):
"ALTER TABLE unpartitioned_new_20210421 PARTITION BY RANGE (id) (",
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `unpartitioned_new_20210421` REORGANIZE "
"ALTER TABLE `unpartitioned_new_20210421` WAIT 2 REORGANIZE "
+ "PARTITION `p_assumed` INTO (PARTITION `p_20210421` "
+ "VALUES LESS THAN (150), PARTITION `p_20210521` VALUES "
+ "LESS THAN (300), PARTITION `p_20210620` VALUES LESS "
Expand Down
4 changes: 2 additions & 2 deletions partitionmanager/migrate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_read_state_info(self):
"ALTER TABLE test_new_20210303 PARTITION BY RANGE (id) (",
"\tPARTITION p_start VALUES LESS THAN MAXVALUE",
");",
"ALTER TABLE `test_new_20210303` REORGANIZE PARTITION `p_start` "
"ALTER TABLE `test_new_20210303` WAIT 2 REORGANIZE PARTITION `p_start` "
+ "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
+ "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
+ "`p_20210502` VALUES LESS THAN MAXVALUE);",
Expand Down Expand Up @@ -187,7 +187,7 @@ def test_read_state_info_map_table(self):
+ "COLUMNS (orderID, authzID) (",
"\tPARTITION p_assumed VALUES LESS THAN (MAXVALUE, MAXVALUE)",
");",
"ALTER TABLE `map_table_new_20210303` REORGANIZE PARTITION "
"ALTER TABLE `map_table_new_20210303` WAIT 2 REORGANIZE PARTITION "
+ "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
+ "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
+ "(41, 82), PARTITION `p_20210502` VALUES LESS THAN "
Expand Down
26 changes: 17 additions & 9 deletions partitionmanager/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,23 @@ def __init__(self, exe):

def run(self, sql_cmd):
logging.debug(f"SubprocessDatabaseCommand executing {sql_cmd}")
result = subprocess.run(
[self.exe, "-X"],
input=sql_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="UTF-8",
check=True,
)
return XmlResult().parse(result.stdout)
try:
result = subprocess.run(
[self.exe, "-X"],
input=sql_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding="UTF-8",
check=True,
)
return XmlResult().parse(result.stdout)
except subprocess.CalledProcessError as cpe:
logging.error(
"SubprocessDatabaseCommand failed, error code %d", cpe.returncode
)
logging.error("stdout: %s", cpe.stdout)
logging.error("stderr: %s", cpe.stderr)
raise partitionmanager.types.DatabaseCommandException(cpe.stderr)

def db_name(self):
rows = self.run("SELECT DATABASE();")
Expand Down
2 changes: 1 addition & 1 deletion partitionmanager/table_append_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ def generate_sql_reorganize_partition_commands(table, changes):
partition_update = ", ".join(partition_strings)

alter_cmd = (
f"ALTER TABLE `{table.name}` "
f"ALTER TABLE `{table.name}` WAIT 2 "
f"REORGANIZE PARTITION `{modified_partition.old.name}` INTO ({partition_update});"
)

Expand Down
20 changes: 10 additions & 10 deletions partitionmanager/table_append_partition_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def test_plan_partition_changes_short_names(self):
self.assertEqual(
output,
[
"ALTER TABLE `table` REORGANIZE PARTITION `p_future` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_future` INTO "
"(PARTITION `p_20210201` VALUES LESS THAN (12960433003), "
"PARTITION `p_20210208` VALUES LESS THAN MAXVALUE);"
],
Expand Down Expand Up @@ -705,7 +705,7 @@ def test_plan_partition_changes_bespoke_names(self):
self.assertEqual(
output,
[
"ALTER TABLE `table` REORGANIZE PARTITION `p_future` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_future` INTO "
"(PARTITION `p_20210108` VALUES LESS THAN (170), "
"PARTITION `p_20210115` VALUES LESS THAN MAXVALUE);"
],
Expand Down Expand Up @@ -1007,7 +1007,7 @@ def testgenerate_sql_reorganize_partition_commands_single_change(self):
)
),
[
"ALTER TABLE `table` REORGANIZE PARTITION `p_20210102` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_20210102` INTO "
"(PARTITION `p_20210116` VALUES LESS THAN (542, 190));"
],
)
Expand All @@ -1028,9 +1028,9 @@ def testgenerate_sql_reorganize_partition_commands_two_changes(self):
)
),
[
"ALTER TABLE `table` REORGANIZE PARTITION `p_20210120` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_20210120` INTO "
"(PARTITION `p_20210214` VALUES LESS THAN (2000));",
"ALTER TABLE `table` REORGANIZE PARTITION `p_20210102` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_20210102` INTO "
"(PARTITION `p_20210116` VALUES LESS THAN (500));",
],
)
Expand All @@ -1052,7 +1052,7 @@ def testgenerate_sql_reorganize_partition_commands_new_partitions(self):
)
),
[
"ALTER TABLE `table` REORGANIZE PARTITION `p_20210102` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `p_20210102` INTO "
"(PARTITION `p_20210102` VALUES LESS THAN (200), "
"PARTITION `p_20210116` VALUES LESS THAN (542), "
"PARTITION `p_20210123` VALUES LESS THAN (662));"
Expand Down Expand Up @@ -1081,7 +1081,7 @@ def testgenerate_sql_reorganize_partition_commands_maintain_new_partition(self):
)
),
[
"ALTER TABLE `table` REORGANIZE PARTITION `future` INTO "
"ALTER TABLE `table` WAIT 2 REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210114` VALUES LESS THAN (800), "
"PARTITION `p_20210116` VALUES LESS THAN (1000), "
"PARTITION `p_20210123` VALUES LESS THAN (1200), "
Expand Down Expand Up @@ -1147,9 +1147,9 @@ def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partitio
self.assertEqual(
list(generate_sql_reorganize_partition_commands(Table("water"), planned)),
[
"ALTER TABLE `water` REORGANIZE PARTITION `future` INTO "
"ALTER TABLE `water` WAIT 2 REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210105` VALUES LESS THAN MAXVALUE);",
"ALTER TABLE `water` REORGANIZE PARTITION `p_20210104` INTO "
"ALTER TABLE `water` WAIT 2 REORGANIZE PARTITION `p_20210104` INTO "
"(PARTITION `p_20210102` VALUES LESS THAN (200));",
],
)
Expand Down Expand Up @@ -1211,7 +1211,7 @@ def test_get_pending_sql_reorganize_partition_commands_with_changes(self):
self.assertEqual(
list(cmds),
[
"ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO "
"ALTER TABLE `plushies` WAIT 2 REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210104` VALUES LESS THAN (550), "
"PARTITION `p_20210111` VALUES LESS THAN (900), "
"PARTITION `p_20210118` VALUES LESS THAN MAXVALUE);"
Expand Down
4 changes: 4 additions & 0 deletions partitionmanager/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,3 +603,7 @@ class TableInformationException(Exception):

class NoEmptyPartitionsAvailableException(Exception):
"""Raised if no empty partitions are available to safely modify."""


class DatabaseCommandException(Exception):
"""Raised if the database command failed."""

0 comments on commit adbe13d

Please sign in to comment.