Skip to content

Commit

Permalink
Fixed style issue
Browse files Browse the repository at this point in the history
Fixed pylint issue across misc test repo

Signed-off-by: Praveen K Pandey <[email protected]>
  • Loading branch information
PraveenPenguin committed Nov 29, 2022
1 parent 0cf1783 commit 217211a
Show file tree
Hide file tree
Showing 65 changed files with 281 additions and 174 deletions.
32 changes: 17 additions & 15 deletions cpu/cpupower_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def setUp(self):
for line in output.splitlines():
if 'Available idle states: ' in line:
self.states_list = (line.split('Available idle states: ')[-1])\
.split()
.split()
break
self.log.info("Idle states on the system are: %s" % self.states_list)

Expand Down Expand Up @@ -79,7 +79,6 @@ def check_zero_nonzero(self, stop_state_index):
return 0

def test_workload(self):

"""
This test covers:
1. Collect cpupower monitor output.
Expand Down Expand Up @@ -113,11 +112,10 @@ def test_workload(self):
zero_nonzero = zero_nonzero + self.check_zero_nonzero(i + 1)
if not zero_nonzero:
self.log.info("cpus have not entered idle states after killing"
" ebizzy workload")
" ebizzy workload")
self.log.info("cpus have entered idle states after killing work load")

def test_disable_idlestate(self):

"""
1. Collect list of supported idle states.
2. Disable first idle statei, check cpus have not entered this state.
Expand All @@ -137,7 +135,6 @@ def test_disable_idlestate(self):

@skipIf("powerpc" not in cpu.get_arch(), "Skip, SMT specific tests")
def test_idlestate_smt(self):

"""
1. Set smt mode to off.
2. Run test_workload.
Expand All @@ -152,7 +149,6 @@ def test_idlestate_smt(self):
process.run('ppc64_cpu --smt=on', shell=True)

def test_idlestate_single_core(self):

"""
1. Set single core online.
2. Run test_workload.
Expand All @@ -167,32 +163,38 @@ def test_idlestate_single_core(self):
process.run('ppc64_cpu --smt=on', shell=True)

def test_idle_info(self):

"""
This test verifies cpupower idle-info with different smt states.
Prints the duration for which CPU is in snooze and CEDE state.
"""

process.run('cpupower -c all idle-info', shell=True)
for i in [1,2,4]:
for i in [1, 2, 4]:
process.run('ppc64_cpu --smt=%s' % i, shell=True)
process.run('ppc64_cpu --smt', shell=True)
output = process.system_output('cpupower -c %s idle-info | grep offline' % i, shell=True).split()
output = process.system_output(
'cpupower -c %s idle-info | grep offline' % i, shell=True).split()
if "offline" not in str(output[1]):
self.fail("cpupower tool verification with smt=%s failed" % i)
process.run('ppc64_cpu --smt=on', shell=True)
process.run('ppc64_cpu --cores-on=1', shell=True)
process.run('cpupower -c all idle-info', shell=True)
process.run('ppc64_cpu --cores-on=all', shell=True)
process.run('cpupower -c all idle-info', shell=True)
self.nr_cpus = process.system_output("lscpu | grep ^'CPU(s):'", shell=True).split()
self.nr_cpus = process.system_output(
"lscpu | grep ^'CPU(s):'", shell=True).split()
for i in range(int(self.nr_cpus[1])):
duration_init = process.system_output('cpupower -c %s idle-info | grep Duration' % i, shell=True).split()
duration_init = process.system_output(
'cpupower -c %s idle-info | grep Duration' % i, shell=True).split()
time.sleep(5)
duration_final = process.system_output('cpupower -c %s idle-info | grep Duration' % i, shell=True).split()
duration_final = process.system_output(
'cpupower -c %s idle-info | grep Duration' % i, shell=True).split()
duration_snooze = int(duration_final[1]) - int(duration_init[1])
self.log.info("CPU%s has entered snooze state for %s microseconds in 2 seconds" % (i, duration_snooze))
self.log.info("CPU%s has entered snooze state for %s microseconds in 2 seconds" % (
i, duration_snooze))
duration_CEDE = int(duration_final[3]) - int(duration_init[3])
self.log.info("CPU%s has entered CEDE state for %s microseconds in 2 seconds" % (i, duration_CEDE))
self.log.info("CPU%s has entered CEDE state for %s microseconds in 2 seconds" % (
i, duration_CEDE))
if (duration_snooze == 0) and (duration_CEDE == 0):
self.fail("CPU%s has not entered snooze or CEDE state even in idle state" % i)
self.fail(
"CPU%s has not entered snooze or CEDE state even in idle state" % i)
3 changes: 2 additions & 1 deletion cpu/linsched.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def setUp(self):
"archive/refs/heads/master.zip"
tarball = self.fetch_asset("linsched.zip", locations=url, expire='7d')
archive.extract(tarball, self.workdir)
self.sourcedir = os.path.join(self.workdir, 'linux-scheduler-testing-master')
self.sourcedir = os.path.join(
self.workdir, 'linux-scheduler-testing-master')

