Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brownsys pivottrace 210 beta #1

Open
wants to merge 115 commits into
base: branch-2.1.0-beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
115 commits
Select commit Hold shift + click to select a range
c52b51d
Applied XTrace patch
JonathanMace Sep 23, 2013
77cf144
Removed unnecessary import
JonathanMace Oct 2, 2013
9f994bf
Instrumented the WritableRpcEngine. I'm not sure where this is used (as
JonathanMace Oct 2, 2013
c2469a1
Add XTrace metadata to the data transfer header protos
JonathanMace Oct 2, 2013
629774f
Start tracing commands when initiated from the filesystem.
JonathanMace Oct 2, 2013
1a39a7f
Changed the DataNode receiver to start traces or join traces.
JonathanMace Oct 2, 2013
7fb8922
Added XTrace metadata to the block and packet transfer headers
JonathanMace Oct 3, 2013
a9c3ad7
Instrumented the sender/receiver for downloading files from HDFS
JonathanMace Oct 3, 2013
494f1e8
Add check for xtrace metadata
JonathanMace Oct 4, 2013
17848a4
Move start trace to the DataXceiver and add a little instrumentation for
JonathanMace Oct 4, 2013
6c04b81
Add XTrace instrumentation to pipeline write acks
JonathanMace Oct 4, 2013
58e6a9b
Instrument code that mirrors data when pipelining to in-place modify the
JonathanMace Oct 4, 2013
726d9a0
Removed unnecessary instrumentation
JonathanMace Oct 4, 2013
cec72b7
Minor mistakes in the xtrace log statements
JonathanMace Oct 4, 2013
49cc05b
Tiny bits of additional instrumentation
JonathanMace Oct 5, 2013
35e2e17
Changed location of 'close' log messages, since close can be called
JonathanMace Oct 5, 2013
ce8a39e
Finished implementation of file writes from client side; mostly
JonathanMace Oct 8, 2013
74d8f91
Instrumentation bugfix
JonathanMace Oct 8, 2013
d62da51
Instrumented a couple more protobuf messages, added a utility class to
JonathanMace Oct 10, 2013
9bea5f9
Fixed a context joining problem in the block receiver
JonathanMace Oct 11, 2013
ed479e2
Instrumented start and end of many DFSClient api function calls
JonathanMace Oct 11, 2013
327e390
Added some resource tracing events
JonathanMace Oct 12, 2013
1c8aa47
Added an extra log statement to the DFSOutputStream
JonathanMace Oct 16, 2013
198eb9e
Added log statement to indicate that we're forcing a namenode block
JonathanMace Oct 16, 2013
c93495b
Added some instrumentation of some of the main locks used on the
JonathanMace Oct 16, 2013
6307cc5
Removed some annoying unnecessary RPC log messages, and added a 'name'
JonathanMace Oct 16, 2013
84e597d
Changed the startTrace logpoints in DataXceiver to just logEvents.
JonathanMace Oct 17, 2013
bc4a096
Use XTraceResourceTracing branching API to log the explicit computation
JonathanMace Oct 17, 2013
589c555
Slightly modify server to extend the boundary of where the xtrace
JonathanMace Oct 18, 2013
ea700a5
Removed all resource tracing stuff, which will be moved to a new
JonathanMace Oct 21, 2013
ba3446c
Added dependency to XResourceTracing
JonathanMace Oct 21, 2013
9470d96
Fixed an XTrace logging bug
Oct 26, 2013
7b09e09
Temporarily disabled code to propagate metadata between replicas, as …
JonathanMace Oct 29, 2013
ff9ca2b
Merge branch 'brownsys-rtrace-210-beta' of github.com:brownsys/browns…
JonathanMace Oct 29, 2013
810dc4a
Fix for the mysterious bug that was causing checksum inconsistencies.…
JonathanMace Oct 29, 2013
762bae6
Some small instrumentation tweaks
JonathanMace Oct 29, 2013
d01c343
Added XResourceTracing as a bootclasspath option
JonathanMace Nov 12, 2013
c10d329
Added the necessary command line arguments to put xresourcetracing on…
JonathanMace Nov 12, 2013
739d0aa
We can now put xresourcetracing on the bootclasspath. It only takes …
JonathanMace Nov 13, 2013
7b2fded
Use a more accurate estimate for the PacketHeader size, now taking in…
JonathanMace Nov 19, 2013
242285c
Removed some hard-coded xtrace context passing that is now handled ge…
JonathanMace Nov 22, 2013
0145e24
Clear the thread context in lease renewer, which is a long-lived thre…
JonathanMace Dec 11, 2013
1c0d4ab
Fix to add causality when the sending thread has to wait to receive ACKs
JonathanMace Jan 16, 2014
c907d34
Fixed a bug - using less than instead of greater than
JonathanMace Jan 16, 2014
0c858b7
Removed and modified some of the XTraceContext.startTrace events, bec…
JonathanMace Jan 27, 2014
55bbdc6
Commented out the inclusion of resource tracing on the bootclasspath;…
JonathanMace Jan 30, 2014
580097c
Commented out bootclasspath stuff, since for now we don't want or nee…
JonathanMace Feb 14, 2014
6521915
More network instrumentation for RPC calls
JonathanMace Feb 18, 2014
e642c28
Added a few entries to the default hdfs config; by default, there are…
JonathanMace Feb 18, 2014
3320391
Testing the write speed with a lower cache drop behind buffer lag
JonathanMace Feb 18, 2014
4db14a2
Applied broken pom patch from HADOOP-10110; hasn't affected us thus f…
JonathanMace Feb 28, 2014
a7019c7
Temporary instrumentation adding additional events to DataXceiver
JonathanMace Mar 11, 2014
8af128d
Undo previous commit
JonathanMace Mar 11, 2014
e9e1598
Add some more temporary logging
JonathanMace Mar 11, 2014
d8d8727
Removed some logging that was temporary
JonathanMace Mar 11, 2014
a5f52b8
Revert "Removed some logging that was temporary"
JonathanMace Mar 11, 2014
a87d268
Migration from X-Trace 2.0 to X-Trace 3.0. Preliminary commit of unt…
JonathanMace Mar 24, 2014
8d969af
Small fix
JonathanMace Mar 25, 2014
e61df71
Peer cache is long lived, don't attribute to first task we see
JonathanMace Mar 25, 2014
577dc10
Whoops... didn't commit this properly...
JonathanMace Mar 25, 2014
9c25021
Add a few log messages to NativeIO, why not
JonathanMace Mar 26, 2014
4f30e53
Instrument call queue in IPC Server
JonathanMace Mar 26, 2014
630659b
Slight changes to the RPC server and client response sending/processi…
JonathanMace Apr 1, 2014
7fd7a39
Add ability to have connection-per-client in a single process
JonathanMace Apr 1, 2014
5c48be6
For now, comment out the random sleep if a complete call fails after …
JonathanMace Apr 21, 2014
9d9fe90
Add 5ms sleep instead of 400ms sleep
JonathanMace Apr 21, 2014
055a6db
Add instrumentation of DN heartbeats
JonathanMace Apr 23, 2014
d1a31b7
Bad import removed
JonathanMace Apr 23, 2014
3f7f7a8
Instrument more background tasks
JonathanMace Apr 23, 2014
411f677
Add a hacky addition to allow kinda throttling of background block re…
JonathanMace Apr 24, 2014
8ffed3c
Oh, and make methods static
JonathanMace Apr 24, 2014
99afc5b
Fix divide by zero exception
JonathanMace Apr 27, 2014
bd81bf5
Try alternative approach to replication throttling using the balancer…
JonathanMace Apr 27, 2014
6f1ccc4
Also default to large balancer bandwidth
JonathanMace Apr 27, 2014
9e22b3c
Use both approaches simultaneously!
JonathanMace Apr 28, 2014
f0ee65f
Removed balancer bandwidth - unnecessary
JonathanMace Apr 30, 2014
b4d6e3e
Moved the AspectJ stuff to the root pom. I'm really not sure which o…
JonathanMace Jun 5, 2014
b36e2f5
Indentation in pom
JonathanMace Jun 5, 2014
8cd00ff
Add showWeaveInfo=true to pom
JonathanMace Jul 25, 2014
7332cde
Add throttling points to HDFS
JonathanMace Aug 13, 2014
ab3d98e
Rename the throttling point on the RPC server
JonathanMace Aug 15, 2014
2478beb
Put servername in the call queue to allow multiple servers
JonathanMace Sep 1, 2014
3a98346
Fix up the propagation of X-Trace metadata from NodeManager to Contai…
JonathanMace Sep 1, 2014
3750df3
Let the examples in the examples jar start X-Trace tasks, useful for …
JonathanMace Sep 1, 2014
fcdbe7f
Base64 encoding isn't quite compatible with HTTP headers because it u…
JonathanMace Sep 1, 2014
050428e
Fixed up the inclusion of xtrace metadata in the shuffle header
JonathanMace Sep 1, 2014
2636730
Put throttling queue only on NN server for now
JonathanMace Sep 3, 2014
fa384e1
Only add call queue instrumentation for namenode for now
JonathanMace Sep 4, 2014
18f9ecb
Foolishly forgot to use base 16 decoding of XTrace header in shuffle …
JonathanMace Sep 15, 2014
5f9ec89
Add throttling point to ShuffleHandler and to RawFileSystem
JonathanMace Sep 15, 2014
bd2fce1
Add some special handling for MR shuffle handler network output
JonathanMace Sep 16, 2014
28076ab
Remove previous commit instrumentation of shuffle handler, was too much
JonathanMace Sep 16, 2014
0cbaa37
Remove throttling point from local file system, put in spill thread
JonathanMace Sep 17, 2014
4694871
Add some cpu tracking... test
JonathanMace Sep 17, 2014
0acf0bc
Put throttling points in new position
JonathanMace Sep 18, 2014
e8d08a8
remove spill throttler
JonathanMace Sep 18, 2014
a95b2ed
Share throttling point for ifile
JonathanMace Sep 19, 2014
2ad2121
Do this manually...
JonathanMace Sep 20, 2014
d61ef05
Manually disable datanode hostname check. Later versions of HDFS make…
JonathanMace Oct 21, 2014
4fbbbbc
Add HDFS config option to specify whether to fadvise long files
JonathanMace Dec 10, 2014
220170e
Add some logging for datanode selection
JonathanMace Dec 10, 2014
890e326
Revert "Add some logging for datanode selection"
JonathanMace Dec 10, 2014
e7b6020
Applied XTrace patch
JonathanMace Sep 23, 2013
eb13a29
Merge branch 'brownsys-rtrace-210-beta' of github.com:brownsys/browns…
JonathanMace Dec 10, 2014
78ed44e
Temporary random shuffle of datanodes
JonathanMace Dec 10, 2014
a9f3d0f
Remove non-utf8 character
JonathanMace Dec 10, 2014
a572a0a
added mvn dependency on PivotTracing from retro
Dec 22, 2014
3e7f574
Add pivot tracing to aspectj libraries
JonathanMace Dec 22, 2014
0172dbc
Add pivot tracing init to HDFS main methods
JonathanMace Mar 8, 2015
32f8d5f
Add extra room in the packet header for pivot tracing baggage
JonathanMace Mar 8, 2015
0410933
Add agentlib to HDFS command
JonathanMace Mar 16, 2015
2a72991
Add command line option to set mapreduce tenant class
JonathanMace Mar 17, 2015
1e7cef6
Merge branch 'brownsys-pivottrace-210-beta' of github.com:brownsys/br…
JonathanMace Mar 17, 2015
9c4f15b
Fix partial accidental commit
JonathanMace Mar 17, 2015
fb0c798
Why am i such an idiot
JonathanMace Mar 17, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions hadoop-common-project/hadoop-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<scope>test</scope>
</dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,10 @@ if not "%HADOOP_MAPRED_HOME%\%MAPRED_DIR%" == "%HADOOP_YARN_HOME%\%YARN_DIR%" (
set CLASSPATH=!CLASSPATH!;%HADOOP_MAPRED_HOME%\%MAPRED_DIR%\*
)

