Commit

[query] Various Benchmark Suite Improvements
ehigham committed Dec 12, 2024
1 parent 4e259cd commit 624cfbb
Showing 14 changed files with 490 additions and 318 deletions.
56 changes: 28 additions & 28 deletions hail/python/benchmark/conftest.py
@@ -12,7 +12,7 @@
def pytest_addoption(parser):
parser.addoption("--log", type=str, help='Log file path', default=None)
parser.addoption("--output", type=str, help="Output file path.", default=None)
parser.addoption("--data-dir", type=str, help="Data directory.", default=None)
parser.addoption("--data-dir", type=str, help="Data directory.", default=os.getenv('HAIL_BENCHMARK_DIR'))
parser.addoption('--iterations', type=int, help='override number of iterations for all benchmarks', default=None)
parser.addoption('--cores', type=int, help='Number of cores to use.', default=1)
parser.addoption(
@@ -23,36 +23,36 @@ def pytest_addoption(parser):
const='cpu',
default=None,
)
parser.addoption('--profiler-path', type=str, help='path to async profiler', default=None)
parser.addoption(
'--max-duration',
type=int,
help='Maximum permitted duration for any benchmark trial in seconds, not to be confused with pytest-timeout',
default=200,
)
parser.addoption('--max-failures', type=int, help='Stop benchmarking item after this many failures', default=3)
parser.addoption(
'--profiler-path', type=str, help='path to async profiler', default=os.getenv('ASYNC_PROFILER_HOME')
)
parser.addoption('--profiler-fmt', choices=['html', 'flame', 'jfr'], help='Choose profiler output.', default='html')


def run_config_from_pytest_config(pytest_config):
return type(
'RunConfig',
(object,),
{
**{
flag: pytest_config.getoption(flag) or default
for flag, default in [
('log', None),
('output', None),
('cores', 1),
('data_dir', os.getenv('HAIL_BENCHMARK_DIR')),
('iterations', None),
('profile', None),
('profiler_path', os.getenv('ASYNC_PROFILER_HOME')),
('profiler_fmt', None),
]
},
'verbose': pytest_config.getoption('verbose') > 0,
'quiet': pytest_config.getoption('verbose') < 0,
'timeout': int(pytest_config.getoption('timeout') or 1800),
},
@pytest.hookimpl
def pytest_configure(config):
init_logging(file=config.getoption('log'))


@pytest.hookimpl(tryfirst=True)
def pytest_collection_modifyitems(config, items):
max_duration = config.getoption('max_duration')

xfail = pytest.mark.xfail(
raises=TimeoutError,
reason=f'Runtime exceeds maximum permitted duration of {max_duration}s',
)

for item in items:
if (xtimeout := item.get_closest_marker('xtimeout')) is None:
continue

@pytest.hookimpl
def pytest_configure(config):
config.run_config = run_config_from_pytest_config(config)
init_logging(file=config.run_config.log)
if len(xtimeout.args) == 0 or (len(xtimeout.args) == 1 and xtimeout.args[0] >= max_duration):
item.add_marker(xfail)
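
The collection hook above turns an xtimeout marker into an expected failure whenever the declared budget is missing or meets/exceeds --max-duration (200s by default). A minimal usage sketch, with hypothetical benchmark names that are not part of this commit:

import pytest


@pytest.mark.benchmark()
@pytest.mark.xtimeout(270)  # 270s >= the 200s default, so collection adds xfail(raises=TimeoutError)
def benchmark_hypothetical_slow_case():
    ...


@pytest.mark.benchmark()
@pytest.mark.xtimeout(60)  # below --max-duration: no xfail marker is added
def benchmark_hypothetical_fast_case():
    ...

A bare @pytest.mark.xtimeout with no argument is always converted, matching the len(xtimeout.args) == 0 branch.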
23 changes: 13 additions & 10 deletions hail/python/benchmark/hail/benchmark_combiner.py
@@ -3,7 +3,7 @@
import pytest

import hail as hl
from benchmark.tools import benchmark, chunk
from benchmark.tools import chunk
from hail.vds.combiner import combine_variant_datasets, new_combiner, transform_gvcf

COMBINE_GVCF_MAX = 100
@@ -14,7 +14,8 @@ def import_vcf(path):
return hl.import_vcf(str(path), reference_genome='GRCh38', force=True)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xfail(raises=hl.utils.java.FatalError, reason='??')
@with_flags(no_ir_logging='1')
def benchmark_compile_2k_merge(empty_gvcf, tmp_path):
vcf = import_vcf(empty_gvcf)
@@ -23,29 +24,30 @@ def benchmark_compile_2k_merge(empty_gvcf, tmp_path):
hl.vds.write_variant_datasets(combined, str(tmp_path / 'combiner-multi-write'), overwrite=True)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(270)
def benchmark_python_only_10k_transform(empty_gvcf):
for vcf in [import_vcf(empty_gvcf)] * 10_000:
transform_gvcf(vcf, [])


@benchmark()
@pytest.mark.benchmark()
def benchmark_python_only_10k_combine(empty_gvcf):
vcf = import_vcf(empty_gvcf)
mt = transform_gvcf(vcf, [])
for mts in chunk(COMBINE_GVCF_MAX, [mt] * 10_000):
combine_variant_datasets(mts)


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_and_transform_gvcf(single_gvcf):
mt = import_vcf(single_gvcf)
vds = transform_gvcf(mt, [])
vds.reference_data._force_count_rows()
vds.variant_data._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_gvcf_force_count(single_gvcf):
mt = import_vcf(single_gvcf)
mt._force_count_rows()
@@ -60,14 +62,15 @@ def tmp_and_output_paths(tmp_path):
return (tmp, output)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(180)
def benchmark_vds_combiner_chr22(chr22_gvcfs, tmp_and_output_paths):
parts = hl.eval([hl.parse_locus_interval('chr22:start-end', reference_genome='GRCh38')])

tmp, output = tmp_and_output_paths
combiner = new_combiner(
output_path=str(tmp_and_output_paths[0]),
output_path=str(output),
intervals=parts,
temp_path=str(tmp_and_output_paths[1]),
temp_path=str(tmp),
gvcf_paths=[str(path) for path in chr22_gvcfs],
reference_genome='GRCh38',
branch_factor=16,
28 changes: 17 additions & 11 deletions hail/python/benchmark/hail/benchmark_linalg.py
@@ -1,48 +1,52 @@
import pytest

import hail as hl
from benchmark.tools import benchmark


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout
def benchmark_block_matrix_nested_multiply(tmp_path):
bm = hl.linalg.BlockMatrix.random(8 * 1024, 8 * 1024)
bm = bm.checkpoint(str(tmp_path / 'checkpoint.mt'))
bm = (bm @ bm) @ bm @ bm @ (bm @ bm)
bm.write(str(tmp_path / 'result.mt'), overwrite=True)


@benchmark()
@pytest.mark.benchmark()
def benchmark_make_ndarray():
ht = hl.utils.range_table(200_000)
ht = ht.annotate(x=hl.nd.array(hl.range(ht.idx)))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_ndarray_addition():
arr = hl.nd.ones((1024, 1024))
hl.eval(arr + arr)


@benchmark()
@pytest.mark.benchmark()
def benchmark_ndarray_matmul_int64():
arr = hl.nd.arange(1024 * 1024).map(hl.int64).reshape((1024, 1024))
hl.eval(arr @ arr)


@benchmark()
@pytest.mark.benchmark()
def benchmark_ndarray_matmul_float64():
arr = hl.nd.arange(1024 * 1024).map(hl.float64).reshape((1024, 1024))
hl.eval(arr @ arr)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(200)
def benchmark_blockmatrix_write_from_entry_expr_range_mt(tmp_path):
mt = hl.utils.range_matrix_table(40_000, 40_000, n_partitions=4)
path = str(tmp_path / 'result.bm')
hl.linalg.BlockMatrix.write_from_entry_expr(mt.row_idx + mt.col_idx, path)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(700)
def benchmark_blockmatrix_write_from_entry_expr_range_mt_standardize(tmp_path):
mt = hl.utils.range_matrix_table(40_000, 40_000, n_partitions=4)
path = str(tmp_path / 'result.bm')
@@ -51,20 +55,22 @@ def benchmark_blockmatrix_write_from_entry_expr_range_mt_standardize(tmp_path):
)


@benchmark()
@pytest.mark.benchmark()
def benchmark_sum_table_of_ndarrays():
ht = hl.utils.range_table(400).annotate(nd=hl.nd.ones((4096, 4096)))
ht.aggregate(hl.agg.ndarray_sum(ht.nd))


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(250)
def benchmark_block_matrix_to_matrix_table_row_major():
mt = hl.utils.range_matrix_table(20_000, 20_000, n_partitions=4)
bm = hl.linalg.BlockMatrix.from_entry_expr(mt.row_idx + mt.col_idx)
bm.to_matrix_table_row_major()._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout
def benchmark_king(tmp_path):
mt = hl.balding_nichols_model(6, n_variants=10000, n_samples=4096)
path = str(tmp_path / 'result.mt')
104 changes: 55 additions & 49 deletions hail/python/benchmark/hail/benchmark_matrix_table.py
@@ -1,126 +1,126 @@
import pytest

import hail as hl
from benchmark.tools import benchmark


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_decode_and_count(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_decode_and_count_just_gt(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt)).select_entries('GT')
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_array_arithmetic(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(mt.alleles.length() == 2)
mt.select_entries(dosage=hl.pl_dosage(mt.PL)).select_rows()._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_entries_table(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.entries()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_entries_table_no_key(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt)).key_rows_by().key_cols_by()
mt.entries()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_rows_force_count(profile25_mt):
ht = hl.read_matrix_table(str(profile25_mt)).rows().key_by()
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_show(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.show(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_rows_show(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.rows().show(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_cols_show(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.cols().show(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_take_entry(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.GT.take(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_entries_show(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.entries().show()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_take_row(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.info.AF.take(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_take_col(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.s.take(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_write_range_matrix_table_p100(tmp_path):
mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100)
mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx)
mt.write(str(tmp_path / 'tmp.mt'))


@benchmark()
@pytest.mark.benchmark()
def benchmark_write_profile_mt(profile25_mt, tmp_path):
hl.read_matrix_table(str(profile25_mt)).write(str(tmp_path / 'tmp.mt'))


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_rows_is_transition(profile25_mt):
ht = hl.read_matrix_table(str(profile25_mt)).rows().key_by()
ht.select(is_snp=hl.is_snp(ht.alleles[0], ht.alleles[1]))._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_filter_entries(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.filter_entries((mt.GQ > 8) & (mt.DP > 2))._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_filter_entries_unfilter(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.filter_entries((mt.GQ > 8) & (mt.DP > 2)).unfilter_entries()._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout
def benchmark_matrix_table_nested_annotate_rows_annotate_entries(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_rows(r0=mt.info.AF[0] + 1)
mt = mt.annotate_entries(e0=mt.GQ + 5)
for i in range(1, 20):
mt = mt.annotate_rows(**{f'r{i}': mt[f'r{i-1}'] + 1})
mt = mt.annotate_entries(**{f'e{i}': mt[f'e{i-1}'] + 1})
mt = mt.annotate_rows(**{f'r{i}': mt[f'r{i - 1}'] + 1})
mt = mt.annotate_entries(**{f'e{i}': mt[f'e{i - 1}'] + 1})
mt._force_count_rows()


@@ -163,36 +163,35 @@ def many_aggs(mt):
return {f'x{i}': expr for i, expr in enumerate(aggs)}


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_many_aggs_row_wise(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_rows(**many_aggs(mt))
mt.rows()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_many_aggs_col_wise(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_cols(**many_aggs(mt))
mt.cols()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_aggregate_entries(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.aggregate_entries(hl.agg.stats(mt.GQ))


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_call_stats_star_star(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt.annotate_rows(**hl.agg.call_stats(mt.GT, mt.alleles))._force_count_rows()


@benchmark()
@pytest.mark.skip(reason='never finishes')
def benchmark_gnomad_coverage_stats(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
@pytest.mark.benchmark()
def benchmark_gnomad_coverage_stats(gnomad_dp_sim):
mt = hl.read_matrix_table(str(gnomad_dp_sim))

def get_coverage_expr(mt):
cov_arrays = hl.literal({
@@ -217,12 +216,16 @@ def get_coverage_expr(mt):
),
)

mt = mt.annotate_rows(mean=hl.agg.mean(mt.x), median=hl.median(hl.agg.collect(mt.x)), **get_coverage_expr(mt))
mt = mt.annotate_rows(
mean=hl.agg.mean(mt.x),
median=hl.median(hl.agg.collect(mt.x)),
**get_coverage_expr(mt),
)
mt.rows()._force_count()


@benchmark()
def gnomad_coverage_stats_optimized(gnomad_dp_sim):
@pytest.mark.benchmark()
def benchmark_gnomad_coverage_stats_optimized(gnomad_dp_sim):
mt = hl.read_matrix_table(str(gnomad_dp_sim))
mt = mt.annotate_rows(
mean=hl.agg.mean(mt.x),
@@ -238,72 +241,74 @@ def gnomad_coverage_stats_optimized(gnomad_dp_sim):
mt.rows()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_per_row_stats_star_star(gnomad_dp_sim):
mt = hl.read_matrix_table(str(gnomad_dp_sim))
mt.annotate_rows(**hl.agg.stats(mt.x))._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_read_decode_gnomad_coverage(gnomad_dp_sim):
hl.read_matrix_table(str(gnomad_dp_sim))._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_bgen_force_count_just_gp(sim_ukb_bgen, sim_ukb_sample):
mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GP'], n_partitions=8)
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_bgen_force_count_all(sim_ukb_bgen, sim_ukb_sample):
mt = hl.import_bgen(
str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GT', 'GP', 'dosage'], n_partitions=8
)
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(180)
def benchmark_import_bgen_info_score(sim_ukb_bgen, sim_ukb_sample):
mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GP'], n_partitions=8)
mt = mt.annotate_rows(info_score=hl.agg.info_score(mt.GP))
mt.rows().select('info_score')._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_bgen_filter_count(sim_ukb_bgen, sim_ukb_sample):
mt = hl.import_bgen(str(sim_ukb_bgen), sample_file=str(sim_ukb_sample), entry_fields=['GT', 'GP'], n_partitions=8)
mt = mt.filter_rows(mt.alleles == ['A', 'T'])
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_export_range_matrix_table_entry_field_p100(tmp_path):
mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100)
mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx)
mt.x.export(str(tmp_path / 'result.txt'))


@benchmark()
@pytest.mark.benchmark()
def benchmark_export_range_matrix_table_row_p100(tmp_path):
mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100)
mt.row.export(str(tmp_path / 'result.txt'))


@benchmark()
@pytest.mark.benchmark()
def benchmark_export_range_matrix_table_col_p100(tmp_path):
mt = hl.utils.range_matrix_table(n_rows=1_000_000, n_cols=10, n_partitions=100)
mt.col.export(str(tmp_path / 'result.txt'))


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(250)
def benchmark_large_range_matrix_table_sum():
mt = hl.utils.range_matrix_table(n_cols=500000, n_rows=10000, n_partitions=2500)
mt = mt.annotate_entries(x=mt.col_idx + mt.row_idx)
mt.annotate_cols(foo=hl.agg.sum(mt.x))._force_count_cols()


@benchmark()
@pytest.mark.benchmark()
def benchmark_kyle_sex_specific_qc(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_cols(sex=hl.if_else(hl.rand_bool(0.5), 'Male', 'Female'))
@@ -344,35 +349,36 @@ def benchmark_kyle_sex_specific_qc(profile25_mt):
mt.rows()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_scan_count_rows_2():
mt = hl.utils.range_matrix_table(n_rows=200_000_000, n_cols=10, n_partitions=16)
mt = mt.annotate_rows(x=hl.scan.count())
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_matrix_table_scan_count_cols_2():
mt = hl.utils.range_matrix_table(n_cols=10_000_000, n_rows=10)
mt = mt.annotate_cols(x=hl.scan.count())
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout
def benchmark_matrix_multi_write_nothing(tmp_path):
mt = hl.utils.range_matrix_table(1, 1, n_partitions=1)
mts = [mt] * 1000
hl.experimental.write_matrix_tables(mts, str(tmp_path / 'multi-write'))


@benchmark()
@pytest.mark.benchmark()
def benchmark_mt_localize_and_collect(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
ht = mt.localize_entries("ent")
ht.head(150).collect()


@benchmark()
@pytest.mark.benchmark()
def benchmark_mt_group_by_memory_usage(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
mt = mt.group_rows_by(new_idx=mt.row_idx % 3).aggregate(x=hl.agg.mean(mt.x))
52 changes: 28 additions & 24 deletions hail/python/benchmark/hail/benchmark_methods.py
@@ -1,44 +1,45 @@
import pytest

import hail as hl
from benchmark.tools import benchmark


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_vcf_write(profile25_vcf, tmp_path):
mt = hl.import_vcf(str(profile25_vcf))
out = str(tmp_path / 'out.mt')
mt.write(out)


@benchmark()
@pytest.mark.benchmark()
def benchmark_import_vcf_count_rows(profile25_vcf):
mt = hl.import_vcf(str(profile25_vcf))
mt.count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_export_vcf(profile25_mt, tmp_path):
mt = hl.read_matrix_table(str(profile25_mt))
out = str(tmp_path / 'out.vcf.bgz')
hl.export_vcf(mt, out)


@benchmark()
@pytest.mark.benchmark()
def benchmark_sample_qc(profile25_mt):
hl.sample_qc(hl.read_matrix_table(str(profile25_mt))).cols()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_variant_qc(profile25_mt):
hl.variant_qc(hl.read_matrix_table(str(profile25_mt))).rows()._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_variant_and_sample_qc(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
hl.sample_qc(hl.variant_qc(mt))._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_variant_and_sample_qc_nested_with_filters_2(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = hl.variant_qc(mt)
@@ -52,7 +53,7 @@ def benchmark_variant_and_sample_qc_nested_with_filters_2(profile25_mt):
mt.count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_variant_and_sample_qc_nested_with_filters_4(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = hl.variant_qc(mt)
@@ -74,7 +75,7 @@ def benchmark_variant_and_sample_qc_nested_with_filters_4(profile25_mt):
mt.count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_variant_and_sample_qc_nested_with_filters_4_counts(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = hl.variant_qc(mt)
@@ -99,40 +100,40 @@ def benchmark_variant_and_sample_qc_nested_with_filters_4_counts(profile25_mt):
mt.count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_hwe_normalized_pca(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(mt.info.AF[0] > 0.01)
hl.hwe_normalized_pca(mt.GT)


@benchmark()
@pytest.mark.benchmark()
def benchmark_hwe_normalized_pca_blanczos_small_data_0_iterations(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(mt.info.AF[0] > 0.01)
hl._hwe_normalized_blanczos(mt.GT, q_iterations=0)


@benchmark()
@pytest.mark.benchmark()
def benchmark_hwe_normalized_pca_blanczos_small_data_10_iterations(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(mt.info.AF[0] > 0.01)
hl._hwe_normalized_blanczos(mt.GT, q_iterations=10)


@benchmark()
@pytest.mark.benchmark()
def benchmark_split_multi_hts(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
hl.split_multi_hts(mt)._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_split_multi(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
hl.split_multi(mt)._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_concordance(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(mt.alleles.length() == 2)
@@ -141,7 +142,7 @@ def benchmark_concordance(profile25_mt):
c._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_genetics_pipeline(profile25_mt, tmp_path):
mt = hl.read_matrix_table(str(profile25_mt))
mt = hl.split_multi_hts(mt)
@@ -153,30 +154,32 @@ def benchmark_genetics_pipeline(profile25_mt, tmp_path):
mt.write(str(tmp_path / 'genetics_pipeline.mt'))


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(480)
def benchmark_ld_prune_profile_25(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.filter_rows(hl.len(mt.alleles) == 2)
hl.ld_prune(mt.GT)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_pc_relate(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_cols(scores=hl.range(2).map(lambda x: hl.rand_unif(0, 1)))
rel = hl.pc_relate(mt.GT, 0.05, scores_expr=mt.scores, statistics='kin', min_kinship=0.05)
rel._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout(320)
def benchmark_pc_relate_5k_5k(balding_nichols_5k_5k):
mt = hl.read_matrix_table(str(balding_nichols_5k_5k))
mt = mt.annotate_cols(scores=hl.range(2).map(lambda x: hl.rand_unif(0, 1)))
rel = hl.pc_relate(mt.GT, 0.05, scores_expr=mt.scores, statistics='kin', min_kinship=0.05)
rel._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_linear_regression_rows(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
num_phenos = 100
@@ -191,7 +194,7 @@ def benchmark_linear_regression_rows(random_doubles_mt):
res._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_linear_regression_rows_nd(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
num_phenos = 100
@@ -206,7 +209,7 @@ def benchmark_linear_regression_rows_nd(random_doubles_mt):
res._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_logistic_regression_rows_wald(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
mt = mt.head(2000)
@@ -222,7 +225,8 @@ def benchmark_logistic_regression_rows_wald(random_doubles_mt):
res._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.xtimeout
def benchmark_logistic_regression_rows_wald_nd(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
mt = mt.head(2000)
6 changes: 3 additions & 3 deletions hail/python/benchmark/hail/benchmark_sentinel.py
@@ -1,16 +1,16 @@
import gzip

from benchmark.tools import benchmark
import pytest


@benchmark(iterations=15)
@pytest.mark.benchmark()
def benchmark_sentinel_read_gunzip(many_ints_tsv):
with gzip.open(many_ints_tsv) as f:
for _ in f:
pass


@benchmark(iterations=15)
@pytest.mark.benchmark()
def benchmark_sentinel_cpu_hash_1():
x = 0
for _ in range(10_000):
15 changes: 8 additions & 7 deletions hail/python/benchmark/hail/benchmark_shuffle.py
@@ -1,45 +1,46 @@
import pytest

import hail as hl
from benchmark.tools import benchmark


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_key_rows_by_mt(profile25_mt):
mt = hl.read_matrix_table(str(profile25_mt))
mt = mt.annotate_rows(reversed_position_locus=hl.struct(contig=mt.locus.contig, position=-mt.locus.position))
mt = mt.key_rows_by(mt.reversed_position_locus)
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_order_by_10m_int():
t = hl.utils.range_table(10_000_000, n_partitions=100)
t = t.order_by(-t.idx)
t._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_key_rows_by_4096_byte_rows():
mt = hl.utils.range_matrix_table(100_000, (1 << 12) // 4)
mt = mt.annotate_entries(entry=mt.row_idx * mt.col_idx)
mt = mt.key_rows_by(backward_rows_idx=-mt.row_idx)
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_key_rows_by_65k_byte_rows():
mt = hl.utils.range_matrix_table(10_000, (1 << 16) // 4)
mt = mt.annotate_entries(entry=mt.row_idx * mt.col_idx)
mt = mt.key_rows_by(backward_rows_idx=-mt.row_idx)
mt._force_count_rows()


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_key_by_aggregate_bad_locality(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
ht.group_by(x=ht.i0 % 1000).aggregate(c=hl.agg.count(), m=hl.agg.mean(ht.i2))._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_shuffle_key_by_aggregate_good_locality(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
divisor = 7_500_000 / 51 # should ensure each partition never overflows default buffer size
99 changes: 50 additions & 49 deletions hail/python/benchmark/hail/benchmark_table.py
@@ -2,69 +2,68 @@

import hail as hl
from benchmark.hail.fixtures import many_partitions_ht
from benchmark.tools import benchmark


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_key_by_shuffle():
n = 1_000_000
ht = hl.utils.range_table(n)
ht = ht.key_by(x=n - ht.idx)
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_group_by_aggregate_sorted():
n = 10_000_000
ht = hl.utils.range_table(n)
ht = ht.group_by(x=ht.idx // 1000).aggregate(y=hl.agg.count())
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_group_by_aggregate_unsorted():
n = 10_000_000
ht = hl.utils.range_table(n)
ht = ht.group_by(x=(n - ht.idx) // 1000).aggregate(y=hl.agg.count())
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_range_force_count():
hl.utils.range_table(100_000_000)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_range_join_1b_1k():
ht1 = hl.utils.range_table(1_000_000_000)
ht2 = hl.utils.range_table(1_000)
ht1.join(ht2, 'inner').count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_range_join_1b_1b():
ht1 = hl.utils.range_table(1_000_000_000)
ht2 = hl.utils.range_table(1_000_000_000)
ht1.join(ht2, 'inner').count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_python_construction():
n = 100
ht = hl.utils.range_table(100)
for i in range(n):
ht = ht.annotate(**{f'x_{i}': 0})


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_big_aggregate_compilation():
n = 1_000
ht = hl.utils.range_table(1)
expr = tuple([hl.agg.fraction(ht.idx % i == 0) for i in range(n) if i > 0])
ht.aggregate(expr)


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_big_aggregate_compile_and_execute():
n = 1_000
m = 1_000_000
@@ -73,23 +72,23 @@ def benchmark_table_big_aggregate_compile_and_execute():
ht.aggregate(expr)


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.parametrize('m, n', [(1_000_000, 1_000_000), (1_000_000, 1_000)])
def benchmark_table_foreign_key_join(m, n):
ht = hl.utils.range_table(m)
ht2 = hl.utils.range_table(n)
ht.annotate(x=ht2[(m - 1 - ht.idx) % n])._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_array_sum():
n = 10_000_000
m = 100
ht = hl.utils.range_table(n)
ht.aggregate(hl.agg.array_sum(hl.range(0, m)))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_annotate_many_flat():
n = 1_000_000
m = 100
@@ -98,7 +97,8 @@ def benchmark_table_annotate_many_flat():
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.skip(reason='never finishes')
def benchmark_table_annotate_many_nested_no_dependence():
n = 1_000_000
m = 100
@@ -108,7 +108,8 @@ def benchmark_table_annotate_many_nested_no_dependence():
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.skip(reason='never finishes')
def benchmark_table_annotate_many_nested_dependence_constants():
n = 1_000_000
m = 100
@@ -118,7 +119,8 @@ def benchmark_table_annotate_many_nested_dependence_constants():
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.skip(reason='never finishes')
def benchmark_table_annotate_many_nested_dependence():
n = 1_000_000
m = 100
@@ -129,37 +131,37 @@ def benchmark_table_annotate_many_nested_dependence():
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_read_force_count_ints(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_read_force_count_strings(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_import_ints(many_ints_tsv):
hl.import_table(
str(many_ints_tsv),
types={'idx': 'int', **{f'i{i}': 'int' for i in range(5)}, **{f'array{i}': 'array<int>' for i in range(2)}},
)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_import_ints_impute(many_ints_tsv):
hl.import_table(str(many_ints_tsv), impute=True)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_import_strings(many_strings_tsv):
hl.import_table(str(many_strings_tsv))._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_int_stats(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
ht.aggregate(
@@ -171,20 +173,20 @@ def benchmark_table_aggregate_int_stats(many_ints_ht):
)


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_range_means():
ht = hl.utils.range_table(10_000_000, 16)
ht = ht.annotate(m=hl.mean(hl.range(0, ht.idx % 1111)))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_range_array_range_force_count():
ht = hl.utils.range_table(30).annotate(big_range=hl.range(100_000_000))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_approx_cdf(random_doubles_mt):
mt = hl.read_matrix_table(str(random_doubles_mt))
mt.aggregate_entries((
@@ -194,74 +196,74 @@ def benchmark_table_aggregate_approx_cdf(random_doubles_mt):
))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_counter(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
ht.aggregate(hl.tuple([hl.agg.counter(ht[f'f{i}']) for i in range(8)]))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_take_by_strings(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
ht.aggregate(hl.tuple([hl.agg.take(ht['f18'], 25, ordering=ht[f'f{i}']) for i in range(18)]))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_downsample_dense(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
ht.aggregate(tuple([hl.agg.downsample(ht[f'i{i}'], ht['i3'], label=hl.str(ht['i4'])) for i in range(3)]))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_downsample_worst_case():
ht = hl.utils.range_table(250_000_000, 8)
ht.aggregate(hl.agg.downsample(ht.idx, -ht.idx))


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.skip(reason='FIXME: this needs fixtures to accurately measure downsample (rather than randomness)')
def benchmark_table_aggregate_downsample_sparse():
ht = hl.utils.range_table(250_000_000, 8)
ht.aggregate(hl.agg.downsample(hl.rand_norm() ** 5, hl.rand_norm() ** 5))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_aggregate_linreg(many_ints_ht):
ht = hl.read_table(str(many_ints_ht))
ht.aggregate(hl.agg.array_agg(lambda i: hl.agg.linreg(ht.i0 + i, [ht.i1, ht.i2, ht.i3, ht.i4]), hl.range(75)))


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_take(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
ht.take(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_show(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
ht.show(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_expr_take(many_strings_ht):
ht = hl.read_table(str(many_strings_ht))
hl.tuple([ht.f1, ht.f2]).take(100)


@benchmark()
@pytest.mark.benchmark()
def benchmark_read_force_count_partitions(many_partitions_ht):
hl.read_table(str(many_partitions_ht))._force_count()


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.parametrize('n,n_partitions', [(10_000_000, 1000), (10_000_000, 100), (10_000_000, 10)])
def benchmark_write_range_table(tmp_path, n, n_partitions):
ht = hl.utils.range_table(n, n_partitions)
ht.write(str(tmp_path / 'tmp.ht'))


@benchmark()
@pytest.mark.benchmark()
@pytest.mark.parametrize('many_partitions_ht', [1_000], indirect=True)
def benchmark_read_with_index(many_partitions_ht):
rows = 10_000_000
@@ -272,74 +274,73 @@ def benchmark_read_with_index(many_partitions_ht):
ht._force_count()


many_partitions_ht1 = many_partitions_ht
many_partitions_ht2 = many_partitions_ht
many_partitions_ht1, many_partitions_ht2 = [many_partitions_ht] * 2
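
The aliasing above gives the session-scoped many_partitions_ht fixture two additional names, so a single benchmark can request two readable handles on the same resource; pytest registers a fixture under each module attribute name the fixture function is bound to. A minimal sketch of the same pattern with hypothetical names:

import pytest


@pytest.fixture(scope='session')
def shared_table():
    return 'table.ht'


# Aliasing by assignment registers the same fixture function under two more names.
shared_table_a = shared_table
shared_table_b = shared_table


def test_uses_both_aliases(shared_table_a, shared_table_b):
    assert shared_table_a == shared_table_b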


@benchmark()
@pytest.mark.benchmark()
def benchmark_union_partitions_table(many_partitions_ht1, many_partitions_ht2):
ht1 = hl.read_table(str(many_partitions_ht1))
ht2 = hl.read_table(str(many_partitions_ht2))
ht1.union(ht2)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_join_partitions_table(many_partitions_ht1, many_partitions_ht2):
ht1 = hl.read_table(str(many_partitions_ht1))
ht2 = hl.read_table(str(many_partitions_ht2))
ht1.join(ht2)._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_group_by_collect_per_row(gnomad_dp_sim):
ht = hl.read_matrix_table(str(gnomad_dp_sim)).localize_entries('e', 'c')
ht.group_by(*ht.key).aggregate(value=hl.agg.collect(ht.row_value))._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_group_by_take_rekey(gnomad_dp_sim):
ht = hl.read_matrix_table(str(gnomad_dp_sim)).localize_entries('e', 'c')
ht.group_by(k=hl.int(ht.row_idx / 50)).aggregate(value=hl.agg.take(ht.row_value, 1))._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_scan_sum_1k_partitions():
ht = hl.utils.range_table(1000000, n_partitions=1000)
ht = ht.annotate(x=hl.scan.sum(ht.idx))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_table_scan_prev_non_null():
ht = hl.utils.range_table(100000000, n_partitions=10)
ht = ht.annotate(x=hl.range(0, ht.idx % 25))
ht = ht.annotate(y=hl.scan._prev_nonnull(ht.row))
ht._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_test_map_filter_region_memory():
high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(100_000_000))
high_mem_table = high_mem_table.filter(high_mem_table.idx % 2 == 0)
assert high_mem_table._force_count() == 15


@benchmark()
@pytest.mark.benchmark()
def benchmark_test_head_and_tail_region_memory():
high_mem_table = hl.utils.range_table(100).annotate(big_array=hl.zeros(100_000_000))
high_mem_table = high_mem_table.head(30)
high_mem_table._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_test_inner_join_region_memory():
high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000))
high_mem_table2 = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000))
joined = high_mem_table.join(high_mem_table2)
joined._force_count()


@benchmark()
@pytest.mark.benchmark()
def benchmark_test_left_join_region_memory():
high_mem_table = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000))
high_mem_table2 = hl.utils.range_table(30).naive_coalesce(1).annotate(big_array=hl.zeros(50_000_000))
105 changes: 74 additions & 31 deletions hail/python/benchmark/hail/conftest.py
@@ -8,7 +8,6 @@
from typing import Any, Dict, List, Literal

import pytest
from _pytest.runner import pytest_runtest_protocol as runner_runtest_protocol

import hail as hl
from benchmark.hail.fixtures import (
@@ -66,12 +65,15 @@
chr22_gvcfs,
empty_gvcf,
gnomad_dp_sim,
local_tmpdir,
many_ints_ht,
many_ints_tsv,
many_partitions_ht,
many_strings_ht,
many_strings_tsv,
onekg_chr22,
onethreethree,
onethreetwo,
profile25_mt,
profile25_vcf,
random_doubles_mt,
@@ -85,14 +87,14 @@
get_peak_task_memory,
init_hail_for_benchmarks,
run_with_timeout,
select,
)
from benchmark.tools import maybe, prune
from hail.utils.java import Env


@contextmanager
def init_hail(run_config):
init_hail_for_benchmarks(run_config)
def init_hail(config):
init_hail_for_benchmarks(config)
try:
yield
finally:
@@ -106,22 +108,22 @@ def init_hail(run_config):
end = pytest.StashKey[datetime]()
iteration = pytest.StashKey[int]()
runs = pytest.StashKey[List[Dict[str, Any]]]()
consecutive_fail_count = pytest.StashKey[int]()

# used internally
context = pytest.StashKey[Literal['burn_in', 'benchmark']]()


@pytest.hookimpl(tryfirst=True)
def pytest_runtest_protocol(item, nextitem):
run_config = item.session.config.run_config
runner = item.config.pluginmanager.get_plugin('runner')
# Initialise hail before running every benchmark for two reasons:
# - each benchmark runs in a clean hail session
# - our means of getting max task memory is quite crude (regex on logs)
# and a fresh session provides a new log
with init_hail(run_config):
if run_config.iterations is not None:
with init_hail(item.config):
if (iterations := item.config.getoption('iterations')) is not None:
burn_in_iterations = 0
iterations = run_config.iterations
logging.info(
msg=(
f'Picked up iterations override. Config: '
@@ -131,13 +133,14 @@ def pytest_runtest_protocol(item, nextitem):
)

else:
burn_in_iterations, iterations = select(
['burn_in_iterations', 'iterations'], **(item.get_closest_marker('benchmark').kwargs)
)
params = item.get_closest_marker('benchmark').kwargs
burn_in_iterations = params.get('burn_in_iterations', 1)
iterations = params.get('iterations', 5)

s = item.stash
s[start] = datetime.now(timezone.utc).isoformat()
s[runs] = []
s[consecutive_fail_count] = 0
s[end] = None

logging.info(
@@ -148,50 +151,73 @@ def pytest_runtest_protocol(item, nextitem):
)
)

max_failures = item.config.getoption('max_failures')

s[context] = 'burn_in'
for k in range(burn_in_iterations):
if max_failures and s[consecutive_fail_count] >= max_failures:
break

s[iteration] = k
# `nextitem` is used to determine which fixtures need to be torn-down
# after the test finishes. For example, if `nextitem` is `None`, then
# all fixtures (including session fixtures) will be finalised.
# Since we're invoking this benchmark repeatedly, we want to tear-down
# function/method level fixtures only, leaving module and session
# fixtures in place; `item.parent` is one such `Item` that represents this.
runner_runtest_protocol(item, nextitem=item.parent)
runner.pytest_runtest_protocol(item, nextitem=item.parent)

s[context] = 'benchmark'
total_iterations = burn_in_iterations + iterations
for k in range(burn_in_iterations, total_iterations):
if max_failures and s[consecutive_fail_count] >= max_failures:
break

s[iteration] = k
# on the final iteration, perform the required teardown for the test
is_final_iteration = k == total_iterations - 1
runner_runtest_protocol(item, nextitem=nextitem if is_final_iteration else item.parent)
runner.pytest_runtest_protocol(item, nextitem=nextitem if is_final_iteration else item.parent)

s[end] = datetime.now(timezone.utc).isoformat()

if max_failures and s[consecutive_fail_count] >= max_failures:
logging.error(
msg=(
f'Benchmarking "{item.nodeid}" aborted due to too many '
f'consecutive failures (max={max_failures})'
)
)

# prevent other plugins running that might invoke the benchmark again
return True


@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
with run_with_timeout(
pyfuncitem.config.run_config,
pyfuncitem.config.getoption('max_duration'),
pyfuncitem.obj,
**{arg: pyfuncitem.funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames},
) as (time, timed_out, traceback):
s = pyfuncitem.stash

is_burn_in = s[context] == 'burn_in'

s[runs].append({
'iteration': s[iteration],
'time': time,
'is_burn_in': is_burn_in,
'timed_out': timed_out,
'failure': traceback,
'task_memory': get_peak_task_memory(Env.hc()._log),
})
if timed_out or traceback is not None:
s[consecutive_fail_count] += 1
else:
s[consecutive_fail_count] = 0

s[runs].append(
prune({
'iteration': s[iteration],
'time': time,
'is_burn_in': is_burn_in,
'timed_out': timed_out,
'failure': maybe(json.dumps, traceback),
'task_memory': get_peak_task_memory(Env.hc()._log),
})
)

logging.info(f'{"(burn in) " if is_burn_in else ""}iteration {s[iteration]}, time: {time}s')

@@ -210,35 +236,48 @@ def open_file_or_stdout(file):

@pytest.hookimpl
def pytest_sessionfinish(session):
if not session.config.option.collectonly:
run_config = session.config.run_config

if hasattr(session, 'items') and len(session.items) > 0 and not session.config.option.collectonly:
meta = {
'uname': platform.uname()._asdict(),
'version': hl.__version__,
**({'batch_id': batch} if (batch := os.getenv('HAIL_BATCH_ID')) else {}),
**({'job_id': job} if (job := os.getenv('HAIL_JOB_ID')) else {}),
**({'trial': trial} if (trial := os.getenv('BENCHMARK_TRIAL_ID')) else {}),
'batch_id': maybe(int, os.getenv('HAIL_BATCH_ID')),
'job_id': maybe(int, os.getenv('HAIL_JOB_ID')),
'attempt_id': os.getenv('HAIL_ATTEMPT_ID'),
'trial': maybe(int, os.getenv('BENCHMARK_TRIAL_ID')),
}

now = datetime.now(timezone.utc).isoformat()
with open_file_or_stdout(run_config.output) as out:
with open_file_or_stdout(session.config.getoption('output')) as out:
for item in session.items:
path, _, name = item.location
json.dump(
{
prune({
'path': path,
'name': name,
**meta,
'start': item.stash[start],
'end': item.stash.get(end, now),
'runs': item.stash[runs],
},
}),
out,
)
out.write('\n')
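
Taken together, pytest_pyfunc_call and pytest_sessionfinish above emit one JSON object per collected benchmark, with None-valued fields dropped by prune. An illustrative record, written as a Python literal; every value is invented and only the key names come from the hooks in this commit:

illustrative_record = {
    'path': 'benchmark/hail/benchmark_table.py',  # item.location[0]
    'name': 'benchmark_table_take',               # item.location[2]
    'uname': {'system': 'Linux', 'machine': 'x86_64'},
    'version': '0.2.133',
    'batch_id': 123, 'job_id': 4, 'trial': 0,     # only present when HAIL_BATCH_ID etc. are set
    'start': '2024-12-12T00:00:00+00:00',
    'end': '2024-12-12T00:05:00+00:00',
    'runs': [
        {'iteration': 0, 'time': 1.23, 'is_burn_in': True, 'timed_out': False, 'task_memory': 104857600},
        {'iteration': 1, 'time': 1.01, 'is_burn_in': False, 'timed_out': False, 'task_memory': 104857600},
        # 'failure' (a JSON-encoded traceback) appears only on failing iterations;
        # prune() drops keys whose values are None.
    ],
}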


@pytest.fixture(autouse=True)
def new_query_tmpdir(tmp_path):
# if hl.version() < '0.2.134':
# yield
# else:
backend = hl.current_backend()
local, remote = backend.local_tmpdir, backend.remote_tmpdir
backend.local_tmpdir, backend.remote_tmpdir = [str(tmp_path / f) for f in ('local', 'remote')]
try:
yield
finally:
backend.local_tmpdir, backend.remote_tmpdir = local, remote


# make fixtures discoverable to `pytest --fixtures` as well as all tests
# within benchmark/hail without explicit import.
__all__ = [
@@ -296,12 +335,16 @@ def pytest_sessionfinish(session):
'chr22_gvcfs',
'empty_gvcf',
'gnomad_dp_sim',
'local_tmpdir',
'many_ints_ht',
'many_ints_tsv',
'many_partitions_ht',
'many_strings_ht',
'many_strings_tsv',
'new_query_tmpdir',
'onekg_chr22',
'onethreethree',
'onethreetwo',
'profile25_mt',
'profile25_vcf',
'random_doubles_mt',
53 changes: 41 additions & 12 deletions hail/python/benchmark/hail/fixtures.py
@@ -1,46 +1,65 @@
import logging
import os
import shutil
import subprocess
from pathlib import Path

import pytest

import hail as hl
from benchmark.tools import maybe
from hailtop.utils import async_to_blocking, retry_transient_errors


@pytest.fixture(scope='session')
def resource_dir(request, tmpdir_factory):
run_config = request.config.run_config
if run_config.data_dir is not None:
resource_dir = Path(run_config.data_dir)
if (resource_dir := maybe(Path, request.config.getoption('data_dir'))) is not None:
resource_dir.mkdir(parents=True, exist_ok=True)
else:
resource_dir = tmpdir_factory.mktemp('hail_benchmark_resources')
resource_dir = Path(tmpdir_factory.mktemp('hail_benchmark_resources'))

return resource_dir


gs_curl_root = 'https://storage.googleapis.com/hail-common/benchmark'


def __download(data_dir, filename):
async def __download(data_dir, filename):
url = os.path.join(gs_curl_root, filename)
logging.info(f'downloading: {filename}')
# Note: the below does not work on batch due to docker/ssl problems
# dest = os.path.join(data_dir, filename)
# urlretrieve(url, dest)
subprocess.check_call(['curl', url, '-Lfs', '--output', f'{data_dir / filename}'])
subprocess.check_call(['curl', url, '-Lfs', '-m', '200', '--output', f'{data_dir / filename}'])
logging.info(f'done: {filename}')


def localize(path: Path):
def localize(path: Path) -> Path:
if not path.exists():
path.parent.mkdir(exist_ok=True)
__download(path.parent, path.name)
path.parent.mkdir(parents=True, exist_ok=True)
async_to_blocking(
retry_transient_errors(
__download,
path.parent,
path.name,
)
)

return path


@pytest.fixture(autouse=True, scope='function')
def local_tmpdir(tmp_path):
# if hl.version() < '0.2.134':
# yield
# else:
backend = hl.current_backend()
old = backend.local_tmpdir
backend.local_tmpdir = str(tmp_path)
try:
yield tmp_path
finally:
backend.local_tmpdir = old
shutil.rmtree(tmp_path)


@pytest.fixture(scope='session')
def empty_gvcf(resource_dir):
path = resource_dir / 'empty_gvcf'
@@ -332,3 +351,13 @@ def many_partitions_ht(resource_dir, request):
hl.utils.range_table(10_000_000, n_partitions).write(str(path))

return path


@pytest.fixture(scope='session')
def onethreetwo(resource_dir):
return localize(resource_dir / '0.2.132.jsonl')


@pytest.fixture(scope='session')
def onethreethree(resource_dir):
return localize(resource_dir / '0.2.133.jsonl')
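
New resource fixtures follow the same pattern as onethreetwo and onethreethree above: localize() creates the resource directory if needed, downloads the file from the public benchmark bucket with retry_transient_errors, and returns the local path. A sketch of adding another one; the fixture and file names here are invented:

import pytest


@pytest.fixture(scope='session')
def hypothetical_resource(resource_dir):
    # Fetches <gs_curl_root>/some-new-resource.jsonl into resource_dir on first use,
    # then reuses the cached copy for the rest of the session.
    return localize(resource_dir / 'some-new-resource.jsonl')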
99 changes: 51 additions & 48 deletions hail/python/benchmark/hail/utils.py
@@ -1,37 +1,39 @@
import contextlib
import glob
import logging
import re
import signal
import timeit
import traceback
from contextlib import contextmanager
from pathlib import Path
from typing import NoReturn

import hail as hl


def init_hail_for_benchmarks(run_config):
def init_hail_for_benchmarks(config):
init_args = {
'master': f'local[{run_config.cores}]',
'quiet': not run_config.verbose,
'log': run_config.log,
'master': f'local[{config.getoption("cores")}]',
'quiet': config.getoption('verbose') < 0,
'log': config.getoption('log'),
}

if run_config.profile is not None:
if run_config.profile_fmt == 'html':
if (profile := config.getoption('profile')) is not None:
if config.getoption('profile_fmt') == 'html':
filetype = 'html'
fmt_arg = 'tree=total'
elif run_config.profile_fmt == 'flame':
elif config.getoption('profile_fmt') == 'flame':
filetype = 'svg'
fmt_arg = 'svg=total'
else:
filetype = 'jfr'
fmt_arg = 'jfr'

prof_args = (
f'-agentpath:{run_config.profiler_path}/build/libasyncProfiler.so=start,'
f'event={run_config.profile},'
f'-agentpath:{config.getoption("profiler_path")}/build/libasyncProfiler.so=start,'
f'event={profile},'
f'{fmt_arg},'
f'file=bench-profile-{run_config.profile}-%t.{filetype},'
f'file=bench-profile-{profile}-%t.{filetype},'
'interval=1ms,'
'framebuf=15000000'
)
@@ -48,50 +50,57 @@ def init_hail_for_benchmarks(run_config):
logging.getLogger('py4j.java_gateway').setLevel(logging.CRITICAL)


__timeout_state = False
# Using a custom exception instead of TimeoutError allows explicit handling
class __Timeout(BaseException):
pass


# https://stackoverflow.com/questions/492519/timeout-on-a-function-call/494273#494273
@contextlib.contextmanager
def timeout_signal(run_config):
global __timeout_state
__timeout_state = False

def handler(signum, frame):
global __timeout_state
__timeout_state = True
@contextmanager
def timeout_signal(duration):
def handler(signum, frame) -> NoReturn:
try:
hl.stop()
init_hail_for_benchmarks(run_config)
except Exception:
traceback.print_exc() # we're fucked.

raise TimeoutError(f'Timed out after {run_config.timeout}s')

signal.signal(signal.SIGALRM, handler)
signal.alarm(run_config.timeout)
signal.siginterrupt(signal.SIGINT, True)
except KeyboardInterrupt:
pass
finally:
raise __Timeout()

restore = signal.signal(signal.SIGALRM, handler)
signal.alarm(duration)
try:
yield
finally:
signal.signal(signal.SIGALRM, restore)
signal.alarm(0)

def no_op(signum, frame):
pass

signal.signal(signal.SIGALRM, no_op)
signal.alarm(0)
# Spark exposes the configuration parameter 'spark.worker.cleanup.enabled'.
# When enabled, spark periodically cleans up temporary files. It's not clear
# how to trigger a clean-up manually or how to configure which directory gets
# used.
def __hack_cleanup_spark_tmpfiles():
for tmpdir in glob.glob('/tmp/blockmgr*/**/*'):
Path(tmpdir).unlink()


@contextmanager
def run_with_timeout(run_config, fn, *args, **kwargs):
with timeout_signal(run_config):
def run_with_timeout(max_duration, fn, *args, **kwargs):
try:
try:
timer = timeit.Timer(lambda: fn(*args, **kwargs)).timeit(1)
yield timer, False, None
except Exception as e:
timed_out = isinstance(e, TimeoutError)
yield (run_config.timeout if timed_out else None, timed_out, traceback.format_exc())
raise e
timer = timeit.Timer(lambda: fn(*args, **kwargs))
with timeout_signal(max_duration):
duration = timer.timeit(1)
except __Timeout:
yield (max_duration, True, traceback.format_exc())
raise TimeoutError(f'Timed out after {max_duration}s')
except Exception:
yield (None, False, traceback.format_exc())
raise

yield duration, False, None
finally:
__hack_cleanup_spark_tmpfiles()


__peak_mem_pattern = re.compile(r'.*TaskReport:.*peakBytes=(\d+),.*')
@@ -100,16 +109,10 @@ def run_with_timeout(run_config, fn, *args, **kwargs):
def get_peak_task_memory(log_path) -> int:
with open(log_path, 'r') as f:
task_peak_bytes = [
int(match.groups()[0])
for line in f
if (match := __peak_mem_pattern.match(line)) is not None #
int(match.groups()[0]) for line in f if (match := __peak_mem_pattern.match(line)) is not None
]

if len(task_peak_bytes) == 0:
return 0

return max(task_peak_bytes)


def select(keys, **kwargs):
return (kwargs.get(k, None) for k in keys)
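
run_with_timeout above wraps a callable in timeit plus a SIGALRM-based timeout, yields a (duration, timed_out, traceback) triple so the caller can record the outcome, and then re-raises any failure. A sketch of standalone use outside pytest, assuming the module path benchmark.hail.utils and a hypothetical slow_function (SIGALRM means Unix, main thread only):

import time

from benchmark.hail.utils import run_with_timeout


def slow_function():
    time.sleep(1)


try:
    with run_with_timeout(5, slow_function) as (duration, timed_out, failure):
        print(duration, timed_out, failure)  # roughly 1.0, False, None on success
except TimeoutError:
    print('slow_function exceeded its 5s budget')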
29 changes: 16 additions & 13 deletions hail/python/benchmark/tools/__init__.py
@@ -1,24 +1,27 @@
import functools
import logging
from typing import Any, Callable, Generator, List, Optional, Sequence, TypeVar

import pytest
A = TypeVar('A')


def benchmark(burn_in_iterations=1, iterations=5, batch_jobs=5):
def wrap(benchmark_fn):
@pytest.mark.benchmark(burn_in_iterations=burn_in_iterations, iterations=iterations, batch_jobs=batch_jobs)
@functools.wraps(benchmark_fn)
def runner(*args, **kwargs):
return benchmark_fn(*args, **kwargs)
def chunk(size: int, seq: Sequence[A]) -> Generator[Sequence[A], A, Any]:
for pos in range(0, len(seq), size):
yield seq[pos : pos + size]

return runner

return wrap
B = TypeVar('B')


def chunk(size, seq):
for pos in range(0, len(seq), size):
yield seq[pos : pos + size]
def maybe(f: Callable[[A], B], ma: Optional[A], default: Optional[B] = None) -> Optional[B]:
return f(ma) if ma is not None else default


def prune(kvs: dict) -> dict:
return {k: v for k, v in kvs.items() if v is not None}


def select(keys: List[str], **kwargs) -> List[Optional[Any]]:
return [kwargs.get(k) for k in keys]
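
These helpers are small and pure; a quick sketch of their behaviour on tiny, made-up inputs:

from benchmark.tools import chunk, maybe, prune, select

assert list(chunk(2, [1, 2, 3, 4, 5])) == [[1, 2], [3, 4], [5]]
assert maybe(int, '42') == 42
assert maybe(int, None) is None and maybe(int, None, default=0) == 0
assert prune({'a': 1, 'b': None}) == {'a': 1}
assert select(['a', 'c'], a=1, b=2) == [1, None]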


def init_logging(file=None):
1 change: 1 addition & 0 deletions hail/python/pytest.ini
@@ -10,6 +10,7 @@ markers =
backend: tests that relate only to one or more backend types
cloud: tests that relate only to one or more clouds
benchmark: placeholder for benchmarks
xtimeout: like timeout, but xfails instead of failing
filterwarnings =
error
ignore::UserWarning
138 changes: 105 additions & 33 deletions hail/scripts/benchmark_in_batch.py
@@ -13,7 +13,7 @@

import benchmark
import pytest
from benchmark.tools import chunk, init_logging
from benchmark.tools import chunk, init_logging, maybe

from hailtop import batch as hb
from hailtop.batch import Batch, Resource
@@ -22,39 +22,56 @@


class CollectBenchmarks:
def __init__(self):
def __init__(self, include: Optional[List[str]], exclude: Optional[List[str]]):
assert not ((include is not None) and (exclude is not None))
self.includes = maybe(set, include)
self.excludes = maybe(set, exclude)
self.items = []

@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(self, items):
self.items = [
(item.location[0], item.name, marker.kwargs['batch_jobs'])
(item.nodeid, marker.kwargs.get('instances', 5))
for item in items
if (marker := item.get_closest_marker('benchmark'))
if (
(marker := item.get_closest_marker('benchmark')) is not None
and (self.includes is None or item.nodeid in self.includes)
and (self.excludes is None or item.nodeid not in self.excludes)
and (item.get_closest_marker('skip') is None)
)
]


# https://github.com/pytest-dev/pytest/discussions/2039
def list_benchmarks(include: str, exclude: str) -> List[Tuple[Path, str, int]]:
collect = CollectBenchmarks()
def list_benchmarks(
include: Optional[List[str]],
exclude: Optional[List[str]],
) -> List[Tuple[str, int]]:
collect = CollectBenchmarks(include, exclude)
directory = Path(benchmark.__file__).parent.parent
pytest.main(
[
'-qq',
'--co',
directory,
str(directory),
'-m',
'benchmark',
*(['-k', include] if include is not None else []),
*(['--ignore', exclude] if exclude is not None else []),
],
plugins=[collect],
)
return collect.items

items = collect.items

if include is not None:
diff = set(include) - set([nodeid for nodeid, *_ in items])
if len(diff) != 0:
logging.warning(f'The following includes matched no pytest items {diff}')

return items


def run_list_benchmarks(args: Namespace) -> None:
print(sep='\n', *[f'{path}::{name}' for path, name, _ in list_benchmarks(args.include, args.exclude)])
print(sep='\n', *[nodeid for nodeid, *_ in list_benchmarks(args.include, args.exclude)])


def build_and_push_benchmark_image(hail_dev_image: str, artifact_uri: str, tag: str) -> str:
@@ -88,13 +105,13 @@ def make_test_storage_permissions_job(b: Batch, object_prefix: str, labelled_sha
def make_benchmark_trial(
b: Batch,
env: Dict[str, Optional[str]],
path: Path,
benchmark_name: str,
benchmark_id: str,
trial: int,
iterations: Optional[int],
max_duration: Optional[int],
deps: List[Job], # dont reformat
) -> Job:
j = b.new_job(name=f'{path}::{benchmark_name}-{trial}')
) -> Resource:
j = b.new_job(name=f'{benchmark_id}-{trial}')
j.depends_on(*deps)
j.always_copy_output()

@@ -110,36 +127,45 @@ def make_benchmark_trial(
j.command('mkdir -p benchmark-resources')
j.command(
' '.join([
f"python3 -m pytest '{path}::{benchmark_name}'",
f"python3 -m pytest '{benchmark_id}'",
'-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning',
'--log-cli-level=ERROR',
'-s',
'-r A',
'-vv',
'--instafail',
'--durations=50',
'--timeout=1800',
# pytest keeps 3 test sessions worth of temp files by default.
# some benchmarks generate very large output files which can quickly
# fill the tmpfs and so we'll make pytest always delete tmp files
# immediately when tmp_path fixtures are torn-down.
'--override-ini=tmp_path_retention_count=0',
'--override-ini=tmp_path_retention_policy=failed',
'--override-ini=xfail_strict=False',
f'--output={j.ofile}',
'--data-dir=benchmark-resources',
f'--iterations={iterations}' if iterations is not None else '',
f'--max-duration={max_duration}' if max_duration is not None else '',
])
)

return j.ofile


def read_jsonl(p: Path):
logging.info(f'reading json lines from {p}.')
with p.open(encoding='utf-8') as r:
for line in r:
if len(line) > 1:
yield json.loads(line)
try:
f = p.open(encoding='utf-8')
except IOError as e:
logging.error(e)
else:
logging.info(f'reading json lines from {p}.')
with f:
for line in f:
if len(line) > 1:
try:
yield json.loads(line)
except Exception as e:
logging.error(e)


def combine(output: Resource, files: List[Resource]) -> None:
@@ -151,7 +177,7 @@ def combine(output: Resource, files: List[Resource]) -> None:
logging.info(f'combining {len(files)} files')

jsonl = [line for f in files for line in read_jsonl(Path(f))]
jsonl.sort(key=lambda r: (r['path'], r['name'], r['trial']))
jsonl.sort(key=lambda r: (r['path'], r['name'], r['version'], r['trial']))

logging.info(f'Writing combine output to {output}')
logging.info(f'collected {len(jsonl)} benchmark jobs.')
@@ -192,8 +218,8 @@ def run_submit(args: Namespace) -> None:
output_file = P.join(object_prefix, f'{timestamp}-{labelled_sha}.jsonl')

all_benchmarks = [
(path, name, trial)
for path, name, num_jobs in list_benchmarks(args.include, args.exclude)
(nodeid, trial)
for nodeid, num_jobs in list_benchmarks(args.include, args.exclude)
for trial in range(args.jobs or num_jobs)
]

@@ -235,8 +261,16 @@ def run_submit(args: Namespace) -> None:
b,
args.branch_factor,
[
make_benchmark_trial(b, envvars, path, name, trial, args.iterations, deps=[test_permissions])
for path, name, trial in all_benchmarks
make_benchmark_trial(
b,
envvars,
nodeid,
trial,
args.iterations,
args.max_duration,
deps=[test_permissions],
)
for nodeid, trial in all_benchmarks
],
)

@@ -265,8 +299,21 @@ def nonempty(s: str):
'list',
description='List known hail benchmarks',
)
list_p.add_argument('--include', type=nonempty, help="see pytest -k", default=None)
list_p.add_argument('--exclude', type=nonempty, help='see pytest --ignore', default=None)
group = list_p.add_mutually_exclusive_group(required=False)
group.add_argument(
'--include',
type=nonempty,
help='fully-qualified benchmark name to run',
action='append',
default=None,
)
group.add_argument(
'--exclude',
type=nonempty,
help='fully-qualified benchmark name to skip',
action='append',
default=None,
)
list_p.set_defaults(main=run_list_benchmarks)

combine_p = subparser.add_parser(
@@ -290,16 +337,41 @@ def nonempty(s: str):
submit_p.add_argument('--label', type=nonempty, help='batch job label', default=None)
submit_p.add_argument('--branch-factor', type=int, help='number of benchmarks to combine at a time', default=32)
submit_p.add_argument(
'--iterations', type=int, help='override number of iterations for each benchmark', default=None
'--iterations',
type=int,
help='override number of iterations for each benchmark',
default=None,
)
submit_p.add_argument(
'--jobs', type=int, help='override number of batch jobs created for each benchmark', default=None
'--jobs',
type=int,
help='override number of batch jobs created for each benchmark',
default=None,
)
group = submit_p.add_mutually_exclusive_group(required=False)
group.add_argument(
'--include',
type=nonempty,
help='fully-qualified benchmark name to run. May be specified more than once.',
action='append',
default=None,
)
group.add_argument(
'--exclude',
type=nonempty,
help='fully-qualified benchmark name to skip. May be specified more than once.',
action='append',
default=None,
)
submit_p.add_argument('--include', type=nonempty, help="see pytest -k", default=None)
submit_p.add_argument('--exclude', type=nonempty, help='see pytest --ignore', default=None)
submit_p.add_argument('--lower', action='store_true')
submit_p.add_argument('--lower-only', action='store_true')
submit_p.add_argument('--wait', action='store_true', help='wait for batch to complete')
submit_p.add_argument(
'--max-duration',
type=int,
help='Maximum duration in seconds for a benchmark trial, after which the trial will be interrupted.',
default=None,
)
submit_p.set_defaults(main=run_submit)

args = parser.parse_args()
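
With this change, --include and --exclude take exact pytest node ids (and are mutually exclusive) rather than -k / --ignore patterns. A sketch of driving the collector from inside benchmark_in_batch.py; the node-id prefix is illustrative and depends on the pytest rootdir:

# list_benchmarks is defined earlier in this script.
items = list_benchmarks(
    include=['benchmark/hail/benchmark_table.py::benchmark_table_take'],  # illustrative node id
    exclude=None,
)
for nodeid, num_jobs in items:
    print(nodeid, num_jobs)  # num_jobs is the marker's 'instances' kwarg, defaulting to 5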