os.chdir(self.sourcedir)
fix_patch = 'patch -p1 < %s' % self.get_data('fix.patch')
Expand Down
3 changes: 2 additions & 1 deletion cpu/producer_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def test(self):

cache_size = self.params.get('cache_size')
if not cache_size:
iteration_length = self.params.get('iteration_length', default=1024)
iteration_length = self.params.get(
'iteration_length', default=1024)
args = '-p %s -c %s -r %s -l %s -t %s' % (pcpu, ccpu, random_seed,
iteration_length, runtime)
else:
Expand Down
2 changes: 1 addition & 1 deletion dlpar/dlpar_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def test_dlpar(self):
'sha_cpu_fold_workload'):
dlpar_type_flag = "cpu_fold"
self.log.info(
"CPU folding Workload: Calling ./dlpar_workload_setup.py")
"CPU folding Workload: Calling ./dlpar_workload_setup.py")
test_cmd = './dlpar_workload_setup.py'
self.run_cmd(test_cmd, "cpu_fold")
self.dlpar_engine()
Expand Down
6 changes: 4 additions & 2 deletions fs/blktests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ def setUp(self):
smm = SoftwareManager()
dist = distro.detect()
if dist.name in ['Ubuntu', 'debian']:
packages = ['gcc', 'make', 'util-linux', 'fio', 'libdevmapper-dev', 'g++']
packages = ['gcc', 'make', 'util-linux',
'fio', 'libdevmapper-dev', 'g++']
else:
packages = ['gcc', 'make', 'util-linux', 'fio', 'device-mapper', 'gcc-c++']
packages = ['gcc', 'make', 'util-linux',
'fio', 'device-mapper', 'gcc-c++']

for package in packages:
if not smm.check_installed(package) and not smm.install(package):
Expand Down
3 changes: 2 additions & 1 deletion fs/xfstests.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def setup_nvdimm(self):
self.log_scratch = None
self.plib.create_namespace(region=self.region, size=dev_size)
self.plib.create_namespace(region=self.region, size=dev_size)
namespaces = self.plib.run_ndctl_list('-N -r %s -m fsdax' % self.region)
namespaces = self.plib.run_ndctl_list(
'-N -r %s -m fsdax' % self.region)
pmem_dev = self.plib.run_ndctl_list_val(namespaces[0], 'blockdev')
self.test_dev = "/dev/%s" % pmem_dev
pmem_dev = self.plib.run_ndctl_list_val(namespaces[1], 'blockdev')
Expand Down
6 changes: 4 additions & 2 deletions generic/interbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ def setUp(self):
if memory.meminfo.MemTotal.b > disk_free_b:
self.cancel('Disk space is less than total memory. Skipping test')

tarball = self.fetch_asset('http://ck.kolivas.org/apps/interbench/interbench-0.31.tar.bz2')
tarball = self.fetch_asset(
'http://ck.kolivas.org/apps/interbench/interbench-0.31.tar.bz2')
archive.extract(tarball, self.workdir)
version = os.path.basename(tarball.split('.tar.')[0])
self.sourcedir = os.path.join(self.workdir, version)

# Patch for make file
os.chdir(self.sourcedir)
makefile_patch = 'patch -p1 < %s ' % self.get_data('makefile_fix.patch')
makefile_patch = 'patch -p1 < %s ' % self.get_data(
'makefile_fix.patch')
process.run(makefile_patch, shell=True)

build.make(self.sourcedir)
Expand Down
3 changes: 2 additions & 1 deletion io/common/distro_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@

release = "%s%s" % (distro.detect().name, distro.detect().version)


