src/test/resources/codegeneration/*.java
+ src/main/java/com/foundations/**/*.java
**/*.html
**/*.txt
**/*.xml
diff --git a/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java
new file mode 100644
index 0000000..bbb784c
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java
@@ -0,0 +1,29 @@
+package com.foundations.comparator.attributes;
+
+public final class DateTimeSortAttributes extends SortAttributes {
+
+ public static final String DEFAULT_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+ private String _pattern;
+
+ public DateTimeSortAttributes() {
+ _pattern = DEFAULT_PATTERN;
+ }
+
+ /**
+ * The date and time pattern used for the column to be sorted.
+ *
+ * Pattern syntax is based on java.text.SimpleDateFormat
+ * class documentation
+ */
+ public void setPattern(String value) {
+ _pattern = value;
+ }
+
+ /**
+ * Returns the date and time pattern for the column to be sorted.
+ */
+ public String getPattern() {
+ return _pattern;
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java
new file mode 100644
index 0000000..2d9a8c4
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java
@@ -0,0 +1,33 @@
+package com.foundations.comparator.attributes;
+
+import java.math.RoundingMode;
+
+public final class DecimalSortAttributes extends SortAttributes {
+
+ public static final int DEFAULT_SCALE = 2;
+ public static final RoundingMode DEFAULT_ROUNDING_MODE = RoundingMode.HALF_EVEN;
+
+ private int _scale;
+ private RoundingMode _roundingMode;
+
+ public DecimalSortAttributes() {
+ _scale = DEFAULT_SCALE;
+ _roundingMode = DEFAULT_ROUNDING_MODE;
+ }
+
+ public void setScale(int value) {
+ _scale = value;
+ }
+
+ public int getScale() {
+ return _scale;
+ }
+
+ public void setRoundingMode(RoundingMode value) {
+ _roundingMode = value;
+ }
+
+ public RoundingMode getRoundingMode() {
+ return _roundingMode;
+ }
+ }
diff --git a/src/main/java/com/foundations/comparator/attributes/SortAttributes.java b/src/main/java/com/foundations/comparator/attributes/SortAttributes.java
new file mode 100644
index 0000000..ced569b
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/attributes/SortAttributes.java
@@ -0,0 +1,42 @@
+package com.foundations.comparator.attributes;
+
+public class SortAttributes {
+
+ public static final boolean DEFAULT_ASCENDING_ORDER = true;
+ public static final boolean DEFAULT_NULL_LOW_SORT_ORDER = true;
+ public static final boolean DEFAULT_TRIM = false;
+
+ private boolean _ascendingOrder;
+ private boolean _trim;
+ private boolean _nullLowSortOrder;
+
+ public SortAttributes() {
+ _ascendingOrder = DEFAULT_ASCENDING_ORDER;
+ _trim = DEFAULT_TRIM;
+ _nullLowSortOrder = DEFAULT_NULL_LOW_SORT_ORDER;
+ }
+
+ public void setAscendingOrder(boolean value) {
+ _ascendingOrder = value;
+ }
+
+ public boolean isAscendingOrder() {
+ return _ascendingOrder;
+ }
+
+ public void setTrim(boolean value) {
+ _trim = value;
+ }
+
+ public boolean isTrim() {
+ return _trim;
+ }
+
+ public void setNullLowSortOrder(boolean value) {
+ _nullLowSortOrder = value;
+ }
+
+ public boolean isNullLowSortOrder() {
+ return _nullLowSortOrder;
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java
new file mode 100644
index 0000000..f6f0d3b
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java
@@ -0,0 +1,31 @@
+package com.foundations.comparator.attributes;
+
+public final class StringSortAttributes extends SortAttributes {
+
+ public static final boolean DEFAULT_CASE_SENSITIVE = true;
+ public static final boolean DEFAULT_STRIP_ACCENTS = false;
+
+ private boolean _caseSensitive;
+ private boolean _stripAccents;
+
+ public StringSortAttributes() {
+ _caseSensitive = DEFAULT_CASE_SENSITIVE;
+ _stripAccents = DEFAULT_STRIP_ACCENTS;
+ }
+
+ public void setCaseSensitive(boolean value) {
+ _caseSensitive = value;
+ }
+
+ public boolean isCaseSensitive() {
+ return _caseSensitive;
+ }
+
+ public void setStripAccents(boolean value) {
+ _stripAccents = value;
+ }
+
+ public boolean isStripAccents() {
+ return _stripAccents;
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/column/AbstractComparator.java b/src/main/java/com/foundations/comparator/column/AbstractComparator.java
new file mode 100644
index 0000000..a2db25e
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/AbstractComparator.java
@@ -0,0 +1,97 @@
+package com.foundations.comparator.column;
+
+import com.foundations.comparator.attributes.SortAttributes;
+
+public abstract class AbstractComparator implements IColumnComparator {
+ private String _name;
+ private int _sortOrder;
+ private SortAttributes _sortAttributes;
+
+ public AbstractComparator(String name, int sortOrder, SortAttributes sortAttributes) {
+ _name = name;
+ _sortOrder = sortOrder;
+ _sortAttributes = sortAttributes;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public int getSortOrder() {
+ return _sortOrder;
+ }
+
+ public SortAttributes getSortAttributes() {
+ return _sortAttributes;
+ }
+
+ /**
+ * Returns a result of zero, greater than one, or less than one
+ * depending on the comparison of the two supplied Strings
+ *
+ * A return value of zero indicates the two Strings are equal
+ * A return value greater than one indicates String a is bigger than String b
+ * A return value less than one indicates String a is less than String b
+ *
+ * The first step in comparing the Strings involves swapping them if they are not
+ * already in ascending order.
+ *
+ * Next, any required trimming is performed if the Trim attribute has been set.
+ * The Strings are then normalized, ensuring zero-length Strings are treated as
+ * nulls.
+ *
+ * If both Strings turn out to be null after normalization, zero is returned.
+ * If one of the two Strings is null, the compare will consider the NullLowSortOrder
+ * attribute to determine the final result.
+ *
+ * If both Strings are not null, sub-classes must determine the final result
+ * of the compare by returning the value from a call to abstract method
+ * extendedCompare.
+ */
+ public int compare(String a, String b) {
+ int result = 0;
+ String stringA = normalizeString((_sortAttributes.isAscendingOrder() ? a : b));
+ String stringB = normalizeString((_sortAttributes.isAscendingOrder() ? b : a));
+
+ if( stringA != null && stringB != null ) {
+ result = extendedCompare(stringA, stringB);
+ }
+ else if( stringA == null && stringB == null ) {
+ result = 0;
+ }
+ else if ( stringA == null ) {
+ result = _sortAttributes.isNullLowSortOrder() ? -1 : 1;
+ }
+ else {
+ result = _sortAttributes.isNullLowSortOrder() ? 1 : -1;
+ }
+ return result;
+ }
+
+ /**
+ * Normalize the String for sorting
+ *
+ * Normalizing involves transforming the original value so
+ * that zero length Strings are treated as nulls. It also
+ * involves stripping trailing and leading spaces from the
+ * original, provided the isTrim attribute is set.
+ *
+ * @param original the String to be normalized
+ * @return the normalized text
+ */
+ private String normalizeString(String original) {
+ String result = null;
+
+ if( original != null ) {
+ if( _sortAttributes.isTrim() ) {
+ original = original.trim();
+ }
+ if( original.length() > 0 ) {
+ result = original;
+ }
+ }
+ return result;
+ }
+
+ protected abstract int extendedCompare(String a, String b);
+}
diff --git a/src/main/java/com/foundations/comparator/column/BooleanComparator.java b/src/main/java/com/foundations/comparator/column/BooleanComparator.java
new file mode 100644
index 0000000..d0f6fef
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/BooleanComparator.java
@@ -0,0 +1,32 @@
+package com.foundations.comparator.column;
+
+import com.foundations.comparator.attributes.SortAttributes;
+
+public class BooleanComparator extends AbstractComparator {
+
+ public BooleanComparator(String name, int sortOrder, SortAttributes attributes) {
+ super(name, sortOrder, attributes);
+ }
+
+ protected int extendedCompare(String a, String b) {
+ Boolean aValue = new Boolean(parse(a));
+ Boolean bValue = new Boolean(parse(b));
+
+ return aValue.compareTo(bValue);
+ }
+
+ private boolean parse(String value) {
+ boolean result = false;
+
+ if ( value.toLowerCase().equals("true") || value.equals("1") ) {
+ result = true;
+ }
+ else if ( value.toLowerCase().equals("false") || value.equals("0") ) {
+ result = false;
+ }
+ else {
+ throw new RuntimeException( "Boolean Parse Exception: " + value);
+ }
+ return result;
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/column/DateTimeComparator.java b/src/main/java/com/foundations/comparator/column/DateTimeComparator.java
new file mode 100644
index 0000000..b4cf56e
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/DateTimeComparator.java
@@ -0,0 +1,55 @@
+package com.foundations.comparator.column;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import com.foundations.comparator.attributes.DateTimeSortAttributes;
+
+public final class DateTimeComparator extends AbstractComparator {
+
+ private SimpleDateFormat _formatter;
+
+ public DateTimeComparator(String name, int sortOrder, DateTimeSortAttributes sortAttributes) {
+ super(name, sortOrder, sortAttributes);
+ _formatter = new SimpleDateFormat(sortAttributes.getPattern());
+ }
+
+ protected int extendedCompare(String a, String b) {
+ int result;
+
+ try {
+ Date aValue = _formatter.parse(a);
+ Date bValue = _formatter.parse(b);
+ result = aValue.compareTo(bValue);
+ }
+ catch (ParseException e) {
+ throw new RuntimeException("Parse Exception: " + e.getMessage());
+ }
+
+ return result;
+ }
+}
+
+
+////////////////////////// USE FOLLOWING CODE FOR JAVA 8 ///////
+// import java.time.LocalDateTime;
+// import java.time.format.DateTimeFormatter;
+//
+// public final class DateTimeComparator extends AbstractComparator {
+//
+// private DateTimeFormatter _formatter;
+//
+// public DateTimeComparator(String name, int sortOrder, DateTimeSortAttributes sortAttributes) {
+// super(name, sortOrder, sortAttributes);
+// _formatter = DateTimeFormatter.ofPattern(sortAttributes.getPattern());
+// }
+//
+// protected int extendedCompare(String a, String b) {
+// LocalDateTime aValue = LocalDateTime.parse(a, _formatter);
+// LocalDateTime bValue = LocalDateTime.parse(b, _formatter);
+//
+// return aValue.compareTo(bValue);
+// }
+// }
+//////////////////////////USE FOLLOWING CODE FOR JAVA 8 ///////
diff --git a/src/main/java/com/foundations/comparator/column/DecimalComparator.java b/src/main/java/com/foundations/comparator/column/DecimalComparator.java
new file mode 100644
index 0000000..0b6db95
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/DecimalComparator.java
@@ -0,0 +1,26 @@
+package com.foundations.comparator.column;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+import com.foundations.comparator.attributes.DecimalSortAttributes;
+
+public final class DecimalComparator extends AbstractComparator {
+
+ private int _scale;
+ private RoundingMode _roundingMode;
+
+ public DecimalComparator(String name, int sortOrder, DecimalSortAttributes attributes) {
+ super(name, sortOrder, attributes);
+
+ _scale = attributes.getScale();
+ _roundingMode = attributes.getRoundingMode();
+ }
+
+ protected int extendedCompare(String a, String b) {
+ BigDecimal aValue = new BigDecimal(a).setScale(_scale, _roundingMode);
+ BigDecimal bValue = new BigDecimal(b).setScale(_scale, _roundingMode);
+
+ return aValue.compareTo(bValue);
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/column/IColumnComparator.java b/src/main/java/com/foundations/comparator/column/IColumnComparator.java
new file mode 100644
index 0000000..5e1b2fe
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/IColumnComparator.java
@@ -0,0 +1,8 @@
+package com.foundations.comparator.column;
+
+import java.util.Comparator;
+
+public interface IColumnComparator extends Comparator {
+
+ public int getSortOrder();
+}
diff --git a/src/main/java/com/foundations/comparator/column/IntegerComparator.java b/src/main/java/com/foundations/comparator/column/IntegerComparator.java
new file mode 100644
index 0000000..7f656ab
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/IntegerComparator.java
@@ -0,0 +1,19 @@
+package com.foundations.comparator.column;
+
+import java.math.BigInteger;
+
+import com.foundations.comparator.attributes.SortAttributes;
+
+public final class IntegerComparator extends AbstractComparator {
+
+ public IntegerComparator(String name, int sortOrder, SortAttributes attributes) {
+ super(name, sortOrder, attributes);
+ }
+
+ protected int extendedCompare(String a, String b) {
+ BigInteger aValue = new BigInteger(a);
+ BigInteger bValue = new BigInteger(b);
+
+ return aValue.compareTo(bValue);
+ }
+}
diff --git a/src/main/java/com/foundations/comparator/column/StringComparator.java b/src/main/java/com/foundations/comparator/column/StringComparator.java
new file mode 100644
index 0000000..dd99efb
--- /dev/null
+++ b/src/main/java/com/foundations/comparator/column/StringComparator.java
@@ -0,0 +1,51 @@
+package com.foundations.comparator.column;
+
+import java.text.Normalizer;
+import java.util.regex.Pattern;
+
+import com.foundations.comparator.attributes.StringSortAttributes;
+
+public final class StringComparator extends AbstractComparator {
+
+ private boolean _isStripAccents;
+ private boolean _isCaseSensitive;
+ private Pattern _pattern;
+
+ public StringComparator(String name, int sortOrder, StringSortAttributes attributes) {
+ super(name, sortOrder, attributes);
+ _isStripAccents = attributes.isStripAccents();
+ _isCaseSensitive = attributes.isCaseSensitive();
+ _pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+");
+ }
+
+ protected int extendedCompare(String a, String b) {
+
+ if( _isStripAccents ) {
+ a = stripAccents(a);
+ b = stripAccents(b);
+ }
+ return _isCaseSensitive ? a.compareTo(b) : a.compareToIgnoreCase(b);
+ }
+
+ /**
+ * Removes diacritics (~= accents) from a string. The case will not be altered.
+ * For instance, 'à' will be replaced by 'a'.
+ * Note that ligatures will be left as is.
+ *
+ *
+ * stripAccents(null) = null
+ * stripAccents("") = ""
+ * stripAccents("control") = "control"
+ * stripAccents("éclair") = "eclair"
+ *
+ * This function is a modified version of stripAccents in
+ * org.apache.commons.lang3.StringUtils
+ *
+ * @param input String to be stripped
+ * @return input text with diacritics removed
+ */
+ private String stripAccents(String input) {
+ String decomposed = Normalizer.normalize(input, Normalizer.Form.NFD);
+ return _pattern.matcher(decomposed).replaceAll("");
+ }
+}
diff --git a/src/main/java/com/sforce/dataset/DatasetUtilMain.java b/src/main/java/com/sforce/dataset/DatasetUtilMain.java
index d7351a7..803de88 100644
--- a/src/main/java/com/sforce/dataset/DatasetUtilMain.java
+++ b/src/main/java/com/sforce/dataset/DatasetUtilMain.java
@@ -55,6 +55,7 @@
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
+@SuppressWarnings("deprecation")
public class DatasetUtilMain {
public static final String defaultEndpoint = "https://login.salesforce.com/services/Soap/u/31.0";
@@ -625,6 +626,7 @@ public static void printUsage()
System.out.println("--rowLimit: (Optional) the number of rows to extract, -1=all, deafult=1000");
System.out.println("--sessionId : (Optional) the salesforce sessionId. if specified,specify endpoint");
System.out.println("--fileEncoding : (Optional) the encoding of the inputFile default UTF-8");
+ System.out.println("--uploadFormat : (Optional) the whether to upload as binary or csv. default binary");
// System.out.println("jsonConfig: (Optional) the dataflow definition json file");
System.out.println("*******************************************************************************\n");
System.out.println("Usage Example 1: Upload a csv to a dataset");
diff --git a/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java b/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java
index e6509e3..2d41b26 100644
--- a/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java
+++ b/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java
@@ -309,19 +309,19 @@ public static void startDataFlow(PartnerConnection partnerConnection, DataFlow d
URI patchURI = new URI(u.getScheme(),u.getUserInfo(), u.getHost(), u.getPort(), df._url.replace("json", "start"), null,null);
- HttpPut httpPatch = new HttpPut(patchURI);
+ HttpPut httput = new HttpPut(patchURI);
// httpPatch.addHeader("Accept", "*/*");
// httpPatch.addHeader("Content-Type", "application/json");
- Map map = new LinkedHashMap();
- map.put("_uid", df._uid);
- ObjectMapper mapper = new ObjectMapper();
- StringEntity entity = new StringEntity(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map), "UTF-8");
- entity.setContentType("application/json");
- httpPatch.setConfig(requestConfig);
- httpPatch.setEntity(entity);
- httpPatch.addHeader("Authorization","OAuth "+sessionID);
- CloseableHttpResponse emresponse = httpClient.execute(httpPatch);
+// Map map = new LinkedHashMap();
+// map.put("_uid", df._uid);
+// ObjectMapper mapper = new ObjectMapper();
+// StringEntity entity = new StringEntity(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map), "UTF-8");
+// entity.setContentType("application/json");
+ httput.setConfig(requestConfig);
+// httpPatch.setEntity(entity);
+ httput.addHeader("Authorization","OAuth "+sessionID);
+ CloseableHttpResponse emresponse = httpClient.execute(httput);
String reasonPhrase = emresponse.getStatusLine().getReasonPhrase();
int statusCode = emresponse.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
diff --git a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java
index d8ce6e8..d46801d 100644
--- a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java
+++ b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java
@@ -41,8 +41,7 @@
import java.nio.charset.MalformedInputException;
import java.text.NumberFormat;
import java.util.Arrays;
-//import java.util.HashMap;
-//import java.util.HashSet;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -64,11 +63,6 @@
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
-
-
-
-
-//import com.csvreader.CsvReader;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
@@ -80,6 +74,7 @@
import com.sforce.async.OperationEnum;
import com.sforce.dataset.loader.file.schema.ExternalFileSchema;
import com.sforce.dataset.loader.file.schema.FieldType;
+import com.sforce.dataset.loader.file.sort.CsvExternalSort;
import com.sforce.dataset.util.DatasetUtils;
import com.sforce.dataset.util.SfdcUtils;
import com.sforce.soap.partner.PartnerConnection;
@@ -92,7 +87,6 @@
public class DatasetLoader {
-// private static final int DEFAULT_BUFFER_SIZE = 10*1024;
private static final int DEFAULT_BUFFER_SIZE = 8*1024*1024;
private static final int EOF = -1;
private static final char LF = '\n';
@@ -100,35 +94,16 @@ public class DatasetLoader {
private static final char QUOTE = '"';
private static final char COMMA = ',';
-// String username = null;
-// String password = null;
-// String endpoint = null;
-// String token = null;
-// String sessionId = null;
private static final String[] filePartsHdr = {"InsightsExternalDataId","PartNumber","DataFile"};
private static final Pattern validChars = Pattern.compile("^[A-Za-z]+[A-Za-z\\d_]*$");
public static final NumberFormat nf = NumberFormat.getIntegerInstance();
+ private static int MAX_NUM_UPLOAD_THREADS = 3;
-// public DatasetLoader()
-// {
-// super();
-// }
-
- /*
- public DatasetLoader(String username,String password, String token, String endpoint, String sessionId) throws Exception
- {
- super();
- this.username = username;
- this.password = password;
- this.token = token;
- this.endpoint = endpoint;
- this.sessionId = sessionId;
- }
- */
+ @SuppressWarnings("deprecation")
public static boolean uploadDataset(String inputFileString,
String uploadFormat, CodingErrorAction codingErrorAction,
Charset inputFileCharset, String datasetAlias,
@@ -142,8 +117,8 @@ public static boolean uploadDataset(String inputFileString,
long digestTime = 0L;
long uploadTime = 0L;
boolean updateHdrJson = false;
- //we only want a small capacity otherwise the reader thread will runaway and the writer thread will become slower
- BlockingQueue q = new LinkedBlockingQueue(3);
+ //we only want a small capacity otherwise the reader thread will runaway
+ BlockingQueue q = new LinkedBlockingQueue(10);
if(uploadFormat==null||uploadFormat.trim().isEmpty())
@@ -165,14 +140,12 @@ public static boolean uploadDataset(String inputFileString,
System.out.println("\n*******************************************************************************");
if(FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv"))
{
-// System.out.println("Detecting schema from csv file {"+ inputFile +"} ...");
schema = ExternalFileSchema.init(inputFile, inputFileCharset);
if(schema==null)
{
System.err.println("Failed to parse schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"}");
return false;
}
-// System.out.println("Schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"} successfully generated...");
}else
{
schema = ExternalFileSchema.load(inputFile, inputFileCharset);
@@ -181,7 +154,6 @@ public static boolean uploadDataset(String inputFileString,
System.err.println("Failed to load schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"}");
return false;
}
-// System.out.println("Schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"} successfully loaded...");
}
System.out.println("*******************************************************************************\n");
@@ -225,17 +197,17 @@ public static boolean uploadDataset(String inputFileString,
}
//Insert header
- File metadataJson = ExternalFileSchema.getSchemaFile(inputFile);
- if(metadataJson == null || !metadataJson.canRead())
+ File metadataJsonFile = ExternalFileSchema.getSchemaFile(inputFile);
+ if(metadataJsonFile == null || !metadataJsonFile.canRead())
{
- System.err.println("Error: metadata Json file {"+metadataJson+"} not found");
+ System.err.println("Error: metadata Json file {"+metadataJsonFile+"} not found");
return false;
}
String hdrId = getLastIncompleteFileHdr(partnerConnection, datasetAlias);
if(hdrId==null)
{
- hdrId = insertFileHdr(partnerConnection, datasetAlias,datasetFolder, FileUtils.readFileToByteArray(metadataJson), uploadFormat, Operation);
+ hdrId = insertFileHdr(partnerConnection, datasetAlias,datasetFolder, FileUtils.readFileToByteArray(metadataJsonFile), uploadFormat, Operation);
}else
{
System.out.println("Record {"+hdrId+"} is being reused from InsightsExternalData");
@@ -247,6 +219,7 @@ public static boolean uploadDataset(String inputFileString,
return false;
}
+ inputFile = CsvExternalSort.sortFile(inputFile, inputFileCharset, false, 1);
//Create the Bin file
// File binFile = new File(csvFile.getParent(), datasetName + ".bin");
@@ -266,18 +239,19 @@ public static boolean uploadDataset(String inputFileString,
{
if(uploadFormat.equalsIgnoreCase("binary") && FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv"))
{
- GzipCompressorOutputStream out = null;
+ FileOutputStream fos = null;
+ BufferedOutputStream out = null;
BufferedOutputStream bos = null;
+ GzipCompressorOutputStream gzos = null;
try
{
gzbinFile = new File(inputFile.getParent(), hdrId + "." + FilenameUtils.getBaseName(inputFile.getName()) + ".gz");
GzipParameters gzipParams = new GzipParameters();
gzipParams.setFilename(FilenameUtils.getBaseName(inputFile.getName()) + ".bin");
- bos = new BufferedOutputStream(new FileOutputStream(gzbinFile),DEFAULT_BUFFER_SIZE);
- out = new GzipCompressorOutputStream(bos,gzipParams);
-// BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(gzbinFile));
-// CSVReader reader = new CSVReader(new InputStreamReader(new FileInputStream(inputFile), "UTF-8"));
-// String[] header = reader.readNext();
+ fos = new FileOutputStream(gzbinFile);
+ bos = new BufferedOutputStream(fos,DEFAULT_BUFFER_SIZE);
+ gzos = new GzipCompressorOutputStream(bos,gzipParams);
+ out = new BufferedOutputStream(gzos,DEFAULT_BUFFER_SIZE);
long totalRowCount = 0;
long successRowCount = 0;
long errorRowCount = 0;
@@ -285,9 +259,6 @@ public static boolean uploadDataset(String inputFileString,
EbinFormatWriter w = new EbinFormatWriter(out, schema.objects.get(0).fields.toArray(new FieldType[0]));
ErrorWriter ew = new ErrorWriter(inputFile,",");
-// System.out.println("CodingErrorAction: "+codingErrorAction);
-
-// CsvReader reader = new CsvReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction, inputFileCharset)));
CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE);
WriterThread writer = new WriterThread(q, w, ew);
Thread th = new Thread(writer,"Writer-Thread");
@@ -296,13 +267,8 @@ public static boolean uploadDataset(String inputFileString,
try
{
-// if(reader.readHeaders())
-// {
@SuppressWarnings("unused")
-// String[] header = reader.getHeaders();
String[] header = reader.getHeader(true);
- // String[] nextLine;
- // while ((nextLine = reader.readNext()) != null) {
boolean hasmore = true;
System.out.println("\n*******************************************************************************");
System.out.println("File: "+inputFile+", being digested to file: "+gzbinFile);
@@ -319,8 +285,6 @@ public static boolean uploadDataset(String inputFileString,
if(row.size()!=0 )
{
q.put(row.toArray(new String[row.size()]));
-// w.addrow(row.toArray(new String[row.size()]));
-// successRowCount = w.getNrows();
}
}else
{
@@ -335,50 +299,83 @@ public static boolean uploadDataset(String inputFileString,
System.err.println("Row {"+totalRowCount+"} has error {"+t+"}");
if(t instanceof MalformedInputException)
{
+ while(!q.isEmpty())
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }catch(InterruptedException in)
+ {
+ in.printStackTrace();
+ }
+ }
+
while(!writer.isDone())
{
q.put(new String[0]);
- Thread.sleep(1000);
+ try
+ {
+ Thread.sleep(1000);
+ }catch(InterruptedException in)
+ {
+ in.printStackTrace();
+ }
}
System.err.println("\n*******************************************************************************");
System.err.println("The input file is not utf8 encoded. Please save it as UTF8 file first");
System.err.println("*******************************************************************************\n");
status = false;
hasmore = false;
-// }else
-// {
-//
-// System.err.println("\n*******************************************************************************");
-// System.err.println("t");
-// System.err.println("*******************************************************************************\n");
-// status = false;
-// hasmore = false;
-// if(row!=null)
-// {
-// ew.addError(row.toArray(new String[row.size()]), t.getMessage());
-// errorRowCount++;
-// }
}
- //t.printStackTrace();
}
}//end while
+ while(!q.isEmpty())
+ {
+ try
+ {
+ System.out.println("1 Waiting for writer to finish");
+ Thread.sleep(1000);
+ }catch(InterruptedException in)
+ {
+ in.printStackTrace();
+ }
+ }
+
while(!writer.isDone())
{
q.put(new String[0]);
- Thread.sleep(1000);
+ try
+ {
+ System.out.println("2 Waiting for writer to finish");
+ Thread.sleep(1000);
+ }catch(InterruptedException in)
+ {
+ in.printStackTrace();
+ }
}
// }
- successRowCount = w.getSuccessRowCount();
- errorRowCount = writer.getErrorRowCount();
+ successRowCount = w.getSuccessRowCount();
+ errorRowCount = writer.getErrorRowCount();
}finally
{
- reader.close();
- w.finish();
- ew.finish();
- out.close();
- bos.close();
+ if(reader!=null)
+ reader.close();
+ if(w!=null)
+ w.finish();
+ if(ew!=null)
+ ew.finish();
+ if(out!=null)
+ out.close();
+ if(gzos!=null)
+ gzos.close();
+ if(bos!=null)
+ bos.close();
+ if(fos!=null)
+ fos.close();
out = null;
+ gzos = null;
bos = null;
+ fos = null;
}
long endTime = System.currentTimeMillis();
digestTime = endTime-startTime;
@@ -391,7 +388,7 @@ public static boolean uploadDataset(String inputFileString,
return false;
}
System.out.println("\n*******************************************************************************");
- System.out.println("Total Rows: "+totalRowCount+", Success Rows: "+successRowCount+", Eror Rows: "+errorRowCount);
+ System.out.println("Total Rows: "+nf.format(totalRowCount)+", Success Rows: "+nf.format(successRowCount)+", Eror Rows: "+nf.format(errorRowCount));
if(gzbinFile.length()>0)
System.out.println("File: "+inputFile+", Size {"+nf.format(inputFile.length())+"} compressed to file: "+gzbinFile+", Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Digest Time {"+nf.format(digestTime) + "} msecs");
System.out.println("*******************************************************************************\n");
@@ -403,6 +400,13 @@ public static boolean uploadDataset(String inputFileString,
} catch (IOException e) {
}
}
+ if (gzos != null) {
+ try {
+ gzos.close();
+ gzos = null;
+ } catch (IOException e) {
+ }
+ }
if (bos != null) {
try {
bos.close();
@@ -410,6 +414,13 @@ public static boolean uploadDataset(String inputFileString,
} catch (IOException e) {
}
}
+ if (fos != null) {
+ try {
+ fos.close();
+ fos = null;
+ } catch (IOException e) {
+ }
+ }
}
}else if(!FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("zip") && !FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("gz"))
{
@@ -426,7 +437,7 @@ public static boolean uploadDataset(String inputFileString,
IOUtils.copy(fis, gzOut);
long endTime = System.currentTimeMillis();
if(gzbinFile.length()>0)
- System.out.println("File: "+inputFile+", Size {"+nf.format(inputFile.length())+"} compressed to file: "+gzbinFile+", Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Compression Time {"+nf.format((endTime-startTime)) + "} msecs");
+ System.out.println(" Input File, Size {"+nf.format(inputFile.length())+"} compressed to gz file, Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Compression Time {"+nf.format((endTime-startTime)) + "} msecs");
}finally
{
if(gzOut!=null)
@@ -473,15 +484,10 @@ public static boolean uploadDataset(String inputFileString,
gzbinFile = lastgzbinFile;
}
- //Upload the file
-// if(useSoapAPI)
long startTime = System.currentTimeMillis();
- status = uploadEM(gzbinFile, uploadFormat, ExternalFileSchema.getSchemaFile(inputFile), datasetAlias,datasetFolder, useBulkAPI, partnerConnection, hdrId, datasetArchiveDir, "Overwrite", updateHdrJson);
+ status = uploadEM(gzbinFile, uploadFormat, metadataJsonFile, datasetAlias,datasetFolder, useBulkAPI, partnerConnection, hdrId, datasetArchiveDir, "Overwrite", updateHdrJson);
long endTime = System.currentTimeMillis();
uploadTime = endTime-startTime;
-
-// else
-// status = DatasetUploader.uploadEM(gzbinFile, datasetAlias, username, password,endpoint,token, format);
} catch(MalformedInputException mie)
{
@@ -546,6 +552,9 @@ public static boolean uploadEM(File dataFile, String dataFormat, File metadataJs
*/
public static boolean uploadEM(File dataFile, String dataFormat, byte[] metadataJsonBytes, String datasetAlias,String datasetFolder, boolean useBulk, PartnerConnection partnerConnection, String hdrId, File datasetArchiveDir, String Operation, boolean updateHdrJson) throws Exception
{
+ BlockingQueue