
Too many open files .fetch() error with a ThreadPoolExecutor #1326

Open
nebfield opened this issue Jan 24, 2025 · 0 comments
nebfield commented Jan 24, 2025

I'm trying to fetch a lot of records from an indexed VCF, so I thought I'd use a thread pool executor:

```python
import concurrent.futures
import csv
import gzip
import pathlib

import pysam


def read_variants_from_scoring_file(path):
    with gzip.open(path, mode="rt") as f:
        lines = (line for line in f if not line.startswith("#"))
        reader = csv.DictReader(lines, delimiter="\t")
        variants = [
            {
                "chr_name": d["hm_chr"],
                "chr_position": int(d["hm_pos"]) if d["hm_pos"].strip() else None,
            }
            for d in reader
        ]
    return variants


def thread_fetch(*, vcf, chrom, start, stop):
    try:
        region = f"{chrom}:{start}-{stop}"
        results = vcf.fetch(region=region, reopen=True)
    except (ValueError, TypeError):
        results = []

    return results


def test_threaded(variants, vcf):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []

        for variant in variants:
            futures.append(
                executor.submit(
                    thread_fetch,
                    vcf=vcf,
                    chrom=variant["chr_name"],
                    start=variant["chr_position"],
                    stop=variant["chr_position"],
                )
            )

        for future in concurrent.futures.as_completed(futures):
            _ = future.result()


scoring_file_path = pathlib.Path("path") / "to" / "PGS001229_hmPOS_GRCh38.txt.gz"
target_genome_path = pathlib.Path("path") / "to" / "indexed_vcf.gz"
variants = read_variants_from_scoring_file(scoring_file_path)
vcf = pysam.VariantFile(target_genome_path, mode="r")
test_threaded(variants, vcf)
```

I'm using coordinates from a file in the PGS Catalog for testing on a bgzipped + indexed copy of 1000 Genomes.

I saw in the docs that I should be careful to pass reopen=True when calling fetch() under threaded execution, but I still get a lot of errors in my terminal:

```
[E::hts_open_format] Failed to open file <path>: Too many open files
```

Creating a new file in the worker thread and closing it prevents this error, but I'd expect reopen=True to be OK.
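For reference, the workaround I'm describing looks roughly like this. It's a minimal sketch, not my exact code: `threading.local` caches one handle per worker thread so each thread opens the file once and reuses it, and the `opener` parameter is a hypothetical stand-in for `pysam.VariantFile` so the pattern reads generically.

```python
import concurrent.futures
import threading

# One storage slot per thread: each worker thread gets its own .handle
# attribute, so the file is opened at most once per thread rather than
# once per fetch() call.
_local = threading.local()


def get_thread_handle(path, opener):
    # `opener` is any callable that opens the resource
    # (hypothetically pysam.VariantFile in my case).
    if not hasattr(_local, "handle"):
        _local.handle = opener(path)  # first call in this thread opens it
    return _local.handle


def fetch_region(path, opener, chrom, start, stop):
    # Reuse this thread's private handle for every region it processes.
    vcf = get_thread_handle(path, opener)
    return list(vcf.fetch(chrom, start, stop))
```

With a ThreadPoolExecutor of N workers this caps the number of open file descriptors at N, instead of growing with the number of submitted regions.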

Am I doing something obviously wrong with my thread pool executor approach? Thanks for your time 🚀
