Skip to content

Commit

Permalink
Fixed bandit issues. (#48)
Browse files Browse the repository at this point in the history
* Fixed bandit issues.

* Added bandit workflow.

* Fixed tests.

* Check bandit job.

* Bandit test.

* Bandit test.

* Bandit test.

* Bandit test.

* Minor correction.
  • Loading branch information
popovaan authored Oct 6, 2023
1 parent 91f65aa commit cd2db71
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 89 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/bandit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

# Bandit is a security linter designed to find common security issues in Python code.
# This action will run Bandit on your codebase.
# The results of the scan will be found under the Security tab of your repository.

# https://github.com/marketplace/actions/bandit-scan is ISC licensed, by abirismyname
# https://pypi.org/project/bandit/ is Apache v2.0 licensed, by PyCQA

name: Bandit
on:
push:
branches: [ "master" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master" ]
schedule:
- cron: '31 21 * * 6'

jobs:
bandit:
permissions:
contents: read # for actions/checkout to fetch code
security-events: write # for github/codeql-action/upload-sarif to upload SARIF results
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Bandit Scan
uses: shundor/python-bandit-scan@9cc5aa4a006482b8a7f91134412df6772dbda22c
with: # optional arguments
# exit with 0, even with results found
exit_zero: false # optional, default is DEFAULT
# Github token of the repository (automatically created by Github)
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information.
# File or directory to run bandit on
# path: # optional, default is .
# Report only issues of a given severity level or higher. Can be LOW, MEDIUM or HIGH. Default is UNDEFINED (everything)
# level: # optional, default is UNDEFINED
# Report only issues of a given confidence level or higher. Can be LOW, MEDIUM or HIGH. Default is UNDEFINED (everything)
# confidence: # optional, default is UNDEFINED
# comma-separated list of paths (glob patterns supported) to exclude from scan (note that these are in addition to the excluded paths provided in the config file) (default: .svn,CVS,.bzr,.hg,.git,__pycache__,.tox,.eggs,*.egg)
# excluded_paths: # optional, default is DEFAULT
# comma-separated list of test IDs to skip
# skips: # optional, default is DEFAULT
# path to a .bandit file that supplies command line arguments
# ini_path: # optional, default is DEFAULT
9 changes: 7 additions & 2 deletions src/backend/backend_ga.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ def send(self, message: Message):
message.attrs['cid'] = str(uuid.uuid4())
try:
data = parse.urlencode(message.attrs).encode()
req = request.Request(self.backend_url, data=data)
request.urlopen(req)

if self.backend_url.lower().startswith('http'):
req = request.Request(self.backend_url, data=data)
else:
raise ValueError("Incorrect backend URL.")

request.urlopen(req) #nosec
except Exception as err:
log.warning("Failed to send event with the following error: {}".format(err))

Expand Down
18 changes: 10 additions & 8 deletions src/backend/backend_ga4.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from .backend import TelemetryBackend
from ..utils.cid import get_or_generate_cid, remove_cid_file
from ..utils.params import telemetry_params


class GA4Backend(TelemetryBackend):
api_secret = "zbEMioDXSE-MUHHPLRIi0A"
id = 'ga4'
cid_filename = 'openvino_ga_cid'
old_cid_filename = 'openvino_ga_uid'
Expand All @@ -24,7 +24,7 @@ def __init__(self, tid: str = None, app_name: str = None, app_version: str = Non
self.session_id = None
self.cid = None
self.backend_url = "https://www.google-analytics.com/mp/collect?measurement_id={}&api_secret={}".format(
self.measurement_id, self.api_secret)
self.measurement_id, telemetry_params["api_key"])
self.default_message_attrs = {
'app_name': self.app_name,
'app_version': self.app_version,
Expand All @@ -35,8 +35,13 @@ def send(self, message: dict):
return
try:
data = json.dumps(message).encode()
req = request.Request(self.backend_url, data=data)
request.urlopen(req)

if self.backend_url.lower().startswith('http'):
req = request.Request(self.backend_url, data=data)
else:
raise ValueError("Incorrect backend URL.")

request.urlopen(req) #nosec
except Exception as err:
log.warning("Failed to send event with the following error: {}".format(err))

Expand Down Expand Up @@ -79,10 +84,6 @@ def build_error_message(self, category: str, error_msg: str, **kwargs):
def build_stack_trace_message(self, category: str, error_msg: str, **kwargs):
return self.build_event_message(category, "stack_trace", error_msg, 1)

def remove_client_id_file(self):
self.cid = None
remove_cid_file(self.cid_filename)

def generate_new_cid_file(self):
self.cid = get_or_generate_cid(self.cid_filename, lambda: str(uuid.uuid4()), is_valid_cid, self.old_cid_filename)

Expand All @@ -95,6 +96,7 @@ def generate_new_session_id(self):
def remove_cid_file(self):
self.cid = None
remove_cid_file(self.cid_filename)
remove_cid_file(self.old_cid_filename)


def is_valid_cid(cid: str):
Expand Down
3 changes: 2 additions & 1 deletion src/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def test_check_by_cmd_line_if_dialog_needed(self):

@patch('sys.argv', [app_name])
def check():
assert Telemetry("a", "b", "c").check_by_cmd_line_if_dialog_needed() is expected_result
if not Telemetry("a", "b", "c").check_by_cmd_line_if_dialog_needed() is expected_result:
raise Exception("Expected result is not equal to dialog result.")

check()

Expand Down
140 changes: 62 additions & 78 deletions src/utils/opt_in_checker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import json
import os
import stat
import unittest
from platform import system
from tempfile import TemporaryDirectory
from unittest.mock import MagicMock

from .opt_in_checker import OptInChecker, ConsentCheckResult
Expand All @@ -15,116 +17,98 @@ class OptInCheckerTest(unittest.TestCase):
test_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'test_opt_in')
test_subdir = 'test_opt_in_subdir'

def init_opt_in_checker(self):
self.remove_test_subdir()
self.opt_in_checker.consent_file_base_dir = MagicMock(return_value=self.test_directory)
def init_opt_in_checker(self, test_directory):
self.opt_in_checker.consent_file_base_dir = MagicMock(return_value=test_directory)
self.opt_in_checker.consent_file_subdirectory = MagicMock(return_value=self.test_subdir)
self.opt_in_checker._check_input_is_terminal = MagicMock(return_value=True)
self.opt_in_checker._check_main_process = MagicMock(return_value=True)
if not os.path.exists(self.test_directory):
os.mkdir(os.path.join(self.test_directory))
test_subdir = os.path.join(self.test_directory, self.test_subdir)
if not os.path.exists(test_subdir):
os.mkdir(test_subdir)

def remove_test_subdir(self):
test_subdir = os.path.join(self.test_directory, self.test_subdir)
if os.path.exists(self.opt_in_checker.consent_file()):
os.remove(self.opt_in_checker.consent_file())
if os.path.exists(test_subdir):
os.chmod(test_subdir, 0o777)
os.rmdir(test_subdir)
if os.path.exists(self.test_directory):
os.chmod(self.test_directory, 0o777)
os.rmdir(os.path.join(self.test_directory))

def test_subdir_no_writable(self):
# At Windows chmod() sets only the file’s read-only flag. All other bits are ignored.
# https://docs.python.org/3/library/os.html#os.chmod
# For this reason this test cannot be run on Windows.
if system() == 'Windows':
return
self.init_opt_in_checker()
test_subdir = os.path.join(self.test_directory, self.test_subdir)
if not os.path.exists(test_subdir):
os.mkdir(test_subdir)
os.chmod(test_subdir, 0o444)
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
test_subdir = os.path.join(test_dir, self.test_subdir)
if not os.path.exists(test_subdir):
os.mkdir(test_subdir)
os.chmod(test_subdir, 0o444)

self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)

self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)
self.remove_test_subdir()

