Skip to content

Commit

Permalink
[DPE-4414][DPE-4450] Limit intances based on connection type (#251)
Browse files Browse the repository at this point in the history
* Limit intances based on connection type

* Use socket for new interface integrations

* Increase the amount of instances for exposed rels

* Bump libs

* Reload service

* Stop excessive services during upgrade

* Tweak integration tests

* Use socket for legacy interface

* Revert "Use socket for legacy interface"

This reverts commit 4972b49.

* Update libs

* Remove superfluous check

* Switch back to localhost

* Coverage

* Update src/charm.py

Co-authored-by: Marcelo Henrique Neppel <[email protected]>

---------

Co-authored-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
dragomirp and marceloneppel authored Jun 14, 2024
1 parent 61fd34b commit 1bd4da7
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 151 deletions.
13 changes: 12 additions & 1 deletion lib/charms/data_platform_libs/v0/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def restart(self, event) -> None:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 16
LIBPATCH = 17

PYDEPS = ["pydantic>=1.10,<2", "poetry-core"]

Expand Down Expand Up @@ -907,6 +907,17 @@ def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None:
logger.error(e)
self.set_unit_failed()
return
top_unit_id = self.upgrade_stack[-1]
top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}")
if (
top_unit == self.charm.unit
and self.peer_relation.data[self.charm.unit].get("state") == "recovery"
):
# While in a rollback and the Juju leader unit is the top unit in the upgrade stack, emit the event
# for this unit to start the rollback.
self.peer_relation.data[self.charm.unit].update({"state": "ready"})
self.on_upgrade_changed(event)
return
self.charm.unit.status = WaitingStatus("other units upgrading first...")
self.peer_relation.data[self.charm.unit].update({"state": "ready"})

Expand Down
26 changes: 15 additions & 11 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 26
LIBPATCH = 27

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -111,20 +111,19 @@ def __init__(
self.system_users = system_users

def _connect_to_database(
self, database: str = None, connect_to_current_host: bool = False
self, database: str = None, database_host: str = None
) -> psycopg2.extensions.connection:
"""Creates a connection to the database.
Args:
database: database to connect to (defaults to the database
provided when the object for this class was created).
connect_to_current_host: whether to connect to the current host
instead of the primary host.
database_host: host to connect to instead of the primary host.
Returns:
psycopg2 connection object.
"""
host = self.current_host if connect_to_current_host else self.primary_host
host = database_host if database_host is not None else self.primary_host
connection = psycopg2.connect(
f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'"
f"password='{self.password}' connect_timeout=1"
Expand Down Expand Up @@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]:
Set of PostgreSQL text search configs.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;")
text_search_configs = cursor.fetchall()
Expand All @@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]:
Set of PostgreSQL timezones.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT name FROM pg_timezone_names;")
timezones = cursor.fetchall()
Expand Down Expand Up @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=check_current_host
database_host=self.current_host if check_current_host else None
) as connection, connection.cursor() as cursor:
cursor.execute("SHOW ssl;")
return "on" in cursor.fetchone()[0]
Expand Down Expand Up @@ -502,19 +501,24 @@ def set_up_database(self) -> None:
if connection is not None:
connection.close()

