Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6005 - Error loading variants : "Row length 45522 is > 32767" #2449

Merged
merged 9 commits into from
May 17, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.opencb.opencga.app.migrations.v2_12_5.storage;

import org.opencb.opencga.app.migrations.StorageMigrationTool;
import org.opencb.opencga.catalog.migration.Migration;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;

@Migration(id="add_missing_column_to_phoenix_TASK-6005", description = "Add missing ALLELES column to phoenix #TASK-6005",
        version = "2.12.5", domain = Migration.MigrationDomain.STORAGE, date = 20240510
)
public class AddAllelesColumnToPhoenix extends StorageMigrationTool {

    /** Storage engine id whose projects keep variants in Phoenix and need the schema patch. */
    private static final String HADOOP_ENGINE_ID = "hadoop";

    /** Hadoop-module helper (shared with the v2.3.0 migration) that re-adds missing Phoenix columns. */
    private static final String ADD_MISSING_COLUMNS_CLASS =
            "org.opencb.opencga.storage.hadoop.variant.migration.v2_3_0.AddMissingColumns";

    @Override
    protected void run() throws Exception {
        for (String project : getVariantStorageProjects()) {
            VariantStorageEngine engine = getVariantStorageEngineByProject(project);
            // Only hadoop-backed engines use Phoenix, so other projects are left untouched.
            if (!engine.getStorageEngineId().equals(HADOOP_ENGINE_ID)) {
                continue;
            }
            logger.info("Adding missing columns (if any) for project " + project);
            // Load the helper reflectively: the hadoop storage module is an optional
            // dependency and cannot be referenced at compile time from this module.
            Runnable addMissingColumns = (Runnable) Class.forName(ADD_MISSING_COLUMNS_CLASS)
                    .getConstructor(Object.class)
                    .newInstance(engine);
            addMissingColumns.run();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opencb.opencga.storage.core.StoragePipelineResult;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQuery;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
Expand All @@ -39,11 +40,14 @@
public abstract class VariantStorageEngineSVTest extends VariantStorageBaseTest {

protected static StudyMetadata studyMetadata;
protected static StudyMetadata studyMetadata2;
protected static boolean loaded = false;
protected static StoragePipelineResult pipelineResult1;
protected static StoragePipelineResult pipelineResult2;
protected static StoragePipelineResult pipelineResult3;
protected static URI input1;
protected static URI input2;
protected static URI input3;

@Before
public void before() throws Exception {
Expand All @@ -66,12 +70,22 @@ protected void loadFiles() throws Exception {
variantStorageEngine.getOptions().append(VariantStorageOptions.ANNOTATOR_CELLBASE_EXCLUDE.key(), "expression,clinical");
pipelineResult1 = runDefaultETL(input1, variantStorageEngine, studyMetadata, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38")
);
input2 = getResourceUri("variant-test-sv_2.vcf");
pipelineResult2 = runDefaultETL(input2, variantStorageEngine, studyMetadata, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38"));

input3 = getResourceUri("variant-test-sv-large.vcf");
studyMetadata2 = new StudyMetadata(2, "s2");
pipelineResult3 = runDefaultETL(input3, variantStorageEngine, studyMetadata2, new QueryOptions()
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), true)
.append(VariantStorageOptions.ASSEMBLY.key(), "grch38"));

}

@Test
Expand All @@ -88,7 +102,7 @@ public void checkCount() throws Exception {
+ 1
+ 1 // negative cipos
;
int count = variantStorageEngine.getDBAdaptor().count().first().intValue();
int count = variantStorageEngine.count(new VariantQuery().study(studyMetadata.getName())).first().intValue();
assertEquals(expected, count);
}

Expand Down Expand Up @@ -146,7 +160,10 @@ protected void checkCorrectness(URI file) throws StorageEngineException, NonStan

@Test
public void exportVcf() throws Exception {
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null, new Query(VariantQueryParam.UNKNOWN_GENOTYPE.key(), "./."), new QueryOptions(QueryOptions.SORT, true));
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null,
new VariantQuery().unknownGenotype("./.").study(studyMetadata.getName()), new QueryOptions(QueryOptions.SORT, true));
variantStorageEngine.exportData(null, VariantWriterFactory.VariantOutputFormat.VCF, null,
new VariantQuery().unknownGenotype("./.").study(studyMetadata2.getName()), new QueryOptions(QueryOptions.SORT, true));
}

protected Map<String, Variant> readVariants(URI input) throws StorageEngineException, NonStandardCompliantSampleField {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,12 @@ public Scan parseQuery(VariantQueryProjection selectElements, Query query, Query
//// filters.addFilter(keyOnlyFilter);
// scan.addColumn(genomeHelper.getColumnFamily(), VariantPhoenixHelper.VariantColumn.TYPE.bytes());
// }
if (selectElements.getFields().contains(VariantField.TYPE) || !scan.hasFamilies()) {
scan.addColumn(family, VariantColumn.TYPE.bytes());
}

// Alleles must always be included.
scan.addColumn(family, VariantColumn.ALLELES.bytes());
// Because alleles column may be empty, we must still ensure that we get, at least, one result per row.
// Include "type" column, which is never empty.
scan.addColumn(family, VariantColumn.TYPE.bytes());

// if (!columnPrefixes.isEmpty()) {
// MultipleColumnPrefixFilter columnPrefixFilter = new MultipleColumnPrefixFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import htsjdk.variant.variantcontext.Allele;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.query.QueryConstants;
Expand All @@ -28,6 +30,7 @@
import org.opencb.biodata.models.variant.Variant;
import org.opencb.biodata.models.variant.VariantBuilder;
import org.opencb.biodata.models.variant.avro.*;
import org.opencb.opencga.storage.hadoop.variant.GenomeHelper;

import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -43,12 +46,15 @@
*/
public class VariantPhoenixKeyFactory {

public static final Integer UINT_SIZE = PUnsignedInt.INSTANCE.getByteSize();
protected static final String SV_ALTERNATE_SEPARATOR = "|";
protected static final String SV_ALTERNATE_SEPARATOR_SPLIT = "\\" + SV_ALTERNATE_SEPARATOR;

public static final Comparator<String> HBASE_KEY_CHROMOSOME_COMPARATOR = (c1, c2) -> Bytes.compareTo(
VariantPhoenixKeyFactory.generateSimpleVariantRowKey(c1, 1, "N", "N"),
VariantPhoenixKeyFactory.generateSimpleVariantRowKey(c2, 1, "N", "N"));
public static final String HASH_PREFIX = "#";
public static final byte[] HASH_PREFIX_BYTES = Bytes.toBytes(HASH_PREFIX);

public static byte[] generateVariantRowKey(String chrom, int position) {
return generateSimpleVariantRowKey(chrom, position, "", "");
Expand All @@ -59,6 +65,26 @@ public static byte[] generateVariantRowKey(Variant var) {
var.getSv());
}

/**
 * Builds the HBase row key for the variant on the current row of a Phoenix {@link ResultSet},
 * reading the CHROMOSOME, POSITION, REFERENCE and ALTERNATE columns.
 * End and structural-variation information are not read here, so {@code null} is passed for both.
 *
 * @param resultSet Phoenix result set positioned on a variant row.
 * @return serialized variant row key.
 * @throws IllegalStateException if a column cannot be read or the key cannot be built; the
 *         coordinates read so far are included in the message and the original cause is preserved.
 */
public static byte[] generateVariantRowKey(ResultSet resultSet) {
    String chromosome = null;
    Integer start = null;
    String reference = null;
    String alternate = null;
    try {
        chromosome = resultSet.getString(VariantPhoenixSchema.VariantColumn.CHROMOSOME.column());
        // NOTE(review): ResultSet.getInt yields 0 for SQL NULL — presumably POSITION is never null; confirm.
        start = resultSet.getInt(VariantPhoenixSchema.VariantColumn.POSITION.column());
        reference = resultSet.getString(VariantPhoenixSchema.VariantColumn.REFERENCE.column());
        alternate = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALTERNATE.column());

        // No end / StructuralVariation available from these columns.
        return generateVariantRowKey(chromosome, start, null, reference, alternate, null);
    } catch (RuntimeException | SQLException e) {
        // Fields read before the failure are echoed for easier debugging; missing alleles show as "-".
        throw new IllegalStateException("Fail to generate row key from Phoenix result set: " + chromosome
                + ':' + start
                + ':' + (reference == null ? "-" : reference)
                + ':' + (alternate == null ? "-" : alternate), e);
    }
}

public static byte[] generateVariantRowKey(VariantAnnotation variantAnnotation) {
byte[] bytesRowKey = null;
if (variantAnnotation.getAdditionalAttributes() != null) {
Expand Down Expand Up @@ -97,6 +123,12 @@ public static byte[] generateSimpleVariantRowKey(String chrom, int position, Str
return generateVariantRowKey(chrom, position, null, ref, alt, null);
}

/**
 * Whether this variant's estimated row-key length exceeds the HBase limit
 * ({@code HConstants.MAX_ROW_LENGTH}), in which case its alleles get hashed in the key.
 */
public static boolean mightHashAlleles(Variant variant) {
    return getSize(variant) > HConstants.MAX_ROW_LENGTH;
}


/**
* Generates a Row key based on Chromosome, start, end (optional), ref and alt. <br>
* <ul>
Expand All @@ -114,16 +146,16 @@ public static byte[] generateSimpleVariantRowKey(String chrom, int position, Str
*/
public static byte[] generateVariantRowKey(String chrom, int start, Integer end, String ref, String alt, StructuralVariation sv) {
chrom = Region.normalizeChromosome(chrom);
int size = PVarchar.INSTANCE.estimateByteSizeFromLength(chrom.length())
+ QueryConstants.SEPARATOR_BYTE_ARRAY.length
+ PUnsignedInt.INSTANCE.getByteSize()
+ PVarchar.INSTANCE.estimateByteSizeFromLength(ref.length());
alt = buildSymbolicAlternate(ref, alt, end, sv);
if (!alt.isEmpty()) {
size += QueryConstants.SEPARATOR_BYTE_ARRAY.length
+ PVarchar.INSTANCE.estimateByteSizeFromLength(alt.length());
int size = getSize(chrom, ref, alt);

if (size > HConstants.MAX_ROW_LENGTH) {
// This is a problem. The row key is too long.
// Use hashCode for reference/alternate/SV fields
ref = hashAllele(ref);
alt = hashAllele(alt);
size = getSize(chrom, ref, alt);
}

byte[] rk = new byte[size];

int offset = 0;
Expand All @@ -143,6 +175,31 @@ public static byte[] generateVariantRowKey(String chrom, int start, Integer end,
return rk;
}

// Estimated serialized row-key length for a whole variant:
// chromosome + reference + symbolic alternate (end/SV folded into the alternate string).
private static int getSize(Variant variant) {
    return getSize(variant.getChromosome(), variant.getReference(), buildSymbolicAlternate(variant));
}

/**
 * Estimates the serialized row-key length for the given fields, following the key layout:
 * chromosome, separator, position (unsigned int), reference [, separator, alternate].
 * The alternate (and its leading separator) is only counted when non-empty.
 */
private static int getSize(String chrom, String ref, String alt) {
    int separatorLength = QueryConstants.SEPARATOR_BYTE_ARRAY.length;
    int chromBytes = PVarchar.INSTANCE.estimateByteSizeFromLength(chrom.length());
    int refBytes = PVarchar.INSTANCE.estimateByteSizeFromLength(ref.length());
    int total = chromBytes + separatorLength + PUnsignedInt.INSTANCE.getByteSize() + refBytes;
    if (!alt.isEmpty()) {
        total += separatorLength + PVarchar.INSTANCE.estimateByteSizeFromLength(alt.length());
    }
    return total;
}

/**
 * Replaces an over-long allele with a short deterministic surrogate: the {@code #} hash
 * prefix followed by the allele's {@link String#hashCode()}.
 */
public static String hashAllele(String allele) {
    // Implicit int-to-String concatenation is equivalent to Integer.toString(...).
    return HASH_PREFIX + allele.hashCode();
}

/**
 * Serializes the variant's full alleles as {@code reference|symbolicAlternate} using the
 * SV alternate separator, so the original alleles can be recovered when the row key
 * stores hashed alleles (see {@code buildVariant}).
 */
public static String buildAlleles(Variant v) {
    return String.join(SV_ALTERNATE_SEPARATOR, v.getReference(), buildSymbolicAlternate(v));
}

/** Serializes the alternate of {@code v} (plus end/SV info, when present) into its symbolic form. */
public static String buildSymbolicAlternate(Variant v) {
    String reference = v.getReference();
    String alternate = v.getAlternate();
    return buildSymbolicAlternate(reference, alternate, v.getEnd(), v.getSv());
}
Expand Down Expand Up @@ -215,6 +272,26 @@ public static String extractChrFromVariantRowKey(byte[] variantRowKey, int offse
return (String) PVarchar.INSTANCE.toObject(variantRowKey, offset, chrPosSeparator, PVarchar.INSTANCE);
}

/**
 * Rebuilds a {@link Variant} from a raw HBase {@link Result}.
 * <p>
 * If the reference field of the row key starts with the {@code #} hash prefix, the alleles
 * were hashed at write time (row key too long), and the real alleles and type are recovered
 * from the TYPE and ALLELES cells of the result instead of the key itself.
 *
 * @param result HBase result whose row key encodes the variant.
 * @return the reconstructed variant.
 */
public static Variant extractVariantFromResult(Result result) {
    byte[] variantRowKey = result.getRow();

    // Row key layout: chromosome \0 position(uint) reference [\0 alternate].
    int chrPosSeparator = ArrayUtils.indexOf(variantRowKey, (byte) 0);
    int referenceOffset = chrPosSeparator + 1 + UINT_SIZE;
    // Hashed alleles are detectable by the "#" prefix at the reference offset.
    // Checking the reference alone is enough: reference and alternate are hashed together.
    if (variantRowKey.length > (referenceOffset + HASH_PREFIX_BYTES.length)
            && Bytes.equals(variantRowKey, referenceOffset, HASH_PREFIX_BYTES.length,
            HASH_PREFIX_BYTES, 0, HASH_PREFIX_BYTES.length)) {
        // The reference and alternate are hashed.
        // The type and alleles are stored in the result
        byte[] type = result.getValue(GenomeHelper.COLUMN_FAMILY_BYTES,
                VariantPhoenixSchema.VariantColumn.TYPE.bytes());
        byte[] alleles = result.getValue(GenomeHelper.COLUMN_FAMILY_BYTES,
                VariantPhoenixSchema.VariantColumn.ALLELES.bytes());
        return extractVariantFromVariantRowKey(variantRowKey, type, alleles);
    } else {
        // Plain (non-hashed) key: everything needed is in the row key itself.
        return extractVariantFromVariantRowKey(variantRowKey, null, null);
    }
}

public static Variant extractVariantFromResultSet(ResultSet resultSet) {
String chromosome = null;
Integer start = null;
Expand All @@ -226,9 +303,10 @@ public static Variant extractVariantFromResultSet(ResultSet resultSet) {
reference = resultSet.getString(VariantPhoenixSchema.VariantColumn.REFERENCE.column());
alternate = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALTERNATE.column());

String alleles = resultSet.getString(VariantPhoenixSchema.VariantColumn.ALLELES.column());
String type = resultSet.getString(VariantPhoenixSchema.VariantColumn.TYPE.column());

return buildVariant(chromosome, start, reference, alternate, type);
return buildVariant(chromosome, start, reference, alternate, type, alleles);
} catch (RuntimeException | SQLException e) {
throw new IllegalStateException("Fail to parse variant: " + chromosome
+ ':' + start
Expand All @@ -237,13 +315,12 @@ public static Variant extractVariantFromResultSet(ResultSet resultSet) {
}
}

public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey, byte[] type, byte[] alleles) {
int chrPosSeparator = ArrayUtils.indexOf(variantRowKey, (byte) 0);
String chromosome = (String) PVarchar.INSTANCE.toObject(variantRowKey, 0, chrPosSeparator, PVarchar.INSTANCE);

Integer intSize = PUnsignedInt.INSTANCE.getByteSize();
int position = (Integer) PUnsignedInt.INSTANCE.toObject(variantRowKey, chrPosSeparator + 1, intSize, PUnsignedInt.INSTANCE);
int referenceOffset = chrPosSeparator + 1 + intSize;
int position = (Integer) PUnsignedInt.INSTANCE.toObject(variantRowKey, chrPosSeparator + 1, UINT_SIZE, PUnsignedInt.INSTANCE);
int referenceOffset = chrPosSeparator + 1 + UINT_SIZE;
int refAltSeparator = ArrayUtils.indexOf(variantRowKey, (byte) 0, referenceOffset);
String reference;
String alternate;
Expand All @@ -257,8 +334,16 @@ public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
alternate = (String) PVarchar.INSTANCE.toObject(variantRowKey, refAltSeparator + 1,
variantRowKey.length - (refAltSeparator + 1), PVarchar.INSTANCE);
}
String typeStr = null;
String alleleStr = null;
if (type != null) {
typeStr = (String) PVarchar.INSTANCE.toObject(type);
}
if (alleles != null) {
alleleStr = (String) PVarchar.INSTANCE.toObject(alleles);
}
try {
return buildVariant(chromosome, position, reference, alternate, null);
return buildVariant(chromosome, position, reference, alternate, typeStr, alleleStr);
} catch (RuntimeException e) {
throw new IllegalStateException("Fail to parse variant: " + chromosome
+ ':' + position
Expand All @@ -268,7 +353,17 @@ public static Variant extractVariantFromVariantRowKey(byte[] variantRowKey) {
}
}

public static Variant buildVariant(String chromosome, int start, String reference, String alternate, String type) {
public static Variant buildVariant(String chromosome, int start, String reference, String alternate, String type, String alleles) {
if ((reference != null && reference.startsWith(HASH_PREFIX)) || (alternate != null && alternate.startsWith(HASH_PREFIX))) {
if (StringUtils.isNotEmpty(alleles)) {
int i1 = alleles.indexOf(SV_ALTERNATE_SEPARATOR);
reference = alleles.substring(0, i1);
alternate = alleles.substring(i1 + SV_ALTERNATE_SEPARATOR.length());
} else {
throw new IllegalStateException("Reference and alternate are hashed, but alleles is empty!"
+ " '" + chromosome + "' '" + start + "' '" + reference + "' '" + alternate + "'");
}
}

if (alternate != null && alternate.length() > 5 && alternate.contains(SV_ALTERNATE_SEPARATOR)) {
Integer end = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public enum VariantColumn implements Column {
CI_END_R("CI_END_R", PUnsignedInt.INSTANCE),

TYPE("TYPE", PVarchar.INSTANCE),
ALLELES("ALLELES", PVarchar.INSTANCE),

ANNOTATION_ID(ANNOTATION_PREFIX + "ID", PInteger.INSTANCE),

Expand Down
Loading
Loading