Skip to content

Commit

Permalink
Merge pull request #59 from linkedconnections/development
Browse files Browse the repository at this point in the history
0.8.2 release
  • Loading branch information
pietercolpaert authored Oct 16, 2018
2 parents bef98ca + 6b44ebf commit 6a7d2e3
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 140 deletions.
3 changes: 3 additions & 0 deletions bin/gtfs2lc-sort.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
sed 's/\r//' trips.txt > trips_unix.txt; mv trips_unix.txt trips.txt &
sed 's/\r//' calendar.txt > calendar_unix.txt; mv calendar_unix.txt calendar.txt &
sed 's/\r//' calendar_dates.txt > calendar_dates_unix.txt; mv calendar_dates_unix.txt calendar_dates.txt ;
sed 's/\r//' routes.txt > routes_unix.txt; mv routes_unix.txt routes.txt ;

} ;
echo Removing UTF-8 artifacts in directory $1;
{
sed '1s/^\xEF\xBB\xBF//' stop_times.txt > stop_times_unix.txt; mv stop_times_unix.txt stop_times.txt &
sed '1s/^\xEF\xBB\xBF//' trips.txt > trips_unix.txt; mv trips_unix.txt trips.txt &
sed '1s/^\xEF\xBB\xBF//' calendar.txt > calendar_unix.txt; mv calendar_unix.txt calendar.txt &
sed '1s/^\xEF\xBB\xBF//' calendar_dates.txt > calendar_dates_unix.txt; mv calendar_dates_unix.txt calendar_dates.txt ;
sed '1s/^\xEF\xBB\xBF//' routes.txt > routes_unix.txt; mv routes_unix.txt routes.txt ;
} ;
echo Sorting files in directory $1;
{ head -n 1 stop_times.txt ; tail -n +2 stop_times.txt | sort -t , -k1,1 -k5,5n ; } > stop_times2.txt ; mv stop_times2.txt stop_times.txt &
Expand Down
6 changes: 4 additions & 2 deletions bin/gtfs2lc.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ mapper.resultStream(program.path, function (stream) {
});
} else if (program.format === 'csv') {
//print header
console.log('"id","departureStop","departureTime","arrivalStop","arrivalTime","trip","route"');
console.error('The CSV output is not using a Linked Data format – jsonld is the preferred format.');
console.log('"id","departureStop","departureTime","arrivalStop","arrivalTime","trip","route","headsign"');
var count = 0;

stream.on('data', function (connection) {
console.log(count + ',' + connection["departureStop"] + ',' + connection["departureTime"].toISOString() + ',' + connection["arrivalStop"] + ',' + connection["arrivalTime"].toISOString() + ',' + connection["trip"] + ',' + connection["route"]);
console.log(count + ',' + connection["departureStop"] + ',' + connection["departureTime"].toISOString() + ',' + connection["arrivalStop"] + ',' + connection["arrivalTime"].toISOString() + ',' + connection["trip"]["trip_id"] + ',' + connection.trip.route.route_id + ',"' + connection.headsign + '"');
count ++;
});
} else if ([,'jsonld','mongold'].indexOf(program.format) > -1) {
Expand Down
10 changes: 6 additions & 4 deletions lib/Connections2JSONLD.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ var Connections2JSONLD = function (baseUris, context) {
"@id": "http://semweb.mmlab.be/ns/linkedconnections#arrivalStop"
},
"departureTime": "http://semweb.mmlab.be/ns/linkedconnections#departureTime",
"arrivalTime": "http://semweb.mmlab.be/ns/linkedconnections#arrivalTime"
"arrivalTime": "http://semweb.mmlab.be/ns/linkedconnections#arrivalTime",
"direction": "gtfs:headsign"
}
};

Expand All @@ -47,8 +48,9 @@ Connections2JSONLD.prototype._transform = function (connection, encoding, done)
"gtfs:route": this._uris.getRouteId(connection)
};

