MDB-33721: better logging
Andrey Kopylov committed Feb 9, 2025
1 parent 3105ee9 commit 30aa761
Showing 7 changed files with 71 additions and 59 deletions.
7 changes: 7 additions & 0 deletions development.md
@@ -13,3 +13,10 @@ make check
```shell
TEST_ARGS='-i archive.feature' make check
```

## Debug
```shell
export DEBUG=true

TEST_ARGS='-i cascade.feature -t @fail_replication_source' make check
```
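
How the `DEBUG` flag is consumed is not part of this diff; below is a minimal, hypothetical sketch of one way a harness could map it to a more verbose log level (only the `DEBUG` variable name comes from the snippet above, the mapping itself is an assumption):

```python
import logging
import os

# Hypothetical: pick the log level from the DEBUG environment variable.
level = logging.DEBUG if os.environ.get('DEBUG', '').lower() == 'true' else logging.INFO
logging.basicConfig(level=level, format='%(asctime)s %(levelname)-7s:\t%(message)s')
logging.debug('emitted only when DEBUG=true')
```
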
2 changes: 1 addition & 1 deletion src/__init__.py
@@ -156,7 +156,7 @@ def init_logging(config):
"""
level = getattr(logging, config.get('global', 'log_level').upper())
logging.getLogger('kazoo').setLevel(logging.WARN)
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s:\t%(message)s')
logging.basicConfig(level=level, format='%(asctime)s %(levelname)-7s:\t%(message)s')


def start(config):
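
The only change here is `%(levelname)-7s`, which left-aligns the level name in a 7-character field so messages of different severities line up. A small, self-contained illustration:

```python
import logging

# '-7s' pads the level name to 7 characters, so 'DEBUG', 'INFO' and 'WARNING'
# all occupy the same width and the tab-separated messages stay aligned.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-7s:\t%(message)s')
logging.debug('padded to seven characters')
logging.warning('aligned with the line above')
```
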
2 changes: 1 addition & 1 deletion src/helpers.py
@@ -192,7 +192,7 @@ def wrapper(*args, **kwargs):
else:
current_sleep = min(sleep_time, retrying_end - time.time())
if current_sleep > 0:
logging.info(f'Waiting {current_sleep} for {event_name}')
logging.debug(f'Waiting {current_sleep:.2f} for {event_name}')
time.sleep(current_sleep)
sleep_time = 1.1 * sleep_time + 0.1 * random.random()
logging.warning('Retrying timeout expired.')
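
This hunk sits inside a retry helper that is mostly outside the diff; here is a minimal sketch of such a poll-with-jittered-backoff loop, built only from the lines visible above (the decorator form and exact signature in src/helpers.py are assumptions):

```python
import logging
import random
import time

def wait_for(check, timeout, event_name):
    """Sketch: poll `check` until it returns truthy or `timeout` seconds pass."""
    retrying_end = time.time() + timeout
    sleep_time = 1.0
    while time.time() < retrying_end:
        if check():
            return True
        current_sleep = min(sleep_time, retrying_end - time.time())
        if current_sleep > 0:
            logging.debug(f'Waiting {current_sleep:.2f} for {event_name}')
            time.sleep(current_sleep)
        # Grow the interval by ~10% plus up to 0.1s of jitter, as in the hunk above.
        sleep_time = 1.1 * sleep_time + 0.1 * random.random()
    logging.warning('Retrying timeout expired.')
    return False
```
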
59 changes: 23 additions & 36 deletions src/main.py
@@ -34,7 +34,7 @@ class pgconsul(object):
DESTRUCTIVE_OPERATIONS = ['rewind']

def __init__(self, **kwargs):
logging.debug('Initializing main class.')
logging.info('Initializing main class.')
self.config = kwargs.get('config')
self._cmd_manager = CommandManager(self.config)
self._should_run = True
@@ -270,14 +270,14 @@ def run_iteration(self, my_prio):
if not terminal_state:
logging.debug('Database is starting up or shutting down')
role = self.db.get_role()
logging.debug('Role: %s', str(role))
logging.info('Role: %s ----------', str(role))

db_state = self.db.get_state()
self.notifier.notify()
logging.debug(db_state)
logging.debug('db_state: {}'.format(db_state))
try:
zk_state = self.zk.get_state()
logging.debug(zk_state)
logging.debug('zk_state: {}'.format(zk_state))
helpers.write_status_file(db_state, zk_state, self.config.get('global', 'working_dir'))
self.update_maintenance_status(role, db_state.get('primary_fqdn'))
self._zk_alive_refresh(role, db_state, zk_state)
@@ -334,7 +334,7 @@ def run_iteration(self, my_prio):
self.finish_iteration(timer)

def finish_iteration(self, timer):
logging.debug('Finished iteration.')
logging.info('Finished iteration')
timer.sleep(self.config.getfloat('global', 'iteration_timeout'))

def release_lock_and_return_to_cluster(self):
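
The `timer` object is constructed outside this hunk; the following is a hypothetical sketch of an interval timer consistent with how `finish_iteration` uses it (sleep out whatever is left of the iteration budget rather than a full fixed interval):

```python
import time

class IterationTimer:
    """Hypothetical sketch of the per-iteration timer passed to finish_iteration()."""

    def __init__(self):
        self.start = time.time()

    def sleep(self, iteration_timeout):
        # Sleep only the unused part of the iteration budget, never a negative amount.
        remaining = iteration_timeout - (time.time() - self.start)
        if remaining > 0:
            time.sleep(remaining)
```
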
@@ -358,15 +358,7 @@ def single_node_primary_iter(self, db_state, zk_state):

self.zk.write(self.zk.TIMELINE_INFO_PATH, db_state['timeline'])

pooler_port_available, pooler_service_running = self.db.pgpooler('status')
if pooler_service_running and not pooler_port_available:
logging.warning('Service alive, but pooler not accepting connections, restarting.')
self.db.pgpooler('stop')
self.db.pgpooler('start')
elif not pooler_service_running:
logging.debug('Here we should open for load.')
self.db.pgpooler('start')

self.db.ensure_pooler_started()
self.db.ensure_archiving_wal()

# Enable async replication
@@ -446,15 +438,7 @@ def primary_iter(self, db_state, zk_state):

self._drop_stale_switchover(db_state)

pooler_port_available, pooler_service_running = self.db.pgpooler('status')
if pooler_service_running and not pooler_port_available:
logging.warning('Service alive, but pooler not accepting connections, restarting.')
self.db.pgpooler('stop')
self.db.pgpooler('start')
elif not pooler_service_running:
logging.debug('Here we should open for load.')
self.db.pgpooler('start')

self.db.ensure_pooler_started()
# Ensure that wal archiving is enabled. It can be disabled earlier due to
# some zk connectivity issues.
self.db.ensure_archiving_wal()
@@ -467,12 +451,13 @@ def primary_iter(self, db_state, zk_state):
logging.debug('Checking ha replics for aliveness')
alive_hosts = self.zk.get_alive_hosts(timeout=3, catch_except=False)
ha_replics = {replica for replica in ha_replics_config if replica in alive_hosts}
logging.debug('alive_hosts: {}, ha_replics: {}'.format(alive_hosts, ha_replics))
except Exception:
logging.exception('Fail to get replica status')
ha_replics = ha_replics_config
if len(ha_replics) != len(ha_replics_config):
logging.debug(
'Some of the replics is unavailable, config replics % alive replics %s',
'Some of the replics are unavailable, config replics %s alive replics %s',
str(ha_replics_config),
str(ha_replics),
)
@@ -552,7 +537,7 @@ def handle_detached_replica(self, db_state):
zk_write_delay = now - self.last_zk_host_stat_write
if zk_write_delay < close_detached_replica_after:
logging.debug(
f'Replica ZK write delay {zk_write_delay} within '
f'Replica ZK write delay {zk_write_delay:.2f} within '
f'{close_detached_replica_after} seconds; keeping replica open'
)
return
@@ -638,6 +623,7 @@ def replica_return(self, db_state, zk_state):
my_hostname = helpers.get_hostname()
self.write_host_stat(my_hostname, db_state)
holder = zk_state['lock_holder']
logging.debug('Replica has returned, so we resume WAL replay to {}'.format(holder))

self.checks['failover'] = 0
limit = self.config.getfloat('replica', 'recovery_timeout')
@@ -649,7 +635,7 @@ def replica_return(self, db_state, zk_state):
# Wal receiver is not running and
# postgresql isn't in archive recovery
# We should try to restart
logging.warning('We should try switch primary one more time here.')
logging.warning('We should try to switch the source of WAL to {}'.format(holder))
return self._return_to_cluster(holder, 'replica', is_dead=False)

def _get_streaming_replica_from_replics_info(self, fqdn, replics_info):
@@ -700,6 +686,7 @@ def non_ha_replica_iter(self, db_state, zk_state):
stream_from, zk_state.get(self.zk.REPLICS_INFO_PATH)
)
wal_receiver_info = self._zk_get_wal_receiver_info(stream_from)
logging.debug('wal_receiver_info: {}'.format(wal_receiver_info))
replication_source_streams = bool(
wal_receiver_info and wal_receiver_info.get('status') == 'streaming'
)
@@ -1052,7 +1039,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
primary_switch_checks = self.config.getint('replica', 'primary_switch_checks')
need_restart = self.config.getboolean('replica', 'primary_switch_restart')

logging.info('Starting simple primary switch.')
logging.info('Starting simple WAL source switch to {}'.format(new_primary))
if self.checks['primary_switch'] >= primary_switch_checks:
self._set_simple_primary_switch_try()

@@ -1086,13 +1073,13 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
#
# The easy way succeeded.
#
logging.info('Simple primary switch succeeded.')
logging.info('ACTION. Simple WAL source switch to {} succeeded'.format(new_primary))
return True
else:
return False

def _rewind_from_source(self, is_postgresql_dead, limit, new_primary):
logging.info("Starting pg_rewind")
logging.info("ACTION. Starting pg_rewind")

# Trying to connect to a new_primary. If not succeeded - exiting
if not helpers.await_for(
@@ -1230,12 +1217,12 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Return to cluster (try stupid method, if it fails we try rewind)
"""
logging.info('Starting returning to cluster.')
logging.info('ACTION. Starting return to cluster. New WAL source: {}'.format(new_primary))
if self.checks['primary_switch'] >= 0:
self.checks['primary_switch'] += 1
else:
self.checks['primary_switch'] = 1
logging.debug("primary_switch checks is %d", self.checks['primary_switch'])
logging.debug("WAL source switch checks is %d", self.checks['primary_switch'])

self._acquire_replication_source_slot_lock(new_primary)
failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
Expand Down Expand Up @@ -1266,9 +1253,9 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
last_op = self.zk.noexcept_get('%s/%s/op' % (self.zk.MEMBERS_PATH, helpers.get_hostname()))
logging.info('Last op is: %s' % str(last_op))
if role != 'primary' and not self.is_op_destructive(last_op) and not self._is_simple_primary_switch_tried():
logging.info('Trying to do a simple primary switch.')
logging.info('Trying to do a simple WAL source switch to: {}'.format(new_primary))
result = self._try_simple_primary_switch_with_lock(limit, new_primary, is_dead)
logging.info('Primary switch count: %s finish with result: %s', self.checks['primary_switch'], result)
logging.info('WAL source switch count: %s finished with result: %s', self.checks['primary_switch'], result)
return None

#
@@ -1609,7 +1596,7 @@ def _check_archive_recovery(self, new_primary, limit):

def check_recovery_start():
if self._check_postgresql_streaming(new_primary):
logging.debug('PostgreSQL is already streaming from primary')
logging.debug('PostgreSQL is already streaming from {}'.format(new_primary))
return True

# we can get here with another role or
@@ -1669,7 +1656,7 @@ def _check_postgresql_streaming(self, primary):
return False

if replica_infos is not None and (pgconsul._is_caught_up(replica_infos) and self.db.check_walreceiver()):
logging.debug('PostgreSQL has started streaming from primary.')
logging.debug('PostgreSQL has started streaming from {}'.format(primary))
return True

return None
@@ -1680,7 +1667,7 @@ def _wait_for_streaming(self, primary, limit=-1):
With limit=-1 the loop here can be infinite.
"""
check_streaming = functools.partial(self._check_postgresql_streaming, primary)
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from {}'.format(primary))

def _wait_for_lock(self, lock, limit=-1):
"""
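
`helpers.await_for_value` itself is not shown in this diff; below is a hedged sketch of the contract implied by its call sites here — poll the check until it returns something other than None (True means streaming started, False means it definitely did not), giving up after `limit` seconds, with `limit=-1` meaning wait forever, as the `_wait_for_streaming` docstring notes:

```python
import logging
import time

def await_for_value(check, limit, event_name, interval=1.0):
    """Sketch only; the real implementation lives in src/helpers.py."""
    deadline = None if limit < 0 else time.time() + limit
    while deadline is None or time.time() < deadline:
        value = check()
        if value is not None:
            return value          # first definitive answer (True or False) wins
        logging.debug(f'Waiting {interval:.2f} for {event_name}')
        time.sleep(interval)
    logging.warning(f'Timed out waiting for {event_name}')
    return None
```
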
34 changes: 22 additions & 12 deletions src/pg.py
@@ -148,12 +148,12 @@ def get_replication_slots(self):
return [i[0] for i in res]

def _create_replication_slot(self, slot_name):
logging.info('Creating slot %s.', slot_name)
logging.info('ACTION. Creating slot %s.', slot_name)
query = f"SELECT pg_create_physical_replication_slot('{slot_name}', true)"
return self._exec_without_result(query)

def _drop_replication_slot(self, slot_name):
logging.info('Dropping slot %s.', slot_name)
logging.info('ACTION. Dropping slot %s.', slot_name)
query = f"SELECT pg_drop_replication_slot('{slot_name}')"
return self._exec_without_result(query)

@@ -493,7 +493,7 @@ def promote(self):
# We need to stop archiving WAL and resume after promote
# to prevent wrong history file in archive in case of failure
if not self.stop_archiving_wal():
logging.error('Could not stop archiving WAL')
logging.error('ACTION. Could not stop archiving WAL')
return False

# We need to resume replaying WAL before promote
@@ -502,7 +502,7 @@ def promote(self):
promoted = self._cmd_manager.promote(self.pgdata) == 0
if promoted:
if not self.resume_archiving_wal():
logging.error('Could not resume archiving WAL')
logging.error('ACTION. Could not resume archiving WAL')
if self._wait_for_primary_role():
self._plugins.run('after_promote', self.conn_local, self.config)
return promoted
@@ -514,7 +514,7 @@ def _wait_for_primary_role(self):
sleep_time = self.config.getfloat('global', 'iteration_timeout')
role = self.get_role()
while role != 'primary':
logging.debug('Our role should be primary but we are now "%s".', role)
logging.info('ACTION. Our role should be primary but we are now "%s".', role)
if role is None:
return False
logging.info('Waiting %.1f second(s) to become primary.', sleep_time)
@@ -609,13 +609,13 @@ def unequal(prev_value):
try:
if reset:
prev_value = self._get_param_value(param)
logging.debug(f'Resetting {param} with ALTER SYSTEM')
logging.info(f'ACTION. Resetting {param} with ALTER SYSTEM')
query = SQL("ALTER SYSTEM RESET {param}").format(param=Identifier(param))
self._exec_query(query.as_string(self.conn_local))
await_func = partial(unequal, prev_value)
await_message = f'{param} is reset after reload'
else:
logging.debug(f'Setting {param} to {value} with ALTER SYSTEM')
logging.info(f'ACTION. Setting {param} to {value} with ALTER SYSTEM')
query = SQL("ALTER SYSTEM SET {param} TO %(value)s").format(param=Identifier(param))
self._exec_query(query.as_string(self.conn_local), value=value)
await_func = equal
@@ -635,6 +635,16 @@ def unequal(prev_value):
def _change_replication_type(self, synchronous_standby_names):
return self._alter_system_set_param('synchronous_standby_names', synchronous_standby_names)

def ensure_pooler_started(self):
pooler_port_available, pooler_service_running = self.pgpooler('status')
if pooler_service_running and not pooler_port_available:
logging.warning('Service alive, but pooler not accepting connections, restarting.')
self.pgpooler('stop')
self.pgpooler('start')
elif not pooler_service_running:
logging.debug('Here we should open for load.')
self.pgpooler('start')

def ensure_archive_mode(self):
archive_mode = self._get_param_value('archive_mode')
if archive_mode == 'off':
@@ -644,11 +654,11 @@ def ensure_archive_mode(self):
def ensure_archiving_wal(self):
archive_command = self._get_param_value('archive_command')
if archive_command == self.DISABLED_ARCHIVE_COMMAND:
logging.info('Archive command was disabled, enabling it')
logging.info('ACTION. Archive command was disabled, enabling it')
self.resume_archiving_wal()
config = self._get_postgresql_auto_conf()
if config.get('archive_command') == self.DISABLED_ARCHIVE_COMMAND:
logging.info('Archive command was disabled in postgresql.auto.conf, resetting it')
logging.info('ACTION. Archive command was disabled in postgresql.auto.conf, resetting it')
self.resume_archiving_wal()

def stop_archiving_wal(self):
@@ -683,7 +693,7 @@ def _alter_system_stopped(self, param, set_value):
Method should be called only with stopped PostgreSQL.
"""
try:
logging.debug(f'Setting {param} to {set_value} in postgresql.auto.conf')
logging.info(f'ACTION. Setting {param} to {set_value} in postgresql.auto.conf')
config = self._get_postgresql_auto_conf()
current_file = os.path.join(self.pgdata, 'postgresql.auto.conf')
new_file = os.path.join(self.pgdata, 'postgresql.auto.conf.new')
@@ -709,7 +719,7 @@ def checkpoint(self, query=None):
"""
Perform checkpoint
"""
logging.warning('Initiating checkpoint')
logging.warning('ACTION. Initiating checkpoint')
if not query:
query = 'CHECKPOINT'
return self._exec_without_result(query)
@@ -791,7 +801,7 @@ def terminate_backend(self, pid):
self._exec_without_result(f'SELECT pg_terminate_backend({pid})')

def _pg_wal_replay(self, pause_or_resume):
logging.debug('WAL replay: %s', pause_or_resume)
logging.info('ACTION. WAL replay: %s', pause_or_resume)
self._exec_query(f'SELECT pg_wal_replay_{pause_or_resume}();')

def check_extension_installed(self, name):
Expand Down