Skip to content

Commit

Permalink
Add predownload functionality to Pinot
Browse files Browse the repository at this point in the history
  • Loading branch information
lnbest0707-uber committed Dec 19, 2024
1 parent 90b437f commit f23ae1a
Show file tree
Hide file tree
Showing 18 changed files with 1,957 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
REALTIME_CONSUMER_DIR_USAGE("bytes", true);
REALTIME_CONSUMER_DIR_USAGE("bytes", true),
SEGMENT_DOWNLOAD_SPEED("bytes", true),
PREDOWNLOAD_SPEED("bytes", true);

private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ public enum ServerMeter implements AbstractMetrics.Meter {
* That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1.
* But if a single query has 2 different window operators and each one reaches the limit, this will be increased by 2.
*/
WINDOW_TIMES_MAX_ROWS_REACHED("times", true);
WINDOW_TIMES_MAX_ROWS_REACHED("times", true),

// predownload metrics
SEGMENT_DOWNLOAD_COUNT("predownloadSegmentCount", true),
SEGMENT_DOWNLOAD_FAILURE_COUNT("predownloadSegmentFailureCount", true),
PREDOWNLOAD_SUCCEED("predownloadSucceed", true),
PREDOWNLOAD_FAILED("predownloadFailed", true);

private final String _meterName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public enum ServerTimer implements AbstractMetrics.Timer {
RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS("millis", true),
// How long it took the server to start.
STARTUP_SUCCESS_DURATION_MS("millis", true),
STARTUP_FAILURE_DURATION_MS("millis", true);
STARTUP_FAILURE_DURATION_MS("millis", true),

PREDOWNLOAD_TIME("millis", true);

private final String _timerName;
private final boolean _global;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public long getDiskSizeBytes() {
// or default columns will not exist.

// check that v3 subdirectory exists since the format may not have been converted
if (_segmentDirectory.exists()) {
if (_segmentDirectory != null && _segmentDirectory.exists()) {
try {
return FileUtils.sizeOfDirectory(_segmentDirectory.toPath().toFile());
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.pinot.tools.predownload;

public class PredownloadException extends RuntimeException {
public PredownloadException(String message) {
super(message);
}

public PredownloadException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.pinot.tools.predownload;

import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;


public class PredownloadMetrics {
private static final String INSTANCE_ID_TAG = "instance_id";
private static final String SEGMENT_ID_TAG = "segment_id";
private static final long BYTES_TO_MB = 1024 * 1024;

private final ServerMetrics _serverMetrics;

public PredownloadMetrics() {
_serverMetrics = ServerMetrics.get();
}

public void segmentDownloaded(boolean succeed, String segmentName, long segmentSizeBytes, long downloadTimeMs) {
if (succeed) {
_serverMetrics.addMeteredGlobalValue(ServerMeter.SEGMENT_DOWNLOAD_COUNT, 1);
// Report download speed in MB/s, avoid divide by 0
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_SPEED,
(segmentSizeBytes / BYTES_TO_MB) / (downloadTimeMs / 1000 + 1));
} else {
_serverMetrics.addMeteredGlobalValue(ServerMeter.SEGMENT_DOWNLOAD_FAILURE_COUNT, 1);
}
}

public void preDownloadSucceed(long totalSegmentSizeBytes, long totalDownloadTimeMs) {
_serverMetrics.setValueOfGlobalGauge(ServerGauge.PREDOWNLOAD_SPEED,
(totalSegmentSizeBytes / BYTES_TO_MB) / (totalDownloadTimeMs / 1000 + 1));
_serverMetrics.addTimedValue(ServerTimer.PREDOWNLOAD_TIME, totalDownloadTimeMs, TimeUnit.MILLISECONDS);
}

public void preDownloadComplete(PredownloadCompleteReason reason) {
if (reason.isSucceed()) {
_serverMetrics.addMeteredGlobalValue(ServerMeter.PREDOWNLOAD_SUCCEED, 1);
} else {
_serverMetrics.addMeteredGlobalValue(ServerMeter.PREDOWNLOAD_FAILED, 1);
}
}
}
Loading

0 comments on commit f23ae1a

Please sign in to comment.