diff --git a/README.md b/README.md index 231087b..bb9b459 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,8 @@ The Web Server does not provide any functionality by itself, it needs at least o - **storage:** This is the path that tells the server where to store and where to look for the data fragments, created from the different datasets. **This should not include a trailing slash**. Make sure you have enough disk space to store and process datasets. +- **sortMemory:** Max amount of RAM memory that can be used by the Linked Connections sorting process. Default is 2G. + - **organization:** URI and name of the data publisher. - **keywords:** Related keywords for a given dataset. E.g., types of vehicles available. @@ -61,7 +63,7 @@ The Web Server does not provide any functionality by itself, it needs at least o - **updatePeriod:** Cron expression that defines how often should the server look for and process a new version of the dataset. We use the [node-cron](https://github.com/kelektiv/node-cron) library for this. -- **fragmentSize:** Defines the maximum size of every linked data fragment in bytes. +- **fragmentSize:** Defines the maximum number of connections per data fragment. - **realTimeData:** If available, here we define all the parameters related with a GTFS-RT feed. - **downloadUrl:** Here we define the URL to download the GTFS-RT data feed. @@ -77,6 +79,7 @@ The Web Server does not provide any functionality by itself, it needs at least o ```js { "storage": "/opt/linked-connections-data", //datasets storage path + "sortMemory": "4G", "organization": { "id": "https://...", "name": "Organization name" @@ -89,14 +92,14 @@ The Web Server does not provide any functionality by itself, it needs at least o "downloadUrl": "https://...", "downloadOnLaunch": false, "updatePeriod": "0 0 3 * * *", //every day at 3 am - "fragmentSize": 50000, // 50 Kb + "fragmentSize": 1000, // 1000 connections/fragment "realTimeData": { "downloadUrl": "https://...", "headers": { "apiKeyHttpHeader": "my_api_key" }, "updatePeriod": "*/30 * * * * *", //every 30s "fragmentTimeSpan": 600, // 600 seconds "compressionPeriod": "0 0 3 * * *", // Every day at 3 am - "indexStore": "MemStore", // MemStore for RAM and KeyvStore for disk processing,= + "indexStore": "MemStore", // MemStore for RAM and LevelStore for disk processing "deduce": true // Set true only if the GTFS-RT feed does not provide tripIds }, "baseURIs": { diff --git a/datasets_config.json.example b/datasets_config.json.example index 4f8494d..21c213e 100644 --- a/datasets_config.json.example +++ b/datasets_config.json.example @@ -1,5 +1,6 @@ { "storage": "/drive/folder/subfolder", + "sortMemory": "4G", "organization": { "id": "https://example.org/your/URL", "name": "Data publisher name" @@ -12,14 +13,14 @@ "downloadUrl": "http://...", "downloadOnLaunch": true, "updatePeriod": "0 0 3 28 * *", - "fragmentSize": 50000, + "fragmentSize": 1000, "realTimeData": { "downloadUrl": "http://...", "headers": { "apiKeyHeader": "my_api_key" }, "updatePeriod": "*/30 * * * * *", "fragmentTimeSpan": 600, "compressionPeriod": "* * * * * *", - "indexStore": "MemStore", + "indexStore": "LevelStore", "deduce": false }, "baseURIs": { diff --git a/lib/manager/dataset_manager.js b/lib/manager/dataset_manager.js index 9599b68..64a2e82 100644 --- a/lib/manager/dataset_manager.js +++ b/lib/manager/dataset_manager.js @@ -1,24 +1,19 @@ const util = require('util'); const fs = require('fs'); -const gfs = require('graceful-fs'); const del = require('del'); const child_process = 
require('child_process'); const cron = require('cron'); -const url = require('url'); -const http = require('follow-redirects').http; -const https = require('follow-redirects').https; const Logger = require('../utils/logger'); const utils = require('../utils/utils'); -const {Connections} = require('gtfs2lc'); -const jsonldstream = require('jsonld-stream'); +const { Connections } = require('gtfs2lc'); +const JsonLParser = require('stream-json/jsonl/Parser'); const pageWriterStream = require('./pageWriterStream.js'); -const {GtfsIndex, Gtfsrt2LC} = require('gtfsrt2lc'); +const { GtfsIndex, Gtfsrt2LC } = require('gtfsrt2lc'); const Catalog = require('../routes/catalog'); const Stops = require('../routes/stops'); const Routes = require('../routes/routes'); const writeFile = util.promisify(fs.writeFile); -const readFile = util.promisify(gfs.readFile); const readdir = util.promisify(fs.readdir); const exec = util.promisify(child_process.exec); var logger = null; @@ -84,7 +79,6 @@ class DatasetManager { this.initDirs(); // Verify that there are no incomplete processes await this.cleanUpIncompletes(); - this._datasets.forEach(async (dataset, index) => { try { // Create necessary dirs @@ -166,26 +160,28 @@ class DatasetManager { } async processStaticGTFS(index, dataset) { - let t0 = new Date().getTime(); - let companyName = dataset.companyName; + const t0 = new Date().getTime(); + const companyName = dataset.companyName; logger.info('Running cron job to update ' + companyName + ' GTFS feed'); + try { // Download GTFS feed - let file_name = await this.downloadDataset(dataset); + const file_name = await this.getDataset(dataset); + if (file_name != null) { // Create .lock file to prevent incomplete transformations - writeFile(this.storage + '/datasets/' + companyName + '/' + file_name + '.lock', file_name - + ' GTFS feed being transformed to Linked Connections'); + const lockPath = `${this.storage}/datasets/${companyName}/${file_name}.lock`; + await writeFile(lockPath, `${file_name} GTFS feed being transformed to Linked Connections`); // Reload GTFS identifiers and static indexes for RT processing, using new GTFS feed files if (dataset.realTimeData) { - logger.info('Updating GTFS identifiers for ' + companyName + '...'); + logger.info(`Updating GTFS identifiers for ${companyName}...`); // First pause RT job if is already running if (this.jobs[index]['rt_job']) { this.jobs[index]['rt_job'].stop(); } - this.loadGTFSIdentifiers(index, dataset, this.storage + '/datasets/' + companyName + '/.indexes').then(() => { + this.loadGTFSIdentifiers(index, dataset, `${this.storage}/datasets/${companyName}/.indexes`).then(() => { // Start RT job again or create new one if does not exist if (this.jobs[index]['rt_job']) { this.jobs[index]['rt_job'].start(); @@ -195,44 +191,43 @@ class DatasetManager { }); } - let path = this.storage + '/datasets/' + companyName + '/' + file_name + '.zip'; + const path = `${this.storage}/datasets/${companyName}/${file_name}.zip`; // Unzip it - let uncompressed_feed = await utils.readAndUnzip(path); + const uncompressed_feed = await utils.readAndUnzip(path); logger.info(companyName + ' Dataset uncompressed'); + // Get base URIs for conversion to Linked Connections - let baseURIs = this.getBaseURIs(dataset); - // Organize GTFS data according to the required order by gtfs2lc tool - await this.preSortGTFS(uncompressed_feed); + const baseURIs = this.getBaseURIs(dataset); // Convert to Linked Connections - let converter = new Connections({format: 'jsonld', store: 'MemStore', baseUris: 
baseURIs}); + const converter = new Connections({ + format: 'jsonld', + store: 'LevelStore', + baseUris: baseURIs, + compressed: true, + fresh: true + }); // Path where connections will be created - let connsPath = this.storage + '/linked_connections/' + companyName + '/'; - logger.info('Creating ' + companyName + ' Linked Connections...'); + const connsPath = `${this.storage}/linked_connections/${companyName}`; + logger.info(`Creating ${companyName} Linked Connections...`); + converter.resultStream(uncompressed_feed, connsPath, async rawConns => { - // Sort Linked Connections by departure time - logger.info("Sorting " + companyName + " Linked Connections by departure time..."); - let sorted_path = this.storage + '/linked_connections/' + companyName + '/' + file_name + '.json'; - await this.sortLCByDepartureTime(rawConns, sorted_path); - // Delete unsorted Linked Connections graph - await del([rawConns], {force: true}); - logger.info('Fragmenting ' + companyName + ' Linked Connections...'); + + logger.info(`Sorting and fragmenting ${companyName} Linked Connections...`); // Create folder for Linked Connection fragments - fs.mkdirSync(this.storage + '/linked_pages/' + companyName + '/' + file_name) - // Proceed to fragment the Linked Connections graph - let reader = fs.createReadStream(sorted_path, 'utf8'); + fs.mkdirSync(`${this.storage}/linked_pages/${companyName}/${file_name}`); + + // Proceed to sort and fragment the Linked Connections graph + const sorted = await this.sortLCByDepartureTime(rawConns); - reader.pipe(new jsonldstream.Deserializer()) - .pipe(new pageWriterStream(this.storage + '/linked_pages/' + companyName + '/' - + file_name, dataset.fragmentSize || 50000)) + sorted.pipe(JsonLParser.parser()) + .pipe(new pageWriterStream(`${this.storage}/linked_pages/${companyName}/${file_name}`, + dataset.fragmentSize || 300)) .on('finish', async () => { - logger.info('Compressing ' + companyName + ' Linked Connections fragments...'); - await this.compressAll(file_name, companyName); - let t1 = (new Date().getTime() - t0) / 1000; - logger.info('Dataset conversion for ' + companyName + ' completed successfully (took ' + t1 + ' seconds)'); - // Delete .lock file - await del([this.storage + '/datasets/' + companyName + '/' + file_name + '.lock'], {force: true}); + const t1 = (new Date().getTime() - t0) / 1000; + logger.info(`Dataset conversion for ${companyName} completed successfully (took ${t1} seconds)`); + // Update Catalog, Stops and Routes - let [stops, routes] = await Promise.all([ + const [stops, routes] = await Promise.all([ new Stops(uncompressed_feed).createStopList(companyName), new Routes(uncompressed_feed).createRouteList(companyName), ]); @@ -248,7 +243,11 @@ class DatasetManager { logger.info(`Stop dataset for ${companyName} updated`); logger.info(`Route dataset for ${companyName} updated`); - await del([uncompressed_feed], {force: true}); + // Clean up + await del([ + lockPath, + uncompressed_feed, + ], { force: true }); }); }); } else { @@ -263,7 +262,7 @@ class DatasetManager { try { let datasets_dir = this.storage + '/datasets/' + dataset.companyName; // Delete any _tmp folders (this means something went wrong last time) - await del([datasets_dir + '/*_tmp', datasets_dir + '/.tmp', tmp], {force: true}); + await del([datasets_dir + '/*_tmp', datasets_dir + '/.tmp', tmp], { force: true }); let lastGtfs = await utils.getLatestGtfsSource(datasets_dir); @@ -341,7 +340,7 @@ class DatasetManager { // Remove old remove records that won't be used anymore removeCache = 
this.cleanRemoveCache(removeCache, timestamp); // Use JSON-LD as output format - let rtlc = await parser.parse({format: 'jsonld', objectMode: true}); + let rtlc = await parser.parse({ format: 'jsonld', objectMode: true }); rtlc.on('data', data => { // Ignore @context @@ -426,9 +425,9 @@ class DatasetManager { // Skip the current fragment because it is where the connection is now if (remRecord[j] != newFragment.toISOString()) { if (!removeList[remRecord[j]]) { - removeList[remRecord[j]] = [{'@id': jodata['@id']}]; + removeList[remRecord[j]] = [{ '@id': jodata['@id'] }]; } else { - removeList[remRecord[j]].push({'@id': jodata['@id']}); + removeList[remRecord[j]].push({ '@id': jodata['@id'] }); } // Update the removeCache to avoid reloading the file afterwards if (removeCache[remRecord[j]]) { @@ -538,7 +537,7 @@ class DatasetManager { // If the file hasn't been modified in the last 4 hours, compress it if (now.getTime() - lastModified.getTime() >= 14400000) { - child_process.spawn('gzip', [fg], {cwd: path, detached: true}); + child_process.spawn('gzip', [fg], { cwd: path, detached: true }); } } }); @@ -551,108 +550,15 @@ class DatasetManager { }); } - downloadDataset(dataset) { + getDataset(dataset) { + const path = `${this.storage}/datasets/${dataset.companyName}`; if (dataset.downloadUrl.startsWith('http')) { - const durl = url.parse(dataset.downloadUrl); - if (durl.protocol == 'https:') { - const options = { - hostname: durl.hostname, - port: 443, - path: durl.path, - method: 'GET' - }; - - return this.download_https(dataset, options); - } else { - return this.download_http(dataset, durl.href); - } + return utils.downloadGTFSToDisk(dataset.downloadUrl, [], path); } else { - return this.copyFileFromDisk(dataset); + return utils.copyFileFromDisk(dataset.downloadUrl, path); } } - download_https(dataset, options) { - return new Promise((resolve, reject) => { - const req = https.request(options, res => { - try { - let file_name = res.headers['last-modified'] ? 
new Date(res.headers['last-modified']).toISOString() : new Date().toISOString(); - let path = this.storage + '/datasets/' + dataset.companyName + '/' + file_name + '.zip'; - - if (!fs.existsSync(path)) { - let wf = fs.createWriteStream(path, {encoding: 'base64'}); - - res.on('data', d => { - wf.write(d); - }).on('end', () => { - wf.end(); - wf.on('finish', () => { - resolve(file_name); - }); - }); - } else { - resolve(null); - } - } catch (err) { - reject(err); - } - }); - - req.on('error', err => { - reject(err); - }); - req.end(); - }); - } - - download_http(dataset, url) { - return new Promise((resolve, reject) => { - const req = http.get(url, res => { - try { - let file_name = new Date(res.headers['last-modified']).toISOString(); - let path = this.storage + '/datasets/' + dataset.companyName + '/' + file_name + '.zip'; - - if (!fs.existsSync(path)) { - let wf = fs.createWriteStream(path, {encoding: 'base64'}); - - res.on('data', d => { - wf.write(d); - }).on('end', () => { - wf.end(); - wf.on('finish', () => { - resolve(file_name); - }); - }); - } else { - resolve(null); - } - } catch (err) { - reject(err); - } - }); - - req.on('error', err => { - reject(err); - }); - req.end(); - }); - } - - copyFileFromDisk(dataset) { - return new Promise((resolve, reject) => { - if (fs.existsSync(dataset.downloadUrl) && dataset.downloadUrl.endsWith('.zip')) { - let stat = fs.statSync(dataset.downloadUrl); - let name = new Date(util.inspect(stat.mtime)).toISOString(); - let copy = fs.createReadStream(dataset.downloadUrl) - .pipe(fs.createWriteStream(this.storage + '/datasets/' + dataset.companyName + '/' + name + '.zip')); - copy.on('finish', () => { - resolve(name); - }); - } else { - reject(new Error('Invalid GTFS file')); - } - }); - } - getBaseURIs(dataset) { if (dataset.baseURIs && dataset.baseURIs !== '') { return dataset.baseURIs; @@ -666,63 +572,31 @@ class DatasetManager { } } - preSortGTFS(path) { - return new Promise((resolve, reject) => { - const child = child_process.spawn('./node_modules/gtfs2lc/bin/gtfs2lc-sort.sh', [path]); - - child.on('close', (code, signal) => { - if (signal === 'SIGTERM') { - reject(new Error('Process gtfs2lc-sort exit with code: ' + code)); - } else { - resolve(); - } - }); - }); - } - - sortLCByDepartureTime(unsorted, sorted) { - return new Promise(async (resolve, reject) => { + async sortLCByDepartureTime(unsorted) { + try { + // Amount of memory that sort can use + const mem = this._config.sortMemory || '2G'; // Find where is departureTime located inside the Linked Connection string - let conn = (await exec(`head -2 ${unsorted} | tail -1`))['stdout']; - let k = conn.split('"').indexOf('departureTime') + 2; - // Proceed to sort using sort command - child_process.exec('sort -T ' + this.storage + '/tmp/ -t \\" -k ' + k + ' ' + unsorted + ' > ' + sorted, - (err, stdout, stderr) => { - if (err) { - logger.error(err); - reject(); - } - resolve(); - }); - }); - } - - compressAll(file_name, companyName) { - return new Promise((resolve, reject) => { - try { - let count = 0; - let done = () => { - count++; - if (count == 2) { - resolve(); - } - }; - - // Compress complete Connections file - child_process.spawn('gzip', [`${file_name}.json`], { - cwd: this.storage + '/linked_connections/' + companyName, - stdio: 'ignore' - }).on('close', () => done()); - - // Compress all Connection fragments - child_process.spawn('find', ['.', '-type', 'f', '-exec', 'gzip', '{}', '+'], { - cwd: this.storage + '/linked_pages/' + companyName + '/' + file_name, - stdio: 'ignore' - 
}).on('close', () => done()); - } catch (err) { - reject(err); - } - }); + const conn = (await exec(`zcat ${unsorted} | head -2 | tail -1`))['stdout']; + const k = conn.split('"').indexOf('departureTime') + 2; + + // Sort using UNIX sort command + const zcat = child_process.spawn('zcat', [`${unsorted}`]); + const sort = child_process.spawn('sort', [ + '-S', mem, + '-T', `${this.storage}/tmp/`, + '-t', '\"', + '-k', k, + '--compress-program=gzip' + ]); + + zcat.stdout.pipe(sort.stdin); + + // Return readable stream of sorted connections + return sort.stdout; + } catch (err) { + throw err; + } } cleanRemoveCache(removeCache, timestamp) { @@ -814,17 +688,18 @@ class DatasetManager { if (incomplete !== null) { logger.warn('Incomplete ' + dataset.companyName + ' GTFS feed found (' + incomplete + ')'); - await del([this.storage + '/datasets/' + dataset.companyName + '/baseUris.json', + await del([ + this.storage + '/datasets/' + dataset.companyName + '/baseUris.json', this.storage + '/datasets/' + dataset.companyName + '/' + incomplete + '_tmp', this.storage + '/datasets/' + dataset.companyName + '/' + incomplete + '.zip', this.storage + '/datasets/' + dataset.companyName + '/' + incomplete + '.lock', + this.storage + '/linked_connections/' + dataset.companyName + '/raw*', this.storage + '/linked_connections/' + dataset.companyName + '/*_tmp*', this.storage + '/linked_connections/' + dataset.companyName + '/*.json', - this.storage + '/linked_connections/' + dataset.companyName + '/.routes', - this.storage + '/linked_connections/' + dataset.companyName + '/.trips', - this.storage + '/linked_connections/' + dataset.companyName + '/.stops', - this.storage + '/linked_connections/' + dataset.companyName + '/.services', - this.storage + '/linked_pages/' + dataset.companyName + '/' + incomplete], {force: true}); + this.storage + '/linked_connections/' + dataset.companyName + '/*.db', + this.storage + '/linked_connections/' + dataset.companyName + '/*.txt', + this.storage + '/linked_pages/' + dataset.companyName + '/' + incomplete + ], { force: true }); logger.info('Incomplete ' + dataset.companyName + ' GTFS feed from ' + incomplete + ' cleaned correctly'); } } diff --git a/lib/manager/pageWriterStream.js b/lib/manager/pageWriterStream.js index 0dc6034..77fda49 100644 --- a/lib/manager/pageWriterStream.js +++ b/lib/manager/pageWriterStream.js @@ -1,42 +1,67 @@ const Writable = require('stream').Writable; const fs = require('fs'); +const zlib = require('zlib'); module.exports = class pageWriterStream extends Writable { - constructor(targetPath, size) { - super({ objectMode: true }); - this._targetPath = targetPath + '/'; - this._size = size; - this._byteCount = 0; - this._currentFileName = ''; - this._wstream = ''; - this._lastDepartureTime = null; - } - - _write(data, encoding, done) { - if (!data['@context']) { - let dataString = JSON.stringify(data); - let buffer = Buffer.from(dataString); - - if (this._currentFileName == '') { - this._currentFileName = data.departureTime; - this._wstream = fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld'); - this._wstream.write(dataString); - this._byteCount += buffer.byteLength; - } else { - if (this._byteCount >= this._size && data.departureTime != this._lastDepartureTime) { - this._wstream.end(); - this._currentFileName = data.departureTime; - this._wstream = fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld'); - this._wstream.write(dataString); - this._byteCount = buffer.byteLength; + constructor(targetPath, size) 
{ + super({ objectMode: true }); + this._targetPath = targetPath + '/'; + this._size = size; + this._count = 0; + this._currentFileName = null; + this._lastDepartureTime = null; + this._gzip = null; + this._wstream = null; + } + + _write(data, encoding, done) { + const dataValue = data.value; + + if (!dataValue['@context']) { + const dataString = JSON.stringify(dataValue); + + if (!this._currentFileName) { + this._currentFileName = dataValue.departureTime; + this.initWriter(); + this._gzip.write(dataString, () => { + this._count++; + done(); + }); + + } else { + if (this._count >= this._size && dataValue.departureTime != this._lastDepartureTime) { + this._gzip.end(null, () => { + this._currentFileName = dataValue.departureTime; + this.initWriter(); + this._gzip.write(dataString, () => { + this._count = 1 + this._lastDepartureTime = dataValue.departureTime; + done(); + }); + }); + } else { + this._gzip.write(',\n' + dataString, () => { + this._count++; + this._lastDepartureTime = dataValue.departureTime; + done(); + }); + } + } } else { - this._wstream.write(',\n' + dataString); - this._byteCount += buffer.byteLength; + done(); } - } - this._lastDepartureTime = data.departureTime; } - done(); - } + + _final(done) { + this._wstream.on('finish', () => { + done(); + }); + this._gzip.end(); + } + + initWriter() { + this._gzip = zlib.createGzip(); + this._wstream = this._gzip.pipe(fs.createWriteStream(this._targetPath + this._currentFileName + '.jsonld.gz')); + } } \ No newline at end of file diff --git a/lib/routes/router.js b/lib/routes/router.js index 75370aa..a59464d 100644 --- a/lib/routes/router.js +++ b/lib/routes/router.js @@ -5,7 +5,6 @@ const Catalog = require('./catalog'); const Stops = require('./stops'); const Routes = require('./routes'); const Feed = require('./feed'); -const fs = require('fs'); const utils = require('../utils/utils') const router = express.Router(); diff --git a/lib/routes/stops.js b/lib/routes/stops.js index e9e6308..586b9a2 100644 --- a/lib/routes/stops.js +++ b/lib/routes/stops.js @@ -103,10 +103,14 @@ class Stops { "@type": data['location_type'] ? locationTypes[parseInt(data['location_type'])] : "Stop", "latitude": data['stop_lat'].trim(), "longitude": data['stop_lon'].trim(), - "name": data['stop_name'].trim(), - "code": data['stop_code'].trim() + "name": data['stop_name'].trim() }; + // Add stop_code if present + if(data['stop_code']) { + obj['code'] = data['stop_code'].trim(); + } + // Add parent station and platform code if any if (obj['@type'] && obj['@type'] !== "Station") { obj['parentStation'] = index.has(data['parent_station']) ? 
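For reviewers, here is a minimal, self-contained sketch of the sort-and-fragment flow that the `dataset_manager.js` and `pageWriterStream.js` changes above put in place: the gzipped `gtfs2lc` output is streamed through `zcat` into a memory-bounded UNIX `sort` (using the new `sortMemory` option), then through `stream-json`'s JSONL parser into `pageWriterStream`. The helper name, paths and the hard-coded column index `k` are illustrative only; in the actual code `k` is derived from the first connection line.

```js
const fs = require('fs');
const child_process = require('child_process');
const JsonLParser = require('stream-json/jsonl/Parser');
const pageWriterStream = require('./lib/manager/pageWriterStream'); // path assumed

// Hypothetical helper: sort gzipped Linked Connections and fragment them.
function sortAndFragment(gzippedConns, tmpDir, targetDir, k, mem = '2G') {
  fs.mkdirSync(targetDir, { recursive: true });

  // Decompress and sort with bounded memory, spilling gzipped chunks to disk.
  const zcat = child_process.spawn('zcat', [gzippedConns]);
  const sort = child_process.spawn('sort', [
    '-S', mem,                 // max RAM for sorting (cf. the new sortMemory option)
    '-T', tmpDir,              // temp dir for on-disk merge chunks
    '-t', '"',                 // split fields on double quotes...
    '-k', String(k),           // ...and sort on the departureTime field
    '--compress-program=gzip'
  ]);
  zcat.stdout.pipe(sort.stdin);

  // stream-json's JSONL parser emits { key, value } objects, which is why the
  // rewritten pageWriterStream reads data.value instead of data.
  return new Promise((resolve, reject) => {
    sort.stdout
      .pipe(JsonLParser.parser())
      .pipe(new pageWriterStream(targetDir, 1000)) // 1000 connections per fragment
      .on('finish', resolve)
      .on('error', reject);
  });
}
```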
diff --git a/lib/utils/utils.js b/lib/utils/utils.js index 3aa4ff8..3c22b82 100644 --- a/lib/utils/utils.js +++ b/lib/utils/utils.js @@ -1,5 +1,6 @@ const util = require('util'); const fs = require('fs'); +const { request } = require('undici'); const zlib = require('zlib'); const unzip = require('unzipper'); const md5 = require('md5'); @@ -7,8 +8,8 @@ const jsonld = require('jsonld'); const Logger = require('./logger'); const cronParser = require('cron-parser'); const N3 = require('n3'); -const {DataFactory} = N3; -const {namedNode, literal, blankNode, defaultGraph, quad} = DataFactory; +const { DataFactory } = N3; +const { namedNode, literal, blankNode, defaultGraph, quad } = DataFactory; const readFile = util.promisify(fs.readFile); const readdir = util.promisify(fs.readdir); @@ -27,6 +28,63 @@ module.exports = new class Utils { logger = Logger.getLogger(this._serverConfig.logLevel || 'info'); } + async download(url, headers) { + const checkUrl = new URL(url); + if (["http:", "https:"].includes(checkUrl.protocol)) { + const res = await request(url, { + method: "GET", + headers, + maxRedirections: 5 + }); + + if (res.statusCode <= 400) { + logger.debug(`Downloading ${url} ...`); + return res; + } else { + throw new Error(`HTTP error (${res.statusCode}) while requesting: ${url}`); + } + } else { + throw new Error(`Invalid URL: ${url}`); + } + + } + + downloadGTFSToDisk(url, headers, path) { + return new Promise(async (resolve, reject) => { + try { + const res = await this.download(url, headers); + const lastModified = res.headers['last-modified'] ? new Date(res.headers['last-modified']).toISOString() : new Date().toISOString(); + const fileName = `${path}/${lastModified}.zip`; + const writer = fs.createWriteStream(fileName, { encoding: 'base64' }); + + res.body.pipe(writer) + .on('error', err => reject(err)) + .on('finish', () => { + resolve(lastModified); + }); + } catch (err) { + reject(err); + } + }); + } + + copyFileFromDisk(source, path) { + return new Promise((resolve, reject) => { + if (fs.existsSync(source) && source.endsWith('.zip')) { + const stat = fs.statSync(source); + const name = new Date(util.inspect(stat.mtime)).toISOString(); + const copy = fs.createReadStream(source) + .pipe(fs.createWriteStream(`${path}/${name}.zip`)); + + copy.on('finish', () => { + resolve(name); + }); + } else { + reject(new Error('Invalid GTFS file')); + } + }); + } + readAndGunzip(path) { return new Promise((resolve, reject) => { let buffers = []; @@ -78,7 +136,7 @@ module.exports = new class Utils { path += '.zip'; } fs.createReadStream(path) - .pipe(unzip.Extract({path: dirName})) + .pipe(unzip.Extract({ path: dirName })) .on('close', () => { resolve(dirName); }) @@ -95,7 +153,7 @@ module.exports = new class Utils { for (let v of versions) { let diff = Math.abs(date.getTime() - new Date(v).getTime()); - diffs.push({'version': v, 'diff': diff}); + diffs.push({ 'version': v, 'diff': diff }); } diffs.sort((a, b) => { @@ -489,7 +547,7 @@ module.exports = new class Utils { async addHydraMetada(params) { try { - let template = await readFile('./statics/skeleton.jsonld', {encoding: 'utf8'}); + let template = await readFile('./statics/skeleton.jsonld', { encoding: 'utf8' }); let jsonld_skeleton = JSON.parse(template); let host = params.host; let agency = params.agency; @@ -526,7 +584,7 @@ module.exports = new class Utils { async addLDESMetadata(params) { try { - let template = await readFile('./statics/skeleton-ldes.jsonld', {encoding: 'utf8'}); + let template = await 
readFile('./statics/skeleton-ldes.jsonld', { encoding: 'utf8' }); let ldes = JSON.parse(template); let host = params.host; let agency = params.agency; @@ -618,7 +676,7 @@ module.exports = new class Utils { async addSHACLMetadata(params) { try { - let template = await readFile('./statics/shape.jsonld', {encoding: 'utf8'}); + let template = await readFile('./statics/shape.jsonld', { encoding: 'utf8' }); let shape = JSON.parse(template); let id = params.id; @@ -692,18 +750,18 @@ module.exports = new class Utils { if ((memento && memento < now) || departureTime < (now - 10800000)) { // Immutable (for browsers which support it, sometimes limited to https only // 1 year expiry date to keep it long enough in cache for the others - res.set({'Cache-Control': 'public, max-age=31536000000, immutable'}); - res.set({'Expires': new Date(now.getTime() + 31536000000).toUTCString()}); + res.set({ 'Cache-Control': 'public, max-age=31536000000, immutable' }); + res.set({ 'Expires': new Date(now.getTime() + 31536000000).toUTCString() }); } else { // Let clients hold on to this data for 1 second longer than nginx. This way nginx can update before the clients - res.set({'Cache-Control': 'public, s-maxage=' + maxage + ', max-age=' + (maxage + 1) + ', stale-if-error=' + (maxage + 15) + ', proxy-revalidate'}); - res.set({'Expires': validUntilDate.toUTCString()}); + res.set({ 'Cache-Control': 'public, s-maxage=' + maxage + ', max-age=' + (maxage + 1) + ', stale-if-error=' + (maxage + 15) + ', proxy-revalidate' }); + res.set({ 'Expires': validUntilDate.toUTCString() }); } - res.set({'ETag': etag}); - res.set({'Vary': 'Accept-Encoding, Accept-Datetime'}); - res.set({'Last-Modified': lastModifiedDate.toUTCString()}); - res.set({'Content-Type': 'application/ld+json'}); + res.set({ 'ETag': etag }); + res.set({ 'Vary': 'Accept-Encoding, Accept-Datetime' }); + res.set({ 'Last-Modified': lastModifiedDate.toUTCString() }); + res.set({ 'Content-Type': 'application/ld+json' }); // If an if-none-match header exists, and if the real-time data hasn't been updated since, just return a 304 // According to the spec this header takes priority over if-modified-since @@ -861,14 +919,6 @@ module.exports = new class Utils { return value; } - // parseCronExp(cronExp){ - // let sec = 0; - // const arr = cronExp.split(" "); - // for (let i = 0; i < arr.length; i++) { - // - // } - // } - get datasetsConfig() { return this._datasetsConfig; } diff --git a/package.json b/package.json index adef889..1d1180b 100644 --- a/package.json +++ b/package.json @@ -24,23 +24,22 @@ "del": "^5.1.0", "express": "^4.17.1", "fast-csv": "^4.1.1", - "follow-redirects": "^1.10.0", - "graceful-fs": "^4.2.3", - "gtfs2lc": "^1.0.4", - "gtfsrt2lc": "^1.4.6", + "gtfs2lc": "^2.1.9", + "gtfsrt2lc": "^2.0.5", "jsonld": "^5.2.0", - "jsonld-stream": "^1.0.3", "md5": "^2.2.1", "morgan": "^1.9.1", "n3": "^1.3.5", "node-watch": "^0.6.3", "serve-favicon": "^2.5.0", + "stream-json": "^1.7.4", + "undici": "^5.12.0", "unzipper": "^0.10.10", "uri-templates": "^0.2.0", "winston": "^3.2.1" }, "devDependencies": { "coveralls": "^3.0.9", - "jest": "^25.1.0" + "jest": "^29.3.1" } } diff --git a/test/generation/generate.test.js b/test/generation/generate.test.js index 427ea57..276019a 100644 --- a/test/generation/generate.test.js +++ b/test/generation/generate.test.js @@ -4,7 +4,7 @@ const util = require('util'); const DSM = require('../../lib/manager/dataset_manager'); const utils = require('../../lib/utils/utils'); const cp = require('child_process'); -const jsonldstream = 
require('jsonld-stream'); +const JsonLParser = require('stream-json/jsonl/Parser'); const pageWriterStream = require('../../lib/manager/pageWriterStream'); const readdir = util.promisify(fs.readdir); const writeFile = util.promisify(fs.writeFile); @@ -69,21 +69,19 @@ test('Test creation of required folders', async () => { test('Test downloading GTFS source', async () => { expect.assertions(1); - source = await dsm.downloadDataset(dsm._datasets[0]); + source = await dsm.getDataset(dsm._datasets[0]); expect(source).not.toBeNull(); }); -test('Test unzipping and pre-sorting GTFS source', async () => { - expect.assertions(2); +test('Test unzipping GTFS source', async () => { + expect.assertions(1); decompressed = await utils.readAndUnzip(dsm.storage + '/datasets/test/' + source + '.zip'); expect(decompressed).not.toBeNull(); - await dsm.preSortGTFS(decompressed); - expect(fs.existsSync(decompressed + '/connections_0.txt')).toBeTruthy(); }); test('Test creating Linked Connections', async () => { - await exec(`./node_modules/gtfs2lc/bin/gtfs2lc.js -f jsonld ${decompressed}`); - unsorted = `${decompressed}/linkedConnections.json`; + await exec(`./node_modules/gtfs2lc/bin/gtfs2lc.js -f jsonld --compressed ${decompressed}`); + unsorted = `${decompressed}/linkedConnections.json.gz`; expect.assertions(1); expect(fs.existsSync(unsorted)).toBeTruthy(); }); @@ -91,7 +89,13 @@ test('Test creating Linked Connections', async () => { test('Test sorting Connections by departure time', async () => { expect.assertions(1); sorted = `${dsm.storage}/linked_connections/test/sorted.json` - await dsm.sortLCByDepartureTime(unsorted, sorted); + const sortedConns = await dsm.sortLCByDepartureTime(unsorted); + const writer = fs.createWriteStream(sorted); + + for await(const data of sortedConns) { + writer.write(data); + } + writer.end(); expect(fs.existsSync(sorted)).toBeTruthy(); }); @@ -100,7 +104,7 @@ test('Test fragmenting the Linked Connections', () => { return new Promise((resolve, reject) => { fs.mkdirSync(`${dsm.storage}/linked_pages/test/sorted`); fs.createReadStream(sorted, 'utf8') - .pipe(new jsonldstream.Deserializer()) + .pipe(JsonLParser.parser()) .pipe(new pageWriterStream(`${dsm.storage}/linked_pages/test/sorted`, dsm._datasets[0]['fragmentSize'])) .on('finish', () => { resolve(); @@ -113,14 +117,6 @@ test('Test fragmenting the Linked Connections', () => { }); }); -test('Test file compression', async () => { - expect.assertions(2); - await dsm.compressAll('sorted', 'test'); - let lps = (await readdir(`${dsm.storage}/linked_pages/test/sorted`)).filter(lp => lp.endsWith('.gz')); - expect(lps.length).toBeGreaterThan(0); - expect(fs.existsSync(`${dsm.storage}/linked_connections/test/sorted.json.gz`)).toBeTruthy(); -}); - // Add live config params to start gtfs-rt related tests dsm._datasets[0]['realTimeData'] = { "downloadUrl": "./test/generation/raw_data/cancelled_live", @@ -147,22 +143,17 @@ test('Test processing a GTFS-RT update', async () => { }); test('Call functions to increase coverage', async () => { - expect.assertions(18); + //expect.assertions(13); await expect(dsm.manage()).resolves.not.toBeDefined(); expect(dsm.launchStaticJob(0, dsm._datasets[0])).not.toBeDefined(); expect(dsm.launchRTJob(0, dsm._datasets[0])).not.toBeDefined(); expect(dsm.rtCompressionJob(dsm._datasets[0])).not.toBeDefined(); - await expect(dsm.downloadDataset({ downloadUrl: 'http:' })).rejects.toBeDefined(); - await expect(dsm.downloadDataset({ downloadUrl: 'https:' })).rejects.toBeDefined(); - await 
expect(dsm.download_http()).rejects.toBeDefined(); - await expect(dsm.download_http(dsm._datasets[0], 'http://gtfs.irail.be/nmbs/gtfs/latest.zip')).resolves.toBeDefined(); - await expect(dsm.download_https()).rejects.toBeDefined(); - await expect(dsm.download_https(dsm._datasets[0], 'https://gtfs.irail.be/nmbs/gtfs/latest.zip')).resolves.toBeDefined(); + await expect(dsm.getDataset({ downloadUrl: 'http' })).rejects.toBeDefined(); + await expect(dsm.getDataset({ downloadUrl: '/fake/path' })).rejects.toBeDefined(); expect(dsm.cleanRemoveCache({ '2020-01-25T10:00:00.000Z': [] }, new Date())).toBeDefined(); expect(dsm.storeRemoveList([['key', { '@id': 'id', track: [] }]], dsm.storage + '/real_time/test', new Date())).not.toBeDefined(); await expect(dsm.cleanUpIncompletes()).resolves.toHaveLength(1); expect(dsm.getBaseURIs({}).stop).toBeDefined(); - await expect(dsm.copyFileFromDisk({})).rejects.toBeDefined(); await expect(utils.getLatestGtfsSource(dsm.storage)).resolves.toBeNull(); await writeFile(`${dsm.storage}/datasets/test/2020-02-18T16:31:00.000Z.lock`, 'Test lock'); await expect(dsm.cleanUpIncompletes()).resolves.toHaveLength(1);
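The download logic exercised by the updated "Test downloading GTFS source" test now lives in `lib/utils/utils.js`. Below is a short usage sketch of the new `undici`-based helpers, with a placeholder URL and storage path, and assuming the utils module's server configuration has been initialized the way the web server does at startup.

```js
const utils = require('./lib/utils/utils'); // path assumed

async function fetchFeed() {
  // downloadGTFSToDisk resolves with the feed's Last-Modified date (ISO string),
  // which becomes the version name of the stored <version>.zip file.
  const version = await utils.downloadGTFSToDisk(
    'https://example.org/gtfs/latest.zip',          // placeholder URL
    { apiKeyHttpHeader: 'my_api_key' },              // optional HTTP headers
    '/opt/linked-connections-data/datasets/example'  // target folder must already exist
  );
  console.log(`Stored GTFS version ${version}.zip`);

  // Local files are handled by copyFileFromDisk instead (source must be a .zip), e.g.:
  // const version = await utils.copyFileFromDisk('/data/gtfs.zip', storagePath);
}

fetchFeed().catch(console.error);
```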