From 638755494a44b10f26906ea5d45e947852cfa803 Mon Sep 17 00:00:00 2001 From: Puneet Gupta Date: Mon, 8 Dec 2014 16:51:22 -0800 Subject: [PATCH] Lots of new features and bug fixes Added computed fields fixed issue in start workflow Added multi thread file part uploader Added buffered writer for bin file --- .gitignore | 1 + README.md | 9 +- pom.xml | 6 + .../attributes/DateTimeSortAttributes.java | 29 ++ .../attributes/DecimalSortAttributes.java | 33 ++ .../comparator/attributes/SortAttributes.java | 42 ++ .../attributes/StringSortAttributes.java | 31 ++ .../comparator/column/AbstractComparator.java | 97 +++++ .../comparator/column/BooleanComparator.java | 32 ++ .../comparator/column/DateTimeComparator.java | 55 +++ .../comparator/column/DecimalComparator.java | 26 ++ .../comparator/column/IColumnComparator.java | 8 + .../comparator/column/IntegerComparator.java | 19 + .../comparator/column/StringComparator.java | 51 +++ .../com/sforce/dataset/DatasetUtilMain.java | 2 + .../com/sforce/dataset/flow/DataFlowUtil.java | 20 +- .../sforce/dataset/loader/DatasetLoader.java | 303 +++++++++----- .../dataset/loader/EbinFormatWriter.java | 306 ++++++++------ .../loader/FilePartsUploaderThread.java | 142 +++++++ .../loader/file/schema/DetectFieldTypes.java | 5 + .../file/schema/ExternalFileSchema.java | 32 ++ .../dataset/loader/file/schema/FieldType.java | 133 +++--- .../file/sort/CsvColumnComparatorFactory.java | 74 ++++ .../loader/file/sort/CsvExternalSort.java | 387 ++++++++++++++++++ .../loader/file/sort/CsvRowComparator.java | 148 +++++++ .../sforce/dataset/util/SfdcExtracter.java | 4 +- 26 files changed, 1710 insertions(+), 285 deletions(-) create mode 100644 src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java create mode 100644 src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java create mode 100644 src/main/java/com/foundations/comparator/attributes/SortAttributes.java create mode 100644 src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java create mode 100644 src/main/java/com/foundations/comparator/column/AbstractComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/BooleanComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/DateTimeComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/DecimalComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/IColumnComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/IntegerComparator.java create mode 100644 src/main/java/com/foundations/comparator/column/StringComparator.java create mode 100644 src/main/java/com/sforce/dataset/loader/FilePartsUploaderThread.java create mode 100644 src/main/java/com/sforce/dataset/loader/file/sort/CsvColumnComparatorFactory.java create mode 100644 src/main/java/com/sforce/dataset/loader/file/sort/CsvExternalSort.java create mode 100644 src/main/java/com/sforce/dataset/loader/file/sort/CsvRowComparator.java diff --git a/.gitignore b/.gitignore index ee91892..0adfd68 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ ## build output target/ local-proj-repo/ +SalesEdgeEltWorkflow/ ## eclipse files .project diff --git a/README.md b/README.md index e6e5822..93e25e7 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Just download the latest jar from release section (see link at the top) and foll Input Parameter ---action :“load” OR “augment” OR “downloadxmd” OR “uploadxmd” OR "detectEncoding". Use load for loading csv, augment for augmenting existing datasets, downloadxmd to download existing xmd files, uploadxmd for uploading user.xmd.json, “extract” for extracting data from salesforce, "detectEncoding" to detect the encoding of the inputFile. +--action :"load" OR "augment" OR "downloadxmd" OR "uploadxmd" OR "detectEncoding". Use load for loading csv, augment for augmenting existing datasets, downloadxmd to download existing xmd files, uploadxmd for uploading user.xmd.json, "extract" for extracting data from salesforce, "detectEncoding" to detect the encoding of the inputFile. --u : Salesforce.com login @@ -18,7 +18,7 @@ Input Parameter --token : (Optional) Salesforce.com token ---endpoint: (Optional) The salesforce soap api endpoint (test/prod) Default: https://login.salesforce.com/services/Soap/u/31.0 +--endpoint: (Optional) The Salesforce soap api endpoint (test/prod) Default: https://login.salesforce.com/services/Soap/u/31.0 --dataset : (Optional) the dataset alias. required if action=load @@ -30,12 +30,15 @@ Input Parameter --rowLimit: (Optional) the number of rows to extract, -1=all, default=1000 ---sessionId : (Optional) the salesforce sessionId. if specified,specify endpoint +--sessionId : (Optional) the Salesforce sessionId. if specified,specify endpoint --fileEncoding : (Optional) the encoding of the inputFile default UTF-8 --CodingErrorAction:(optional) What to do in case input characters are not UTF8: IGNORE|REPORT|REPLACE. Default REPORT. If you change this option you risk importing garbage characters +--uploadFormat : (Optional) the whether to upload as binary or csv. default binary"); + + ## Usage Example 1: Only Generate the schema file from CSV java -jar datasetutils-32.0.0.jar --action load --inputFile Opportunity.csv diff --git a/pom.xml b/pom.xml index 92cf6f4..ffd0db8 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,11 @@ 4.11 test + + com.google.code.externalsortinginjava + externalsortinginjava + 0.1.9 + clean install @@ -206,6 +211,7 @@
license.txt
src/test/resources/codegeneration/*.java + src/main/java/com/foundations/**/*.java **/*.html **/*.txt **/*.xml diff --git a/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java new file mode 100644 index 0000000..bbb784c --- /dev/null +++ b/src/main/java/com/foundations/comparator/attributes/DateTimeSortAttributes.java @@ -0,0 +1,29 @@ +package com.foundations.comparator.attributes; + +public final class DateTimeSortAttributes extends SortAttributes { + + public static final String DEFAULT_PATTERN = "yyyy-MM-dd HH:mm:ss"; + + private String _pattern; + + public DateTimeSortAttributes() { + _pattern = DEFAULT_PATTERN; + } + + /** + * The date and time pattern used for the column to be sorted. + * + * Pattern syntax is based on java.text.SimpleDateFormat + * class documentation + */ + public void setPattern(String value) { + _pattern = value; + } + + /** + * Returns the date and time pattern for the column to be sorted. + */ + public String getPattern() { + return _pattern; + } +} diff --git a/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java new file mode 100644 index 0000000..2d9a8c4 --- /dev/null +++ b/src/main/java/com/foundations/comparator/attributes/DecimalSortAttributes.java @@ -0,0 +1,33 @@ +package com.foundations.comparator.attributes; + +import java.math.RoundingMode; + +public final class DecimalSortAttributes extends SortAttributes { + + public static final int DEFAULT_SCALE = 2; + public static final RoundingMode DEFAULT_ROUNDING_MODE = RoundingMode.HALF_EVEN; + + private int _scale; + private RoundingMode _roundingMode; + + public DecimalSortAttributes() { + _scale = DEFAULT_SCALE; + _roundingMode = DEFAULT_ROUNDING_MODE; + } + + public void setScale(int value) { + _scale = value; + } + + public int getScale() { + return _scale; + } + + public void setRoundingMode(RoundingMode value) { + _roundingMode = value; + } + + public RoundingMode getRoundingMode() { + return _roundingMode; + } + } diff --git a/src/main/java/com/foundations/comparator/attributes/SortAttributes.java b/src/main/java/com/foundations/comparator/attributes/SortAttributes.java new file mode 100644 index 0000000..ced569b --- /dev/null +++ b/src/main/java/com/foundations/comparator/attributes/SortAttributes.java @@ -0,0 +1,42 @@ +package com.foundations.comparator.attributes; + +public class SortAttributes { + + public static final boolean DEFAULT_ASCENDING_ORDER = true; + public static final boolean DEFAULT_NULL_LOW_SORT_ORDER = true; + public static final boolean DEFAULT_TRIM = false; + + private boolean _ascendingOrder; + private boolean _trim; + private boolean _nullLowSortOrder; + + public SortAttributes() { + _ascendingOrder = DEFAULT_ASCENDING_ORDER; + _trim = DEFAULT_TRIM; + _nullLowSortOrder = DEFAULT_NULL_LOW_SORT_ORDER; + } + + public void setAscendingOrder(boolean value) { + _ascendingOrder = value; + } + + public boolean isAscendingOrder() { + return _ascendingOrder; + } + + public void setTrim(boolean value) { + _trim = value; + } + + public boolean isTrim() { + return _trim; + } + + public void setNullLowSortOrder(boolean value) { + _nullLowSortOrder = value; + } + + public boolean isNullLowSortOrder() { + return _nullLowSortOrder; + } +} diff --git a/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java b/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java new file mode 100644 index 0000000..f6f0d3b --- /dev/null +++ b/src/main/java/com/foundations/comparator/attributes/StringSortAttributes.java @@ -0,0 +1,31 @@ +package com.foundations.comparator.attributes; + +public final class StringSortAttributes extends SortAttributes { + + public static final boolean DEFAULT_CASE_SENSITIVE = true; + public static final boolean DEFAULT_STRIP_ACCENTS = false; + + private boolean _caseSensitive; + private boolean _stripAccents; + + public StringSortAttributes() { + _caseSensitive = DEFAULT_CASE_SENSITIVE; + _stripAccents = DEFAULT_STRIP_ACCENTS; + } + + public void setCaseSensitive(boolean value) { + _caseSensitive = value; + } + + public boolean isCaseSensitive() { + return _caseSensitive; + } + + public void setStripAccents(boolean value) { + _stripAccents = value; + } + + public boolean isStripAccents() { + return _stripAccents; + } +} diff --git a/src/main/java/com/foundations/comparator/column/AbstractComparator.java b/src/main/java/com/foundations/comparator/column/AbstractComparator.java new file mode 100644 index 0000000..a2db25e --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/AbstractComparator.java @@ -0,0 +1,97 @@ +package com.foundations.comparator.column; + +import com.foundations.comparator.attributes.SortAttributes; + +public abstract class AbstractComparator implements IColumnComparator { + private String _name; + private int _sortOrder; + private SortAttributes _sortAttributes; + + public AbstractComparator(String name, int sortOrder, SortAttributes sortAttributes) { + _name = name; + _sortOrder = sortOrder; + _sortAttributes = sortAttributes; + } + + public String getName() { + return _name; + } + + public int getSortOrder() { + return _sortOrder; + } + + public SortAttributes getSortAttributes() { + return _sortAttributes; + } + + /** + * Returns a result of zero, greater than one, or less than one + * depending on the comparison of the two supplied Strings

+ * + * A return value of zero indicates the two Strings are equal + * A return value greater than one indicates String a is bigger than String b + * A return value less than one indicates String a is less than String b + * + * The first step in comparing the Strings involves swapping them if they are not + * already in ascending order. + * + * Next, any required trimming is performed if the Trim attribute has been set. + * The Strings are then normalized, ensuring zero-length Strings are treated as + * nulls. + * + * If both Strings turn out to be null after normalization, zero is returned. + * If one of the two Strings is null, the compare will consider the NullLowSortOrder + * attribute to determine the final result. + * + * If both Strings are not null, sub-classes must determine the final result + * of the compare by returning the value from a call to abstract method + * extendedCompare. + */ + public int compare(String a, String b) { + int result = 0; + String stringA = normalizeString((_sortAttributes.isAscendingOrder() ? a : b)); + String stringB = normalizeString((_sortAttributes.isAscendingOrder() ? b : a)); + + if( stringA != null && stringB != null ) { + result = extendedCompare(stringA, stringB); + } + else if( stringA == null && stringB == null ) { + result = 0; + } + else if ( stringA == null ) { + result = _sortAttributes.isNullLowSortOrder() ? -1 : 1; + } + else { + result = _sortAttributes.isNullLowSortOrder() ? 1 : -1; + } + return result; + } + + /** + * Normalize the String for sorting

+ * + * Normalizing involves transforming the original value so + * that zero length Strings are treated as nulls. It also + * involves stripping trailing and leading spaces from the + * original, provided the isTrim attribute is set. + * + * @param original the String to be normalized + * @return the normalized text + */ + private String normalizeString(String original) { + String result = null; + + if( original != null ) { + if( _sortAttributes.isTrim() ) { + original = original.trim(); + } + if( original.length() > 0 ) { + result = original; + } + } + return result; + } + + protected abstract int extendedCompare(String a, String b); +} diff --git a/src/main/java/com/foundations/comparator/column/BooleanComparator.java b/src/main/java/com/foundations/comparator/column/BooleanComparator.java new file mode 100644 index 0000000..d0f6fef --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/BooleanComparator.java @@ -0,0 +1,32 @@ +package com.foundations.comparator.column; + +import com.foundations.comparator.attributes.SortAttributes; + +public class BooleanComparator extends AbstractComparator { + + public BooleanComparator(String name, int sortOrder, SortAttributes attributes) { + super(name, sortOrder, attributes); + } + + protected int extendedCompare(String a, String b) { + Boolean aValue = new Boolean(parse(a)); + Boolean bValue = new Boolean(parse(b)); + + return aValue.compareTo(bValue); + } + + private boolean parse(String value) { + boolean result = false; + + if ( value.toLowerCase().equals("true") || value.equals("1") ) { + result = true; + } + else if ( value.toLowerCase().equals("false") || value.equals("0") ) { + result = false; + } + else { + throw new RuntimeException( "Boolean Parse Exception: " + value); + } + return result; + } +} diff --git a/src/main/java/com/foundations/comparator/column/DateTimeComparator.java b/src/main/java/com/foundations/comparator/column/DateTimeComparator.java new file mode 100644 index 0000000..b4cf56e --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/DateTimeComparator.java @@ -0,0 +1,55 @@ +package com.foundations.comparator.column; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import com.foundations.comparator.attributes.DateTimeSortAttributes; + +public final class DateTimeComparator extends AbstractComparator { + + private SimpleDateFormat _formatter; + + public DateTimeComparator(String name, int sortOrder, DateTimeSortAttributes sortAttributes) { + super(name, sortOrder, sortAttributes); + _formatter = new SimpleDateFormat(sortAttributes.getPattern()); + } + + protected int extendedCompare(String a, String b) { + int result; + + try { + Date aValue = _formatter.parse(a); + Date bValue = _formatter.parse(b); + result = aValue.compareTo(bValue); + } + catch (ParseException e) { + throw new RuntimeException("Parse Exception: " + e.getMessage()); + } + + return result; + } +} + + +////////////////////////// USE FOLLOWING CODE FOR JAVA 8 /////// +// import java.time.LocalDateTime; +// import java.time.format.DateTimeFormatter; +// +// public final class DateTimeComparator extends AbstractComparator { +// +// private DateTimeFormatter _formatter; +// +// public DateTimeComparator(String name, int sortOrder, DateTimeSortAttributes sortAttributes) { +// super(name, sortOrder, sortAttributes); +// _formatter = DateTimeFormatter.ofPattern(sortAttributes.getPattern()); +// } +// +// protected int extendedCompare(String a, String b) { +// LocalDateTime aValue = LocalDateTime.parse(a, _formatter); +// LocalDateTime bValue = LocalDateTime.parse(b, _formatter); +// +// return aValue.compareTo(bValue); +// } +// } +//////////////////////////USE FOLLOWING CODE FOR JAVA 8 /////// diff --git a/src/main/java/com/foundations/comparator/column/DecimalComparator.java b/src/main/java/com/foundations/comparator/column/DecimalComparator.java new file mode 100644 index 0000000..0b6db95 --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/DecimalComparator.java @@ -0,0 +1,26 @@ +package com.foundations.comparator.column; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +import com.foundations.comparator.attributes.DecimalSortAttributes; + +public final class DecimalComparator extends AbstractComparator { + + private int _scale; + private RoundingMode _roundingMode; + + public DecimalComparator(String name, int sortOrder, DecimalSortAttributes attributes) { + super(name, sortOrder, attributes); + + _scale = attributes.getScale(); + _roundingMode = attributes.getRoundingMode(); + } + + protected int extendedCompare(String a, String b) { + BigDecimal aValue = new BigDecimal(a).setScale(_scale, _roundingMode); + BigDecimal bValue = new BigDecimal(b).setScale(_scale, _roundingMode); + + return aValue.compareTo(bValue); + } +} diff --git a/src/main/java/com/foundations/comparator/column/IColumnComparator.java b/src/main/java/com/foundations/comparator/column/IColumnComparator.java new file mode 100644 index 0000000..5e1b2fe --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/IColumnComparator.java @@ -0,0 +1,8 @@ +package com.foundations.comparator.column; + +import java.util.Comparator; + +public interface IColumnComparator extends Comparator { + + public int getSortOrder(); +} diff --git a/src/main/java/com/foundations/comparator/column/IntegerComparator.java b/src/main/java/com/foundations/comparator/column/IntegerComparator.java new file mode 100644 index 0000000..7f656ab --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/IntegerComparator.java @@ -0,0 +1,19 @@ +package com.foundations.comparator.column; + +import java.math.BigInteger; + +import com.foundations.comparator.attributes.SortAttributes; + +public final class IntegerComparator extends AbstractComparator { + + public IntegerComparator(String name, int sortOrder, SortAttributes attributes) { + super(name, sortOrder, attributes); + } + + protected int extendedCompare(String a, String b) { + BigInteger aValue = new BigInteger(a); + BigInteger bValue = new BigInteger(b); + + return aValue.compareTo(bValue); + } +} diff --git a/src/main/java/com/foundations/comparator/column/StringComparator.java b/src/main/java/com/foundations/comparator/column/StringComparator.java new file mode 100644 index 0000000..dd99efb --- /dev/null +++ b/src/main/java/com/foundations/comparator/column/StringComparator.java @@ -0,0 +1,51 @@ +package com.foundations.comparator.column; + +import java.text.Normalizer; +import java.util.regex.Pattern; + +import com.foundations.comparator.attributes.StringSortAttributes; + +public final class StringComparator extends AbstractComparator { + + private boolean _isStripAccents; + private boolean _isCaseSensitive; + private Pattern _pattern; + + public StringComparator(String name, int sortOrder, StringSortAttributes attributes) { + super(name, sortOrder, attributes); + _isStripAccents = attributes.isStripAccents(); + _isCaseSensitive = attributes.isCaseSensitive(); + _pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+"); + } + + protected int extendedCompare(String a, String b) { + + if( _isStripAccents ) { + a = stripAccents(a); + b = stripAccents(b); + } + return _isCaseSensitive ? a.compareTo(b) : a.compareToIgnoreCase(b); + } + + /** + *