class DisrtoTool(Test):
'''
to test different type of tool
'''

def setUp(self):
'''
get all parameters
Expand Down Expand Up @@ -67,7 +69,6 @@ def setUp(self):
if not smm.check_installed(pkg) and not smm.install(pkg):
self.cancel("%s package is need to test" % pkg)


def test(self):
'''
test all distro tools
Expand Down
3 changes: 2 additions & 1 deletion io/common/virtual_bind_unbind.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def test(self):
if self.device_type in ["l-lan", "vnic"]:
if self.networkinterface.ping_check(self.peer_ip,
count=5) is not None:
self.cancel("Please make sure the network peer is configured ?")
self.cancel(
"Please make sure the network peer is configured ?")
else:
if self.is_exists_device(self.virtual_device) is False:
self.cancel("failed to detect the test disk")
Expand Down
15 changes: 10 additions & 5 deletions io/disk/disk_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,13 @@ def test(self):
msg = []
if process.system("ls /dev/disk/by-id -l| grep -i %s" % self.disk_abs,
ignore_status=True, shell=True, sudo=True) != 0:
msg.append("Given disk %s is not in /dev/disk/by-id" % self.disk_abs)
msg.append("Given disk %s is not in /dev/disk/by-id" %
self.disk_abs)
for disk_node in self.disk_nodes:
if process.system("ls /dev/disk/by-path -l| grep -i %s" % disk_node,
ignore_status=True, shell=True, sudo=True) != 0:
msg.append("Given disk %s is not in /dev/disk/by-path" % disk_node)
msg.append(
"Given disk %s is not in /dev/disk/by-path" % disk_node)

# Verify disk listed in all tools
if self.mpath:
Expand All @@ -145,12 +147,14 @@ def test(self):
cmd = cmd + " | grep -i %s" % self.disk_base
if process.system(cmd, ignore_status=True,
shell=True, sudo=True) != 0:
msg.append("Given disk %s is not present in %s" % (self.disk_base, cmd))
msg.append("Given disk %s is not present in %s" %
(self.disk_base, cmd))
if self.mpath:
for disk_node in self.disk_nodes:
if process.system("lshw -c disk | grep -i %s" % disk_node,
ignore_status=True, shell=True, sudo=True) != 0:
msg.append("Given disk %s is not in lshw -c disk" % disk_node)
msg.append("Given disk %s is not in lshw -c disk" %
disk_node)

# Get the size and UUID of the disk
cmd = "lsblk -l %s --output SIZE -b |sed -n 2p" % self.disk
Expand Down Expand Up @@ -251,7 +255,8 @@ def test(self):
self.disk, self.fstype, self.uuid)

if process.system("grub2-probe %s" % self.dirs, ignore_status=True):
msg.append("Given disk %s's fs not detected by grub2" % self.disk_base)
msg.append("Given disk %s's fs not detected by grub2" %
self.disk_base)

# Un-mount the directory
self.log.info("Unmounting directory %s", self.dirs)
Expand Down
3 changes: 2 additions & 1 deletion io/disk/iozone.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ def setUp(self):
if distro.detect().name == 'Ubuntu':
if not smm.check_installed("btrfs-tools") and not \
smm.install("btrfs-tools"):
self.cancel('btrfs-tools is needed for the test to be run')
self.cancel(
'btrfs-tools is needed for the test to be run')

tarball = self.fetch_asset(self.source_url)
archive.extract(tarball, self.teststmpdir)
Expand Down
1 change: 1 addition & 0 deletions io/disk/lvsetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Lvsetup(Test):
"""
Test class for creating logical volumes.
"""

def setUp(self):
"""
Check existence of input PV,VG, LV and snapshots prior to Test.
Expand Down
4 changes: 2 additions & 2 deletions io/disk/multipath_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def is_mpath_available():
self.log.info("recovery of %s success" % path_dic["wwid"])
else:
msg += "Recovery of %s fails after blocklist\n" \
% path_dic["wwid"]
% path_dic["wwid"]

def is_path_available():
if operation == 'block':
Expand Down Expand Up @@ -210,7 +210,7 @@ def is_path_available():
self.log.info("recovery of %s success" % disk)
else:
msg += "Recovery of %s fails after blacklist %s\n" \
% (disk, path_dic["wwid"])
% (disk, path_dic["wwid"])

multipath.form_conf_mpath_file(defaults_extra=plcy)
if msg:
Expand Down
1 change: 1 addition & 0 deletions io/disk/port_bounce.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class CommandFailed(Exception):
'''
exception class
'''

def __init__(self, command, output, exitcode):
self.command = command
self.output = output
Expand Down
1 change: 1 addition & 0 deletions io/driver/module_unload_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ModuleLoadUnload(Test):
:param iteration: Number of time to unload and load the module
:only_io True for single provide module and False for All pci modules
"""