if(connection.trip.trip_headsign) {
lc["direction"] = connection.trip.trip_headsign;
//the headsign is already the result here of earlier checking whether there’s a trip headsign or a route headsign if connection headsign was not set. It can be used reliably
if (connection.headsign) {
lc["direction"] = connection.headsign;
}

var pickupType = types[0];
Expand All @@ -60,7 +62,7 @@ Connections2JSONLD.prototype._transform = function (connection, encoding, done)
var dropOffType = types[0];
if (connection['drop_off_type'] && connection['drop_off_type'] !== null) {
dropOffType = types[connection['drop_off_type']];
lc["gtfs:dropOffType"] = pickupType;
lc["gtfs:dropOffType"] = dropOffType;
}

done(null, lc);
Expand Down
19 changes: 12 additions & 7 deletions lib/ConnectionsBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ ConnectionsBuilder.prototype._transform = function (connectionRule, encoding, do
// * a trip: { route_id: 'AAMV',service_id: 'WE',trip_id: 'AAMV4',trip_headsign: 'to Airport',direction_id: '1', block_id: '', shape_id: '' }
var departureDFM = moment.duration(connectionRule['departure_dfm']);
var arrivalDFM = moment.duration(connectionRule['arrival_dfm']);
var self = this;
this._tripsdb.get(connectionRule['trip_id'], function (error, trip) {
this._tripsdb.get(connectionRule['trip_id'], (error, trip) => {
if (!error) {
self._routesdb.get(trip['route_id'], function (error, route) {
this._routesdb.get(trip['route_id'], (error, route) => {
if (!error) {
self._servicesdb.get(trip['service_id'], function (error, service) {
this._servicesdb.get(trip['service_id'], (error, service) => {
if (!error) {
for (var i in service) {
trip["route"] = route;
Expand Down Expand Up @@ -64,15 +63,21 @@ ConnectionsBuilder.prototype._transform = function (connectionRule, encoding, do
connection['pickup_type'] = connectionRule['pickup_type'];
}

self.push(connection);
this.push(connection);
}
} else {
console.error('Did not find this service id in calendar or calendar dates: ', trip);
}
done();
});

} else {
console.error('Did not find this route id in routes.txt: ', trip, error);
done();
}
});

} else {
console.error('Did not find this trip id in trips.txt: ', connectionRule);
done();
}
});
};
Expand Down
30 changes: 15 additions & 15 deletions lib/StreamIterator.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
var util = require('util'),
moment = require('moment'),
EventEmitter = require('events').EventEmitter;
const util = require('util'),
moment = require('moment'),
{AsyncIterator} = require('asynciterator'),
EventEmitter = require('events').EventEmitter;

var StreamIterator = function (stream, interval) {
this._interval = interval;
this._stream = stream;
this._currentObject = null;
this._currentCB = null;
var self = this;
this._iterator = AsyncIterator.wrap(stream);
this._streamEnded = false;
this._stream.on("end", function () {
self._streamEnded = true;
if (self._currentCB) {
self._currentCB();
this._currentCB;
this._iterator.on("end", () => {
this._streamEnded = true;
if (this._currentCB) {
this._currentCB();
}
});
};
Expand All @@ -24,13 +23,13 @@ StreamIterator.prototype.getCurrentObject = function () {
};

StreamIterator.prototype.next = function (callback) {
var self = this;
this._currentCB = callback;
var object = this._stream.read();
var object = this._iterator.read();
if (!object && !this._streamEnded) {
this._stream.once("readable", function () {
self.next(callback);
this._iterator.once("readable", () => {
this.next(callback);
});
//Filter our object on the date property and check whether it’s in our interval.
} else if (object && this._interval.inclusiveBetween(moment(object['date'], 'YYYYMMDD'))) {
this._currentObject = object;
callback(object);
Expand All @@ -45,3 +44,4 @@ StreamIterator.prototype.next = function (callback) {
};

module.exports = StreamIterator;

152 changes: 82 additions & 70 deletions lib/gtfs2connections.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
var csv = require('fast-csv'),
ConnectionRules = require('./stoptimes/st2c.js'),
ConnectionsBuilder = require('./ConnectionsBuilder.js'),
Services = require('./services/calendar.js'),
DateInterval = require('./DateInterval.js'),
Store = require('./stores/Store.js'),
through2 = require('through2'),
moment = require('moment'),
fs = require('fs');
const csv = require('fast-csv'),
ConnectionRules = require('./stoptimes/st2c.js'),
ConnectionsBuilder = require('./ConnectionsBuilder.js'),
Services = require('./services/calendar.js'),
DateInterval = require('./DateInterval.js'),
Store = require('./stores/Store.js'),
{AsyncIterator} = require('asynciterator'),
moment = require('moment'),
fs = require('fs');

var Mapper = function (options) {
this._options = options;
this._options.interval = new DateInterval(options.startDate, options.endDate);
if (!this._options.store) {
this._options.store = 'MemStore';
}
this._options = options;
this._options.interval = new DateInterval(options.startDate, options.endDate);
if (!this._options.store) {
this._options.store = 'MemStore';
}
};

/**
Expand All @@ -26,66 +26,78 @@ if (!this._options.store) {
* Caveat: coding this with numerous callbacks and streams, makes this code not chronologically ordered.
*/
Mapper.prototype.resultStream = function (path, done) {
var routes = fs.createReadStream(path + '/routes.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});

var trips = fs.createReadStream(path + '/trips.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});
var calendarDates = fs.createReadStream(path + '/calendar_dates.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});
var services = fs.createReadStream(path + '/calendar.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).pipe(new Services(calendarDates, this._options)).on('error', function (e) {
console.error(e);
});
//Preparations for step 4
var connectionRules = fs.createReadStream(path + '/stop_times.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).pipe(new ConnectionRules()).on('error', function (e) {
console.error(e);
});

//Step 2 & 3: store in leveldb in 3 hidden directories, or in memory, depending on the options
var routesdb = Store(path + '/.routes', this._options.store);
var tripsdb = Store(path + '/.trips', this._options.store);
var servicesdb = Store(path + '/.services', this._options.store);
var count = 0;
var self = this;

var finished = function () {
count++;
//wait for the 3 streams to finish (services and trips) to write to the stores
if (count === 3) {
console.error("Indexing services and trips succesful!");
//Step 4 and 5: let's create our connections!
done(connectionRules.pipe(new ConnectionsBuilder(tripsdb, servicesdb, routesdb, self._options)));
}
};
var routes = fs.createReadStream(path + '/routes.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});

services.pipe(through2.obj(function (service, encoding, doneService) {
if (service['service_id']) {
servicesdb.put(service['service_id'], service['dates'], doneService);
}
})).on('error', function (e) {
console.error(e);
}).on('finish', finished);
var trips = fs.createReadStream(path + '/trips.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});
var calendarDates = fs.createReadStream(path + '/calendar_dates.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
});
var services = fs.createReadStream(path + '/calendar.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).pipe(new Services(calendarDates, this._options)).on('error', function (e) {
console.error(e);
});
//Preparations for step 4
//Step 2 & 3: store in leveldb in 3 hidden directories, or in memory, depending on the options
var routesdb = Store(path + '/.routes', this._options.store);
var tripsdb = Store(path + '/.trips', this._options.store);
var servicesdb = Store(path + '/.services', this._options.store);
var count = 0;
var options = this._options;
var finished = function () {
count++;
//wait for the 3 streams to finish (services, trips and routes) to write to the stores
if (count === 3) {
console.error("Indexing services and trips succesful!");
//Step 4 and 5: let's create our connections!
var connectionRules = fs.createReadStream(path + '/stop_times.txt', { encoding: 'utf8', objectMode: true }).pipe(csv({ objectMode: true, headers: true })).pipe(new ConnectionRules()).on('error', function (e) {
console.error('Hint: Did you run gtfs2lc-sort?');
console.error(e);
});
let connectionsBuilder = new ConnectionsBuilder(tripsdb, servicesdb, routesdb, options);
let connectionsStream = connectionRules.pipe(connectionsBuilder);
done(connectionsStream);
}
};

trips.pipe(through2.obj(function (trip, encoding, doneTrip) {
if (trip['trip_id']) {
tripsdb.put(trip['trip_id'], trip, doneTrip);
}
})).on('error', function (e) {
console.error(e);
}).on('finish', finished);
var servicesIterator = AsyncIterator.wrap(services);
var tripsIterator = AsyncIterator.wrap(trips);
var routesIterator = AsyncIterator.wrap(routes);

servicesIterator.transform((service, doneService) =>{
if (service['service_id']) {
servicesdb.put(service['service_id'], service['dates'], doneService);
} else {
doneService();
}
}).on('data', () => {
}).on('error', function (e) {
console.error(e);
}).on('end', finished);

tripsIterator.transform((trip, doneTrip) => {
if (trip['trip_id']) {
tripsdb.put(trip['trip_id'], trip, doneTrip);
} else {
doneTrip();
}
}).on('data', () => {
}).on('error', function (e) {
console.error(e);
}).on('end', finished);

routes.pipe(through2.obj(function (route, encoding, doneRoute) {
if (route['route_id']) {
routesdb.put(route['route_id'], route, doneRoute);
}
})).on('error', function (e) {
console.error(e);
}).on('finish', finished);

routesIterator.transform((route, doneRoute) => {
if (route['route_id']) {
routesdb.put(route['route_id'], route, doneRoute);
} else {
doneRoute();
}
}).on('data', () => {
}).on('error', function (e) {
console.error(e);
}).on('end', finished);
};

module.exports = Mapper;
42 changes: 14 additions & 28 deletions lib/stoptimes/st2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,21 @@ util.inherits(StopTimesToConnections, Transform);
* When ordered, we can just take 2 gtfs:StopTimes and bring them together in 1 "connection rule", which is an intermediate data structure we define here
*/
StopTimesToConnections.prototype._transform = function (stopTime, encoding, done) {
// If both pick_up_type of previousStopTime and drop_off_type of current stoptime are 1, then discard this stoptime and look further, but now for the stoptime that continues from this "via"
if (stopTime['drop_off_type'] !== null && stopTime['drop_off_type'] == 1 && stopTime['pickup_type'] !== null && stopTime['pickup_type'] == 1) {
if (this._previousStopTime && !this._previousStopTime['vias']) {
this._previousStopTime['vias'] = [stopTime['stop_id']];
} else if (this._previousStopTime) {
this._previousStopTime['vias'].push(stopTime['stop_id']);
}
} else {
if (!this._previousStopTime) {
this._previousStopTime = stopTime;
} else {
if (this._previousStopTime['trip_id'] === stopTime['trip_id']) {
//dfm is "duration from midnight" (see GTFS reference)
this.push({
trip_id: this._previousStopTime['trip_id'],
arrival_dfm: stopTime['arrival_time'],
departure_dfm: this._previousStopTime['departure_time'],
departure_stop: this._previousStopTime['stop_id'],
arrival_stop : stopTime['stop_id'],
departure_stop_headsign: this._previousStopTime['stop_headsign'],
arrival_stop_headsign: stopTime['stop_headsign'],
pickup_type: this._previousStopTime['pickup_type'],
drop_off_type: stopTime['drop_off_type'],
vias: this._previousStopTime['vias']
});
}
}
this._previousStopTime = stopTime;
if (this._previousStopTime && this._previousStopTime['trip_id'] === stopTime['trip_id']) {
//dfm is "duration from midnight" (see GTFS reference)
this.push({
trip_id: this._previousStopTime['trip_id'],
arrival_dfm: stopTime['arrival_time'],
departure_dfm: this._previousStopTime['departure_time'],
departure_stop: this._previousStopTime['stop_id'],
arrival_stop : stopTime['stop_id'],
departure_stop_headsign: this._previousStopTime['stop_headsign'],
arrival_stop_headsign: stopTime['stop_headsign'],
pickup_type: this._previousStopTime['pickup_type'],
drop_off_type: stopTime['drop_off_type'],
});
}
this._previousStopTime = stopTime;
done();
};

Expand Down
Loading

0 comments on commit 6a7d2e3

Please sign in to comment.