Skip to content

Commit

Permalink
Merge pull request #15 from salesforce/ashcoder.intTolong
Browse files Browse the repository at this point in the history
Longmetric id support
  • Loading branch information
ashnacoder authored Oct 21, 2020
2 parents 84157be + 97f2d55 commit 3c65cfd
Show file tree
Hide file tree
Showing 41 changed files with 982 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
Expand Down Expand Up @@ -59,6 +60,10 @@ public class CarbonjAdmin

private final NameUtils nameUtils;

@Value( "${metrics.store.longId:false}" )
private boolean longId;


private Supplier<RuntimeException> notConfigured = ( ) -> new RuntimeException(
"Time Series Store is not configured." );

Expand Down Expand Up @@ -120,7 +125,7 @@ public void listMetrics2( @PathVariable final String pattern, Writer response )
}

@RequestMapping( value = "/dumpnames", method = RequestMethod.GET )
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) int startId,
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) long startId,
@RequestParam( value = "startName", required = false ) String startName,
@RequestParam( value = "count", required = false ) Integer count,
@RequestParam( value = "filter", required = false ) String wildcard, Writer response )
Expand All @@ -138,7 +143,7 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
try
{
tsStore().scanMetrics( startId, Integer.MAX_VALUE, m -> {
tsStore().scanMetrics( startId, getMaxId(), m -> {
if ( !filter.test( m ) )
{
return;
Expand All @@ -164,6 +169,10 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
}

private long getMaxId() {
return longId ? Long.MAX_VALUE : Integer.MAX_VALUE;
}

private boolean loadLock = false;

private volatile boolean abortLoad = false;
Expand Down Expand Up @@ -528,6 +537,7 @@ static class StopException

static boolean hasDataSince( TimeSeriesStore ts, String metric, int from )
{

for ( String dbName : Arrays.asList( "30m2y", "5m7d", "60s24h" ) )
{
if ( null != ts.getFirst( dbName, metric, from, Integer.MAX_VALUE ) )
Expand All @@ -554,7 +564,7 @@ public void cleanSeries( @RequestParam( value = "from", required = false, defaul

try
{
ts.scanMetrics( 0, Integer.MAX_VALUE, m -> {
ts.scanMetrics( 0, getMaxId(), m -> {
if ( written.get() >= count )
{
// produced big enough result - interrupt execution through exception (signal "donness")
Expand Down Expand Up @@ -622,7 +632,7 @@ public void dumpSeries( @PathVariable final String dbName,
try
{
ts.scanMetrics( cursor,
Integer.MAX_VALUE,
getMaxId(),
m -> {
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface TimeSeriesStore

DataPointExportResults exportPoints( String dbName, String metricName );

DataPointExportResults exportPoints( String dbName, int metricId );
DataPointExportResults exportPoints( String dbName, long metricId );

// to support testing
Metric selectRandomMetric();
Expand All @@ -47,13 +47,13 @@ public interface TimeSeriesStore

Metric getMetric( String name, boolean createIfMissing );

Metric getMetric( int metricId );
Metric getMetric( long metricId );

String getMetricName( int metricId );
String getMetricName( long metricId );

void scanMetrics( Consumer<Metric> m );

int scanMetrics( int start, int end, Consumer<Metric> m );
long scanMetrics( long start, long end, Consumer<Metric> m );

List<Metric> findMetrics( String pattern );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class TimeSeriesStoreImpl implements TimeSeriesStore

private volatile long logNoOfSeriesThreshold;

private boolean longId;

public static ThreadPoolExecutor newSerialTaskQueue(int queueSize) {
ThreadFactory tf =
new ThreadFactoryBuilder()
Expand Down Expand Up @@ -142,7 +144,8 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
ThreadPoolExecutor heavyQueryTaskQueue, ThreadPoolExecutor serialTaskQueue,
DataPointStore pointStore, DatabaseMetrics dbMetrics,
boolean batchedSeriesRetrieval, int batchedSeriesSize, boolean dumpIndex,
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile) {
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile,
boolean longId) {
this.nameIndex = Preconditions.checkNotNull(nameIndex);
this.eventLogger = eventLogger;
this.pointStore = Preconditions.checkNotNull(pointStore);
Expand All @@ -154,6 +157,7 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
this.dumpIndex = dumpIndex;
this.dumpIndexFile = dumpIndexFile;
this.nonLeafPointsLogQuota = new Quota(maxNonLeafPointsLoggedPerMin, 60);
this.longId = longId;


rejectedCounter = metricRegistry.counter(
Expand Down Expand Up @@ -358,11 +362,11 @@ public DataPointExportResults exportPoints(String dbName, String metricName) {
}

@Override
public DataPointExportResults exportPoints(String dbName, int metricId) {
public DataPointExportResults exportPoints(String dbName, long metricId) {
return exportPoints(dbName, null, metricId);
}

private DataPointExportResults exportPoints(String dbName, String metricName, Integer metricId) {
private DataPointExportResults exportPoints(String dbName, String metricName, Long metricId) {
if (!RetentionPolicy.dbNameExists(dbName)) {
throw new RuntimeException(String.format("Unknown dbName [%s]", dbName));
}
Expand Down Expand Up @@ -641,13 +645,13 @@ public DeleteAPIResult deleteAPI( String name, boolean delete, Set<String> exclu
}

@Override
public Metric getMetric( int metricId )
public Metric getMetric( long metricId )
{
return nameIndex.getMetric( metricId );
}

@Override
public String getMetricName( int metricId )
public String getMetricName( long metricId )
{
return nameIndex.getMetricName( metricId );
}
Expand All @@ -664,11 +668,18 @@ public void deleteAll()
@Override
public void scanMetrics( Consumer<Metric> m )
{
scanMetrics( 0, Integer.MAX_VALUE, m );
if(longId)
{
scanMetrics( 0, Long.MAX_VALUE, m );
}
else
{
scanMetrics( 0, Integer.MAX_VALUE, m );
}
}

@Override
public int scanMetrics( int start, int end, Consumer<Metric> m )
public long scanMetrics( long start, long end, Consumer<Metric> m )
{
return nameIndex.scanNames( start, end, m );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
*/
package com.demandware.carbonj.service.db;

import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
import com.demandware.carbonj.service.engine.cfgCentralThreadPools;
import com.demandware.carbonj.service.events.EventsLogger;
import com.demandware.carbonj.service.events.cfgCarbonjEventsLogger;
Expand All @@ -21,15 +22,12 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;

import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Import( { cfgMetricIndex.class, cfgDataPoints.class, cfgCentralThreadPools.class, cfgCarbonjEventsLogger.class } )
@ConditionalOnProperty(name=cfgTimeSeriesStorage.DB_ENABLED_PROPERTY_KEY, havingValue="true", matchIfMissing=true)
Expand All @@ -39,6 +37,9 @@ public class cfgTimeSeriesStorage

public static final String DB_ENABLED_PROPERTY_KEY = "metrics.store.enabled";

@Value( "${metrics.store.longId:false}" )
private boolean longId;

@Value( "${metrics.store.fetchSeriesThreads:20}" )
private int nTaskThreads;

Expand Down Expand Up @@ -85,7 +86,8 @@ TimeSeriesStore timeSeriesStore( MetricIndex nameIndex, DataPointStore pointStor
TimeSeriesStoreImpl.newHeavyQueryTaskQueue( nHeavyQueryThreads, heavyQueryBlockingQueueSize ),
TimeSeriesStoreImpl.newSerialTaskQueue( serialQueueSize ), pointStore,
dbMetrics, batchedSeriesRetrieval,
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile);
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile,
longId);

s.scheduleWithFixedDelay(timeSeriesStore::reload, 60, 60, TimeUnit.SECONDS );
s.scheduleWithFixedDelay(timeSeriesStore::refreshStats, 60, 10, TimeUnit.SECONDS );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@
import com.google.common.base.Preconditions;

class IdRecord
implements Record<Integer>
implements Record<Long>
{
private Integer key;
private Long key;

private String metricName;

public IdRecord( Integer key, String metricName)
public IdRecord( Long key, String metricName)
{
this.key = Preconditions.checkNotNull(key);
this.metricName = Preconditions.checkNotNull(metricName);
}

public Integer key()
public Long key()
{
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,64 @@
*/
package com.demandware.carbonj.service.db.index;

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;

import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.primitives.Longs;

class IdRecordSerializer
implements RecordSerializer<Integer, IdRecord>
implements RecordSerializer<Long, IdRecord>
{
public IdRecordSerializer()
private boolean longId;

public IdRecordSerializer(boolean longId)
{
this.longId = longId;
}

@Override
public Integer key( byte[] keyBytes )
public Long key( byte[] keyBytes )
{
return Ints.fromByteArray( keyBytes );
return longId ? Longs.fromByteArray( keyBytes ) : Integer.valueOf(Ints.fromByteArray(keyBytes)).longValue();
}

@Override
public IdRecord toIndexEntry( byte[] keyBytes, byte[] valueBytes)
{
Integer key = key(keyBytes);
Long key = key(keyBytes);
return toIndexEntry( key, valueBytes);
}

@Override
public IdRecord toIndexEntry( Integer key, byte[] valueBytes)
public IdRecord toIndexEntry( Long key, byte[] valueBytes)
{
String value = new String(valueBytes, UTF_8);
return new IdRecord( key, value );
ByteArrayDataInput in = ByteStreams.newDataInput( valueBytes );
if(longId)
{
// a byte for versioning
byte entryType = in.readByte();
}
return new IdRecord( key, in.readUTF() );
}

@Override
public byte[] keyBytes(Integer key)
public byte[] keyBytes(Long key)
{
return Ints.toByteArray(key);
return longId ? Longs.toByteArray(key) : Ints.toByteArray(key.intValue());
}

@Override
public byte[] valueBytes(IdRecord e)
{
return e.metricName().getBytes( UTF_8 );
ByteArrayDataOutput out = ByteStreams.newDataOutput();
if(longId)
{
// leaving a byte for versioning
out.writeByte(0);
}
out.writeUTF(e.metricName());
return out.toByteArray();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface IndexStore<K, R extends Record<K>>

K maxKey();

int scan( K startKey, K endKey, Consumer<R> c );
long scan( K startKey, K endKey, Consumer<R> c );
}
Loading

0 comments on commit 3c65cfd

Please sign in to comment.