diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark.json
new file mode 100644
index 00000000..35012bf8
--- /dev/null
+++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark.json
@@ -0,0 +1,7 @@
+{
+  "name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark",
+  "iterations" : 100000,
+  "threads" : 5,
+  "totalMillis" : 4118,
+  "avgTime" : 823.6
+}
\ No newline at end of file
diff --git a/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark.json b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark.json
new file mode 100644
index 00000000..9501a860
--- /dev/null
+++ b/ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark.json
@@ -0,0 +1,7 @@
+{
+  "name" : "io.appform.ranger.discovery.bundle.id.WeightedNonceGeneratorTest.testGenerateWithBenchmark",
+  "iterations" : 100000,
+  "threads" : 5,
+  "totalMillis" : 5020,
+  "avgTime" : 1004.0
+}
\ No newline at end of file
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/CircularQueue.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/CircularQueue.java
new file mode 100644
index 00000000..3de33103
--- /dev/null
+++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/CircularQueue.java
@@ -0,0 +1,98 @@
+package io.appform.ranger.discovery.bundle.id;
+
+import com.codahale.metrics.MetricRegistry;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+/**
+ * Fixed-size ring buffer of integer IDs addressed through two monotonically increasing cursors.
+ * All methods that read or move the cursors are synchronized on this instance, so a
+ * {@link #reset()} can never interleave with a concurrent {@link #setId(int)} or {@link #getNextId()}.
+ */
+@Slf4j
+class CircularQueue {
+    private static final String QUEUE_FULL_METRIC_STRING = "idGenerator.queueFull.forPrefix.";
+    private static final String UNUSED_INDICES_METRIC_STRING = "idGenerator.unusedIds.forPrefix.";
+
+//    private final Meter queueFullMeter;
+//    private final Meter unusedDataMeter;
+
+    /** List of data for the specific queue */
+    private final AtomicIntegerArray queue;
+
+    /** Size of the queue */
+    private final int size;
+
+    /** Pointer to track the first usable index. Helps to determine the next usable data. */
+    private final AtomicInteger firstIdx = new AtomicInteger(0);
+
+    /** Pointer to track index of last usable index. Helps to determine which index the next data should go in. */
+    private final AtomicInteger lastIdx = new AtomicInteger(0);
+
+
+    /**
+     * @param size           number of slots in the backing array
+     * @param metricRegistry currently unused; the meters that would consume it are commented out
+     * @param namespace      currently unused; intended as the metric name suffix
+     */
+    public CircularQueue(int size, final MetricRegistry metricRegistry, final String namespace) {
+        this.size = size;
+        this.queue = new AtomicIntegerArray(size);
+//        this.queueFullMeter = metricRegistry.meter(QUEUE_FULL_METRIC_STRING + namespace);
+//        this.unusedDataMeter = metricRegistry.meter(UNUSED_INDICES_METRIC_STRING + namespace);
+    }
+
+    /**
+     * Stores {@code id} in the next free slot. The ID is silently dropped when
+     * {@code size - 1} unread IDs are already buffered.
+     *
+     * @param id value to buffer
+     */
+    public synchronized void setId(int id) {
+        // Don't store new data if the queue is already full of unused data.
+        if (lastIdx.get() >= firstIdx.get() + size - 1) {
+//            queueFullMeter.mark();
+            return;
+        }
+        val arrayIdx = lastIdx.get() % size;
+        queue.set(arrayIdx, id);
+        lastIdx.getAndIncrement();
+    }
+
+    /** Reads the slot that the (unbounded) cursor value {@code index} maps to. */
+    private int getId(int index) {
+        val arrayIdx = index % size;
+        return queue.get(arrayIdx);
+    }
+
+    /**
+     * Removes and returns the oldest unread ID.
+     *
+     * @return the next ID, or an empty Optional when no unread ID is buffered
+     */
+    public synchronized Optional getNextId() {
+        if (firstIdx.get() < lastIdx.get()) {
+            val id = getId(firstIdx.getAndIncrement());
+            return Optional.of(id);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Discards all buffered IDs by moving both cursors back to zero; slot contents are not cleared.
+     * Synchronized like the other mutators: an unsynchronized reset could run between the
+     * cursor read and the cursor increment in {@link #setId(int)}, leaving the queue pointing
+     * at a slot that was never written after the reset.
+     */
+    public synchronized void reset() {
+        val unusedIds = lastIdx.get() - firstIdx.get();
+//        unusedDataMeter.mark(unusedIds);
+        lastIdx.set(0);
+        firstIdx.set(0);
+    }
+}
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/Constants.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/Constants.java index f191b90e..fb97d73e 100644 --- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/Constants.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/Constants.java @@ -25,4 +25,7 @@ public class Constants { public static final int MAX_ID_PER_MS = 1000; public static final int MAX_NUM_NODES = 10000; + public static final int MAX_IDS_PER_SECOND = 1_000_000; + public static final int DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS = 60; + public static final int DEFAULT_PARTITION_RETRY_COUNT = 100; } diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/GenerationResult.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/GenerationResult.java index 6ad14fa0..e0d40128 100644 --- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/GenerationResult.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/GenerationResult.java @@ -1,10 +1,13 @@ package io.appform.ranger.discovery.bundle.id; -import lombok.Value; +import lombok.Builder; +import lombok.Getter; 
-@Value +@Getter +@Builder public class GenerationResult { IdInfo idInfo; IdValidationState state; String domain; + String namespace; } diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdGenerator.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdGenerator.java index 5959e8e9..ff7038ae 100644 --- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdGenerator.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdGenerator.java @@ -166,7 +166,7 @@ public static Optional generateWithConstraints( String prefix, final List inConstraints, boolean skipGlobal) { - return generate(IdGenerationRequest.builder() + return generate(IdGenerationRequest.builder() .prefix(prefix) .constraints(inConstraints) .skipGlobal(skipGlobal) diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdPool.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdPool.java new file mode 100644 index 00000000..92e7f4a3 --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdPool.java @@ -0,0 +1,26 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.codahale.metrics.MetricRegistry; + +import java.util.Optional; + +public class IdPool { + /** List of IDs for the specific IdPool */ + private final CircularQueue queue; + + public IdPool(int size, final MetricRegistry metricRegistry, final String namespace) { + this.queue = new CircularQueue(size, metricRegistry, namespace); + } + + public void setId(int id) { + queue.setId(id); + } + + public Optional getNextId() { + return queue.getNextId(); + } + + public void reset() { + queue.reset(); + } +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdUtils.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdUtils.java new file mode 
100644 index 00000000..ea1afa9e --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/IdUtils.java @@ -0,0 +1,14 @@ +package io.appform.ranger.discovery.bundle.id; + +import lombok.val; +import org.joda.time.DateTime; + + +public class IdUtils { + public static DateTime getDateTimeFromSeconds(long seconds) { + // Convert seconds to milliSeconds + val millis = seconds * 1000L; + // Get DateTime object from milliSeconds + return new DateTime(millis); + } +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/PartitionIdTracker.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/PartitionIdTracker.java new file mode 100644 index 00000000..3b522115 --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/PartitionIdTracker.java @@ -0,0 +1,75 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Preconditions; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.appform.ranger.discovery.bundle.id.Constants.MAX_IDS_PER_SECOND; + +/** + * Tracks generated IDs and pointers for generating next IDs for a partition. 
+ */ +@Slf4j +public class PartitionIdTracker { + /** Array to store IdPools for each partition */ + private final IdPool[] idPoolList; + + /** Counter to keep track of the number of IDs created */ + private final AtomicInteger nextIdCounter = new AtomicInteger(); + + @Getter + private Instant instant; + +// private final Meter generatedIdCountMeter; + + public PartitionIdTracker(final int partitionSize, + final int idPoolSize, + final Instant instant, + final MetricRegistry metricRegistry, + final String namespace) { + this.instant = instant; + idPoolList = new IdPool[partitionSize]; + for (int i=0; i namespaceConfig = Collections.emptySet(); + + @NotNull + @Min(1) + private int partitionCount; + + /** Buffer time to pre-generate IDs for */ + @Min(1) + @Max(300) + @Builder.Default + private int dataStorageLimitInSeconds = Constants.DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS; + + /** Retry limit for selecting a valid partition. Not required for unconstrained scenarios */ + @Min(1) + @Builder.Default + private int partitionRetryCount = Constants.DEFAULT_PARTITION_RETRY_COUNT; + + @ValidationMethod(message = "Namespaces should be unique") + @JsonIgnore + public boolean areNamespacesUnique() { + Set namespaces = namespaceConfig.stream() + .map(NamespaceConfig::getNamespace) + .collect(Collectors.toSet()); + return namespaceConfig.size() == namespaces.size(); + } + + @ValidationMethod(message = "Invalid Partition Range") + @JsonIgnore + public boolean isPartitionCountValid() { + if (weightedIdConfig != null) { + List sortedPartitions = new ArrayList<>(weightedIdConfig.getPartitions()); + sortedPartitions.sort(Comparator.comparingInt(k -> k.getPartitionRange().getStart())); + return sortedPartitions.get(sortedPartitions.size() - 1).getPartitionRange().getEnd() - sortedPartitions.get(0).getPartitionRange().getStart() + 1 == partitionCount; + } + return true; + } + +} \ No newline at end of file diff --git 
a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/NamespaceConfig.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/NamespaceConfig.java new file mode 100644 index 00000000..f9e6255b --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/NamespaceConfig.java @@ -0,0 +1,22 @@ +package io.appform.ranger.discovery.bundle.id.config; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class NamespaceConfig { + @NotNull + private String namespace; + + /** Size of pre-generated id buffer. Value from DefaultNamespaceConfig will be used if this is null */ + @Min(2) + private Integer idPoolSizePerBucket; +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/PartitionRange.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/PartitionRange.java new file mode 100644 index 00000000..8bef7d69 --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/PartitionRange.java @@ -0,0 +1,32 @@ +package io.appform.ranger.discovery.bundle.id.config; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.dropwizard.validation.ValidationMethod; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PartitionRange { + @NotNull + @Min(0) + private int start; + + /* end partition is inclusive in range */ + @NotNull + @Min(0) + private int end; + + @ValidationMethod(message = "Partition Range should be 
non-decreasing")
+    @JsonIgnore
+    public boolean isRangeValid() {
+        return start <= end;
+    }
+}
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedIdConfig.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedIdConfig.java
new file mode 100644
index 00000000..048a77b5
--- /dev/null
+++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedIdConfig.java
@@ -0,0 +1,65 @@
+package io.appform.ranger.discovery.bundle.id.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.dropwizard.validation.ValidationMethod;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/** Partition layout for weighted ID generation: a list of partition ranges, each carrying a weight. */
+@Slf4j
+@Getter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class WeightedIdConfig {
+    @NotNull
+    @Valid
+    private List partitions;
+
+    /**
+     * Checks that the partition ranges, ordered by start, are contiguous: every range must
+     * begin exactly one past the end of the range before it.
+     *
+     * @return {@code false} on the first gap or overlap, or when the list holds a null entry;
+     *         {@code true} otherwise. A missing list or missing range also yields {@code true}
+     *         because the {@code @NotNull} constraints already report those.
+     */
+    @ValidationMethod(message = "Invalid Partition Range. Partitions should be continuous.")
+    @JsonIgnore
+    public boolean isPartitionWeightsValid() {
+        // This method is evaluated even when @NotNull fails; throwing here would abort validation.
+        if (partitions == null) {
+            return true;
+        }
+        boolean rangeMissing = false;
+        for (WeightedPartition partition : partitions) {
+            if (partition == null) {
+                return false;
+            }
+            rangeMissing |= partition.getPartitionRange() == null;
+        }
+        if (rangeMissing) {
+            return true;
+        }
+        List sortedPartitions = new ArrayList<>(partitions);
+        sortedPartitions.sort(Comparator.comparingInt(k -> k.getPartitionRange().getStart()));
+        for (int i = 0; i < sortedPartitions.size() - 1; i++) {
+            WeightedPartition currentPartition = sortedPartitions.get(i);
+            WeightedPartition nextPartition = sortedPartitions.get(i + 1);
+            if (currentPartition.getPartitionRange().getEnd() + 1 != nextPartition.getPartitionRange().getStart()) {
+                log.error("Expected next partitionRange to start with {} but was: {}", currentPartition.getPartitionRange().getEnd() + 1, nextPartition.getPartitionRange().getStart());
+                return false;
+            }
+        }
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedPartition.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedPartition.java
new file mode 100644
index 00000000..4367871a
--- /dev/null
+++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/config/WeightedPartition.java
@@ -0,0 +1,24 @@
+package io.appform.ranger.discovery.bundle.id.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+@Getter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WeightedPartition {
+    @NotNull
+    @Valid
+    private PartitionRange partitionRange;
+
+    @NotNull
+    @Min(1)
+    private int weight;
+}
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/IdFormatters.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/IdFormatters.java
index 30bca972..33b1393b 100644
--- 
a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/IdFormatters.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/IdFormatters.java @@ -23,6 +23,7 @@ public class IdFormatters { private static final IdFormatter originalIdFormatter = new DefaultIdFormatter(); private static final IdFormatter base36IdFormatter = new Base36IdFormatter(originalIdFormatter); + private static final IdFormatter secondPrecisionIdFormatter = new SecondPrecisionIdFormatter(); public static IdFormatter original() { return originalIdFormatter; @@ -32,4 +33,8 @@ public static IdFormatter base36() { return base36IdFormatter; } + public static IdFormatter secondPrecision() { + return secondPrecisionIdFormatter; + } + } diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/SecondPrecisionIdFormatter.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/SecondPrecisionIdFormatter.java new file mode 100644 index 00000000..5207fdc8 --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/formatter/SecondPrecisionIdFormatter.java @@ -0,0 +1,18 @@ +package io.appform.ranger.discovery.bundle.id.formatter; + +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + + +public class SecondPrecisionIdFormatter implements IdFormatter { + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyMMddHHmmss"); + + @Override + public String format(final DateTime dateTime, + final int nodeId, + final int randomNonce) { + return String.format("%s%04d%06d", DATE_TIME_FORMATTER.print(dateTime), nodeId, randomNonce); + } +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/DistributedIdGenerator.java 
b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/DistributedIdGenerator.java new file mode 100644 index 00000000..41195711 --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/DistributedIdGenerator.java @@ -0,0 +1,45 @@ +package io.appform.ranger.discovery.bundle.id.generator; + +import com.codahale.metrics.MetricRegistry; +import io.appform.ranger.discovery.bundle.id.Id; +import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; +import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter; +import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters; +import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorType; +import lombok.val; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.time.Clock; +import java.util.function.Function; +import java.util.regex.Pattern; + + +public class DistributedIdGenerator extends IdGeneratorBase { + private static final int MINIMUM_ID_LENGTH = 22; + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyMMddHHmmss"); + private static final Pattern PATTERN = Pattern.compile("(.*)([0-9]{12})([0-9]{4})([0-9]{6})"); + + public DistributedIdGenerator(final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final NonceGeneratorType nonceGeneratorType, + final IdFormatter idFormatter, + final MetricRegistry metricRegistry, + final Clock clock) { + super(idGeneratorConfig, partitionResolverSupplier, nonceGeneratorType, idFormatter, metricRegistry, clock, MINIMUM_ID_LENGTH, DATE_TIME_FORMATTER, PATTERN); + } + + public DistributedIdGenerator(final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final NonceGeneratorType nonceGeneratorType, + final MetricRegistry metricRegistry, + final Clock clock) { + this(idGeneratorConfig, partitionResolverSupplier, 
nonceGeneratorType, IdFormatters.secondPrecision(), metricRegistry, clock); + } + + public Id generateForPartition(final String namespace, final int targetPartitionId) { + val idInfo = nonceGenerator.generateForPartition(namespace, targetPartitionId); + return nonceGenerator.getIdFromIdInfo(idInfo, namespace, getIdFormatter()); + } + +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/IdGeneratorBase.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/IdGeneratorBase.java index 301e88dd..2ed6ab33 100644 --- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/IdGeneratorBase.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/generator/IdGeneratorBase.java @@ -1,21 +1,29 @@ package io.appform.ranger.discovery.bundle.id.generator; +import com.codahale.metrics.MetricRegistry; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import io.appform.ranger.discovery.bundle.id.Domain; import io.appform.ranger.discovery.bundle.id.Id; import io.appform.ranger.discovery.bundle.id.IdInfo; +import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; import io.appform.ranger.discovery.bundle.id.constraints.IdValidationConstraint; import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter; import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorBase; +import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorType; +import io.appform.ranger.discovery.bundle.id.nonce.PartitionAwareNonceGenerator; import io.appform.ranger.discovery.bundle.id.nonce.RandomNonceGenerator; +import io.appform.ranger.discovery.bundle.id.nonce.WeightedNonceGenerator; import io.appform.ranger.discovery.bundle.id.request.IdGenerationRequest; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.joda.time.format.DateTimeFormatter; +import 
java.time.Clock; import java.util.List; import java.util.Optional; +import java.util.function.Function; import java.util.regex.Pattern; /** @@ -29,6 +37,7 @@ public class IdGeneratorBase { protected final DateTimeFormatter DATE_TIME_FORMATTER; private final Pattern PATTERN; private static int NODE_ID; + @Getter private final IdFormatter idFormatter; protected final NonceGeneratorBase nonceGenerator; @@ -47,6 +56,22 @@ public IdGeneratorBase(final IdFormatter idFormatter, this.PATTERN = pattern; } + protected IdGeneratorBase(final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final NonceGeneratorType nonceGeneratorType, + final IdFormatter idFormatter, + final MetricRegistry metricRegistry, + final Clock clock, + final int minimumIdLength, + final DateTimeFormatter dateTimeFormatter, + final Pattern pattern) { + this.idFormatter = idFormatter; + this.nonceGenerator = getNonceGenerator(nonceGeneratorType, idGeneratorConfig, partitionResolverSupplier, metricRegistry, clock); + this.MINIMUM_ID_LENGTH = minimumIdLength; + this.DATE_TIME_FORMATTER = dateTimeFormatter; + this.PATTERN = pattern; + } + public synchronized void cleanUp() { nonceGenerator.cleanUp(); } @@ -147,4 +172,18 @@ public Optional parse(final String idString) { return Optional.empty(); } } + + private NonceGeneratorBase getNonceGenerator(final NonceGeneratorType nonceGeneratorType, + final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final MetricRegistry metricRegistry, + final Clock clock) { + switch (nonceGeneratorType) { + case DISTRIBUTED: return new PartitionAwareNonceGenerator(NODE_ID, idGeneratorConfig, partitionResolverSupplier, idFormatter, metricRegistry, clock); + case WEIGHTED_DISTRIBUTED: return new WeightedNonceGenerator(NODE_ID, idGeneratorConfig, partitionResolverSupplier, idFormatter, metricRegistry, clock); + case RANDOM: + default: return new RandomNonceGenerator(NODE_ID, idFormatter); + } + } + } diff --git 
a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorBase.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorBase.java index 68e098e2..c53c4861 100644 --- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorBase.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorBase.java @@ -9,6 +9,7 @@ import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters; import io.appform.ranger.discovery.bundle.id.request.IdGenerationRequest; import lombok.Getter; +import lombok.val; import org.joda.time.DateTime; import java.security.SecureRandom; @@ -60,6 +61,17 @@ public synchronized void registerDomainSpecificConstraints( .build()); } + public Id getIdFromIdInfo(final IdInfo idInfo, final String namespace, final IdFormatter idFormatter) { + val dateTime = getDateTimeFromTime(idInfo.getTime()); + val id = String.format("%s%s", namespace, idFormatter.format(dateTime, getNodeId(), idInfo.getExponent())); + return Id.builder() + .id(id) + .exponent(idInfo.getExponent()) + .generatedDate(dateTime.toDate()) + .node(getNodeId()) + .build(); + } + /** * Generate id with given namespace * @@ -86,7 +98,7 @@ public abstract Optional generateWithConstraints(final String namespace, final List inConstraints, final boolean skipGlobal); - public abstract Id getIdFromIdInfo(IdInfo idInfo, final String namespace, final IdFormatter idFormatter); + public abstract IdInfo generateForPartition(final String namespace, final int targetPartitionId) ; public abstract DateTime getDateTimeFromTime(final long time); } diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorType.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorType.java new file mode 100644 index 00000000..245d6514 --- /dev/null +++ 
b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/NonceGeneratorType.java
@@ -0,0 +1,7 @@
+package io.appform.ranger.discovery.bundle.id.nonce;
+
+public enum NonceGeneratorType {
+    RANDOM,
+    DISTRIBUTED,
+    WEIGHTED_DISTRIBUTED
+}
diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/PartitionAwareNonceGenerator.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/PartitionAwareNonceGenerator.java
new file mode 100644
index 00000000..a3095942
--- /dev/null
+++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/PartitionAwareNonceGenerator.java
@@ -0,0 +1,297 @@
+package io.appform.ranger.discovery.bundle.id.nonce;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Strings;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.RetryPolicy;
+import io.appform.ranger.discovery.bundle.id.Domain;
+import io.appform.ranger.discovery.bundle.id.GenerationResult;
+import io.appform.ranger.discovery.bundle.id.Id;
+import io.appform.ranger.discovery.bundle.id.IdInfo;
+import io.appform.ranger.discovery.bundle.id.IdUtils;
+import io.appform.ranger.discovery.bundle.id.IdValidationState;
+import io.appform.ranger.discovery.bundle.id.PartitionIdTracker;
+import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig;
+import io.appform.ranger.discovery.bundle.id.config.NamespaceConfig;
+import io.appform.ranger.discovery.bundle.id.constraints.IdValidationConstraint;
+import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter;
+import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters;
+import io.appform.ranger.discovery.bundle.id.request.IdGenerationRequest;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import lombok.val;
+import org.joda.time.DateTime;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+
+/**
+ * Nonce generator that hands out IDs per target partition, buffering IDs that resolve to
+ * other partitions in the per-namespace {@code idStore} for later requests.
+ */
+@SuppressWarnings("unused")
+@Slf4j
+@Getter
+public class PartitionAwareNonceGenerator extends NonceGeneratorBase {
+    private final FailsafeExecutor RETRYER;
+    private final Map idStore = new ConcurrentHashMap<>();
+    private final Function partitionResolver;
+    private final IdGeneratorConfig idGeneratorConfig;
+    private final MetricRegistry metricRegistry;
+    private final Clock clock;
+    HashSet timeKeys = new HashSet<>();
+
+
+    /** idStore Structure
+    {
+        namespace: [
+            timestamp: {
+                partitions: [
+                    {
+                        ids: [],
+                        pointer: int
+                    },
+                    {
+                        ids: [],
+                        pointer: int
+                    } ...
+                ],
+                nextIdCounter: int
+            }
+        ]
+    }
+    */
+
+    public PartitionAwareNonceGenerator(final int nodeId,
+                                        final IdGeneratorConfig idGeneratorConfig,
+                                        final Function partitionResolverSupplier,
+                                        final IdFormatter idFormatter,
+                                        final MetricRegistry metricRegistry,
+                                        final Clock clock) {
+        super(nodeId, idFormatter);
+        this.idGeneratorConfig = idGeneratorConfig;
+        this.partitionResolver = partitionResolverSupplier;
+        this.metricRegistry = metricRegistry;
+        this.clock = clock;
+        // Retry on any exception, a null result, or an INVALID_RETRYABLE result; before each
+        // retry the rejected ID is handed back to reAddId under the namespace it was generated for.
+        RetryPolicy retryPolicy = RetryPolicy.builder()
+                .withMaxAttempts(idGeneratorConfig.getPartitionRetryCount())
+                .handleIf(throwable -> true)
+                .handleResultIf(Objects::isNull)
+                .handleResultIf(generationResult -> generationResult.getState() == IdValidationState.INVALID_RETRYABLE)
+                .onRetry(event -> {
+                    val res = event.getLastResult();
+                    if (null != res && !res.getState().equals(IdValidationState.VALID)) {
+                        reAddId(res.getNamespace(), res.getIdInfo());
+                    }
+                })
+                .build();
+        RETRYER = Failsafe.with(Collections.singletonList(retryPolicy));
+    }
+
+    protected PartitionAwareNonceGenerator(final int nodeId,
+                                           final IdGeneratorConfig idGeneratorConfig,
+                                           final Function partitionResolverSupplier,
+                                           final MetricRegistry metricRegistry,
+                                           final Clock clock) {
+        this(nodeId, idGeneratorConfig, partitionResolverSupplier, IdFormatters.secondPrecision(), metricRegistry, clock);
+    }
+
+    /** Generates an ID for a partition chosen by {@link #getTargetPartitionId()}. */
+    @Override
+    public IdInfo generate(final String namespace) {
+        val targetPartitionId = getTargetPartitionId();
+        return generateForPartition(namespace, targetPartitionId);
+    }
+
+    /**
+     * Draws the next ID from the pool of {@code targetPartitionId}, generating further IDs
+     * until that pool yields one.
+     *
+     * @param namespace         prefix the ID is generated under
+     * @param targetPartitionId partition whose pool the ID is taken from
+     * @return the nonce and the epoch second of the tracker it was drawn from
+     */
+    @Override
+    public IdInfo generateForPartition(final String namespace, final int targetPartitionId) {
+        val instant = clock.instant();
+        val prefixIdMap = idStore.computeIfAbsent(namespace, k -> getAndInitPartitionIdTrackers(namespace, instant));
+        val partitionTracker = getPartitionTracker(prefixIdMap, instant);
+        val idCounter = generateForAllPartitions(
+                partitionTracker,
+                namespace,
+                targetPartitionId);
+        return new IdInfo(idCounter, partitionTracker.getInstant().getEpochSecond());
+    }
+
+    /**
+     * Polls the target partition's pool; while it is empty, takes the next ID from the tracker,
+     * resolves the partition of its formatted form and files it under that partition.
+     */
+    private Integer generateForAllPartitions(final PartitionIdTracker partitionIdTracker,
+                                             final String namespace,
+                                             final int targetPartitionId) {
+        val idPool = partitionIdTracker.getPartition(targetPartitionId);
+        Optional idOptional = idPool.getNextId();
+        while (idOptional.isEmpty()) {
+            val idInfo = partitionIdTracker.getIdInfo();
+            val dateTime = IdUtils.getDateTimeFromSeconds(idInfo.getTime());
+            val txnId = String.format("%s%s", namespace, getIdFormatter().format(dateTime, getNodeId(), idInfo.getExponent()));
+            val mappedPartitionId = partitionResolver.apply(txnId);
+            partitionIdTracker.addId(mappedPartitionId, idInfo);
+            idOptional = idPool.getNextId();
+        }
+        return idOptional.get();
+    }
+
+    /**
+     * Generate id that matches all passed constraints.
+     * NOTE: There are performance implications for this.
+     * The evaluation of constraints will take its toll on id generation rates.
+     *
+     * @param namespace  String namespace
+     * @param domain     Domain for constraint selection
+     * @param skipGlobal Skip global constrains and use only passed ones
+     * @return ID if it could be generated
+     */
+    @Override
+    public Optional generateWithConstraints(final String namespace, final String domain, final boolean skipGlobal) {
+        return generateWithConstraints(IdGenerationRequest.builder()
+                .prefix(namespace)
+                .domain(domain)
+                .constraints(REGISTERED_DOMAINS.getOrDefault(domain, Domain.DEFAULT).getConstraints())
+                .skipGlobal(skipGlobal)
+                .build());
+    }
+
+    @Override
+    public Optional generateWithConstraints(final String namespace,
+                                            final List inConstraints,
+                                            final boolean skipGlobal) {
+        return generateWithConstraints(IdGenerationRequest.builder()
+                .prefix(namespace)
+                .constraints(inConstraints)
+                .skipGlobal(skipGlobal)
+                .build());
+    }
+
+    /**
+     * Generates an ID for a random partition and validates it against the request's constraints,
+     * retrying through {@code RETRYER} while the result is retryable.
+     *
+     * @param request prefix, domain, constraints and skip-global flag
+     * @return the ID info when validation ended in {@code VALID}, empty otherwise
+     */
+    @Override
+    public Optional generateWithConstraints(final IdGenerationRequest request) {
+        val instant = clock.instant();
+        val prefixIdMap = idStore.computeIfAbsent(request.getPrefix(), k -> getAndInitPartitionIdTrackers(request.getPrefix(), clock.instant()));
+        val partitionIdTracker = getPartitionTracker(prefixIdMap, instant);
+        return Optional.ofNullable(RETRYER.get(
+                        () -> {
+                            val targetPartitionId = getTargetPartitionId();
+                            val idInfo = generateForPartition(request.getPrefix(), targetPartitionId);
+                            return GenerationResult.builder()
+                                    .idInfo(idInfo)
+                                    .state(validateId(request.getConstraints(), getIdFromIdInfo(idInfo, request.getPrefix(), getIdFormatter()), request.isSkipGlobal()))
+                                    .domain(request.getDomain())
+                                    // Carried so the retry hook can return a rejected ID to the right namespace.
+                                    .namespace(request.getPrefix())
+                                    .build();
+                        }))
+                .filter(generationResult -> generationResult.getState() == IdValidationState.VALID)
+                .map(GenerationResult::getIdInfo);
+    }
+
+    /** Picks a partition uniformly at random from {@code [0, partitionCount)}. */
+    protected int getTargetPartitionId() {
+        return getSECURE_RANDOM().nextInt(idGeneratorConfig.getPartitionCount());
+    }
+
+    private IdValidationState validateId(final List inConstraints, final Id id, boolean skipGlobal) {
+        //First evaluate global 
constraints + val failedGlobalConstraint + = skipGlobal + ? null + : getGLOBAL_CONSTRAINTS().stream() + .filter(constraint -> !constraint.isValid(id)) + .findFirst() + .orElse(null); + if (null != failedGlobalConstraint) { + return failedGlobalConstraint.failFast() + ? IdValidationState.INVALID_NON_RETRYABLE + : IdValidationState.INVALID_RETRYABLE; + } + //Evaluate local + domain constraints + val failedLocalConstraint + = null == inConstraints + ? null + : inConstraints.stream() + .filter(constraint -> !constraint.isValid(id)) + .findFirst() + .orElse(null); + if (null != failedLocalConstraint) { + return failedLocalConstraint.failFast() + ? IdValidationState.INVALID_NON_RETRYABLE + : IdValidationState.INVALID_RETRYABLE; + } + return IdValidationState.VALID; + } + + private int getIdPoolSize(String namespace) { + val idPoolSizeOptional = idGeneratorConfig.getNamespaceConfig().stream() + .filter(namespaceConfig -> namespaceConfig.getNamespace().equals(namespace)) + .map(NamespaceConfig::getIdPoolSizePerBucket) + .filter(Objects::nonNull) + .findFirst(); + return idPoolSizeOptional.orElseGet(() -> idGeneratorConfig.getDefaultNamespaceConfig().getIdPoolSizePerPartition()); + } + + private PartitionIdTracker[] getAndInitPartitionIdTrackers(final String namespace, final Instant instant) { + val partitionTrackerList = new PartitionIdTracker[idGeneratorConfig.getDataStorageLimitInSeconds()]; + for (int i = 0; i getAndInitPartitionIdTrackers(namespace, clock.instant())); + val partitionIdTracker = getPartitionTracker(prefixIdMap, instant); + val mappedPartitionId = partitionResolver.apply(getIdFromIdInfo(idInfo, namespace, getIdFormatter()).getId()); + partitionIdTracker.addId(mappedPartitionId, idInfo); + } + +} diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/RandomNonceGenerator.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/RandomNonceGenerator.java index f5751af2..010e9923 100644 
--- a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/RandomNonceGenerator.java +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/RandomNonceGenerator.java @@ -78,14 +78,14 @@ public Optional generateWithConstraints(final IdGenerationRequest reques .getCollisionChecker() : Domain.DEFAULT.getCollisionChecker(); return Optional.ofNullable(RETRYER.get( - () -> { + () -> { IdInfo idInfo = random(collisionChecker); val id = getIdFromIdInfo(idInfo, request.getPrefix(), request.getIdFormatter()); - return new GenerationResult(idInfo, - validateId(request.getConstraints(), - id, - request.isSkipGlobal()), - request.getDomain()); + return GenerationResult.builder() + .idInfo(idInfo) + .state(validateId(request.getConstraints(), id, request.isSkipGlobal())) + .domain(null) + .build(); })) .filter(generationResult -> generationResult.getState() == IdValidationState.VALID) .map(GenerationResult::getIdInfo); @@ -97,12 +97,21 @@ public Optional generateWithConstraints(final String namespace, final Li () -> { val idInfo = generate(namespace); val id = getIdFromIdInfo(idInfo, namespace, getIdFormatter()); - return new GenerationResult(idInfo, validateId(inConstraints, id, skipGlobal), null); + return GenerationResult.builder() + .idInfo(idInfo) + .state(validateId(inConstraints, id, skipGlobal)) + .domain(null) + .build(); })) .filter(generationResult -> generationResult.getState() == IdValidationState.VALID) .map(GenerationResult::getIdInfo); } + @Override + public IdInfo generateForPartition(final String namespace, int targetPartitionId) { + return generate(namespace); + } + private IdInfo random(final CollisionChecker collisionChecker) { int randomGen; long time; @@ -158,18 +167,6 @@ private IdValidationState validateId(final List inConstr return IdValidationState.VALID; } - @Override - public Id getIdFromIdInfo(IdInfo idInfo, final String namespace, final IdFormatter idFormatter) { - val dateTime = 
getDateTimeFromTime(idInfo.getTime()); - val id = String.format("%s%s", namespace, idFormatter.format(dateTime, getNodeId(), idInfo.getExponent())); - return Id.builder() - .id(id) - .exponent(idInfo.getExponent()) - .generatedDate(dateTime.toDate()) - .node(getNodeId()) - .build(); - } - @Override public DateTime getDateTimeFromTime(final long time) { return new DateTime(time); diff --git a/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/WeightedNonceGenerator.java b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/WeightedNonceGenerator.java new file mode 100644 index 00000000..797cac0a --- /dev/null +++ b/ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/nonce/WeightedNonceGenerator.java @@ -0,0 +1,68 @@ +package io.appform.ranger.discovery.bundle.id.nonce; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; +import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; +import io.appform.ranger.discovery.bundle.id.config.PartitionRange; +import io.appform.ranger.discovery.bundle.id.config.WeightedIdConfig; +import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter; +import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import java.time.Clock; +import java.util.Objects; +import java.util.function.Function; + + +@SuppressWarnings("unused") +@Slf4j +public class WeightedNonceGenerator extends PartitionAwareNonceGenerator { + @Getter + private int maxShardWeight; + private final RangeMap partitionRangeMap; + + + public WeightedNonceGenerator(final int nodeId, + final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final IdFormatter idFormatter, + final 
MetricRegistry metricRegistry, + final Clock clock) { + super(nodeId, idGeneratorConfig, partitionResolverSupplier, metricRegistry, clock); + Preconditions.checkNotNull(getIdGeneratorConfig().getWeightedIdConfig()); + partitionRangeMap = createWeightRangeMap(getIdGeneratorConfig().getWeightedIdConfig()); + } + + protected WeightedNonceGenerator(final int nodeId, + final IdGeneratorConfig idGeneratorConfig, + final Function partitionResolverSupplier, + final MetricRegistry metricRegistry, + final Clock clock) { + this(nodeId, idGeneratorConfig, partitionResolverSupplier, IdFormatters.secondPrecision(), metricRegistry, clock); + } + + private RangeMap createWeightRangeMap(final WeightedIdConfig weightedIdConfig) { + RangeMap partitionGroups = TreeRangeMap.create(); + int endWeight = -1; + for (val partition : weightedIdConfig.getPartitions()) { + int startWeight = endWeight + 1; + endWeight += partition.getWeight(); + partitionGroups.put(Range.closed(startWeight, endWeight), partition.getPartitionRange()); + } + maxShardWeight = endWeight; + return partitionGroups; + } + + @Override + protected int getTargetPartitionId() { + val randomNum = getSECURE_RANDOM().nextInt(maxShardWeight); + val partitionRange = Objects.requireNonNull(partitionRangeMap.getEntry(randomNum)).getValue(); + return getSECURE_RANDOM().nextInt(partitionRange.getEnd() - partitionRange.getStart() + 1) + partitionRange.getStart(); + } + +} diff --git a/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/PartitionAwareNonceGeneratorTest.java b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/PartitionAwareNonceGeneratorTest.java new file mode 100644 index 00000000..f46d2bca --- /dev/null +++ b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/PartitionAwareNonceGeneratorTest.java @@ -0,0 +1,332 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.codahale.metrics.Meter; +import 
com.codahale.metrics.MetricRegistry; +import io.appform.ranger.discovery.bundle.id.config.DefaultNamespaceConfig; +import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; +import io.appform.ranger.discovery.bundle.id.constraints.IdValidationConstraint; +import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters; +import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorType; +import io.appform.ranger.discovery.bundle.id.generator.DistributedIdGenerator; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigInteger; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Test for {@link DistributedIdGenerator} + */ +@SuppressWarnings({"unused", "FieldMayBeFinal"}) +@Slf4j +class PartitionAwareNonceGeneratorTest { + final int numThreads = 5; + final int iterationCount = 100000; + final int partitionCount = 1024; + final Function partitionResolverSupplier = (txnId) -> Integer.parseInt(txnId.substring(txnId.length() - 6)) % partitionCount; + protected IdGeneratorConfig idGeneratorConfig = + IdGeneratorConfig.builder() + .partitionCount(partitionCount) + .defaultNamespaceConfig(DefaultNamespaceConfig.builder().idPoolSizePerPartition(100).build()) + .build(); + protected DistributedIdGenerator distributedIdGenerator; + private MetricRegistry metricRegistry = mock(MetricRegistry.class); + protected 
NonceGeneratorType nonceGeneratorType; + + @BeforeEach + void setup() { + nonceGeneratorType = NonceGeneratorType.DISTRIBUTED; + val metricRegistry = mock(MetricRegistry.class); + val meter = mock(Meter.class); + doReturn(meter).when(metricRegistry).meter(anyString()); + doNothing().when(meter).mark(); + distributedIdGenerator = new DistributedIdGenerator( + idGeneratorConfig, partitionResolverSupplier, nonceGeneratorType, metricRegistry, Clock.systemDefaultZone() + ); + } + + @Test + void testGenerateWithBenchmark() throws IOException { + val allIdsList = Collections.synchronizedList(new ArrayList()); + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + true, + this.getClass().getName() + ".testGenerateWithBenchmark"); + Assertions.assertEquals(numThreads * iterationCount, allIdsList.size()); + checkUniqueIds(allIdsList); + checkDistribution(allIdsList, partitionResolverSupplier, idGeneratorConfig); + } + + @Test + void testGenerateWithConstraints() throws IOException { + val allIdsList = Collections.synchronizedList(new ArrayList()); + IdValidationConstraint partitionConstraint = (k) -> k.getExponent() % 4 == 0; + val iterationCount = 50000; + distributedIdGenerator.registerGlobalConstraints((k) -> k.getExponent() % 4 == 0); + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generateWithConstraints("P", List.of(), false); + id.ifPresent(allIdsList::add); + }, + false, + this.getClass().getName() + ".testGenerateWithConstraints"); + checkUniqueIds(allIdsList); + +// Assert No ID was generated for Invalid partitions + for (val id: allIdsList) { + Assertions.assertTrue(partitionConstraint.isValid(id)); + } + } + + @Test + void testGenerateAccuracy() throws IOException { + val allIdsList = Collections.synchronizedList(new ArrayList()); + val iterationCount = 500000; + val totalIdCount = numThreads * 
iterationCount; + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, + this.getClass().getName() + ".testGenerateAccuracy"); + checkUniqueIds(allIdsList); + checkDistribution(allIdsList, partitionResolverSupplier, idGeneratorConfig); + } + + void checkUniqueIds(List allIdsList) { + List allIdStringList = new ArrayList<>(List.of()); + for (Id id: allIdsList) { + allIdStringList.add(id.getId()); + } + HashSet uniqueIds = new HashSet<>(allIdStringList); + Map frequencyMap = new HashMap<>(); + + // Count occurrences of each integer + for (String num : allIdStringList) { + frequencyMap.put(num, frequencyMap.getOrDefault(num, 0) + 1); + } + // Print integers with count >= 2 + for (Map.Entry entry : frequencyMap.entrySet()) { + if (entry.getValue() >= 2) { + System.out.println("Integer: " + entry.getKey() + ", Count: " + entry.getValue()); + } + } + Assertions.assertEquals(allIdsList.size(), uniqueIds.size()); + } + + protected HashMap getIdCountMap(List allIdsList, Function partitionResolver) { + val idCountMap = new HashMap(); + for (val id: allIdsList) { + val partitionId = partitionResolver.apply(id.getId()); + idCountMap.put(partitionId, idCountMap.getOrDefault(partitionId, 0) + 1); + } + return idCountMap; + } + + protected void checkDistribution(List allIdsList, Function partitionResolver, IdGeneratorConfig config) { + val idCountMap = getIdCountMap(allIdsList, partitionResolver); + val expectedIdCount = (double) allIdsList.size() / config.getPartitionCount(); + for (int partitionId=0; partitionId < config.getPartitionCount(); partitionId++) { + Assertions.assertTrue(expectedIdCount * 0.8 <= idCountMap.get(partitionId), + String.format("Partition %s generated %s ids, expected was more than: %s", + partitionId, idCountMap.get(partitionId), expectedIdCount * 0.8)); + Assertions.assertTrue(idCountMap.get(partitionId) <= expectedIdCount * 1.2, + 
String.format("Partition %s generated %s ids, expected was less than: %s", + partitionId, idCountMap.get(partitionId), expectedIdCount * 1.2)); + } + } + + @Test + void testUniqueIds() { + HashSet allIDs = new HashSet<>(); + boolean allIdsUnique = true; + for (int i = 0; i < iterationCount; i += 1) { + val txnId = distributedIdGenerator.generate("P").getId(); + if (allIDs.contains(txnId)) { + allIdsUnique = false; + } else { + allIDs.add(txnId); + } + } + Assertions.assertTrue(allIdsUnique); + } + + @Test + void testFirstAndLastPartitionInclusion() throws IOException { + val allIdsList = Collections.synchronizedList(new ArrayList()); + val totalIdCount = numThreads * iterationCount; + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, + this.getClass().getName() + ".testGenerateAccuracy"); + + val idCountMap = getIdCountMap(allIdsList, partitionResolverSupplier); + Assertions.assertTrue(idCountMap.get(0) > 0); + Assertions.assertTrue(idCountMap.get(partitionCount-1) > 0); + } + + @Test + void testDataReset() throws IOException { + val allIdsList = Collections.synchronizedList(new ArrayList()); + val clock = mock(Clock.class); + doReturn(Instant.parse("2000-01-01T00:00:00Z")).when(clock).instant(); + val distributedIdGenerator = new DistributedIdGenerator(idGeneratorConfig, partitionResolverSupplier, nonceGeneratorType, metricRegistry, clock); + TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, null); + doReturn(Instant.parse("2000-01-01T00:01:00Z")).when(clock).instant(); + TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, null); + Assertions.assertEquals(2 * numThreads * iterationCount, allIdsList.size()); + checkUniqueIds(allIdsList); + } + + @Test + void 
testComputation() throws IOException { + val partitionResolverCount = new AtomicInteger(0); + Function partitionResolverSupplierWithCount = (t) -> { + partitionResolverCount.incrementAndGet(); + return partitionResolverSupplier.apply(t); + }; + val localdistributedIdGenerator = new DistributedIdGenerator(idGeneratorConfig, partitionResolverSupplierWithCount, nonceGeneratorType, metricRegistry, Clock.systemDefaultZone()); + val allIdsList = Collections.synchronizedList(new ArrayList()); + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = localdistributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, + null); + val expectedIdCount = numThreads * iterationCount; + Assertions.assertEquals(expectedIdCount, allIdsList.size()); + checkUniqueIds(allIdsList); + log.warn("partitionResolverSupplier was called {} times - expected count was: {}", partitionResolverCount.get(), expectedIdCount); + } + + @Test + void testGenerateOriginal() { + distributedIdGenerator = new DistributedIdGenerator( + idGeneratorConfig, partitionResolverSupplier, nonceGeneratorType, mock(MetricRegistry.class), Clock.systemDefaultZone() + ); + val idOptional = distributedIdGenerator.generate("TEST"); + String id = idOptional.getId(); + Assertions.assertEquals(26, id.length()); + } + + @Test + void testGenerateBase36() { + val distributedIdGeneratorLocal = new DistributedIdGenerator( + idGeneratorConfig, + (txnId) -> new BigInteger(txnId.substring(txnId.length() - 6), 36).abs().intValue() % partitionCount, + nonceGeneratorType, + IdFormatters.base36(), + mock(MetricRegistry.class), + Clock.systemDefaultZone() + ); + String id = distributedIdGeneratorLocal.generate("TEST").getId(); + Assertions.assertEquals(18, id.length()); + } + + @Test + void testConstraintFailure() { + Assertions.assertFalse(distributedIdGenerator.generateWithConstraints( + "TST", + Collections.singletonList((id -> false)), + true).isPresent()); + } + + @Test + void 
testParseFailure() { + //Null or Empty String + Assertions.assertFalse(distributedIdGenerator.parse(null).isPresent()); + Assertions.assertFalse(distributedIdGenerator.parse("").isPresent()); + + //Invalid length + Assertions.assertFalse(distributedIdGenerator.parse("TEST").isPresent()); + + //Invalid chars + Assertions.assertFalse(distributedIdGenerator.parse("XCL983dfb1ee0a847cd9e7321fcabc2f223").isPresent()); + Assertions.assertFalse(distributedIdGenerator.parse("XCL98-3df-b1e:e0a847cd9e7321fcabc2f223").isPresent()); + + //Invalid month + Assertions.assertFalse(distributedIdGenerator.parse("ABC2032250959030643972247").isPresent()); + //Invalid date + Assertions.assertFalse(distributedIdGenerator.parse("ABC2011450959030643972247").isPresent()); + //Invalid hour + Assertions.assertFalse(distributedIdGenerator.parse("ABC2011259659030643972247").isPresent()); + //Invalid minute + Assertions.assertFalse(distributedIdGenerator.parse("ABC2011250972030643972247").isPresent()); + //Invalid sec + Assertions.assertFalse(distributedIdGenerator.parse("ABC2011250959720643972247").isPresent()); + } + + @Test + void testParseSuccess() { + val idString = "ABC2011250959030643972247"; + val id = distributedIdGenerator.parse(idString).orElse(null); + Assertions.assertNotNull(id); + Assertions.assertEquals(idString, id.getId()); + Assertions.assertEquals(972247, id.getExponent()); + Assertions.assertEquals(643, id.getNode()); + Assertions.assertEquals(TestUtil.generateDate(2020, 11, 25, 9, 59, 3, 0, ZoneId.systemDefault()), + id.getGeneratedDate()); + } + + @Test + void testParseSuccessAfterGeneration() { + val generatedId = distributedIdGenerator.generate("TEST123"); + val parsedId = distributedIdGenerator.parse(generatedId.getId()).orElse(null); + Assertions.assertNotNull(parsedId); + Assertions.assertEquals(parsedId.getId(), generatedId.getId()); + Assertions.assertEquals(parsedId.getExponent(), generatedId.getExponent()); + Assertions.assertEquals(parsedId.getNode(), 
generatedId.getNode()); + } + +} \ No newline at end of file diff --git a/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/TestUtil.java b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/TestUtil.java new file mode 100644 index 00000000..83653d27 --- /dev/null +++ b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/TestUtil.java @@ -0,0 +1,84 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dropwizard.logback.shaded.guava.base.Stopwatch; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import lombok.val; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Slf4j +@UtilityClass +public class TestUtil { + private static final String OUTPUT_PATH = "perf/results/%s.json"; + + public double runMTTest(int numThreads, int iterationCount, final Consumer supplier, final boolean save_output, final String outputFileName) throws IOException { + final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + final List> futures = IntStream.range(0, numThreads) + .mapToObj(i -> executorService.submit(() -> { + final Stopwatch stopwatch = Stopwatch.createStarted(); + IntStream.range(0, iterationCount).forEach(supplier::accept); + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + })) + .collect(Collectors.toList()); + final long total = futures.stream() + .mapToLong(f 
-> { + try { + return f.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return 0; + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + }) + .sum(); + if (save_output){ + writeToFile(numThreads, iterationCount, total, outputFileName); + } + log.warn("Finished Execution for {} iterations in avg time: {}", iterationCount, ((double) total) / numThreads); + return total; + } + + public void writeToFile(int numThreads, int iterationCount, long totalMillis, String outputFileName) throws IOException { + val mapper = new ObjectMapper(); + val outputFilePath = Paths.get(String.format(OUTPUT_PATH, outputFileName)); + val outputNode = mapper.createObjectNode(); + outputNode.put("name", outputFileName); + outputNode.put("iterations", iterationCount); + outputNode.put("threads", numThreads); + outputNode.put("totalMillis", totalMillis); + outputNode.put("avgTime", ((double) totalMillis) / numThreads); + Files.write(outputFilePath, mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(outputNode)); + } + + public Date generateDate(int year, int month, int day, int hour, int min, int sec, int ms, ZoneId zoneId) { + return Date.from( + Instant.from( + ZonedDateTime.of( + LocalDateTime.of( + year, month, day, hour, min, sec, Math.multiplyExact(ms, 1000000) + ), + zoneId + ) + ) + ); + } +} diff --git a/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedIdGeneratorPerfTest.java b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedIdGeneratorPerfTest.java new file mode 100644 index 00000000..46f37cc3 --- /dev/null +++ b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedIdGeneratorPerfTest.java @@ -0,0 +1,80 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import io.appform.ranger.discovery.bundle.id.config.DefaultNamespaceConfig; +import 
io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; +import io.appform.ranger.discovery.bundle.id.config.WeightedIdConfig; +import io.appform.ranger.discovery.bundle.id.config.PartitionRange; +import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorType; +import io.appform.ranger.discovery.bundle.id.generator.DistributedIdGenerator; +import io.appform.ranger.discovery.bundle.id.config.WeightedPartition; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Test performance between different constructs + */ +@Slf4j +public class WeightedIdGeneratorPerfTest extends BenchmarkTest { + + @State(Scope.Benchmark) + public static class BenchmarkState { + private DistributedIdGenerator distributedIdGenerator; + final Function partitionResolverSupplier = (txnId) -> Integer.parseInt(txnId.substring(txnId.length()-6)) % 1024; + + + @Setup(Level.Trial) + public void setUp() throws IOException { + val metricRegistry = mock(MetricRegistry.class); + val meter = mock(Meter.class); + doReturn(meter).when(metricRegistry).meter(anyString()); + doNothing().when(meter).mark(); + List partitionConfigList = new ArrayList<>(); + partitionConfigList.add(WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(0).end(511).build()) + .weight(400).build()); + partitionConfigList.add(WeightedPartition.builder() + 
.partitionRange(PartitionRange.builder().start(512).end(1023).build()) + .weight(600).build()); + val weightedIdConfig = WeightedIdConfig.builder() + .partitions(partitionConfigList) + .build(); + distributedIdGenerator = new DistributedIdGenerator( + IdGeneratorConfig.builder() + .partitionCount(1024) + .weightedIdConfig(weightedIdConfig) + .defaultNamespaceConfig(DefaultNamespaceConfig.builder().idPoolSizePerPartition(100).build()) + .build(), + partitionResolverSupplier, + NonceGeneratorType.WEIGHTED_DISTRIBUTED, + metricRegistry, + Clock.systemDefaultZone() + ); + } + } + + @SneakyThrows + @Benchmark + public void testGenerate(Blackhole blackhole, BenchmarkState state) { + state.distributedIdGenerator.generate("P"); + } +} \ No newline at end of file diff --git a/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedNonceGeneratorTest.java b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedNonceGeneratorTest.java new file mode 100644 index 00000000..58ad67e7 --- /dev/null +++ b/ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedNonceGeneratorTest.java @@ -0,0 +1,131 @@ +package io.appform.ranger.discovery.bundle.id; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import io.appform.ranger.discovery.bundle.id.config.DefaultNamespaceConfig; +import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig; +import io.appform.ranger.discovery.bundle.id.config.WeightedIdConfig; +import io.appform.ranger.discovery.bundle.id.config.PartitionRange; +import io.appform.ranger.discovery.bundle.id.nonce.NonceGeneratorType; +import io.appform.ranger.discovery.bundle.id.generator.DistributedIdGenerator; +import io.appform.ranger.discovery.bundle.id.config.WeightedPartition; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Test for {@link DistributedIdGenerator} + */ +@Slf4j +@SuppressWarnings({"unused", "FieldMayBeFinal"}) +class WeightedNonceGeneratorTest extends PartitionAwareNonceGeneratorTest { + final int numThreads = 5; + final int iterationCountPerThread = 100000; + final int partitionCount = 1024; + final Function partitionResolverSupplier = (txnId) -> Integer.parseInt(txnId.substring(txnId.length() - 6)) % partitionCount; + private MetricRegistry metricRegistry = mock(MetricRegistry.class); + + @BeforeEach + void setup() { + nonceGeneratorType = NonceGeneratorType.WEIGHTED_DISTRIBUTED; + val meter = mock(Meter.class); + doReturn(meter).when(metricRegistry).meter(anyString()); + doNothing().when(meter).mark(); + List partitionConfigList = new ArrayList<>(); + partitionConfigList.add(WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(0).end(511).build()) + .weight(400).build()); + partitionConfigList.add(WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(512).end(1023).build()) + .weight(600).build()); + val weightedIdConfig = WeightedIdConfig.builder() + .partitions(partitionConfigList) + .build(); + idGeneratorConfig = + IdGeneratorConfig.builder() + .partitionCount(partitionCount) + .weightedIdConfig(weightedIdConfig) + .defaultNamespaceConfig(DefaultNamespaceConfig.builder().idPoolSizePerPartition(100).build()) + .build(); + distributedIdGenerator = new DistributedIdGenerator(idGeneratorConfig, partitionResolverSupplier, nonceGeneratorType, metricRegistry, Clock.systemDefaultZone()); + } + + protected void 
checkDistribution(List allIdsList, Function partitionResolver, IdGeneratorConfig config) { + val idCountMap = getIdCountMap(allIdsList, partitionResolver); + int maxShardWeight = 1000; + for (WeightedPartition partition: config.getWeightedIdConfig().getPartitions()) { + val expectedIdCount = ((double) partition.getWeight() / maxShardWeight) * ((double) allIdsList.size() / (partition.getPartitionRange().getEnd()-partition.getPartitionRange().getStart()+1)); + int idCountForPartition = 0; + for (int partitionId = partition.getPartitionRange().getStart(); partitionId <= partition.getPartitionRange().getEnd(); partitionId++) { + Assertions.assertTrue(expectedIdCount * 0.9 <= idCountMap.get(partitionId), + String.format("Partition %s generated %s ids, expected was more than: %s", + partitionId, idCountMap.get(partitionId), expectedIdCount * 0.9)); + Assertions.assertTrue(idCountMap.get(partitionId) <= expectedIdCount * 1.1, + String.format("Partition %s generated %s ids, expected was less than: %s", + partitionId, idCountMap.get(partitionId), expectedIdCount * 1.1)); + idCountForPartition += idCountMap.get(partitionId); + } + log.warn("Partition ID Count: {} - Percentage: {}", idCountForPartition, (double) idCountForPartition * 100 / allIdsList.size()); + } + } + + + @Test + void testGenerateForMinimumRangePartition() throws IOException { + val partitionCount = 10; + Function partitionResolver = (txnId) -> Integer.parseInt(txnId.substring(txnId.length() - 6)) % partitionCount; + List partitionConfigList = List.of( + WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(0).end(0).build()) + .weight(125).build(), + WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(1).end(1).build()) + .weight(150).build(), + WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(2).end(8).build()) + .weight(525).build(), + WeightedPartition.builder() + .partitionRange(PartitionRange.builder().start(9).end(9).build()) 
+ .weight(200).build() + ); + val weightedIdConfig = WeightedIdConfig.builder() + .partitions(partitionConfigList) + .build(); + val idGeneratorConfig = + IdGeneratorConfig.builder() + .partitionCount(partitionCount) + .weightedIdConfig(weightedIdConfig) + .defaultNamespaceConfig(DefaultNamespaceConfig.builder().idPoolSizePerPartition(100).build()) + .build(); + val distributedIdGenerator = new DistributedIdGenerator(idGeneratorConfig, partitionResolver, NonceGeneratorType.WEIGHTED_DISTRIBUTED, metricRegistry, Clock.systemDefaultZone()); + val allIdsList = Collections.synchronizedList(new ArrayList()); + val iterationCount = 100000; + val totalIdCount = numThreads * iterationCount; + val totalTime = TestUtil.runMTTest( + numThreads, + iterationCount, + (k) -> { + val id = distributedIdGenerator.generate("P"); + allIdsList.add(id); + }, + false, + this.getClass().getName() + ".testGenerateAccuracy"); + checkUniqueIds(allIdsList); + checkDistribution(allIdsList, partitionResolver, idGeneratorConfig); + } + +} \ No newline at end of file