diff --git a/pom.xml b/pom.xml index 4aab021..21d4c7b 100644 --- a/pom.xml +++ b/pom.xml @@ -313,5 +313,17 @@ provided + + com.esotericsoftware + kryo-shaded + 3.0.0 + + + + de.javakaffee + kryo-serializers + 0.28 + + diff --git a/src/main/java/com/urbanairship/datacube/Bucketer.java b/src/main/java/com/urbanairship/datacube/Bucketer.java index 128a56c..49f234c 100644 --- a/src/main/java/com/urbanairship/datacube/Bucketer.java +++ b/src/main/java/com/urbanairship/datacube/Bucketer.java @@ -35,6 +35,11 @@ public interface Bucketer { * Return all bucket types that exist in this dimension. */ public List getBucketTypes(); + + /** + * Return coordinate represented by byte array + */ + public Object deserialize(byte[] coord, BucketType bucketType); /** @@ -58,5 +63,11 @@ public CSerializable bucketForRead(Object coordinateField, BucketType bucketType public List getBucketTypes() { return bucketTypes; } + + @Override + public Object deserialize(byte[] coord, BucketType bucketType) { + throw new RuntimeException(IdentityBucketer.class.getSimpleName()+" can not be" + + " used to get coordinate"); + } } } diff --git a/src/main/java/com/urbanairship/datacube/DataCubeIo.java b/src/main/java/com/urbanairship/datacube/DataCubeIo.java index 98b9135..7119094 100644 --- a/src/main/java/com/urbanairship/datacube/DataCubeIo.java +++ b/src/main/java/com/urbanairship/datacube/DataCubeIo.java @@ -5,7 +5,11 @@ package com.urbanairship.datacube; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -13,6 +17,13 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; +import com.urbanairship.datacube.idservices.HBaseIdService; +import com.urbanairship.datacube.idservices.MapIdService; +import com.urbanairship.datacube.dimensionholder.DimensionHolder; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +85,8 @@ public class DataCubeIo { private AsyncException asyncException = null; private final Object lock = new Object(); - + private static final byte NON_WILDCARD_FIELD = 1; + // This executor will wait for DB writes to complete then check if they had an error. private final ThreadPoolExecutor asyncErrorMonitorExecutor; @@ -282,8 +294,225 @@ public Optional get(Address addr) throws IOException, InterruptedException { cube.checkValidReadOrThrow(addr); return db.get(addr); } - - public Optional get(ReadBuilder readBuilder) throws IOException, InterruptedException { + + /** + * This takes rowKey and returns map + * + * @param rowKey + * @param idservice + * @param dimensionHolder + * @return + */ + public static Map getDimensionFromRowKey(byte[] rowKey, IdService idservice, + DimensionHolder dimensionHolder) { + if (rowKey == null || rowKey.length == 0) { + log.error("RowKey is either NULL or EMPTY"); + throw new IllegalArgumentException("RowKey is either NULL or EMPTY"); + } + Map returnDimensionMap = new HashMap(); + ArrayList> cubeDimensions = dimensionHolder.getDimensionArrayList(); + int rowKeyIndex = 0; + int dimensionIndex = 0; + while (rowKeyIndex < rowKey.length) { + byte firstByte = rowKey[rowKeyIndex]; + if (firstByte == NON_WILDCARD_FIELD) { + Dimension dimension = cubeDimensions.get(dimensionIndex); + String mapKey = dimension.toString(); + String mapValue; + Bucketer dimensionBucketer = dimension.getBucketer(); + log.debug("DIMENSION {} is present in rowKey", dimension); + int sumBucketTypeAndBucketLength = dimension.sumDimensionBucketTypeAndBucket(); + int dimensionBucketLength = dimension.getNumFieldBytes(); + if (rowKeyIndex + sumBucketTypeAndBucketLength >= rowKey.length) { + log.error("The number of bytes for " + dimension + " dimension is not present"); + throw new RuntimeException("The number of bytes for " + dimension+ + "dimension is not present in rowKey"); + } else { + if (dimension.isBucketed() == true) { + Pair pair = getValueForBucketedDimension(rowKey, + rowKeyIndex, dimension, dimensionIndex, idservice); + mapValue = pair.toString(); + rowKeyIndex = rowKeyIndex + 1 + sumBucketTypeAndBucketLength; + } else { + log.debug("Dimension {} is not bucketed and bucketer is {}", dimension, + dimensionBucketer.getClass().toString()); + byte[] bucket = Arrays.copyOfRange(rowKey, rowKeyIndex + 1, + rowKeyIndex + 1 + dimensionBucketLength); + mapValue = getCoordinate(bucket, idservice, dimension, + dimensionIndex, BucketType.IDENTITY); + rowKeyIndex = rowKeyIndex + 1 + dimensionBucketLength; + } + returnDimensionMap.put(mapKey, mapValue); + dimensionIndex++; + } + } else { + Dimension dimension = cubeDimensions.get(dimensionIndex); + int sumBucketTypeAndBucketLength = dimension.sumDimensionBucketTypeAndBucket(); + if (rowKeyIndex + sumBucketTypeAndBucketLength >= rowKey.length) { + log.error("The number of bytes for " + dimension + " dimension is not present"); + throw new RuntimeException("The number of bytes for " + dimension+ + "dimension is not present in rowKey"); + } + rowKeyIndex = rowKeyIndex + 1 + sumBucketTypeAndBucketLength; + dimensionIndex++; + } + } + log.debug("return DimensionMap is {}", returnDimensionMap.toString()); + return returnDimensionMap; + } + + /** + * For the dimension, which is bucketed this function + * calculate value of bucketType and coordinate from + * the rowKey.Combine these two and return it as String + * + * @param rowKey + * @param rowKeyIndex + * @param dimension + * @param dimensionIndex + * @param idService + * @return + */ + private static Pair getValueForBucketedDimension(byte[] rowKey, + int rowKeyIndex, Dimension dimension, int dimensionIndex, IdService idService) { + Bucketer dimensionBucketer = dimension.getBucketer(); + int dimensionBucketTypeLength = dimension.getBucketPrefixSize(); + int dimensionBucketLength = dimension.getNumFieldBytes(); + log.debug("Dimension {} is bucketed and bucketer is {}", dimension, + dimensionBucketer.getClass().toString()); + byte[] bucketTypeBytes = Arrays.copyOfRange(rowKey, rowKeyIndex + 1, + rowKeyIndex + 1 + dimensionBucketTypeLength); + BucketType bucketType = getBucketTypeFromByteArray(bucketTypeBytes, dimension); + log.debug("BucketType {} in rowKey for dimension {}", bucketType.toString(), dimension); + int previousRowKeyIndex = rowKeyIndex; + rowKeyIndex = rowKeyIndex + 1 + dimensionBucketTypeLength; + if (rowKeyIndex <= sumBucketTypeBucketAndRowKeyIndex(previousRowKeyIndex, dimension)) { + byte[] bucket = Arrays.copyOfRange(rowKey, rowKeyIndex, + rowKeyIndex + dimensionBucketLength); + String coordinate = getCoordinate(bucket, idService, dimension, dimensionIndex, + bucketType); + log.debug("Dimension " + dimension + " has bucket bytes " + + Bytes.toStringBinary(bucket) + " and coordinate " + coordinate); + return (new Pair(bucketType.toString(), coordinate)); + } else { + log.error("rowKey size is small and bucket can not be extracted"); + throw new RuntimeException("rowKey size is small and bucket can not be extracted"); + } + } + + /** + * This function returns sum of (rowKeyIndex, Dimension's bucket, Dimension's bucketType) + * + * @param rowKeyIndex + * @param dimension + * @return + */ + private static int sumBucketTypeBucketAndRowKeyIndex(int rowKeyIndex, Dimension dimension) { + return (rowKeyIndex + dimension.sumDimensionBucketTypeAndBucket()); + } + + /** + * This function takes byte array and it returns + * BucketType that is represented by this byte array. + * + * @param bucketTypeBytes + * @param dimension + * @return + */ + private static BucketType getBucketTypeFromByteArray(byte[] bucketTypeBytes, + Dimension dimension) { + Bucketer dimensionBucketer = dimension.getBucketer(); + BucketType bucketType = null; + List bucketTypeList = dimensionBucketer.getBucketTypes(); + for (BucketType eleBucketType : bucketTypeList) { + byte[] uniqueId = eleBucketType.getUniqueId(); + if (ArrayUtils.isEquals(bucketTypeBytes, uniqueId)) { + bucketType = eleBucketType; + break; + } + } + if (bucketType == null) { + log.error("Dimension " + dimension + "does not have " + bucketTypeBytes.toString()); + throw new RuntimeException("Dimension " + dimension + "does not have " + + bucketTypeBytes.toString()); + } else { + log.debug("Dimension {} has {} bukcetType", dimension, bucketType.toString()); + return bucketType; + } + } + + /** + * For the dimension, that does id-substitution this function + * takes byte array and return coordinate using idService + * + * @param bucket + * @param idService + * @param dimension + * @param dimensionIndex + * @param bucketType + * @return + */ + private static String getCoordinateFromIdService(byte[] bucket, IdService idService, + Dimension dimension, int dimensionIndex, BucketType bucketType) { + Bucketer dimensionBucketer = dimension.getBucketer(); + long bucketLong = Util.bytesToLongPad(bucket); + log.debug("ID substituted long value is {}", bucketLong); + byte[] bucketAfterPadding = Util.longToBytes(bucketLong); + log.debug("bucket after padding is {}", Bytes.toStringBinary(bucketAfterPadding)); + if (idService instanceof MapIdService) { + log.debug("Id substitution is done using Map"); + byte[] actualDimensionByteValue = idService.getCoordinate(dimensionIndex, + bucketAfterPadding); + Object coordinate = dimensionBucketer.deserialize(actualDimensionByteValue, bucketType); + log.debug("Value present for dimension {} is {}", dimension.toString(), + coordinate.toString()); + return coordinate.toString(); + } else if (idService instanceof HBaseIdService) { + log.debug("Id substitution is done using Hbase Table"); + byte[] actualDimensionByteValue = idService.getCoordinate(dimensionIndex, + bucketAfterPadding); + Object coordinate = dimensionBucketer.deserialize(actualDimensionByteValue, bucketType); + log.debug("Value present for dimension {} is {}", dimension.toString(), + coordinate.toString()); + return coordinate.toString(); + } else { + log.error("Can not get coordinate from " + idService); + throw new RuntimeException("Can not get coordinate from " + idService); + } + } + + /** + * This function returns coordinate from the row key + * If id-substitution is present then it retrieve coordinate from idService + * Otherwise, it directly return coordinate by de-serializing + * + * @param bucket + * @param idService + * @param dimension + * @param dimensionIndex + * @param bucketType + * @return + */ + private static String getCoordinate(byte[] bucket, IdService idService, Dimension dimension, + int dimensionIndex, BucketType bucketType) { + Bucketer dimensionBucketer = dimension.getBucketer(); + if (dimension.getDoIdSubstitution() == true) { + log.debug("Id substitution is done for {} dimension", dimension); + String coordinateFromIdService = getCoordinateFromIdService(bucket, idService, + dimension, dimensionIndex, bucketType); + log.debug("Value present for dimension {} is {}", dimension.toString(), + coordinateFromIdService.toString()); + return coordinateFromIdService.toString(); + } else { + log.debug("Id substitution is not done for {}", dimension); + Object coordinate = dimensionBucketer.deserialize(bucket, bucketType); + log.debug("Value present for dimension {} is {}", dimension.toString(), + coordinate.toString()); + return coordinate.toString(); + } + } + + public Optional get(ReadBuilder readBuilder) throws IOException, InterruptedException { return this.get(readBuilder.build()); } diff --git a/src/main/java/com/urbanairship/datacube/Dimension.java b/src/main/java/com/urbanairship/datacube/Dimension.java index 3458a70..c81ed40 100644 --- a/src/main/java/com/urbanairship/datacube/Dimension.java +++ b/src/main/java/com/urbanairship/datacube/Dimension.java @@ -115,5 +115,7 @@ int getBucketPrefixSize() { boolean isBucketed() { return isBucketed; } + + public int sumDimensionBucketTypeAndBucket() { return (numFieldBytes + bucketPrefixSize); } } diff --git a/src/main/java/com/urbanairship/datacube/IdService.java b/src/main/java/com/urbanairship/datacube/IdService.java index 5f7a89b..998916e 100644 --- a/src/main/java/com/urbanairship/datacube/IdService.java +++ b/src/main/java/com/urbanairship/datacube/IdService.java @@ -4,6 +4,14 @@ package com.urbanairship.datacube; +import com.urbanairship.datacube.idservices.HBaseIdService; +import com.urbanairship.datacube.idservices.MapIdService; +import org.apache.commons.lang.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; /** @@ -29,8 +37,11 @@ */ public interface IdService { - public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) throws IOException, InterruptedException; - + public static final Logger log = LoggerFactory.getLogger(IdService.class); + + public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) + throws IOException, InterruptedException; + /** * Utilities to make implementation of IdService easier. */ @@ -54,4 +65,39 @@ public static void validateNumIdBytes(int numIdBytes) { } } } + + public static class Deserializer { + private static final int lengthIntByte = 4; + + public static IdService deserialize(byte[] bytes) { + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + DataInputStream inputStream = new DataInputStream(input); + try { + int lengthClassName = inputStream.readInt(); + byte[] className = new byte[lengthClassName]; + inputStream.readFully(className, 0, lengthClassName); + Class classType = Class.forName(new String(className)); + log.debug("ClassType is {}", classType.toString()); + if (classType.equals(MapIdService.class)) { + byte[] subArray = ArrayUtils.subarray(bytes, + (lengthClassName + lengthIntByte), bytes.length); + return MapIdService.deserialize(subArray); + } else if (classType.equals(HBaseIdService.class)) { + byte[] subArray = ArrayUtils.subarray(bytes, + (lengthClassName + lengthIntByte), bytes.length); + return HBaseIdService.deserialize(subArray); + } else { + throw new RuntimeException(classType.toString() + " class " + + " does not have implementaion of idserviceDeserialized method "); + } + } catch (Exception e) { + log.error("Exception in IdService deserialization is "+e); + throw new RuntimeException(e); + } + } + } + + public byte[] getCoordinate(int dimensionNum, byte[] byteArray); + + public byte[] serialize(); } diff --git a/src/main/java/com/urbanairship/datacube/Util.java b/src/main/java/com/urbanairship/datacube/Util.java index e9ab117..3e8021d 100644 --- a/src/main/java/com/urbanairship/datacube/Util.java +++ b/src/main/java/com/urbanairship/datacube/Util.java @@ -57,6 +57,17 @@ public static long bytesToLongPad(byte[] bytes) { bb.flip(); return bb.getLong(); } + + public static int bytesToIntPad(byte[] bytes){ + final int padZeros = Math.max(4-bytes.length, 0); + ByteBuffer bb = ByteBuffer.allocate(4); + for(int i = 0; i < padZeros; i++) { + bb.put((byte)0); + } + bb.put(bytes, 0, 4-padZeros); + bb.flip(); + return bb.getInt(); + } public static byte[] intToBytes(int x) { ByteBuffer bb = ByteBuffer.allocate(4); diff --git a/src/main/java/com/urbanairship/datacube/bucketers/BigEndianIntBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/BigEndianIntBucketer.java index df69b81..62dd01f 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/BigEndianIntBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/BigEndianIntBucketer.java @@ -21,4 +21,12 @@ public class BigEndianIntBucketer extends AbstractIdentityBucketer { public CSerializable makeSerializable(Integer coord) { return new IntSerializable(coord); } + + @Override + public Integer deserialize(byte[] coord, BucketType bucketType) { + if (coord.length > 4) + throw new IllegalArgumentException("BigEndianIntBucketer can not have coordinate " + + "byte array size more than number of bytes require by Integer"); + return IntSerializable.deserialize(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/bucketers/BigEndianLongBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/BigEndianLongBucketer.java index ae66fb9..632da6b 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/BigEndianLongBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/BigEndianLongBucketer.java @@ -37,4 +37,16 @@ public CSerializable bucketForRead(Object coordinate, BucketType bucketType) { public List getBucketTypes() { return bucketTypes; } + + @Override + public Long deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) { + throw new IllegalArgumentException( + "Null or Zero length byte array can not be" + " deserialized"); + } else if (coord.length > 8) { + throw new IllegalArgumentException("BigEndianLongBucketer can not have coordinate " + + "byte array size more than number of bytes require by Long"); + } + return LongSerializable.deserialize(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/bucketers/BooleanBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/BooleanBucketer.java index d93632d..2d3aad4 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/BooleanBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/BooleanBucketer.java @@ -35,5 +35,13 @@ public CSerializable makeSerializable(Boolean coordinateField) { public static final BooleanBucketer getInstance() { return instance; } + + @Override + public Boolean deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) + throw new IllegalArgumentException("Null or Zero length byte array can not be " + + "deserialized"); + return BooleanSerializable.deserialize(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/bucketers/EnumToOrdinalBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/EnumToOrdinalBucketer.java index c97c54a..312417d 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/EnumToOrdinalBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/EnumToOrdinalBucketer.java @@ -12,12 +12,15 @@ import com.urbanairship.datacube.CSerializable; import com.urbanairship.datacube.Util; import com.urbanairship.datacube.serializables.BytesSerializable; +import com.urbanairship.datacube.serializables.EnumSerializable; public class EnumToOrdinalBucketer> extends AbstractIdentityBucketer { private final int numBytes; - - public EnumToOrdinalBucketer(int numBytes) { + private final Class enumClass; + + public EnumToOrdinalBucketer(int numBytes, Class enumClass) { this.numBytes = numBytes; + this.enumClass = enumClass; } @Override @@ -27,4 +30,17 @@ public CSerializable makeSerializable(T coordinate) { return new BytesSerializable(bytes); } + + @Override + public T deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) { + throw new IllegalArgumentException("Null or Zero length byte array can not be " + + "deserialized"); + } else if (coord.length > 4) { + throw new IllegalArgumentException("EnumToOrdinalBucketer can not have coordinate " + + "byte array size more than number of bytes require by Integer"); + } + int ordinal = EnumSerializable.deserialize(coord); + return enumClass.getEnumConstants()[ordinal]; + } } diff --git a/src/main/java/com/urbanairship/datacube/bucketers/HourDayMonthBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/HourDayMonthBucketer.java index 9dd18bb..2d9cd71 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/HourDayMonthBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/HourDayMonthBucketer.java @@ -86,6 +86,19 @@ private CSerializable bucket(DateTime inputTime, BucketType bucketType) { public List getBucketTypes() { return ImmutableList.of(hours, days, months, years, weeks); } + + @Override + public DateTime deserialize(byte[] coord, BucketType bucketType){ + if (coord == null || coord.length == 0) { + throw new IllegalArgumentException("Null or Zero length byte array can not be " + + "deserialized"); + } else if (coord.length > 8) { + throw new IllegalArgumentException("HourMonthDayBucketer can not have coordinate " + + "byte array size more than number of bytes require by Long"); + } + Long coordLong = LongSerializable.deserialize(coord); + return new DateTime(coordLong); + } public static DateTime hourFloor(DateTime dt) { return dt.withMillisOfSecond(0).withSecondOfMinute(0).withMinuteOfHour(0); diff --git a/src/main/java/com/urbanairship/datacube/bucketers/StringToBytesBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/StringToBytesBucketer.java index bd00e31..5bfd690 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/StringToBytesBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/StringToBytesBucketer.java @@ -29,4 +29,11 @@ public static final StringToBytesBucketer getInstance() { public CSerializable makeSerializable(String coord) { return new StringSerializable(coord); } + + @Override + public String deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) + throw new IllegalArgumentException("Zero length byte array can not be deserialized"); + return StringSerializable.deserialize(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/bucketers/TagsBucketer.java b/src/main/java/com/urbanairship/datacube/bucketers/TagsBucketer.java index c1bd142..f171265 100644 --- a/src/main/java/com/urbanairship/datacube/bucketers/TagsBucketer.java +++ b/src/main/java/com/urbanairship/datacube/bucketers/TagsBucketer.java @@ -37,4 +37,12 @@ public CSerializable bucketForRead(Object coordinate, BucketType bucketType) { public List getBucketTypes() { return ImmutableList.of(BucketType.IDENTITY); } + + @Override + public String deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) + throw new IllegalArgumentException("Null or Zero length byte array can not be " + + "deserialized"); + return StringSerializable.deserialize(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/dimensionholder/DimensionHolder.java b/src/main/java/com/urbanairship/datacube/dimensionholder/DimensionHolder.java new file mode 100644 index 0000000..40eef82 --- /dev/null +++ b/src/main/java/com/urbanairship/datacube/dimensionholder/DimensionHolder.java @@ -0,0 +1,80 @@ +package com.urbanairship.datacube.dimensionholder; + +/** + * To get dimensions from the row key, we need meta-data about + * the cube (its dimensions and map of (column qualifier name and operation(LongOp, IntOp etc))that + * the cube is using for aggregating in each column of the data cube table. + * so this dimensionHolder class stores these meta-data of the cube. + */ + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.ImmutableList; +import com.urbanairship.datacube.Dimension; +import com.urbanairship.datacube.operations.OpDeserializers; +import de.javakaffee.kryoserializers.KryoReflectionFactorySupport; +import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Map; + +public class DimensionHolder { + private ArrayList> dimensionArrayList; + private Map qualifierOperationMap; + private int rowKeyPrefixLength; + private static ThreadLocal kryos = new ThreadLocal() { + protected Kryo initialValue() { + Kryo kryo = new KryoReflectionFactorySupport() { + @Override + public Serializer getDefaultSerializer(final Class clazz) { + if (ImmutableList.class.isAssignableFrom(clazz)) { + return new ImmutableListSerializer(); + } + return super.getDefaultSerializer(clazz); + } + }; + return kryo; + } + }; + + public DimensionHolder(ArrayList> dimensionArrayList, + Map qualifierOperationMap, int rowKeyPrefixLength) { + this.dimensionArrayList = dimensionArrayList; + this.qualifierOperationMap = qualifierOperationMap; + this.rowKeyPrefixLength = rowKeyPrefixLength; + } + + public byte[] serialize() { + final Kryo kryo = kryos.get(); + ImmutableListSerializer.registerSerializers(kryo); + Output output = new Output(new ByteArrayOutputStream()); + kryo.writeObject(output, this); + byte[] objectBytes = output.toBytes(); + output.close(); + return objectBytes; + } + + public static DimensionHolder deserialize(byte[] byteArray) { + final Kryo kryo = kryos.get(); + ImmutableListSerializer.registerSerializers(kryo); + Input input = new Input(new ByteArrayInputStream(byteArray)); + DimensionHolder readObject; + readObject = kryo.readObject(input, DimensionHolder.class); + input.close(); + return readObject; + } + + public ArrayList> getDimensionArrayList() { + return dimensionArrayList; + } + + public Map getqualifierOperationMap() { + return qualifierOperationMap; + } + + public int getRowKeyPrefixLength() { return rowKeyPrefixLength; } +} diff --git a/src/main/java/com/urbanairship/datacube/idservices/CachingIdService.java b/src/main/java/com/urbanairship/datacube/idservices/CachingIdService.java index 60e7bc9..d07a202 100644 --- a/src/main/java/com/urbanairship/datacube/idservices/CachingIdService.java +++ b/src/main/java/com/urbanairship/datacube/idservices/CachingIdService.java @@ -4,6 +4,8 @@ package com.urbanairship.datacube.idservices; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -96,7 +98,27 @@ public byte[] getId(int dimensionNum, byte[] bytes, int numIdBytes) throws IOExc } } - + + @Override + public byte[] serialize() { + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + outputStream.writeInt(CachingIdService.class.getName().length()); + outputStream.write(CachingIdService.class.getName().getBytes()); + return byteArrayOutputStream.toByteArray(); + } catch (Exception e) { + log.error("Exception in idService serialization of CachingIdService "+e); + throw new RuntimeException(e); + } + } + + @Override + public byte[] getCoordinate(int dimensionIndex, byte[] byteArray) { + log.error("Can not get coordinate from CachingIdService"); + throw new RuntimeException("Can not get coordinate from CachingIdService"); + } + private class Key { private final int dimensionNum; private final BoxedByteArray bytes; diff --git a/src/main/java/com/urbanairship/datacube/idservices/HBaseIdService.java b/src/main/java/com/urbanairship/datacube/idservices/HBaseIdService.java index 0bcea7e..76c4100 100644 --- a/src/main/java/com/urbanairship/datacube/idservices/HBaseIdService.java +++ b/src/main/java/com/urbanairship/datacube/idservices/HBaseIdService.java @@ -4,6 +4,10 @@ package com.urbanairship.datacube.idservices; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -15,6 +19,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTablePool; @@ -22,6 +27,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +43,8 @@ public class HBaseIdService implements IdService { private static final Logger log = LoggerFactory.getLogger(HBaseIdService.class); public static final byte[] QUALIFIER = ArrayUtils.EMPTY_BYTE_ARRAY; - public static final long ALLOC_TIMEOUT_MS = 10000; + public static final byte[] REVERSE_QUALIFIER = new byte[] {'r'}; + public static final long ALLOC_TIMEOUT_MS = 10000; private static enum Status {ALLOCATING, ALLOCATED}; // don't change ordinals private static final byte[] ALLOCATING_BYTES = new byte[] {(byte)Status.ALLOCATING.ordinal()}; @@ -47,16 +54,117 @@ private static enum Status {ALLOCATING, ALLOCATED}; // don't change ordinals private final byte[] lookupTable; private final byte[] uniqueCubeName; private final byte[] cf; - - public HBaseIdService(Configuration configuration, byte[] lookupTable, - byte[] counterTable, byte[] cf, byte[] uniqueCubeName) { + private final boolean storeReverseMapping; + private final String zookeperQuorum; + private final String clientPort; + private final String parentNode; + + public HBaseIdService(Configuration configuration, byte[] lookupTable, + byte[] counterTable, byte[] cf, byte[] uniqueCubeName, boolean storeReverseMapping) { pool = new HTablePool(configuration, Integer.MAX_VALUE); this.lookupTable = lookupTable; this.counterTable = counterTable; this.uniqueCubeName = uniqueCubeName; this.cf = cf; + this.storeReverseMapping = storeReverseMapping; + this.zookeperQuorum = configuration.get(HConstants.ZOOKEEPER_QUORUM); + this.clientPort = configuration.get(HConstants.CLIENT_PORT_STR); + this.parentNode = configuration.get(HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); } - + + public HBaseIdService(Configuration configuration, byte[] lookupTable, + byte[] counterTable, byte[] cf, byte[] uniqueCubeName) { + this(configuration, lookupTable, counterTable, cf, uniqueCubeName, false); + } + + @Override + public byte[] serialize() { + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + outputStream.writeInt(HBaseIdService.class.getName().length()); + outputStream.write(HBaseIdService.class.getName().getBytes()); + + outputStream.writeInt(zookeperQuorum.length()); + outputStream.write(zookeperQuorum.getBytes()); + + outputStream.writeInt(clientPort.length()); + outputStream.write(clientPort.getBytes()); + + outputStream.writeInt(parentNode.length()); + outputStream.write(parentNode.getBytes()); + + outputStream.writeInt(lookupTable.length); + outputStream.write(lookupTable); + outputStream.writeInt(counterTable.length); + outputStream.write(counterTable); + outputStream.writeInt(cf.length); + outputStream.write(cf); + outputStream.writeInt(uniqueCubeName.length); + outputStream.write(uniqueCubeName); + outputStream.writeBoolean(storeReverseMapping); + return byteArrayOutputStream.toByteArray(); + } catch (Exception e) { + log.error("Exception in HbaseIdService serialization is " + e); + throw new RuntimeException(e); + } + } + + public static HBaseIdService deserialize(byte[] bytes) { + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + DataInputStream inputStream = new DataInputStream(input); + try { + int zookeperLength = inputStream.readInt(); + byte[] zookeperBytes = new byte[zookeperLength]; + inputStream.readFully(zookeperBytes, 0, zookeperLength); + + int clientPortLength = inputStream.readInt(); + byte[] clientPortBytes = new byte[clientPortLength]; + inputStream.readFully(clientPortBytes, 0, clientPortLength); + + int parentNodeLength = inputStream.readInt(); + byte[] parentNodeBytes = new byte[parentNodeLength]; + inputStream.readFully(parentNodeBytes, 0, parentNodeLength); + + int lookUpTableSize = inputStream.readInt(); + byte[] lookUpTable = new byte[lookUpTableSize]; + inputStream.readFully(lookUpTable, 0, lookUpTableSize); + + int counterTableSize = inputStream.readInt(); + byte[] counterTable = new byte[counterTableSize]; + inputStream.readFully(counterTable, 0, counterTableSize); + + int cfSize = inputStream.readInt(); + byte[] cf = new byte[cfSize]; + inputStream.readFully(cf, 0, cfSize); + + int uniqueCubeNameSize = inputStream.readInt(); + byte[] uniqueCubeName = new byte[uniqueCubeNameSize]; + inputStream.readFully(uniqueCubeName, 0, uniqueCubeNameSize); + + boolean storeReverseMapping = inputStream.readBoolean(); + + Configuration configuration = HBaseConfiguration.create(); + log.debug("ZOOKEPER QUORUM used in HbaseIdService recreation is {}", + Bytes.toString(zookeperBytes)); + log.debug("ZOOKEPER CLIENT PORT used in HbaseIdService recreation is {}", + Bytes.toString(clientPortBytes)); + log.debug("ZOOKEPER ZNODE PARENT used in HbaseIdService recreation is {}", + Bytes.toString(parentNodeBytes)); + + configuration.set(HConstants.ZOOKEEPER_QUORUM, Bytes.toString(zookeperBytes)); + configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, Bytes.toString(clientPortBytes)); + configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, Bytes.toString(parentNodeBytes)); + + return new HBaseIdService(configuration, lookUpTable, counterTable, cf, uniqueCubeName, + storeReverseMapping); + } + catch (Exception e) { + log.error("Exception in HbaseIdService deserialization is "+e); + throw new RuntimeException(e); + } + } + @Override public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) throws IOException, InterruptedException { @@ -64,6 +172,7 @@ public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) throws IOExc Validate.validateNumIdBytes(numIdBytes); final byte[] lookupKey = makeLookupKey(dimensionNum, input); + log.debug("LookUpKey from makeLookUpKey {}", Bytes.toStringBinary(lookupKey)); /* * PHASE 1 @@ -185,13 +294,33 @@ public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) throws IOExc * of IDs: once one thread uses an input->id mapping, every other thread must use it * forever. */ + + final int revBufSize = uniqueCubeName.length + 2; + ByteBuffer bb = ByteBuffer.allocate(revBufSize); + bb.put(uniqueCubeName); + bb.putShort((short) dimensionNum); + byte[] revArray = bb.array(); + byte[] allocateRevKey = ArrayUtils.addAll(revArray, Util.longToBytes(id)); + final Put putRev = new Put(allocateRevKey); + putRev.add(cf, REVERSE_QUALIFIER, lookupKey); + + final Put put = new Put(lookupKey); byte[] allocatedRecord = ArrayUtils.addAll(new byte[] {(byte)Status.ALLOCATED.ordinal()}, Util.longToBytes(id)); put.add(cf, QUALIFIER, allocatedRecord); boolean swapSuccess = WithHTable.checkAndPut(pool, lookupTable, lookupKey, cf, QUALIFIER, allocRecord, put); - if(swapSuccess) { + if (swapSuccess) { + if (storeReverseMapping) { + log.debug("Storing Reverse Mapping is enabled for dimensionNum {}", dimensionNum); + WithHTable.put(pool, lookupTable, putRev); + log.debug("Reverse mapping key in "+Bytes.toStringBinary(lookupTable)+ + " table is "+Bytes.toStringBinary(allocateRevKey)+" for dimension " + +dimensionNum+" input "+Bytes.toStringBinary(lookupKey)); + } else { + log.debug("Storing Reverse Mapping is disabled for dimensionNum {}", dimensionNum); + } return Util.leastSignificantBytes(id, numIdBytes); } else { log.warn("Concurrent allocators!?!? ID " + id + " will never be used"); @@ -199,7 +328,53 @@ public byte[] getId(int dimensionNum, byte[] input, int numIdBytes) throws IOExc return getId(dimensionNum, input, numIdBytes); } } - + + @Override + public byte[] getCoordinate(int dimensionNum, byte[] byteArray) { + if (storeReverseMapping == false) { + log.error("As storing reverse mapping is disabled so can not get " + + "coordinate for dimension "+dimensionNum); + throw new RuntimeException("As storing reverse mapping is disabled so can " + + "not get coordinate for dimension "+dimensionNum); + } + + final int bufSize = uniqueCubeName.length + 2; + ByteBuffer bb = ByteBuffer.allocate(bufSize); + bb.put(uniqueCubeName); + bb.putShort((short) dimensionNum); + byte[] lookUpKeyFirstPart = bb.array(); + byte[] generatedlookUpKey = ArrayUtils.addAll(lookUpKeyFirstPart, byteArray); + + log.debug("Reverse lookUpKey: {} in table {} when getting coordinate from HbaseIdService", + Bytes.toStringBinary(generatedlookUpKey), Bytes.toStringBinary(lookupTable)); + + try { + Get get = new Get(generatedlookUpKey); + Result result = WithHTable.get(pool, lookupTable, get); + final byte[] columnVal = result.getValue(cf, REVERSE_QUALIFIER); + + if (columnVal == null) { + log.error("The value stored at REVERSE_QUALIFIER for Reverse LookUpKey " + +Bytes.toStringBinary(generatedlookUpKey)+" in "+ + Bytes.toStringBinary(lookupTable)+ " table is null"); + throw new RuntimeException("The value stored at REVERSE_QUALIFIER for " + + "Reverse LookUpKey "+Bytes.toStringBinary(generatedlookUpKey)+" in "+ + Bytes.toStringBinary(lookupTable)+ " table is null"); + } else { + int uniqueCubeLength = uniqueCubeName.length; + int prefixLength = uniqueCubeLength + 2; + byte[] actualDimensionValue = ArrayUtils.subarray(columnVal, prefixLength, + columnVal.length); + log.debug("Actual coordinate is {}", Bytes.toStringBinary(actualDimensionValue)+ + " for input "+Bytes.toStringBinary(byteArray)); + return actualDimensionValue; + } + } catch (Exception e) { + log.error("Exception present in getting coordinate from HBaseIdService " + e); + throw new RuntimeException(e); + } + } + public boolean consistencyCheck() throws IOException { Scan scan = new Scan(); scan.setStartRow(uniqueCubeName); diff --git a/src/main/java/com/urbanairship/datacube/idservices/MapIdService.java b/src/main/java/com/urbanairship/datacube/idservices/MapIdService.java index 9f576cb..7cdbb1b 100644 --- a/src/main/java/com/urbanairship/datacube/idservices/MapIdService.java +++ b/src/main/java/com/urbanairship/datacube/idservices/MapIdService.java @@ -4,10 +4,15 @@ package com.urbanairship.datacube.idservices; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.util.Arrays; import java.util.Map; import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,24 +27,43 @@ */ public class MapIdService implements IdService { private static final Logger log = LoggerFactory.getLogger(MapIdService.class); - - private final Map> idMap = Maps.newHashMap(); - private final Map nextIds = Maps.newHashMap(); - + + private final Map> idMap = Maps.newHashMap(); + private final Map nextIds = Maps.newHashMap(); + private final Map> reverseIdMap = Maps.newHashMap(); + private final boolean storeReverseMapping; + + public MapIdService() { + this.storeReverseMapping = false; + } + + public MapIdService(boolean storeReverseMapping) { + this.storeReverseMapping = storeReverseMapping; + } + @Override public byte[] getId(int dimensionNum, byte[] bytes, int numBytes) { Validate.validateDimensionNum(numBytes); Validate.validateNumIdBytes(numBytes); - - Map idMapForDimension = idMap.get(dimensionNum); - - if(idMapForDimension == null) { + + Map idMapForDimension = idMap.get(dimensionNum); + Map reverseIdMapForDimension = reverseIdMap.get(dimensionNum); + + if (idMapForDimension == null) { // This is the first request for this dimension. Create a new map for the dimension. - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("Creating new id map for dimension " + dimensionNum); } idMapForDimension = Maps.newHashMap(); idMap.put(dimensionNum, idMapForDimension); + + if (storeReverseMapping) { + log.debug("Storing Reverse Mapping is enabled for dimensionNum {}", dimensionNum); + reverseIdMapForDimension = Maps.newHashMap(); + reverseIdMap.put(dimensionNum, reverseIdMapForDimension); + } else { + log.debug("Storing Reverse Mapping is disabled for dimensionNum {}", dimensionNum); + } } BoxedByteArray inputBytes = new BoxedByteArray(bytes); @@ -55,7 +79,12 @@ public byte[] getId(int dimensionNum, byte[] bytes, int numBytes) { // Remember this ID assignment, future requests should get the same ID idMapForDimension.put(inputBytes, id); - + + if (storeReverseMapping) { + reverseIdMapForDimension.put(id, inputBytes); + log.debug("Storing reverse unique ID " + Bytes.toStringBinary(inputBytes.bytes) + + " for dimension " + dimensionNum + " and input: " + id); + } // The next ID assigned for this dimension should be one greater than this one long nextId = id+1L; nextIds.put(dimensionNum, nextId); @@ -72,4 +101,47 @@ public byte[] getId(int dimensionNum, byte[] bytes, int numBytes) { } return idBytesTruncated; } + + @Override + public byte[] getCoordinate(int dimensionNum, byte[] byteArray) { + long bucketLong = Util.bytesToLong(byteArray); + Map mapforDimension = reverseIdMap.get(dimensionNum); + if (mapforDimension != null) { + BoxedByteArray value = mapforDimension.get(bucketLong); + return value.bytes; + } else { + log.error("Reverse mapping for dimension "+dimensionNum+ " is not stored" + + " so coordinate can not be extracted from MapIdService"); + throw new RuntimeException("Reverse mapping for dimension "+dimensionNum+ + " is not stored so coordinate can not be extracted from MapIdService"); + } + } + + @Override + public byte[] serialize() { + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + outputStream.writeInt(MapIdService.class.getName().length()); + outputStream.write(MapIdService.class.getName().getBytes()); + outputStream.writeBoolean(storeReverseMapping); + return byteArrayOutputStream.toByteArray(); + } catch (Exception e) { + log.error("Exception in MapIdService serialization "+e); + throw new RuntimeException(e); + } + } + + public static MapIdService deserialize(byte[] bytes) { + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + DataInputStream inputStream = new DataInputStream(input); + try { + boolean storeReverseMapping = inputStream.readBoolean(); + return new MapIdService(storeReverseMapping); + } + catch (Exception e) { + log.error("Exception in MapIdSerive deserialization is "+e); + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/com/urbanairship/datacube/operations/OpDeserializer.java b/src/main/java/com/urbanairship/datacube/operations/OpDeserializer.java new file mode 100644 index 0000000..6125918 --- /dev/null +++ b/src/main/java/com/urbanairship/datacube/operations/OpDeserializer.java @@ -0,0 +1,5 @@ +package com.urbanairship.datacube.operations; + +public interface OpDeserializer { + String getValue(byte[] values); +} diff --git a/src/main/java/com/urbanairship/datacube/operations/OpDeserializers.java b/src/main/java/com/urbanairship/datacube/operations/OpDeserializers.java new file mode 100644 index 0000000..965d968 --- /dev/null +++ b/src/main/java/com/urbanairship/datacube/operations/OpDeserializers.java @@ -0,0 +1,29 @@ +package com.urbanairship.datacube.operations; + +import com.urbanairship.datacube.ops.DoubleOp; +import com.urbanairship.datacube.ops.IntOp; +import com.urbanairship.datacube.ops.LongOp; + +public enum OpDeserializers implements OpDeserializer { + INT { + @Override + public String getValue(byte[] values) { + IntOp.IntOpDeserializer deserializer = new IntOp.IntOpDeserializer(); + return String.valueOf(deserializer.fromBytes(values).getInt()); + } + }, + DOUBLE { + @Override + public String getValue(byte[] values) { + DoubleOp.DoubleOpDeserializer deserializer = new DoubleOp.DoubleOpDeserializer(); + return String.valueOf(deserializer.fromBytes(values).getDouble()); + } + }, + LONG { + @Override + public String getValue(byte[] values) { + LongOp.LongOpDeserializer deserializer = new LongOp.LongOpDeserializer(); + return String.valueOf(deserializer.fromBytes(values).getLong()); + } + }; +} diff --git a/src/main/java/com/urbanairship/datacube/serializables/BooleanSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/BooleanSerializable.java index a9fc6a4..c112fe1 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/BooleanSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/BooleanSerializable.java @@ -5,6 +5,7 @@ package com.urbanairship.datacube.serializables; import com.urbanairship.datacube.CSerializable; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hbase.util.Bytes; /** @@ -31,4 +32,18 @@ public static byte[] staticSerialize(boolean b) { return FALSE_SERIAL; } } + + public static boolean deserialize(byte[] coord) { + if(ArrayUtils.isEquals(FALSE_SERIAL, coord)) { + return false; + } + else if (ArrayUtils.isEquals(TRUE_SERIAL, coord)) { + return true; + } + else { + throw new IllegalArgumentException("Byte array should be of length 1 and holding " + + "(0 or 1 only) and Given byte array coordinate does not satisfy " + + "this condition so can not deserialized"); + } + } } diff --git a/src/main/java/com/urbanairship/datacube/serializables/BytesSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/BytesSerializable.java index 9536f74..aac1f43 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/BytesSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/BytesSerializable.java @@ -17,4 +17,8 @@ public BytesSerializable(byte[] bytes) { public byte[] serialize() { return bytes; } + + public static byte[] deserialize(byte[] coord){ + return coord; + } } diff --git a/src/main/java/com/urbanairship/datacube/serializables/EnumSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/EnumSerializable.java index 3196188..38694ab 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/EnumSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/EnumSerializable.java @@ -42,4 +42,8 @@ public static byte[] staticSerialize(Enum enumInstance, int numFieldBytes) { public static byte[] staticSerialize(int ordinal, int numFieldBytes) { return Util.intToBytesWithLen(ordinal, numFieldBytes); } + + public static int deserialize(byte[] coord){ + return Util.bytesToIntPad(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/serializables/IntSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/IntSerializable.java index df2657c..3ce1452 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/IntSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/IntSerializable.java @@ -12,7 +12,7 @@ */ public class IntSerializable implements CSerializable { private final int x; - + public IntSerializable(int x) { this.x = x; } @@ -25,4 +25,8 @@ public byte[] serialize() { public static byte[] staticSerialize(int x) { return Util.intToBytes(x); } + + public static int deserialize(byte[] coord) { + return Util.bytesToInt(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/serializables/LongSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/LongSerializable.java index 01a8921..57a40aa 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/LongSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/LongSerializable.java @@ -12,7 +12,7 @@ */ public class LongSerializable implements CSerializable { private final long l; - + public LongSerializable(long l) { this.l = l; } @@ -25,4 +25,8 @@ public byte[] serialize() { public static byte[] staticSerialize(long l) { return Util.longToBytes(l); } + + public static Long deserialize(byte[] coord) { + return Util.bytesToLong(coord); + } } diff --git a/src/main/java/com/urbanairship/datacube/serializables/StringSerializable.java b/src/main/java/com/urbanairship/datacube/serializables/StringSerializable.java index c15705b..c36cd8f 100644 --- a/src/main/java/com/urbanairship/datacube/serializables/StringSerializable.java +++ b/src/main/java/com/urbanairship/datacube/serializables/StringSerializable.java @@ -31,4 +31,8 @@ public static byte[] staticSerialize(String s) { throw new RuntimeException(e); } } + + public static String deserialize(byte[] coord) { + return new String(coord); + } } diff --git a/src/test/java/com/urbanairship/datacube/CompleteExampleTest.java b/src/test/java/com/urbanairship/datacube/CompleteExampleTest.java index 412cc80..a640cdf 100644 --- a/src/test/java/com/urbanairship/datacube/CompleteExampleTest.java +++ b/src/test/java/com/urbanairship/datacube/CompleteExampleTest.java @@ -100,6 +100,27 @@ public CSerializable bucketForRead(Object coordinateField, BucketType bucketType public List getBucketTypes() { return ImmutableList.of(usState, usCity); } + + @Override + public Object deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length ==0) { + throw new IllegalArgumentException("Null or Zero length byte array can not be" + + " deserialized"); + } else if (coord.length > 4) { + throw new IllegalArgumentException("LocationBucketer can not have coordinate " + + "byte array size more than number of bytes require by Integer"); + } + int intCoord = EnumSerializable.deserialize(coord); + if (bucketType.equals(usCity)) { + return City.values()[intCoord]; + } + else if (bucketType.equals(usState)) { + return UsState.values()[intCoord]; + } + else { + throw new RuntimeException(bucketType.toString()+" bucketType is not present"); + } + } }; /** @@ -146,6 +167,27 @@ public CSerializable bucketForRead(Object coordinateField, BucketType bucketType public List getBucketTypes() { return ImmutableList.of(deviceName, osType); } + + @Override + public Object deserialize(byte[] coord, BucketType bucketType) { + if (coord == null || coord.length == 0) { + throw new IllegalArgumentException("Null or Zero length byte array can not be" + + " deserialized"); + } else if (coord.length > 4) { + throw new IllegalArgumentException("DeviceBucketer can not have coordinate " + + "byte array size more than number of bytes require by Integer"); + } + int intCoord = EnumSerializable.deserialize(coord); + if (bucketType.equals(deviceName)) { + return DeviceType.values()[intCoord]; + } + else if(bucketType.equals(osType)) { + return OsManufacturer.values()[intCoord]; + } + else { + throw new RuntimeException(bucketType.toString()+" bucketType is not present"); + } + } } /* diff --git a/src/test/java/com/urbanairship/datacube/HBaseBackfillIntegrationTest.java b/src/test/java/com/urbanairship/datacube/HBaseBackfillIntegrationTest.java index b6e048f..10c39ee 100644 --- a/src/test/java/com/urbanairship/datacube/HBaseBackfillIntegrationTest.java +++ b/src/test/java/com/urbanairship/datacube/HBaseBackfillIntegrationTest.java @@ -46,7 +46,7 @@ private static enum Color {RED, BLUE}; private static final Dimension timeDimension = new Dimension("time", new HourDayMonthBucketer(), false, 8); private static final Dimension colorDimension = new Dimension("color", - new EnumToOrdinalBucketer(1), false, 1); + new EnumToOrdinalBucketer(1, Color.class), false, 1); private static IdService idService; private static DataCube oldCube, newCube; diff --git a/src/test/java/com/urbanairship/datacube/TimedFlushTest.java b/src/test/java/com/urbanairship/datacube/TimedFlushTest.java index f8c8c8f..9ac4ab8 100644 --- a/src/test/java/com/urbanairship/datacube/TimedFlushTest.java +++ b/src/test/java/com/urbanairship/datacube/TimedFlushTest.java @@ -27,7 +27,8 @@ enum Color {RED, BLUE}; @Test public void test() throws Exception { - Dimension colorDimension = new Dimension("color", new EnumToOrdinalBucketer(1), false, 1); + Dimension colorDimension = new Dimension("color", + new EnumToOrdinalBucketer(1, Color.class), false, 1); Rollup colorRollup = new Rollup(colorDimension); IdService idService = new MapIdService(); ConcurrentMap backingMap = Maps.newConcurrentMap();