Removes diacritics (~= accents) from a string. The case will not be altered.

+ *

For instance, 'à' will be replaced by 'a'.

+ *

Note that ligatures will be left as is.

+ * + *
+     * stripAccents(null) = null
+     * stripAccents("") = ""
+     * stripAccents("control") = "control"
+     * stripAccents("éclair") = "eclair"
+     * 
+ * This function is a modified version of stripAccents in + * org.apache.commons.lang3.StringUtils

+ * + * @param input String to be stripped + * @return input text with diacritics removed + */ + private String stripAccents(String input) { + String decomposed = Normalizer.normalize(input, Normalizer.Form.NFD); + return _pattern.matcher(decomposed).replaceAll(""); + } +} diff --git a/src/main/java/com/sforce/dataset/DatasetUtilMain.java b/src/main/java/com/sforce/dataset/DatasetUtilMain.java index d7351a7..803de88 100644 --- a/src/main/java/com/sforce/dataset/DatasetUtilMain.java +++ b/src/main/java/com/sforce/dataset/DatasetUtilMain.java @@ -55,6 +55,7 @@ import com.sforce.soap.partner.PartnerConnection; import com.sforce.ws.ConnectionException; +@SuppressWarnings("deprecation") public class DatasetUtilMain { public static final String defaultEndpoint = "https://login.salesforce.com/services/Soap/u/31.0"; @@ -625,6 +626,7 @@ public static void printUsage() System.out.println("--rowLimit: (Optional) the number of rows to extract, -1=all, deafult=1000"); System.out.println("--sessionId : (Optional) the salesforce sessionId. if specified,specify endpoint"); System.out.println("--fileEncoding : (Optional) the encoding of the inputFile default UTF-8"); + System.out.println("--uploadFormat : (Optional) the whether to upload as binary or csv. default binary"); // System.out.println("jsonConfig: (Optional) the dataflow definition json file"); System.out.println("*******************************************************************************\n"); System.out.println("Usage Example 1: Upload a csv to a dataset"); diff --git a/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java b/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java index e6509e3..2d41b26 100644 --- a/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java +++ b/src/main/java/com/sforce/dataset/flow/DataFlowUtil.java @@ -309,19 +309,19 @@ public static void startDataFlow(PartnerConnection partnerConnection, DataFlow d URI patchURI = new URI(u.getScheme(),u.getUserInfo(), u.getHost(), u.getPort(), df._url.replace("json", "start"), null,null); - HttpPut httpPatch = new HttpPut(patchURI); + HttpPut httput = new HttpPut(patchURI); // httpPatch.addHeader("Accept", "*/*"); // httpPatch.addHeader("Content-Type", "application/json"); - Map map = new LinkedHashMap(); - map.put("_uid", df._uid); - ObjectMapper mapper = new ObjectMapper(); - StringEntity entity = new StringEntity(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map), "UTF-8"); - entity.setContentType("application/json"); - httpPatch.setConfig(requestConfig); - httpPatch.setEntity(entity); - httpPatch.addHeader("Authorization","OAuth "+sessionID); - CloseableHttpResponse emresponse = httpClient.execute(httpPatch); +// Map map = new LinkedHashMap(); +// map.put("_uid", df._uid); +// ObjectMapper mapper = new ObjectMapper(); +// StringEntity entity = new StringEntity(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map), "UTF-8"); +// entity.setContentType("application/json"); + httput.setConfig(requestConfig); +// httpPatch.setEntity(entity); + httput.addHeader("Authorization","OAuth "+sessionID); + CloseableHttpResponse emresponse = httpClient.execute(httput); String reasonPhrase = emresponse.getStatusLine().getReasonPhrase(); int statusCode = emresponse.getStatusLine().getStatusCode(); if (statusCode != HttpStatus.SC_OK) { diff --git a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java index d8ce6e8..d46801d 100644 --- a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java +++ b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java @@ -41,8 +41,7 @@ import java.nio.charset.MalformedInputException; import java.text.NumberFormat; import java.util.Arrays; -//import java.util.HashMap; -//import java.util.HashSet; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -64,11 +63,6 @@ import org.supercsv.io.CsvListReader; import org.supercsv.prefs.CsvPreference; - - - - -//import com.csvreader.CsvReader; import com.sforce.async.AsyncApiException; import com.sforce.async.BatchInfo; import com.sforce.async.BatchStateEnum; @@ -80,6 +74,7 @@ import com.sforce.async.OperationEnum; import com.sforce.dataset.loader.file.schema.ExternalFileSchema; import com.sforce.dataset.loader.file.schema.FieldType; +import com.sforce.dataset.loader.file.sort.CsvExternalSort; import com.sforce.dataset.util.DatasetUtils; import com.sforce.dataset.util.SfdcUtils; import com.sforce.soap.partner.PartnerConnection; @@ -92,7 +87,6 @@ public class DatasetLoader { -// private static final int DEFAULT_BUFFER_SIZE = 10*1024; private static final int DEFAULT_BUFFER_SIZE = 8*1024*1024; private static final int EOF = -1; private static final char LF = '\n'; @@ -100,35 +94,16 @@ public class DatasetLoader { private static final char QUOTE = '"'; private static final char COMMA = ','; -// String username = null; -// String password = null; -// String endpoint = null; -// String token = null; -// String sessionId = null; private static final String[] filePartsHdr = {"InsightsExternalDataId","PartNumber","DataFile"}; private static final Pattern validChars = Pattern.compile("^[A-Za-z]+[A-Za-z\\d_]*$"); public static final NumberFormat nf = NumberFormat.getIntegerInstance(); + private static int MAX_NUM_UPLOAD_THREADS = 3; -// public DatasetLoader() -// { -// super(); -// } - - /* - public DatasetLoader(String username,String password, String token, String endpoint, String sessionId) throws Exception - { - super(); - this.username = username; - this.password = password; - this.token = token; - this.endpoint = endpoint; - this.sessionId = sessionId; - } - */ + @SuppressWarnings("deprecation") public static boolean uploadDataset(String inputFileString, String uploadFormat, CodingErrorAction codingErrorAction, Charset inputFileCharset, String datasetAlias, @@ -142,8 +117,8 @@ public static boolean uploadDataset(String inputFileString, long digestTime = 0L; long uploadTime = 0L; boolean updateHdrJson = false; - //we only want a small capacity otherwise the reader thread will runaway and the writer thread will become slower - BlockingQueue q = new LinkedBlockingQueue(3); + //we only want a small capacity otherwise the reader thread will runaway + BlockingQueue q = new LinkedBlockingQueue(10); if(uploadFormat==null||uploadFormat.trim().isEmpty()) @@ -165,14 +140,12 @@ public static boolean uploadDataset(String inputFileString, System.out.println("\n*******************************************************************************"); if(FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv")) { -// System.out.println("Detecting schema from csv file {"+ inputFile +"} ..."); schema = ExternalFileSchema.init(inputFile, inputFileCharset); if(schema==null) { System.err.println("Failed to parse schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"}"); return false; } -// System.out.println("Schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"} successfully generated..."); }else { schema = ExternalFileSchema.load(inputFile, inputFileCharset); @@ -181,7 +154,6 @@ public static boolean uploadDataset(String inputFileString, System.err.println("Failed to load schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"}"); return false; } -// System.out.println("Schema file {"+ ExternalFileSchema.getSchemaFile(inputFile) +"} successfully loaded..."); } System.out.println("*******************************************************************************\n"); @@ -225,17 +197,17 @@ public static boolean uploadDataset(String inputFileString, } //Insert header - File metadataJson = ExternalFileSchema.getSchemaFile(inputFile); - if(metadataJson == null || !metadataJson.canRead()) + File metadataJsonFile = ExternalFileSchema.getSchemaFile(inputFile); + if(metadataJsonFile == null || !metadataJsonFile.canRead()) { - System.err.println("Error: metadata Json file {"+metadataJson+"} not found"); + System.err.println("Error: metadata Json file {"+metadataJsonFile+"} not found"); return false; } String hdrId = getLastIncompleteFileHdr(partnerConnection, datasetAlias); if(hdrId==null) { - hdrId = insertFileHdr(partnerConnection, datasetAlias,datasetFolder, FileUtils.readFileToByteArray(metadataJson), uploadFormat, Operation); + hdrId = insertFileHdr(partnerConnection, datasetAlias,datasetFolder, FileUtils.readFileToByteArray(metadataJsonFile), uploadFormat, Operation); }else { System.out.println("Record {"+hdrId+"} is being reused from InsightsExternalData"); @@ -247,6 +219,7 @@ public static boolean uploadDataset(String inputFileString, return false; } + inputFile = CsvExternalSort.sortFile(inputFile, inputFileCharset, false, 1); //Create the Bin file // File binFile = new File(csvFile.getParent(), datasetName + ".bin"); @@ -266,18 +239,19 @@ public static boolean uploadDataset(String inputFileString, { if(uploadFormat.equalsIgnoreCase("binary") && FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("csv")) { - GzipCompressorOutputStream out = null; + FileOutputStream fos = null; + BufferedOutputStream out = null; BufferedOutputStream bos = null; + GzipCompressorOutputStream gzos = null; try { gzbinFile = new File(inputFile.getParent(), hdrId + "." + FilenameUtils.getBaseName(inputFile.getName()) + ".gz"); GzipParameters gzipParams = new GzipParameters(); gzipParams.setFilename(FilenameUtils.getBaseName(inputFile.getName()) + ".bin"); - bos = new BufferedOutputStream(new FileOutputStream(gzbinFile),DEFAULT_BUFFER_SIZE); - out = new GzipCompressorOutputStream(bos,gzipParams); -// BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(gzbinFile)); -// CSVReader reader = new CSVReader(new InputStreamReader(new FileInputStream(inputFile), "UTF-8")); -// String[] header = reader.readNext(); + fos = new FileOutputStream(gzbinFile); + bos = new BufferedOutputStream(fos,DEFAULT_BUFFER_SIZE); + gzos = new GzipCompressorOutputStream(bos,gzipParams); + out = new BufferedOutputStream(gzos,DEFAULT_BUFFER_SIZE); long totalRowCount = 0; long successRowCount = 0; long errorRowCount = 0; @@ -285,9 +259,6 @@ public static boolean uploadDataset(String inputFileString, EbinFormatWriter w = new EbinFormatWriter(out, schema.objects.get(0).fields.toArray(new FieldType[0])); ErrorWriter ew = new ErrorWriter(inputFile,","); -// System.out.println("CodingErrorAction: "+codingErrorAction); - -// CsvReader reader = new CsvReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction, inputFileCharset))); CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE); WriterThread writer = new WriterThread(q, w, ew); Thread th = new Thread(writer,"Writer-Thread"); @@ -296,13 +267,8 @@ public static boolean uploadDataset(String inputFileString, try { -// if(reader.readHeaders()) -// { @SuppressWarnings("unused") -// String[] header = reader.getHeaders(); String[] header = reader.getHeader(true); - // String[] nextLine; - // while ((nextLine = reader.readNext()) != null) { boolean hasmore = true; System.out.println("\n*******************************************************************************"); System.out.println("File: "+inputFile+", being digested to file: "+gzbinFile); @@ -319,8 +285,6 @@ public static boolean uploadDataset(String inputFileString, if(row.size()!=0 ) { q.put(row.toArray(new String[row.size()])); -// w.addrow(row.toArray(new String[row.size()])); -// successRowCount = w.getNrows(); } }else { @@ -335,50 +299,83 @@ public static boolean uploadDataset(String inputFileString, System.err.println("Row {"+totalRowCount+"} has error {"+t+"}"); if(t instanceof MalformedInputException) { + while(!q.isEmpty()) + { + try + { + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } + } + while(!writer.isDone()) { q.put(new String[0]); - Thread.sleep(1000); + try + { + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } } System.err.println("\n*******************************************************************************"); System.err.println("The input file is not utf8 encoded. Please save it as UTF8 file first"); System.err.println("*******************************************************************************\n"); status = false; hasmore = false; -// }else -// { -// -// System.err.println("\n*******************************************************************************"); -// System.err.println("t"); -// System.err.println("*******************************************************************************\n"); -// status = false; -// hasmore = false; -// if(row!=null) -// { -// ew.addError(row.toArray(new String[row.size()]), t.getMessage()); -// errorRowCount++; -// } } - //t.printStackTrace(); } }//end while + while(!q.isEmpty()) + { + try + { + System.out.println("1 Waiting for writer to finish"); + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } + } + while(!writer.isDone()) { q.put(new String[0]); - Thread.sleep(1000); + try + { + System.out.println("2 Waiting for writer to finish"); + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } } // } - successRowCount = w.getSuccessRowCount(); - errorRowCount = writer.getErrorRowCount(); + successRowCount = w.getSuccessRowCount(); + errorRowCount = writer.getErrorRowCount(); }finally { - reader.close(); - w.finish(); - ew.finish(); - out.close(); - bos.close(); + if(reader!=null) + reader.close(); + if(w!=null) + w.finish(); + if(ew!=null) + ew.finish(); + if(out!=null) + out.close(); + if(gzos!=null) + gzos.close(); + if(bos!=null) + bos.close(); + if(fos!=null) + fos.close(); out = null; + gzos = null; bos = null; + fos = null; } long endTime = System.currentTimeMillis(); digestTime = endTime-startTime; @@ -391,7 +388,7 @@ public static boolean uploadDataset(String inputFileString, return false; } System.out.println("\n*******************************************************************************"); - System.out.println("Total Rows: "+totalRowCount+", Success Rows: "+successRowCount+", Eror Rows: "+errorRowCount); + System.out.println("Total Rows: "+nf.format(totalRowCount)+", Success Rows: "+nf.format(successRowCount)+", Eror Rows: "+nf.format(errorRowCount)); if(gzbinFile.length()>0) System.out.println("File: "+inputFile+", Size {"+nf.format(inputFile.length())+"} compressed to file: "+gzbinFile+", Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Digest Time {"+nf.format(digestTime) + "} msecs"); System.out.println("*******************************************************************************\n"); @@ -403,6 +400,13 @@ public static boolean uploadDataset(String inputFileString, } catch (IOException e) { } } + if (gzos != null) { + try { + gzos.close(); + gzos = null; + } catch (IOException e) { + } + } if (bos != null) { try { bos.close(); @@ -410,6 +414,13 @@ public static boolean uploadDataset(String inputFileString, } catch (IOException e) { } } + if (fos != null) { + try { + fos.close(); + fos = null; + } catch (IOException e) { + } + } } }else if(!FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("zip") && !FilenameUtils.getExtension(inputFile.getName()).equalsIgnoreCase("gz")) { @@ -426,7 +437,7 @@ public static boolean uploadDataset(String inputFileString, IOUtils.copy(fis, gzOut); long endTime = System.currentTimeMillis(); if(gzbinFile.length()>0) - System.out.println("File: "+inputFile+", Size {"+nf.format(inputFile.length())+"} compressed to file: "+gzbinFile+", Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Compression Time {"+nf.format((endTime-startTime)) + "} msecs"); + System.out.println(" Input File, Size {"+nf.format(inputFile.length())+"} compressed to gz file, Size {"+nf.format(gzbinFile.length())+"} % Compression: "+(inputFile.length()/gzbinFile.length())*100 +"%"+", Compression Time {"+nf.format((endTime-startTime)) + "} msecs"); }finally { if(gzOut!=null) @@ -473,15 +484,10 @@ public static boolean uploadDataset(String inputFileString, gzbinFile = lastgzbinFile; } - //Upload the file -// if(useSoapAPI) long startTime = System.currentTimeMillis(); - status = uploadEM(gzbinFile, uploadFormat, ExternalFileSchema.getSchemaFile(inputFile), datasetAlias,datasetFolder, useBulkAPI, partnerConnection, hdrId, datasetArchiveDir, "Overwrite", updateHdrJson); + status = uploadEM(gzbinFile, uploadFormat, metadataJsonFile, datasetAlias,datasetFolder, useBulkAPI, partnerConnection, hdrId, datasetArchiveDir, "Overwrite", updateHdrJson); long endTime = System.currentTimeMillis(); uploadTime = endTime-startTime; - -// else -// status = DatasetUploader.uploadEM(gzbinFile, datasetAlias, username, password,endpoint,token, format); } catch(MalformedInputException mie) { @@ -546,6 +552,9 @@ public static boolean uploadEM(File dataFile, String dataFormat, File metadataJs */ public static boolean uploadEM(File dataFile, String dataFormat, byte[] metadataJsonBytes, String datasetAlias,String datasetFolder, boolean useBulk, PartnerConnection partnerConnection, String hdrId, File datasetArchiveDir, String Operation, boolean updateHdrJson) throws Exception { + BlockingQueue> q = new LinkedBlockingQueue>(); + LinkedList existingFileParts = new LinkedList(); + if(datasetAlias==null||datasetAlias.trim().isEmpty()) { throw new IllegalArgumentException("datasetAlias cannot be blank"); @@ -569,31 +578,111 @@ public static boolean uploadEM(File dataFile, String dataFormat, byte[] metadata hdrId = insertFileHdr(partnerConnection, datasetAlias,datasetFolder, metadataJsonBytes, dataFormat, Operation); }else { - LinkedList existingFileParts = getUploadedFileParts(partnerConnection, hdrId); + existingFileParts = getUploadedFileParts(partnerConnection, hdrId); if(updateHdrJson && existingFileParts.isEmpty()) updateFileHdr(partnerConnection, hdrId, datasetAlias, datasetFolder, metadataJsonBytes, dataFormat, "None", Operation); } - if(hdrId !=null && !hdrId.isEmpty()) + if(hdrId ==null || hdrId.isEmpty()) { + return false; + } Map fileParts = chunkBinary(dataFile, datasetArchiveDir); - if(useBulk) - { - if(eu.insertFilePartsBulk(partnerConnection, hdrId, createBatchZip(fileParts, hdrId), 0)) - return updateFileHdr(partnerConnection, hdrId, null, null, null, null, "Process", null); - else - return false; - }else + boolean allPartsUploaded = false; + int retryCount=0; + int totalErrorCount = 0; + if(fileParts.size() upThreads = new LinkedList(); + for(int i = 1;i<=MAX_NUM_UPLOAD_THREADS;i++) + { + FilePartsUploaderThread writer = new FilePartsUploaderThread(q, partnerConnection, hdrId); + Thread th = new Thread(writer,"FilePartsUploaderThread-"+i); + th.setDaemon(true); + th.start(); + upThreads.add(writer); + } + + if(useBulk) + { + if(eu.insertFilePartsBulk(partnerConnection, hdrId, createBatchZip(fileParts, hdrId), 0)) + return updateFileHdr(partnerConnection, hdrId, null, null, null, null, "Process", null); + else + return false; + }else + { + for(int i:fileParts.keySet()) + { + if(!existingFileParts.contains(i)) + { + HashMap tmp = new HashMap(); + tmp.put(i,fileParts.get(i)); + q.put(tmp); + } + } + while(!q.isEmpty()) + { + try + { + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } + } + } + + for(int i = 0;i()); + try + { + Thread.sleep(1000); + }catch(InterruptedException in) + { + in.printStackTrace(); + } + } + totalErrorCount = totalErrorCount + uploader.getErrorRowCount(); + } + + allPartsUploaded = true; + existingFileParts = getUploadedFileParts(partnerConnection, hdrId); + for(int i:fileParts.keySet()) + { + if(!existingFileParts.contains(i)) + { + allPartsUploaded = false; + }else + { + FileUtils.deleteQuietly(fileParts.get(i)); + } + } + if(allPartsUploaded) + break; + retryCount++; } - }else - { - return false; - } + + if(totalErrorCount==0 && allPartsUploaded) + { + return updateFileHdr(partnerConnection, hdrId, null, null, null, null, "Process", null); + }else + { + System.err.println("Not all file parts were uploaded to InsightsExternalDataPart, remaining files:"); + for(int i:fileParts.keySet()) + { + if(!existingFileParts.contains(i)) + { + System.err.println(fileParts.get(i)); + } + } + return false; + } } @@ -652,6 +741,7 @@ private static String insertFileHdr(PartnerConnection partnerConnection, String return rowId; } + /* private boolean insertFileParts(PartnerConnection partnerConnection, String insightsExternalDataId, Map fileParts, int retryCount) throws Exception { LinkedHashMap failedFileParts = new LinkedHashMap(); @@ -711,6 +801,7 @@ private boolean insertFileParts(PartnerConnection partnerConnection, String insi } return true; } + */ private boolean insertFilePartsBulk(PartnerConnection partnerConnection, String insightsExternalDataId, Map fileParts, int retryCount) throws Exception { @@ -962,13 +1053,13 @@ private static Map createBatchZip(Map fileParts,Stri /** * This method will format the error message so that it can be logged by - * INFA in the error csv file + * in the error csv file * * @param errors * Array of com.sforce.soap.partner.Error[] * @return formated Error String */ - private static String getErrorMessage(com.sforce.soap.partner.Error[] errors) + static String getErrorMessage(com.sforce.soap.partner.Error[] errors) { StringBuffer strBuf = new StringBuffer(); for(com.sforce.soap.partner.Error err:errors) @@ -1300,7 +1391,6 @@ private static boolean checkAPIAccess(PartnerConnection partnerConnection) { return false; } - //SELECT Action,Id,Status FROM InsightsExternalData WHERE EdgemartAlias = 'puntest' AND Status = 'New' AND Action = 'None' ORDER BY CreatedDate ASC NULLS LAST private static String getLastIncompleteFileHdr(PartnerConnection partnerConnection, String datasetAlias) throws Exception { @@ -1342,11 +1432,9 @@ private static String getLastIncompleteFileHdr(PartnerConnection partnerConnecti private static LinkedList getUploadedFileParts(PartnerConnection partnerConnection, String hdrId) throws Exception { LinkedList existingPartList = new LinkedList(); -// String rowId = null; String soqlQuery = String.format("SELECT Id,PartNumber FROM InsightsExternalDataPart WHERE InsightsExternalDataId = '%s' ORDER BY PartNumber ASC",hdrId); partnerConnection.setQueryOptions(2000); QueryResult qr = partnerConnection.query(soqlQuery); -// int rowsSoFar = 0; boolean done = false; if (qr.getSize() > 0) { @@ -1362,7 +1450,6 @@ private static LinkedList getUploadedFileParts(PartnerConnection partne else existingPartList.add(new Integer(value.toString())); } -// rowsSoFar++; } if (qr.isDone()) { done = true; diff --git a/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java b/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java index 440090b..65d6e80 100644 --- a/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java +++ b/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java @@ -26,19 +26,33 @@ package com.sforce.dataset.loader; import java.io.IOException; import java.io.OutputStream; +import java.math.BigDecimal; import java.text.DecimalFormat; import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.TimeZone; import java.util.regex.Pattern; -import com.sforce.dataset.loader.file.schema.FieldType; +import javax.script.Bindings; +import javax.script.CompiledScript; +import javax.script.SimpleBindings; +import com.sforce.dataset.loader.file.schema.FieldType; +/** + * @author pgupta + * + * This format has been deprecated and not recommended for use. + * Simply upload the file in CSV format + * + */ +@Deprecated public class EbinFormatWriter { static byte[] delimiter = new byte[] {(byte)0x3A}; // ":" @@ -53,6 +67,7 @@ public class EbinFormatWriter { LinkedList measure_index = new LinkedList(); LinkedList _dataTypes = new LinkedList(); + LinkedHashMap prev = new LinkedHashMap();; private int numColumns = 0; private OutputStream out; @@ -63,6 +78,8 @@ public class EbinFormatWriter { private volatile int successRowCount = 0; private volatile int totalRowCount = 0; + Calendar cal = Calendar.getInstance(); + DecimalFormat df = new DecimalFormat("00"); public static final NumberFormat nf = NumberFormat.getIntegerInstance(); long startTime = 0L; @@ -78,7 +95,7 @@ public EbinFormatWriter(OutputStream out, FieldType[] dataTypes) throws IOException { this(out); - this.numColumns = dataTypes.length; +// this.numColumns = dataTypes.length; for (FieldType dataType: dataTypes) { _dataTypes.add(dataType); @@ -92,9 +109,14 @@ public EbinFormatWriter(OutputStream out, FieldType[] dataTypes) _dataTypes.add(FieldType.GetStringKeyDataType(dataType.getName() + "_Quarter", null, null)); _dataTypes.add(FieldType.GetStringKeyDataType(dataType.getName() + "_Week", null, null)); } + if(!dataType.isComputedField) + numColumns++; } this.initmeasures(_dataTypes); + + cal.setTimeZone(TimeZone.getTimeZone("GMT")); + df.setMinimumIntegerDigits(2); } protected EbinFormatWriter(OutputStream out) throws IOException @@ -104,8 +126,6 @@ protected EbinFormatWriter(OutputStream out) throws IOException out.write(magic, 0, 3); out.write(version_high); out.write(version_low); -// out.writeByte(version_high); -// out.writeByte(version_low); } @@ -114,6 +134,7 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P LinkedList measure_values = new LinkedList(); LinkedList dim_keys = new LinkedList(); LinkedList dim_values = new LinkedList(); + LinkedHashMap curr = new LinkedHashMap(); int count = 0; int key_value_count = 0; @@ -134,162 +155,231 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P startTime = newStartTime; } - if(totalRowCount/interval>=10) + if(interval < 1000000 && totalRowCount/interval>=10) { interval = interval*10; } while(key_value_count<_dataTypes.size()) { - if(values[count]==null||values[count].trim().length()==0) - values[count] = _dataTypes.get(key_value_count).getDefaultValue(); + if(_dataTypes.get(key_value_count).isSkipped) + { + key_value_count++; + count++; + continue; + } + Object columnValue = _dataTypes.get(key_value_count).getDefaultValue(); + if(_dataTypes.get(key_value_count).getfType() == FieldType.DATE) + columnValue = _dataTypes.get(key_value_count).getDefaultDate(); + if(_dataTypes.get(key_value_count).isComputedField) + { + Bindings bindings = new SimpleBindings(); + CompiledScript cs = _dataTypes.get(key_value_count).getCompiledScript(); + try + { + if(cs!=null) + { + bindings.put("curr", curr); + bindings.put("prev", prev); + columnValue = cs.eval(bindings); + } + }catch(Throwable t) + { + System.out.println("Field {"+_dataTypes.get(key_value_count).name+"} has Invalid Expression {"+_dataTypes.get(key_value_count).getComputedFieldExpression()+"}"); + t.printStackTrace(); + } + }else + { + if(values[count]!=null) + { + columnValue = values[count]; + } + } if (_dataTypes.get(key_value_count).getfType() == FieldType.MEASURE) { try { double v = 0L; - if(values[count]==null||values[count].trim().length()==0) + double mv = 0L; + if(columnValue!=null) { - measure_values.add((long)v); - key_value_count++; - }else - { - v = Double.parseDouble(values[count]); + if(columnValue instanceof Double) + v = (Double)columnValue; + else + v = (new BigDecimal(columnValue.toString())).doubleValue(); + if (_dataTypes.get(key_value_count).getMeasure_multiplier() > 1) { - v = v*_dataTypes.get(key_value_count).getMeasure_multiplier(); + mv = v*_dataTypes.get(key_value_count).getMeasure_multiplier(); + }else + { + mv = v; } - if(v>edgeMaxValue || vedgeMaxValue || mv 0 ? _dataTypes.get(key_value_count).precision: maxTextLength; + if(_dataTypes.get(key_value_count).isMultiValue()) { - if(val!=null && val.length() > maxTextLength) - dim_values.add(val.substring(0, maxTextLength)); + String vals[] = columnValue.toString().split(Pattern.quote(_dataTypes.get(key_value_count).getMultiValueSeparator())); + for(String val:vals) + { + if(val!=null && val.length() > precision) + dim_values.add(val.substring(0, precision)); + else + dim_values.add(val); + dim_keys.add(_dataTypes.get(key_value_count).getName()); + } + }else + { + if(columnValue.toString().length() > precision) + dim_values.add(columnValue.toString().substring(0, precision)); else - dim_values.add(val); + dim_values.add(columnValue.toString()); dim_keys.add(_dataTypes.get(key_value_count).getName()); } - }else - { - if(values[count]!=null && values[count].length() > maxTextLength) - dim_values.add(values[count].substring(0, maxTextLength)); - else - dim_values.add(values[count]); - dim_keys.add(_dataTypes.get(key_value_count).getName()); + curr.put(_dataTypes.get(key_value_count).name, columnValue.toString()); + key_value_count++; } - key_value_count++; } count++; } arr(measure_values); dict(dim_keys, dim_values); + prev = curr; successRowCount++; } @@ -316,7 +406,6 @@ protected void initmeasures(LinkedList _dataTypes) throws IOExceptio vInt(_dataTypes.get((int) measure_index.get(i)).getMeasure_multiplier()); } out.write(checksum); -// out.writeByte(checksum); } protected void vInt(long i) throws IOException @@ -325,34 +414,27 @@ protected void vInt(long i) throws IOException while ((i & ~0x7F) != 0) { b = (byte) ((i & 0x7F) | 0x80); out.write(b); -// out.writeByte(b); i = i >> 7; } out.write((int)i); -// out.writeByte((int)i); } - /* protected void dict(LinkedList dim_keys, LinkedList dim_values) throws IOException { int dlen = dim_values.size(); vInt(dlen); for (int i = 0; i < dlen; i++) { - String d = dim_keys.get(i); - String v = dim_values.get(i); -// System.err.println(d+": "+v); - vInt(v.getBytes("UTF-8").length + d.getBytes("UTF-8").length + 1); -// byte[] b1 = d.getBytes("UTF-8"); - out.write(d.getBytes("UTF-8")); + byte[] v = dim_values.get(i).getBytes("UTF-8"); + byte[] d = dim_keys.get(i).getBytes("UTF-8"); + vInt(v.length + d.length + 1); + out.write(d); out.write(delimiter); -// byte[] b2 = v.getBytes("UTF-8"); - out.write(v.getBytes("UTF-8")); + out.write(v); } out.write(checksum); -// out.writeByte(checksum); } - */ - + + /* protected void dict(LinkedList dim_keys, LinkedList dim_values) throws IOException { int dlen = dim_values.size(); @@ -370,6 +452,7 @@ protected void dict(LinkedList dim_keys, LinkedList dim_values) } out.write(checksum); } + */ protected void arr(LinkedList measure_values) throws IOException { @@ -377,21 +460,7 @@ protected void arr(LinkedList measure_values) throws IOException vEncodedIntCustom(measure_values.get(i)); } } - - /* - public void addrow(String[] keys, String[] values, long[] measure_values) throws IOException - { - totalRowCount++; - if (measure_values.length != measures.length) { - String message = "Row " + totalRowCount + " contains an invalid number of measures, expected " + - measures.length + " measure(s), got " + measure_values.length + ".\n"; - throw new IOException(message); - } - arr(measure_values); - dict(keys, values); - } - */ - + public void finish() throws IOException { if (out != null) { @@ -402,6 +471,13 @@ public void finish() throws IOException if(totalRowCount==0) { throw new IOException("Atleast one row must be written"); + }else + { + long newStartTime = System.currentTimeMillis(); + if(startTime==0) + startTime = newStartTime; + System.out.println("Processed last row {"+nf.format(totalRowCount) +"} time {"+nf.format(newStartTime-startTime)+"}"); + startTime = newStartTime; } } diff --git a/src/main/java/com/sforce/dataset/loader/FilePartsUploaderThread.java b/src/main/java/com/sforce/dataset/loader/FilePartsUploaderThread.java new file mode 100644 index 0000000..6c03869 --- /dev/null +++ b/src/main/java/com/sforce/dataset/loader/FilePartsUploaderThread.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2014, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.sforce.dataset.loader; + +import java.io.File; +import java.text.NumberFormat; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.io.FileUtils; + +import com.sforce.soap.partner.PartnerConnection; +import com.sforce.soap.partner.SaveResult; +import com.sforce.soap.partner.sobject.SObject; + +public class FilePartsUploaderThread implements Runnable { + + + private final BlockingQueue> queue; + private final PartnerConnection partnerConnection; + private final String insightsExternalDataId; + + private volatile boolean isDone = false; + private volatile int errorRowCount = 0; + private volatile int totalRowCount = 0; + + public static final NumberFormat nf = NumberFormat.getIntegerInstance(); + +FilePartsUploaderThread(BlockingQueue> q,PartnerConnection partnerConnection, String insightsExternalDataId) + { + if(partnerConnection==null || insightsExternalDataId == null || q == null) + { + throw new IllegalArgumentException("Constructor input cannot be null"); + } + queue = q; + this.partnerConnection = partnerConnection; + this.insightsExternalDataId = insightsExternalDataId; + } + + public void run() { + try { + Map row = queue.take(); + System.out.println("Start: " + Thread.currentThread().getName()); + while (!row.isEmpty()) { + try + { + totalRowCount++; + if(!insertFileParts(partnerConnection, insightsExternalDataId, row, 0)) + errorRowCount++; + }catch(Throwable t) + { + errorRowCount++; + t.printStackTrace(); + } + row = queue.take(); + } + }catch (Throwable t) { + System.out.println (Thread.currentThread().getName() + " " + t.getMessage()); + } + System.out.println("END: " + Thread.currentThread().getName()); + isDone = true; + } + +public boolean isDone() { + return isDone; +} + +public int getErrorRowCount() { + return errorRowCount; +} + + public int getTotalRowCount() { + return totalRowCount; +} + + + private boolean insertFileParts(PartnerConnection partnerConnection, String insightsExternalDataId, Map fileParts, int retryCount) throws Exception + { + for(int i:fileParts.keySet()) + { + try { + long startTime = System.currentTimeMillis(); + SObject sobj = new SObject(); + sobj.setType("InsightsExternalDataPart"); + sobj.setField("DataFile", FileUtils.readFileToByteArray(fileParts.get(i))); + sobj.setField("InsightsExternalDataId", insightsExternalDataId); + sobj.setField("PartNumber",i); //Part numbers should start at 1 + SaveResult[] results = partnerConnection.create(new SObject[] { sobj }); + long endTime = System.currentTimeMillis(); + for(SaveResult sv:results) + { + if(sv.isSuccess()) + { + System.out.println("File Part {"+ fileParts.get(i) + "} Inserted into InsightsExternalDataPart: " +sv.getId() + ", upload time {"+nf.format(endTime-startTime)+"} msec"); + return true; + }else + { + System.err.println("File Part {"+ fileParts.get(i) + "} Insert Failed: " + (DatasetLoader.getErrorMessage(sv.getErrors()))); + } + } + } catch (Throwable t) { + t.printStackTrace(); + System.err.println("File Part {"+ fileParts.get(i) + "} Insert Failed: " + t.toString()); + } + } +// if(retryCount<3) +// { +// retryCount++; +// Thread.sleep(1000*retryCount); +//// partnerConnection = DatasetUtils.login(0, username, password, token, endpoint, sessionId); +// return insertFileParts(partnerConnection, insightsExternalDataId, fileParts, retryCount); +// }else + { + return false; + } + } + + +} \ No newline at end of file diff --git a/src/main/java/com/sforce/dataset/loader/file/schema/DetectFieldTypes.java b/src/main/java/com/sforce/dataset/loader/file/schema/DetectFieldTypes.java index 0832b60..0c7b8c0 100644 --- a/src/main/java/com/sforce/dataset/loader/file/schema/DetectFieldTypes.java +++ b/src/main/java/com/sforce/dataset/loader/file/schema/DetectFieldTypes.java @@ -161,9 +161,14 @@ public LinkedList detect(File inputCsv, ExternalFileSchema userSchema newField = FieldType.GetStringKeyDataType(devNames[i], null, null); int prec = detectTextPrecision(columnValues); if(prec>255) + { System.out.println(", Type: Text, Precison: "+255+" (Column will be truncated to 255 characters)"); + } else + { System.out.println(", Type: Text, Precison: "+prec); + } + newField.setPrecision(255); //Assume upper limit for precision of text fields even if the values may be smaller } } if(newField!=null) diff --git a/src/main/java/com/sforce/dataset/loader/file/schema/ExternalFileSchema.java b/src/main/java/com/sforce/dataset/loader/file/schema/ExternalFileSchema.java index 13f0280..5e6497d 100644 --- a/src/main/java/com/sforce/dataset/loader/file/schema/ExternalFileSchema.java +++ b/src/main/java/com/sforce/dataset/loader/file/schema/ExternalFileSchema.java @@ -267,6 +267,8 @@ public static ExternalFileSchema load(File inputCSV, Charset fileCharset) throws List fieldsCopy = new LinkedList(fields); for(FieldType field:fieldsCopy) { + if(field.isComputedField) + continue; boolean found = false; for (int i=0; i< devNames.length; i++) { @@ -382,6 +384,14 @@ private static void validateSchema(ExternalFileSchema schema) throws IllegalArgu message.append("[objects["+objectCount+"].fields["+fieldCount+"].fullyQualifiedName] in schema cannot be null or empty\n"); } + if(user_field.getfType()==FieldType.MEASURE) + { + if(user_field.getDefaultValue()==null|| !isLatinNumber(user_field.getDefaultValue())) + { + message.append("field fullyQualifiedName {"+user_field.fullyQualifiedName+"} in schema must have default numeric value\n"); + } + } + }else { message.append("[objects["+objectCount+"].fields["+fieldCount+"].name] in schema cannot be null or empty\n"); @@ -479,6 +489,13 @@ public static ExternalFileSchema merge(ExternalFileSchema userSchema, ExternalFi } merged_fields.add(merged_field); } + + for(FieldType user_field:user_fields) + { + if(user_field.isComputedField) + merged_fields.add(user_field); + } + merged_object.acl = user_object.acl!=null?user_object.acl:auto_object.acl; merged_object.connector = user_object.connector!=null?user_object.connector:auto_object.connector; merged_object.description = user_object.description!=null?user_object.description:auto_object.description; @@ -683,7 +700,22 @@ public static boolean isLatinNumber(char c) { return (c >= '0' && c <= '9'); } + public static boolean isLatinNumber(String str) { + if (str == null) { + return false; + } + int sz = str.length(); + if (sz == 0) { + return false; + } + for (int i = 0; i < sz; i++) { + char c = str.charAt(i); + if (!(c >= '0' && c <= '9')) + return false; + } + return true; + } } diff --git a/src/main/java/com/sforce/dataset/loader/file/schema/FieldType.java b/src/main/java/com/sforce/dataset/loader/file/schema/FieldType.java index 52b802c..7db2a3f 100644 --- a/src/main/java/com/sforce/dataset/loader/file/schema/FieldType.java +++ b/src/main/java/com/sforce/dataset/loader/file/schema/FieldType.java @@ -26,9 +26,17 @@ package com.sforce.dataset.loader.file.schema; import java.math.BigDecimal; +import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; import java.util.regex.Pattern; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; + import com.fasterxml.jackson.annotation.JsonIgnore; public class FieldType { @@ -92,6 +100,19 @@ public class FieldType { //public String acl = null; //Optional //public boolean isAclField = false; //Optional public int fiscalMonthOffset = 0; + public int firstDayOfWeek = 1; //1=SUNDAY, 2=MONDAY etc.. + public boolean canTruncateValue = true; //Optional + public boolean isSkipped = false; //Optional + public String decimalSeparator = "."; + public int sortIndex = 0; //Index start at 1, 0 means not to sort + public boolean isSortAscending = true; //Optional if index > 0 then will sort ascending if true + + public boolean isComputedField = false; //Optional if this field is computed + public String computedFieldExpression = null; //Optional the expression to compute this field + private transient CompiledScript compiledScript = null; + private transient SimpleDateFormat compiledDateFormat = null; + private transient Date defaultDate = null; + public static FieldType GetStringKeyDataType(String name, String multivalueSeperator, String defaultValue) { @@ -142,29 +163,13 @@ public static FieldType GetMeasureKeyDataType(String name,int precision,int scal kdt.setDescription(name); return kdt; } - /* - * @date_format: Use the Java date format String as defined in http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html - - Letter Date or Time Component Presentation Examples - G Era designator Text AD - y Year Year 1996; 96 - M Month in year Month July; Jul; 07 - w Week in year Number 27 - W Week in month Number 2 - D Day in year Number 189 - d Day in month Number 10 - F Day of week in month Number 2 - E Day in week Text Tuesday; Tue - a Am/pm marker Text PM - H Hour in day (0-23) Number 0 - k Hour in day (1-24) Number 24 - K Hour in am/pm (0-11) Number 0 - h Hour in am/pm (1-12) Number 12 - m Minute in hour Number 30 - s Second in minute Number 55 - S Millisecond Number 978 - z Time zone General time zone Pacific Standard Time; PST; GMT-08:00 - Z Time zone RFC 822 time zone -0800 + + + /** + * @param name the field name + * @param format refer to Date format section in https://developer.salesforce.com/docs/atlas.en-us.bi_dev_guide_ext_data.meta/bi_dev_guide_ext_data/bi_ext_data_schema_reference.htm + * @param defaultValue + * @return */ public static FieldType GetDateKeyDataType(String name, String format, String defaultValue) { @@ -266,40 +271,35 @@ public void setScale(int scale) { } public String getFormat() { -//// String temp = format; -//// if(temp != null && (!temp.contains("'T'") && temp.contains("T"))) -//// temp = temp.replace("T", "'T'"); -//// if(temp != null && (!temp.contains("'Z'") && temp.contains("Z"))) -//// temp = temp.replace("Z", "'Z'"); -// return switchFormat(format); return format; } public void setFormat(String format) { if(this.type.equals(FieldType.DATE_TYPE) && format != null) { - new SimpleDateFormat(format); -// String temp = format; -// if(temp != null && (temp.contains("'T'"))) -// temp = temp.replace("'T'", "T"); -// if(temp != null && (temp.contains("'Z'"))) -// temp = temp.replace("'Z'", "Z"); - this.format = format; + compiledDateFormat = new SimpleDateFormat(format); + compiledDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); //All dates must be in GMT + if(this.defaultValue!=null && !this.defaultValue.isEmpty()) + { + try { + this.defaultDate = compiledDateFormat.parse(this.defaultValue); + } catch (ParseException e) { + throw new IllegalArgumentException(e.toString()); + } + } } + this.format = format; } -// @JsonIgnore -// public String switchFormat(String fmt) { -// if(fmt==null||fmt.isEmpty()) -// fmt = format; -// String temp = fmt; -// if(temp != null && (!temp.contains("'T'") && temp.contains("T"))) -// temp = temp.replace("T", "'T'"); -// if(temp != null && (!temp.contains("'Z'") && temp.contains("Z"))) -// temp = temp.replace("Z", "'Z'"); -// return temp; -// } + @JsonIgnore + public SimpleDateFormat getCompiledDateFormat() { + return compiledDateFormat; + } + @JsonIgnore + public Date getDefaultDate() { + return defaultDate; + } public String getMultiValueSeparator() { return multiValueSeparator; @@ -314,6 +314,17 @@ public String getDefaultValue() { } public void setDefaultValue(String defaultValue) { + if(this.type.equals(FieldType.DATE_TYPE) && compiledDateFormat != null) + { + if(defaultValue!=null && !defaultValue.isEmpty()) + { + try { + this.defaultDate = compiledDateFormat.parse(defaultValue); + } catch (ParseException e) { + throw new IllegalArgumentException(e.toString()); + } + } + } this.defaultValue = defaultValue; } @@ -411,7 +422,35 @@ public void setFiscalMonthOffset(int fiscalMonthOffset) { this.fiscalMonthOffset = fiscalMonthOffset; } + public String getComputedFieldExpression() { + return computedFieldExpression; + } + public void setComputedFieldExpression(String computedFieldExpression) { + if(computedFieldExpression != null && computedFieldExpression.length()!=0) + { + try + { + ScriptEngineManager mgr = new ScriptEngineManager(); + ScriptEngine jsEngine = mgr.getEngineByName("JavaScript"); + if (jsEngine instanceof Compilable) + { + Compilable compEngine = (Compilable)jsEngine; + this.compiledScript = compEngine.compile(computedFieldExpression); + this.computedFieldExpression = computedFieldExpression; + } + } catch(Throwable t) + { + throw new IllegalArgumentException(t.toString()); + } + } + } + + @JsonIgnore + public CompiledScript getCompiledScript() { + return compiledScript; + } + @Override public int hashCode() { diff --git a/src/main/java/com/sforce/dataset/loader/file/sort/CsvColumnComparatorFactory.java b/src/main/java/com/sforce/dataset/loader/file/sort/CsvColumnComparatorFactory.java new file mode 100644 index 0000000..802f304 --- /dev/null +++ b/src/main/java/com/sforce/dataset/loader/file/sort/CsvColumnComparatorFactory.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.sforce.dataset.loader.file.sort; + +import com.foundations.comparator.attributes.DateTimeSortAttributes; +import com.foundations.comparator.attributes.DecimalSortAttributes; +import com.foundations.comparator.attributes.SortAttributes; +import com.foundations.comparator.attributes.StringSortAttributes; +import com.foundations.comparator.column.DateTimeComparator; +import com.foundations.comparator.column.DecimalComparator; +import com.foundations.comparator.column.IColumnComparator; +import com.foundations.comparator.column.IntegerComparator; +import com.foundations.comparator.column.StringComparator; +import com.sforce.dataset.loader.file.schema.FieldType; + +public class CsvColumnComparatorFactory { + + + public static final IColumnComparator createColumnComparator(FieldType fieldType) { + IColumnComparator comparator = null; + if(fieldType.getfType() == FieldType.STRING) { + StringSortAttributes attributes = new StringSortAttributes(); + attributes.setCaseSensitive(true); + attributes.setAscendingOrder(fieldType.isSortAscending); + comparator = new StringComparator(fieldType.name, fieldType.sortIndex, attributes); + } + else if(fieldType.getfType() == FieldType.DATE) { + DateTimeSortAttributes attributes = new DateTimeSortAttributes(); + attributes.setAscendingOrder(fieldType.isSortAscending); + attributes.setPattern(fieldType.format); + comparator = new DateTimeComparator(fieldType.name, fieldType.sortIndex, attributes); + } + else if(fieldType.getfType() == FieldType.MEASURE) + { + if(fieldType.scale > 0) + { + DecimalSortAttributes attributes = new DecimalSortAttributes(); + attributes.setAscendingOrder(fieldType.isSortAscending); + attributes.setScale(fieldType.scale); + comparator = new DecimalComparator(fieldType.name, fieldType.sortIndex, attributes); + }else + { + SortAttributes attributes = new SortAttributes(); + attributes.setAscendingOrder(fieldType.isSortAscending); + comparator = new IntegerComparator(fieldType.name, fieldType.sortIndex, attributes); + } + + } + return comparator; + } +} diff --git a/src/main/java/com/sforce/dataset/loader/file/sort/CsvExternalSort.java b/src/main/java/com/sforce/dataset/loader/file/sort/CsvExternalSort.java new file mode 100644 index 0000000..9193fed --- /dev/null +++ b/src/main/java/com/sforce/dataset/loader/file/sort/CsvExternalSort.java @@ -0,0 +1,387 @@ +/* + * Copyright (c) 2014, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.sforce.dataset.loader.file.sort; + +import java.io.BufferedWriter; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.input.BOMInputStream; +import org.supercsv.io.CsvListReader; +import org.supercsv.io.CsvListWriter; +import org.supercsv.prefs.CsvPreference; + +import com.google.code.externalsorting.ExternalSort; +import com.sforce.dataset.loader.file.schema.ExternalFileSchema; +import com.sforce.dataset.util.DatasetUtils; + +public class CsvExternalSort extends ExternalSort { + + public static File sortFile(File inputCsv, final Charset cs, final boolean distinct,final int headersize) throws IOException + { + if(inputCsv==null || !inputCsv.canRead()) + { + throw new IOException("File not found {"+inputCsv+"}"); + } + + File outputFile = new File(inputCsv.getParent(), FilenameUtils.getBaseName(inputCsv.getName())+ "_sorted." + FilenameUtils.getExtension(inputCsv.getName())); + + ExternalFileSchema schema = ExternalFileSchema.load(inputCsv, cs); + if(schema==null || schema.objects == null || schema.objects.size()==0 || schema.objects.get(0).fields == null) + { + throw new IOException("File does not have valid metadata json {"+ExternalFileSchema.getSchemaFile(inputCsv)+"}"); + } + + CsvRowComparator cmp = null; +// try +// { + cmp = new CsvRowComparator(schema.objects.get(0).fields); + if(cmp.getSortColumnCount()==0) + return inputCsv; +// }catch(Throwable t) +// { +// t.printStackTrace(); +// return inputCsv; +// } + + List l = sortInBatch(inputCsv, cs, cmp, distinct, headersize); + System.out.println("CsvExternalSort created " + l.size() + " tmp files"); + mergeSortedFiles(l, outputFile, cmp, cs, distinct, inputCsv, headersize); + return outputFile; + } + + /** + * @param fbr + * data source + * @param datalength + * estimated data volume (in bytes) + * @param cmp + * string comparator + * @param maxtmpfiles + * maximal number of temporary files + * @param maxMemory + * maximum amount of memory to use (in bytes) + * @param cs + * character set to use (can use + * Charset.defaultCharset()) + * @param tmpdirectory + * location of the temporary files (set to null for + * default location) + * @param distinct + * Pass true if duplicate lines should be + * discarded. + * @param numHeader + * number of lines to preclude before sorting starts + * @param usegzip + * use gzip compression for the temporary files + * @return a list of temporary flat files + * @throws IOException + */ + public static List sortInBatch(File inputCsv, final Charset cs, CsvRowComparator cmp, final boolean distinct,final int numHeader) throws IOException + { + List files = new ArrayList(); + long blocksize = estimateBestSizeOfBlocks(inputCsv.length(), DEFAULTMAXTEMPFILES, estimateAvailableMemory());// in bytes + CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputCsv), false), DatasetUtils.utf8Decoder(null , cs )), CsvPreference.STANDARD_PREFERENCE); + File tmpdirectory = new File(inputCsv.getParent(),"archive"); + try + { + FileUtils.forceMkdir(tmpdirectory); + }catch(Throwable t) + { + t.printStackTrace(); + } + + try { + List> tmplist = new ArrayList>(); + try { + int counter = 0; + List row = new ArrayList(); + while (row != null) { + long currentblocksize = 0;// in bytes + while ((currentblocksize < blocksize) && ((row = reader.read()) != null)) // as long as you have enough memory + { + if (counter < numHeader) { + counter++; + continue; + } + tmplist.add(row); + currentblocksize += row.size()*300; + } + files.add(sortAndSave(tmplist, cmp, cs, tmpdirectory, distinct)); + tmplist.clear(); + } + } catch (EOFException oef) { + if (tmplist.size() > 0) { + files.add(sortAndSave(tmplist, cmp, cs,tmpdirectory, distinct)); + tmplist.clear(); + } + } + } finally { + reader.close(); + } + return files; + } + + /** + * Sort a list and save it to a temporary file + * + * @return the file containing the sorted data + * @param tmplist + * data to be sorted + * @param cmp + * string comparator + * @param cs + * charset to use for output (can use + * Charset.defaultCharset()) + * @param tmpdirectory + * location of the temporary files (set to null for + * default location) + * @param distinct + * Pass true if duplicate lines should be + * discarded. + * @param usegzip + * set to true if you are using gzip compression for the + * temporary files + * @throws IOException + */ + private static File sortAndSave(List> tmplist, + CsvRowComparator cmp, Charset cs, File tmpdirectory, boolean distinct) throws IOException { + Collections.sort(tmplist, cmp); + File newtmpfile = File.createTempFile("sortInBatch", + "flatfile", tmpdirectory); + newtmpfile.deleteOnExit(); + CsvListWriter writer = new CsvListWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(newtmpfile), cs)),CsvPreference.STANDARD_PREFERENCE); + List lastLine = null; + try { + for (List rowData : tmplist) { + // Skip duplicate lines + if (!distinct || !rowData.equals(lastLine)) { + writer.write(rowData); + lastLine = rowData; + } + } + } finally { + writer.close(); + } + return newtmpfile; + } + + + /** + * This merges a bunch of temporary flat files + * + * @param files + * The {@link List} of sorted {@link File}s to be merged. + * @param distinct + * Pass true if duplicate lines should be + * discarded. (elchetz@gmail.com) + * @param outputfile + * The output {@link File} to merge the results to. + * @param cmp + * The {@link Comparator} to use to compare + * {@link String}s. + * @param cs + * The {@link Charset} to be used for the byte to + * character conversion. + * @param append + * Pass true if result should append to + * {@link File} instead of overwrite. Default to be false + * for overloading methods. + * @param usegzip + * assumes we used gzip compression for temporary files + * @return The number of lines sorted. (P. Beaudoin) + * @throws IOException + * @since v0.1.4 + */ + public static int mergeSortedFiles(List files, File outputfile, + final Comparator> cmp, Charset cs, boolean distinct, File inputfile,final int headersize) throws IOException { + ArrayList bfbs = new ArrayList(); + for (File f : files) { + CsvFileBuffer bfb = new CsvFileBuffer(f, cs); + bfbs.add(bfb); + } + CsvListWriter writer = new CsvListWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputfile), cs)),CsvPreference.STANDARD_PREFERENCE); + copyHeader(inputfile, writer, cs, headersize); + int rowcounter = mergeSortedFiles(writer, cmp, distinct, bfbs); + for (File f : files) + f.delete(); + return rowcounter; + } + + + /** + * This merges several BinaryFileBuffer to an output writer. + * + * @param fbw + * A buffer where we write the data. + * @param cmp + * A comparator object that tells us how to sort the + * lines. + * @param distinct + * Pass true if duplicate lines should be + * discarded. (elchetz@gmail.com) + * @param buffers + * Where the data should be read. + * @return The number of lines sorted. (P. Beaudoin) + * @throws IOException + * + */ + public static int mergeSortedFiles(CsvListWriter writer, + final Comparator> cmp, boolean distinct, + List buffers) throws IOException { + PriorityQueue pq = new PriorityQueue( + 11, new Comparator() { + @Override + public int compare(CsvFileBuffer i, + CsvFileBuffer j) { + return cmp.compare(i.peek(), j.peek()); + } + }); + for (CsvFileBuffer bfb : buffers) + if (!bfb.empty()) + pq.add(bfb); + int rowcounter = 0; + List lastLine = null; + try { + while (pq.size() > 0) { + CsvFileBuffer bfb = pq.poll(); + List r = bfb.pop(); + // Skip duplicate lines + if (!distinct || !r.equals(lastLine)) { + writer.write(r); + lastLine = r; + } + ++rowcounter; + if (bfb.empty()) { + bfb.close(); + } else { + pq.add(bfb); // add it back + } + } + } finally { + writer.close(); + for (CsvFileBuffer bfb : pq) + bfb.close(); + } + return rowcounter; + + } + + + /** + * @param fbr + * data source + * @param datalength + * estimated data volume (in bytes) + * @param cmp + * string comparator + * @param maxtmpfiles + * maximal number of temporary files + * @param maxMemory + * maximum amount of memory to use (in bytes) + * @param cs + * character set to use (can use + * Charset.defaultCharset()) + * @param tmpdirectory + * location of the temporary files (set to null for + * default location) + * @param distinct + * Pass true if duplicate lines should be + * discarded. + * @param numHeader + * number of lines to preclude before sorting starts + * @param usegzip + * use gzip compression for the temporary files + * @return a list of temporary flat files + * @throws IOException + */ + public static void copyHeader(File inputCsv, CsvListWriter writer, Charset cs, final int numHeader) throws IOException + { + CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputCsv), false), DatasetUtils.utf8Decoder(null , cs )), CsvPreference.STANDARD_PREFERENCE); + try { + int counter = 0; + List row = new ArrayList(); + while ((counter < numHeader) && ((row = reader.read()) != null)) + { + writer.write(row); + counter++; + } + } catch (EOFException oef) { + } finally { + reader.close(); + } + } +} + + +/** + * This is essentially a thin wrapper on top of a CsvListReader... which keeps + * the last line in memory. + */ +final class CsvFileBuffer { + public CsvFileBuffer(File inputCsv, Charset cs) throws IOException { + this.reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputCsv), false), DatasetUtils.utf8Decoder(null , cs )), CsvPreference.STANDARD_PREFERENCE); + reload(); + } + public void close() throws IOException { + this.reader.close(); + } + + public boolean empty() { + return this.cache == null; + } + + public List peek() { + return this.cache; + } + + public List pop() throws IOException { + List answer = peek();// make a copy + reload(); + return answer; + } + + private void reload() throws IOException { + this.cache = reader.read(); + } + + public CsvListReader reader; + private List cache; + +} diff --git a/src/main/java/com/sforce/dataset/loader/file/sort/CsvRowComparator.java b/src/main/java/com/sforce/dataset/loader/file/sort/CsvRowComparator.java new file mode 100644 index 0000000..36ea634 --- /dev/null +++ b/src/main/java/com/sforce/dataset/loader/file/sort/CsvRowComparator.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2014, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED + * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.sforce.dataset.loader.file.sort; + +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +import com.foundations.comparator.column.IColumnComparator; +import com.sforce.dataset.loader.file.schema.FieldType; + +public class CsvRowComparator implements Comparator> { + + private IColumnComparator[] _columnComparators; + private int[] _sortColumnIndices; + int maxSortIndex = 0; + + public CsvRowComparator(List fields) { + _columnComparators = getColumnComparators(fields); + setSortColumnIndices(fields); + } + + private IColumnComparator[] getColumnComparators(List fields) + { + List list = new LinkedList(); + for(FieldType fld:fields) { + if(!fld.isComputedField && fld.sortIndex > 0) { + list.add(CsvColumnComparatorFactory.createColumnComparator(fld)); + } + } + _columnComparators = list.toArray(new IColumnComparator[list.size()]); + return _columnComparators; + } + + /** + * Loops through each ColumnComparator to determine the proper sort order sequence + * to execute during the sort. Called by constructor. + * + * - ensures the sort order is unique when non-zero + * - ensures sort order values of zero are ignored + * - ensures the sort order starts at 1 + * - ensures the sort order contain no gaps, such as 1, 2, 4 + * @param fields + * + */ + private void setSortColumnIndices(List fields) { + int size = _columnComparators.length; + boolean foundIndex=false; + _sortColumnIndices = new int[size]; + for( int sortOrder = 1; sortOrder <= size; sortOrder++ ) { + foundIndex = false; + for( int comparatorIndex = 0; comparatorIndex < fields.size(); comparatorIndex++ ) { + if( fields.get(comparatorIndex).sortIndex == sortOrder ) { + _sortColumnIndices[sortOrder - 1] = comparatorIndex; + if(comparatorIndex > maxSortIndex) + maxSortIndex = comparatorIndex; + foundIndex = true; + break; + } + } + if( !foundIndex ) { + throw new IllegalArgumentException("Illegal sortIndex defined in input metadata json, must start at 1, be unique and not contain gaps"); + } + } + if( !foundIndex && size > 0) { + throw new IllegalArgumentException("Illegal sortIndex defined in input metadata json, must start at 1, be unique and not contain gaps"); + } + } + + + public IColumnComparator getColumnComparator(int index) { + return _columnComparators[index]; + } + + /** + * Compares two rows returning the result of the compare. + * + * - The columns to be sorted will have been pre-determined. + * - The choice of delimiter separating the row's columns has been pre-determined + * - The number of columns has been pre-determined + * - The column datatypes has been pre-determined + * + * The first non-zero comparison is returned respecting sort order priority. A + * return value of zero indicates the rows are equal looking only at the columns + * deemed sortable (Those with a non-zero sort order value) + * + * @param a The first row to compare. + * @param b The second row to compare. + */ + public int compare(List a, List b) + { + int result = 0; + if( a.size() < maxSortIndex ) { + throw new IllegalArgumentException("Incorrect number of tokens detected:\n\n" + a + "\n"); + } + + if( b.size() < maxSortIndex ) { + throw new IllegalArgumentException("Incorrect number of tokens detected:\n\n" + b + "\n"); + } + + int compartorCnt=0; + for( int index : _sortColumnIndices ) { + result = _columnComparators[compartorCnt].compare(a.get(index), b.get(index)); + if( result != 0 ) { + break; + } + compartorCnt++; + } + return result; + } + + /** + * Iterates through this TableComparator's array of IColumnComparator + * and returns the count of columns that are sortable. A sortable + * column is one where its sort order is greater than 0 + * + * @return the number of sortable columns + */ + public int getSortColumnCount() { + return _columnComparators.length; + } + + + +} diff --git a/src/main/java/com/sforce/dataset/util/SfdcExtracter.java b/src/main/java/com/sforce/dataset/util/SfdcExtracter.java index 90c1939..54b6872 100644 --- a/src/main/java/com/sforce/dataset/util/SfdcExtracter.java +++ b/src/main/java/com/sforce/dataset/util/SfdcExtracter.java @@ -305,8 +305,8 @@ public static LinkedHashMap createWF(Map selectedObjectList, Part { if(labels.containsKey(fld.getLabel())) { - System.err.println("Skipping field {"+fld.getName()+"} as it has duplicate label matching field {"+labels.get(fld.getLabel())+"}"); - continue; + System.err.println("field {"+fld.getName()+"} has duplicate label matching field {"+labels.get(fld.getLabel())+"}"); +// continue; } labels.put(fld.getLabel(), fld.getName()); Map temp = new LinkedHashMap();