def test_dir_no_writable(self):
# At Windows chmod() sets only the file’s read-only flag. All other bits are ignored.
# https://docs.python.org/3/library/os.html#os.chmod
# For this reason this test cannot be run on Windows.
if system() == 'Windows':
return
self.init_opt_in_checker()
os.chmod(self.test_directory, 0o444)
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
os.chmod(test_dir, 0o444)

self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)
os.chmod(self.test_directory, 0o777)
self.remove_test_subdir()
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)

def test_subdir_is_file(self):
self.init_opt_in_checker()
test_subdir = os.path.join(self.test_directory, self.test_subdir)
if os.path.exists(test_subdir):
os.rmdir(test_subdir)
open(test_subdir, 'w').close()
os.chmod(test_subdir, 0o444)

self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
test_subdir = os.path.join(test_dir, self.test_subdir)
if os.path.exists(test_subdir):
os.rmdir(test_subdir)
open(test_subdir, 'w').close()
os.chmod(test_subdir, 0o444)

# Linux allows delete read-only files, while Windows doesn't
if system() == 'Windows':
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)
os.chmod(test_subdir, 0o777)
os.remove(test_subdir)
else:
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is True)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.NO_FILE)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.NO_FILE)

# Linux allows delete read-only files, while Windows doesn't
if system() == 'Windows':
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)
else:
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is True)

