Skip to content

Commit

Permalink
Next iteration: all streams finish. TODO: step 4 and 5
Browse files Browse the repository at this point in the history
  • Loading branch information
Pieter Colpaert committed Nov 4, 2015
1 parent 7477fc1 commit e6d8c2b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
9 changes: 7 additions & 2 deletions lib/StreamIterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ var StreamIterator = function (stream) {
this._stream = stream;
this._currentCB = null;
var self = this;
this._streamEnded = false;
this._stream.on("end", function () {
self._streamEnded = true;
if (self._currentCB) {
self._currentCB();
}
Expand All @@ -18,12 +20,15 @@ StreamIterator.prototype.next = function (callback) {
var self = this;
this._currentCB = callback;
var object = this._stream.read();
if (!object) {
if (!object && !this._streamEnded) {
this._stream.once("readable", function () {
self.next(callback);
});
} else {
} else if (object && !this._streamEnded) {
callback(object);
} else {
//stream ended
callback(null);
}
};

Expand Down
21 changes: 12 additions & 9 deletions lib/gtfs2lc.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var csv = require('fast-csv'),
Connections = require('./stoptimes/st2c.js'),
ConnectionRules = require('./stoptimes/st2c.js'),
ConnectionsBuilder = require('./ConnectionsBuilder.js'),
Services = require('./services/calendar.js'),
through2 = require('through2'),
level = require('level');
Expand All @@ -16,34 +17,36 @@ var Mapper = function (options) {
* Step 3: also index trips.txt in a leveldb on key trip_id
* Step 4: create a stream of connection rules from stop_times.txt
* Step 5: pipe this stream to something that expands everything into connections and returns this stream.
* Caveat: coding this with numerous callbacks and streams, makes this code not chronologically ordered.
*/
Mapper.prototype.resultStream = function (path, done) {
var trips = fs.createReadStream(path + '/trips.txt', {encoding:'utf8', objectMode: true}).pipe(csv({objectMode:true,headers: true}));
var calendarDates = fs.createReadStream(path + '/calendar_dates.txt', {encoding:'utf8', objectMode: true}).pipe(csv({objectMode:true,headers: true}));
var services = fs.createReadStream(path + '/calendar.txt', {encoding:'utf8', objectMode: true}).pipe(csv({objectMode:true,headers: true})).pipe(new Services(calendarDates));
var connectionRules = fs.createReadStream(path + '/stop_times.txt', {encoding:'utf8', objectMode: true}).pipe(csv({objectMode:true,headers: true})).pipe(new Connections());
//Preprations for step 4
var connectionRules = fs.createReadStream(path + '/stop_times.txt', {encoding:'utf8', objectMode: true}).pipe(csv({objectMode:true,headers: true})).pipe(new ConnectionRules());

//Step 2 & 3: store in leveldb
//Step 2 & 3: store in leveldb in 2 hidden directories
var tripsdb = level('.trips');
var servicesdb = level('.services');
var count = 0;
var finished = function () {
count ++;
console.log("finish");
//wait for the 2 functions to finish
//wait for the 2 streams to finish (services and trips) to write to the stores
if (count === 2) {
done();
console.error("Indexing services and trips succesful!");
//Step 4 and 5: let's create our connections!
done(connectionRules.pipe(new ConnectionsBuilder(tripsdb, servicesdb)));
}
};

services.pipe(through2.obj(function (service, encoding, doneService) {
servicesdb.put(service['service_id'], service['dates'], {valueEncoding: 'json'}, doneService);
})).on('finish', finished);
/* trips.pipe(through2.obj(function (trip, encoding, doneTrip) {

trips.pipe(through2.obj(function (trip, encoding, doneTrip) {
tripsdb.put(trip['trip_id'], trip, {valueEncoding: 'json'}, doneTrip);
})).on('finish', finished);
*/

};

Expand Down
8 changes: 5 additions & 3 deletions lib/services/calendar.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ CalendarToServices.prototype._transform = function (calendar, encoding, done) {
};

CalendarToServices.prototype._flush = function (done) {
//TODO: for some reason, it's not flushing...
//read the rest of the calendarDatesIterator
var self = this;
var recursiveCB = function (calendar) {
Expand All @@ -89,12 +88,15 @@ CalendarToServices.prototype._flush = function (done) {
self._currentCalendarDate = cd;
self._matchCalendarDates([], self._currentCalendarDate['service_id'], recursiveCB);
} else {
console.log('FLUSHED');
done();
}
});
};
this._matchCalendarDates([], this._currentCalendarDate['service_id'], recursiveCB);
if (this._currentCalendarDate) {
this._matchCalendarDates([], this._currentCalendarDate['service_id'], recursiveCB);
} else {
done();
}
};

module.exports = CalendarToServices;

0 comments on commit e6d8c2b

Please sign in to comment.