def update_user_password(self, username: str, password: str) -> None:
def update_user_password(
self, username: str, password: str, database_host: str = None
) -> None:
"""Update a user password.
Args:
username: the user to update the password.
password: the new password for the user.
database_host: the host to connect to.
Raises:
PostgreSQLUpdateUserPasswordError if the password couldn't be changed.
"""
connection = None
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
with self._connect_to_database(
database_host=database_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format(
sql.Identifier(username)
Expand Down Expand Up @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL(
Expand Down
5 changes: 3 additions & 2 deletions lib/charms/postgresql_k8s/v0/postgresql_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ops.charm import ActionEvent, RelationBrokenEvent
from ops.framework import Object
from ops.pebble import ConnectionError, PathError, ProtocolError
from tenacity import RetryError

# The unique Charmhub library identifier, never change it
LIBID = "c27af44a92df4ef38d7ae06418b2800f"
Expand All @@ -43,7 +44,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version.
LIBPATCH = 8
LIBPATCH = 9

logger = logging.getLogger(__name__)
SCOPE = "unit"
Expand Down Expand Up @@ -142,7 +143,7 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> None:
logger.debug("Cannot push TLS certificates at this moment")
event.defer()
return
except (ConnectionError, PathError, ProtocolError) as e:
except (ConnectionError, PathError, ProtocolError, RetryError) as e:
logger.error("Cannot push TLS certificates: %r", e)
event.defer()
return
Expand Down
122 changes: 73 additions & 49 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.tls = PostgreSQLTLS(self, PEER_RELATION_NAME)

self._cores = max(min(os.cpu_count(), 4), 2)
self.service_ids = list(range(self._cores))
self.service_ids = list(range(self.instances_count))
self.pgb_services = [
f"{PGB}-{self.app.name}@{service_id}" for service_id in self.service_ids
]
Expand All @@ -147,10 +146,28 @@ def __init__(self, *args):
self, relation_name=TRACING_RELATION_NAME, protocols=[TRACING_PROTOCOL]
)

@property
def instances_count(self):
"""Returns the amount of instances to spin based on expose."""
if self._is_exposed:
return max(min(os.cpu_count(), 4), 2)
else:
return 1

# =======================
# Charm Lifecycle Hooks
# =======================

def create_instance_directories(self):
"""Create configuration directories for pgbouncer instances."""
pg_user = pwd.getpwnam(PG_USER)
app_conf_dir = f"{PGB_CONF_DIR}/{self.app.name}"

# Make a directory for each service to store configs.
for service_id in self.service_ids:
os.makedirs(f"{app_conf_dir}/{INSTANCE_DIR}{service_id}", 0o700, exist_ok=True)
os.chown(f"{app_conf_dir}/{INSTANCE_DIR}{service_id}", pg_user.pw_uid, pg_user.pw_gid)

@property
def tracing_endpoint(self) -> Optional[str]:
"""Otlp http endpoint for charm instrumentation."""
Expand All @@ -159,11 +176,6 @@ def tracing_endpoint(self) -> Optional[str]:

def render_utility_files(self):
"""Render charm utility services and configuration."""
# Initialise pgbouncer.ini config files from defaults set in charm lib and current config.
# We'll add basic configs for now even if this unit isn't a leader, so systemd doesn't
# throw a fit.
self.render_pgb_config()

# Render pgbouncer service file and reload systemd
with open("templates/pgbouncer.service.j2", "r") as file:
template = Template(file.read())
Expand Down Expand Up @@ -214,14 +226,8 @@ def _on_install(self, _) -> None:
error_message = "Failed to stop and disable pgbackrest snap service"
logger.exception(error_message, exc_info=e)

pg_user = pwd.getpwnam(PG_USER)
app_conf_dir = f"{PGB_CONF_DIR}/{self.app.name}"

# Make a directory for each service to store configs.
for service_id in self.service_ids:
os.makedirs(f"{app_conf_dir}/{INSTANCE_DIR}{service_id}", 0o700, exist_ok=True)
os.chown(f"{app_conf_dir}/{INSTANCE_DIR}{service_id}", pg_user.pw_uid, pg_user.pw_gid)

self.create_instance_directories()
self.render_pgb_config()
self.render_utility_files()

self.unit.status = WaitingStatus("Waiting to start PgBouncer")
Expand Down Expand Up @@ -501,7 +507,8 @@ def _on_config_changed(self, event) -> None:
restarts pgbouncer to apply changes.
"""
old_port = self.peers.app_databag.get("current_port")
if old_port != str(self.config["listen_port"]) and self._is_exposed:
port_changed = old_port != str(self.config["listen_port"])
if port_changed and self._is_exposed:
if self.unit.is_leader():
self.peers.app_databag["current_port"] = str(self.config["listen_port"])
# Open port
Expand All @@ -515,7 +522,7 @@ def _on_config_changed(self, event) -> None:
# TODO hitting upgrade errors here due to secrets labels failing to set on non-leaders.
# deferring until the leader manages to set the label
try:
self.render_pgb_config(reload_pgbouncer=True)
self.render_pgb_config(reload_pgbouncer=True, restart=port_changed)
except ModelError:
logger.warning("Deferring on_config_changed: cannot set secret label")
event.defer()
Expand Down Expand Up @@ -548,13 +555,16 @@ def check_pgb_running(self):

return True

def reload_pgbouncer(self):
def reload_pgbouncer(self, restart=False):
"""Restarts systemd pgbouncer service."""
initial_status = self.unit.status
self.unit.status = MaintenanceStatus("Reloading Pgbouncer")
try:
for service in self.pgb_services:
systemd.service_restart(service)
if restart or not systemd.service_running(service):
systemd.service_restart(service)
else:
systemd.service_reload(service)
self.unit.status = initial_status
except systemd.SystemdError as e:
logger.error(e)
Expand Down Expand Up @@ -683,7 +693,7 @@ def _get_relation_config(self) -> [Dict[str, Dict[str, Union[str, bool]]]]:
}
return pgb_dbs

def render_pgb_config(self, reload_pgbouncer=False):
def render_pgb_config(self, reload_pgbouncer=False, restart=False):
"""Derives config files for the number of required services from given config.
This method takes a primary config and generates one unique config for each intended
Expand All @@ -692,6 +702,17 @@ def render_pgb_config(self, reload_pgbouncer=False):
initial_status = self.unit.status
self.unit.status = MaintenanceStatus("updating PgBouncer config")

# If exposed relation, open the listen port for all units
if self._is_exposed:
# Open port
try:
self.unit.open_port("tcp", self.config["listen_port"])
except ModelError:
logger.exception("failed to open port")

self.create_instance_directories()
self.render_utility_files()

# Render primary config. This config is the only copy that the charm reads from to create
# PgbConfig objects, and is modified below to implement individual services.
app_conf_dir = f"{PGB_CONF_DIR}/{self.app.name}"
Expand All @@ -704,7 +725,7 @@ def render_pgb_config(self, reload_pgbouncer=False):
min_pool_size = 10
reserve_pool_size = 10
else:
effective_db_connections = max_db_connections / self._cores
effective_db_connections = max_db_connections / self.instances_count
default_pool_size = math.ceil(effective_db_connections / 2)
min_pool_size = math.ceil(effective_db_connections / 4)
reserve_pool_size = math.ceil(effective_db_connections / 4)
Expand All @@ -720,37 +741,40 @@ def render_pgb_config(self, reload_pgbouncer=False):
# Modify & render config files for each service instance
for service_id in self.service_ids:
self.unit.status = MaintenanceStatus("updating PgBouncer config")
self.render_file(
f"{app_conf_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.ini",
template.render(
databases=databases,
readonly_databases=readonly_dbs,
peer_id=service_id,
base_socket_dir=f"{app_temp_dir}/{INSTANCE_DIR}",
peers=self.service_ids,
log_file=f"{app_log_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.log",
pid_file=f"{app_temp_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.pid",
listen_addr=addr,
listen_port=self.config["listen_port"],
pool_mode=self.config["pool_mode"],
max_db_connections=max_db_connections,
default_pool_size=default_pool_size,
min_pool_size=min_pool_size,
reserve_pool_size=reserve_pool_size,
stats_user=self.backend.stats_user,
auth_query=self.backend.auth_query,
auth_file=f"{app_conf_dir}/{AUTH_FILE_NAME}",
enable_tls=enable_tls,
key_file=f"{app_conf_dir}/{TLS_KEY_FILE}",
ca_file=f"{app_conf_dir}/{TLS_CA_FILE}",
cert_file=f"{app_conf_dir}/{TLS_CERT_FILE}",
),
0o700,
)
try:
self.render_file(
f"{app_conf_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.ini",
template.render(
databases=databases,
readonly_databases=readonly_dbs,
peer_id=service_id,
base_socket_dir=f"{app_temp_dir}/{INSTANCE_DIR}",
peers=self.service_ids,
log_file=f"{app_log_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.log",
pid_file=f"{app_temp_dir}/{INSTANCE_DIR}{service_id}/pgbouncer.pid",
listen_addr=addr,
listen_port=self.config["listen_port"],
pool_mode=self.config["pool_mode"],
max_db_connections=max_db_connections,
default_pool_size=default_pool_size,
min_pool_size=min_pool_size,
reserve_pool_size=reserve_pool_size,
stats_user=self.backend.stats_user,
auth_query=self.backend.auth_query,
auth_file=f"{app_conf_dir}/{AUTH_FILE_NAME}",
enable_tls=enable_tls,
key_file=f"{app_conf_dir}/{TLS_KEY_FILE}",
ca_file=f"{app_conf_dir}/{TLS_CA_FILE}",
cert_file=f"{app_conf_dir}/{TLS_CERT_FILE}",
),
0o700,
)
except FileNotFoundError:
logger.warning("Service %s not yet rendered" % service_id)
self.unit.status = initial_status

if reload_pgbouncer:
self.reload_pgbouncer()
self.reload_pgbouncer(restart)

def render_prometheus_service(self):
"""Render a unit file for the prometheus exporter and restarts the service."""
Expand Down
2 changes: 2 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,7 @@
"ca": "cauth",
}

SOCKET_LOCATION = "/tmp/snap-private-tmp/snap.charmed-postgresql/tmp/pgbouncer/instance_0"

TRACING_RELATION_NAME = "tracing"
TRACING_PROTOCOL = "otlp_http"
12 changes: 0 additions & 12 deletions src/relations/pgbouncer_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
Application,
BlockedStatus,
MaintenanceStatus,
ModelError,
)

from constants import CLIENT_RELATION_NAME
Expand Down Expand Up @@ -111,17 +110,6 @@ def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
event.defer()
return

# If exposed relation, open the listen port for all units
if event.external_node_connectivity:
# Open port
try:
self.charm.unit.open_port("tcp", self.charm.config["listen_port"])
except ModelError:
logger.exception("failed to open port")

if not self.charm.unit.is_leader():
return

# Retrieve the database name and extra user roles using the charm library.
database = event.database
extra_user_roles = event.extra_user_roles or ""
Expand Down
Loading

0 comments on commit 1bd4da7

Please sign in to comment.