Skip to content

Commit

Permalink
[DPE-1757] Log rotation (#94)
Browse files Browse the repository at this point in the history
* Logrotate scaffolds

* pgb exporter should depend on pgb

* Temporary build fix

* Code review fixes

* Bump libs

* Bump to released lib
  • Loading branch information
dragomirp authored Jul 14, 2023
1 parent 7cb23b6 commit 6c7b416
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 98 deletions.
173 changes: 131 additions & 42 deletions lib/charms/data_platform_libs/v0/data_interfaces.py

Large diffs are not rendered by default.

114 changes: 78 additions & 36 deletions lib/charms/grafana_agent/v0/cos_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ class _MetricsEndpointDict(TypedDict):
port: int

except ModuleNotFoundError:
_MetricsEndpointDict = dict
_MetricsEndpointDict = Dict # pyright: ignore

LIBID = "dc15fa84cef84ce58155fb84f6c6213a"
LIBAPI = 0
LIBPATCH = 3
LIBPATCH = 4

PYDEPS = ["cosl", "pydantic"]
PYDEPS = ["cosl", "pydantic<2"]

DEFAULT_RELATION_NAME = "cos-agent"
DEFAULT_PEER_RELATION_NAME = "peers"
Expand All @@ -217,8 +217,12 @@ def _serialize(raw_json: Union[str, bytes]) -> "GrafanaDashboard":
return GrafanaDashboard(encoded)

def _deserialize(self) -> Dict:
raw = lzma.decompress(base64.b64decode(self.encode("utf-8"))).decode()
return json.loads(raw)
try:
raw = lzma.decompress(base64.b64decode(self.encode("utf-8"))).decode()
return json.loads(raw)
except json.decoder.JSONDecodeError as e:
logger.error("Invalid Dashboard format: %s", e)
return {}

def __repr__(self):
"""Return string representation of self."""
Expand Down Expand Up @@ -247,7 +251,7 @@ class CosAgentProviderUnitData(pydantic.BaseModel):


class CosAgentPeersUnitData(pydantic.BaseModel):
"""Unit databag model for `cluster` cos-agent machine charm peer relation."""
"""Unit databag model for `peers` cos-agent machine charm peer relation."""

# We need the principal unit name and relation metadata to be able to render identifiers
# (e.g. topology) on the leader side, after all the data moves into peer data (the grafana
Expand Down Expand Up @@ -307,12 +311,11 @@ def __init__(
refresh_events: List of events on which to refresh relation data.
"""
super().__init__(charm, relation_name)
metrics_endpoints = metrics_endpoints or [DEFAULT_METRICS_ENDPOINT]
dashboard_dirs = dashboard_dirs or ["./src/grafana_dashboards"]

self._charm = charm
self._relation_name = relation_name
self._metrics_endpoints = metrics_endpoints
self._metrics_endpoints = metrics_endpoints or [DEFAULT_METRICS_ENDPOINT]
self._metrics_rules = metrics_rules_dir
self._logs_rules = logs_rules_dir
self._recursive = recurse_rules_dirs
Expand All @@ -339,14 +342,20 @@ def _on_refresh(self, event):
# Add a guard to make sure it doesn't happen.
if relation.data and self._charm.unit in relation.data:
# Subordinate relations can communicate only over unit data.
data = CosAgentProviderUnitData(
metrics_alert_rules=self._metrics_alert_rules,
log_alert_rules=self._log_alert_rules,
dashboards=self._dashboards,
metrics_scrape_jobs=self._scrape_jobs,
log_slots=self._log_slots,
)
relation.data[self._charm.unit][data.KEY] = data.json()
try:
data = CosAgentProviderUnitData(
metrics_alert_rules=self._metrics_alert_rules,
log_alert_rules=self._log_alert_rules,
dashboards=self._dashboards,
metrics_scrape_jobs=self._scrape_jobs,
log_slots=self._log_slots,
)
relation.data[self._charm.unit][data.KEY] = data.json()
except (
pydantic.ValidationError,
json.decoder.JSONDecodeError,
) as e:
logger.error("Invalid relation data provided: %s", e)

@property
def _scrape_jobs(self) -> List[Dict]:
Expand Down Expand Up @@ -387,16 +396,33 @@ class COSAgentDataChanged(EventBase):
"""Event emitted by `COSAgentRequirer` when relation data changes."""


class COSAgentValidationError(EventBase):
    """Event emitted by `COSAgentRequirer` when relation data cannot be validated.

    The event carries a single `message` string describing the failure; it is
    written out by `snapshot` and read back by `restore`.
    """

    def __init__(self, handle, message: str = ""):
        super().__init__(handle)
        self.message = message

    def snapshot(self) -> Dict:
        """Return the state to save for this event: its `message`."""
        state = {"message": self.message}
        return state

    def restore(self, snapshot):
        """Reload `message` from a mapping previously produced by `snapshot`."""
        self.message = snapshot["message"]


class COSAgentRequirerEvents(ObjectEvents):
    """Events that a `COSAgentRequirer` can emit."""

    # Bound to `COSAgentDataChanged`.
    data_changed = EventSource(COSAgentDataChanged)
    # Bound to `COSAgentValidationError`.
    validation_error = EventSource(COSAgentValidationError)


class COSAgentRequirer(Object):
"""Integration endpoint wrapper for the Requirer side of the cos_agent interface."""

on = COSAgentRequirerEvents()
on = COSAgentRequirerEvents() # pyright: ignore

def __init__(
self,
Expand Down Expand Up @@ -426,7 +452,7 @@ def __init__(
) # TODO: do we need this?
self.framework.observe(events.relation_changed, self._on_relation_data_changed)
for event in self._refresh_events:
self.framework.observe(event, self.trigger_refresh)
self.framework.observe(event, self.trigger_refresh) # pyright: ignore

# Peer relation events
# A peer relation is needed as it is the only mechanism for exchanging data across
Expand All @@ -450,7 +476,7 @@ def _on_peer_relation_changed(self, _):
# Peer data is used for forwarding data from principal units to the grafana agent
# subordinate leader, for updating the app data of the outgoing o11y relations.
if self._charm.unit.is_leader():
self.on.data_changed.emit()
self.on.data_changed.emit() # pyright: ignore

def _on_relation_data_changed(self, event: RelationChangedEvent):
# Peer data is the only means of communication between subordinate units.
Expand All @@ -474,7 +500,9 @@ def _on_relation_data_changed(self, event: RelationChangedEvent):

if not (raw := cos_agent_relation.data[principal_unit].get(CosAgentProviderUnitData.KEY)):
return
provider_data = CosAgentProviderUnitData(**json.loads(raw))

if not (provider_data := self._validated_provider_data(raw)):
return

# Copy data from the principal relation to the peer relation, so the leader could
# follow up.
Expand All @@ -492,12 +520,19 @@ def _on_relation_data_changed(self, event: RelationChangedEvent):
# We can't easily tell if the data that was changed is limited to only the data
# that goes into peer relation (in which case, if this is not a leader unit, we wouldn't
# need to emit `on.data_changed`), so we're emitting `on.data_changed` either way.
self.on.data_changed.emit()
self.on.data_changed.emit() # pyright: ignore

def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]:
    """Parse `raw` into a `CosAgentProviderUnitData`, or return None if it is invalid.

    When JSON decoding or pydantic validation fails, a `validation_error` event is
    emitted with the error text as its message and None is returned.
    """
    try:
        payload = json.loads(raw)
        provider_data = CosAgentProviderUnitData(**payload)
    except (pydantic.ValidationError, json.decoder.JSONDecodeError) as err:
        self.on.validation_error.emit(message=str(err))  # pyright: ignore
        return None
    return provider_data

def trigger_refresh(self, _):
"""Trigger a refresh of relation data."""
# FIXME: Figure out what we should do here
self.on.data_changed.emit()
self.on.data_changed.emit() # pyright: ignore

@property
def _principal_unit(self) -> Optional[Unit]:
Expand Down Expand Up @@ -529,17 +564,24 @@ def _principal_unit_data(self) -> Optional[CosAgentProviderUnitData]:
Relies on the fact that, for subordinate relations, the only remote unit visible to
*this unit* is the principal unit that this unit is attached to.
"""
if relations := self._principal_relations:
# Technically it's a list, but for subordinates there can only be one relation
principal_relation = next(iter(relations))
if units := principal_relation.units:
# Technically it's a list, but for subordinates there can only be one
unit = next(iter(units))
raw = principal_relation.data[unit].get(CosAgentProviderUnitData.KEY)
if raw:
return CosAgentProviderUnitData(**json.loads(raw))
if not (relations := self._principal_relations):
return None

return None
# Technically it's a list, but for subordinates there can only be one relation
principal_relation = next(iter(relations))

if not (units := principal_relation.units):
return None

# Technically it's a list, but for subordinates there can only be one
unit = next(iter(units))
if not (raw := principal_relation.data[unit].get(CosAgentProviderUnitData.KEY)):
return None

if not (provider_data := self._validated_provider_data(raw)):
return None

return provider_data

def _gather_peer_data(self) -> List[CosAgentPeersUnitData]:
"""Collect data from the peers.
Expand Down Expand Up @@ -578,7 +620,7 @@ def metrics_alerts(self) -> Dict[str, Any]:
alert_rules = {}

seen_apps: List[str] = []
for data in self._gather_peer_data(): # type: CosAgentPeersUnitData
for data in self._gather_peer_data():
if rules := data.metrics_alert_rules:
app_name = data.app_name
if app_name in seen_apps:
Expand Down Expand Up @@ -649,7 +691,7 @@ def logs_alerts(self) -> Dict[str, Any]:
alert_rules = {}
seen_apps: List[str] = []

for data in self._gather_peer_data(): # type: CosAgentPeersUnitData
for data in self._gather_peer_data():
if rules := data.log_alert_rules:
# This is only used for naming the file, so be as specific as we can be
app_name = data.app_name
Expand Down Expand Up @@ -678,10 +720,10 @@ def dashboards(self) -> List[Dict[str, str]]:
Dashboards are assumed not to vary across units of the same primary.
"""
dashboards: List[Dict[str, str]] = []
dashboards: List[Dict[str, Any]] = []

seen_apps: List[str] = []
for data in self._gather_peer_data(): # type: CosAgentPeersUnitData
for data in self._gather_peer_data():
app_name = data.app_name
if app_name in seen_apps:
continue # dedup!
Expand Down
51 changes: 35 additions & 16 deletions lib/charms/operator_libs_linux/v2/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 0
LIBPATCH = 1


# Regex to locate 7-bit C1 ANSI sequences
Expand Down Expand Up @@ -273,13 +273,13 @@ def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str:
SnapError if there is a problem encountered
"""
optargs = optargs or []
_cmd = ["snap", command, self._name, *optargs]
args = ["snap", command, self._name, *optargs]
try:
return subprocess.check_output(_cmd, universal_newlines=True)
return subprocess.check_output(args, universal_newlines=True)
except CalledProcessError as e:
raise SnapError(
"Snap: {!r}; command {!r} failed with output = {!r}".format(
self._name, _cmd, e.output
self._name, args, e.output
)
)

Expand All @@ -303,12 +303,12 @@ def _snap_daemons(
else:
services = [self._name]

_cmd = ["snap", *command, *services]
args = ["snap", *command, *services]

try:
return subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True)
return subprocess.run(args, universal_newlines=True, check=True, capture_output=True)
except CalledProcessError as e:
raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr))
raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr))

def get(self, key) -> str:
"""Fetch a snap configuration value.
Expand Down Expand Up @@ -387,11 +387,11 @@ def connect(
elif slot:
command = command + [slot]

_cmd = ["snap", *command]
args = ["snap", *command]
try:
subprocess.run(_cmd, universal_newlines=True, check=True, capture_output=True)
subprocess.run(args, universal_newlines=True, check=True, capture_output=True)
except CalledProcessError as e:
raise SnapError("Could not {} for snap [{}]: {}".format(_cmd, self._name, e.stderr))
raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr))

def hold(self, duration: Optional[timedelta] = None) -> None:
"""Add a refresh hold to a snap.
Expand All @@ -409,6 +409,25 @@ def unhold(self) -> None:
"""Remove the refresh hold of a snap."""
self._snap("refresh", ["--unhold"])

def alias(self, application: str, alias: Optional[str] = None) -> None:
    """Create a snap alias for one of this snap's applications.

    Args:
        application: name of the application to alias.
        alias: (optional) alias name; the application name is used when omitted.

    Raises:
        SnapError if the `snap alias` command fails
    """
    alias_name = application if alias is None else alias
    args = ["snap", "alias", f"{self.name}.{application}", alias_name]
    try:
        subprocess.check_output(args, universal_newlines=True)
    except CalledProcessError as e:
        message = "Snap: {!r}; command {!r} failed with output = {!r}".format(
            self._name, args, e.output
        )
        raise SnapError(message)

def restart(
self, services: Optional[List[str]] = None, reload: Optional[bool] = False
) -> None:
Expand Down Expand Up @@ -992,17 +1011,17 @@ def install_local(
Raises:
SnapError if there is a problem encountered
"""
_cmd = [
args = [
"snap",
"install",
filename,
]
if classic:
_cmd.append("--classic")
args.append("--classic")
if dangerous:
_cmd.append("--dangerous")
args.append("--dangerous")
try:
result = subprocess.check_output(_cmd, universal_newlines=True).splitlines()[-1]
result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1]
snap_name, _ = result.split(" ", 1)
snap_name = ansi_filter.sub("", snap_name)

Expand All @@ -1026,9 +1045,9 @@ def _system_set(config_item: str, value: str) -> None:
config_item: name of snap system setting. E.g. 'refresh.hold'
value: value to assign
"""
_cmd = ["snap", "set", "system", "{}={}".format(config_item, value)]
args = ["snap", "set", "system", "{}={}".format(config_item, value)]
try:
subprocess.check_call(_cmd, universal_newlines=True)
subprocess.check_call(args, universal_newlines=True)
except CalledProcessError:
raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value))

Expand Down
15 changes: 15 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ def _on_install(self, _) -> None:
f"/etc/systemd/system/{PGB}-{self.app.name}@.service", rendered, perms=0o644
)
systemd.daemon_reload()
# Render the logrotate config
with open("templates/logrotate.j2", "r") as file:
template = Template(file.read())
# Logrotate expects the file to be owned by root
with open(f"/etc/logrotate.d/{PGB}-{self.app.name}", "w+") as file:
file.write(
template.render(
log_dir=PGB_LOG_DIR,
app_name=self.app.name,
service_ids=self.service_ids,
prefix=PGB,
)
)

self.unit.status = WaitingStatus("Waiting to start PgBouncer")

Expand All @@ -165,6 +178,7 @@ def _on_remove(self, _) -> None:

os.remove(f"/etc/systemd/system/{PGB}-{self.app.name}@.service")
self.remove_exporter_service()
os.remove(f"/etc/logrotate.d/{PGB}-{self.app.name}")

shutil.rmtree(f"{PGB_CONF_DIR}/{self.app.name}")
shutil.rmtree(f"{PGB_LOG_DIR}/{self.app.name}")
Expand Down Expand Up @@ -386,6 +400,7 @@ def render_prometheus_service(self):
# Render the template file with the correct values.
rendered = template.render(
stats_user=self.backend.stats_user,
pgb_service=f"{PGB}-{self.app.name}",
stats_password=self.peers.get_secret("app", MONITORING_PASSWORD_KEY),
listen_port=self.config["listen_port"],
metrics_port=self.config["metrics_port"],
Expand Down
Loading

0 comments on commit 6c7b416

Please sign in to comment.