Skip to content

Commit

Permalink
Bulk uploads - use the csv file containing entries to be uploaded to …
Browse files Browse the repository at this point in the history
…stream content

Bulk uploads - use the csv file containing entries to be uploaded to stream content to the server if process.bulk.saveServerLoadAndRawResultsInCSV is set to true. This reduces memory footprint of data loader.
  • Loading branch information
ashitsalesforce committed Dec 22, 2024
1 parent de9e6dc commit 4a0ee54
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
Binary file added java_pid11058.hprof
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
}
}
}

convertBulkAPINulls(sforceDataRow);

// Make sure to initialize dynaClass only after mapping a row.
// This is to make sure that all polymorphic field mappings specified
// in the mapping file are mapped to parent object.
Expand All @@ -204,15 +205,13 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
dynaClass = SforceDynaBean.getDynaBeanInstance(dynaProps);
}
try {
convertBulkAPINulls(sforceDataRow);
DynaBean dynaBean = SforceDynaBean.convertToDynaBean(dynaClass, sforceDataRow);
Map<String, String> fieldMap = BeanUtils.describe(dynaBean);
for (String fName : fieldMap.keySet()) {
if (fieldMap.get(fName) != null) {
// see if any entity foreign key references are embedded here
Object value = this.getFieldValue(fName, dynaBean.get(fName));
dynaBean.set(fName, value);
dynaArraySize += fName.length() + value.toString().length();
}
}
dynaArray.add(dynaBean);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -176,14 +178,14 @@ private String getOverrideMessage(Throwable t) {
}

private void createBatches() throws OperationException, IOException, AsyncApiException {
final ByteArrayOutputStream os = new ByteArrayOutputStream(dynaArraySize);
final ByteArrayOutputStream os = new ByteArrayOutputStream();
final PrintStream out = new PrintStream(os, true, AppConfig.BULK_API_ENCODING);
doOneBatch(out, os, this.dynaArray);
os.close();
}

private void doOneBatch(PrintStream out, ByteArrayOutputStream os, List<DynaBean> rows) throws OperationException,
AsyncApiException {
AsyncApiException, FileNotFoundException {
int processedRecordsCount = 0;
final List<String> userColumns = getController().getDao().getColumnNames();
List<String> headerColumns = null;
Expand Down Expand Up @@ -340,7 +342,7 @@ private void addFieldToBatchRequestHeader(PrintStream serverRequestOutput, Strin
addedCols.add(sfdcColumn);
}

private void writeServerLoadBatchDataToCSV(ByteArrayOutputStream os) {
private String writeServerLoadBatchDataToCSV(ByteArrayOutputStream os) {
String filenamePrefix = "uploadedToServer";
String filename = generateBatchCSVFilename(filenamePrefix, batchCountForJob);
File uploadedToServerCSVFile = new File(filename);
Expand All @@ -352,6 +354,7 @@ private void writeServerLoadBatchDataToCSV(ByteArrayOutputStream os) {
} catch (Exception ex) {
logger.info("unable to create file " + filename);
}
return filename;
}

private void writeRawResultsToCSV(CSVReader serverResultsReader, int batchNum) {
Expand Down Expand Up @@ -392,16 +395,22 @@ private String generateBatchCSVFilename(String prefix, int batchNum) {
+ controller.getFormattedCurrentTimestamp() + ".csv";
}

private void createBatch(ByteArrayOutputStream os, int numRecords) throws AsyncApiException {
private void createBatch(ByteArrayOutputStream os, int numRecords) throws AsyncApiException, FileNotFoundException {
if (numRecords <= 0) return;
final byte[] request = os.toByteArray();
String uploadDataFileName = null;
if (controller.getAppConfig().getBoolean(AppConfig.PROP_SAVE_BULK_SERVER_LOAD_AND_RAW_RESULTS_IN_CSV)) {
this.batchCountForJob++;
writeServerLoadBatchDataToCSV(os);
uploadDataFileName = writeServerLoadBatchDataToCSV(os);
}
BatchInfo bi = null;
if (uploadDataFileName != null) {
bi = this.jobUtil.createBatch(new FileInputStream(uploadDataFileName));
} else {
bi = this.jobUtil.createBatch(new ByteArrayInputStream(request, 0, request.length));
}
os.reset();
BatchInfo bi = this.jobUtil.createBatch(new ByteArrayInputStream(request, 0, request.length));
this.allBatchesInOrder.add(new BatchData(bi.getId(), numRecords));
this.allBatchesInOrder.add(new BatchData(bi.getId(), numRecords));
}

@Override
Expand Down

0 comments on commit 4a0ee54

Please sign in to comment.