diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 2d3cc4366..883cec3f8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.io.BufferedInputStream; +import java.io.Console; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -325,6 +326,28 @@ public JSONArray generateResultsMessage() } }; + private static String[] getCredentials(CommandLine cmd) + { + Console console = System.console(); + + String user = cmd.getOptionValue("user"); + boolean userIsEmpty = user == null || user.isEmpty(); + if (userIsEmpty) + { + user = new String(console.readLine("Enter username: ")); + userIsEmpty = user == null || user.isEmpty(); + } + + String pass = cmd.getOptionValue("pass"); + boolean passIsEmpty = pass == null || pass.isEmpty(); + if (!userIsEmpty && passIsEmpty) + { + pass = new String(console.readPassword("Enter password for " + user + ": ")); + } + + return new String[] {user, pass}; + } + private static enum FileFormat { THOR, @@ -1205,8 +1228,10 @@ private static void performRead(String[] args, TaskContext context) } String connString = cmd.getOptionValue("url"); - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String outputPath = cmd.getOptionValue("out","."); @@ -1383,8 +1408,10 @@ private static void performReadTest(String[] args, TaskContext context) } String connString = cmd.getOptionValue("url"); - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String outputPath = cmd.getOptionValue("out","."); @@ -1592,8 +1619,9 @@ private static void performCopy(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); } - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String destClusterName = cmd.getOptionValue("dest_cluster"); @@ -1773,8 +1801,9 @@ private static void performWrite(String[] args, TaskContext context) + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); } - String user = cmd.getOptionValue("user"); - String pass = cmd.getOptionValue("pass"); + String[] creds = getCredentials(cmd); + String user = creds[0]; + String pass = creds[1]; String destClusterName = cmd.getOptionValue("dest_cluster"); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index f1f4c5322..1cd3f09de 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; import org.apache.logging.log4j.Logger; @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces this.recordAccessor = recordAccessor; - this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); + this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart(); this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName); + this.writeSpan.setStatus(StatusCode.OK); String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -154,7 +156,7 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, AttributeKey.stringKey("server.1.address"), secondaryIP, - ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort())); + AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort())); writeSpan.setAllAttributes(attributes); this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces */ public void writeRecord(T record) throws Exception { - this.binaryRecordWriter.writeRecord(record); - this.recordsWritten++; + try + { + this.binaryRecordWriter.writeRecord(record); + this.recordsWritten++; + } + catch (Exception e) + { + log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage()); + this.writeSpan.recordException(e); + this.writeSpan.setStatus(StatusCode.ERROR); + this.writeSpan.end(); + + throw e; + } } /** @@ -197,7 +211,20 @@ public void writeRecords(Iterator it) throws Exception { while (it.hasNext()) { - this.binaryRecordWriter.writeRecord(it.next()); + try + { + this.binaryRecordWriter.writeRecord(it.next()); + this.recordsWritten++; + } + catch (Exception e) + { + log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage()); + this.writeSpan.recordException(e); + this.writeSpan.setStatus(StatusCode.ERROR); + this.writeSpan.end(); + + throw e; + } this.recordsWritten++; } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 4e0c640c4..4192349bb 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; import org.apache.logging.log4j.Logger; @@ -73,6 +74,7 @@ public static class FileReadContext public int recordReadLimit = -1; public boolean createPrefetchThread = true; public int readSizeKB = -1; + public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span public Span parentSpan = null; }; @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.dataPartition = dp; this.recordBuilder = recBuilder; - String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); - this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); - - String primaryIP = dp.getCopyIP(0); - String secondaryIP = ""; - if (dp.getCopyCount() > 1) - { - secondaryIP = dp.getCopyIP(1); - } - - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, - AttributeKey.stringKey("server.1.address"), secondaryIP, - ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()), - AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000)); - this.readSpan.setAllAttributes(attributes); + this.readSpan = createReadSpan(ctx, dp); if (context.originalRD == null) { @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -321,6 +310,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) @@ -328,6 +318,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader."); this.readSpan.recordException(e); this.readSpan.end(); + throw e; } this.inputStream.skip(bytesToSkip); @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde openTimeMs = System.currentTimeMillis(); } + private static Span createReadSpan(FileReadContext context, DataPartition dp) + { + String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart(); + Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + readSpan.setStatus(StatusCode.OK); + + String primaryIP = dp.getCopyIP(0); + String secondaryIP = ""; + if (dp.getCopyCount() > 1) + { + secondaryIP = dp.getCopyIP(1); + } + + long readSize = context.readSizeKB; + if (readSize < 0) + { + readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB; + } + readSize *= 1000; + + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, + AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()), + AttributeKey.longKey("read.size"), Long.valueOf(readSize)); + readSpan.setAllAttributes(attributes); + + return readSpan; + } + private boolean retryRead() { if (retryCount < maxReadRetries) @@ -364,20 +384,12 @@ private boolean retryRead() try { - String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); - if (context.parentSpan != null) - { - this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); - } - else - { - this.readSpan = Utils.createSpan(readSpanName); - } + this.readSpan = createReadSpan(context, dataPartition); this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); - + this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -391,6 +403,7 @@ private boolean retryRead() catch (Exception e) { this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); this.readSpan.end(); log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e); return false; @@ -529,6 +542,10 @@ public boolean hasNext() } catch (HpccFileException e) { + this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); + this.readSpan.end(); + if (!retryRead()) { canReadNext = false; @@ -564,6 +581,10 @@ public T next() } catch (HpccFileException e) { + this.readSpan.recordException(e); + this.readSpan.setStatus(StatusCode.ERROR); + this.readSpan.end(); + if (!retryRead()) { log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index fcffaa974..8345f06cc 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.io.InputStream; +import java.sql.Timestamp; + import javax.net.SocketFactory; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; @@ -55,6 +57,60 @@ */ public class RowServiceInputStream extends InputStream implements IProfilable { + private static class ReadRequestEvent + { + public long requestTime = 0; + public long requestStreamPos = 0; + public long responseTime = 0; + public int bytesRead = 0; + public int requestSize = 0; + }; + + public static class RestartInformation + { + public long streamPos = 0; + public byte[] tokenBin = null; + } + + private static class RowServiceResponse + { + int len = 0; + int errorCode = 0; + int handle = -1; + String errorMessage = null; + } + + public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25; + public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout + public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations + + // Note: The platform may respond with more data than this if records are larger than this limit. + public static final int DEFAULT_MAX_READ_SIZE_KB = 4096; + private static final int SHORT_SLEEP_MS = 1; + private static final int LONG_WAIT_THRESHOLD_US = 100; + private static final int MAX_HOT_LOOP_NS = 10000; + + // This is used to prevent the prefetch thread from hot looping when + // the network connection is slow. The read on the socket will block until + // at least 512 bytes are available + private static final int MIN_SOCKET_READ_SIZE = 512; + + public static final String BYTES_READ_METRIC = "bytesRead"; + public static final String FIRST_BYTE_TIME_METRIC = "prefetchFirstByteTime"; + public static final String WAIT_TIME_METRIC = "parseWaitTime"; + public static final String MUTEX_WAIT_TIME_METRIC = "mutexWaitTime"; + public static final String SLEEP_TIME_METRIC = "prefetchSleepTime"; + + public static final String FETCH_START_TIME_METRIC = "fetchRequestStartTime"; + public static final String FETCH_TIME_METRIC = "fetchRequestReadTime"; + public static final String FETCH_FINISH_TIME_METRIC = "fetchRequestFinishTime"; + public static final String CLOSE_TIME_METRIC = "connectionCloseTime"; + + public static final String LONG_WAITS_METRIC = "numLongWaits"; + public static final String FETCHES_METRIC = "numFetches"; + public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; + public static final String BLOCK_READS_METRIC = "numBlockReads"; + private AtomicBoolean active = new AtomicBoolean(false); private AtomicBoolean closed = new AtomicBoolean(false); private boolean simulateFail = false; @@ -72,9 +128,17 @@ public class RowServiceInputStream extends InputStream implements IProfilable private java.io.DataOutputStream dos = null; private String rowServiceVersion = ""; - private Span readSpan = null; + private Span fileReadSpan = null; private String traceContextHeader = null; + private Span readRequestSpan = null; + private int readRequestCount = 0; + private int readRequestStart = 0; + private int readRequestBatchSize = DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE; + + private List readRequestEvents = new ArrayList(); + private ReadRequestEvent currentReadRequestEvent = null; + private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct private List prioritizedCopyIndexes = new ArrayList(); @@ -119,61 +183,18 @@ public class RowServiceInputStream extends InputStream implements IProfilable private long numBlockReads = 0; private Socket sock; - public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout - public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations private int connectTimeout = DEFAULT_CONNECT_TIMEOUT_MILIS; private int socketOpTimeoutMs = DEFAULT_SOCKET_OP_TIMEOUT_MS; private static final Charset HPCCCharSet = Charset.forName("ISO-8859-1"); private static final Logger log = LogManager.getLogger(RowServiceInputStream.class); - // Note: The platform may respond with more data than this if records are larger than this limit. - private static final int DEFAULT_MAX_READ_SIZE_KB = 4096; - private static final int SHORT_SLEEP_MS = 1; - private static final int LONG_WAIT_THRESHOLD_US = 100; - private static final int MAX_HOT_LOOP_NS = 10000; - - // This is used to prevent the prefetch thread from hot looping when - // the network connection is slow. The read on the socket will block until - // at least 512 bytes are available - private static final int MIN_SOCKET_READ_SIZE = 512; - private int maxReadSizeKB = DEFAULT_MAX_READ_SIZE_KB; // Buffer compact threshold should always be smaller than buffer prefetch threshold private int bufferPrefetchThresholdKB = DEFAULT_MAX_READ_SIZE_KB/2; private int bufferCompactThresholdKB = DEFAULT_MAX_READ_SIZE_KB/4; - public static final String BYTES_READ_METRIC = "bytesRead"; - public static final String FIRST_BYTE_TIME_METRIC = "prefetchFirstByteTime"; - public static final String WAIT_TIME_METRIC = "parseWaitTime"; - public static final String MUTEX_WAIT_TIME_METRIC = "mutexWaitTime"; - public static final String SLEEP_TIME_METRIC = "prefetchSleepTime"; - - public static final String FETCH_START_TIME_METRIC = "fetchRequestStartTime"; - public static final String FETCH_TIME_METRIC = "fetchRequestReadTime"; - public static final String FETCH_FINISH_TIME_METRIC = "fetchRequestFinishTime"; - public static final String CLOSE_TIME_METRIC = "connectionCloseTime"; - - public static final String LONG_WAITS_METRIC = "numLongWaits"; - public static final String FETCHES_METRIC = "numFetches"; - public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads"; - public static final String BLOCK_READS_METRIC = "numBlockReads"; - - public static class RestartInformation - { - public long streamPos = 0; - public byte[] tokenBin = null; - } - - private static class RowServiceResponse - { - int len = 0; - int errorCode = 0; - int handle = -1; - String errorMessage = null; - } - /** * Instantiates a new row service input stream. * @@ -394,8 +415,8 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co if (rdSpan != null && rdSpan.getSpanContext().isValid()) { - this.readSpan = rdSpan; - this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(readSpan); + this.fileReadSpan = rdSpan; + this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan); } int copycount = dataPart.getCopyCount(); @@ -558,16 +579,20 @@ RestartInformation getRestartInformationForStreamPos(long streamPos) return restartInfo; } - private void setPrefetchException(HpccFileException e) { this.prefetchException = e; - if (readSpan != null) + if (readRequestSpan != null) + { + readRequestSpan.recordException(e); + readRequestSpan.setStatus(StatusCode.ERROR); + } + else if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage()); - readSpan.recordException(e, attributes); + fileReadSpan.recordException(e, attributes); } } @@ -678,6 +703,21 @@ public void setReadRequestDelay(int sleepTimeMS) this.readRequestDelayMS = sleepTimeMS; } + /** + * Sets the read request span batch size. + * + * @param batchSize the read request span batch size + */ + public void setReadRequestSpanBatchSize(int batchSize) + { + if (batchSize < 1) + { + batchSize = DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE; + } + + this.readRequestBatchSize = batchSize; + } + /** * Simulate a handle failure and use the file token instead. The handle is set to an invalid value so the THOR node * will indicate that the handle is unknown and request a otken. @@ -741,11 +781,11 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception if (inFetchingMode == false) { Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); - if (readSpan != null) + if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + fileReadSpan.recordException(wrappedException, attributes); } throw wrappedException; } @@ -820,6 +860,75 @@ public void startBlockingFetchRequest(List fetchOffsets) throws Exception } } + private void startNewReadRequestSpan() + { + if (fileReadSpan != null) + { + // finishReadRequestSpan will clear the readRequestSpan when it is finished + boolean shouldStartNewSpan = readRequestSpan == null; + if (shouldStartNewSpan) + { + readRequestSpan = Utils.createChildSpan(fileReadSpan, "ReadRequest[" + readRequestCount + + "," + (readRequestCount + readRequestBatchSize) + "]"); + readRequestSpan.setAttribute("server.index", getFilePartCopy()); + readRequestSpan.setStatus(StatusCode.OK); + readRequestStart = readRequestCount; + } + readRequestCount++; + + currentReadRequestEvent = new ReadRequestEvent(); + currentReadRequestEvent.requestTime = System.currentTimeMillis(); + currentReadRequestEvent.requestStreamPos = streamPos; + currentReadRequestEvent.requestSize = maxReadSizeKB*1000; + } + } + + private void finishReadRequestSpan() + { + if (readRequestSpan != null) + { + if (currentReadRequestEvent != null) + { + currentReadRequestEvent.responseTime = System.currentTimeMillis(); + currentReadRequestEvent.bytesRead = totalDataInCurrentRequest; + readRequestEvents.add(currentReadRequestEvent); + + currentReadRequestEvent = null; + } + + int batchIndex = readRequestCount % readRequestBatchSize; + if (batchIndex == 0 || isClosed()) + { + List requestTimes = new ArrayList(); + List responseTimes = new ArrayList(); + List requestSizes = new ArrayList(); + List bytesRead = new ArrayList(); + List requestStreamPos = new ArrayList(); + + for (ReadRequestEvent event : readRequestEvents) + { + requestTimes.add( (new Timestamp(event.requestTime)).toString() ); + responseTimes.add( (new Timestamp(event.responseTime)).toString() ); + requestSizes.add((long)event.requestSize); + bytesRead.add((long)event.bytesRead); + requestStreamPos.add(event.requestStreamPos); + } + readRequestEvents.clear(); + + readRequestSpan.setAttribute(AttributeKey.stringArrayKey("requestTimes"), requestTimes); + readRequestSpan.setAttribute(AttributeKey.stringArrayKey("responseTimes"), responseTimes); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("requestSizes"), requestSizes); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("bytesRead"), bytesRead); + readRequestSpan.setAttribute(AttributeKey.longArrayKey("requestStreamPos"), requestStreamPos); + readRequestSpan.updateName( "ReadRequest[" + readRequestStart + "," + readRequestCount + "]"); + + readRequestSpan.end(); + readRequestSpan = null; + } + } + } + + // Run from prefetch thread only private int startFetch() { @@ -841,13 +950,7 @@ private int startFetch() numFetches++; if (!this.active.get()) { - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.offset"), streamPos, - AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); - readSpan.addEvent("RowServiceInputStream.readRequest", attributes); - } + startNewReadRequestSpan(); try { @@ -1054,11 +1157,11 @@ private void readDataInFetch() if (bytesToRead < 0) { IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); - if (readSpan != null) + if (fileReadSpan != null) { Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + fileReadSpan.recordException(wrappedException, attributes); } throw wrappedException; } @@ -1147,12 +1250,8 @@ private void finishFetch() catch(Exception ie){} } - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); - readSpan.addEvent("RowServiceInputStream.readResponse", attributes); - } + finishReadRequestSpan(); + //------------------------------------------------------------------------------ // Send read ahead request @@ -1160,6 +1259,7 @@ private void finishFetch() if (inFetchingMode == false) { + startNewReadRequestSpan(); if (readRequestDelayMS > 0) { try @@ -1391,6 +1491,7 @@ public void close() throws IOException catch(Exception e){} } + finishReadRequestSpan(); this.sendCloseFileRequest(); @@ -1696,12 +1797,15 @@ private void makeActive() throws HpccFileException this.handle = 0; String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; - if (readSpan != null) + Span connectSpan = null; + if (fileReadSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); - readSpan.addEvent("RowServiceInputStream.connect", attributes); + + connectSpan = Utils.createChildSpan(fileReadSpan, "Connect"); + connectSpan.setAttribute("server.index", getFilePartCopy()); } + boolean needsRetry = false; do { @@ -1748,11 +1852,29 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix + "Bad file part IP address or host name: " + e.getMessage(),e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; } catch (java.io.IOException e) { - throw new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix + " error making part active:" + e.getMessage(),e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; } try @@ -1762,17 +1884,35 @@ private void makeActive() throws HpccFileException } catch (java.io.IOException e) { - throw new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + " Failed to make streams for datapart:" + e.getMessage(), e); + + if (connectSpan != null) + { + connectSpan.recordException(wrappedException); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); + } + + throw wrappedException; + } + + if (connectSpan != null) + { + connectSpan.setStatus(StatusCode.OK); + connectSpan.end(); } //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ - if (readSpan != null) + + Span versionSpan = null; + if (fileReadSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); - readSpan.addEvent("RowServiceInputStream.versionRequest", attributes); + versionSpan = Utils.createChildSpan(fileReadSpan, "VersionRequest"); + versionSpan.setAttribute( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) ); + versionSpan.setStatus(StatusCode.OK); } try @@ -1786,7 +1926,15 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); + HpccFileException wrappedException = new HpccFileException(prefix+ " Failed on initial remote read transfer: " + e.getMessage(),e); + if (versionSpan != null) + { + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.recordException(wrappedException); + versionSpan.end(); + } + + throw wrappedException; } RowServiceResponse response = readResponse(); @@ -1805,23 +1953,31 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + "Error while attempting to read version response:" + e.getMessage(), e); + if (versionSpan != null) + { + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.recordException(wrappedException); + versionSpan.end(); + } + + throw wrappedException; } rowServiceVersion = new String(versionBytes, HPCCCharSet); + } - if (readSpan != null) - { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - ServiceAttributes.SERVICE_VERSION, rowServiceVersion); - readSpan.addEvent("RowServiceInputStream.versionResponse", attributes); - } + if (versionSpan != null) + { + versionSpan.setAttribute(ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + versionSpan.end(); } //------------------------------------------------------------------------------ // Send initial read request //------------------------------------------------------------------------------ + startNewReadRequestSpan(); try { String readTrans = null; @@ -1842,7 +1998,16 @@ private void makeActive() throws HpccFileException } catch (IOException e) { - throw new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); + HpccFileException wrappedException = new HpccFileException(prefix + " Failed on initial remote read read trans:" + e.getMessage(), e); + + if (readRequestSpan != null) + { + readRequestSpan.recordException(wrappedException); + readRequestSpan.setStatus(StatusCode.ERROR); + readRequestSpan.end(); + } + + throw wrappedException; } if (CompileTimeConstants.PROFILE_CODE) @@ -2294,6 +2459,14 @@ private void sendCloseFileRequest() throws IOException return; } + Span closeSpan = null; + if (fileReadSpan != null) + { + closeSpan = Utils.createChildSpan(fileReadSpan, "CloseRequest"); + closeSpan.setAttribute("server.index", getFilePartCopy()); + closeSpan.setStatus(StatusCode.OK); + } + String closeFileRequest = makeCloseHandleRequest(); int jsonRequestLen = closeFileRequest.length(); @@ -2317,14 +2490,19 @@ private void sendCloseFileRequest() throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); - if (readSpan != null) + if (closeSpan != null) { - Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()), - ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); - readSpan.recordException(wrappedException, attributes); + closeSpan.recordException(wrappedException); + closeSpan.setStatus(StatusCode.ERROR); + closeSpan.end(); } throw wrappedException; } + + if (closeSpan != null) + { + closeSpan.end(); + } } private RowServiceResponse readResponse() throws HpccFileException diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index e21808f8d..036cd825b 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -31,7 +31,9 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.ServiceAttributes; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -68,7 +70,7 @@ public class RowServiceOutputStream extends OutputStream private long handle = -1; private ByteBuffer scratchBuffer = ByteBuffer.allocate(SCRATCH_BUFFER_LEN); - private Span writeSpan = null; + private Span fileWriteSpan = null; private String traceContextHeader = null; private static class RowServiceResponse @@ -221,13 +223,13 @@ private static class RowServiceResponse * the socket connect timeout in ms (default is 5000) * @param socketOpTimeoutMS * the socket operation(read/write) timeout in ms (default is 15000) - * @param writeSpan + * @param fileWriteSpan * the opentelemetry span to use for tracing * @throws Exception * the exception */ RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, - CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span writeSpan) throws Exception + CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span fileWriteSpan) throws Exception { this.rowServiceIP = ip; this.rowServicePort = port; @@ -248,15 +250,18 @@ private static class RowServiceResponse connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MILIS; } - if (writeSpan != null && writeSpan.getSpanContext().isValid()) + if (fileWriteSpan != null && fileWriteSpan.getSpanContext().isValid()) { - this.writeSpan = writeSpan; - this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(writeSpan); + this.fileWriteSpan = fileWriteSpan; + this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileWriteSpan); } - if (this.writeSpan != null) + Span connectSpan = null; + if (this.fileWriteSpan != null) { - writeSpan.addEvent("RowServiceOutputStream.connect", getServerAttributes()); + connectSpan = Utils.createChildSpan(fileWriteSpan, "Connect"); + connectSpan.setStatus(StatusCode.OK); + connectSpan.setAllAttributes(getServerAttributes()); } try @@ -302,21 +307,31 @@ private static class RowServiceResponse log.error(errorMessage); Exception wrappedException = new Exception(errorMessage, e); - if (writeSpan != null) + if (connectSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + connectSpan.recordException(wrappedException, getServerAttributes()); + connectSpan.setStatus(StatusCode.ERROR); + connectSpan.end(); } throw wrappedException; } + if (connectSpan != null) + { + connectSpan.end(); + } + //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ - if (writeSpan != null) + Span versionSpan = null; + if (fileWriteSpan != null) { - writeSpan.addEvent("RowServiceOutputStream.versionRequest", getServerAttributes()); + versionSpan = Utils.createChildSpan(fileWriteSpan, "VersionRequest"); + versionSpan.setStatus(StatusCode.OK); + versionSpan.setAllAttributes(getServerAttributes()); } try @@ -331,9 +346,11 @@ private static class RowServiceResponse catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e); - if (writeSpan != null) + if (versionSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.end(); } throw wrappedException; @@ -356,9 +373,11 @@ private static class RowServiceResponse catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e); - if (writeSpan != null) + if (versionSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.recordException(wrappedException, getServerAttributes()); + versionSpan.setStatus(StatusCode.ERROR); + versionSpan.end(); } throw wrappedException; @@ -367,6 +386,12 @@ private static class RowServiceResponse rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1); } + if (versionSpan != null) + { + versionSpan.setAttribute(ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + versionSpan.end(); + } + // Go ahead and make the initial write request. This won't write any data to file // but it will cause the file to be opened on the remote server and keeps our access // token from expiring before we can start writing @@ -435,9 +460,9 @@ private void makeInitialWriteRequest() throws Exception if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -481,9 +506,9 @@ private void sendCloseFileRequest() throws IOException catch (IOException e) { IOException wrappedException = new IOException("Failed on close file with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -497,9 +522,9 @@ private void sendCloseFileRequest() throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -508,9 +533,9 @@ private void sendCloseFileRequest() throws IOException if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -577,9 +602,9 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { HpccFileException wrappedException = new HpccFileException("Early data termination, no handle"); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -591,9 +616,9 @@ private RowServiceResponse readResponse() throws HpccFileException catch (IOException e) { HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -618,9 +643,9 @@ public void close() throws IOException else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE) { IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -694,9 +719,9 @@ public void write(byte[] b, int off, int len) throws IOException catch (HpccFileException e) { IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException; @@ -705,9 +730,9 @@ public void write(byte[] b, int off, int len) throws IOException if (response.errorCode != RFCCodes.RFCStreamNoError) { IOException wrappedException = new IOException(response.errorMessage); - if (writeSpan != null) + if (fileWriteSpan != null) { - writeSpan.recordException(wrappedException, getServerAttributes()); + fileWriteSpan.recordException(wrappedException, getServerAttributes()); } throw wrappedException;