Merge pull request #89 from linkedconnections/development
v1.3.3
julianrojas87 authored Dec 12, 2022
2 parents 87bc9b3 + da92fd4 commit e004a48
Showing 9 changed files with 252 additions and 305 deletions.
9 changes: 6 additions & 3 deletions README.md
@@ -47,6 +47,8 @@ The Web Server does not provide any functionality by itself, it needs at least o

- **storage:** This is the path that tells the server where to store and where to look for the data fragments created from the different datasets. **This should not include a trailing slash**. Make sure you have enough disk space to store and process datasets.

+- **sortMemory:** Max amount of RAM that can be used by the Linked Connections sorting process. Default is 2G.

- **organization:** URI and name of the data publisher.

- **keywords:** Related keywords for a given dataset. E.g., types of vehicles available.
@@ -61,7 +63,7 @@ The Web Server does not provide any functionality by itself, it needs at least o

- **updatePeriod:** Cron expression that defines how often the server should look for and process a new version of the dataset. We use the [node-cron](https://github.com/kelektiv/node-cron) library for this.

-- **fragmentSize:** Defines the maximum size of every linked data fragment in bytes.
+- **fragmentSize:** Defines the maximum number of connections per data fragment.

- **realTimeData:** If available, here we define all the parameters related to a GTFS-RT feed.
- **downloadUrl:** Here we define the URL to download the GTFS-RT data feed.
@@ -77,6 +79,7 @@ The Web Server does not provide any functionality by itself, it needs at least o
```js
{
"storage": "/opt/linked-connections-data", //datasets storage path
"sortMemory": "4G",
"organization": {
"id": "https://...",
"name": "Organization name"
@@ -89,14 +92,14 @@ The Web Server does not provide any functionality by itself, it needs at least o
"downloadUrl": "https://...",
"downloadOnLaunch": false,
"updatePeriod": "0 0 3 * * *", //every day at 3 am
"fragmentSize": 50000, // 50 Kb
"fragmentSize": 1000, // 1000 connections/fragment
"realTimeData": {
"downloadUrl": "https://...",
"headers": { "apiKeyHttpHeader": "my_api_key" },
"updatePeriod": "*/30 * * * * *", //every 30s
"fragmentTimeSpan": 600, // 600 seconds
"compressionPeriod": "0 0 3 * * *", // Every day at 3 am
"indexStore": "MemStore", // MemStore for RAM and KeyvStore for disk processing,=
"indexStore": "MemStore", // MemStore for RAM and LevelStore for disk processing
"deduce": true // Set true only if the GTFS-RT feed does not provide tripIds
},
"baseURIs": {
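A side note on the `updatePeriod` and `compressionPeriod` fields above: these are second-resolution cron expressions evaluated by [node-cron](https://github.com/kelektiv/node-cron). A minimal sketch of how such an expression fires (the handler is a hypothetical stand-in, not part of this repository):

```js
const { CronJob } = require('cron'); // the node-cron library referenced above

// Hypothetical stand-in for the server's dataset update routine.
function checkForNewDatasetVersion() {
  console.log('Checking for a new dataset version...');
}

// Fields: second minute hour day-of-month month day-of-week.
// "0 0 3 * * *" fires every day at 03:00, like the updatePeriod example above.
const job = new CronJob('0 0 3 * * *', checkForNewDatasetVersion);
job.start();
```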
5 changes: 3 additions & 2 deletions datasets_config.json.example
@@ -1,5 +1,6 @@
{
"storage": "/drive/folder/subfolder",
"sortMemory": "4G",
"organization": {
"id": "https://example.org/your/URL",
"name": "Data publisher name"
@@ -12,14 +13,14 @@
"downloadUrl": "http://...",
"downloadOnLaunch": true,
"updatePeriod": "0 0 3 28 * *",
"fragmentSize": 50000,
"fragmentSize": 1000,
"realTimeData": {
"downloadUrl": "http://...",
"headers": { "apiKeyHeader": "my_api_key" },
"updatePeriod": "*/30 * * * * *",
"fragmentTimeSpan": 600,
"compressionPeriod": "* * * * * *",
"indexStore": "MemStore",
"indexStore": "LevelStore",
"deduce": false
},
"baseURIs": {
293 changes: 84 additions & 209 deletions lib/manager/dataset_manager.js

Large diffs are not rendered by default.

91 changes: 58 additions & 33 deletions lib/manager/pageWriterStream.js
@@ -1,42 +1,67 @@
const Writable = require('stream').Writable;
const fs = require('fs');
+const zlib = require('zlib');

module.exports = class pageWriterStream extends Writable {

-  constructor(targetPath, size) {
-    super({ objectMode: true });
-    this._targetPath = targetPath + '/';
-    this._size = size;
-    this._byteCount = 0;
-    this._currentFileName = '';
-    this._wstream = '';
-    this._lastDepartureTime = null;
-  }
-
-  _write(data, encoding, done) {
-    if (!data['@context']) {
-      let dataString = JSON.stringify(data);
-      let buffer = Buffer.from(dataString);
-
-      if (this._currentFileName == '') {
-        this._currentFileName = data.departureTime;
-        this._wstream = fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld');
-        this._wstream.write(dataString);
-        this._byteCount += buffer.byteLength;
-      } else {
-        if (this._byteCount >= this._size && data.departureTime != this._lastDepartureTime) {
-          this._wstream.end();
-          this._currentFileName = data.departureTime;
-          this._wstream = fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld');
-          this._wstream.write(dataString);
-          this._byteCount = buffer.byteLength;
-        } else {
-          this._wstream.write(',\n' + dataString);
-          this._byteCount += buffer.byteLength;
-        }
-      }
-      this._lastDepartureTime = data.departureTime;
-    }
-    done();
-  }
+  constructor(targetPath, size) {
+    super({ objectMode: true });
+    this._targetPath = targetPath + '/';
+    this._size = size;
+    this._count = 0;
+    this._currentFileName = null;
+    this._lastDepartureTime = null;
+    this._gzip = null;
+    this._wstream = null;
+  }
+
+  _write(data, encoding, done) {
+    const dataValue = data.value;
+
+    if (!dataValue['@context']) {
+      const dataString = JSON.stringify(dataValue);
+
+      if (!this._currentFileName) {
+        this._currentFileName = dataValue.departureTime;
+        this.initWriter();
+        this._gzip.write(dataString, () => {
+          this._count++;
+          done();
+        });
+
+      } else {
+        if (this._count >= this._size && dataValue.departureTime != this._lastDepartureTime) {
+          this._gzip.end(null, () => {
+            this._currentFileName = dataValue.departureTime;
+            this.initWriter();
+            this._gzip.write(dataString, () => {
+              this._count = 1
+              this._lastDepartureTime = dataValue.departureTime;
+              done();
+            });
+          });
+        } else {
+          this._gzip.write(',\n' + dataString, () => {
+            this._count++;
+            this._lastDepartureTime = dataValue.departureTime;
+            done();
+          });
+        }
+      }
+    } else {
+      done();
+    }
+  }
+
+  _final(done) {
+    this._wstream.on('finish', () => {
+      done();
+    });
+    this._gzip.end();
+  }
+
+  initWriter() {
+    this._gzip = zlib.createGzip();
+    this._wstream = this._gzip.pipe(fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld.gz'));
+  }
}
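For orientation, a minimal sketch of how the rewritten `pageWriterStream` could be driven. It assumes the upstream emits objects wrapped as `{ value: ... }` (presumably the convention of the newly added `stream-json` dependency, matching the `data.value` access above); the connections and the target path here are made up:

```js
const { Readable } = require('stream');
const PageWriterStream = require('./lib/manager/pageWriterStream');

// Two fake connections; in the server these come from the gtfs2lc conversion.
const connections = [
  { '@id': 'http://example.org/connections/1', departureTime: '2022-12-12T03:00:00.000Z' },
  { '@id': 'http://example.org/connections/2', departureTime: '2022-12-12T03:01:00.000Z' }
];

// Write gzipped .jsonld.gz fragments of at most 1000 connections each.
Readable.from(connections.map(c => ({ value: c })))
  .pipe(new PageWriterStream('/tmp/fragments', 1000))
  .on('finish', () => console.log('Fragments written'));
```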
1 change: 0 additions & 1 deletion lib/routes/router.js
@@ -5,7 +5,6 @@ const Catalog = require('./catalog');
const Stops = require('./stops');
const Routes = require('./routes');
const Feed = require('./feed');
-const fs = require('fs');
const utils = require('../utils/utils')

const router = express.Router();
8 changes: 6 additions & 2 deletions lib/routes/stops.js
@@ -103,10 +103,14 @@ class Stops {
"@type": data['location_type'] ? locationTypes[parseInt(data['location_type'])] : "Stop",
"latitude": data['stop_lat'].trim(),
"longitude": data['stop_lon'].trim(),
"name": data['stop_name'].trim(),
"code": data['stop_code'].trim()
"name": data['stop_name'].trim()
};

+// Add stop_code if present
+if(data['stop_code']) {
+  obj['code'] = data['stop_code'].trim();
+}
+
// Add parent station and platform code if any
if (obj['@type'] && obj['@type'] !== "Station") {
obj['parentStation'] = index.has(data['parent_station']) ?
96 changes: 73 additions & 23 deletions lib/utils/utils.js
@@ -1,14 +1,15 @@
const util = require('util');
const fs = require('fs');
+const { request } = require('undici');
const zlib = require('zlib');
const unzip = require('unzipper');
const md5 = require('md5');
const jsonld = require('jsonld');
const Logger = require('./logger');
const cronParser = require('cron-parser');
const N3 = require('n3');
-const {DataFactory} = N3;
-const {namedNode, literal, blankNode, defaultGraph, quad} = DataFactory;
+const { DataFactory } = N3;
+const { namedNode, literal, blankNode, defaultGraph, quad } = DataFactory;

const readFile = util.promisify(fs.readFile);
const readdir = util.promisify(fs.readdir);
@@ -27,6 +28,63 @@ module.exports = new class Utils {
logger = Logger.getLogger(this._serverConfig.logLevel || 'info');
}

+  async download(url, headers) {
+    const checkUrl = new URL(url);
+    if (["http:", "https:"].includes(checkUrl.protocol)) {
+      const res = await request(url, {
+        method: "GET",
+        headers,
+        maxRedirections: 5
+      });
+
+      if (res.statusCode <= 400) {
+        logger.debug(`Downloading ${url} ...`);
+        return res;
+      } else {
+        throw new Error(`HTTP error (${res.statusCode}) while requesting: ${url}`);
+      }
+    } else {
+      throw new Error(`Invalid URL: ${url}`);
+    }
+  }
+
+  downloadGTFSToDisk(url, headers, path) {
+    return new Promise(async (resolve, reject) => {
+      try {
+        const res = await this.download(url, headers);
+        const lastModified = res.headers['last-modified'] ? new Date(res.headers['last-modified']).toISOString() : new Date().toISOString();
+        const fileName = `${path}/${lastModified}.zip`;
+        const writer = fs.createWriteStream(fileName, { encoding: 'base64' });
+
+        res.body.pipe(writer)
+          .on('error', err => reject(err))
+          .on('finish', () => {
+            resolve(lastModified);
+          });
+      } catch (err) {
+        reject(err);
+      }
+    });
+  }
+
+  copyFileFromDisk(source, path) {
+    return new Promise((resolve, reject) => {
+      if (fs.existsSync(source) && source.endsWith('.zip')) {
+        const stat = fs.statSync(source);
+        const name = new Date(util.inspect(stat.mtime)).toISOString();
+        const copy = fs.createReadStream(source)
+          .pipe(fs.createWriteStream(`${path}/${name}.zip`));
+
+        copy.on('finish', () => {
+          resolve(name);
+        });
+      } else {
+        reject(new Error('Invalid GTFS file'));
+      }
+    });
+  }

readAndGunzip(path) {
return new Promise((resolve, reject) => {
let buffers = [];
@@ -78,7 +136,7 @@ module.exports = new class Utils {
path += '.zip';
}
fs.createReadStream(path)
-.pipe(unzip.Extract({path: dirName}))
+.pipe(unzip.Extract({ path: dirName }))
.on('close', () => {
resolve(dirName);
})
@@ -95,7 +153,7 @@

for (let v of versions) {
let diff = Math.abs(date.getTime() - new Date(v).getTime());
-diffs.push({'version': v, 'diff': diff});
+diffs.push({ 'version': v, 'diff': diff });
}

diffs.sort((a, b) => {
@@ -489,7 +547,7 @@ module.exports = new class Utils {

async addHydraMetada(params) {
try {
-let template = await readFile('./statics/skeleton.jsonld', {encoding: 'utf8'});
+let template = await readFile('./statics/skeleton.jsonld', { encoding: 'utf8' });
let jsonld_skeleton = JSON.parse(template);
let host = params.host;
let agency = params.agency;
@@ -526,7 +584,7 @@ module.exports = new class Utils {

async addLDESMetadata(params) {
try {
-let template = await readFile('./statics/skeleton-ldes.jsonld', {encoding: 'utf8'});
+let template = await readFile('./statics/skeleton-ldes.jsonld', { encoding: 'utf8' });
let ldes = JSON.parse(template);
let host = params.host;
let agency = params.agency;
@@ -618,7 +676,7 @@ module.exports = new class Utils {

async addSHACLMetadata(params) {
try {
-let template = await readFile('./statics/shape.jsonld', {encoding: 'utf8'});
+let template = await readFile('./statics/shape.jsonld', { encoding: 'utf8' });
let shape = JSON.parse(template);

let id = params.id;
@@ -692,18 +750,18 @@ module.exports = new class Utils {
if ((memento && memento < now) || departureTime < (now - 10800000)) {
// Immutable (for browsers which support it, sometimes limited to https only
// 1 year expiry date to keep it long enough in cache for the others
-res.set({'Cache-Control': 'public, max-age=31536000000, immutable'});
-res.set({'Expires': new Date(now.getTime() + 31536000000).toUTCString()});
+res.set({ 'Cache-Control': 'public, max-age=31536000000, immutable' });
+res.set({ 'Expires': new Date(now.getTime() + 31536000000).toUTCString() });
} else {
// Let clients hold on to this data for 1 second longer than nginx. This way nginx can update before the clients
-res.set({'Cache-Control': 'public, s-maxage=' + maxage + ', max-age=' + (maxage + 1) + ', stale-if-error=' + (maxage + 15) + ', proxy-revalidate'});
-res.set({'Expires': validUntilDate.toUTCString()});
+res.set({ 'Cache-Control': 'public, s-maxage=' + maxage + ', max-age=' + (maxage + 1) + ', stale-if-error=' + (maxage + 15) + ', proxy-revalidate' });
+res.set({ 'Expires': validUntilDate.toUTCString() });
}

-res.set({'ETag': etag});
-res.set({'Vary': 'Accept-Encoding, Accept-Datetime'});
-res.set({'Last-Modified': lastModifiedDate.toUTCString()});
-res.set({'Content-Type': 'application/ld+json'});
+res.set({ 'ETag': etag });
+res.set({ 'Vary': 'Accept-Encoding, Accept-Datetime' });
+res.set({ 'Last-Modified': lastModifiedDate.toUTCString() });
+res.set({ 'Content-Type': 'application/ld+json' });

// If an if-none-match header exists, and if the real-time data hasn't been updated since, just return a 304
// According to the spec this header takes priority over if-modified-since
@@ -861,14 +919,6 @@ module.exports = new class Utils {
return value;
}

-// parseCronExp(cronExp){
-// let sec = 0;
-// const arr = cronExp.split(" ");
-// for (let i = 0; i < arr.length; i++) {
-//
-// }
-// }

get datasetsConfig() {
return this._datasetsConfig;
}
11 changes: 5 additions & 6 deletions package.json
@@ -24,23 +24,22 @@
"del": "^5.1.0",
"express": "^4.17.1",
"fast-csv": "^4.1.1",
"follow-redirects": "^1.10.0",
"graceful-fs": "^4.2.3",
"gtfs2lc": "^1.0.4",
"gtfsrt2lc": "^1.4.6",
"gtfs2lc": "^2.1.9",
"gtfsrt2lc": "^2.0.5",
"jsonld": "^5.2.0",
"jsonld-stream": "^1.0.3",
"md5": "^2.2.1",
"morgan": "^1.9.1",
"n3": "^1.3.5",
"node-watch": "^0.6.3",
"serve-favicon": "^2.5.0",
"stream-json": "^1.7.4",
"undici": "^5.12.0",
"unzipper": "^0.10.10",
"uri-templates": "^0.2.0",
"winston": "^3.2.1"
},
"devDependencies": {
"coveralls": "^3.0.9",
"jest": "^25.1.0"
"jest": "^29.3.1"
}
}
