Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-611 Add OpenTelemetry tracing to dfsclient #716

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;

import org.hpccsystems.dfs.client.RowServiceOutputStream;
import org.hpccsystems.dfs.client.Utils;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -37,6 +44,9 @@ public class HPCCRemoteFileWriter<T>
private long recordsWritten = 0;
private long openTimeMs = 0;

private Span writeSpan = null;
private String writeSpanName = null;

/**
* A remote file writer.
*
Expand Down Expand Up @@ -105,9 +115,24 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createSpan(writeSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP,
AttributeKey.stringKey("server.secondary.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
fileCompression, connectTimeoutMs, socketOpTimeoutMs);
fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan);

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);
Expand Down Expand Up @@ -161,6 +186,8 @@ public void close() throws Exception
this.report();
this.binaryRecordWriter.finalize();

this.writeSpan.end();

long closeTimeMs = System.currentTimeMillis();
double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
*******************************************************************************/
package org.hpccsystems.dfs.client;

import org.hpccsystems.dfs.client.Utils;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -48,6 +56,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private long openTimeMs = 0;
private long recordsRead = 0;

private Span readSpan = null;
private String readSpanName = null;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
Expand Down Expand Up @@ -204,6 +215,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.createPrefetchThread = createPrefetchThread;
this.socketOpTimeoutMs = socketOpTimeoutMs;

this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart();
this.readSpan = Utils.createSpan(readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP,
AttributeKey.stringKey("server.secondary.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000));
readSpan.setAllAttributes(attributes);

if (connectTimeout < 1)
{
connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
Expand All @@ -212,18 +239,24 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
if (projectedRecordDefinition == null)
{
throw new Exception("IRecordBuilder does not have a valid record definition.");
Exception e = new Exception("IRecordBuilder does not have a valid record definition.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -238,11 +271,14 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
throw new Exception("Unable to restart unexpected stream pos in record reader.");
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
}
this.inputStream.skip(bytesToSkip);

Expand Down Expand Up @@ -279,9 +315,11 @@ private boolean retryRead()

try
{
this.readSpan = Utils.createSpan(readSpanName);
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -294,6 +332,8 @@ private boolean retryRead()
}
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.end();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This situation seems important enough to record as an exception on the span. If we can determine that the read has failed at this point, we should also set the span status to error (e.g. `readSpan.setStatus(StatusCode.ERROR, e.getMessage())`) before ending the span.

log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
}
Expand Down Expand Up @@ -499,7 +539,9 @@ public void close() throws Exception
return;
}

this.readSpan.end();
report();

this.inputStream.close();
isClosed = true;

Expand Down
Loading
Loading