Skip to content

Commit

Permalink
feat(libpostal): run libpostal in child process with IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
missinglink committed Mar 11, 2020
1 parent 7a1de5f commit 3eca6ea
Show file tree
Hide file tree
Showing 20 changed files with 135 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"node": true,
"curly": true,
"eqeqeq": true,
"esversion": 6,
"esversion": 8,
"freeze": true,
"immed": true,
"indent": 2,
Expand Down
12 changes: 9 additions & 3 deletions api/extract.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ var sqlite3 = require('sqlite3'),
query = requireDir('../query'),
analyze = require('../lib/analyze');

async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}

// export setup method
function setup( addressDbPath, streetDbPath ){

Expand All @@ -16,16 +22,16 @@ function setup( addressDbPath, streetDbPath ){
query.attach( db, streetDbPath, 'street' );

// query method
var q = function( coord, names, cb ){
var q = async function( coord, names, cb ){

var point = {
lat: parseFloat( coord.lat ),
lon: parseFloat( coord.lon )
};

var normalized = [];
names.forEach( function( name ){
normalized = normalized.concat( analyze.street( name ) );
await asyncForEach(names, async function( name ){
normalized = normalized.concat( await analyze.street( name ) );
});

// error checking
Expand Down
4 changes: 2 additions & 2 deletions api/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function setup( addressDbPath, streetDbPath ){
db.run('PRAGMA street.mmap_size=268435456;');

// query method
var q = function( coord, number, street, cb ){
var q = async function( coord, number, street, cb ){

var point = {
lat: parseFloat( coord.lat ),
Expand All @@ -33,7 +33,7 @@ function setup( addressDbPath, streetDbPath ){

var normalized = {
number: analyze.housenumber( number ),
street: analyze.street( street )
street: await analyze.street( street )
};

// error checking
Expand Down
7 changes: 4 additions & 3 deletions cmd/extract.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

var extract = require('../api/extract'),
pretty = require('../lib/pretty');
const extract = require('../api/extract');
const pretty = require('../lib/pretty');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length !== 7 ){
Expand Down Expand Up @@ -36,3 +36,4 @@ conn.query( point, names, function( err, res ){
});

conn.close();
postal.close();
7 changes: 4 additions & 3 deletions cmd/oa.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

var tty = require('tty'),
oa = require('../api/oa');
const tty = require('tty');
const oa = require('../api/oa');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length !== 4 ){
Expand All @@ -16,4 +17,4 @@ if( tty.isatty( process.stdin ) ){
}

// run script
oa( process.stdin, process.argv[2], process.argv[3] );
oa( process.stdin, process.argv[2], process.argv[3], postal.close );
7 changes: 4 additions & 3 deletions cmd/osm.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

var tty = require('tty'),
osm = require('../api/osm');
const tty = require('tty');
const osm = require('../api/osm');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length !== 4 ){
Expand All @@ -16,4 +17,4 @@ if( tty.isatty( process.stdin ) ){
}

// run script
osm( process.stdin, process.argv[2], process.argv[3] );
osm( process.stdin, process.argv[2], process.argv[3], postal.close );
7 changes: 4 additions & 3 deletions cmd/polyline.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

var tty = require('tty'),
polyline = require('../api/polyline');
const tty = require('tty');
const polyline = require('../api/polyline');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length < 3 ){
Expand All @@ -16,4 +17,4 @@ if( tty.isatty( process.stdin ) ){
}

// run script
polyline( process.stdin, process.argv[2] );
polyline(process.stdin, process.argv[2], postal.close);
4 changes: 3 additions & 1 deletion cmd/search.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

var search = require('../api/search');
const search = require('../api/search');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length < 8 || process.argv.length > 9 ){
Expand Down Expand Up @@ -35,3 +36,4 @@ conn.query( point, number, street, function( err, res ){
});

conn.close();
postal.close();
29 changes: 13 additions & 16 deletions cmd/server.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@

var express = require('express'),
directory = require('serve-index'),
polyline = require('@mapbox/polyline'),
search = require('../api/search'),
extract = require('../api/extract'),
street = require('../api/street'),
near = require('../api/near'),
pretty = require('../lib/pretty'),
analyze = require('../lib/analyze'),
project = require('../lib/project'),
proximity = require('../lib/proximity');

const morgan = require( 'morgan' );
const express = require('express');
const polyline = require('@mapbox/polyline');
const search = require('../api/search');
const extract = require('../api/extract');
const street = require('../api/street');
const near = require('../api/near');
const pretty = require('../lib/pretty');
const analyze = require('../lib/analyze');

const morgan = require('morgan');
const logger = require('pelias-logger').get('interpolation');
const through = require( 'through2' );
const through = require('through2');
const _ = require('lodash');

// optionally override port using env var
Expand Down Expand Up @@ -208,10 +205,10 @@ app.use('/demo', express.static('demo'));
// app.use('/builds', express.static('/data/builds'));
// app.use('/builds', directory('/data/builds', { hidden: false, icons: false, view: 'details' }));

app.listen( PORT, function() {
app.listen( PORT, async function() {

// force loading of libpostal
analyze.street( 'test street' );
await analyze.street( 'test street' );

console.log( 'server listening on port', PORT );
});
7 changes: 4 additions & 3 deletions cmd/tiger.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

var tty = require('tty'),
tiger = require('../api/tiger');
const tty = require('tty');
const tiger = require('../api/tiger');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length !== 4 ){
Expand All @@ -16,4 +17,4 @@ if( tty.isatty( process.stdin ) ){
}

// run script
tiger( process.stdin, process.argv[2], process.argv[3] );
tiger( process.stdin, process.argv[2], process.argv[3], postal.close );
5 changes: 3 additions & 2 deletions cmd/vertices.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

var vertices = require('../api/vertices');
const vertices = require('../api/vertices');
const postal = require('../lib/libpostal');

// help text
if( process.argv.length !== 4 ){
Expand All @@ -10,4 +11,4 @@ if( process.argv.length !== 4 ){
}

// run script
vertices( process.argv[2], process.argv[3] );
vertices( process.argv[2], process.argv[3], postal.close() );
34 changes: 6 additions & 28 deletions lib/analyze.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,17 @@
// it could mean 'apt 1, no 7'; or could even be a valid range 'one to seven'.
// note: these values provide a means of setting some sane defaults for which
// ranges we try to parse and which ones we leave.
var MIN_RANGE = 1; // the miniumum amount β is higher than α
var MAX_RANGE = 6; // the maximum amount β is higher than α
var MIN_RANGE_HOUSENUMBER = 10; // the minimum acceptible value for both α and β

/*
* Return the appropriate version of node-postal
*/

var _nodepostal_module;
function get_libpostal() {
// lazy load this dependency; since it's large (~2GB RAM) and may be
// accidentally required by a process which doesn't use it.
if (!_nodepostal_module) {
// load the mock library if MOCK_LIBPOSTAL env var is set
if (process.env.MOCK_LIBPOSTAL) {
_nodepostal_module = require('../test/lib/mock_libpostal');
// otherwise load the real thing
} else {
_nodepostal_module = require('node-postal');
}
}

return _nodepostal_module;
}
const MIN_RANGE = 1; // the miniumum amount β is higher than α
const MAX_RANGE = 6; // the maximum amount β is higher than α
const MIN_RANGE_HOUSENUMBER = 10; // the minimum acceptible value for both α and β
const postal = require('./libpostal');

/**
analyze input streetname string and return a list of expansions.
**/
function street( streetName ){
const postal = get_libpostal();

async function street( streetName ){
// use libpostal to expand the address
var expansions = postal.expand.expand_address( streetName );
let expansions = await postal.expand.expand_address( streetName );

// remove ordinals
expansions = expansions.map(function( item ){
Expand Down
32 changes: 32 additions & 0 deletions lib/libpostal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const util = require('util');
const child = require('child_process');
const child_script = require('path').join(__dirname, './libpostal_child.js');
const bus = require('ipc-messages-manager').parent;
const stdio = ['inherit', 'inherit', 'inherit', 'ipc'];

// spawn child process
const proc = child.spawn('node', [child_script], { stdio });

// log subprocess errors
proc.on('error', (e) => console.error(e));

// specify 'expand_address' IPC API
function expand_address_ipc (address, options, cb) {
bus.send(proc, 'expand.expand_address', { address, options }, (res) => cb(null, res));
}

// specify 'expand_address' async API
const expand_address_async = async function(address) {
const promise = util.promisify(expand_address_ipc);
return promise(address, {});
};

module.exports = {
close: () => {
proc.disconnect();
proc.kill();
},
expand: {
expand_address: expand_address_async
}
};
19 changes: 19 additions & 0 deletions lib/libpostal_child.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const bus = require('ipc-messages-manager').child;
const isMock = process.env.hasOwnProperty('MOCK_LIBPOSTAL');
const moduleName = isMock ? '../test/lib/mock_libpostal' : 'node-postal';
let postal;

function blockUntilModuleLoaded(){
if (!postal) {
postal = require(moduleName);
console.log('libpostal child process ready');
}
}

bus.actions.on('expand.expand_address', (args, cb) => {
blockUntilModuleLoaded();
const expanded = postal.expand.expand_address(args.address, args.options);
cb(expanded);
});

console.log('libpostal child process connected');
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
"csv-parse": "^4.4.6",
"express": "^4.14.0",
"fs-extra": "^8.0.0",
"ipc-messages-manager": "^1.0.2",
"jsftp": "^2.0.0",
"lodash": "^4.17.4",
"morgan": "^1.9.0",
"node-postal": "^1.0.0",
"node-postal": "https://github.com/imothee/node-postal.git",
"pbf2json": "^6.4.0",
"pelias-config": "^4.0.0",
"pelias-logger": "^1.2.1",
Expand Down
4 changes: 2 additions & 2 deletions stream/address/lookup.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if( hasFD3 ){

function streamFactory(db){

return through.obj(function( batch, _, next ){
return through.obj(async function( batch, _, next ){

// invalid batch
if( !batch || !batch.length ){
Expand All @@ -30,7 +30,7 @@ function streamFactory(db){

// all street names in batch should be the same
// perform libpostal normalization
var names = analyze.street( result.getStreet() );
var names = await analyze.street( result.getStreet() );

// ensure at least one name was produced
if( !names.length ){
Expand Down
14 changes: 11 additions & 3 deletions stream/street/augment.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ var through = require('through2'),
// eg: http://geojson.io/#id=gist:anonymous/ce8b0cdd2ba83ef24cfaab49d36d8cdd&map=15/52.5011/13.3222
var FUDGE_FACTOR = 0.005;

// async version of forEach
async function asyncForEach(array, callback) {
for (let index = 0; index < array.length; index++) {
await callback(array[index], index, array);
}
}

/**
this stream augments the parsed data with additional fields.
Expand All @@ -15,12 +22,13 @@ var FUDGE_FACTOR = 0.005;
- apply 'fudge factor' to bbox
**/
function streamFactory(){
return through.obj(function( street, _, next ){
return through.obj(async function( street, _, next ){

// normalize all names
var names = [];
street.getNames().forEach( function( name ){
names = names.concat( analyze.street( name ) );
await asyncForEach(street.getNames(), async function (name) {
const analyzed = await analyze.street(name);
names = names.concat(analyzed);
});

// if the source file contains no valid names for this polyline
Expand Down
3 changes: 3 additions & 0 deletions test/_func.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
var tape = require('tape');
var common = {};

var postal = require('../lib/libpostal');
tape.onFinish(postal.close);

var tests = [
require('./functional/basic/run.js'),
require('./functional/disjoined/run.js'),
Expand Down
3 changes: 3 additions & 0 deletions test/_unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
var tape = require('tape');
var common = {};

var postal = require('../lib/libpostal');
tape.onFinish(postal.close);

var tests = [
require('./interface.js'),
require('./lib/analyze.js'),
Expand Down
Loading

0 comments on commit 3eca6ea

Please sign in to comment.