Merge pull request #99 from linkedconnections/development
v1.0.0
julianrojas87 authored Feb 15, 2020
2 parents 1b2aba1 + 5f9db7a commit 8183a5f
Showing 18 changed files with 997 additions and 419 deletions.
17 changes: 7 additions & 10 deletions bin/gtfs2lc-sort.sh
@@ -27,19 +27,16 @@ CURDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
TRIPID_TRIPS=`head -n1 trips.txt | tr "," "\n" | grep -n "trip_id"| cut -d: -f1`
TRIPID_STOPTIMES=`head -n1 stop_times.txt | tr "," "\n" | grep -n "trip_id"| cut -d: -f1`
STOPSEQUENCE_STOPTIMES=`head -n1 stop_times.txt | tr "," "\n" | grep -n "stop_sequence"| cut -d: -f1`
## sort stop_times.txt by trip id and stop sequence
## Sort stop_times.txt by trip id and stop sequence
{ head -n 1 stop_times.txt ; tail -n +2 stop_times.txt | sort -t , -k ${TRIPID_STOPTIMES}d,${TRIPID_STOPTIMES} -k${STOPSEQUENCE_STOPTIMES}n,${STOPSEQUENCE_STOPTIMES}; } > stop_times2.txt ; mv stop_times2.txt stop_times.txt ;
## use stoptimes2connections to create a connections CSV file instead
echo Creating connections.txt file
$CURDIR/stoptimes2connections.js > connections.txt;

## order the connections.txt according to deptime, artime and stops
TRIPID_CONNECTIONS=`head -n1 connections.txt | tr "," "\n" | grep -n "trip_id"| cut -d: -f1`
## Sort trips.txt by trip_id and have the same ordering as stop_times.txt
{ head -n 1 trips.txt ; tail -n +2 trips.txt | sort -t , -k ${TRIPID_TRIPS}d,${TRIPID_TRIPS} ; } > trips2.txt ; mv trips2.txt trips.txt &
## Use stoptimes2connections to create a set of connections and trips files
echo Creating connections and trips files according to the number of CPU processors available
$CURDIR/stoptimes2connections.js;

## Finally sort all files in order to be processed for gtfs2lc
## Finally sort calendar.txt and calendar_dates.txt files in order to be processed for gtfs2lc
echo Sorting files in directory $1;
{ head -n 1 connections.txt ; tail -n +2 connections.txt | sort -t , -k ${TRIPID_CONNECTIONS}d,${TRIPID_CONNECTIONS} ; } > connections2.txt ; mv connections2.txt connections.txt &
{ head -n 1 trips.txt ; tail -n +2 trips.txt | sort -t , -k ${TRIPID_TRIPS}d,${TRIPID_TRIPS} ; } > trips2.txt ; mv trips2.txt trips.txt &
{ head -n 1 calendar.txt ; tail -n +2 calendar.txt | sort -t , -k 1d,1; } > calendar2.txt ; mv calendar2.txt calendar.txt &
{ head -n 1 calendar_dates.txt ; tail -n +2 calendar_dates.txt | sort -t , -k 1d,1; } > calendar_dates2.txt ; mv calendar_dates2.txt calendar_dates.txt &
} ;
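
Because GTFS feeds may order their columns arbitrarily, the script resolves each column's 1-based position from the header line before invoking `sort -k`. A minimal Node.js sketch of that lookup, with a hypothetical `columnIndex` helper standing in for the `head -n1 | tr "," "\n" | grep -n | cut -d: -f1` pipeline:

```js
const fs = require('fs');

// Resolve the 1-based position of a column in a CSV header, as
// `sort -k` expects; returns 0 when the column is missing.
function columnIndex(file, name) {
  const header = fs.readFileSync(file, 'utf8').split(/\r?\n/)[0];
  return header.split(',').indexOf(name) + 1;
}

console.log(columnIndex('stop_times.txt', 'trip_id'));
```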
126 changes: 36 additions & 90 deletions bin/gtfs2lc.js
@@ -1,34 +1,18 @@
#!/usr/bin/env node

var program = require('commander'),
gtfs2lc = require('../lib/gtfs2lc.js'),
MongoStream = require('../lib/Connections2Mongo.js'),
N3 = require('n3'),
jsonldstream = require('jsonld-stream'),
Connections2JSONLD = require('../lib/Connections2JSONLD.js'),
fs = require('fs');

//ty http://www.geedew.com/remove-a-directory-that-is-not-empty-in-nodejs/
var deleteFolderRecursive = function(path) {
if (fs.existsSync(path)) {
fs.readdirSync(path).forEach(function (file,index){
var curPath = path + "/" + file;
if(fs.lstatSync(curPath).isDirectory()) { // recurse
deleteFolderRecursive(curPath);
} else { // delete file
fs.unlinkSync(curPath);
}
});
fs.rmdirSync(path);
}
};
gtfs2lc = require('../lib/gtfs2lc.js'),
fs = require('fs'),
del = require('del');

console.error("GTFS to linked connections converter use --help to discover more functions");

program
.option('-f, --format <format>', 'Format of the output. Possibilities: csv, ntriples, turtle, json, jsonld (default: json), mongo (extended JSON format to be used with mongoimport) or mongold')
.option('-b, --baseUris <baseUris>', 'path to a file that describes the baseUris in json')
.option('-S, --store <store>', 'store type: LevelStore (uses your harddisk - for if you run out of RAM) or MemStore (default)')
.option('-f, --format <format>', 'Format of the output. Possibilities: csv, n-triples, turtle, json, jsonld, mongo (extended JSON format to be used with mongoimport) or mongold (default: json)')
.option('-b, --baseUris <baseUris>', 'Path to a file that describes the baseUris in json')
.option('-o, --output <output>', 'Path to the location where the result file will be stored')
.option('-s, --stream', 'Get the connections as a stream on the standard output')
.option('-S, --store <store>', 'Store type: KeyvStore (uses your disk to avoid that you run out of RAM) or MemStore (default)')
.arguments('<path>', 'Path to sorted GTFS files')
.action(function (path) {
program.path = path;
@@ -40,86 +40,48 @@ if (!program.path) {
process.exit();
}

var mapper = new gtfs2lc.Connections({
store : program.store
});
if (program.path.endsWith('/')) {
program.path = program.path.slice(0, -1);
}

var output = program.output || program.path;
if (output.endsWith('/')) {
output = output.slice(0, -1);
}

var baseUris = null;
if (program.baseUris) {
baseUris = JSON.parse(fs.readFileSync(program.baseUris, 'utf-8'));
}

var mapper = new gtfs2lc.Connections({
store: !program.store || program.store === 'undefined' ? 'MemStore' : program.store,
format: !program.format || program.format === 'undefined' ? 'json' : program.format,
baseUris: baseUris
});

var resultStream = null;
mapper.resultStream(program.path, function (stream, stopsdb) {
resultStream = stream;
if (!program.format || program.format === "json") {
stream.on('data', function (connection) {
console.log(JSON.stringify(connection));
})
} else if (program.format === 'mongo') {
stream.pipe(new MongoStream()).on('data', function (connection) {
console.log(JSON.stringify(connection));
});
} else if (program.format === 'csv') {
//print header
console.error('The CSV output is not using a Linked Data format – jsonld is the preferred format.');
console.log('"id","departureStop","departureTime","arrivalStop","arrivalTime","trip","route","headsign"');
var count = 0;

stream.on('data', function (connection) {
console.log(count + ',' + connection["departureStop"] + ',' + connection["departureTime"].toISOString() + ',' + connection["arrivalStop"] + ',' + connection["arrivalTime"].toISOString() + ',' + connection["trip"]["trip_id"] + ',' + connection.trip.route.route_id + ',"' + connection.headsign + '"');
count ++;
});
} else if (['jsonld','mongold'].indexOf(program.format) > -1) {
var context = {
'@context' : {
lc: 'http://semweb.mmlab.be/ns/linkedconnections#',
gtfs : 'http://vocab.gtfs.org/terms#',
xsd: 'http://www.w3.org/2001/XMLSchema#',
trip : { '@type' : '@id', '@id' : 'gtfs:trip' },
Connection : 'lc:Connection',
CancelledConnection: 'lc:CancelledConnection',
departureTime : { '@type' : 'xsd:dateTime', '@id' : 'lc:departureTime' },
departureStop : { '@type' : '@id', '@id' : 'lc:departureStop' },
arrivalStop : { '@type' : '@id', '@id' : 'lc:arrivalStop' },
arrivalTime : { '@type' : 'xsd:dateTime', '@id' : 'lc:arrivalTime' },
}
};
//convert triples stream to jsonld stream
stream = stream.pipe(new Connections2JSONLD(baseUris, stopsdb, context));
//prepare the output
if (program.format === 'mongold') {
//convert JSONLD Stream to MongoDB Stream
stream = stream.pipe(new MongoStream());
}
stream = stream.pipe(new jsonldstream.Serializer()).pipe(process.stdout);
} else if (['ntriples','turtle'].indexOf(program.format) > -1) {
stream = stream.pipe(new gtfs2lc.Connections2Triples(baseUris, stopsdb));
if (program.format === 'ntriples') {
stream = stream.pipe(new N3.StreamWriter({ format : 'N-Triples'}));
} else if (program.format === 'turtle') {
stream = stream.pipe(new N3.StreamWriter({ prefixes: { lc: 'http://semweb.mmlab.be/ns/linkedconnections#', gtfs : 'http://vocab.gtfs.org/terms#', xsd: 'http://www.w3.org/2001/XMLSchema#' } }));
}
stream.pipe(process.stdout);
mapper.resultStream(program.path, output, function (path) {
if (program.stream) {
fs.createReadStream(path).pipe(process.stdout);
} else {
console.error('Linked Connections successfully created at ' + path + '!');
}
stream.on('error', error => {
console.error(error);
});
stream.on('finish', function () {
console.error('Stream ended - everything should be fully converted!');
//clean up the leveldb
deleteFolderRecursive(program.path + "/.services");
deleteFolderRecursive(program.path + "/.trips");
});
});

process.on('SIGINT', function () {
console.error("\nSIGINT Received – Cleaning up");
if (resultStream) {
resultStream.end();
} else {
deleteFolderRecursive(program.path + "/.services");
deleteFolderRecursive(program.path + "/.trips");
del([
output + '/.stops',
output + '/.routes',
output + '/.services',
output + '/raw_*'
],
{ force: true }).then(function () {
process.exit(0);
});
}
process.exit(0);
});
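
Putting the hunks together: the rewritten CLI builds the mapper from the parsed options and delegates everything to `resultStream`, which now writes the result to a file instead of emitting connections on stdout. A minimal programmatic sketch using only the API surface visible in this diff; the input and output paths are placeholders:

```js
const gtfs2lc = require('gtfs2lc');
const fs = require('fs');

// Same constructor options the CLI assembles above.
const mapper = new gtfs2lc.Connections({
  store: 'MemStore',   // or 'KeyvStore' to avoid running out of RAM
  format: 'jsonld',
  baseUris: null       // null falls back to the default URI templates
});

// resultStream(sourcePath, outputPath, cb) writes the result file and
// calls back with its path, as the CLI's --stream branch shows.
mapper.resultStream('./gtfs-sorted', './output', function (path) {
  fs.createReadStream(path).pipe(process.stdout);
});
```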
13 changes: 7 additions & 6 deletions bin/linkedconnections-sort.sh
@@ -8,12 +8,13 @@ CURDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

## There should be exactly 1 argument: the connections.nldjsonld file, and this file should exist
[[ $# == 1 ]] && [[ -f $1 ]] && {
## First order it by departureTime, as well as
DEPARTURETIME=$(( `head -n1 $1 | tr "," "\n" | grep -n "departureTime"| cut -d: -f1` ));
DEPARTURESTOP=$(( `head -n1 $1 | tr "," "\n" | grep -n "departureStop"| cut -d: -f1` ));
ARRIVALTIME=$(( `head -n1 $1 | tr "," "\n" | grep -n "arrivalTime"| cut -d: -f1` ));
ARRIVALSTOP=$(( `head -n1 $1 | tr "," "\n" | grep -n "arrivalStop"| cut -d: -f1` ));
ROUTE=$(( `head -n1 $1 | tr "," "\n" | grep -n "gtfs:route"| cut -d: -f1` ));
## Skip first line that corresponds to JSON-LD @context
## Order it by departureTime, as well as by the other fields used below
DEPARTURETIME=$(( `sed 1,1d $1 | head -n1 | tr "," "\n" | grep -n "departureTime"| cut -d: -f1` ));
DEPARTURESTOP=$(( `sed 1,1d $1 | head -n1 | tr "," "\n" | grep -n "departureStop"| cut -d: -f1` ));
ARRIVALTIME=$(( `sed 1,1d $1 | head -n1 | tr "," "\n" | grep -n "arrivalTime"| cut -d: -f1` ));
ARRIVALSTOP=$(( `sed 1,1d $1 | head -n1 | tr "," "\n" | grep -n "arrivalStop"| cut -d: -f1` ));
ROUTE=$(( `sed 1,1d $1 | head -n1 | tr "," "\n" | grep -n "gtfs:route"| cut -d: -f1` ));

## And after the sorting, we need to pipe it to a process that is able to join trains. Ordered in descending order, but afterwards again sorted in ascending order
sort $1 -t , -k ${DEPARTURETIME}dr,${DEPARTURETIME} -k ${ARRIVALTIME}dr,${ARRIVALTIME} -k ${ROUTE}dr,${ROUTE} -k ${DEPARTURESTOP}dr,${DEPARTURESTOP} -k ${ARRIVALSTOP}dr,${ARRIVALSTOP} | $CURDIR/linkedconnections-sortandjoin.js | sort -t , -k ${DEPARTURETIME}d,${DEPARTURETIME};
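
Since the first line of the .nldjsonld file carries the JSON-LD @context, every column lookup above first drops it with `sed 1,1d`. A small Node.js reader sketch under the same assumption; the field names come from the grep patterns above:

```js
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('connections.nldjsonld', { encoding: 'utf8' })
});

let first = true;
rl.on('line', line => {
  if (first) { first = false; return; } // skip the @context line
  const connection = JSON.parse(line);
  console.log(connection.departureTime, connection['gtfs:route']);
});
```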
87 changes: 69 additions & 18 deletions bin/stoptimes2connections.js
@@ -1,31 +1,82 @@
#!/usr/bin/env node

var csv = require('fast-csv'),
fs = require('fs'),
St2C = require('../lib/stoptimes/st2c.js');
const csv = require('fast-csv');
const fs = require('fs');
const readline = require('readline');
const St2C = require('../lib/stoptimes/st2c.js');
const numCPUs = require('os').cpus().length;

var stopTimes = fs.createReadStream('stop_times.txt', { encoding: 'utf8', objectMode: true }).pipe(csv.parse({ objectMode: true, headers: true })).on('error', function (e) {
console.error(e);
// Fragment trips.txt according to the number of available CPU processors
var trips = fs.createReadStream('trips.txt', { encoding: 'utf8' })
.pipe(csv.parse({ objectMode: true, headers: true, quote: '"' }))
.on('error', function (e) {
console.error(e);
});
var tripsPool = createWriteStreams('trips');
var tripIndex = -1;

trips.on('data', trip => {
if (tripIndex === -1) {
for (let i in tripsPool) {
tripsPool[i].write(Object.keys(trip));
}
tripIndex++;
}

tripsPool[tripIndex].write(Object.values(trip));
tripIndex = tripIndex < numCPUs - 1 ? tripIndex + 1 : 0;
});

trips.on('end', () => {
for (let i in tripsPool) {
tripsPool[i].end();
}
});

// Also fragment stop_times.txt
var stopTimes = fs.createReadStream('stop_times.txt', { encoding: 'utf8', objectMode: true })
.pipe(csv.parse({ objectMode: true, headers: true, quote: '"' }))
.on('error', function (e) {
console.error(e);
});

var connectionsPool = createWriteStreams('connections');
var connIndex = -1;
var currentTrip = null;
var printedRows = 0;

var connectionRules = stopTimes.pipe(new St2C());

connectionRules.on('data', row => {
printConnectionRule(row);
if (connIndex === -1) {
for (let i in connectionsPool) {
connectionsPool[i].write(Object.keys(row));
}
}

if (row['trip_id'] !== currentTrip) {
currentTrip = row['trip_id'];
connIndex = connIndex < numCPUs - 1 ? connIndex + 1 : 0;
}

connectionsPool[connIndex].write(Object.values(row))
printedRows++;
});

connectionRules.on('end', () => {
console.error('Converted ' + printedRows + ' stoptimes to connections');
for (let i in connectionsPool) {
connectionsPool[i].end();
}
console.error('Converted ' + printedRows + ' stop_times to connections');
});

var printConnectionHeader = function (row) {
console.log(Object.keys(row).join(','));
};

var printedRows = 0;
var printConnectionRule = function (row) {
if (printedRows === 0) {
printConnectionHeader(row);
function createWriteStreams(name) {
let writers = [];
for (let i = 0; i < numCPUs; i++) {
const stream = csv.format();
stream.pipe(fs.createWriteStream(name + '_' + i + '.txt', { encoding: 'utf8' }));
writers.push(stream);
}
console.log(Object.values(row).join(','));
printedRows++;
}

return writers;
}
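
The round-robin split above leaves one connections_<i>.txt and trips_<i>.txt pair per core. A hypothetical consumer could then fork one worker per fragment; `worker.js` is an assumed module name used only for illustration and is not part of this repository:

```js
const { fork } = require('child_process');
const numCPUs = require('os').cpus().length;

// One worker per fragment written above; './worker.js' is a
// hypothetical module named here only for illustration.
for (let i = 0; i < numCPUs; i++) {
  fork('./worker.js', ['connections_' + i + '.txt', 'trips_' + i + '.txt']);
}
```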
33 changes: 33 additions & 0 deletions lib/Connections2CSV.js
@@ -0,0 +1,33 @@
const { Transform } = require('stream');

class Connections2CSV extends Transform {
constructor(header) {
super({ objectMode: true });
this._headerStreamed = false;
if(!header) {
this.headerStreamed = true;
}
}

    _transform(connection, encoding, done) {
        if (!this.headerStreamed) {
            this.headerStreamed = true;
            // Push the header first so the first connection is not swallowed
            this.push('"departureStop","departureTime","arrivalStop","arrivalTime","trip","route","headsign"\n');
        }
        let csv = connection["departureStop"] + ',' + connection["departureTime"].toISOString() + ','
            + connection["arrivalStop"] + ',' + connection["arrivalTime"].toISOString() + ',' + connection["trip"]["trip_id"] + ','
            + connection.trip.route.route_id + ',"' + connection.headsign + '"' + '\n';
        done(null, csv);
    }

get headerStreamed() {
return this._headerStreamed;
}

set headerStreamed(value) {
this._headerStreamed = value;
}
}

module.exports = Connections2CSV;
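
A usage sketch for the new Transform: pipe an object-mode stream of connection objects through it to obtain CSV text. The sample object only carries the fields the class reads, and all values are made up:

```js
const { Readable } = require('stream');
const Connections2CSV = require('./lib/Connections2CSV.js');

const sample = {
  departureStop: 'stop1',
  departureTime: new Date('2020-02-15T10:00:00Z'),
  arrivalStop: 'stop2',
  arrivalTime: new Date('2020-02-15T10:17:00Z'),
  trip: { trip_id: 'trip1', route: { route_id: 'route1' } },
  headsign: 'Example headsign'
};

// A truthy constructor argument makes the header row be emitted first.
Readable.from([sample])
  .pipe(new Connections2CSV(true))
  .pipe(process.stdout);
```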