Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.8.x
  • Loading branch information
GordonSmith committed Sep 5, 2024
2 parents be177ec + 1914fa2 commit 8936914
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.io.BufferedInputStream;
import java.io.Console;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -325,6 +326,28 @@ public JSONArray generateResultsMessage()
}
};

private static String[] getCredentials(CommandLine cmd)
{
Console console = System.console();

String user = cmd.getOptionValue("user");
boolean userIsEmpty = user == null || user.isEmpty();
if (userIsEmpty)
{
user = new String(console.readLine("Enter username: "));
userIsEmpty = user == null || user.isEmpty();
}

String pass = cmd.getOptionValue("pass");
boolean passIsEmpty = pass == null || pass.isEmpty();
if (!userIsEmpty && passIsEmpty)
{
pass = new String(console.readPassword("Enter password for " + user + ": "));
}

return new String[] {user, pass};
}

private static enum FileFormat
{
THOR,
Expand Down Expand Up @@ -1205,8 +1228,10 @@ private static void performRead(String[] args, TaskContext context)
}

String connString = cmd.getOptionValue("url");
String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");

String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String outputPath = cmd.getOptionValue("out",".");

Expand Down Expand Up @@ -1383,8 +1408,10 @@ private static void performReadTest(String[] args, TaskContext context)
}

String connString = cmd.getOptionValue("url");
String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");

String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String outputPath = cmd.getOptionValue("out",".");

Expand Down Expand Up @@ -1592,8 +1619,9 @@ private static void performCopy(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");
String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String destClusterName = cmd.getOptionValue("dest_cluster");

Expand Down Expand Up @@ -1773,8 +1801,9 @@ private static void performWrite(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");
String[] creds = getCredentials(cmd);
String user = creds[0];
String pass = creds[1];

String destClusterName = cmd.getOptionValue("dest_cluster");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -142,8 +143,9 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

this.recordAccessor = recordAccessor;

this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpanName = "HPCCRemoteFileWriter/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName);
this.writeSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
Expand All @@ -154,7 +156,7 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
Expand All @@ -181,8 +183,20 @@ public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAcces
*/
public void writeRecord(T record) throws Exception
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
try
{
this.binaryRecordWriter.writeRecord(record);
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
}

/**
Expand All @@ -197,7 +211,20 @@ public void writeRecords(Iterator<T> it) throws Exception
{
while (it.hasNext())
{
this.binaryRecordWriter.writeRecord(it.next());
try
{
this.binaryRecordWriter.writeRecord(it.next());
this.recordsWritten++;
}
catch (Exception e)
{
log.error("HPCCRemoteFileWriter: Error writing record: " + e.getMessage());
this.writeSpan.recordException(e);
this.writeSpan.setStatus(StatusCode.ERROR);
this.writeSpan.end();

throw e;
}
this.recordsWritten++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -73,6 +74,7 @@ public static class FileReadContext
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
};

Expand Down Expand Up @@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.dataPartition = dp;
this.recordBuilder = recBuilder;

String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000));
this.readSpan.setAllAttributes(attributes);
this.readSpan = createReadSpan(ctx, dp);

if (context.originalRD == null)
{
Expand All @@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -321,13 +310,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Exception e = new Exception("Unable to restart read stream, unexpected stream position in record reader.");
this.readSpan.recordException(e);
this.readSpan.end();
throw e;
}
this.inputStream.skip(bytesToSkip);

Expand All @@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

private static Span createReadSpan(FileReadContext context, DataPartition dp)
{
String readSpanName = "HPCCRemoteFileReader/Read_" + dp.getFileName() + "_" + dp.getThisPart();
Span readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
readSpan.setStatus(StatusCode.OK);

String primaryIP = dp.getCopyIP(0);
String secondaryIP = "";
if (dp.getCopyCount() > 1)
{
secondaryIP = dp.getCopyIP(1);
}

long readSize = context.readSizeKB;
if (readSize < 0)
{
readSize = RowServiceInputStream.DEFAULT_MAX_READ_SIZE_KB;
}
readSize *= 1000;

Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP,
AttributeKey.stringKey("server.1.address"), secondaryIP,
AttributeKey.stringKey("server.port"), Integer.toString(dp.getPort()),
AttributeKey.longKey("read.size"), Long.valueOf(readSize));
readSpan.setAllAttributes(attributes);

return readSpan;
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
Expand All @@ -364,20 +384,12 @@ private boolean retryRead()

try
{
String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart();
if (context.parentSpan != null)
{
this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName);
}
else
{
this.readSpan = Utils.createSpan(readSpanName);
}
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);

this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand All @@ -391,6 +403,7 @@ private boolean retryRead()
catch (Exception e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
Expand Down Expand Up @@ -529,6 +542,10 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
canReadNext = false;
Expand Down Expand Up @@ -564,6 +581,10 @@ public T next()
}
catch (HpccFileException e)
{
this.readSpan.recordException(e);
this.readSpan.setStatus(StatusCode.ERROR);
this.readSpan.end();

if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
Expand Down
Loading

0 comments on commit 8936914

Please sign in to comment.