Skip to content

Commit

Permalink
feat: refactor (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quadrubo authored Jul 9, 2024
1 parent e29ebb5 commit 99c2352
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 115 deletions.
72 changes: 72 additions & 0 deletions src/backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os
from bitwarden import Bitwarden
from notifier import Notifier
from dotenv import load_dotenv
from datetime import datetime


class Backup:
def __init__(self, bitwarden: Bitwarden, notifier: Notifier):
self.bitwarden = bitwarden
self.notifier = notifier

self.export_timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

load_dotenv()

self.backup_path = os.getenv("BACKUP_PATH")
self.backup_format = os.getenv("BACKUP_FORMAT")
self.backup_password = os.getenv("BACKUP_PASSWORD")

backup_organizations = os.getenv("BACKUP_ORGANIZATIONS").strip()

if backup_organizations:
self.backup_organizations = backup_organizations.split(",")
else:
self.backup_organizations = []

def generate_export_filename(self, organization_id=None):
filename = "bitwarden-encrypted_" + self.export_timestamp

if organization_id:
filename += "_" + organization_id

filename += ".json"

return filename

def generate_export_path(self, organization_id=None):
return self.backup_path + "/" + self.generate_export_filename(organization_id)

def start(self):
export_files = []

self.notifier.send_start()

# Logout is needed before configuring the server
self.bitwarden.logout()

self.bitwarden.configure_server()
self.bitwarden.login()
self.bitwarden.unlock()

self.bitwarden.export(
self.generate_export_path(),
self.backup_format,
self.backup_password
)

export_files.append(self.generate_export_filename())

for organization_id in self.backup_organizations:
self.bitwarden.export(
self.generate_export_path(organization_id),
self.backup_format,
self.backup_password
)

export_files.append(self.generate_export_filename(organization_id))

self.bitwarden.logout()

self.notifier.send_success(export_files)
92 changes: 54 additions & 38 deletions src/bitwarden.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,101 @@
import os
import json
import re
from cli import CLI


class Bitwarden:
def __init__(self, binary_path):
self.binary_path = binary_path
def __init__(self, cli: CLI, server, client_id, client_secret, master_password):
self.cli = cli
self.server = server
self.client_id = client_id
self.client_secret = client_secret
self.master_password = master_password
self.session_key = None

def get_client_environment(self):
if not self.client_id or not self.client_secret:
raise Exception("Client id or client secret not set.")

environment = os.environ.copy()
environment["BW_CLIENTID"] = self.client_id
environment["BW_CLIENTSECRET"] = self.client_secret

return environment

def get_password_environment(self):
if not self.master_password:
raise Exception("Master password not set.")

environment = os.environ.copy()
environment["BW_PASSWORD"] = self.master_password

return environment

def get_session_environment(self):
if (self.session_key):
my_env = os.environ.copy()
my_env["BW_SESSION"] = self.session_key
return my_env
else:
if not self.session_key:
raise Exception("Session key not set. Please login & unlock first.")

def configure_server(self, server_url):
if (self.status() != 'unauthenticated'):
environment = os.environ.copy()
environment["BW_SESSION"] = self.session_key

return environment

def configure_server(self):
if self.status() != 'unauthenticated':
print("Already authenticated. Please logout first.")
return

process = subprocess.run([self.binary_path, 'config', 'server', server_url])
self.cli.execute("config", ["server", self.server])

def status(self):
process = subprocess.run([self.binary_path, 'status'], capture_output=True)
stdout, stderr = self.cli.execute('status')

output = process.stdout.decode('utf-8')
error_output = process.stderr.decode('utf-8')
status = json.loads(stdout)['status']

output_json = json.loads(output)
status = output_json['status']

return status

def login(self, client_id, client_secret):
def login(self):
if self.status() != 'unauthenticated':
print("Already authenticated. Please logout first.")
return

my_env = os.environ.copy()
my_env["BW_CLIENTID"] = client_id
my_env["BW_CLIENTSECRET"] = client_secret
environment = self.get_client_environment()

process = subprocess.run([self.binary_path, 'login', '--apikey'], env=my_env)
self.cli.execute("login", ["--apikey"], environment)

def logout(self):
if self.status() == 'unauthenticated':
print("Already unauthenticated. Please login first.")
return

process = subprocess.run([self.binary_path, 'logout'])
self.cli.execute("logout")

def unlock(self, master_password):
def unlock(self):
if self.status() == 'unlocked':
print("Already unlocked. Please lock first.")
return

my_env = os.environ.copy()
my_env["BW_PASSWORD"] = master_password

process = subprocess.run([self.binary_path, 'unlock', '--passwordenv', 'BW_PASSWORD'], env=my_env, capture_output=True)
environment = self.get_password_environment()

output = process.stdout.decode('utf-8')
error_output = process.stderr.decode('utf-8')
stdout, stderr = self.cli.execute("unlock", ["--passwordenv", "BW_PASSWORD"], environment)

if "Invalid master password." in error_output:
if "Invalid master password." in stderr:
raise Exception("Invalid master password.")

session_key = re.search(r'(?<=BW_SESSION=").*?(?=")', output).group(0)
session_key = re.search(r'(?<=BW_SESSION=").*?(?=")', stdout).group(0)

self.session_key = session_key

def export(self, output, cfg_format, password = None, organization_id = None):
my_env = self.get_session_environment()
def export(self, output, cfg_format, password=None, organization_id=None):
environment = self.get_session_environment()

export_command = [self.binary_path, 'export', '--output', output, '--format', cfg_format]
arguments = ['--output', output, '--format', cfg_format]

if password:
export_command += ['--password', password]
arguments += ['--password', password]

if organization_id:
export_command += ['--organizationid', organization_id]
arguments += ['--organizationid', organization_id]

subprocess.run(export_command, env=my_env, check=True)
self.cli.execute("export", arguments, environment)
20 changes: 20 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import subprocess


class CLI:
def __init__(self, binary_path):
self.binary_path = binary_path

def execute(self, command, arguments=None, environment=None):
if arguments is None:
arguments = []

if environment:
process = subprocess.run([self.binary_path, command] + arguments, env=environment, capture_output=True)
else:
process = subprocess.run([self.binary_path, command] + arguments, capture_output=True)

stdout = process.stdout.decode('utf-8')
stderr = process.stderr.decode('utf-8')

return stdout, stderr
104 changes: 27 additions & 77 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,54 @@
from dotenv import load_dotenv
from notifier import Notifier
from notifiers.ntfy import NTFY
from notifiers.mock import Mock
from bitwarden import Bitwarden
import os
from datetime import datetime
from backup import Backup
from cli import CLI

load_dotenv()

# Path configuration
bw_binary = os.getenv("BW_BINARY")
backup_path = os.getenv("BACKUP_PATH")

# Bitwarden configuration
# authentication
bw_client_id = os.getenv("BW_CLIENT_ID")
bw_client_secret = os.getenv("BW_CLIENT_SECRET")
bw_master_password = os.getenv("BW_MASTER_PASSWORD")
def inject_cli():
bw_binary = os.getenv("BW_BINARY")

# connection
bw_server = os.getenv("BW_SERVER")
return CLI(bw_binary)

# backups
backup_format = os.getenv("BACKUP_FORMAT")
backup_password = os.getenv("BACKUP_PASSWORD")
backup_organizations = os.getenv("BACKUP_ORGANIZATIONS")

# NTFY configuration
ntfy_server = os.getenv("NTFY_SERVER")
ntfy_topic = os.getenv("NTFY_TOPIC")
ntfy_username = os.getenv("NTFY_USERNAME")
ntfy_password = os.getenv("NTFY_PASSWORD")
def inject_bitwarden(cli: CLI):
bw_binary = os.getenv("BW_BINARY")
bw_server = os.getenv("BW_SERVER")
bw_client_id = os.getenv("BW_CLIENT_ID")
bw_client_secret = os.getenv("BW_CLIENT_SECRET")
bw_master_password = os.getenv("BW_MASTER_PASSWORD")

def generate_output_path(path, current_time, organization_id = None):
output_path = backup_path + "/bitwarden-encrypted_" + current_time
return Bitwarden(cli, bw_server, bw_client_id, bw_client_secret, bw_master_password)

if organization_id:
output_path += "_" + organization_id

output_path += ".json"

return output_path

def inject_notifier():
ntfy_server = os.getenv("NTFY_SERVER")
ntfy_topic = os.getenv("NTFY_TOPIC")
ntfy_username = os.getenv("NTFY_USERNAME")
ntfy_password = os.getenv("NTFY_PASSWORD")

if ntfy_server and ntfy_topic:
return NTFY(ntfy_server, ntfy_topic, ntfy_username, ntfy_password)
else:
return Mock()

def main(notifier: Notifier, bitwarden: Bitwarden, config):
notifier.send_start()

# Logout is needed before configuring the server
bitwarden.logout()

bitwarden.configure_server(config["bw_server"])

bitwarden.login(config["bw_client_id"], config["bw_client_secret"])
bitwarden.unlock(config["bw_master_password"])

current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

output_paths = []

vault_output_path = generate_output_path(config["backup_path"], current_time)

bitwarden.export(vault_output_path, config["backup_format"], config["backup_password"])

output_paths.append(vault_output_path)

if config["backup_organizations"]:
backup_organizations = config["backup_organizations"].split(",")
else:
backup_organizations = []

for organization_id in backup_organizations:
organization_output_path = generate_output_path(config["backup_path"], current_time, organization_id)

bitwarden.export(organization_output_path, config["backup_format"], config["backup_password"], organization_id)

output_paths.append(organization_output_path)
def main():
cli = inject_cli()
bitwarden = inject_bitwarden(cli)
notifier = inject_notifier()

bitwarden.logout()
backup = Backup(
bitwarden,
notifier
)

notifier.send_success(output_paths)
backup.start()


if __name__ == "__main__":
notifier = inject_notifier()
main(
notifier,
Bitwarden(bw_binary),
{
"bw_server": bw_server,
"bw_client_id": bw_client_id,
"bw_client_secret": bw_client_secret,
"bw_master_password": bw_master_password,
"backup_path": backup_path,
"backup_format": backup_format,
"backup_password": backup_password,
"backup_organizations": backup_organizations
}
)
main()

0 comments on commit 99c2352

Please sign in to comment.