Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-595 Enhance log messages #709

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public boolean hasNext() throws HpccFileException
{
if (this.rootRecordBuilder == null)
{
throw new HpccFileException("RecordReader must be initialized before being used.");
throw new HpccFileException("BinaryRecordReader.hasNext(): RecordReader must be initialized before being used. rootRecordBuilder is null, hasNext() failed.");
}

int nextByte = -1;
Expand Down Expand Up @@ -299,7 +299,7 @@ public boolean hasNext() throws HpccFileException
}
catch (IOException e)
{
throw new HpccFileException(e);
throw new HpccFileException("BinaryRecordReader.hasNext(): failed to peek at the next byte in the input stream:" + e.getMessage(),e);
}

return nextByte >= 0;
Expand All @@ -314,7 +314,7 @@ public Object getNext() throws HpccFileException
{
if (this.rootRecordBuilder == null)
{
throw new HpccFileException("RecordReader must be initialized before being used.");
throw new HpccFileException("BinaryRecordReader.getNext(): RecordReader must be initialized before being used, rootRecordBuilder is null.");
}

if (!this.hasNext()) throw new NoSuchElementException("No next record!");
Expand All @@ -325,13 +325,13 @@ public Object getNext() throws HpccFileException
record = parseRecord(this.rootRecordDefinition, this.rootRecordBuilder, this.defaultLE);
if (record == null)
{
throw new HpccFileException("RecordContent not found, or invalid record structure. Check logs for more information.");
throw new HpccFileException("BinaryRecordReader.getNext(): RecordContent not found, or invalid record structure. Check logs for more information.");
}

}
catch (Exception e)
{
throw new HpccFileException("Failed to parse next record: " + e.getMessage(), e);
throw new HpccFileException("BinaryRecordReader.getNext(): Failed to parse next record: " + e.getMessage(), e);
}

this.streamPosAfterLastRecord = this.inputStream.getStreamPosition();
Expand Down Expand Up @@ -370,7 +370,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars

if (fd.isFixed() && fd.getDataLen() > Integer.MAX_VALUE)
{
throw new UnparsableContentException("Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE);
throw new UnparsableContentException("BinaryRecordReader.parseFlatField(): Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE);
}

// Embedded field lengths are little endian
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DataPartition implements Serializable
private String fileAccessBlob;
private FileType fileType;
private boolean isTLK;
private String fileName;

public static enum FileType
{
Expand Down Expand Up @@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i
* the file type
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType)
String fileAccessBlob, FileType fileType) {
this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null);
}
/**
* Construct the data part, used by makeParts.
*
* @param copylocations
* locations of all copies of this file part
* @param copyPaths
* the copy paths
* @param this_part
* part number
* @param num_parts
* number of parts
* @param clearport
* port number of clear communications
* @param sslport
* port number of ssl communications
* @param filter
* the file filter object
* @param fileAccessBlob
* file access token
* @param fileType
* the file type
* @param fileName
* the file name
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType, String fileName)
{
this.this_part = this_part;
this.num_parts = num_parts;
this.rowservicePort = clearport;
this.useSSL = sslport;
this.fileFilter = filter;
this.fileName=fileName;
if (this.fileFilter == null)
{
this.fileFilter = new FileFilter();
Expand Down Expand Up @@ -348,6 +378,16 @@ public boolean getUseSsl()
return useSSL;
}

/**
* File name being read
*
* @return filename
*/
public String getFileName()
{
return fileName;
}

/**
* Copy Path.
*
Expand Down Expand Up @@ -415,8 +455,7 @@ public DataPartition setFilter(FileFilter filter)
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(this.getThisPart());
sb.append(" copy locations: {");
sb.append("part ").append(this.getThisPart()).append(", copy locations: {");
for (int copyindex = 0; copyindex < getCopyCount(); copyindex++)
{
if (copyindex > 0) sb.append(", ");
Expand Down Expand Up @@ -471,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT);
}


/**
* Creates the partitions.
*
* @param dfuparts
* the dfuparts
* @param clusterremapper
* the clusterremapper
* @param max_parts
* the max parts
* @param filter
* the filter
* @param fileAccessBlob
* the file access blob
* @param fileType
* the file type
* @return the data partition[]
* @throws HpccFileException
* the hpcc file exception
*/
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
String fileAccessBlob, FileType fileType) throws HpccFileException {
return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null);
}

/**
* Creates the partitions.
*
Expand All @@ -486,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
* the file access blob
* @param fileType
* the file type
* @param fileName
* the file name
* @return the data partition[]
* @throws HpccFileException
* the hpcc file exception
*/
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
String fileAccessBlob, FileType fileType) throws HpccFileException
String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException
{
DataPartition[] rslt = new DataPartition[dfuparts.length];

Expand All @@ -508,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl

DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(),
dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob,
fileType);
fileType,fileName);
new_dp.isTLK = dfuparts[i].isTopLevelKey();

rslt[i] = new_dp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException
{
ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread);
this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper,
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType);
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName());

// Check to see if this file has a TLK. The TLK will always be the last partition.
// If we do have a TLK remove it from the standard list of data partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @throws Exception
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
Expand Down Expand Up @@ -280,8 +280,8 @@ private boolean retryRead()
try
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
Expand Down Expand Up @@ -434,7 +434,7 @@ public boolean hasNext()
if (!retryRead())
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString(), e);
log.error("Read failure for " + this.dataPartition.toString() +":" + e.getMessage(),e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
Expand Down Expand Up @@ -505,7 +505,7 @@ public void close() throws Exception

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart()
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName()
+ " read time: " + readTimeS + "s "
+ " records read: " + recordsRead);
}
Expand Down Expand Up @@ -550,8 +550,8 @@ public void report()
{
if (getRemoteReadMessageCount() > 0)
{
log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n");
log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n");
log.warn(getRemoteReadMessages());
}
}
}
}
Loading
Loading