From 57abf05f46b7b5f92b6f1eb15463d2edf4bf9733 Mon Sep 17 00:00:00 2001 From: willtyler Date: Sat, 31 Aug 2024 16:37:36 +0000 Subject: [PATCH] Implement tensorstore afdist in Python --- src/collect_data.py | 32 ++++++++++++++++++++++++++ src/ts_afdist.py | 55 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 src/ts_afdist.py diff --git a/src/collect_data.py b/src/collect_data.py index dc61fe0..445202d 100644 --- a/src/collect_data.py +++ b/src/collect_data.py @@ -23,6 +23,7 @@ # yuck - but simplest way to avoid changing directory structure sys.path.insert(0, "src") +from ts_afdist import ts_afdist from zarr_afdist import zarr_afdist, classify_genotypes_subset_filter, zarr_decode @@ -216,6 +217,30 @@ def run_ts_afdist(path, *, debug=False): return time_cli_command(cmd, debug) +def py_ts_afdist_worker(path, debug, conn): + before = time.time() + df = ts_afdist(path / "call_genotype") + wall_time = time.time() - before + cpu_times = psutil.Process().cpu_times() + conn.send(f"{wall_time} {cpu_times.user} {cpu_times.system}") + + if debug: + print(df) + + +def run_py_ts_afdist(path, *, debug: bool = False): + conn1, conn2 = multiprocessing.Pipe() + p = multiprocessing.Process(target=py_ts_afdist_worker, args=(path, debug, conn2)) + p.start() + value = conn1.recv() + wall_time, user_time, sys_time = map(float, value.split()) + p.join() + if p.exitcode != 0: + raise ValueError() + p.close() + return ProcessTimeResult(wall_time, sys_time, user_time) + + def run_savvy_decode(path, *, debug=False): cmd = ( "/usr/bin/time -f'%S %U' " @@ -438,6 +463,13 @@ def zarr_version(): None, None, ), + Tool( + "ts_py", + ".zarr", + run_py_ts_afdist, + None, + None, + ), ] diff --git a/src/ts_afdist.py b/src/ts_afdist.py new file mode 100644 index 0000000..41b26f5 --- /dev/null +++ b/src/ts_afdist.py @@ -0,0 +1,55 @@ +import numpy as np +import pandas as pd +import tensorstore as ts + +def ts_afdist(path): + path = str(path) + store = ts.open({ + "driver": "zarr", + "kvstore": { + "driver": "file", + "path": path, + }, + }, write=False).result() + variant_count = store.shape[0] + sample_count = store.shape[1] + chunk_shape = store.chunk_layout.read_chunk.shape + variant_chunk_size = chunk_shape[0] + sample_chunk_size = chunk_shape[1] + bin_counts = np.zeros((11,), dtype=int) + + for variant_chunk_start in range(0, variant_count, variant_chunk_size): + variant_chunk_end = min(variant_count, variant_chunk_start + variant_chunk_size) + variant_chunk_len = variant_chunk_end - variant_chunk_start + ref_counts = np.zeros((variant_chunk_len,), dtype=int) + het_counts = np.zeros((variant_chunk_len,), dtype=int) + hom_alt_counts = np.zeros((variant_chunk_len,), dtype=int) + + for sample_chunk_start in range(0, sample_count, sample_chunk_size): + sample_chunk_end = min(sample_count, sample_chunk_start + sample_chunk_size) + + chunk = store[variant_chunk_start:variant_chunk_end, sample_chunk_start:sample_chunk_end].read().result() + a = chunk[:, :, 0] + b = chunk[:, :, 1] + + chunk_ref_counts = ((a == 0).astype(int) + (b == 0).astype(int)).sum(axis=1) + chunk_het_counts = (a != b).sum(axis=1) + chunk_hom_alt_counts = np.logical_and(a == b, a > 0).sum(axis=1) + + np.add(ref_counts, chunk_ref_counts, out=ref_counts) + np.add(het_counts, chunk_het_counts, out=het_counts) + np.add(hom_alt_counts, chunk_hom_alt_counts, out=hom_alt_counts) + + alt_count = 2 * sample_count - ref_counts + alt_freq = alt_count / (2 * sample_count) + het_ref_freq = 2 * alt_freq * (1 - alt_freq) + hom_alt_freq = alt_freq * alt_freq + + bins = np.linspace(0, 1.0, len(bin_counts)) + bins[-1] += 0.0125 + a = np.bincount(np.digitize(het_ref_freq, bins), weights=het_counts, minlength=len(bins)).astype(int) + b = np.bincount(np.digitize(hom_alt_freq, bins), weights=hom_alt_counts, minlength=len(bins)).astype(int) + np.add(bin_counts, a, out=bin_counts) + np.add(bin_counts, b, out=bin_counts) + + return pd.DataFrame({"start": bins[:-1], "stop": bins[1:], "prob dist": bin_counts[1:]})