Skip to content

Commit

Permalink
HIVE-28638: Refactor stats handling in StatsRecordingThreadPool
Browse files Browse the repository at this point in the history
Change-Id: Ia368abbe14aa55b5d24f172aaa453f69a8a57fd4
  • Loading branch information
abstractdog committed Nov 24, 2024
1 parent afe05b9 commit f9fb1f2
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 202 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounters;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
* Convenience class for handling thread local statistics for different schemes in LLAP.
* The motivation is that thread local stats (available from FileSystem.getAllStatistics()) are in a List, which
* doesn't guarantee that for a single scheme, a single Statistics object is returned (e.g. in case of multiple
* namenodes). This class encapsulates that data, and takes care of merging them transparently.
* LlapThreadLocalStatistics is used in LLAP's task ThreadPool, where the current thread's statistics is calculated by a
* simple delta computation (before/after running the task callable), like:
*
* LlapThreadLocalStatistics statsBefore = new LlapThreadLocalStatistics(...);
* LlapThreadLocalStatistics diff = new LlapThreadLocalStatistics(...).subtract(statsBefore);
*/
public class LlapThreadLocalStatistics {

  /**
   * LLAP executor CPU counters. Values are nanoseconds as reported by ThreadMXBean.
   */
  public enum LlapExecutorCounters {
    EXECUTOR_CPU_NS,
    EXECUTOR_USER_NS
  }

  // Scheme (e.g. "file", "hdfs") -> thread local statistics, merged per scheme.
  @VisibleForTesting
  Map<String, LlapThreadLocalStatistics.StatisticsData> schemeToThreadLocalStats = new HashMap<>();
  // Current thread CPU time in nanoseconds, or -1 when no ThreadMXBean is available.
  @VisibleForTesting
  long cpuTime;
  // Current thread user-mode time in nanoseconds, or -1 when no ThreadMXBean is available.
  @VisibleForTesting
  long userTime;

  /**
   * In this constructor we create a snapshot of the current thread local statistics and take care of merging the
   * ones that belong to the same scheme.
   *
   * @param mxBean source of thread CPU/user times, may be null
   */
  public LlapThreadLocalStatistics(ThreadMXBean mxBean) {
    this(mxBean, FileSystem.getAllStatistics());
  }

  /**
   * Merges the list to a map.
   * Input list:
   * 1. FileSystem.Statistics (scheme: file)
   * 2. FileSystem.Statistics (scheme: hdfs)
   * 3. FileSystem.Statistics (scheme: hdfs)
   * Output map:
   * "file" -> LlapThreadLocalStatistics.StatisticsData (1)
   * "hdfs" -> LlapThreadLocalStatistics.StatisticsData (2 + 3)
   *
   * @param mxBean source of thread CPU/user times, may be null
   * @param allStatistics per-filesystem statistics; the same scheme may appear multiple times
   */
  public LlapThreadLocalStatistics(ThreadMXBean mxBean, List<FileSystem.Statistics> allStatistics) {
    cpuTime = mxBean == null ? -1 : mxBean.getCurrentThreadCpuTime();
    userTime = mxBean == null ? -1 : mxBean.getCurrentThreadUserTime();

    for (FileSystem.Statistics statistics : allStatistics) {
      // On scheme collision, fold this thread's counters into the entry already present.
      schemeToThreadLocalStats.merge(statistics.getScheme(),
          new StatisticsData(statistics.getThreadStatistics()),
          (statsCurrent, statsNew) -> statsCurrent.merge(statistics.getThreadStatistics()));
    }
  }

  /**
   * Iterates on the other LlapThreadLocalStatistics's schemes, and subtracts them from this one if they are
   * present here too. Schemes only present in {@code other} are ignored.
   *
   * @param other the snapshot to subtract (typically taken before this one)
   * @return this instance, to allow chaining
   */
  public LlapThreadLocalStatistics subtract(LlapThreadLocalStatistics other) {
    for (Map.Entry<String, LlapThreadLocalStatistics.StatisticsData> otherStats :
        other.schemeToThreadLocalStats.entrySet()) {
      schemeToThreadLocalStats.computeIfPresent(otherStats.getKey(),
          (thisScheme, stats) -> stats.subtract(otherStats.getValue()));
    }

    cpuTime -= other.cpuTime;
    userTime -= other.userTime;

    return this;
  }