@rem XResourceTracing - add the XBootclasspath jars
@rem set HADOOP_OPTS=%HADOOP_OPTS% -Xbootclasspath/p:%HADOOP_COMMON_HOME%\%HADOOP_COMMON_LIB_JARS_DIR%\xtrace-resource-tracing-1.0.jar
@rem set HADOOP_OPTS=%HADOOP_OPTS% -Xbootclasspath/p:%HADOOP_COMMON_HOME%\%HADOOP_COMMON_LIB_JARS_DIR%\xtrace-2.1-20120824.jar
@rem set HADOOP_OPTS=%HADOOP_OPTS% -Xbootclasspath/p:%HADOOP_COMMON_HOME%\%HADOOP_COMMON_LIB_JARS_DIR%\log4j-1.2.17.jar
@rem set HADOOP_OPTS=%HADOOP_OPTS% -Xbootclasspath/p:%HADOOP_COMMON_HOME%\%HADOOP_COMMON_LIB_JARS_DIR%\aspectjrt-1.7.3.jar

:eof
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,7 @@ if [ "$HADOOP_CLASSPATH" != "" ]; then
fi
fi

#HADOOP_OPTS="$HADOOP_OPTS -Xbootclasspath/p:${HADOOP_COMMON_LIB_JARS_DIR}/xtrace-resource-tracing-1.0.jar"
#HADOOP_OPTS="$HADOOP_OPTS -Xbootclasspath/p:${HADOOP_COMMON_LIB_JARS_DIR}/xtrace-2.1-20120824.jar"
#HADOOP_OPTS="$HADOOP_OPTS -Xbootclasspath/p:${HADOOP_COMMON_LIB_JARS_DIR}/log4j-1.2.17.jar"
#HADOOP_OPTS="$HADOOP_OPTS -Xbootclasspath/p:${HADOOP_COMMON_LIB_JARS_DIR}/aspectjrt-1.7.3.jar"
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public synchronized void write(byte b[], int off, int len)

