Skip to content

Commit

Permalink
Merge pull request #422 from OpenFn/sftp-close-conn-on-error
Browse files Browse the repository at this point in the history
`sftp` disconnect on error
  • Loading branch information
mtuchi authored Oct 24, 2023
2 parents bb79e16 + 2c56455 commit d05de5d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
6 changes: 6 additions & 0 deletions .changeset/gold-cars-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/language-sftp': patch
---

- Properly disconnect on error
- Improve operation logs
31 changes: 21 additions & 10 deletions packages/sftp/src/Adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
parseCsv,
} from '@openfn/language-common';
import Client from 'ssh2-sftp-client';
import { isObjectEmpty, handleResponse, handleError, handleLog } from './Utils';
import { isObjectEmpty, handleResponse } from './Utils';

let sftp = null;

Expand All @@ -31,7 +31,10 @@ export function execute(...operations) {
connect,
...operations,
disconnect
)({ ...initialState, ...state });
)({ ...initialState, ...state }).catch(e => {
console.error(e);
return disconnect(state);
});
}

function connect(state) {
Expand Down Expand Up @@ -83,8 +86,7 @@ export function list(dirPath, filter, callback) {
return state => {
return sftp
.list(dirPath, filter)
.then(files => handleResponse(files, state, callback))
.catch(handleError);
.then(files => handleResponse(files, state, callback));
};
}

Expand Down Expand Up @@ -129,6 +131,7 @@ export function getCSV(filePath, parsingOptions = {}) {
})
.then(() => {
console.debug('Parsing rows to JSON.\n');
console.time('Stream finished');
return new Promise((resolve, reject) => {
const content = Buffer.concat(results).toString('utf8');
resolve(content.split('\r\n'));
Expand All @@ -137,8 +140,10 @@ export function getCSV(filePath, parsingOptions = {}) {
return nextState;
});
})
.then(state => handleLog('Stream finished.', state))
.catch(handleError);
.then(state => {
console.timeEnd('Stream finished');
return state;
});
}
};
}
Expand All @@ -160,11 +165,14 @@ export function getCSV(filePath, parsingOptions = {}) {
*/
export function putCSV(localFilePath, remoteFilePath, parsingOptions) {
return state => {
console.time('Upload finished');
return sftp
.put(localFilePath, remoteFilePath, parsingOptions)
.then(response => handleResponse(response, state))
.then(state => handleLog('Upload finished.', state))
.catch(e => handleError(e, true));
.then(state => {
console.timeEnd('Upload finished');
return state;
});
};
}

Expand Down Expand Up @@ -192,6 +200,7 @@ export function getJSON(filePath, encoding) {
})
.then(() => {
console.debug('Receiving stream.\n');
console.time('Stream finished');

return new Promise((resolve, reject) => {
const content = Buffer.concat(results).toString('utf8');
Expand All @@ -201,8 +210,10 @@ export function getJSON(filePath, encoding) {
return nextState;
});
})
.then(state => handleLog('Stream finished.', state))
.catch(e => handleError(e, true));
.then(state => {
console.timeEnd('Stream finished');
return state;
});
};
}

Expand Down
11 changes: 0 additions & 11 deletions packages/sftp/src/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,3 @@ export function handleResponse(response, state, callback) {
if (callback) return callback(composeNextState(state, response));
return composeNextState(state, response);
}

export function handleLog(message, state) {
console.log(`✓ Success at ${new Date()}:\n∟`, message);
return state;
}

export function handleError(error, throwError = false) {
const errorString = `✗ Error at ${new Date()}:\n∟ ${error}`;
if (throwError) throw new Error(errorString);
return console.log(errorString);
}

0 comments on commit d05de5d

Please sign in to comment.