debugging
Made changes to ensure that medaka VCFs can be converted
Added `--force`, which allows dropping some problematic VCF entries
No longer duplicating some loci/alleles in merge
ACEnglish committed Feb 13, 2025
1 parent 3728a16 commit 4f7c44b
Showing 2 changed files with 41 additions and 16 deletions.
44 changes: 34 additions & 10 deletions tdb/create.py
@@ -76,16 +76,19 @@ def make_parquets(samples, out_dir, compression):
     return ret
 
 
-def sample_extract(locus_id, fmt):
+def sample_extract(locus_id, fmt, o_alleles, n_alleles):
     """
     Given a dict from a vcf record sample, turn them into sample rows
     """
     ret = []
-    view = zip(fmt['GT'], fmt['SD'], fmt['ALLR'],
-               fmt.get('AM', [None] * len(fmt['GT'])))
+    gts = [_ for _ in fmt['GT'] if _ is not None]
+    view = zip(gts, fmt['SD'], fmt['ALLR'],
+               fmt.get('AM', [None] * len(gts)))
     for an, sd, allr, am in view:
         if an is None:
             continue
+        # Map allele number to new, deduplicated allele number
+        an = n_alleles.index(o_alleles[an])
         lrl, lru = map(int, allr.split('-'))
         ret.append([locus_id, an, sd, lrl, lru, am])
     return ret
@@ -99,15 +102,17 @@ def translate_entry(entry, locus_id, samples):
     a dictionary of sample: list of sample rows
     """
     locus = [locus_id, entry.chrom, entry.start, entry.stop]
+    # dict ensures there's no duplicates
+    filt_alleles = list(dict.fromkeys(filter(lambda x: x not in [None, "."], entry.alleles)))
     alleles = [(locus_id, allele_number, len(sequence),
-                b"" if sequence in [None, "."] else sequence.encode("utf8"))
-               for allele_number, sequence in enumerate(entry.alleles)]
-    samples = {sample: sample_extract(locus_id, fmt)
-               for sample, fmt in zip(samples, entry.samples.values())}
+                sequence.encode("utf8"))
+               for allele_number, sequence in enumerate(filt_alleles)]
+    samples = {sample: sample_extract(locus_id, fmt, entry.alleles, filt_alleles)
+               for sample, fmt in zip(samples, entry.samples.values())}
     return locus, alleles, samples
 
 
-def convert_buffer(vcf, samples, stats, avail_mem):
+def convert_buffer(vcf, samples, stats, avail_mem, seen_loci, force=False):
     """
     Converts a number of vcf entries.
     Tries to monitor memory to not buffer too many
Expand All @@ -126,7 +131,23 @@ def convert_buffer(vcf, samples, stats, avail_mem):
break

cvt_any = True
cur_l, cur_a, cur_s = translate_entry(entry, stats['locus'], samples)
try:
cur_l, cur_a, cur_s = translate_entry(entry, stats['locus'], samples)
except Exception as e:
logging.warning("Unable to convert %s", str(entry))
logging.warning("Error: %s", str(e))
if not force:
sys.exit(1)
continue
lkey = f'{cur_l[1]}:{cur_l[2]}-{cur_l[3]}'
if lkey in seen_loci:
logging.critical("Locus %s seen more than once! Skipping presumably redundant entries", lkey)
if not force:
sys.exit(1)
continue
else:
seen_loci.add(lkey)

m_buffer['locus'].append(cur_l)
m_buffer['allele'].extend(cur_a)
num_samples = 0
@@ -179,6 +200,8 @@ def create_main(args):
                         help="Memory in GB available to buffer reading (%(default)s)")
     parser.add_argument("--no-compress", action="store_false",
                         help="Don't compress the database")
+    parser.add_argument("--force", action="store_true",
+                        help="Dropping problematic entries without exiting")
     parser.add_argument("--debug", action="store_true",
                         help="Verbose logging")
     parser.add_argument("input", metavar="IN",
@@ -212,8 +235,9 @@
 
     tables = make_parquets(samples, args.output, args.no_compress)
     logging.info("Converting VCF with %d samples", len(samples))
+    seen_loci = set()
     while True:
-        cur_tables, cvt_any = convert_buffer(vcf, samples, stats, avail_mem)
+        cur_tables, cvt_any = convert_buffer(vcf, samples, stats, avail_mem, seen_loci, args.force)
         if not cvt_any:
             break
         logging.info("Writing batch. Row totals %s", stats)
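A note on the create.py changes above: translate_entry now deduplicates a record's alleles while preserving order (via dict.fromkeys), and sample_extract remaps each sample's allele numbers onto the deduplicated list; with the new --force flag, entries that fail conversion or repeat a locus are logged and skipped instead of aborting the run. A minimal standalone sketch of the dedup/remap pattern (illustrative only, not tdb code; dedup_and_remap and the example data are made up):

# Sketch (not tdb code): collapse duplicate allele sequences while keeping
# first-seen order, then remap genotype allele numbers onto the new list.
def dedup_and_remap(alleles, genotype):
    filt = list(dict.fromkeys(a for a in alleles if a not in (None, ".")))
    remapped = [filt.index(alleles[an]) for an in genotype if an is not None]
    return filt, remapped

if __name__ == "__main__":
    alleles = ("ACGT", "ACGTACGT", "ACGT", None)  # REF + ALTs with a duplicate and a missing allele
    genotype = (2, 1)                             # one haplotype points at the duplicated allele
    print(dedup_and_remap(alleles, genotype))     # (['ACGT', 'ACGTACGT'], [0, 1])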
13 changes: 7 additions & 6 deletions tdb/merge.py
@@ -85,7 +85,7 @@ def join_loci_tables(original_loci, second_loci, compress):
 
     query = f"""
     COPY (
-        SELECT
+        SELECT DISTINCT
            second.locusid AS update_LocusID,
            original.locusid AS to_LocusID,
            COALESCE(original.chrom, second.chrom) AS chrom,
@@ -265,11 +265,12 @@ def consolidate_alleles(original_allele, second_allele, new_alleles, compress):
                second.LocusID = new_alleles.LocusID
                AND second.allele_number = new_alleles.update_allele_number
        )
-       SELECT *
-       FROM read_parquet('{original_allele}')
-       UNION ALL
-       SELECT *
-       FROM subset_alleles
+       SELECT DISTINCT *
+       FROM (
+           SELECT * FROM read_parquet('{original_allele}')
+           UNION ALL
+           SELECT * FROM subset_alleles
+       )
        {do_order}
        ) TO '{up_allele}' (FORMAT PARQUET{comp});
    """
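A note on the merge.py changes above: UNION ALL keeps duplicate rows, so wrapping the union in SELECT DISTINCT is what prevents a row present in both inputs from appearing twice. A small DuckDB sketch of that behavior (illustrative only, not tdb code; the tables and values are made up):

# Sketch (not tdb code): DISTINCT over a UNION ALL collapses rows shared by both inputs.
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE a AS SELECT * FROM (VALUES (1, 'x'), (2, 'y')) t(id, val)")
con.execute("CREATE TABLE b AS SELECT * FROM (VALUES (2, 'y'), (3, 'z')) t(id, val)")
plain = con.execute("SELECT * FROM (SELECT * FROM a UNION ALL SELECT * FROM b)").fetchall()
dedup = con.execute("SELECT DISTINCT * FROM (SELECT * FROM a UNION ALL SELECT * FROM b)").fetchall()
print(len(plain), len(dedup))  # 4 3 -- the shared (2, 'y') row survives only once after DISTINCT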
