From 2a177425e907809c10b12e91de4fa1823d16fc33 Mon Sep 17 00:00:00 2001 From: Doug Martin Date: Wed, 14 May 2014 23:24:28 -0500 Subject: [PATCH] v0.3.0 * You can now specify `objectMode` when parsing a csv which will cause `data` events to have an object emitted. * You can now pipe directly to the stream returned from `createWriteStream` * You can now transform csvs by piping output from parsing into a formatter. --- .jshintrc | 1 - History.md | 6 +++ README.md | 54 ++++++++++++++++++-------- docs/History.html | 6 +++ docs/index.html | 37 +++++++++++++----- lib/formatter.js | 31 +++++++++++---- lib/parser_stream.js | 57 ++++++++++++++++++++++----- package.json | 16 ++++---- test/assets/test22.csv | 3 ++ test/fast-csv.test.js | 88 ++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 246 insertions(+), 53 deletions(-) create mode 100644 test/assets/test22.csv diff --git a/.jshintrc b/.jshintrc index e2d79223..bb9ba3ad 100644 --- a/.jshintrc +++ b/.jshintrc @@ -33,7 +33,6 @@ "boss": false, "debug": false, "eqnull": true, - "es5": true, "esnext": true, "evil": false, "expr": true, diff --git a/History.md b/History.md index d3659689..819b9f3a 100644 --- a/History.md +++ b/History.md @@ -1,3 +1,9 @@ +# v0.3.0 + +* You can now specify `objectMode` when parsing a csv which will cause `data` events to have an object emitted. +* You can now pipe directly to the stream returned from `createWriteStream` +* You can now transform csvs by piping output from parsing into a formatter. + # v0.2.5 * Fixed issue where not all rows are emitted when using `pause` and `resume` diff --git a/README.md b/README.md index c77f7dee..0fbafc78 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,4 @@ - - - - [![build status](https://secure.travis-ci.org/C2FO/fast-csv.png)](http://travis-ci.org/C2FO/fast-csv) +[![build status](https://secure.travis-ci.org/C2FO/fast-csv.png)](http://travis-ci.org/C2FO/fast-csv) # Fast-csv This is a library that provides CSV parsing and formatting. 
@@ -18,6 +15,7 @@ This is a library that provides CSV parsing and formatting. All methods accept the following `options` +* `objectMode=true`: Ensure that `data` events have an object emitted rather than the stringified version set to false to have a stringified buffer. * `headers=false`: Ste to true if you expect the first line of your `CSV` to contain headers, alternatly you can specify an array of headers to use. * `ignoreEmpty=false`: If you wish to ignore empty rows. * `delimiter=','`: If your data uses an alternate delimiter such as `;` or `\t`. @@ -33,7 +31,6 @@ All methods accept the following `options` **events** -`parse-error`: Emitted if there was an error parsing a row. `record`: Emitted when a record is parsed. `data-invalid`: Emitted if there was invalid row encounted, **only emitted if the `validate` function is used**. `data`: Emitted with the `stringified` version of a record. @@ -56,7 +53,7 @@ var csvStream = csv() stream.pipe(csvStream); ``` -**`.fromPath(path[, options])** +**`.fromPath(path[, options])`** This method parses a file from the specified path. @@ -73,7 +70,7 @@ csv }); ``` -**`.fromString(string[, options])** +**`.fromString(string[, options])`** This method parses a string @@ -94,7 +91,7 @@ csv }); ``` -**`.fromStream(stream[, options])** +**`.fromStream(stream[, options])`** This accepted a readable stream to parse data from. @@ -223,7 +220,7 @@ This is the lowest level of the write methods, it creates a stream that can be u ```javascript var csvStream = csv.createWriteStream({headers: true}), - writableStream = fs.createWritableStream("my.csv"); + writableStream = fs.createWriteStream("my.csv"); writableStream.on("finish", function(){ console.log("DONE!"); @@ -333,6 +330,37 @@ csv.writeToString([ ], {headers: true}); //"a,b\na1,b1\na2,b2\n" ``` +## Piping from Parser to Writer + +You can use `fast-csv` to pipe the output from a parsed CSV to a transformed CSV by setting the parser to `objectMode` and using `createWriteStream`. 
+ +```javascript +csv + .fromPath("in.csv", {headers: true}) + .pipe(csv.createWriteStream({headers: true})) + .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"})); +``` + +When piping from a parser to a formatter the transforms are maintained also. + + +```javascript +csv + .fromPath("in.csv", {headers: true}) + .transform(function(obj){ + return { + name: obj.Name, + address: obj.Address, + emailAddress: obj.Email_Address, + verified: obj.Verified + }; + }) + .pipe(csv.createWriteStream({headers: true})) + .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"})); +``` + +The output will contain formatted result from the transform function. + ## Benchmarks `Parsing 20000 records AVG over 3 runs` @@ -371,11 +399,3 @@ MIT * Code: `git clone git://github.com/C2FO/fast-csv.git` * Website: * Twitter: [http://twitter.com/c2fo](http://twitter.com/c2fo) - 877.465.4045 - -##Namespaces - - - - - -##Classes diff --git a/docs/History.html b/docs/History.html index 3c30447c..af5630b0 100644 --- a/docs/History.html +++ b/docs/History.html @@ -176,6 +176,12 @@ +

