Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNAC Audit] Install and configure syslog-ng from SNAC tool #59

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0


## [Unreleased]
### Added
* Install and configure `auditd` in order to log system activites.
* Install and configure `syslog-ng` in order to log system activites.

### Changed
* Restricted write access to system logs in `/var/log` to System Maintainers (root) and Auditors via the `adm` group.
* Restricted write access to `auditd.conf` to System Maintainers and Admins via the `sudo` group.


## [1.0.0] - 2024-12-16
Expand All @@ -16,7 +23,7 @@ Release corresponding to the NILRT 11.0 (2025Q1) distribution release.

### Added
* Added a `verify` operation to non-destructively check that the system is still SNAC-compliant. (#15)
* Added a system test fixture that sets up a wireguard tunnel between a Windows host and a SNAc device (#41).
* Added a system test fixture that sets up a wireguard tunnel between a Windows host and a SNAC device (#41).


### Changed
Expand Down
5 changes: 5 additions & 0 deletions nilrt_snac/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def _parse_args(argv: List[str]) -> argparse.Namespace:
action="store_true",
help="Consent to changes",
)
configure_parser.add_argument(
"--audit-email",
type=str,
help="Email address for audit actions",
)
configure_parser.set_defaults(func=_configure)

verify_parser = subparsers.add_parser("verify", help="Verify SNAC mode configured correctly")
Expand Down
31 changes: 30 additions & 1 deletion nilrt_snac/_common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
import grp
import os
import pathlib
import stat
import subprocess


def _check_group_ownership(path: str, group: str) -> bool:
"Checks if the group ownership of a file or directory matches the specified group."
stat_info = os.stat(path)
gid = stat_info.st_gid
group_info = grp.getgrgid(gid)

return group_info.gr_name == group

def _check_owner(path: str, owner: str) -> bool:
"Checks if the owner of a file or directory matches the specified owner."
stat_info = os.stat(path)
uid = stat_info.st_uid
owner_info = grp.getgrgid(uid)
return owner_info.gr_name == owner

def _check_permissions(path: str, expected_mode: int) -> bool:
"Checks if the permissions of a file or directory match the expected mode."
stat_info = os.stat(path)
return stat.S_IMODE(stat_info.st_mode) == expected_mode

def _cmd(*args: str):
"Syntactic sugar for running shell commands."
subprocess.run(args, check=True)

def get_distro():
try:
Expand All @@ -9,4 +38,4 @@ def get_distro():
if line.startswith("ID="):
return line.split("=")[1].strip()
except NameError:
return None
return None
4 changes: 4 additions & 0 deletions nilrt_snac/_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List

from nilrt_snac._configs._auditd_config import _AuditdConfig
from nilrt_snac._configs._base_config import _BaseConfig
from nilrt_snac._configs._console_config import _ConsoleConfig
from nilrt_snac._configs._cryptsetup_config import _CryptSetupConfig
Expand All @@ -13,6 +14,7 @@
from nilrt_snac._configs._ssh_config import _SshConfig
from nilrt_snac._configs._sudo_config import _SudoConfig
from nilrt_snac._configs._sysapi_config import _SysAPIConfig
from nilrt_snac._configs._syslog_ng_config import _SyslogConfig
from nilrt_snac._configs._tmux_config import _TmuxConfig
from nilrt_snac._configs._wifi_config import _WIFIConfig
from nilrt_snac._configs._wireguard_config import _WireguardConfig
Expand All @@ -36,4 +38,6 @@
_SshConfig(),
_SudoConfig(),
_FirewallConfig(),
_AuditdConfig(),
_SyslogConfig(),
]
202 changes: 202 additions & 0 deletions nilrt_snac/_configs/_auditd_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import argparse
import grp
import os
import re
import socket
from typing import List

from nilrt_snac import logger
from nilrt_snac._configs._base_config import _BaseConfig
from nilrt_snac._common import _check_group_ownership, _check_owner, _check_permissions, _cmd
from nilrt_snac._configs._config_file import EqualsDelimitedConfigFile, _ConfigFile
from nilrt_snac.opkg import opkg_helper

def ensure_groups_exist(groups: List[str]) -> None:
"Ensures the specified groups exist on the system."
for group in groups:
try:
grp.getgrnam(group)
except KeyError:
_cmd("groupadd", group)
logger.info(f"Group {group} created.")

def format_email_template_text(audit_email: str) -> str:
return f"""
#!/usr/bin/perl
use strict;
use warnings;
use Net::SMTP;
# Configuration
my $smtp_server = 'smtp.yourisp.com';
my $smtp_user = '[email protected]';
my $smtp_pass = 'your_password';
my $from = '[email protected]';
my $to = '{audit_email}';
my $subject = 'Audit Alert';
my $body = "A critical audit event has been triggered: $ARGV[0]";
# Create SMTP object
my $smtp = Net::SMTP->new($smtp_server, Timeout => 60)
or die "Could not connect to SMTP server: $!";
# Authenticate
$smtp->auth($smtp_user, $smtp_pass)
or die "SMTP authentication failed: $!";
# Send email
$smtp->mail($from)
or die "Error setting sender: $!";
$smtp->to($to)
or die "Error setting recipient: $!";
$smtp->data()
or die "Error starting data: $!";
$smtp->datasend("To: $to\\n");
$smtp->datasend("From: $from\\n");
$smtp->datasend("Subject: $subject\\n");
$smtp->datasend("\\n");
$smtp->datasend("$body\\n");
$smtp->dataend()
or die "Error ending data: $!";
$smtp->quit;
"""

def is_valid_email(email: str) -> bool:
"Validates an email address."
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$'
return re.match(email_regex, email) is not None



class _AuditdConfig(_BaseConfig):
def __init__(self):
self._opkg_helper = opkg_helper
self.log_path = os.path.realpath('/var/log')
self.audit_config_path = '/etc/audit/auditd.conf'

def configure(self, args: argparse.Namespace) -> None:
print("Configuring auditd...")
auditd_config_file = EqualsDelimitedConfigFile(self.audit_config_path)
dry_run: bool = args.dry_run

# Check if auditd is already installed
if not self._opkg_helper.is_installed("auditd"):
self._opkg_helper.install("auditd")

#Ensure proper groups exist
groups_required = ["adm", "sudo"]
ensure_groups_exist(groups_required)

# Prompt for email if not provided
audit_email = args.audit_email
unattended_bypass = args.yes
if not audit_email:
audit_email = auditd_config_file.get("action_mail_acct")
if not is_valid_email(audit_email) and not unattended_bypass:
while not audit_email.strip() or not is_valid_email(audit_email):
audit_email = input("Please enter your audit email address: ")
else:
# Use local default e-mail
audit_email = f"root@{socket.gethostname()}"

if is_valid_email(audit_email):
auditd_config_file.update(r'^action_mail_acct\s*=.*$', f'action_mail_acct = {audit_email}')

# Install recommended SMTP package dependency
if not self._opkg_helper.is_installed("perl-module-net-smtp"):
self._opkg_helper.install("perl-module-net-smtp")

# Install auditd plugin package to allow for watch scripts to be used
if not self._opkg_helper.is_installed("audispd-plugins"):
self._opkg_helper.install("audispd-plugins")

# Create template audit rule script to send email alerts
audit_rule_script_path = '/etc/audit/audit_email_alert.pl'
if not os.path.exists(audit_rule_script_path):
audit_rule_script = format_email_template_text(audit_email)

with open(audit_rule_script_path, "w") as file:
file.write(audit_rule_script)

# Set the appropriate permissions
_cmd("chown", "root:sudo", audit_rule_script_path)
_cmd("chmod", "700", audit_rule_script_path)

audit_email_conf_path = '/etc/audit/plugins.d/audit_email_alert.conf'
if not os.path.exists(audit_email_conf_path):
audit_email_config = """
active = yes
direction = out
path = {audit_rule_script_path}
type = always
""".format(audit_rule_script_path=audit_rule_script_path)

with open(audit_email_conf_path, "w") as file:
file.write(audit_email_config)

# Set the appropriate permissions
audit_email_file = _ConfigFile(audit_email_conf_path)
audit_email_file.chown("root", "sudo")
audit_email_file.chmod(0o600)
audit_email_file.save(dry_run)


# Set the appropriate permissions to allow only root and the 'sudo' group to read/write
auditd_config_file.chown("root", "sudo")
auditd_config_file.chmod(0o660)
auditd_config_file.save(dry_run)

# Enable and start auditd service
if not os.path.exists("/etc/rc2.d/S20auditd"):
_cmd("update-rc.d", "auditd", "defaults")
_cmd("/etc/init.d/auditd", "restart")

# Set the appropriate permissions to allow only root and the 'adm' group to write/read
_cmd('chown', '-R', 'root:adm', self.log_path)
_cmd('chmod', '-R', '770', self.log_path)

# Ensure new log files created by the system inherit these permissions
_cmd('setfacl', '-d', '-m', 'g:adm:rwx', self.log_path)
_cmd('setfacl', '-d', '-m', 'o::0', self.log_path)



def verify(self, args: argparse.Namespace) -> bool:
print("Verifying auditd configuration...")
valid: bool = True

# Check if auditd is installed
valid = valid and self._opkg_helper.is_installed("auditd")

# Check if auditd config
auditd_config_file = EqualsDelimitedConfigFile(self.audit_config_path)
if not auditd_config_file.exists():
valid = False
logger.error(f"MISSING: {auditd_config_file.path} not found")
elif not is_valid_email(auditd_config_file.get("action_mail_acct")):
valid = False
logger.error("MISSING: expected action_mail_acct value")

# Check group ownership and permissions of auditd.conf
if not _check_group_ownership(self.audit_config_path, "sudo"):
logger.error(f"ERROR: {self.audit_config_path} is not owned by the 'sudo' group.")
valid = False
if not _check_permissions(self.audit_config_path, 0o660):
logger.error(f"ERROR: {self.audit_config_path} does not have 660 permissions.")
valid = False
if not _check_owner(self.audit_config_path, "root"):
logger.error(f"ERROR: {self.audit_config_path} is not owned by 'root'.")
valid = False

# Check group ownership and permissions of /var/log
if not _check_group_ownership(self.log_path, "adm"):
logger.error(f"ERROR: {self.log_path} is not owned by the 'adm' group.")
valid = False
if not _check_permissions(self.log_path, 0o770):
logger.error(f"ERROR: {self.log_path} does not have 770 permissions.")
valid = False
if not _check_owner(self.log_path, "root"):
logger.error(f"ERROR: {self.log_path} is not owned by 'root'.")
valid = False

return valid
27 changes: 27 additions & 0 deletions nilrt_snac/_configs/_config_file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Helper class to read/write and update configuration files."""

import grp
import os
import pathlib
import pwd
import re
from typing import Union

Expand All @@ -21,6 +24,8 @@ def __init__(self, path: Union[pathlib.Path, str]) -> None:
self.path = path
self._config = path.read_text() if path.exists() else ""
self._mode = path.stat().st_mode if path.exists() else 0o600
self._uid = path.stat().st_uid if path.exists() else None
self._gid = path.stat().st_gid if path.exists() else None

def save(self, dry_run: bool) -> None:
"""Save the configuration file."""
Expand All @@ -29,6 +34,8 @@ def save(self, dry_run: bool) -> None:
else:
self.path.write_text(self._config)
self.path.chmod(self._mode)
if self._uid is not None and self._gid is not None:
os.chown(self.path, self._uid, self._gid)
logger.debug(f"Contents of {self.path}:")
logger.debug(self._config)

Expand Down Expand Up @@ -56,6 +63,16 @@ def exists(self) -> bool:

def chmod(self, mode: int) -> None:
self._mode = mode

def chown(self, user: str, group: str) -> None:
"""Change the owner and group of the configuration file.

Args:
user: Username to set as the owner.
group: Group name to set as the group.
"""
self._uid = pwd.getpwnam(user).pw_uid
self._gid = grp.getgrnam(group).gr_gid

def contains(self, key: str) -> bool:
"""Check if the configuration file contains the given key.
Expand All @@ -65,3 +82,13 @@ def contains(self, key: str) -> bool:
Returns: True if the key is found, False otherwise.
"""
return bool(re.search(key, self._config))


class EqualsDelimitedConfigFile(_ConfigFile):
def get(self, key: str) -> str:
value_pattern = rf"{key}\s*=\s*(.*)"
match = re.search(value_pattern, self._config)
if match:
return match.group(1).strip()
else:
return ""
1 change: 1 addition & 0 deletions nilrt_snac/_configs/_opkg_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self):
def configure(self, args: argparse.Namespace) -> None:
print("Configuring opkg...")
snac_config_file = _ConfigFile(OPKG_SNAC_CONF)
snac_config_file.chmod(0o644)
base_feeds_config_file = _ConfigFile("/etc/opkg/base-feeds.conf")
dry_run: bool = args.dry_run

Expand Down
Loading