def setUp(self):
"""
get parameters.
Expand Down
31 changes: 19 additions & 12 deletions io/net/bonding.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def setUp(self):
for self.host_interface in self.host_interfaces:
if self.host_interface not in interfaces:
self.cancel("interface is not available")
self.peer_first_ipinterface = self.params.get("peer_ips", default="").split(" ")
self.peer_first_ipinterface = self.params.get(
"peer_ips", default="").split(" ")
if not self.peer_interfaces or self.peer_first_ipinterface == "":
self.cancel("peer machine should available")
self.ipaddr = self.params.get("host_ips", default="").split(" ")
Expand All @@ -104,8 +105,8 @@ def setUp(self):
self.peer_interfaces):
if self.peer_bond_needed:
self.remotehost = RemoteHost(
self.peer_public_ip,
self.user, password=self.password)
self.peer_public_ip,
self.user, password=self.password)
peer_networkinterface = NetworkInterface(interface,
self.remotehost)
try:
Expand Down Expand Up @@ -179,11 +180,13 @@ def setUp(self):

if 'setup' in str(self.name.name):
for interface in self.peer_interfaces:
peer_networkinterface = NetworkInterface(interface, self.remotehost)
peer_networkinterface = NetworkInterface(
interface, self.remotehost)
if peer_networkinterface.set_mtu(self.mtu) is not None:
self.cancel("Failed to set mtu in peer")
for host_interface in self.host_interfaces:
self.networkinterface = NetworkInterface(host_interface, self.localhost)
self.networkinterface = NetworkInterface(
host_interface, self.localhost)
if self.networkinterface.set_mtu(self.mtu) is not None:
self.cancel("Failed to set mtu in host")

Expand Down Expand Up @@ -361,9 +364,11 @@ def bond_fail(self, arg1):
if peer_networkinterface.set_mtu(mtu) is not None:
self.cancel("Failed to set mtu in peer")
if not self.ping_check():
self.fail("Ping fail in mode %s after MTU change to %s" % (self.mode, mtu))
self.fail("Ping fail in mode %s after MTU change to %s" %
(self.mode, mtu))
else:
self.log.info("Ping success for mode %s bond with MTU %s" % (self.mode, mtu))
self.log.info(
"Ping success for mode %s bond with MTU %s" % (self.mode, mtu))
if self.bond_networkinterface.set_mtu('1500'):
self.cancel("Failed to set mtu back to 1500 in host")
for interface in self.peer_interfaces:
Expand Down Expand Up @@ -522,15 +527,16 @@ def test_cleanup(self):
try:
networkinterface.restore_from_backup()
except Exception:
self.log.info("backup file not availbale, could not restore file.")
self.log.info(
"backup file not availbale, could not restore file.")

if self.peer_bond_needed:
self.bond_remove("peer")
for ipaddr, interface in zip(self.peer_first_ipinterface,
self.peer_interfaces):
self.remotehost = RemoteHost(
self.peer_public_ip, self.user,
password=self.password)
self.peer_public_ip, self.user,
password=self.password)
peer_networkinterface = NetworkInterface(interface,
self.remotehost)
try:
Expand All @@ -541,7 +547,7 @@ def test_cleanup(self):
peer_networkinterface.save(ipaddr, self.netmask)
time.sleep(self.sleep_time)
self.error_check()

detected_distro = distro.detect()
if detected_distro.name == "rhel":
cmd = "systemctl restart NetworkManager.service"
Expand All @@ -554,7 +560,8 @@ def test_cleanup(self):

try:
for interface in self.peer_interfaces:
peer_networkinterface = NetworkInterface(interface, self.remotehost)
peer_networkinterface = NetworkInterface(
interface, self.remotehost)
peer_networkinterface.set_mtu("1500")
self.remotehost.remote_session.quit()
except Exception:
Expand Down
Loading

0 comments on commit 217211a

Please sign in to comment.