Skip to content

Commit

Permalink
Merge pull request #233 from sudiptob2/feat/231/service-account-crawl-factory
Browse files Browse the repository at this point in the history

Feat/231/service account crawl factory
  • Loading branch information
0xDeva authored Jun 27, 2023
2 parents ba8dd9a + 9f6599a commit b8c75d6
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 36 deletions.
34 changes: 0 additions & 34 deletions src/gcp_scanner/crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,37 +206,3 @@ def get_sas_for_impersonation(
list_of_sas.append(account_name)

return list_of_sas


def get_service_accounts(project_name: str,
                         service: discovery.Resource) -> List[Tuple[str, str]]:
  """Retrieve a list of service accounts managed in the project.

  Args:
    project_name: A name of a project to query info about.
    service: A resource object for interacting with the IAM API.

  Returns:
    A list of (email, description) tuples, one per service account
    managed in the project. Empty on API failure.
  """

  logging.info("Retrieving SA list %s", project_name)
  service_accounts = []

  name = f"projects/{project_name}"

  try:
    request = service.projects().serviceAccounts().list(name=name)
    while request is not None:
      response = request.execute()
      # Extend rather than assign: a plain assignment here would discard
      # every previously fetched page and return only the last one.
      service_accounts.extend(
          (service_account["email"], service_account.get("description", ""))
          for service_account in response.get("accounts", []))

      request = service.projects().serviceAccounts().list_next(
          previous_request=request, previous_response=response)
  except Exception:
    # Best-effort crawl: log and return whatever was collected so far.
    logging.info("Failed to retrieve SA list for project %s", project_name)
    logging.info(sys.exc_info())

  return service_accounts
2 changes: 2 additions & 0 deletions src/gcp_scanner/crawler/crawler_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from gcp_scanner.crawler.kms_keys_crawler import KMSKeysCrawler
from gcp_scanner.crawler.machine_images_crawler import ComputeMachineImagesCrawler
from gcp_scanner.crawler.pubsub_subscriptions_crawler import PubSubSubscriptionsCrawler
from gcp_scanner.crawler.service_accounts_crawler import ServiceAccountsCrawler
from gcp_scanner.crawler.service_usage_crawler import ServiceUsageCrawler
from gcp_scanner.crawler.source_repo_crawler import CloudSourceRepoCrawler
from gcp_scanner.crawler.spanner_instances_crawler import SpannerInstancesCrawler
Expand All @@ -61,6 +62,7 @@
"project_list": CloudResourceManagerProjectListCrawler,
"pubsub_subs": PubSubSubscriptionsCrawler,
"services": ServiceUsageCrawler,
"service_accounts": ServiceAccountsCrawler,
"sourcerepos": CloudSourceRepoCrawler,
"spanner_instances": SpannerInstancesCrawler,
"sql_instances": SQLInstancesCrawler,
Expand Down
55 changes: 55 additions & 0 deletions src/gcp_scanner/crawler/service_accounts_crawler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
from typing import Any, Dict, List, Tuple

from googleapiclient import discovery

from gcp_scanner.crawler.interface_crawler import ICrawler


class ServiceAccountsCrawler(ICrawler):
  """Handle crawling of service accounts data."""

  def crawl(self, project_name: str,
            service: discovery.Resource) -> List[Tuple[str, str]]:
    """Retrieve a list of service accounts in the project.

    Args:
      project_name: The name of the project to query information about.
      service: A resource object for interacting with the GCP IAM API.

    Returns:
      A list of (email, description) tuples, one per service account.
      Empty on API failure.
    """
    logging.info("Retrieving SA list %s", project_name)
    service_accounts = []

    name = f"projects/{project_name}"

    try:
      request = service.projects().serviceAccounts().list(name=name)
      while request is not None:
        response = request.execute()
        # Extend rather than assign: a plain assignment here would discard
        # every previously fetched page and return only the last one.
        service_accounts.extend(
            (service_account["email"], service_account.get("description", ""))
            for service_account in response.get("accounts", []))

        request = service.projects().serviceAccounts().list_next(
            previous_request=request, previous_response=response)
    except Exception:
      # Best-effort crawl: log and return whatever was collected so far.
      logging.info("Failed to retrieve SA list for project %s", project_name)
      logging.info(sys.exc_info())

    return service_accounts
4 changes: 3 additions & 1 deletion src/gcp_scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ def crawl_loop(initial_sa_tuples: List[Tuple[str, Credentials, List[str]]],

if is_set(scan_config, 'service_accounts'):
# Get service accounts
project_service_accounts = crawl.get_service_accounts(
project_service_accounts = CrawlerFactory.create_crawler(
'service_accounts',
).crawl(
project_number,
ClientFactory.get_client('iam').get_service(
credentials,
Expand Down
10 changes: 9 additions & 1 deletion src/gcp_scanner/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from .crawler.kms_keys_crawler import KMSKeysCrawler
from .crawler.machine_images_crawler import ComputeMachineImagesCrawler
from .crawler.pubsub_subscriptions_crawler import PubSubSubscriptionsCrawler
from .crawler.service_accounts_crawler import ServiceAccountsCrawler
from .crawler.service_usage_crawler import ServiceUsageCrawler
from .crawler.source_repo_crawler import CloudSourceRepoCrawler
from .crawler.spanner_instances_crawler import SpannerInstancesCrawler
Expand Down Expand Up @@ -728,7 +729,9 @@ def test_service_accounts(self):
"""Test service accounts."""
self.assertTrue(
verify(
crawl.get_service_accounts(
CrawlerFactory.create_crawler(
"service_accounts",
).crawl(
PROJECT_NAME,
ClientFactory.get_client("iam").get_service(
self.credentials,
Expand Down Expand Up @@ -1007,6 +1010,11 @@ def test_create_crawler_endpoints(self):
crawler = CrawlerFactory.create_crawler("endpoints")
self.assertIsInstance(crawler, EndpointsCrawler)

def test_create_crawler_service_accounts(self):
"""Test create_crawler method with 'service_accounts' name."""
crawler = CrawlerFactory.create_crawler("service_accounts")
self.assertIsInstance(crawler, ServiceAccountsCrawler)

def test_create_crawler_invalid(self):
"""Test create_crawler method with invalid name."""
with self.assertLogs(level=logging.ERROR) as log:
Expand Down

0 comments on commit b8c75d6

Please sign in to comment.