for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Joiner;

import edu.brown.cs.systems.xtrace.XTrace;

/** Provide command line access to a FileSystem. */
@InterfaceAudience.Private
public class FsShell extends Configured implements Tool {

static final XTrace.Logger XTRACE = XTrace.getLogger(FsShell.class);
static final Log LOG = LogFactory.getLog(FsShell.class);

private FileSystem fs;
Expand Down Expand Up @@ -247,22 +252,28 @@ public int run(String argv[]) throws Exception {
} else {
String cmd = argv[0];
Command instance = null;
XTrace.startTask(true);
XTrace.setTenantClass(0);
XTRACE.tag("Executing command", Joiner.on(" ").join(argv));
try {
instance = commandFactory.getInstance(cmd);
if (instance == null) {
throw new UnknownCommandException();
}
exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
XTRACE.log("Finished executing command");
} catch (IllegalArgumentException e) {
displayError(cmd, e.getLocalizedMessage());
if (instance != null) {
printInstanceUsage(System.err, instance);
}
XTRACE.log("Command failed due to illegal argument");
} catch (Exception e) {
// instance.run catches IOE, so something is REALLY wrong if here
LOG.debug("Error", e);
displayError(cmd, "Fatal internal error");
e.printStackTrace(System.err);
XTRACE.log("Fatal internal error", "Message", e.getMessage());
}
}
return exitCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");
private Path workingDir;

Expand Down Expand Up @@ -123,7 +124,7 @@ class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor
private long position;

public LocalFSFileInputStream(Path f) throws IOException {
this.fis = new TrackingFileInputStream(pathToFile(f));
this.fis = new FileInputStream(pathToFile(f));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -34,8 +36,7 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.brown.cs.systems.xtrace.XTrace;

/**
* JNI wrappers for various native IO-related calls not available in Java.
Expand Down Expand Up @@ -90,6 +91,7 @@ public static class POSIX {
write. */
public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;

private static final XTrace.Logger xtrace = XTrace.getLogger(NativeIO.class);
private static final Log LOG = LogFactory.getLog(NativeIO.class);

private static boolean nativeLoaded = false;
Expand Down Expand Up @@ -184,6 +186,7 @@ public static void posixFadviseIfPossible(
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);
xtrace.log("posix_fadvise", "offset", offset, "len", len, "flags", flags);
} catch (UnsupportedOperationException uoe) {
fadvisePossible = false;
} catch (UnsatisfiedLinkError ule) {
Expand All @@ -205,6 +208,7 @@ public static void syncFileRangeIfPossible(
if (nativeLoaded && syncFileRangePossible) {
try {
sync_file_range(fd, offset, nbytes, flags);
xtrace.log("sync_file_range", "offset", offset, "nbytes", nbytes, "flags", flags);
} catch (UnsupportedOperationException uoe) {
syncFileRangePossible = false;
} catch (UnsatisfiedLinkError ule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.ipc;

import static org.apache.hadoop.ipc.RpcConstants.*;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down Expand Up @@ -92,8 +92,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;

import edu.brown.cs.systems.resourcetracing.resources.Network;
import edu.brown.cs.systems.xtrace.Context;
import edu.brown.cs.systems.xtrace.XTrace;

/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
Expand All @@ -102,6 +107,7 @@
*/
public class Client {

public static final XTrace.Logger xtrace = XTrace.getLogger(Client.class);
public static final Log LOG = LogFactory.getLog(Client.class);

/** A counter for generating call IDs. */
Expand Down Expand Up @@ -238,6 +244,9 @@ void checkResponse(RpcResponseHeaderProto header) throws IOException {
}
}
}
if (header.hasXtrace()) {
XTrace.set(header.getXtrace().toByteArray());
}
}

Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
Expand All @@ -255,6 +264,7 @@ static class Call {
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
Context xtrace; // X-Trace context for the return

private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
Expand Down Expand Up @@ -631,6 +641,8 @@ private synchronized void setupIOstreams() {
if (socket != null || shouldCloseConnection.get()) {
return;
}
xtrace.log("Connecting to server");
Context start_context = XTrace.get();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
Expand Down Expand Up @@ -702,9 +714,13 @@ public AuthMethod run()
// start the receiver thread after the socket connection has been set
// up
start();
XTrace.join(start_context);
xtrace.log("Connected to server");
return;
}
} catch (Throwable t) {
XTrace.join(start_context);
xtrace.log("Failed to connect to server: "+t.getClass().getName(), "Message", t.getMessage());
if (t instanceof IOException) {
markClosed((IOException)t);
} else {
Expand Down Expand Up @@ -882,10 +898,13 @@ private synchronized void sendPing() throws IOException {

@Override
public void run() {
// XTrace: this is a long-lived thread that is lazily started by the first client.
// Clear any thread contexts that may have been set after the thread begins
xtrace.log("RPC Response reader thread started");
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());

XTrace.stop();
try {
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
Expand Down Expand Up @@ -946,6 +965,7 @@ public void run() {

if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
xtrace.log("Client senderFuture sending call");

byte[] data = d.getData();
int totalLength = d.getLength();
Expand Down Expand Up @@ -991,24 +1011,36 @@ private void receiveRpcResponse() {
}
touch();

XTrace.stop();
try {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
Network.ignore(true);
int totalLen;
RpcResponseHeaderProto header;
try {
totalLen = in.readInt();
header = RpcResponseHeaderProto.parseDelimitedFrom(in);
} finally {
Network.ignore(false);
}
checkResponse(header);

int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

Network.Read.alreadyStarted(in);
Network.Read.alreadyFinished(in, headerLen);

int callId = header.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);

Call call = calls.get(callId);
Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();

if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.xtrace = XTrace.get();
call.setRpcResponse(value);
calls.remove(callId);

Expand Down Expand Up @@ -1053,6 +1085,8 @@ private void receiveRpcResponse() {
}
} catch (IOException e) {
markClosed(e);
} finally {
XTrace.stop();
}
}

Expand Down Expand Up @@ -1316,6 +1350,8 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass);
xtrace.log("Sending RPC request", "Call ID", call.id);
call.xtrace = XTrace.get();
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
Expand All @@ -1325,7 +1361,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}

boolean interrupted = false;
synchronized (call) {
while (!call.done) {
Expand All @@ -1342,19 +1377,24 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
Thread.currentThread().interrupt();
}

XTrace.join(call.xtrace);
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
xtrace.log("RPC response received remote exception", "Call ID", call.id, "Message", call.error.getMessage());
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(),
IOException e = NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
call.error);
xtrace.log("Local exception handling RPC response", "Call ID", call.id, "Message", e.getMessage());
throw e;
}
} else {
xtrace.log("Received RPC response", "Call ID", call.id);
return call.getRpcResponse();
}
}
Expand Down
Loading