  /**
   * Increments the corresponding filesystem and executor counters in the given TezCounters
   * by the values held in this snapshot.
   *
   * @param tezCounters target counters, incremented in place
   */
  public void fill(TezCounters tezCounters) {
    for (Map.Entry<String, LlapThreadLocalStatistics.StatisticsData> threadLocalStats :
        schemeToThreadLocalStats.entrySet()) {
      String scheme = threadLocalStats.getKey();
      StatisticsData stats = threadLocalStats.getValue();
      tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ).increment(stats.bytesRead);
      tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN).increment(stats.bytesWritten);
      tezCounters.findCounter(scheme, FileSystemCounter.READ_OPS).increment(stats.readOps);
      tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS).increment(stats.largeReadOps);
      tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS).increment(stats.writeOps);
    }

    // CPU/user time is -1 when the ThreadMXBean was unavailable: don't pollute the counters then.
    if (cpuTime >= 0 && userTime >= 0) {
      tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_CPU_NS).increment(cpuTime);
      tezCounters.findCounter(LlapThreadLocalStatistics.LlapExecutorCounters.EXECUTOR_USER_NS).increment(userTime);
    }
  }

  @Override
  public String toString() {
    // %s invokes String.valueOf, no explicit toString() needed.
    return String.format("LlapThreadLocalStatistics: %s", schemeToThreadLocalStats);
  }

  /**
   * Convenience class over Hadoop's FileSystem.Statistics.StatisticsData.
   * Unfortunately, neither the fields, nor the convenience methods (e.g. StatisticsData.add, StatisticsData.negate)
   * are available here as they are package protected, so we cannot reuse them.
   */
  public static class StatisticsData {
    long bytesRead;
    long bytesWritten;
    int readOps;
    int largeReadOps;
    int writeOps;

    public StatisticsData(FileSystem.Statistics.StatisticsData fsStats) {
      this.bytesRead = fsStats.getBytesRead();
      this.bytesWritten = fsStats.getBytesWritten();
      this.readOps = fsStats.getReadOps();
      this.largeReadOps = fsStats.getLargeReadOps();
      this.writeOps = fsStats.getWriteOps();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append(" bytesRead: ").append(bytesRead);
      sb.append(" bytesWritten: ").append(bytesWritten);
      sb.append(" readOps: ").append(readOps);
      sb.append(" largeReadOps: ").append(largeReadOps);
      sb.append(" writeOps: ").append(writeOps);
      return sb.toString();
    }

    /**
     * Adds the given Hadoop per-thread statistics to this instance.
     *
     * @return this instance, to allow chaining
     */
    public StatisticsData merge(FileSystem.Statistics.StatisticsData other) {
      this.bytesRead += other.getBytesRead();
      this.bytesWritten += other.getBytesWritten();
      this.readOps += other.getReadOps();
      this.largeReadOps += other.getLargeReadOps();
      this.writeOps += other.getWriteOps();
      return this;
    }

    /**
     * Subtracts the given snapshot from this instance. A null argument is a no-op.
     *
     * @return this instance, to allow chaining
     */
    public StatisticsData subtract(StatisticsData other) {
      if (other == null) {
        return this;
      }
      this.bytesRead -= other.bytesRead;
      this.bytesWritten -= other.bytesWritten;
      this.readOps -= other.readOps;
      this.largeReadOps -= other.largeReadOps;
      this.writeOps -= other.writeOps;
      return this;
    }
  }
}
108 changes: 0 additions & 108 deletions llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
Expand Down Expand Up @@ -125,109 +120,6 @@ public static String getUserNameFromPrincipal(String principal) {
return (components == null || components.length != 3) ? principal : components[0];
}

