Skip to content

Commit

Permalink
[Kernel][Metrics][PR#1] Adds initial interfaces and a Timer class (#3902
Browse files Browse the repository at this point in the history
)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds the initial interfaces for
#3905 as well as a `Timer` class
that will be used in follow-up PRs.

## How was this patch tested?

Just interface changes.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
allisonport-db authored Dec 4, 2024
1 parent a52578b commit 82c2f64
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.delta.kernel.engine;

import io.delta.kernel.annotation.Evolving;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -76,4 +78,9 @@ public interface Engine {
*/
CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf);

/** Get the engine's {@link MetricsReporter} instances to push reports to. */
default List<MetricsReporter> getMetricsReporters() {
return Collections.emptyList();
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.engine;

import io.delta.kernel.metrics.MetricsReport;

/** Interface for reporting metrics for operations to a Delta table */
public interface MetricsReporter {

/** Indicates that an operation is done by reporting a {@link MetricsReport} */
void report(MetricsReport report);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.metrics;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

/** A timer class for measuring the duration of operations in nanoseconds */
public class Timer {
private final LongAdder count = new LongAdder();
private final LongAdder totalTime = new LongAdder();

/** @return the number of times this timer was used to record a duration. */
public long count() {
return count.longValue();
}

/** @return the total duration that was recorded in nanoseconds */
public long totalDuration() {
return totalTime.longValue();
}

/**
* Starts the timer and returns a {@link Timed} instance. Call {@link Timed#stop()} to complete
* the timing.
*
* @return A {@link Timed} instance with the start time recorded.
*/
public Timed start() {
return new DefaultTimed(this);
}

/**
* Records a custom amount.
*
* @param amount The amount to record in nanoseconds
*/
public void record(long amount) {
checkArgument(amount >= 0, "Cannot record %s: must be >= 0", amount);
this.totalTime.add(amount);
this.count.increment();
}

public <T> T time(Supplier<T> supplier) {
try (Timed ignore = start()) {
return supplier.get();
}
}

public <T> T timeCallable(Callable<T> callable) throws Exception {
try (Timed ignore = start()) {
return callable.call();
}
}

public void time(Runnable runnable) {
try (Timed ignore = start()) {
runnable.run();
}
}

/**
* A timing sample that carries internal state about the Timer's start position. The timing can be
* completed by calling {@link Timed#stop()}.
*/
public interface Timed extends AutoCloseable {
/** Stops the timer and records the total duration up until {@link Timer#start()} was called. */
void stop();

@Override
default void close() {
stop();
}

Timed NOOP = () -> {};
}

private static class DefaultTimed implements Timed {
private final Timer timer;
private final long startTime;
private boolean closed;

private DefaultTimed(Timer timer) {
this.timer = timer;
this.startTime = System.nanoTime();
}

@Override
public void stop() {
if (closed) {
throw new IllegalStateException("called stop() multiple times");
}
timer.record(System.nanoTime() - startTime);
closed = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.metrics;

import java.util.Optional;
import java.util.UUID;

/** Defines the common fields that are shared by reports for Delta operations */
public interface DeltaOperationReport extends MetricsReport {

/** @return the path of the table */
String tablePath();

/** @return a string representation of the operation this report is for */
String operationType();

/** @return the exception thrown if this report is for a failed operation, otherwise empty */
Optional<Exception> exception();

/** @return a unique ID for this report */
UUID reportUUID();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.metrics;

/** Interface containing the metrics for a given operation */
public interface MetricsReport {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.metrics

import java.util.concurrent.Callable
import java.util.function.Supplier

import io.delta.kernel.internal.metrics.Timer
import org.scalatest.funsuite.AnyFunSuite

class MetricsUtilsSuite extends AnyFunSuite {

def millisToNanos(millis: Long): Long = {
millis*1000000
}

/**
* @param incrementFx Function given (duration, timer) increments the timer by approximately
* duration ms
*/
def testTimer(incrementFx: (Long, Timer) => Unit): Unit = {
val timer = new Timer()
assert(timer.count == 0)
assert(timer.totalDuration == 0)

val incrementAmt1 = 0
val paddedEndTimeOp1 = incrementAmt1 + 5 // We pad each operation by 5ms
incrementFx(incrementAmt1, timer)
assert(timer.count == 1)
assert(timer.totalDuration >= millisToNanos(incrementAmt1) &&
timer.totalDuration < millisToNanos(paddedEndTimeOp1))

val incrementAmt2 = 20
val paddedEndTimeOp2 = paddedEndTimeOp1 + incrementAmt2 + 5 // 30
incrementFx(incrementAmt2, timer)
assert(timer.count == 2)
assert(timer.totalDuration >= millisToNanos(incrementAmt1 + incrementAmt2) &&
timer.totalDuration < millisToNanos(paddedEndTimeOp2))

val incrementAmt3 = 50
val paddedEndTimeOp3 = paddedEndTimeOp2 + incrementAmt3 + 5 // 85
incrementFx(incrementAmt3, timer)
assert(timer.count == 3)
assert(timer.totalDuration >= millisToNanos(incrementAmt1 + incrementAmt2 + incrementAmt3) &&
timer.totalDuration < millisToNanos(paddedEndTimeOp3))
}

test("Timer class") {
// Using Timer.record()
testTimer((amount, timer) => timer.record(millisToNanos(amount)))

// Using Timer.start()
testTimer((amount, timer) => {
val timed = timer.start()
Thread.sleep(amount)
timed.stop()
})

// Using Timer.time(supplier)
def supplier(amount: Long): Supplier[Long] = {
() => {
Thread.sleep(amount)
amount
}
}
testTimer((amount, timer) => {
timer.time(supplier(amount))
})

// Using Timer.timeCallable
def callable(amount: Long): Callable[Long] = {
() => {
Thread.sleep(amount)
amount
}
}
testTimer((amount, timer) => {
timer.timeCallable(callable(amount))
})

// Using Timer.time(runnable)
def runnable(amount: Long): Runnable = {
() => Thread.sleep(amount)
}
testTimer((amount, timer) => {
timer.time(runnable(amount))
})
}

test("Timer class with exceptions") {
// We catch the exception outside of the functional interfaces
def catchException(fx: () => Any): Unit = {
try {
fx.apply()
} catch {
case _: Exception =>
}
}

// Using Timer.time(supplier)
def supplier(amount: Long): Supplier[Long] = {
() => {
Thread.sleep(amount)
throw new RuntimeException()
}
}
testTimer((amount, timer) => {
catchException(() => timer.time(supplier(amount)))
})

// Using Timer.timeCallable
def callable(amount: Long): Callable[Long] = {
() => {
Thread.sleep(amount)
throw new RuntimeException()
}
}
testTimer((amount, timer) => {
catchException(() => timer.timeCallable(callable(amount)))
})

// Using Timer.time(runnable)
def runnable(amount: Long): Runnable = {
() => {
Thread.sleep(amount)
throw new RuntimeException()
}
}
testTimer((amount, timer) => {
catchException(() => timer.time(runnable(amount)))
})
}
}

0 comments on commit 82c2f64

Please sign in to comment.