Skip to content

Commit

Permalink
HPCC4J-611 Add OpenTelemetry tracing to dfsclient
Browse files Browse the repository at this point in the history
- Added tracing support to read pathways
- Added tracing support to write pathways

Signed-off-by: James McMullan <email address redacted>
  • Loading branch information
jpmcmu committed Jul 3, 2024
1 parent ce994b3 commit ea88b69
Show file tree
Hide file tree
Showing 6 changed files with 490 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;

import org.hpccsystems.dfs.client.RowServiceOutputStream;
import org.hpccsystems.dfs.client.Utils;

import io.opentelemetry.api.trace.Span;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -37,6 +41,9 @@ public class HPCCRemoteFileWriter<T>
private long recordsWritten = 0;
private long openTimeMs = 0;

private Span writeSpan = null;
private String writeSpanName = null;

/**
* A remote file writer.
*
Expand Down Expand Up @@ -105,9 +112,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createSpan(writeSpanName);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
fileCompression, connectTimeoutMs, socketOpTimeoutMs);
fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan);

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);
Expand Down Expand Up @@ -161,6 +171,8 @@ public void close() throws Exception
this.report();
this.binaryRecordWriter.finalize();

this.writeSpan.end();

long closeTimeMs = System.currentTimeMillis();
double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
*******************************************************************************/
package org.hpccsystems.dfs.client;

import org.hpccsystems.dfs.client.Utils;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -48,6 +56,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private long openTimeMs = 0;
private long recordsRead = 0;

private Span readSpan = null;
private String readSpanName = null;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
Expand Down Expand Up @@ -204,6 +215,24 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.createPrefetchThread = createPrefetchThread;
this.socketOpTimeoutMs = socketOpTimeoutMs;

this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart();
this.readSpan = Utils.createSpan(readSpanName);

String IPs = "";
for (int i = 0; i < dp.getCopyCount() ; i++)
{
IPs += dp.getCopyIP(i);
if (i < dp.getCopyCount() - 1)
{
IPs += ",";
}
}

Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, IPs,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000));
readSpan.setAllAttributes(attributes);

if (connectTimeout < 1)
{
connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
Expand All @@ -212,18 +241,24 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
if (projectedRecordDefinition == null)
{
throw new Exception("IRecordBuilder does not have a valid record definition.");
Exception e = new Exception("IRecordBuilder does not have a valid record definition.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -238,11 +273,14 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
throw new Exception("Unable to restart unexpected stream pos in record reader.");
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
}
this.inputStream.skip(bytesToSkip);

Expand Down Expand Up @@ -279,9 +317,11 @@ private boolean retryRead()

try
{
this.readSpan = Utils.createSpan(readSpanName);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -294,6 +334,8 @@ private boolean retryRead()
}
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
}
Expand Down Expand Up @@ -499,7 +541,9 @@ public void close() throws Exception
return;
}

this.readSpan.end();
report();

this.inputStream.close();
isClosed = true;

Expand Down
Loading

0 comments on commit ea88b69

Please sign in to comment.