Skip to content

Commit

Permalink
HPCC4J-636 DFSClient: Improve OpenTelemetry tracing
Browse files Browse the repository at this point in the history
- Improved span names
- Transitioned read request events to read spans
- Moved events to spans for connect, version and close
- Added span batch support

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Aug 30, 2024
1 parent 9e73d7c commit 2d1ef88
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName);
this.writeSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
Expand Down Expand Up @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces
*/
public void writeRecord(T record) throws Exception
{
    try
    {
        // Write exactly once; the counter only advances after the binary writer
        // returns without throwing.
        this.binaryRecordWriter.writeRecord(record);
        this.recordsWritten++;
    }
    catch (Exception e)
    {
        // Pass the exception to the logger so the stack trace is kept, not just the message.
        log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage(), e);

        // Mark the write span as failed and end it before propagating the error.
        // NOTE(review): the constructor sets StatusCode.OK on this span; confirm the
        // OpenTelemetry SDK in use still honours a later ERROR status.
        this.writeSpan.recordException(e);
        this.writeSpan.setStatus(StatusCode.ERROR);
        this.writeSpan.end();

        throw e;
    }
}

/**
Expand All @@ -197,7 +211,20 @@ public void writeRecords(Iterator<T> it) throws Exception
{
    while (it.hasNext())
    {
        try
        {
            // Consume one element per pass and count it once, only after the
            // binary writer returns without throwing.
            this.binaryRecordWriter.writeRecord(it.next());
            this.recordsWritten++;
        }
        catch (Exception e)
        {
            // Pass the exception to the logger so the stack trace is kept, not just the message.
            log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage(), e);

            // Mark the write span as failed and end it before propagating the error.
            this.writeSpan.recordException(e);
            this.writeSpan.setStatus(StatusCode.ERROR);
            this.writeSpan.end();

            throw e;
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -73,6 +74,7 @@ public static class FileReadContext
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
};

Expand Down Expand Up @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.dataPartition = dp;
this.recordBuilder = recBuilder;

String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000));
this.readSpan.setAllAttributes(attributes);
this.readSpan = createReadSpan(ctx, dp);

if (context.originalRD == null)
{
Expand All @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -321,13 +310,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}
this.inputStream.skip(bytesToSkip);

Expand All @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

/**
 * Creates the span covering the read of a single file part and attaches the
 * connection attributes (copy addresses, port and read size) to it.
 *
 * @param context read context supplying the parent span and the configured read size in KB
 * @param dp      data partition whose file name, part number, copy IPs and port describe the read
 * @return the newly created child span, with its status left unset
 */
private static Span createReadSpan(FileReadContext context, DataPartition dp)
{
    String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart();
    Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);

    // The status is intentionally NOT set to OK here. The OpenTelemetry specification
    // treats an OK status as final, so a later setStatus(StatusCode.ERROR) from the
    // read / retry error paths may be ignored by the SDK. Leaving the status unset
    // keeps those error reports effective.

    String primaryIP = dp.getCopyIP(0);
    String secondaryIP = "";
    if (dp.getCopyCount() > 1)
    {
        secondaryIP = dp.getCopyIP(1);
    }

    // A negative read size means "not configured"; report the stream's default instead.
    long readSize = context.readSizeKB;
    if (readSize < 0)
    {
        readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB;
    }
    // Reported in bytes, using a 1000x multiplier on the KB value.
    readSize *= 1000;

    Attributes attributes = Attributes.of(AttributeKey.stringKey("server.0.address"), primaryIP,
                                          AttributeKey.stringKey("server.1.address"), secondaryIP,
                                          ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
                                          AttributeKey.longKey("read.size"), Long.valueOf(readSize));
    readSpan.setAllAttributes(attributes);

    return readSpan;
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
Expand All @@ -364,20 +384,12 @@ private boolean retryRead()

try
{
String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
if (context.parentSpan != null)
{
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
}
else
{
this.readSpan = Utils.createSpan(readSpanName);
}
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);

this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -391,6 +403,7 @@ private boolean retryRead()
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
Expand Down Expand Up @@ -529,6 +542,10 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
canReadNext = false;
Expand Down Expand Up @@ -564,6 +581,10 @@ public T next()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
Expand Down
Loading

0 comments on commit 2d1ef88

Please sign in to comment.