v0.3.0

+
    +
  • You can now specify objectMode when parsing a csv which will cause data events to have an object emitted.
  • +
  • You can now pipe directly to the stream returned from createWriteStream
  • +
  • You can now transform csvs by piping output from parsing into a formatter.
  • +

v0.2.5

  • Fixed issue where not all rows are emitted when using pause and resume
  • diff --git a/docs/index.html b/docs/index.html index b0be0e56..ec070f20 100644 --- a/docs/index.html +++ b/docs/index.html @@ -177,8 +177,7 @@ -

    -

    build status

    +

    build status

    Fast-csv

    This is a library that provides CSV parsing and formatting.

    NOTE As of v0.2.0 fast-csv supports multi-line values.

    @@ -188,6 +187,7 @@

    Usage

    Parsing

    All methods accept the following options

      +
    • objectMode=true: Ensures that data events have an object emitted rather than the stringified version. Set to false to have a stringified buffer emitted instead.
    • headers=false: Set to true if you expect the first line of your CSV to contain headers; alternatively, you can specify an array of headers to use.
    • ignoreEmpty=false: If you wish to ignore empty rows.
    • delimiter=',': If your data uses an alternate delimiter such as ; or \t.
        @@ -207,8 +207,7 @@

        Parsing

      events

      -

      parse-error: Emitted if there was an error parsing a row. -record: Emitted when a record is parsed. +

      record: Emitted when a record is parsed. data-invalid: Emitted if an invalid row was encountered, only emitted if the validate function is used. data: Emitted with the parsed record as an object, or with its stringified version if objectMode is set to false.

      ([options])

      @@ -224,7 +223,7 @@

      Parsing

      }); stream.pipe(csvStream); -

      `.fromPath(path[, options])

      +

      .fromPath(path[, options])

      This method parses a file from the specified path.

      var csv = require("fast-csv");
       
      @@ -236,7 +235,7 @@ 

      Parsing

      .on("end", function(){ console.log("done"); });
      -

      `.fromString(string[, options])

      +

      .fromString(string[, options])

      This method parses a string

      var csv = require("fast-csv");
       
      @@ -252,7 +251,7 @@ 

      Parsing

      .on("end", function(){ console.log("done"); });
      -

      `.fromStream(stream[, options])

      +

      .fromStream(stream[, options])

      This accepts a readable stream to parse data from.

      var stream = fs.createReadStream("my.csv");
       
      @@ -342,7 +341,7 @@ 

      Formatting

      createWriteStream(options)

      This is the lowest level of the write methods; it creates a stream that can be used to create a csv of unknown size and pipe to an output csv.

      var csvStream = csv.createWriteStream({headers: true}),
      -    writableStream = fs.createWritableStream("my.csv");
      +    writableStream = fs.createWriteStream("my.csv");
       
       writableStream.on("finish", function(){
         console.log("DONE!");
      @@ -417,6 +416,26 @@ 

      Formatting

      {a: "a1", b: "b1"}, {a: "a2", b: "b2"} ], {headers: true}); //"a,b\na1,b1\na2,b2\n"
      +

      Piping from Parser to Writer

      +

      You can use fast-csv to pipe the output from a parsed CSV to a transformed CSV by setting the parser to objectMode and using createWriteStream.

      +
      csv
      +   .fromPath("in.csv", {headers: true})
      +   .pipe(csv.createWriteStream({headers: true}))
      +   .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"}));
      +

      When piping from a parser to a formatter, the transforms are also preserved.

      +
      csv
      +   .fromPath("in.csv", {headers: true})
      +   .transform(function(obj){
      +        return {
      +            name: obj.Name,
      +            address: obj.Address,
      +            emailAddress: obj.Email_Address,
      +            verified: obj.Verified
      +        };
      +   })
      +   .pipe(csv.createWriteStream({headers: true}))
      +   .pipe(fs.createWriteStream("out.csv", {encoding: "utf8"}));
      +

      The output will contain the formatted result from the transform function.

      Benchmarks

      Parsing 20000 records AVG over 3 runs

      fast-csv: 198.67ms
      @@ -438,8 +457,6 @@ 

      Meta

    • Website: http://c2fo.com
    • Twitter: http://twitter.com/c2fo - 877.465.4045
    -

    Namespaces

    -

    Classes


    diff --git a/lib/formatter.js b/lib/formatter.js index 15ed5aaa..a91662ed 100644 --- a/lib/formatter.js +++ b/lib/formatter.js @@ -1,8 +1,10 @@ var fs = require("fs"), + util = require("util"), extended = require("./extended"), isUndefinedOrNull = extended.isUndefinedOrNull, hash = extended.hash, stream = require("stream"), + Transform = stream.Transform, LINE_BREAK = extended.LINE_BREAK; function createFormatter(options) { @@ -73,7 +75,7 @@ function wrapWriter(writer, options) { hasHeaders = extended.has(options, "headers") ? options.headers : true, parsedHeaders = hasHeaders ? false : true, headersLength = 0, i = -1, - writerWrite = writer.push, headers, + writerWrite = writer.write, headers, buffer = [], totalCount = 0, MAX_BUFFER_SIZE = options.maxBuffer || 100000; @@ -82,7 +84,7 @@ function wrapWriter(writer, options) { if (item) { var isHash = !extended.isArray(item), vals; if (!parsedHeaders) { - totalCount++ + totalCount++; parsedHeaders = true; if (isHash) { headers = hash.keys(item); @@ -117,17 +119,32 @@ function wrapWriter(writer, options) { writerWrite.call(writer, new Buffer(buffer.join("")).toString("utf8")); buffer.length = 0; } - writerWrite.call(writer, null); + writer.end(); } }; return writer; } +function CsvTransformStream(opts) { + Transform.call(this, opts); + wrapWriter(this, opts); +} + +util.inherits(CsvTransformStream, Transform); + +extended(CsvTransformStream).extend({ + + _transform: function (str, encoding, cb) { + cb(null, str); + }, + _flush: function (cb) { + this.write(null); + cb(null); + } +}); + function createWriteStream(options) { - var writer = new stream.Readable(); - writer._read = function () { - }; - return wrapWriter(writer, options); + return new CsvTransformStream(options); } function write(arr, options) { diff --git a/lib/parser_stream.js b/lib/parser_stream.js index 24592896..b589b39b 100644 --- a/lib/parser_stream.js +++ b/lib/parser_stream.js @@ -1,22 +1,42 @@ var extended = require("./extended"), isUndefined 
= extended.isUndefined, - EventEmitter = require("events").EventEmitter, util = require("util"), out = process.stdout, stream = require("stream"), EMPTY = /^\s*(?:''|"")?\s*(?:,\s*(?:''|"")?\s*)*$/, - VALUE = /([^,'"\s\\]*(?:\s+[^,'"\s\\]+)*)/, - LINE_SPLIT = /[\r\n]+/, DEFAULT_DELIMITER = ",", createParser = require("./parser"); +function spreadArgs(f, args, scope) { + var ret; + switch ((args || []).length) { + case 0: + ret = f.call(scope); + break; + case 1: + ret = f.call(scope, args[0]); + break; + case 2: + ret = f.call(scope, args[0], args[1]); + break; + case 3: + ret = f.call(scope, args[0], args[1], args[2]); + break; + default: + ret = f.apply(scope, args); + } + return ret; +} + + function ParserStream(options) { + options = options || {}; + options.objectMode = extended.has(options, "objectMode") ? options.objectMode : true stream.Transform.call(this, options); this.lines = ""; this._parsedHeaders = false; this._rowCount = -1; this._emitData = false; - options = options || {}; var delimiter; if (extended.has(options, "delimiter")) { delimiter = options.delimiter; @@ -31,6 +51,7 @@ function ParserStream(options) { this.parser = createParser(options); this._headers = options.headers; this._ignoreEmpty = options.ignoreEmpty; + this.__objectMode = options.objectMode; this.__buffered = []; return this; } @@ -39,16 +60,17 @@ util.inherits(ParserStream, stream.Transform); var origOn = ParserStream.prototype.on, origPause = ParserStream.prototype.pause, - origResume = ParserStream.prototype.resume; + origResume = ParserStream.prototype.resume, + origEmit = ParserStream.prototype.emit; function pause() { - origPause.apply(this, arguments); + spreadArgs(origPause, arguments, this); this.paused = true; this.pause = pause; } function resume() { - origResume.apply(this, arguments); + spreadArgs(origResume, arguments, this); this.paused = false; if (this.__pausedDone) { this.__pausedDone(); @@ -60,6 +82,10 @@ extended(ParserStream).extend({ __pausedDone: null, + 
__endEmitted: false, + + __emittedData: false, + __handleLine: function __parseLineData(line, index, ignore) { var ignoreEmpty = this._ignoreEmpty; if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) { @@ -129,7 +155,7 @@ extended(ParserStream).extend({ __emitRecord: function (dataRow, count) { this.emit("record", dataRow, count); if (this._emitData) { - this.push(JSON.stringify(dataRow)); + this.push(this.__objectMode ? dataRow : JSON.stringify(dataRow)); } }, @@ -152,7 +178,7 @@ extended(ParserStream).extend({ this._parse(this.lines, false); } //increment row count so we aren't 0 based - this.emit("end", ++this._rowCount); + this.emit("end"); callback(); }, @@ -170,6 +196,17 @@ extended(ParserStream).extend({ } }, + emit: function (event) { + if (event === "end") { + if (!this.__endEmitted) { + this.__endEmitted = true; + spreadArgs(origEmit, ["end", ++this._rowCount], this); + } + } else { + spreadArgs(origEmit, arguments, this); + } + }, + resume: function () { if (this.paused) { this.paused = false; @@ -199,7 +236,7 @@ extended(ParserStream).extend({ if (evt === "data" || evt === "readable") { this._emitData = true; } - origOn.apply(this, arguments); + spreadArgs(origOn, arguments, this); return this; }, diff --git a/package.json b/package.json index 46cbc961..ba5b446b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fast-csv", - "version": "0.2.5", + "version": "0.3.0", "description": "CSV parser and writer", "main": "index.js", "scripts": { @@ -25,15 +25,15 @@ "it": "~0.2.6", "grunt-it": "~0.3.1", "grunt": "~0.4.1", - "grunt-contrib-jshint": "~0.4.3" - }, - "dependencies": { - "is-extended": "0.0.8", - "object-extended": "0.0.5", - "extended": "0.0.4", - "string-extended": "0.0.7" + "grunt-contrib-jshint": "~0.10.0" }, "engines": { "node": ">=0.10" + }, + "dependencies": { + "is-extended": "0.0.10", + "object-extended": "0.0.7", + "extended": "0.0.6", + "string-extended": "0.0.8" } } diff --git 
a/test/assets/test22.csv b/test/assets/test22.csv new file mode 100644 index 00000000..6ca6d1a3 --- /dev/null +++ b/test/assets/test22.csv @@ -0,0 +1,3 @@ +a,b +a1,b1 +a2,b2 \ No newline at end of file diff --git a/test/fast-csv.test.js b/test/fast-csv.test.js index d0ac7427..7c6d7a7f 100644 --- a/test/fast-csv.test.js +++ b/test/fast-csv.test.js @@ -162,6 +162,50 @@ it.describe("fast-csv", function (it) { }); }); + + it.should("emit data as a buffer if objectMode is false", function (next) { + var actual = []; + csv + .fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true, objectMode: false}) + .on("data", function (data) { + actual.push(JSON.parse(data + "")); + }). + on("end", function () { + assert.deepEqual(actual, expected4); + assert.equal(9, actual.length); + next(); + }); + }); + + it.should("emit data as an object if objectMode is true", function (next) { + var actual = []; + csv + .fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true, objectMode: true}) + .on("data", function (data) { + actual.push(data); + }) + .on("end", function (count) { + assert.deepEqual(actual, expected4); + assert.equal(count, actual.length); + next(); + }); + }); + + it.should("emit data as an object if objectMode is not specified", function (next) { + var actual = []; + csv + .fromPath(path.resolve(__dirname, "./assets/test4.csv"), {headers: true, objectMode: true}) + .on("data", function (data) { + actual.push(data); + }) + .on("end", function (count) { + assert.deepEqual(actual, expected4); + assert.equal(count, actual.length); + next(); + }); + }); + + it.should("allow piping from a stream", function (next) { var actual = []; var stream = csv({headers: true}) @@ -735,4 +779,48 @@ it.describe("fast-csv", function (it) { stream.write(null); }); }); + + it.describe("piping from parser to formatter", function (it) { + + it.should("allow piping from a parser to a formatter", function (next) { + var writable = 
fs.createWriteStream(path.resolve(__dirname, "assets/test.csv"), {encoding: "utf8"}) + csv + .fromPath(path.resolve(__dirname, "./assets/test22.csv"), {headers: true, objectMode: true}) + .on("error", next) + .pipe(csv.createWriteStream({headers: true})) + .on("error", next) + .pipe(writable) + .on("error", next); + + writable + .on("finish", function () { + assert.equal(fs.readFileSync(path.resolve(__dirname, "assets/test.csv")).toString(), "a,b\na1,b1\na2,b2"); + fs.unlinkSync(path.resolve(__dirname, "assets/test.csv")); + next(); + }); + }); + + it.should("preserve transforms", function (next) { + var writable = fs.createWriteStream(path.resolve(__dirname, "assets/test.csv"), {encoding: "utf8"}) + csv + .fromPath(path.resolve(__dirname, "./assets/test22.csv"), {headers: true}) + .transform(function (obj) { + obj.a = obj.a + "-parsed"; + obj.b = obj.b + "-parsed"; + return obj; + }) + .on("error", next) + .pipe(csv.createWriteStream({headers: true})) + .on("error", next) + .pipe(writable) + .on("error", next); + + writable + .on("finish", function () { + assert.equal(fs.readFileSync(path.resolve(__dirname, "assets/test.csv")).toString(), "a,b\na1-parsed,b1-parsed\na2-parsed,b2-parsed"); + fs.unlinkSync(path.resolve(__dirname, "assets/test.csv")); + next(); + }); + }); + }); }); \ No newline at end of file