Skip to content

Commit

Permalink
chore(lint): integrate Ruff and formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Lowaiz committed Aug 24, 2023
1 parent feb6b68 commit b70accb
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Check with black + isort
if: always()
run: hatch run style:format && git diff --exit-code
- name: Check with flake8
- name: Check with ruff
if: always()
run: hatch run style:lint
- name: Check with mypy
Expand Down
36 changes: 20 additions & 16 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,6 @@ dependencies = [
"httpx >=0.24.1",
]

[project.optional-dependencies]
dev = [
"black >=23.7.0",
"commitizen >=3.6.0",
"isort >=5.9.3",
"mypy >=0.910",
]
test = [
"pytest~=6.2.5",
]

[tool.hatch.version]
path = "src/vaultwarden/__version__.py"

Expand Down Expand Up @@ -102,23 +91,39 @@ detached = true
dependencies = [
"black",
"isort",
"flake8",
"flake8-type-checking",
"ruff",
]
[tool.hatch.envs.style.scripts]
lint = [
"flake8 src/vaultwarden",
"ruff check --fix src/vaultwarden",
]
check = [
"isort --check-only --diff src/vaultwarden",
"black -q --check --diff src/vaultwarden",
"lint",
"ruff check src/vaultwarden",
]
format = [
"isort -q src/vaultwarden",
"black -q src/vaultwarden",
"lint"
]

[tool.ruff]
# Add "Q" to the list of enabled codes.
select = ["B", "E", "F", "I", "N", "Q", "RUF", "SIM", "TCH"]
fixable = ["ALL"]
src = ["src/vaultwarden", "tests"]
target-version = "py310"
line-length = 79

[tool.ruff.flake8-quotes]
docstring-quotes = "double"

[tool.ruff.flake8-bugbear]
extend-immutable-calls = ["typer.Argument"]

[tool.ruff.isort]
force-sort-within-sections = true

[tool.black]
line-length = 79
Expand All @@ -128,7 +133,6 @@ target-version = ["py310", "py311"]
profile = "black"
line_length = 80


[tool.mypy]
ignore_missing_imports = true
warn_unreachable = true
Expand Down
43 changes: 23 additions & 20 deletions src/vaultwarden/clients/bitwarden.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# Class Bitwarden client with a httpx client
from typing import TYPE_CHECKING, Any, List, Literal, Optional

if TYPE_CHECKING:
from uuid import UUID
from typing import Any, List, Literal, Optional
from uuid import UUID

from bitwardentools import caseinsentive_key_search
from bitwardentools.crypto import decrypt, encrypt, make_master_key
from httpx import Client, HTTPError, Response

