Skip to content

Commit

Permalink
Send realtime updates to clients via SockJS
Browse files Browse the repository at this point in the history
  • Loading branch information
nylen committed Jul 25, 2016
1 parent 0291c7c commit c1ce6ff
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 49 deletions.
140 changes: 93 additions & 47 deletions bin/intake.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#!/usr/bin/env node

var moment = require('moment'),
net = require('net'),
split = require('split');

JSON.stringifyCanonical = require('canonical-json');

var config = require('../lib/config');

var insertTemperatureQuery = `
INSERT INTO temperature_data (
measured_at,
Expand Down Expand Up @@ -61,67 +64,110 @@ function schedulesAreEqual(a, b) {
);
}

var updateListener = net.connect(config.updateListener.port, 'localhost', function() {
console.log('Connected to API socket');
updateListener.connected = true;
updateListener.on('end', function() {
console.log('Disconnected from API socket');
updateListener = null;
});
});

updateListener.on('error', function(err) {
console.log('Failed to connect to API socket: ' + err.message);
updateListener = null;
});

require('../lib/db').connect(function(err, db) {
if (err) throw err;

var lastSchedule = null;

function sendUpdate(obj, next) {
if (updateListener && updateListener.connected) {
var update = Object.assign({
type : 'update',
}, obj);
delete update.schedule;
updateListener.write(JSON.stringify(update) + "\n");
}
db.query(insertTemperatureQuery, [
// measured_at DATETIME NOT NULL
moment.utc(obj.timestamp).toDate(),
// temp_1 MEDIUMINT
Math.round(obj.computed.T1 * 100),
// temp_2 MEDIUMINT
Math.round(obj.computed.T2 * 100),
// temp_3 MEDIUMINT
Math.round(obj.computed.T3 * 100),
// temp_avg MEDIUMINT NOT NULL
Math.round(obj.computed.temperature * 100),
// setpoint MEDIUMINT
obj.setpoint ? Math.round(obj.setpoint * 100) : null
], function(err) {
console.log('updated');
next(err);
});
}

function sendSchedule(obj, next) {
if (obj.schedule) {
if (!schedulesAreEqual(obj.schedule, lastSchedule)) {
if (updateListener && updateListener.connected) {
var update = {
type : 'schedule',
schedule : obj.schedule,
};
updateListener.write(JSON.stringify(update) + "\n");
}
db.query(insertScheduleQuery, [
// changed_at DATETIME NOT NULL
moment.utc(obj.schedule.now).toDate(),
// schedule_started_at DATETIME
moment.utc(obj.schedule.startedAt).toDate(),
// step_started_at DATETIME
moment.utc(obj.schedule.stepStartedAt).toDate(),
// steps_json VARCHAR(1000)
JSON.stringifyCanonical(obj.schedule.steps)
], function(err) {
console.log('updated schedule');
next(err);
});
}
} else {
next(null);
}
}

process.stdin.pipe(split())
.on('data', function(line) {
if (!line.trim()) return;
var obj = null;
try {
var obj = JSON.parse(line);
if (
!obj.timestamp ||
!obj.computed.T1 ||
!obj.computed.T2 ||
!obj.computed.T3 ||
!obj.computed.temperature ||
!('setpoint' in obj)
) {
throw new Error('invalid object data');
}
process.stdin.pause();
db.query(insertTemperatureQuery, [
// measured_at DATETIME NOT NULL
moment.utc(obj.timestamp).toDate(),
// temp_1 MEDIUMINT
Math.round(obj.computed.T1 * 100),
// temp_2 MEDIUMINT
Math.round(obj.computed.T2 * 100),
// temp_3 MEDIUMINT
Math.round(obj.computed.T3 * 100),
// temp_avg MEDIUMINT NOT NULL
Math.round(obj.computed.temperature * 100),
// setpoint MEDIUMINT
obj.setpoint ? Math.round(obj.setpoint * 100) : null
], function(err) {
process.stdin.resume();
if (err) throw err;
console.log('updated');
});
obj = JSON.parse(line);
} catch (err) {
console.error('Invalid object: ' + err.message);
}
if (obj.schedule) {
if (!schedulesAreEqual(obj.schedule, lastSchedule)) {
process.stdin.pause();
db.query(insertScheduleQuery, [
// changed_at DATETIME NOT NULL
moment.utc(obj.schedule.now).toDate(),
// schedule_started_at DATETIME
moment.utc(obj.schedule.startedAt).toDate(),
// step_started_at DATETIME
moment.utc(obj.schedule.stepStartedAt).toDate(),
// steps_json VARCHAR(1000)
JSON.stringifyCanonical(obj.schedule.steps)
], function(err) {
process.stdin.resume();
if (
!obj.timestamp ||
!obj.computed.T1 ||
!obj.computed.T2 ||
!obj.computed.T3 ||
!obj.computed.temperature ||
!('setpoint' in obj)
) {
console.error('invalid object data');
obj = null;
}
if (obj) {
process.stdin.pause();
sendUpdate(obj, function(err) {
if (err) throw err;
sendSchedule(obj, function(err) {
if (err) throw err;
console.log('updated schedule');
lastSchedule = obj.schedule;
process.stdin.resume();
});
}
});
}
})
.on('end', function() {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"js-yaml" : "^3.6.1",
"moment" : "^2.13.0",
"mysql" : "^2.10.2",
"sockjs" : "^0.3.17",
"split" : "^1.0.0",
"suspend" : "^0.7.0"
},
Expand Down
42 changes: 40 additions & 2 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
const cors = require('cors');
const express = require('express');
const http = require('http');
const moment = require('moment');
const net = require('net');
const sockjs = require('sockjs');
const split = require('split');

const config = require('./lib/config');
const TemperatureDataStore = require('./lib/TemperatureDataStore');

const app = express();
const apiRouter = express.Router();
const server = http.createServer(app);

let clientsToUpdate = [];

require('./lib/db').connect((err, db) => {
if (err) throw err;
Expand All @@ -16,6 +23,35 @@ require('./lib/db').connect((err, db) => {
console.log(`TemperatureDataStore: ${msg}`);
});

const updateListener = net.createServer(client => {
client.pipe(split())
.on('data', function(line) {
if (!line.trim()) return;
clientsToUpdate.forEach(c => c.write(line));
});
});

updateListener.listen(config.updateListener.port, 'localhost', () => {
console.log('Listening for updates on :' + config.updateListener.port);
});

// TODO: upgrade to Apache 2.4 - http://stackoverflow.com/q/27526281/106302
const updateServer = sockjs.createServer({
sockjs_url : 'http://cdn.jsdelivr.net/sockjs/1.1.1/sockjs.min.js',
websocket : false,
});

updateServer.on('connection', function(conn) {
clientsToUpdate.push(conn);
conn.on('close', function() {
clientsToUpdate = clientsToUpdate.filter(c => c !== conn);
});
});

updateServer.installHandlers(server, {
prefix : (config.http.basePath || '') + '/sockjs'
});

apiRouter.get('/data', (req, res) => {
const min = +req.query.min || +moment.utc().subtract(2, 'days');
const max = +req.query.max || +moment.utc();
Expand Down Expand Up @@ -71,12 +107,14 @@ require('./lib/db').connect((err, db) => {
app.use(cors());
app.use(config.http.basePath || '/', apiRouter);

app.listen(config.http.port, () => {
console.log('Listening on :' + config.http.port);
server.listen(config.http.port, () => {
console.log('Listening for requests on :' + config.http.port);
});

process.on('SIGINT', () => {
console.log();
updateListener.close();
console.log('Update listener closed');
db.end(err => {
if (err) throw err;
console.log('MySQL connection closed');
Expand Down

0 comments on commit c1ce6ff

Please sign in to comment.