Provides Postgresql
database connection and SQL statement processing functionality for Slate.
npm install git://github.com/elm-slate/db-utils
To include the latest version of db-utils in your program use:
"dependencies": {
"@elm-slate/db-utils": "git://github.com/elm-slate/db-utils.git",
}
To use a specific version:
"dependencies": {
"@elm-slate/db-utils": "git://github.com/elm-slate/db-utils.git#0.1.9",
}
close
createClient
commit
createConnectionUrl
createPooledClient
createQueryStream
executeSQLStatement
lockEntities
rollback
setDefaultOptions
Closes a client connection to a Postgresql
database created with either createClient
or createPooledClient
.
client
- The object returned from callingcreateClient
orcreatePooledClient
.err
- optional parameter for clients created withcreatePooledClient
.- if the parameter is missing or
falsy
, then client connection will be destroyed and not returned to the connection pool. - if the parameter is
truthy
the client connection will be returned to the connection pool. - do not return client connections to the connection pool while they are in the middle of a transaction. unpredictable results will occur the next time the connection is used.
- this parameter has no effect for closing clients created with
createClient
.
- if the parameter is missing or
const co = require('co');
const dbUtils = require('@elm-slate/db-utils');
const exampleForClient = co.wrap(function *(connectionParams) {
const dbClient = yield dbUtils.createClient(dbUtils.createConnectionUrl(connectionParams));
dbUtils.close(dbClient);
});
const exampleForPooledClient = co.wrap(function *(connectionParams) {
let pooledClient;
try {
pooledClient = yield dbUtils.createPooledClient(dbUtils.createConnectionUrl(connectionParams));
// closing the pooled connection using this form of close will return the client connection to the connection pool. make the sure connection is not in the middle of
// a transaction or future results using the same connection will be unpredictable.
dbUtils.close(pooledClient);
}
catch (err) {
if (pooledClient) {
// closing pooled client connection with truthy err parameter will destroy the client connection and not return the connection to the connection pool
dbUtils.close(pooledClient, err);
}
}
});
const example = co.wrap(function *(connectionParams) {
yield exampleForClient(connectionParams);
yield exampleForPooledClient(connectionParams);
});
example({host: 'exampleHost', databaseName: 'exampleDbname', user: 'exampleUser', password: 'examplePassword'})
.then(() => {
console.log(`Example complete`);
process.exit(0);
})
.catch(err => {
console.error(err.stack || err.message);
process.exit(1);
});
Commits a transaction.
client
- Apg
Client.
See lockEntities
.
Creates a pg
Client using a Postgresql
connection URL string.
Returns a Promise that is resolved with a pg
Client.
conString
- APostgresql
connection URL string.
See close
.
Creates a Postgresql
connection URL string from connection parameters.
Returns a Postgresql
connection URL string.
connectionParams
- APostgresql
connection parameters object with the following keys:host
- the name of the serverdatabaseName
- name of the databaseuser
- the user (IFF database requires)password
- password (IFF database requires)connectTimeout
- number of milliseconds to wait for a connection (OPTIONAL defaults:15000
)
See close
.
Creates an object containing a dbClient
property which is a pg
Client. The pg
Client object is retrieved from a connection pool built using the supplied Postgresql
connection URL string.
Returns a Promise that is resolved with an object containing a pg
Client (property dbClient
) retrieved from a connection pool.
conString
- APostgresql
connection URL string.
See close
.
Creates a readable stream of rows returned from the input SQL statement.
Returns a readable stream of rows.
client
- Apg
Client.statement
- A SQL statement with or without parameters, i.e. using $1, $2, etc.prepareStmtParams
- An array of prepared statement parameters. Required if the SQL statement contains parameters.options
- An optional object that has properties used by the QueryStream constructor in thepg-query-stream
library which contain the following keys:highWaterMark (default 16384)
- the node Readable StreamhighWaterMark
batchSize (default 10000)
- the number of rows that will be retrieved from the database server for each request to resupply the stream
const co = require('co');
const coread = require('co-read');
const dbUtils = require('@elm-slate/db-utils');
const getRowsFromStream = co.wrap(function *(rowStream) {
var rows = [];
var endOfStream = false;
while (!endOfStream) {
var row = yield coread(rowStream);
// event returned
if (row) {
rows[rows.length] = row;
}
// end of stream
else {
endOfStream = true;
}
}
return rows;
});
const example = co.wrap(function *(connectionParams) {
const pooledClient = yield dbUtils.createPooledClient(dbUtils.createConnectionUrl(connectionParams));
const rowStream = dbUtils.createQueryStream(pooledClient.dbClient, 'SELECT * FROM exampleTable');
rowStream.on('error', err => {
console.error({err: err});
throw err;
});
const rows = yield getRowsFromStream(rowStream);
dbUtils.close(pooledClient);
});
example({host: 'exampleHost', databaseName: 'exampleDbname', user: 'exampleUser', password: 'examplePassword'})
.then(() => {
console.log(`Example complete`);
process.exit(0);
})
.catch(err => {
console.error(err.stack || err.message);
process.exit(1);
});
Executes a SQL statement.
Returns a Promise that is resolved with a pg
Result object.
client
- Apg
Client.statement
- A SQL statement with or without parameters, i.e. using $1, $2, etc.prepareStmtParams
- An array of prepared statement parameters. Required if the SQL statement contains parameters.
const co = require('co');
const dbUtils = require('@elm-slate/db-utils');
const exampleForClient = co.wrap(function *(connectionParams) {
const dbClient = yield dbUtils.createClient(dbUtils.createConnectionUrl(connectionParams));
const results = yield dbUtils.executeSQLStatement(dbClient, 'SELECT * FROM exampleTable');
results.rows.forEach((row, i) => {
console.log(`Row ${i}: ${row.column1}, ${row.column2}`);
});
dbUtils.close(dbClient);
return results.rowCount;
});
const exampleForPooledClient = co.wrap(function *(connectionParams) {
const pooledClient = yield dbUtils.createPooledClient(dbUtils.createConnectionUrl(connectionParams));
const results = yield dbUtils.executeSQLStatement(pooledClient.dbClient, 'SELECT * FROM exampleTable');
results.rows.forEach((row, i) => {
console.log(`Row ${i}: ${row.column1}, ${row.column2}`);
});
dbUtils.close(pooledClient);
});
const example = co.wrap(function *(connectionParams) {
const clientRowCount = yield exampleForClient(connectionParams);
const pooledClientRowCount = yield exampleForPooledClient(connectionParams);
});
example({host: 'exampleHost', databaseName: 'exampleDbname', user: 'exampleUser', password: 'examplePassword'})
.then(() => {
console.log(`Example complete`);
process.exit(0);
})
.catch(err => {
console.error(err.stack || err.message);
process.exit(1);
});
Creates a transaction and exclusively locks entities from access by other cooperative parties using Postgresql advisory transaction locks derived from each entity's uuid. Cooperative parties are also using this function before writing to the Event Source database. Slate requires that all writes first use this function.
client
- Apg
Client.entityIds
- An array of uuids of the entities to lock.
const co = require('co');
const dbUtils = require('@elm-slate/db-utils');
const close = (client, err) => {
try {
if (client)
dbUtils.close(client, err);
}
catch (err) {
console.error(err.stack || err.message);
}
};
const exampleForClient = co.wrap(function *(connectionParams) {
let dbClient;
try {
dbClient = yield dbUtils.createClient(dbUtils.createConnectionUrl(connectionParams));
const locksObtained = yield dbUtils.lockEntities(dbClient, ['06f1ee30-a2f5-4585-9bc7-3c78e32075b9', '2dce4c44-3af8-4c5a-bff8-7ac7cca443e0']);
if (locksObtained) {
// do some work you want to abort
// rollback transaction started by lockEntities
yield dbUtils.rollback(dbClient);
}
close(dbClient);
}
catch (err) {
// closing client will destroy the client connection so no need to do rollback if locks were obtained and rollback was not done
close(dbClient);
throw err;
}
});
const exampleForPooledClient = co.wrap(function *(connectionParams) {
let pooledClient;
try {
pooledClient = yield dbUtils.createPooledClient(dbUtils.createConnectionUrl(connectionParams));
const locksObtained = yield dbUtils.lockEntities(pooledClient.dbClient, ['06f1ee30-a2f5-4585-9bc7-3c78e32075b9', '2dce4c44-3af8-4c5a-bff8-7ac7cca443e0']);
if (locksObtained) {
// do some work
// commit transaction started by dbUtils.lockEntities
yield dbUtils.commit(pooledClient.dbClient);
}
// closing pooledClient without err parameter will return client connection to connection pool
close(pooledClient);
}
catch (err) {
// closing pooledClient with err parameter will destroy client connection and not return it to the connection pool so no need to do rollback if locks were obtained and
// commit was not done
close(pooledClient, err);
throw err;
}
});
const example = co.wrap(function *(connectionParams) {
yield exampleForClient(connectionParams);
yield exampleForPooledClient(connectionParams);
});
example({host: 'exampleHost', databaseName: 'exampleDbname', user: 'exampleUser', password: 'examplePassword'})
.then(() => {
console.log(`Example complete`);
process.exit(0);
})
.catch(err => {
console.error(err.stack || err.message);
process.exit(1);
});
Aborts a transaction.
client
- Apg
Client.
See lockEntities
.
Sets defaults options for dbUtils
.
options
- An options object containing logger, highWaterMark, batchSize and/or connectTimeout properties.logger
- A logger that supportsinfo
anderror
functions, e.g.bunyan
. Default is no log message generated if dbUtils detects an error or has an info message to display.highWaterMark
- Same as inoptions
parameter increateQueryStream
.batchSize
- Same as inoptions
parameter increateQueryStream
.connectTimeout
- Same as inconnectionParams
parameter increateConnectionUrl
.
const co = require('co');
const dbUtils = require('@elm-slate/db-utils');
const example = co.wrap(function *(options) {
dbUtils.setDefaultOptions(options);
});
example({logger: null, highWaterMark: 32 * 1024, batchSize: 20000, connectTimeout: 30000})
.then(() => {
console.log(`Example complete`);
process.exit(0);
})
.catch(err => {
console.error(err.stack || err.message);
process.exit(1);
});