/**
 * Collects the statistics entries whose scheme matches the given one, case-insensitively.
 * Returns an empty list when either argument is null.
 */
public static List<StatisticsData> getStatisticsForScheme(final String scheme,
    final List<StatisticsData> stats) {
  final List<StatisticsData> matching = new ArrayList<>();
  if (stats == null || scheme == null) {
    return matching;
  }
  for (StatisticsData candidate : stats) {
    if (candidate.getScheme().equalsIgnoreCase(scheme)) {
      matching.add(candidate);
    }
  }
  return matching;
}

/**
 * Returns a scheme -> Statistics map built from FileSystem.getAllStatistics(), combining
 * entries that share the same scheme (e.g. multiple namenodes) into a single Statistics.
 */
public static Map<String, FileSystem.Statistics> getCombinedFileSystemStatistics() {
  final Map<String, FileSystem.Statistics> result = new HashMap<>();
  for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
    // Map.merge stores the Statistics as-is for a new scheme, and combines with the
    // existing entry on collision (replaces the verbose containsKey/get/put dance).
    result.merge(statistics.getScheme(), statistics, LlapUtil::combineFileSystemStatistics);
  }
  return result;
}

/**
 * Returns a new Statistics instance holding the sum of the two inputs' counters.
 * Starts from a copy of the first argument, then folds the second one's counters in;
 * neither input is modified.
 */
private static FileSystem.Statistics combineFileSystemStatistics(final FileSystem.Statistics s1,
    final FileSystem.Statistics s2) {
  final FileSystem.Statistics combined = new FileSystem.Statistics(s1);
  combined.incrementBytesRead(s2.getBytesRead());
  combined.incrementBytesWritten(s2.getBytesWritten());
  combined.incrementReadOps(s2.getReadOps());
  combined.incrementLargeReadOps(s2.getLargeReadOps());
  combined.incrementWriteOps(s2.getWriteOps());
  return combined;
}

/**
 * Snapshots the current thread's filesystem statistics into plain value objects.
 * FileSystem's thread-local statistics are package-private and cannot be cloned directly,
 * hence the copy into our own StatisticsData.
 */
public static List<StatisticsData> cloneThreadLocalFileSystemStatistics() {
  final List<FileSystem.Statistics> allStats = FileSystem.getAllStatistics();
  final List<StatisticsData> snapshot = new ArrayList<>(allStats.size());
  for (FileSystem.Statistics stats : allStats) {
    snapshot.add(new StatisticsData(stats.getScheme(), stats.getThreadStatistics()));
  }
  return snapshot;
}

/**
 * Immutable-after-construction snapshot of one filesystem's per-thread statistics,
 * tagged with the filesystem scheme it was taken from.
 */
public static class StatisticsData {
  long bytesRead;
  long bytesWritten;
  int readOps;
  int largeReadOps;
  int writeOps;
  String scheme;

  /**
   * Copies the counter values out of the given Hadoop per-thread statistics.
   */
  public StatisticsData(String scheme, FileSystem.Statistics.StatisticsData fsStats) {
    this.scheme = scheme;
    this.bytesRead = fsStats.getBytesRead();
    this.bytesWritten = fsStats.getBytesWritten();
    this.readOps = fsStats.getReadOps();
    this.largeReadOps = fsStats.getLargeReadOps();
    this.writeOps = fsStats.getWriteOps();
  }

  public long getBytesRead() {
    return bytesRead;
  }

  public long getBytesWritten() {
    return bytesWritten;
  }

  public int getReadOps() {
    return readOps;
  }

  public int getLargeReadOps() {
    return largeReadOps;
  }

  public int getWriteOps() {
    return writeOps;
  }

  public String getScheme() {
    return scheme;
  }

  @Override
  public String toString() {
    return " scheme: " + scheme
        + " bytesRead: " + bytesRead
        + " bytesWritten: " + bytesWritten
        + " readOps: " + readOps
        + " largeReadOps: " + largeReadOps
        + " writeOps: " + writeOps;
  }
}

