Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use hafas.tripsByName() to find trips #9

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
'use strict'

const createThrottledHafas = require('vbb-hafas/throttle')
// const createThrottledHafas = require('vbb-hafas/throttle')
const {default: PQueue} = require('p-queue')
const {request} = require('hafas-client/lib/default-profile')
const createHafas = require('hafas-client')
const vbbProfile = require('hafas-client/p/vbb')
const createMonitor = require('.')

const potsdamerPlatz = {
Expand All @@ -13,11 +17,18 @@ const bbox = process.env.BBOX
? JSON.parse(process.env.BBOX)
: potsdamerPlatz

const queue = new PQueue({concurrency: 1})
const throttledRequest = (...args) => queue.add(() => request(...args))

const userAgent = 'hafas-monitor-trips example'
const hafas = createThrottledHafas(userAgent, 5, 1000) // 5 req/s
// const hafas = createThrottledHafas(userAgent, 5, 1000) // 5 req/s
const hafas = createHafas({
...vbbProfile,
request: throttledRequest,
}, userAgent)

const monitor = createMonitor(hafas, bbox, {
fetchTripsInterval: 10_000, // 10s
fetchTripsInterval: 60_000, // 60s
})
monitor.once('error', (err) => {
console.error(err)
Expand Down
162 changes: 101 additions & 61 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ const {
register: globalMetricsRegistry,
Counter, Summary, Gauge,
} = require('prom-client')
const computeTiles = require('./lib/compute-tiles')
const redisOpts = require('./lib/redis-opts')
const noCache = require('./lib/no-cache')
const createWatchedTrips = require('./lib/watched-trips')
const tripsListSegmentsFilters = require('./lib/trips-list-segments-filters')
const createIsStopoverObsolete = require('./lib/is-stopover-obsolete')

const SECOND = 1000
const MINUTE = 60 * SECOND
const MAX_TILE_SIZE = 5 // in kilometers

// maximum nr. of trips returned by hafas.tripsByName()
// todo: move to hafas-client or determine dynamically
const TRIPS_BY_NAME_MAX_RESULTS = 1000

const TOO_MANY_QUEUED_MSG = `\
There are too many pending requests for the tile/trip fetching \
There are too many pending requests for the trips list/trip fetching \
intervals to be adhered to. Consider monitoring a smaller bbox or \
increasing the request throughput.\
`
Expand All @@ -45,11 +49,11 @@ const createMonitor = (hafas, bbox, opt) => {
metricsRegistry: globalMetricsRegistry,
...opt,
}
const fetchTilesInterval = Math.max(
const fetchTripsListInterval = Math.max(
Math.min(fetchTripsInterval, 3 * MINUTE),
30 * SECOND,
)
debug('fetchTilesInterval', fetchTilesInterval)
debug('fetchTripsListInterval', fetchTripsListInterval)

// metrics
const hafasRequestsTotal = new Counter({
Expand All @@ -76,18 +80,13 @@ const createMonitor = (hafas, bbox, opt) => {
// todo: use sliding window via maxAgeSeconds & ageBuckets?
labelNames: ['call'],
})
const monitoredTilesTotal = new Gauge({
name: 'monitored_tiles_total',
help: 'nr. of tiles being monitored',
registers: [metricsRegistry],
})
const monitoredTripsTotal = new Gauge({
name: 'monitored_trips_total',
help: 'nr. of trips being monitored',
registers: [metricsRegistry],
})
const tilesRefreshesPerSecond = new Gauge({
name: 'tiles_refreshes_second',
const tripsListRefreshesPerSecond = new Gauge({
name: 'trips_list_refreshes_second',
help: 'how often the list of trips is refreshed',
registers: [metricsRegistry],
})
Expand All @@ -97,14 +96,10 @@ const createMonitor = (hafas, bbox, opt) => {
registers: [metricsRegistry],
})

const tiles = computeTiles(bbox, {maxTileSize})
monitoredTilesTotal.set(tiles.length)
debug('tiles', tiles)

const out = new EventEmitter()

const redis = new Redis(redisOpts)
const watchedTrips = createWatchedTrips(redis, fetchTilesInterval * 1.5, monitoredTripsTotal)
const watchedTrips = createWatchedTrips(redis, fetchTripsListInterval * 1.5, monitoredTripsTotal)
const tripSeen = async (trips) => {
for (const [id, lineName] of trips) debugTrips('trip seen', id, lineName)
await watchedTrips.put(trips)
Expand All @@ -115,10 +110,10 @@ const createMonitor = (hafas, bbox, opt) => {
}

const checkQueueLoad = throttle(() => {
const tSinceFetchAllTiles = Date.now() - tLastFetchTiles
const tSinceFetchTripsList = Date.now() - tLastFetchTripsList
const tSinceFetchAllTrips = Date.now() - tLastFetchTrips
if (
tSinceFetchAllTiles > fetchTilesInterval * 1.5 ||
tSinceFetchTripsList > fetchTripsListInterval * 1.5 ||
tSinceFetchAllTrips > fetchTripsInterval * 1.5
) {
out.emit('too-many-queued')
Expand All @@ -132,40 +127,45 @@ const createMonitor = (hafas, bbox, opt) => {
checkQueueLoad()
}

const fetchTile = async (tile) => {
debugFetch('fetching tile', tile)
const fetchTripsListRecursively = async (lineNameOrFahrtNr = '*', tripsByNameOpts = {}) => {
debugFetch('fetching trips list (segment)', lineNameOrFahrtNr, tripsByNameOpts)

const t0 = Date.now()
let movements
let trips
try {
movements = await hafas.radar(tile, {
results: 1000, duration: 0, frames: 0, polylines: false,
// todo: `opt.language`
...hafasRadarOpts,
...noCache,
})
trips = await hafas.tripsByName(lineNameOrFahrtNr, tripsByNameOpts)
} catch (err) {
if (err && err.isHafasError) {
debugFetch('hafas error', err)
hafasErrorsTotal.inc({call: 'radar'})
out.emit('hafas-error', err)
return;
if (err && err.code === 'NO_MATCH') {
trips = []
} else {
if (err && err.isHafasError) {
debugFetch('hafas error', err)
hafasErrorsTotal.inc({call: 'radar'})
out.emit('hafas-error', err)
}
if (err && err.code === 'ECONNRESET') {
econnresetErrorsTotal.inc()
}
throw err
}
if (err && err.code === 'ECONNRESET') {
econnresetErrorsTotal.inc()
}
throw err
}
onReqTime('radar', Date.now() - t0)

for (const m of movements) {
const loc = m.location
debugFetch(m.tripId, m.line && m.line.name, loc.latitude, loc.longitude)

out.emit('position', loc, m)
onReqTime('tripsByName', Date.now() - t0)

if (trips.length >= TRIPS_BY_NAME_MAX_RESULTS) {
debugFetch(`maximum nr. of trips (${trips.length}), segmenting`)
const segments = tripsListSegmentsFilters(hafas, lineNameOrFahrtNr, tripsByNameOpts, trips)

const tripSets = await Promise.all(segments.map(({lineNameOrFahrtNr, opts}) => {
return fetchTripsListRecursively(lineNameOrFahrtNr, opts)
}))
trips = [].concat(...tripSets)
} else {
debugFetch(`acceptable nr. of trips (${trips.length}), not segmenting`)
}

await tripSeen(movements.map(m => [m.tripId, m.line && m.line.name || '']))
await tripSeen(trips.map(t => [t.id, t.line && t.line.name || '']))

return trips
}

const isStopoverObsolete = createIsStopoverObsolete(bbox)
Expand Down Expand Up @@ -198,6 +198,19 @@ const createMonitor = (hafas, bbox, opt) => {
}
onReqTime('trip', Date.now() - t0)

// HAFAS' tripsByName() returns some trip IDs that, when fetched via
// trip(), yield invalid trips (too sparse). We filter them out here.
// e.g. on-demand trips
if (
!trip.origin || !trip.origin.type
|| !trip.destination || !trip.destination.type
|| !Array.isArray(trip.stopovers) || trip.stopovers.length === 0
) {
debugTrips('invalid trip', id, trip)
await tripObsolete(id)
return;
}

if (trip.stopovers.every(isStopoverObsolete)) {
const st = trip.stopovers.map(s => [s.stop.location, s.arrival || s.plannedArrival])
debugTrips('trip obsolete', id, lineName, st)
Expand All @@ -214,31 +227,58 @@ const createMonitor = (hafas, bbox, opt) => {
}, trip)
}

if (trip.currentLocation) {
const loc = trip.currentLocation
debugFetch(trip.id, trip.line && trip.line.name, loc.latitude, loc.longitude)

const arrOf = (st) => {
return Date.parse(st.arrival || st.plannedArrival || st.departure || st.plannedDeparture)
}
// todo: use dep time?
const stopoversIdx = trip.stopovers.findIndex(st => arrOf(st) < Date.now())
const m = {
tripId: trip.id,
direction: trip.direction,
line: trip.line,
location: trip.currentLocation,
nextStopovers: stopoversIdx < 0
? []
: trip.stopovers.slice(stopoversIdx).slice(0, 3),
frames: [],
}
out.emit('position', loc, m)
}

await tripSeen([
[trip.id, trip.line && trip.line.name || '']
])
}

let running = false
let fetchTilesTimer = null, tLastFetchTiles = Date.now()
let fetchTripsListTimer = null, tLastFetchTripsList = Date.now()
let fetchTripsTimer = null, tLastFetchTrips = Date.now()

const fetchAllTiles = async () => {
const fetchTripsList = async () => {
if (!running) return;
tilesRefreshesPerSecond.set(1000 / (Date.now() - tLastFetchTiles))
debug('refreshing all tiles')
tLastFetchTiles = Date.now()

try {
await Promise.all(tiles.map(fetchTile))
} catch (err) {
out.emit('error', err)
}
debug('refreshing the trips list')
tLastFetchTripsList = Date.now()
let trips = await fetchTripsListRecursively()

tripsListRefreshesPerSecond.set(1000 / (Date.now() - tLastFetchTripsList))
tLastFetchTripsList = Date.now()
debug('done refreshing the trips list')

// HAFAS' tripsByName() also returns invalid trips. We filter them out here.
trips = trips.filter(t => t.id && t.line && t.line.name)

await Promise.all(trips.map(async (t) => {
await tripSeen(t.id, t.line && t.line.name || '')
}))

debug('done refreshing tiles')
if (running) {
const tNext = Math.max(100, fetchTilesInterval - (Date.now() - tLastFetchTiles))
fetchTilesTimer = setTimeout(fetchAllTiles, tNext)
const tNext = Math.max(100, fetchTripsListInterval - (Date.now() - tLastFetchTripsList))
fetchTripsListTimer = setTimeout(fetchTripsList, tNext)
}
}

Expand Down Expand Up @@ -270,9 +310,9 @@ const createMonitor = (hafas, bbox, opt) => {
debug('starting monitor')
running = true

fetchAllTiles()
fetchTripsList()
.then(fetchAllTrips)
.catch(() => {}) // silence rejection
// .catch(() => {}) // silence rejection
}

const stop = () => {
Expand All @@ -281,8 +321,8 @@ const createMonitor = (hafas, bbox, opt) => {
running = false

redis.quit()
clearTimeout(fetchTilesTimer)
fetchTilesTimer = null
clearTimeout(fetchTripsListTimer)
fetchTripsListTimer = null
clearTimeout(fetchTripsTimer)
fetchTripsTimer = null
out.emit('stop')
Expand Down
44 changes: 0 additions & 44 deletions lib/compute-tiles.js

This file was deleted.

Loading