Skip to content

Commit

Permalink
1.0.1 - added rowPredicate to transform each row before writing
Browse files Browse the repository at this point in the history
  • Loading branch information
frankleng committed May 4, 2021
1 parent 6c0501f commit 6556a95
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

### Example
```javascript
const service = new ddbCsv({
import DdbCsv from 'dynamocsv';
const service = new DdbCsv({
client: new DynamoDBClient({}),
target: 'dump.csv',
input: { tableName: 'ddb_test_table', index: 'awesomeness' },
Expand All @@ -13,6 +14,11 @@ const service = new ddbCsv({
},
filterExpressionMap: { name: 'frank' }
},
rowPredicate: (row, context) => {
const id = uuidv5(`${row['hash']}_${row['range']}`);
context.prependHeader('id'); // safe to call for each row, headers are stored as a Set
return [id, ...row];
}
});

await service.exec();
Expand All @@ -28,4 +34,4 @@ await service.exec();
+ *query* - a simplified mapping to generate `KeyConditionExpression` and `FilterExpression`. See example above - key/val pair generates a simple equality condition, use an object to specify operator and value if needed.
+ *keyCondExpressionMap*
+ *filterExpressionMap*

+ *rowPredicate* - (row, context) => row - a function that is called for each row. See example above - use the `context` methods to modify headers as needed.
33 changes: 21 additions & 12 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,38 @@ type Params = {
index?: ScanCommandInput['IndexName'];
limit?: ScanCommandInput['Limit'];
};
target: string | WriteStream;
query?: {
keyCondExpressionMap?: KeyCondExpressionMap;
filterExpressionMap?: FilterExpressionMap;
};
target: string | WriteStream;
rowPredicate?: (data: any, context: Dynamocsv) => any;
};

export default class Dynamocsv {
private readonly input: Params['input'];
private readonly query: Params['query'];
private readonly targetStream: WriteStream;
private readonly rowPredicate: ((data: any, context: Dynamocsv) => any) | undefined;
private client: DynamoDBClient;
private writeCount: number;
private readonly targetStream: WriteStream;
private readonly headers: Set<string>;
private headers: Set<string>;
private rows: any[];

constructor({ client, target, input, query }: Params) {
constructor({ client, target, input, query, rowPredicate }: Params) {
this.client = client;
this.input = { ...input, limit: input.limit || DEFAULT_QUERY_LIMIT };
this.query = query;
this.writeCount = 0;
this.headers = new Set();
this.rows = [];
this.targetStream = target instanceof WriteStream ? target : fs.createWriteStream(target, { flags: 'a' });
this.rowPredicate = rowPredicate;
}

private writeToTarget(): void {
let endData = csv.unparse({
fields: [...this.headers],
fields: [...this.headers.values()],
data: this.rows,
});

Expand All @@ -59,8 +62,14 @@ export default class Dynamocsv {
this.rows = [];
}

private addHeader(header: string): void {
appendHeader(header: string): Set<string> {
if (!this.headers.has(header)) this.headers.add(header);
return this.headers;
}

prependHeader(header: string): Set<string> {
if (!this.headers.has(header)) this.headers = new Set([header, ...this.headers]);
return this.headers;
}

/**
Expand All @@ -82,13 +91,13 @@ export default class Dynamocsv {
this.rows = result.Items
? result.Items.map((item) => {
const row: { [p: string]: any } = {};
const data = unmarshall(item);
Object.keys(data).forEach((key) => {
this.addHeader(key.trim());
const val = data[key];
row[key] = typeof val === 'object' ? JSON.stringify(val) : val;
const cols = unmarshall(item);
Object.keys(cols).forEach((header) => {
this.appendHeader(header.trim());
const val = cols[header];
row[header] = typeof val === 'object' ? JSON.stringify(val) : val;
});
return row;
return this.rowPredicate ? this.rowPredicate(row, this) : row;
})
: [];

Expand Down
Empty file added test.ts
Empty file.

0 comments on commit 6556a95

Please sign in to comment.