public static String getAmHostNameFromAddress(InetSocketAddress address, Configuration conf) {
if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_USE_FQDN)) {
return address.getHostName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
import org.junit.Test;

import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Unit tests for LlapThreadLocalStatistics: scheme merging, before/after subtraction,
 * and the CPU/user time fields taken from ThreadMXBean.
 */
public class TestLlapThreadLocalStatistics {

  private static final ThreadMXBean mxBean = LlapUtil.initThreadMxBean();

  @Test
  public void testEmptyStatistics() {
    // With no filesystem statistics at all, the diff must contain no schemes either.
    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean, new ArrayList<>());
    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean, new ArrayList<>());
    Assert.assertEquals(0, after.subtract(before).schemeToThreadLocalStats.keySet().size());
  }

  @Test
  public void testCpuTimeUserTime() {
    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean, new ArrayList<>());
    Assert.assertTrue("cpuTime should be >0", before.cpuTime > 0);
    Assert.assertTrue("userTime should be >0", before.userTime > 0);

    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean, new ArrayList<>());
    // ThreadMXBean times are monotonic but can be coarse-grained depending on the platform:
    // two back-to-back reads may legally return the same value, so assert >= (not >) to
    // avoid a flaky test.
    Assert.assertTrue("cpuTime should not decrease", after.cpuTime >= before.cpuTime);
    Assert.assertTrue("userTime should not decrease", after.userTime >= before.userTime);
  }

  @Test
  public void testCountersMergedForTheSameScheme() {
    // Two "hdfs" entries (e.g. multiple namenodes) must be merged into a single map entry.
    LlapThreadLocalStatistics stats = new LlapThreadLocalStatistics(mxBean,
        createMockStatistics(new String[]{"file", "hdfs", "hdfs"}, new Integer[]{1, 1, 1}));
    Assert.assertEquals(1, stats.schemeToThreadLocalStats.get("file").bytesRead);
    Assert.assertEquals(2, stats.schemeToThreadLocalStats.get("hdfs").bytesRead);
  }

  @Test
  public void testCountersBeforeAfter() {
    LlapThreadLocalStatistics before = new LlapThreadLocalStatistics(mxBean,
        createMockStatistics(new String[]{"file", "hdfs", "hdfs"}, new Integer[]{1, 1, 1}));
    LlapThreadLocalStatistics after = new LlapThreadLocalStatistics(mxBean,
        createMockStatistics(new String[]{"file", "hdfs", "hdfs"}, new Integer[]{3, 1, 4}));

    Assert.assertEquals(1, before.schemeToThreadLocalStats.get("file").bytesRead);
    Assert.assertEquals(2, before.schemeToThreadLocalStats.get("hdfs").bytesRead);
    Assert.assertEquals(3, after.schemeToThreadLocalStats.get("file").bytesRead);
    Assert.assertEquals(5, after.schemeToThreadLocalStats.get("hdfs").bytesRead);

    after.subtract(before);

    // file: 3 - 1
    Assert.assertEquals(2, after.schemeToThreadLocalStats.get("file").bytesRead);
    // hdfs: (1 + 4) - (1 + 1)
    Assert.assertEquals(3, after.schemeToThreadLocalStats.get("hdfs").bytesRead);
  }

  /**
   * Creates one FileSystem.Statistics per scheme, with bytesRead preset to the matching value.
   * Schemes may repeat, to exercise the merge logic.
   */
  private List<FileSystem.Statistics> createMockStatistics(String[] schemes, Integer[] values) {
    return IntStream.range(0, schemes.length).mapToObj(i -> {
      FileSystem.Statistics stat = new FileSystem.Statistics(schemes[i]);
      stat.incrementBytesRead(values[i]);
      return stat;
    }).collect(Collectors.toList());
  }
}
Loading

0 comments on commit f9fb1f2

Please sign in to comment.