Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-636 DFSClient: Improve Opentelemetry tracing #743

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName);
this.writeSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
Expand All @@ -154,7 +156,7 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
Expand All @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces
*/
public void writeRecord(T record) throws Exception
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
try
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
}

/**
Expand All @@ -197,7 +211,20 @@ public void writeRecords(Iterator<T> it) throws Exception
{
while (it.hasNext())
{
this.binaryRecordWriter.writeRecord(it.next());
try
{
this.binaryRecordWriter.writeRecord(it.next());
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
this.recordsWritten++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -73,6 +74,7 @@ public static class FileReadContext
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
};

Expand Down Expand Up @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.dataPartition = dp;
this.recordBuilder = recBuilder;

String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000));
this.readSpan.setAllAttributes(attributes);
this.readSpan = createReadSpan(ctx, dp);

if (context.originalRD == null)
{
Expand All @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -321,13 +310,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}
this.inputStream.skip(bytesToSkip);

Expand All @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

private static Span createReadSpan(FileReadContext context, DataPartition dp)
{
String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart();
Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
readSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

long readSize = context.readSizeKB;
if (readSize < 0)
{
readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB;
}
readSize *= 1000;

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSize));
readSpan.setAllAttributes(attributes);

return readSpan;
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
Expand All @@ -364,20 +384,12 @@ private boolean retryRead()

try
{
String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
if (context.parentSpan != null)
{
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
}
else
{
this.readSpan = Utils.createSpan(readSpanName);
}
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);

this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -391,6 +403,7 @@ private boolean retryRead()
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
Expand Down Expand Up @@ -529,6 +542,10 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
canReadNext = false;
Expand Down Expand Up @@ -564,6 +581,10 @@ public T next()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
Expand Down
Loading
Loading