self.remove_test_subdir()

def test_incorrect_control_file_format(self):
self.init_opt_in_checker()
with open(self.opt_in_checker.consent_file(), 'w') as file:
file.write("{ abc")
result = self.opt_in_checker.check(enable_opt_in_dialog=False)
self.assertTrue(result == ConsentCheckResult.DECLINED)
self.remove_test_subdir()
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
os.mkdir(os.path.join(test_dir, self.test_subdir))
with open(self.opt_in_checker.consent_file(), 'w') as file:
file.write("{ abc")
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.DECLINED)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.DECLINED)

def test_incorrect_result_value(self):
self.init_opt_in_checker()
with open(self.opt_in_checker.consent_file(), 'w') as file:
content = {'opt_in': 312}
json.dump(content, file)
result = self.opt_in_checker.check(enable_opt_in_dialog=False)
self.assertTrue(result == ConsentCheckResult.DECLINED)
self.remove_test_subdir()
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
os.mkdir(os.path.join(test_dir, self.test_subdir))
with open(self.opt_in_checker.consent_file(), 'w') as file:
content = {'opt_in': 312}
json.dump(content, file)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.DECLINED)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.DECLINED)

def test_subdirectory_does_not_exist(self):
self.init_opt_in_checker()
test_subdir = os.path.join(self.test_directory, self.test_subdir)
os.rmdir(test_subdir)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is True)
self.remove_test_subdir()
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is True)

def test_base_directory_does_not_exist(self):
self.init_opt_in_checker()
test_subdir = os.path.join(self.test_directory, self.test_subdir)
os.rmdir(test_subdir)
os.rmdir(self.test_directory)
self.init_opt_in_checker(self.test_directory)
self.assertTrue(self.opt_in_checker.create_or_check_consent_dir() is False)
self.remove_test_subdir()

def test_send_telemetry_from_non_cmd_tool(self):
self.init_opt_in_checker()
self.opt_in_checker._check_input_is_terminal = MagicMock(return_value=False)
with open(self.opt_in_checker.consent_file(), 'w') as file:
file.write("1")
result = self.opt_in_checker.check(enable_opt_in_dialog=False)
self.assertTrue(result == ConsentCheckResult.ACCEPTED)
self.remove_test_subdir()
with TemporaryDirectory(prefix=self.test_directory) as test_dir:
self.init_opt_in_checker(test_dir)
self.opt_in_checker._check_input_is_terminal = MagicMock(return_value=False)
os.mkdir(os.path.join(test_dir, self.test_subdir))
with open(self.opt_in_checker.consent_file(), 'w') as file:
file.write("1")
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=True) == ConsentCheckResult.ACCEPTED)
self.assertTrue(self.opt_in_checker.check(enable_opt_in_dialog=False) == ConsentCheckResult.ACCEPTED)
6 changes: 6 additions & 0 deletions src/utils/params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright (C) 2018-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

telemetry_params = {
"api_key": "zbEMioDXSE-MUHHPLRIi0A"
}

0 comments on commit cd2db71

Please sign in to comment.