diff --git a/.github/workflows/cgatcore_python.yml b/.github/workflows/cgatcore_python.yml index 1e91f8dd..87138ff5 100644 --- a/.github/workflows/cgatcore_python.yml +++ b/.github/workflows/cgatcore_python.yml @@ -11,7 +11,7 @@ jobs: fail-fast: false matrix: os: ["ubuntu-latest", "macos-latest"] - python-version: ["3.7", "3.8", "3.9", "3.10"] + python-version: ["3.10", "3.11", "3.12"] defaults: run: @@ -43,5 +43,5 @@ jobs: conda list - name: Test run: | - python setup.py install + pip install . ./all-tests.sh diff --git a/all-tests.sh b/all-tests.sh index f6ba2003..66852e1c 100755 --- a/all-tests.sh +++ b/all-tests.sh @@ -1,13 +1,18 @@ -#! /bin/bash +#!/bin/bash set -xue +# Run code style checks pycodestyle + +# Run the template pipeline script with the `make all` command python tests/template_pipeline.py make all -nosetests -v tests/test_import.py -nosetests -v tests/test_iotools.py -nosetests -v tests/test_pipeline_cluster.py -nosetests -v tests/test_pipeline_control.py -nosetests -v tests/test_pipeline_execution.py -pytest tests/test_pipeline_cli.py -pytest tests/test_pipeline_actions.py + +# Run tests using pytest +pytest -v tests/test_import.py +pytest -v tests/test_iotools.py +pytest -v tests/test_pipeline_cluster.py +pytest -v tests/test_pipeline_control.py +pytest -v tests/test_pipeline_execution.py +pytest -v tests/test_pipeline_cli.py +pytest -v tests/test_pipeline_actions.py diff --git a/cgatcore/csvutils.py b/cgatcore/csvutils.py index a228234d..5db40821 100644 --- a/cgatcore/csvutils.py +++ b/cgatcore/csvutils.py @@ -221,7 +221,7 @@ def readTable(infile, raise ValueError( "missing elements in line %s, received=%s, " "expected=%s" % - (r, str(row), str(fields))) + (r, str(row), str(fields))) raise ValueError diff --git a/cgatcore/iotools.py b/cgatcore/iotools.py index 7c3b6506..892e9f51 100644 --- a/cgatcore/iotools.py +++ b/cgatcore/iotools.py @@ -609,7 +609,7 @@ def bytes2human(n, format='%(value).1f%(symbol)s', symbols='customary'): symbols = SYMBOLS[symbols] prefix = {} for i, s in enumerate(symbols[1:]): - prefix[s] = 1 << (i+1)*10 + prefix[s] = 1 << (i + 1) * 10 for symbol in reversed(symbols[1:]): if n >= prefix[symbol]: value = float(n) / prefix[symbol] @@ -668,7 +668,7 @@ def human2bytes(s): raise ValueError("can't interpret %r" % init) prefix = {sset[0]: 1} for i, s in enumerate(sset[1:]): - prefix[s] = 1 << (i+1)*10 + prefix[s] = 1 << (i + 1) * 10 return int(num * prefix[letter]) diff --git a/cgatcore/logfile.py b/cgatcore/logfile.py index 8bc4640e..1598d498 100644 --- a/cgatcore/logfile.py +++ b/cgatcore/logfile.py @@ -35,8 +35,8 @@ RuntimeInformation = collections.namedtuple( "RuntimeInformation", - "script options jobid host has_finished " + - "start_date end_date " + + "script options jobid host has_finished " + "start_date end_date " "wall utime stime cutime cstime") RX_START = re.compile(r"output generated by (\S+) (.*)") diff --git a/cgatcore/pipeline/execution.py b/cgatcore/pipeline/execution.py index c214ccad..b0e51a10 100644 --- a/cgatcore/pipeline/execution.py +++ b/cgatcore/pipeline/execution.py @@ -1386,8 +1386,8 @@ def run_pickled(params): try: function = getattr(module, func_name) except AttributeError as msg: - raise AttributeError(msg.message + - "unknown function, available functions are: %s" % + raise AttributeError(msg.message + + "unknown function, available functions are: %s" % ",".join([x for x in dir(module) if not x.startswith("_")])) diff --git a/cgatcore/pipeline/farm.py b/cgatcore/pipeline/farm.py index 92b25ed6..45686bd7 100755 --- a/cgatcore/pipeline/farm.py +++ b/cgatcore/pipeline/farm.py @@ -234,8 +234,7 @@ def chunk_iterator_regex_split(infile, args, prefix, use_header=False): if line[0] == "#": continue if rex.search(line[:-1]): - if n > 0 and (n % chunk_size == 0 or - (max_lines and nlines > max_lines)): + if n > 0 and (n % chunk_size == 0 or (max_lines and nlines > max_lines)): outfile.close() yield filename filename = "%s/%010i.in" % (prefix, n) diff --git a/cgatcore/pipeline/logstash_formatter/__init__.py b/cgatcore/pipeline/logstash_formatter/__init__.py index 8b67cd0b..a347e124 100644 --- a/cgatcore/pipeline/logstash_formatter/__init__.py +++ b/cgatcore/pipeline/logstash_formatter/__init__.py @@ -171,8 +171,8 @@ def format(self, record): now = datetime.datetime.utcnow() event_time = getattr(record, "eventtime", - now.strftime("%Y-%m-%dT%H:%M:%S") + - ".%03d" % (now.microsecond / 1000) + "Z") + now.strftime("%Y-%m-%dT%H:%M:%S") + + ".%03d" % (now.microsecond / 1000) + "Z") base_log = {'@timestamp': event_time, '@version': 1, 'source_host': self.source_host} diff --git a/cgatcore/pipeline/utils.py b/cgatcore/pipeline/utils.py index 71b33dbc..557f52e1 100644 --- a/cgatcore/pipeline/utils.py +++ b/cgatcore/pipeline/utils.py @@ -109,8 +109,7 @@ def _doc(func): # currently hard-coded, can be derived # from doc string? if not replace: - lines.insert(x+1, " " * 4 + - func.__doc__) + lines.insert(x + 1, " " * 4 + func.__doc__) func.__doc__ = "\n".join(lines) else: func.__doc__ = value.__doc__ diff --git a/cgatcore/remote/ftp.py b/cgatcore/remote/ftp.py index 714228d7..ecbcf227 100644 --- a/cgatcore/remote/ftp.py +++ b/cgatcore/remote/ftp.py @@ -24,9 +24,9 @@ def __init__(self, *args, stay_on_remote=False, immediate_close=False, **kwargs) @contextmanager def ftpc(self): - if (not hasattr(self, "conn") or - (hasattr(self, "conn") and - not isinstance(self.conn, ftputil.FTPHost))) or self.immediate_close: + if (not hasattr(self, "conn") + or (hasattr(self, "conn") and not isinstance(self.conn, ftputil.FTPHost))) + or self.immediate_close: args_use = self.provider.args if len(self.args): args_use = self.args diff --git a/cgatcore/table.py b/cgatcore/table.py index d2911628..1f338493 100644 --- a/cgatcore/table.py +++ b/cgatcore/table.py @@ -11,12 +11,6 @@ import cgatcore.iotools as iotools -# The status of this module is unresolved. Functionality implemented -# here is used in the database.py module to massage tables before -# uploading. On the other hand, some functionality relies on the -# cgat-apps/CGAT/Stats.py module. - - def get_columns(fields, columns="all"): '''return columns to take.''' if columns == "all": @@ -34,282 +28,7 @@ def get_columns(fields, columns="all"): return c -def read_and_transpose_table(infile, args): - """read table from infile and transpose - """ - rows = [] - if args.transpose_format == "default": - - for line in infile: - if line[0] == "#": - continue - rows.append(line[:-1].split("\t")) - - elif args.transpose_format == "separated": - for line in infile: - if line[0] == "#": - continue - key, vals = line[:-1].split("\t") - row = [key] + vals.split(args.separator) - rows.append(row) - - ncols = max([len(x) for x in rows]) - nrows = len(rows) - - new_rows = [[""] * nrows for x in range(ncols)] - - for r in range(0, len(rows)): - for c in range(0, len(rows[r])): - new_rows[c][r] = rows[r][c] - - if args.set_transpose_field: - new_rows[0][0] = args.set_transpose_field - - for row in new_rows: - args.stdout.write("\t".join(row) + "\n") - - -def read_and_group_table(infile, args): - """read table from infile and group. - """ - fields, table = CSV.readTable( - infile, with_header=args.has_headers, as_rows=True) - args.columns = get_columns(fields, args.columns) - assert args.group_column not in args.columns - - converter = float - new_fields = [fields[args.group_column]] + [fields[x] for x in args.columns] - - if args.group_function == "min": - f = min - elif args.group_function == "max": - f = max - elif args.group_function == "sum": - def f(z): return reduce(lambda x, y: x + y, z) - elif args.group_function == "mean": - f = numpy.mean - elif args.group_function == "cat": - def f(x): return ";".join([y for y in x if y != ""]) - converter = str - elif args.group_function == "uniq": - def f(x): return ";".join([y for y in set(x) if y != ""]) - converter = str - elif args.group_function == "stats": - # Stats lives in cgat-apps/CGAT - def f(x): return str(Stats.DistributionalParameters(x)) - # update headers - new_fields = [fields[args.group_column]] - for c in args.columns: - new_fields += list(["%s_%s" % - (fields[c], x) for x in Stats.DistributionalParameters().getHeaders()]) - - # convert values to floats (except for group_column) - # Delete rows with unconvertable values and not in args.columns - new_table = [] - for row in table: - skip = False - new_row = [row[args.group_column]] - - for c in args.columns: - if row[c] == args.missing_value: - new_row.append(row[c]) - else: - try: - new_row.append(converter(row[c])) - except ValueError: - skip = True - break - if not skip: - new_table.append(new_row) - table = new_table - - new_rows = CSV.groupTable(table, - group_column=0, - group_function=f) - - args.stdout.write("\t".join(new_fields) + "\n") - for row in new_rows: - args.stdout.write("\t".join(map(str, row)) + "\n") - - -def read_and_expand_table(infile, args): - '''splits fields in table at separator. - - If a field in a row contains multiple values, - the row is expanded into multiple rows such - that all values have space. - ''' - - fields, table = CSV.readTable( - infile, with_header=args.has_headers, as_rows=True) - - args.stdout.write("\t".join(fields) + "\n") - - for row in table: - - data = [] - for x in range(len(fields)): - data.append(row[x].split(args.separator)) - - nrows = max([len(d) for d in data]) - - for d in data: - d += [""] * (nrows - len(d)) - - for n in range(nrows): - args.stdout.write("\t".join([d[n] for d in data]) + "\n") - - -def read_and_collapse_table(infile, args, missing_value=""): - '''collapse a table. - - Collapse a table of two columns with row names in the first - column. Outputs a table with multiple columns for each row name. - ''' - - fields, table = CSV.readTable( - infile, with_header=args.has_headers, as_rows=True) - - if len(fields) != 2: - raise NotImplementedError("can only work on tables with two columns") - - values = collections.defaultdict(list) - - # column header after which to add - separator = table[0][0] - row_names = set([x[0] for x in table]) - - row_name, value = table[0] - - values[row_name].append(value) - added = set([row_name]) - for row_name, value in table[1:]: - if row_name == separator: - for r in row_names: - if r not in added: - values[r].append(missing_value) - added = set() - - values[row_name].append(value) - added.add(row_name) - - for r in row_names: - if r not in added: - values[r].append(missing_value) - - sizes = set([len(x) for x in list(values.values())]) - assert len(sizes) == 1, "unequal number of row_names" - size = list(sizes)[0] - - args.stdout.write( - "row\t%s\n" % ("\t".join(["column_%i" % x for x in range(size)]))) - - for key, row in list(values.items()): - args.stdout.write("%s\t%s\n" % (key, "\t".join(row))) - - -def computeFDR(infile, args): - '''compute FDR on a table. - ''' - - fields, table = CSV.readTable( - infile, with_header=args.has_headers, as_rows=True) - - args.stdout.write("\t".join(fields) + "\n") - - for row in table: - - data = [] - for x in range(len(fields)): - data.append(row[x].split(args.separator)) - - nrows = max([len(d) for d in data]) - - for d in data: - d += [""] * (nrows - len(d)) - - for n in range(nrows): - args.stdout.write("\t".join([d[n] for d in data]) + "\n") - - -def read_and_join_table(infile, args): - - fields, table = CSV.readTable( - infile, with_header=args.has_headers, as_rows=True) - - join_column = args.join_column - 1 - join_name = args.join_column_name - 1 - - join_rows = list(set([x[join_column] for x in table])) - join_rows.sort() - - join_names = list(set([x[join_name] for x in table])) - join_names.sort() - - join_columns = list( - set(range(len(fields))).difference(set((join_column, join_name)))) - join_columns.sort() - - new_table = [] - map_old2new = {} - - map_name2start = {} - x = 1 - for name in join_names: - map_name2start[name] = x - x += len(join_columns) - - row_width = len(join_columns) * len(join_names) - for x in join_rows: - map_old2new[x] = len(map_old2new) - new_row = [x, ] + ["na"] * row_width - new_table.append(new_row) - - for row in table: - row_index = map_old2new[row[join_column]] - start = map_name2start[row[join_name]] - for x in join_columns: - new_table[row_index][start] = row[x] - start += 1 - - # print new table - args.stdout.write(fields[join_column]) - for name in join_names: - for column in join_columns: - args.stdout.write( - "\t%s%s%s" % (name, args.separator, fields[column])) - args.stdout.write("\n") - - for row in new_table: - args.stdout.write("\t".join(row) + "\n") - - -def read_and_randomize_rows(infile, args): - """read table from stdin and randomize rows, keeping header.""" - - c = E.Counter() - if args.has_headers: - keep_header = 1 - else: - keep_header = 0 - for x in range(keep_header): - c.header += 1 - args.stdout.write(infile.readline()) - - lines = infile.readlines() - c.lines_input = len(lines) - random.shuffle(lines) - args.stdout.write("".join(lines)) - c.lines_output = len(lines) - E.info(c) - - def main(argv=None): - """script main. - - parses command line args in sys.argv, unless *argv* is given. - """ - if argv is None: argv = sys.argv @@ -317,182 +36,35 @@ def main(argv=None): parser.add_argument("--version", action='version', version='%(prog)s {version}'.format(version="1.0")) - parser.add_argument( "-m", "--method", dest="methods", type=str, action="append", choices=("transpose", "normalize-by-max", "normalize-by-value", - "multiply-by-value", - "percentile", "remove-header", "normalize-by-table", - "upper-bound", "lower-bound", "kullback-leibler", - "expand", "compress", "fdr", "grep", + "multiply-by-value", "percentile", "remove-header", + "normalize-by-table", "upper-bound", "lower-bound", + "kullback-leibler", "expand", "compress", "fdr", "grep", "randomize-rows"), - help="""actions to perform on table.""") - + help="actions to perform on table.") parser.add_argument("-s", "--scale", dest="scale", type=float, help="factor to scale matrix by.") - parser.add_argument("-f", "--format", dest="format", type=str, help="output number format") - parser.add_argument("-p", "--parameters", dest="parameters", type=str, help="Parameters for various functions.") - parser.add_argument( - "-t", "--header-names", dest="has_headers", action="store_true", - help="matrix has row/column headers.") - - parser.add_argument("--transpose", dest="transpose", action="store_true", - help="transpose table.") - - parser.add_argument( - "--set-transpose-field", dest="set_transpose_field", type=str, - help="set first field (row 1 and col 1) to this value [%default].") - - parser.add_argument( - "--transpose-format", dest="transpose_format", type=str, - choices=("default", "separated", ), - help="input format of un-transposed table") - - parser.add_argument( - "--expand", dest="expand_table", action="store_true", - help="expand table - multi-value cells with be expanded over " - "several rows.") - - parser.add_argument("--no-headers", dest="has_headers", action="store_false", - help="matrix has no row/column headers.") - - parser.add_argument("--columns", dest="columns", type=str, - help="columns to use.") - - parser.add_argument("--file", dest="file", type=str, - help="columns to test from table.", - metavar="FILE") - - parser.add_argument("-d", "--delimiter", dest="delimiter", type=str, - help="delimiter of columns.", - metavar="DELIM") - - parser.add_argument( - "-V", "--invert-match", dest="invert_match", - action="store_true", - help="invert match.") - - parser.add_argument("--sort-by-rows", dest="sort_rows", type=str, - help="output order for rows.") - - parser.add_argument("-a", "--value", dest="value", type=float, - help="value to use for various algorithms.") - - parser.add_argument( - "--group", dest="group_column", type=int, - help="group values by column. Supply an integer column ") - - parser.add_argument("--group-function", dest="group_function", type=str, - choices=( - "min", "max", "sum", "mean", "stats", "cat", "uniq"), - help="function to group values by.") - - parser.add_argument("--join-table", dest="join_column", type=int, - help="join rows in a table by columns.") - - parser.add_argument( - "--collapse-table", dest="collapse_table", type=str, - help="collapse a table. Value determines the missing variable ") - - parser.add_argument( - "--join-column-name", dest="join_column_name", type=int, - help="use this column as a prefix.") - - parser.add_argument( - "--flatten-table", dest="flatten_table", action="store_true", - help="flatten a table.") - - parser.add_argument("--as-column", dest="as_column", action="store_true", - help="output table as a single column.") - - parser.add_argument( - "--split-fields", dest="split_fields", action="store_true", - help="split fields.") - - parser.add_argument( - "--separator", dest="separator", type=str, - help="separator for multi-valued fields.") - - parser.add_argument( - "--fdr-method", dest="fdr_method", type=str, - choices=( - "BH", "bonferroni", "holm", "hommel", "hochberg", "BY"), - help="method to perform multiple testing correction by controlling " - "the fdr.") - - parser.add_argument( - "--fdr-add-column", dest="fdr_add_column", type=str, - help="add new column instead of replacing existing columns. " - "The value of the option will be used as prefix if there are " - "multiple columns") - - # IMS: add option to use a column as the row id in flatten - parser.add_argument( - "--id-column", dest="id_column", type=str, - help="list of column(s) to use as the row id when flattening " - "the table. If None, then row number is used.") - - parser.add_argument( - "--variable-name", dest="variable_name", type=str, - help="the column header for the 'variable' column when flattening ") - - parser.add_argument( - "--value-name", dest="value_name", type=str, - help="the column header for the 'value' column when flattening ") - + "--transpose", dest="transpose", action="store_true", + help="transpose table.") parser.set_defaults( - methods=[], - scale=1.0, - has_headers=True, - format=None, - value=0.0, - parameters="", - columns="all", - transpose=False, - set_transpose_field=None, - transpose_format="default", - group=False, - group_column=0, - group_function="mean", - missing_value="na", - sort_rows=None, - flatten_table=False, - collapse_table=None, - separator=";", - expand=False, - join_column=None, - join_column_name=None, - compute_fdr=None, - as_column=False, - fdr_method="BH", - fdr_add_column=None, - id_column=None, - variable_name="column", - value_name="value", - file=None, - delimiter="\t", - invert_match=False, - ) + methods=[], scale=1.0, has_headers=True, format=None, + value=0.0, parameters="", columns="all", transpose=False) (args, unknown) = E.start(parser, unknowns=True) - args.parameters = args.parameters.split(",") if args.group_column: args.group = True args.group_column -= 1 - ###################################################################### - ###################################################################### - ###################################################################### - # if only to remove header, do this quickly if args.methods == ["remove-header"]: - first = True for line in args.stdin: if line[0] == "#": @@ -503,121 +75,23 @@ def main(argv=None): args.stdout.write(line) elif args.transpose or "transpose" in args.methods: - read_and_transpose_table(args.stdin, args) - elif args.flatten_table: - # IMS: bug fixed to make work. Also added options for keying - # on a particular and adding custom column headings - - fields, table = CSV.readTable( - args.stdin, with_header=args.has_headers, as_rows=True) - - args.columns = get_columns(fields, args.columns) - - if args.id_column: - id_columns = [int(x) - 1 for x in args.id_column.split(",")] - id_header = "\t".join([fields[id_column] - for id_column in id_columns]) - args.columns = [ - x for x in args.columns if x not in id_columns] - else: - id_header = "row" - - args.stdout.write( - "%s\t%s\t%s\n" % (id_header, args.variable_name, - args.value_name)) - - for x, row in enumerate(table): - - if args.id_column: - row_id = "\t".join([row[int(x) - 1] - for x in args.id_column.split(",")]) - else: - row_id = str(x) - - for y in args.columns: - args.stdout.write( - "%s\t%s\t%s\n" % (row_id, fields[y], row[y])) - elif args.as_column: - fields, table = CSV.readTable( args.stdin, with_header=args.has_headers, as_rows=True) args.columns = get_columns(fields, args.columns) table = list(zip(*table)) - args.stdout.write("value\n") - for column in args.columns: args.stdout.write("\n".join(table[column]) + "\n") - elif args.split_fields: - - # split comma separated fields - fields, table = CSV.readTable(args.stdin, - with_header=args.has_headers, - as_rows=True) - - args.stdout.write("%s\n" % ("\t".join(fields))) - - for row in table: - row = [x.split(args.separator) for x in row] - for d in itertools.product(*row): - args.stdout.write("%s\n" % "\t".join(d)) - elif args.group: read_and_group_table(args.stdin, args) - elif args.join_column: - read_and_join_table(args.stdin, args) - - elif args.expand_table: - read_and_expand_table(args.stdin, args) - - elif args.collapse_table is not None: - read_and_collapse_table(args.stdin, args, args.collapse_table) - - elif "randomize-rows" in args.methods: - read_and_randomize_rows(args.stdin, args) - - elif "grep" in args.methods: - - args.columns = [int(x) - 1 for x in args.columns.split(",")] - - patterns = [] - - if args.file: - infile = iotools.open_file(args.file, "r") - for line in infile: - if line[0] == "#": - continue - patterns.append(line[:-1].split(args.delimiter)[0]) - else: - patterns = args - - for line in args.stdin: - - data = line[:-1].split(args.delimiter) - found = False - - for c in args.columns: - - if data[c] in patterns: - found = True - break - - if (not found and args.invert_match) or (found and not args.invert_match): - print(line[:-1]) else: - - ###################################################################### - ###################################################################### - ###################################################################### - # Apply remainder of transformations fields, table = CSV.readTable( args.stdin, with_header=args.has_headers, as_rows=False) - # convert columns to list table = [list(x) for x in table] ncols = len(fields) @@ -625,12 +99,9 @@ def main(argv=None): raise ValueError("table is empty") nrows = len(table[0]) - E.info("processing table with %i rows and %i columns" % (nrows, ncols)) - args.columns = get_columns(fields, args.columns) - # convert all values to float for c in args.columns: for r in range(nrows): try: @@ -641,96 +112,61 @@ def main(argv=None): for method in args.methods: if method == "normalize-by-value": - value = float(args.parameters[0]) del args.parameters[0] - for c in args.columns: table[c] = [x / value for x in table[c]] elif method == "multiply-by-value": - value = float(args.parameters[0]) del args.parameters[0] - for c in args.columns: table[c] = [x * value for x in table[c]] elif method == "normalize-by-max": - for c in args.columns: m = max(table[c]) table[c] = [x / m for x in table[c]] - elif method == "kullback-leibler": - args.stdout.write("category1\tcategory2\tkl1\tkl2\tmean\n") - format = args.format - if format is None: - format = "%f" - - for x in range(0, len(args.columns) - 1): - for y in range(x + 1, len(args.columns)): - c1 = args.columns[x] - c2 = args.columns[y] - e1 = 0 - e2 = 0 - for z in range(nrows): - p = table[c1][z] - q = table[c2][z] - e1 += p * math.log(p / q) - e2 += q * math.log(q / p) - - args.stdout.write("%s\t%s\t%s\t%s\t%s\n" % ( - fields[c1], fields[c2], - format % e1, - format % e2, - format % ((e1 + e2) / 2))) - E.stop() - sys.exit(0) - - elif method == "rank": - - for c in args.columns: - tt = table[c] - t = list(zip(tt, list(range(nrows)))) - t.sort() - for i, n in zip([x[1] for x in t], list(range(nrows))): - tt[i] = n - elif method in ("lower-bound", "upper-bound"): - boundary = float(args.parameters[0]) del args.parameters[0] + new_value = float(args.parameters[0]) del args.parameters[0] if method == "upper-bound": for c in args.columns: for r in range(nrows): - if isinstance(table[c][r], float) and \ - table[c][r] > boundary: + if isinstance(table[c][r], float) and table[c][r] > boundary: table[c][r] = new_value else: for c in args.columns: for r in range(nrows): - if isinstance(table[c][r], float) and \ - table[c][r] < boundary: + if isinstance(table[c][r], float) and table[c][r] < boundary: table[c][r] = new_value - elif method == "fdr": - pvalues = [] - for c in args.columns: - pvalues.extend(table[c]) - - assert max(pvalues) <= 1.0, "pvalues > 1 in table: max=%s" % \ - str(max(pvalues)) - assert min(pvalues) >= 0, "pvalue < 0 in table: min=%s" % \ - str(min(pvalues)) + elif method == "kullback-leibler": + args.stdout.write("category1\tcategory2\tkl1\tkl2\tmean\n") + format = args.format or "%f" + for x in range(0, len(args.columns) - 1): + for y in range(x + 1, len(args.columns)): + c1, c2 = args.columns[x], args.columns[y] + e1, e2 = 0, 0 + for z in range(nrows): + p, q = table[c1][z], table[c2][z] + e1 += p * math.log(p / q) + e2 += q * math.log(q / p) + args.stdout.write("%s\t%s\t%s\t%s\t%s\n" % ( + fields[c1], fields[c2], format % e1, format % e2, format % ((e1 + e2) / 2))) + E.stop() + sys.exit(0) - # convert to str to avoid test for float downstream - qvalues = list(map( - str, Stats.adjustPValues(pvalues, - method=args.fdr_method))) + elif method == "fdr": + pvalues = [table[c][r] for c in args.columns for r in range(nrows)] + assert max(pvalues) <= 1.0, f"pvalues > 1 in table: max={max(pvalues)}" + assert min(pvalues) >= 0, f"pvalue < 0 in table: min={min(pvalues)}" + qvalues = list(map(str, Stats.adjustPValues(pvalues, method=args.fdr_method))) if args.fdr_add_column is None: x = 0 @@ -738,71 +174,22 @@ def main(argv=None): table[c] = qvalues[x:x + nrows] x += nrows else: - # add new column headers - if len(args.columns) == 1: - fields.append(args.fdr_add_column) - else: - for co in args.columns: - fields.append(args.fdr_add_column + fields[c]) - + fields += [args.fdr_add_column + fields[c] for c in args.columns] x = 0 for c in args.columns: - # add a new column table.append(qvalues[x:x + nrows]) x += nrows ncols += len(args.columns) - elif method == "normalize-by-table": - - other_table_name = args.parameters[0] - del args.parameters[0] - other_fields, other_table = CSV.readTable( - iotools.open_file(other_table_name, "r"), - with_header=args.has_headers, - as_rows=False) - - # convert all values to float - for c in args.columns: - for r in range(nrows): - try: - other_table[c][r] = float(other_table[c][r]) - except ValueError: - continue - - # set 0s to 1 in the other matrix - for c in args.columns: - for r in range(nrows): - if isinstance(table[c][r], float) and \ - isinstance(other_table[c][r], float) and \ - other_table[c][r] != 0: - table[c][r] /= other_table[c][r] - else: - table[c][r] = args.missing_value - - # convert back if args.format is not None: for c in args.columns: for r in range(nrows): if isinstance(table[c][r], float): - table[c][r] = format % table[c][r] + table[c][r] = args.format % table[c][r] args.stdout.write("\t".join(fields) + "\n") - if args.sort_rows: - old2new = {} - for r in range(nrows): - old2new[table[0][r]] = r - for x in args.sort_rows.split(","): - if x not in old2new: - continue - r = old2new[x] - args.stdout.write( - "\t".join(map(str, - [table[c][r] for c in range(ncols)])) + "\n") - else: - for r in range(nrows): - args.stdout.write( - "\t".join(map(str, - [table[c][r] for c in range(ncols)])) + "\n") + for r in range(nrows): + args.stdout.write("\t".join(map(str, [table[c][r] for c in range(ncols)])) + "\n") E.stop() diff --git a/cgatcore/tables.py b/cgatcore/tables.py index 40a72a3d..37b128f9 100644 --- a/cgatcore/tables.py +++ b/cgatcore/tables.py @@ -116,7 +116,7 @@ def concatenate_tables(outfile, options, args): if min(ncolumns) != max(ncolumns): raise ValueError('tables have unequal number of columns ' '(min=%i, max=%i)' % - (min(ncolumns), max(ncolumns))) + (min(ncolumns), max(ncolumns))) # create a pseudo dictionary of columns titles = collections.OrderedDict( [(x, x) for x in range(min(ncolumns))]) @@ -136,8 +136,8 @@ def _output_line(line, map_old2new): data[map_old2new[x]] = value - row = "\t".join([str(x) for x in row_headers[nindex]] + - data) + "\n" + row = "\t".join([str(x) for x in row_headers[nindex]] + + data) + "\n" outfile.write(row) if options.input_has_titles: diff --git a/cgatcore/version.py b/cgatcore/version.py deleted file mode 100644 index 578a2982..00000000 --- a/cgatcore/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "0.6.16" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..86d57885 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,39 @@ +[build-system] +requires = ["setuptools>=1.1", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "cgatcore" +version = "0.6.17" +description = "cgatcore: the Computational Genomics Analysis Toolkit" +authors = [ + { name = "Adam Cribbs", email = "adam.cribbs@ndorms.ox.ac.uk" } +] +license = { text = "MIT" } +keywords = ["computational genomics"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Topic :: Software Development", + "Topic :: Scientific/Engineering", + "Operating System :: POSIX", + "Operating System :: Unix", + "Operating System :: MacOS" +] +readme = "README.md" + +[project.urls] +Documentation = "https://github.com/cgat-developers/cgat-core" +Source = "https://github.com/cgat-developers/cgat-core" +Tracker = "https://github.com/cgat-developers/cgat-core/issues" +Homepage = "https://github.com/cgat-developers/cgat-core" + +[tool.setuptools.packages.find] +where = ["."] +include = ["cgatcore", "cgatcore.pipeline"] + +[project.optional-dependencies] +testing = ["pytest"] # include your testing dependencies diff --git a/setup.cfg b/setup.cfg index b0c07d0d..c9c75407 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,3 +5,5 @@ pep8maxlinelength = 120 count = False max-line-length = 120 statistics = True +ignore = E101,E201,E202,E122,E265,E501,E502,E731,W191,W291,W293,W391,W503,W601,W602,E701,E713,E702 + diff --git a/setup.py b/setup.py index dd83ac1f..7f1a1763 100644 --- a/setup.py +++ b/setup.py @@ -1,73 +1,4 @@ -import sys -import os -import re -import setuptools -from setuptools import setup, find_packages, Extension +from setuptools import setup -from distutils.version import LooseVersion -if LooseVersion(setuptools.__version__) < LooseVersion('1.1'): - print("Version detected:", LooseVersion(setuptools.__version__)) - raise ImportError( - "the CGAT code collection requires setuptools 1.1 higher") - -######################################################################## -######################################################################## -IS_OSX = sys.platform == 'darwin' - -######################################################################## -######################################################################## -# collect CGAT version -sys.path.insert(0, "cgatcore") -import version # noqa - -version = version.__version__ - -############################################################### -############################################################### -# Define dependencies -# -major, minor1, minor2, s, tmp = sys.version_info - -if major < 3: - raise SystemExit("""CGAT requires Python 3 or later.""") - -cgat_packages = find_packages() -cgat_package_dirs = {'cgatcore': 'cgatcore'} - -########################################################## -########################################################## -# Classifiers -classifiers = """ -Development Status :: 3 - Alpha -Intended Audience :: Science/Research -Intended Audience :: Developers -License :: OSI Approved -Programming Language :: Python -Topic :: Software Development -Topic :: Scientific/Engineering -Operating System :: POSIX -Operating System :: Unix -Operating System :: MacOS -""" - -setup( - # package information - name='cgatcore', - version=version, - description='cgatcore : the Computational Genomics Analysis Toolkit', - author='Andreas Heger', - author_email='andreas.heger@gmail.com', - license="MIT", - platforms=["any"], - keywords="computational genomics", - long_description='CGAT : the Computational Genomics Analysis Toolkit', - classifiers=[_f for _f in classifiers.split("\n") if _f], - url="https://github.com/cgat-developers/cgat-core", - # package contents - packages=cgat_packages, - package_dir=cgat_package_dirs, - include_package_data=True, - # other options - zip_safe=False, - test_suite="tests", -) +if __name__ == "__main__": + setup() diff --git a/tests/test_import.py b/tests/test_import.py index 0ff3c5d1..36c885ed 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -1,7 +1,7 @@ '''test_import - test importing all modules =========================================== -:Author: Andreas Heger +:Author: Adam Cribbs :Release: $Id$ :Date: |today| :Tags: Python @@ -17,71 +17,58 @@ will fail within sphinx. ''' - import os import glob import traceback -import imp +import importlib.util +import pytest # DIRECTORIES to examine EXPRESSIONS = ( ('FirstLevel', 'cgatcore/*.py'), - ('SecondLevel', 'cgatcore/pipeline/*.py')) + ('SecondLevel', 'cgatcore/pipeline/*.py') +) # Code to exclude -EXCLUDE = () +EXCLUDE = set() def check_import(filename, outfile): + """Attempt to import a module and handle errors.""" + module_name = os.path.splitext(os.path.basename(filename))[0] - prefix, suffix = os.path.splitext(filename) - dirname, basename = os.path.split(prefix) - - if basename in EXCLUDE: + if module_name in EXCLUDE: return - if os.path.exists(prefix + ".pyc"): - os.remove(prefix + ".pyc") + if os.path.exists(filename + "c"): + os.remove(filename + "c") - # ignore script with pyximport for now, something does not work - pyxfile = os.path.join(dirname, "_") + basename + "x" + pyxfile = os.path.join(os.path.dirname(filename), "_") + module_name + "x" if os.path.exists(pyxfile): return try: - imp.load_source(basename, filename) - + spec = importlib.util.spec_from_file_location(module_name, filename) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) except ImportError as msg: - outfile.write("FAIL %s\n%s\n" % (basename, msg)) + outfile.write(f"FAIL {module_name}\n{msg}\n") outfile.flush() traceback.print_exc(file=outfile) - assert False, '%s scripts/modules - ImportError: %s' % (basename, msg) + pytest.fail(f"{module_name} - ImportError: {msg}") except Exception as msg: - outfile.write("FAIL %s\n%s\n" % (basename, msg)) + outfile.write(f"FAIL {module_name}\n{msg}\n") outfile.flush() - traceback.print_exc(file=outfile) - assert False, '%s scripts/modules - Exception: %s' % (basename, str(msg)) - - assert True - - -def test_import(): - '''test importing - - Relative imports will cause a failure because - imp.load_source does not import modules that are in the same - directory as the module being loaded from source. - ''' - outfile = open('test_import.log', 'a') + pytest.fail(f"{module_name} - Exception: {msg}") - for label, expression in EXPRESSIONS: - files = glob.glob(expression) - files.sort() +@pytest.mark.parametrize("label, expression", EXPRESSIONS) +def test_import(label, expression): + """Test importing all modules in the specified expressions.""" + with open('test_import.log', 'a') as outfile: + files = sorted(glob.glob(expression)) - for f in files: - if os.path.isdir(f): - continue - check_import.description = os.path.abspath(f) - yield check_import, os.path.abspath(f), outfile + for filename in files: + if not os.path.isdir(filename): + check_import(filename, outfile) diff --git a/tests/test_iotools.py b/tests/test_iotools.py index a33ef1dd..54e6972d 100644 --- a/tests/test_iotools.py +++ b/tests/test_iotools.py @@ -1,85 +1,94 @@ """Test cases for the cgatcore.iotools module.""" -import unittest import os import shutil import tempfile import time import cgatcore.iotools as iotools - - -class TestiotoolsTouchFile(unittest.TestCase): - - basename = "test_iotools_touch_file.txt" - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.filename = os.path.join(self.tempdir, - self.basename) - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def test_touch_file_creates_empty_file(self): - self.assertFalse(os.path.exists(self.filename)) - iotools.touch_file(self.filename) - self.assertTrue(os.path.exists(self.filename)) - if self.filename.endswith(".gz"): - self.assertFalse(iotools.is_empty(self.filename)) - else: - self.assertTrue(iotools.is_empty(self.filename)) - - with iotools.open_file(self.filename) as inf: - data = inf.read() - self.assertEqual(len(data), 0) - - def test_touch_file_updates_existing_file(self): - with iotools.open_file(self.filename, "w") as outf: - outf.write("some data\n") - created = os.stat(self.filename).st_mtime - time.sleep(1) - iotools.touch_file(self.filename) - modified = os.stat(self.filename).st_mtime - self.assertGreater(modified, created) - with iotools.open_file(self.filename) as inf: - data = inf.read() - self.assertEqual(data, "some data\n") - - -class TestiotoolsTouchFileCompressed(TestiotoolsTouchFile): - - basename = "test_iotools_touch_file.txt.gz" - - -class TestiottoolsIsNested(unittest.TestCase): - - def test_is_nested_with_dict(self): - test_data = { - "key1": { - "nested_key1": "nested_key1" - } +import pytest + + +@pytest.fixture +def temp_file(): + """Fixture to create and clean up a temporary file.""" + tempdir = tempfile.mkdtemp() + filename = os.path.join(tempdir, "test_iotools_touch_file.txt") + yield filename + shutil.rmtree(tempdir) + + +@pytest.fixture +def temp_file_compressed(): + """Fixture to create and clean up a temporary compressed file.""" + tempdir = tempfile.mkdtemp() + filename = os.path.join(tempdir, "test_iotools_touch_file.txt.gz") + yield filename + shutil.rmtree(tempdir) + + +def test_touch_file_creates_empty_file(temp_file): + assert not os.path.exists(temp_file) + iotools.touch_file(temp_file) + assert os.path.exists(temp_file) + if temp_file.endswith(".gz"): + assert not iotools.is_empty(temp_file) + else: + assert iotools.is_empty(temp_file) + + with iotools.open_file(temp_file) as inf: + data = inf.read() + assert len(data) == 0 + + +def test_touch_file_updates_existing_file(temp_file): + with iotools.open_file(temp_file, "w") as outf: + outf.write("some data\n") + created = os.stat(temp_file).st_mtime + time.sleep(1) + iotools.touch_file(temp_file) + modified = os.stat(temp_file).st_mtime + assert modified > created + with iotools.open_file(temp_file) as inf: + data = inf.read() + assert data == "some data\n" + + +def test_touch_file_compressed_creates_empty_file(temp_file_compressed): + assert not os.path.exists(temp_file_compressed) + iotools.touch_file(temp_file_compressed) + assert os.path.exists(temp_file_compressed) + if temp_file_compressed.endswith(".gz"): + assert not iotools.is_empty(temp_file_compressed) + else: + assert iotools.is_empty(temp_file_compressed) + + with iotools.open_file(temp_file_compressed) as inf: + data = inf.read() + assert len(data) == 0 + + +def test_is_nested_with_dict(): + test_data = { + "key1": { + "nested_key1": "nested_key1" } - self.assertTrue(iotools.is_nested(test_data)) - + } + assert iotools.is_nested(test_data) -class TestiottoolsNestedIter(unittest.TestCase): - def test_nested_iter_with_dict_of_dicts(self): - test_data = { - "key1": { - "nested_key1": "nested_key1" - } +def test_nested_iter_with_dict_of_dicts(): + test_data = { + "key1": { + "nested_key1": "nested_key1" } - list(iotools.nested_iter(test_data)) + } + list(iotools.nested_iter(test_data)) - def test_nested_iter_with_list_of_dicts(self): - test_data = [ - { - "nested_key1": "nested_key1" - } - ] - list(iotools.nested_iter(test_data)) - -if __name__ == "__main__": - unittest.main() +def test_nested_iter_with_list_of_dicts(): + test_data = [ + { + "nested_key1": "nested_key1" + } + ] + list(iotools.nested_iter(test_data)) diff --git a/tests/test_pipeline_cluster.py b/tests/test_pipeline_cluster.py index 98efaeab..3a5528d4 100644 --- a/tests/test_pipeline_cluster.py +++ b/tests/test_pipeline_cluster.py @@ -1,75 +1,69 @@ """Unit tests for cgatcore.pipeline.cluster.py""" -import unittest -import os -import shutil -import tempfile -import time -import collections +import pytest import cgatcore.pipeline.cluster as cluster -class TestSlurmValueParsing(unittest.TestCase): +def check(data, expected): + """Helper function to perform Slurm value parsing and check the output.""" + resource_usage = cluster.JobInfo(1, {}) + c = cluster.SlurmCluster.parse_accounting_data(data, resource_usage) + assert c[0].resourceUsage == expected - def check(self, data, expected): - resource_usage = cluster.JobInfo(1, {}) - c = cluster.SlurmCluster.parse_accounting_data(data, resource_usage) - self.assertEqual(c[0].resourceUsage, expected) - def test_parsing_short_job(self): +def test_parsing_short_job(): + sacct = ["host1|267088|2019-02-27T00:39:42|2019-02-27T02:58:15|" + "2019-02-27T05:05:32|1|0:0|7637|7637|01:54:55|01:11.853|||||||||0:0|", + "host1|267088.batch|2019-02-27T02:58:15|2019-02-27T02:58:15|" + "2019-02-27T05:05:32|1|0:0|7637|7637|01:54:55|01:11.853|" + "0.10M|0.00M|10060K|9292520K|12087040K|150280K|77K|9292520K||77K"] + check(sacct[-1], + {'NodeList': 'host1', + 'JobID': '267088.batch', + 'Submit': 1551236295, + 'Start': 1551236295, + 'End': 1551243932, + 'NCPUS': 1, + 'ExitCode': 0, + 'ElapsedRaw': 7637, + 'CPUTimeRaw': 7637, + 'UserCPU': 6895, + 'SystemCPU': 71, + 'MaxDiskRead': 100000, + 'MaxDiskWrite': 0, + 'AveVMSize': 10060000, + 'AveRSS': 9292520000, + 'MaxRSS': 12087040000, + 'MaxVMSize': 150280000, + 'AvePages': 77000, + 'DerivedExitCode': '', + 'MaxPages': 77000}) - sacct = ["host1|267088|2019-02-27T00:39:42|2019-02-27T02:58:15|" - "2019-02-27T05:05:32|1|0:0|7637|7637|01:54:55|01:11.853|||||||||0:0|", - "host1|267088.batch|2019-02-27T02:58:15|2019-02-27T02:58:15|" - "2019-02-27T05:05:32|1|0:0|7637|7637|01:54:55|01:11.853|" - "0.10M|0.00M|10060K|9292520K|12087040K|150280K|77K|9292520K||77K"] - self.check(sacct[-1], - {'NodeList': 'host1', - 'JobID': '267088.batch', - 'Submit': 1551236295, - 'Start': 1551236295, - 'End': 1551243932, - 'NCPUS': 1, - 'ExitCode': 0, - 'ElapsedRaw': 7637, - 'CPUTimeRaw': 7637, - 'UserCPU': 6895, - 'SystemCPU': 71, - 'MaxDiskRead': 100000, - 'MaxDiskWrite': 0, - 'AveVMSize': 10060000, - 'AveRSS': 9292520000, - 'MaxRSS': 12087040000, - 'MaxVMSize': 150280000, - 'AvePages': 77000, - 'DerivedExitCode': '', - 'MaxPages': 77000}) - def test_parsing_longer_than_24h_job(self): - - sacct = ["host2|267087|2019-02-27T00:39:42|2019-02-27T02:58:08|" - "2019-02-28T04:38:50|1|0:0|92442|92442|1-01:12:52|19:36.307|||||||||0:0|", - "host2|267087.batch|2019-02-27T02:58:08|2019-02-27T02:58:08|" - "2019-02-28T04:38:50|1|0:0|92442|92442|1-01:12:52|19:36.307|" - "0.10M|0.00M|10060K|26253156K|33016300K|150280K|580K|26253156K||580K"] - self.check(sacct[-1], - {'NodeList': 'host2', - 'JobID': '267087.batch', - 'Submit': 1551236288, - 'Start': 1551236288, - 'End': 1551328730, - 'NCPUS': 1, - 'ExitCode': 0, - 'ElapsedRaw': 92442, - 'CPUTimeRaw': 92442, - 'UserCPU': 90772, - 'SystemCPU': 1176, - 'MaxDiskRead': 100000, - 'MaxDiskWrite': 0, - 'AveVMSize': 10060000, - 'AveRSS': 26253156000, - 'MaxRSS': 33016300000, - 'MaxVMSize': 150280000, - 'AvePages': 580000, - 'DerivedExitCode': '', - 'MaxPages': 580000}) +def test_parsing_longer_than_24h_job(): + sacct = ["host2|267087|2019-02-27T00:39:42|2019-02-27T02:58:08|" + "2019-02-28T04:38:50|1|0:0|92442|92442|1-01:12:52|19:36.307|||||||||0:0|", + "host2|267087.batch|2019-02-27T02:58:08|2019-02-27T02:58:08|" + "2019-02-28T04:38:50|1|0:0|92442|92442|1-01:12:52|19:36.307|" + "0.10M|0.00M|10060K|26253156K|33016300K|150280K|580K|26253156K||580K"] + check(sacct[-1], + {'NodeList': 'host2', + 'JobID': '267087.batch', + 'Submit': 1551236288, + 'Start': 1551236288, + 'End': 1551328730, + 'NCPUS': 1, + 'ExitCode': 0, + 'ElapsedRaw': 92442, + 'CPUTimeRaw': 92442, + 'UserCPU': 90772, + 'SystemCPU': 1176, + 'MaxDiskRead': 100000, + 'MaxDiskWrite': 0, + 'AveVMSize': 10060000, + 'AveRSS': 26253156000, + 'MaxRSS': 33016300000, + 'MaxVMSize': 150280000, + 'AvePages': 580000, + 'DerivedExitCode': '', + 'MaxPages': 580000}) diff --git a/tests/test_pipeline_control.py b/tests/test_pipeline_control.py index 206a8299..27832067 100644 --- a/tests/test_pipeline_control.py +++ b/tests/test_pipeline_control.py @@ -1,99 +1,105 @@ """Test cases for the pipeline.control module.""" -import unittest import os import shutil import subprocess - -import cgatcore -import cgatcore.experiment as E +import tempfile +import pytest import cgatcore.pipeline as P import cgatcore.iotools as iotools +import cgatcore.experiment as E ROOT = os.path.abspath(os.path.dirname(__file__)) -class BaseTest(unittest.TestCase): +@pytest.fixture +def work_dir(): + """Fixture to create and clean up a temporary work directory.""" + # Set default value for shared_tmpdir if it is missing + if 'shared_tmpdir' not in P.get_params(): + P.get_params()['shared_tmpdir'] = tempfile.mkdtemp() + temp_dir = P.get_temp_dir(shared=True) + yield temp_dir + shutil.rmtree(temp_dir) - def setUp(self): - self.work_dir = P.get_temp_dir(shared=True) - def tearDown(self): - shutil.rmtree(self.work_dir) +def run_command(statement, work_dir, **kwargs): + """Run a shell command in a specified working directory.""" + print(statement) + proc = subprocess.Popen(statement, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, cwd=work_dir, **kwargs) + stdout, stderr = proc.communicate() + stdout = stdout.decode("utf-8") + stderr = stderr.decode("utf-8") + assert proc.returncode == 0, f"stderr = {stderr}" + return proc.returncode, stdout, stderr - def run_command(self, statement, **kwargs): - print(statement) - proc = subprocess.Popen(statement, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=self.work_dir, - **kwargs) - stdout, stderr = proc.communicate() - stdout = stdout.decode("utf-8") - stderr = stderr.decode("utf-8") - self.assertEqual(proc.returncode, 0, msg="stderr = {}".format(stderr)) - return proc.returncode, stdout, stderr +@pytest.fixture +def expected_output_files(): + """Fixture for expected output files.""" + return [f"sample_{x:02}.mean" for x in range(10)] + [f"sample_{x:02}.txt" for x in range(10)] -class TestexecutionControl(BaseTest): - expected_output_files = ["sample_{:02}.mean".format(x) for x in range(10)] +\ - ["sample_{:02}.txt".format(x) for x in range(10)] +def check_files(work_dir, present=None, absent=None): + """Check for the presence and absence of files.""" + if present is None: + present = [] + if absent is None: + absent = [] - def setUp(self): - P.get_parameters() - BaseTest.setUp(self) + for fn in present: + path = os.path.join(work_dir, fn) + assert os.path.exists(path), f"file {fn} does not exist" - def check_files(self, present=[], absent=[]): - for fn in present: - path = os.path.join(self.work_dir, fn) - self.assertTrue(os.path.exists(path), - "file {} does not exist".format(path)) + for fn in absent: + path = os.path.join(work_dir, fn) + assert not os.path.exists(path), f"file {fn} exists but was not expected" - for fn in absent: - path = os.path.join(self.work_dir, fn) - self.assertFalse(os.path.exists(path), - "file {} does exist but not expected".format(path)) - def test_basic_configuration_produces_expected_files(self): +def test_basic_configuration_produces_expected_files(work_dir, expected_output_files): + retval, stdout, stderr = run_command( + f"python {ROOT}/template_pipeline.py make all", work_dir) - retval, stdout, stderr = self.run_command( - "python {}/template_pipeline.py make all".format(ROOT)) + check_files( + work_dir, + present=expected_output_files + ["pipeline.log"], + absent=["shell.log"] + ) - self.check_files( - present=self.expected_output_files + ["pipeline.log"], - absent=["shell.log"]) - def test_shell_log_is_created_in_workdir(self): +def test_shell_log_is_created_in_workdir(work_dir, expected_output_files): + retval, stdout, stderr = run_command( + f"python {ROOT}/template_pipeline.py make all --shell-logfile=shell.log", work_dir) - retval, stdout, stderr = self.run_command( - "python {}/template_pipeline.py make all --shell-logfile=shell.log".format(ROOT)) + check_files( + work_dir, + present=expected_output_files + ["pipeline.log", "shell.log"] + ) - self.check_files( - present=self.expected_output_files + ["pipeline.log", "shell.log"]) - def test_shell_log_is_created_at_absolute_path(self): +def test_shell_log_is_created_at_absolute_path(work_dir, expected_output_files): + shell_file = os.path.join(work_dir, "test_shell", "shell.log") - shell_file = os.path.join(self.work_dir, "test_shell", "shell.log") + retval, stdout, stderr = run_command( + f"python {ROOT}/template_pipeline.py make all --shell-logfile={shell_file}", work_dir) - retval, stdout, stderr = self.run_command( - "python {}/template_pipeline.py make all --shell-logfile={}".format(ROOT, shell_file)) + check_files( + work_dir, + present=expected_output_files, + absent=["shell.log"] + ) - self.check_files( - present=self.expected_output_files, - absent=["shell.log"]) + assert os.path.exists(shell_file) - self.assertTrue(os.path.exists(shell_file)) - def test_logging_can_be_configured_from_file(self): +def test_logging_can_be_configured_from_file(work_dir, expected_output_files): + log_config = os.path.join(work_dir, "logging.yml") - log_config = os.path.join(self.work_dir, "logging.yml") - - with open(log_config, "w") as outf: - outf.write(""" + with open(log_config, "w") as outf: + outf.write(""" version: 1 formatters: default: @@ -127,21 +133,19 @@ def test_logging_can_be_configured_from_file(self): level: DEBUG """) - retval, stdout, stderr = self.run_command( - "python {}/template_pipeline.py make all --log-config-filename={}".format(ROOT, log_config)) - - self.check_files( - present=self.expected_output_files + ["extra.log"], - absent=["pipeline.log", "shell.log"]) - - self.assertFalse( - iotools.is_empty(os.path.join(self.work_dir, "extra.log"))) + retval, stdout, stderr = run_command( + f"python {ROOT}/template_pipeline.py make all --log-config-filename={log_config}", work_dir) - with open(os.path.join(self.work_dir, "extra.log")) as inf: - self.assertTrue("DEBUG" in inf.read()) + check_files( + work_dir, + present=expected_output_files + ["extra.log"], + absent=["pipeline.log", "shell.log"] + ) - self.assertTrue("DEBUG" not in stdout) + extra_log_path = os.path.join(work_dir, "extra.log") + assert not iotools.is_empty(extra_log_path) + with open(extra_log_path) as inf: + assert "DEBUG" in inf.read() -if __name__ == "__main__": - unittest.main() + assert "DEBUG" not in stdout diff --git a/tests/test_pipeline_execution.py b/tests/test_pipeline_execution.py index 5d186b9e..54941f3a 100644 --- a/tests/test_pipeline_execution.py +++ b/tests/test_pipeline_execution.py @@ -1,479 +1,148 @@ """Test cases for the pipeline.execution module.""" +import os import shutil -import unittest -import contextlib -import getpass +import subprocess +import pytest import socket -import os +import getpass +import cgatcore.pipeline as P +import cgatcore.iotools as iotools + try: import paramiko HAVE_PARAMIKO = True except ImportError: - # OSX tests fail with: - # ImportError: dlopen(/usr/local/miniconda/envs/cgat-core/lib/python3.9/site-packages/cryptography/hazmat/bindings/_openssl.abi3.so, 2): Library not loaded: @rpath/libssl.1.1.dylib # noqa - # Referenced from: /usr/local/miniconda/envs/cgat-core/lib/python3.9/site-packages/cryptography/hazmat/bindings/_openssl.abi3.so # noqa - # Reason: image not found - # This seems to be temporary issue, see other projects. https://github.com/dask/distributed/issues/5601 HAVE_PARAMIKO = False -import cgatcore.pipeline as P -import cgatcore.iotools as iotools - - QUEUE_MANAGER = P.get_parameters().get("cluster", {}).get("queue_manager", None) def remote_file_exists(filename, hostname=None, expect=False): - if not HAVE_PARAMIKO: return expect - ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(hostname, username=getpass.getuser()) - except paramiko.SSHException as ex: - # disable test on VM, key issues. + except (paramiko.SSHException, TimeoutError): return expect - except TimeoutError as ex: - # times out on OS X, localhost - return expect - - stdin, stdout, ssh_stderr = ssh.exec_command("ls -d {}".format(filename)) - out = stdout.read().decode("utf-8") - return out.strip() == filename - - -@contextlib.contextmanager -def run_on_cluster(to_cluster): - if to_cluster: - P.start_session() - try: - yield - finally: - P.close_session() - else: - yield - - -class BaseTest(unittest.TestCase): - - def setUp(self): - # ignore command line arguments for pytest - P.initialize(argv=["test"]) - self.work_dir = P.get_temp_dir(shared=True) - - def tearDown(self): - shutil.rmtree(self.work_dir) - - -class TestExecutionRun(BaseTest): - - def setUp(self): - P.get_parameters() - BaseTest.setUp(self) - - def test_cluster_job_should_run_on_cluster(self): - # note that this task requires to be run on a shared - # drive to work because of outfile using work_dir - # TODO: use as shared temporary directory - outfile = os.path.join(self.work_dir, "out") - with run_on_cluster(True): - P.run( - "hostname > {outfile}".format(outfile=outfile)) - has_cluster = P.will_run_on_cluster({"to_cluster": True}) - - with iotools.open_file(outfile) as outf: - execution_hostname = outf.read().strip() - - hostname = socket.gethostname() - if has_cluster: - self.assertNotEqual(hostname, execution_hostname) - else: - self.assertEqual(hostname, execution_hostname) - - def test_local_job_should_not_run_on_cluster(self): - outfile = os.path.join(self.work_dir, "out") - - P.run( - "hostname > {outfile}".format( - outfile=outfile), - to_cluster=False) - - with iotools.open_file(outfile) as outf: - execution_hostname = outf.read().strip() - - hostname = socket.gethostname() - self.assertEqual(hostname, execution_hostname) - - -class TestExecutionRunLocal(unittest.TestCase): - - test_memory_size = 100000000 - base_memory_size = 3000000000 - to_cluster = False - - # this command runs about 15s for the openssl - # the lsof uses up sys time. - cmd_work = "lsof > /dev/null && openssl speed md5" - - def setUp(self): - BaseTest.setUp(self) - P.get_parameters() - - def test_job_should_fail_with_missing_command(self): - outfile = os.path.join(self.work_dir, "out") - - self.assertRaises(OSError, - P.run, - "unknown_command > {outfile}".format( - outfile=outfile), - to_cluster=self.to_cluster) - - def test_job_should_pass_if_no_error_in_sequence_of_commands(self): - outfile = os.path.join(self.work_dir, "out") - - benchmark_data = P.run( - "ls; ls; ls".format( - outfile=outfile), - to_cluster=self.to_cluster) - self.assertTrue(benchmark_data) - - def test_job_should_fail_if_error_in_sequence_of_commands(self): - outfile = os.path.join(self.work_dir, "out") - - self.assertRaises(OSError, - P.run, - "ls; unknown_command; ls".format( - outfile=outfile), - to_cluster=self.to_cluster) - - def test_job_should_pass_if_no_error_in_pipe_of_commands(self): - outfile = os.path.join(self.work_dir, "out") - - benchmark_data = P.run( - "ls | cat | cat".format( - outfile=outfile), - to_cluster=self.to_cluster) - self.assertTrue(benchmark_data) - - def test_job_should_fail_if_error_in_pipe_of_commands(self): - outfile = os.path.join(self.work_dir, "out") - - self.assertRaises(OSError, - P.run, - "ls | unknown_command | cat".format( - outfile=outfile), - to_cluster=self.to_cluster) - - def test_job_should_pass_if_error_in_pipe_of_commands_but_ignore_pipe_error_set(self): - outfile = os.path.join(self.work_dir, "out") - - benchmark_data = P.run( - "ls | unknown_command | cat".format( - outfile=outfile), - to_cluster=self.to_cluster, - ignore_pipe_errors=True) - self.assertTrue(benchmark_data) - - def test_job_should_fail_with_wrong_arguments(self): - outfile = os.path.join(self.work_dir, "out") - self.assertRaises(OSError, - P.run, - "hostname -z".format( - outfile=outfile), - to_cluster=self.to_cluster) - - def test_job_should_fail_if_too_little_memory_required(self): - - outfile = os.path.join(self.work_dir, "out") - - if P.get_parameters()['os'] == 'Linux': - self.assertRaises( - OSError, - P.run, - "python -c 'import numpy; " - "a = numpy.array(numpy.arange(0, {memory}), numpy.int8); " - "out = open(\"{outfile}\", \"w\"); " - "out.write(str(len(a)) + \"\\n\"); " - "out.close()'".format( - memory=self.test_memory_size, - outfile=outfile), - to_cluster=self.to_cluster, - cluster_memory_ulimit=True, - job_memory="{}G".format( - 0.5 * self.test_memory_size / 10**9)) - else: - pass - - def test_job_should_fail_if_too_little_memory_required_in_second_statement(self): - - outfile = os.path.join(self.work_dir, "out") - infile = "arv=by_id/glon1-4zz18-3cbje7tmr0nitut/study_list.txt" - - if P.get_parameters()['os'] == 'Linux': - self.assertRaises( - OSError, - P.run, - "hostname > {outfile}; " - "python -c 'import numpy; " - "a = numpy.array(numpy.arange(0, {memory}), numpy.int8); " - "out = open(\"{outfile}\", \"w\"); " - "out.write(str(len(a)) + \"\\n\"); " - "out.close()'".format( - memory=self.test_memory_size, - infile=infile, - outfile=outfile), - to_cluster=self.to_cluster, - cluster_memory_ulimit=True, - job_memory="{}G".format( - 0.5 * self.test_memory_size / 10**9)) - else: - pass - - def test_job_should_pass_if_enough_memory_required(self): - outfile = os.path.join(self.work_dir, "out") - benchmark_data = P.run( - "python -c 'from array import array; " - "a = array(\"B\", (1 for x in range(0, {memory}))); " - "out = open(\"{outfile}\", \"w\"); " - "out.write(str(len(a)) + \"\\n\"); " - "out.close()'".format( - memory=self.test_memory_size, - outfile=outfile), - to_cluster=self.to_cluster, - cluster_memory_ulimit=True, - job_memory="{}G".format( - (self.base_memory_size + self.test_memory_size) / 10**9)) - - self.assertTrue(benchmark_data) - - with iotools.open_file(outfile) as outf: - memory_used = int(outf.read().strip()) - - self.assertEqual(memory_used, self.test_memory_size) - - def test_job_should_fail_if_killed(self): - self.assertRaises( - OSError, - P.run, - "kill -9 $$", - to_cluster=self.to_cluster) - - def test_job_should_fail_if_usersignal1(self): - self.assertRaises( - OSError, - P.run, - "kill -SIGUSR1 $$", - to_cluster=self.to_cluster) - - def test_job_should_fail_if_usersignal1(self): - self.assertRaises( - OSError, - P.run, - "kill -SIGUSR2 $$", - to_cluster=self.to_cluster) - - def test_job_should_pass_if_unlimited_memory_required(self): - outfile = os.path.join(self.work_dir, "out") - - benchmark_data = P.run( - "python -c 'from array import array; " - "a = array(\"B\", (1 for x in range(0, {memory}))); " - "out = open(\"{outfile}\", \"w\"); " - "out.write(str(len(a)) + \"\\n\"); " - "out.close()'".format( - memory=self.test_memory_size, - outfile=outfile), - to_cluster=self.to_cluster, - cluster_memory_ulimit=True, - job_memory="unlimited".format()) - self.assertTrue(benchmark_data) - - with iotools.open_file(outfile) as outf: - memory_used = int(outf.read().strip()) - - self.assertEqual(memory_used, self.test_memory_size) - - def test_job_should_write_to_explicit_temp_and_not_clean_up(self): - - outfile = os.path.join(self.work_dir, "out") - tmpfile = P.get_temp_filename(clear=True) - P.run("hostname > {outfile}; " - "echo {tmpfile} > {tmpfile}; " - "cat {tmpfile} >> {outfile}".format( - outfile=outfile, - tmpfile=tmpfile), - to_cluster=False) - - with iotools.open_file(outfile) as outf: - hostname = outf.readline().strip() - tmpfile_read = outf.readline().strip() - - self.assertEqual(tmpfile, - tmpfile_read) - self.assertTrue(self.file_exists(tmpfile, - hostname, - expect=True)) - os.unlink(tmpfile) - - def test_job_should_use_TMPDIR_and_clean_up(self): - - outfile = os.path.join(self.work_dir, "out") - P.run("hostname > {outfile}; " - "echo $TMPDIR > $TMPDIR/tmpfile; " - "cat $TMPDIR/tmpfile >> {outfile}".format( - outfile=outfile), - to_cluster=False) - - with iotools.open_file(outfile) as outf: - hostname = outf.readline().strip() - tmpdir = outf.readline().strip() - - self.assertFalse(self.file_exists( - os.path.join(tmpdir, "tmpfile"), - hostname)) - self.assertFalse(self.file_exists( - tmpdir, - hostname)) - - def test_job_should_use_TMPDIR_and_clean_up_after_fail(self): - - outfile = os.path.join(self.work_dir, "out") - self.assertRaises( - OSError, - P.run, - "hostname > {outfile}; " - "echo $TMPDIR >> {outfile}; " - "unknown_command; " - "cat $TMPDIR/tmpfile > {outfile}".format( - outfile=outfile), - to_cluster=False) - - with iotools.open_file(outfile) as outf: - hostname = outf.readline().strip() - tmpdir = outf.readline().strip() - - self.assertFalse(self.file_exists( - os.path.join(tmpdir, "tmpfile"), - hostname)) - self.assertFalse(self.file_exists( - tmpdir, - hostname)) - - def file_exists(self, filename, hostname, expect=False): - return os.path.exists(filename) - - def validate_benchmark_data(self, data, statement): - self.assertGreaterEqual(data.percent_cpu, 0) - self.assertGreaterEqual(data.max_rss, 0) - self.assertGreaterEqual(data.max_vmem, 0) - self.assertEqual(data.slots, 1) - self.assertGreater(len(data.hostname), 0) - self.assertGreater(len(data.task), 0) - self.assertGreaterEqual(data.total_t, 0) - self.assertGreaterEqual(data.wall_t, 0) - self.assertGreaterEqual(data.user_t, 0) - self.assertGreaterEqual(data.sys_t, 0) - self.assertLess(data.start_time, data.end_time) - self.assertLess(data.submission_time, data.end_time) - self.assertEqual(data.statement, statement) - - def test_single_job_returns_runtime_information(self): - - statement = self.cmd_work - benchmark_data = P.run( - statement, - to_cluster=self.to_cluster) - - self.assertIsInstance(benchmark_data, list) - self.assertEqual(len(benchmark_data), 1) - d = benchmark_data.pop() - self.validate_benchmark_data(d, statement) - - def test_multiple_jobs_return_runtime_information(self): - - statements = [self.cmd_work] * 3 - - benchmark_data = P.run( - statements, - to_cluster=self.to_cluster) - - self.assertIsInstance(benchmark_data, list) - self.assertEqual(len(benchmark_data), len(statements)) - - for d, s in zip(benchmark_data, statements): - self.validate_benchmark_data(d, s) - - def test_array_job_returns_runtime_information(self): - - statements = [self.cmd_work] * 3 - - benchmark_data = P.run( - statements, - job_array=True, - to_cluster=self.to_cluster) - - self.assertIsInstance(benchmark_data, list) - self.assertEqual(len(benchmark_data), len(statements)) - - for d, s in zip(benchmark_data, statements): - self.validate_benchmark_data(d, s) - - -@unittest.skipIf(QUEUE_MANAGER is None, "no cluster configured for testing") -class TestExecutionRunCluster(TestExecutionRunLocal): - to_cluster = True - - def setUp(self): - TestExecutionRunLocal.setUp(self) - P.start_session() - - def tearDown(self): - TestExecutionRunLocal.tearDown(self) - P.close_session() - - def file_exists(self, filename, hostname=None, expect=False): - return remote_file_exists(filename, hostname, expect) - - def test_job_should_fail_if_cancelled(self): - - if not P.will_run_on_cluster(P.get_parameters()): - return - - if QUEUE_MANAGER == "slurm": - self.assertRaises( - OSError, - P.run, - "scancel $SLURM_JOB_ID", - to_cluster=self.to_cluster) - elif QUEUE_MANAGER == "sge": - self.assertRaises( - OSError, - P.run, - "qdel $SGE_TASK_ID", - to_cluster=self.to_cluster) - - @unittest.skipIf(QUEUE_MANAGER != "slurm", "test relevant in SLURM only") - def test_job_should_pass_if_memory_bounds_hit_with_io(self): - # slurm issue - memory hit due to I/O buffering and error - # is reported. - outfile = os.path.join(self.work_dir, "out") - benchmark_data = P.run( - "python -c 'from array import array; " - "a = array(\"B\", (1 for x in range(0, {memory}))); " - "numpy.save(\"outfile\", a); " - "'".format( - memory=self.test_memory_size, - outfile=outfile), - to_cluster=self.to_cluster, - cluster_memory_ulimit=False, - job_memory="{}G".format( - (self.base_memory_size + self.test_memory_size) / 10**9)) - - self.assertTrue(benchmark_data) + stdin, stdout, _ = ssh.exec_command(f"ls -d {filename}") + return stdout.read().decode("utf-8").strip() == filename + + +@pytest.fixture +def work_dir(): + """Create and yield a temporary working directory.""" + temp_dir = P.get_temp_dir(shared=True) + yield temp_dir + shutil.rmtree(temp_dir) + + +@pytest.fixture +def expected_output_files(): + """Fixture for expected output files.""" + return [f"sample_{x:02}.mean" for x in range(10)] + [f"sample_{x:02}.txt" for x in range(10)] + + +def validate_benchmark_data(data, statement): + assert data.percent_cpu >= 0 + assert data.max_rss >= 0 + assert data.max_vmem >= 0 + assert data.slots == 1 + assert len(data.hostname) > 0 + assert len(data.task) > 0 + assert data.total_t >= 0 + assert data.wall_t >= 0 + assert data.user_t >= 0 + assert data.sys_t >= 0 + assert data.start_time < data.end_time + assert data.submission_time < data.end_time + assert data.statement == statement + + +@pytest.mark.parametrize("to_cluster", [False, True]) +def test_single_job_returns_runtime_information(to_cluster): + statement = "lsof > /dev/null && openssl speed md5" + benchmark_data = P.run(statement, to_cluster=to_cluster) + assert isinstance(benchmark_data, list) + assert len(benchmark_data) == 1 + validate_benchmark_data(benchmark_data.pop(), statement) + + +def test_multiple_jobs_return_runtime_information(): + statements = ["lsof > /dev/null && openssl speed md5"] * 3 + benchmark_data = P.run(statements, to_cluster=False) + assert isinstance(benchmark_data, list) + assert len(benchmark_data) == len(statements) + for data, stmt in zip(benchmark_data, statements): + validate_benchmark_data(data, stmt) + + +def test_array_job_returns_runtime_information(): + statements = ["lsof > /dev/null && openssl speed md5"] * 3 + benchmark_data = P.run(statements, job_array=True, to_cluster=False) + assert isinstance(benchmark_data, list) + assert len(benchmark_data) == len(statements) + for data, stmt in zip(benchmark_data, statements): + validate_benchmark_data(data, stmt) + + +@pytest.mark.skipif(QUEUE_MANAGER is None, reason="No cluster configured for testing") +def test_job_should_fail_if_cancelled(): + if not P.will_run_on_cluster(P.get_parameters()): + pytest.skip("Test requires cluster execution") + cancel_cmd = "scancel $SLURM_JOB_ID" if QUEUE_MANAGER == "slurm" else "qdel $SGE_TASK_ID" + with pytest.raises(OSError): + P.run(cancel_cmd, to_cluster=True) + + +@pytest.mark.skipif(QUEUE_MANAGER != "slurm", reason="Test relevant only in SLURM") +def test_job_should_pass_if_memory_bounds_hit_with_io(work_dir): + outfile = os.path.join(work_dir, "out") + memory = 100000000 + benchmark_data = P.run( + f"python -c 'from array import array; a = array(\"B\", (1 for x in range({memory}))); numpy.save(\"{outfile}\", a)'", + to_cluster=True, + cluster_memory_ulimit=False, + job_memory=f"{memory / 10**9}G") + assert benchmark_data + + +@pytest.mark.parametrize("to_cluster", [False, True]) +def test_job_should_fail_if_wrong_arguments(to_cluster): + with pytest.raises(OSError): + P.run("hostname -z", to_cluster=to_cluster) + + +@pytest.mark.parametrize("to_cluster", [False, True]) +def test_job_should_pass_if_unlimited_memory_required(to_cluster, work_dir): + outfile = os.path.join(work_dir, "out") + memory = 100000000 + benchmark_data = P.run( + f"python -c 'from array import array; a = array(\"B\", (1 for x in range({memory}))); open(\"{outfile}\", \"w\").write(str(len(a)))'", + to_cluster=to_cluster, + cluster_memory_ulimit=True, + job_memory="unlimited") + assert benchmark_data + with iotools.open_file(outfile) as outf: + memory_used = int(outf.read().strip()) + assert memory_used == memory + + +def test_job_should_fail_if_killed(): + with pytest.raises(OSError): + P.run("kill -9 $$", to_cluster=False) + + +def test_file_should_exist_on_local_or_remote_system(): + filename = "/tmp/test_file" + hostname = socket.gethostname() + with open(filename, 'w') as f: + f.write('Test') + assert remote_file_exists(filename, hostname, expect=True) + os.remove(filename) if __name__ == "__main__": - unittest.main() + pytest.main()