from vaultwarden.models.api_models import ApiToken
from vaultwarden.models.exception_models import BitwardenException
from vaultwarden.models.exception_models import BitwardenError
from vaultwarden.utils.logger import logger
from vaultwarden.utils.tools import (
get_collection_id_from_ditcs,
Expand All @@ -32,7 +30,7 @@ def __init__(
if not all(
[url, email, password, client_id, client_secret, device_id]
):
raise BitwardenException("All parameters are required")
raise BitwardenError("All parameters are required")
self.email = email
self.password = password
self.client_id = client_id
Expand Down Expand Up @@ -138,8 +136,8 @@ def get_organization_user_details(
params={"includeCollections": True, "includeGroups": True},
).json()

# Get all users in an organization. Returns a dict with email as key and user details as value.
# If raw is True, returns the raw json response
# Get all users in an organization. Returns a dict with email as key and
# user details as value. If raw is True, returns the raw json response
def get_organization_users(self, organization_id: str, raw=False):
users = (
self._api_request(
Expand All @@ -158,29 +156,30 @@ def get_org_key(self, org_id):
sync = self.get_sync()
profile = sync.get("Profile", None)
if profile is None:
raise BitwardenException("No profile in Sync")
raise BitwardenError("No profile in Sync")
orgs = profile.get("Organizations", None)
if orgs is None:
raise BitwardenException("No Organizations in Sync[Profile]")
raise BitwardenError("No Organizations in Sync[Profile]")
raw_key = None
for org in orgs:
if org.get("Id") == org_id:
raw_key = org.get("Key")
break
if raw_key is not None:
return decrypt(raw_key, self.api_token.get("orgs_key"))
raise BitwardenException(f"No Organizations `{org_id}` found")
raise BitwardenError(f"No Organizations `{org_id}` found")

def deduplicate_collection(self, organization_id):
"""
Deduplicate collections with the same name in a given org, by moving users
and secrets into the bigger (by user count) collection
Deduplicate collections with the same name in a given org, by moving
users and secrets into the bigger (by user count) collection
"""
collections = self.get_organization_collections(organization_id)

seen_names = {}
duplicated = {}
# List duplicated collections, indexed on the name, referencing a list of collections id
# List duplicated collections, indexed on the name, referencing a
# list of collections id
for name, collection in collections:
if seen_names.get(name) is None:
seen_names[name] = collection["Id"]
Expand All @@ -194,8 +193,8 @@ def deduplicate_collection(self, organization_id):
base_id = None
nb_users = 0
users = {}
# Building user list by merging all accesses and choose which collection will not be deleted
# (based on most users)
# Building user list by merging all accesses and choose which
# collection will not be deleted (based on most users)
for collection_id in id_list:
coll_users = self.get_users_of_collection(
organization_id, collection_id
Expand All @@ -207,12 +206,14 @@ def deduplicate_collection(self, organization_id):

users_list = list(users.values())

# Removing the chosen collection from the list of collection to delete
# Removing the chosen collection from the list of collection to
# delete
id_list.remove(base_id)
self.set_users_of_collection(organization_id, base_id, users_list)

for collection_id in id_list:
# List items inside the current collection and move these items to the chosen collection
# List items inside the current collection and move these
# items to the chosen collection
ciphers = self.get_organizations_collection_items(
organization_id, collection_id
)
Expand Down Expand Up @@ -373,7 +374,8 @@ def create_collection(
collections_ids[data["Id"]] = data
return data

# Return 2 dicts: one indexed by name, one indexed by id, both containing the collections details
# Return 2 dicts: one indexed by name, one indexed by id,
# both containing the collections details
def get_organization_collections_dicts(self, org_id):
resp = self._api_request(
"GET", f"api/organizations/{org_id}/collections"
Expand Down Expand Up @@ -543,7 +545,8 @@ def invite_organisation(self, org_id, user_accesses, email):
"POST", f"api/organizations/{org_id}/users/invite", json=data
)

# Re-invite a user with the same accesses. Return True if at least one organization has been re-invited
# Re-invite a user with the same accesses. Return True if at least one
# organization has been re-invited
def invite_with_accesses(self, organizations, email):
logger.info("Re-invite with accesses")
for org_id, accesses in organizations.items():
Expand Down
35 changes: 16 additions & 19 deletions src/vaultwarden/clients/vaultwarden.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
import http
from typing import TYPE_CHECKING, Any, Literal, Optional

if TYPE_CHECKING:
from http.cookiejar import Cookie
from http.cookiejar import Cookie
from typing import Any, Literal, Optional

from httpx import Client, HTTPStatusError, Response

from vaultwarden.models.exception_models import VaultwardenAdminException
from vaultwarden.clients.bitwarden import BitwardenClient
from vaultwarden.models.api_models import VaultWardenUser
from vaultwarden.models.exception_models import VaultwardenAdminError
from vaultwarden.utils.logger import logger
from vaultwarden.utils.tools import log_raise_for_status

if TYPE_CHECKING:
from vaultwarden.clients.bitwarden import BitwardenClient
from vaultwarden.models.api_models import VaultWardenUser


class VaultwardenAdminClient:
def __init__(self, url: str, admin_secret_token: str, preload_users: bool):
# If url or admin_secret_token is None, raise an exception
if not url or not admin_secret_token:
raise VaultwardenAdminException(
"Missing url or admin_secret_token"
)
raise VaultwardenAdminError("Missing url or admin_secret_token")
self.admin_secret_token = admin_secret_token
self.url = url.strip("/")
self._http_client = Client(
Expand Down Expand Up @@ -54,12 +48,13 @@ def _admin_request(
self, method: Literal["GET", "POST"], path: str, **kwargs: Any
) -> Response:
self._admin_login()
return self._http_client.request(method, path, **kwargs) # type: ignore
return self._http_client.request(method, path, **kwargs)

def _fill_id_mail_pool(self, users: list[VaultWardenUser]) -> None:
"""Cache the email->GUID mapping for the given users
Necessary since Vaultwarden does not offer a search or query-by-email endpoint
Necessary since Vaultwarden does not offer a search or
query-by-email endpoint
"""
self._id_mail_pool |= {u["Email"]: u["Id"] for u in users}

Expand Down Expand Up @@ -90,7 +85,7 @@ def set_user_enabled(self, identifier: str, enabled: bool) -> None:
def remove_2fa(self, email: str) -> None:
user = self.get_user(email)
if user is None:
raise VaultwardenAdminException(f"User {email} not found")
raise VaultwardenAdminError(f"User {email} not found")
self._admin_request(
"POST", f"users/{email}/remove-2fa"
).raise_for_status()
Expand All @@ -107,8 +102,8 @@ def get_user(self, search: str) -> VaultWardenUser:
if search in self._id_mail_pool:
search = self._id_mail_pool[search]
elif "@" in search:
# search is not a UUID (probably an email) but wasn't found in cache
raise VaultwardenAdminException(f"User {search} not found")
# search is not UUID (probably an email) but wasn't found in cache
raise VaultwardenAdminError(f"User {search} not found")
# else assume it's a UUID
resp = self._admin_request("GET", f"users/{search}")
resp.raise_for_status()
Expand All @@ -129,14 +124,16 @@ def reset_account(self, email: str, bitwarden_client: BitwardenClient):
)
if warning:
check = input(
"WARNING: A organisation where you where present is not maintain by SOC account\n"
"WARNING: A organisation where you where present is not "
"maintain by SOC account\n"
"Press 'yes' if you still want to reset the account"
)
if check != "yes":
logger.warning("Cancelling the reset")
return
logger.warning(
f"Doing reset on {email} despite having not complete information on its accesses"
f"Doing reset on {email} despite having not complete "
f"information on its accesses"
)
self.delete(user["Id"])
bitwarden_client.invite_with_accesses(accesses, user.get("Email"))
Expand Down
4 changes: 2 additions & 2 deletions src/vaultwarden/models/exception_models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
class BitwardenException(Exception):
class BitwardenError(Exception):
"""BitwardenClient Exception class"""

pass


class VaultwardenAdminException(Exception):
class VaultwardenAdminError(Exception):
"""VaultwardenAdminClient Exception class"""

pass

0 comments on commit b70accb

Please sign in to comment.