-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
executable file
·376 lines (339 loc) · 12.7 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
"use strict";
var express = require('express');
var logger = require('morgan');
var bodyParser = require('body-parser');
var sqlite3 = require('sqlite3');
var cors = require('cors');
var Promise = require('promise');
// Used to access the beer list from the current CBF website. We will ensure that
// our list of available beers is up-to-date with this list.
var cbf = require('./cbfAccess.js');
// Define our database schema and connection pooling
var dbModule = require('./db.js');
var config = require('./config.js');
var app = express();
// Provided by https://cloud.google.com/appengine/docs/flexible/nodejs/runtime#https_and_forwarding_proxies
app.set('trust proxy', true);
app.use(function (req, res, next) {
if (req.get('x-appengine-https') === 'on' && !req.get('x-forwarded-proto')) {
req.headers['x-forwarded-proto'] = 'https';
}
next();
});
// Needed to enabled Cross-Origin Resource Sharing so that web-pages from
// bedewell.com can still call the GCE service.
app.use(cors());
app.use( bodyParser.json() );
app.use( bodyParser.urlencoded({extended: true}) );
/**
* Here we define all the web service routes and the overall behaviour
* of the web service calls. The business logic for dealing with
* ratings is done below
*/
// Everything under /static is served by this route - particularly
// the test server
app.use('/static', express.static( __dirname + '/static' ));
// To allow development builds of the app to support the bedewell.com endpoint
// services framework we enable a get on the beer.service. This is ONLY useful
// when running in development
app.get('/endpoints/beer.service', ( req, res ) => {
res.send('');
});
/**
* Define all the GET requests first - these are all methods of retrieving
* information from the service.
*/
app.get('/get/ratings/journal', function( req, res ) {
var watermarkFrom = req.query.watermark;
getRatingsFromWatermark(watermarkFrom, function(output) {
res.send(output);
});
});
app.get('/get/ratings/all', function( req, res ) {
getAllRatings( function(output) {
res.send(output)
});
});
app.get('/get/ratings/user', function( req, res ) {
var user = req.query.user;
getUserRatings( user, function(output) {
res.send(output)
});
});
app.post('/post/newrating', function( req, res ) {
var body = req.body;
if (!( 'beerUUID' in body && 'rating' in body && 'user' in body)) {
res.send({});
}
addRating(body, function(err, watermark) {
if (err) {
res.send(err);
} else {
res.send( { beerUUID:body.beerUUID, rating:body.rating, watermark:watermark });
}
});
});
app.post('/post/testData', function( req, res ) {
testDatabase();
res.send(req.body);
});
app.post('/post/uploadDatabase', function( req, res ) {
saveDatabaseToGcloud().then(
(OK) => {res.send({result:'OK'});},
(err) => {res.send(err)});
});
// Our application will need to respond to health checks when running on
// Compute Engine with Managed Instance Groups.
app.get('/_ah/health', function (req, res) {
res.status(200).send('ok');
});
app.get('/_ah/stop', function (req, res) {
saveDatabaseToGcloud().then(
(OK) => {res.send({result:'OK'})}
).catch(
(err) => {res.send(err)}
);
});
// Only log unexpected requests - below the ones above that
// we currently expect.
app.use( logger('combined') );
app.get( '/get*', function( req, res ) {
console.log('GET', req.query);
res.send(req.query);
});
app.post( '/post*', function( req, res ) {
console.log('POST', req.body);
res.send(req.body);
});
// Create shared connection for use in querying the DB
var db;
var selectJournalQuery, selectRatingQuery;
var selectUserBeerRating, selectUserRatings;
var cbfObject;
var dbPool;
var dbSaveInterval;
// Make sure that the database is correctly created. On completion we can
// prepare some lookup queries,
dbModule.createDB(onDBCreated);
function onDBCreated(err) {
db = new sqlite3.cached.Database(config.dbLocation);
selectJournalQuery = db.prepare('SELECT beerID, rating FROM journal WHERE rowid > $watermark');
selectRatingQuery = db.prepare('SELECT * from beers');
selectUserBeerRating = db.prepare('SELECT rating from user_rating JOIN beers ON beerID = beers.rowid WHERE user = $user AND beerUUID = $beerUUID');
selectUserRatings = db.prepare('SELECT beerID, rating from user_rating WHERE user = $user');
dbPool = dbModule.makeDBPool();
// Finally get the correct watermark from the DB and start the application
// listening on port 3000.
var p1 = getLatestWatermarkFromDB().then( (val) => {
lastWatermark = val;
}).catch( (err) => {});
var p2 = cbf.getBeerDataFromCBF().then(
(obj) => {
cbfObject = obj;
console.log('Loaded beer data from CBF website');
ensureAllBeersExistInRatingsTable( cbfObject.producers )
},
(error) => {
console.log(`Got error from CBF Server: ${error.message}`);
});
Promise.all([p1, p2]).then(
(OK) => {
// Interact with this application on a production port
app.listen(config.port, (err) => {
console.log('Started listening on port ' + config.port)
});
dbSaveInterval = setInterval(saveDatabaseToGcloud, config.dbSaveInterval);
}).catch(
(error) => {
console.log('UNABLE TO START APP: ' + error);
});
}
function getLatestWatermarkFromDB() {
return new Promise( function( resolve, reject ) {
db.get('SELECT max(rowid) as lastWatermark FROM journal', function(err, row) {
if (err) {
reject(err);
} else {
if (row.lastWatermark !== null) {
resolve(row.lastWatermark);
} else {
resolve(0);
}
}
});
});
}
function saveDatabaseToGcloud() {
console.log('Uploading ' + config.dbFilename + ' to gcloud');
return cbf.uploadDatabaseToGcloud();
}
function ensureAllBeersExistInRatingsTable( producers ) {
var stmt = db.prepare('INSERT INTO beers VALUES ($beerUUID, $name, $brewery, 0, 0, 0, 0, 0)');
producers.forEach( (producer) => {
producer.products.forEach( (product) => {
stmt.run( {$beerUUID: product.id, $name: product.name, $brewery: producer.name} );
})
});
}
/**
* This is the business logic for dealing with storing ratings.
*/
var lastWatermark = 0;
function getRatingsFromWatermark( watermarkFrom, callback ) {
- selectJournalQuery.all( {$watermark: watermarkFrom},
function(err, rows) {
if (err) {
callback(err);
return
}
try {
var ratings = new Array(rows.length);
var beerIDs = new Array(rows.length);
for ( var i = 0; i < rows.length; i++ ) {
var t = rows[i];
ratings[i] = t.rating;
beerIDs[i] = t.beerID;
}
callback({newWatermark: lastWatermark, journal: {beerIDs:beerIDs, ratings:ratings}});
} catch (err) {
callback(err);
}
});
}
function getUserRatings( user, callback ) {
selectUserRatings.all( {$user: user},
function(err, rows) {
if (err) {
callback(err);
} else {
callback(rows);
}
});
}
function getAllRatings( callback ) {
selectRatingQuery.all( function( err, rows ) {
if (err) {
callback(err);
return
}
try {
var ratings = new Array(rows.length);
var beerUUIDs = new Array(rows.length);
var names = new Array(rows.length);
for ( var i = 0; i < rows.length; i++ ) {
var t = rows[i];
ratings[i] = [t.r1, t.r2, t.r3, t.r4, t.r5];
beerUUIDs[i] = t.beerUUID;
names[i] = t.name;
}
callback( {newWatermark: lastWatermark, beerUUIDs: beerUUIDs, ratings: ratings, names: names} );
} catch (err) {
callback(err);
}
});
}
function addRating( obj, callback ) {
/*
* Code structure is going to be:
* Search user_ratings for old rating
* If it exists we will need to remove it from the existing
*/
dbPool.acquire( function(err, client) {
if (err) {
callback(err, null);
} else {
addRatingOnPooledConnection(client, obj, callback);
}
});
}
function addRatingOnPooledConnection(client, obj, callback, retries) {
retries = retries || 5;
if (retries < 5) console.log('Retries = ' + retries);
var beforeExit = function(arg) {dbPool.release(client)};
client.begin().then( (noerror) => {
// We managed to get a successful lock on the database to update the
// ratings so start looking for an existing rating for this user and beer
client.selectUserBeerRating.get({$user: obj.user, $beerUUID: obj.beerUUID}, function( err, row ) {
if (err) {
client.rollback().then( beforeExit, beforeExit );
callback(err, null);
return
}
// Return from select statement lets us know if this user and beer combination
// has been rated before. If it has, we need to undo the
var change = { 6: obj.beerUUID, 1:0, 2:0, 3:0, 4:0, 5:0 };
var journalChanges = [];
if ( row !== undefined ) {
// Giving the same rating again? Simply ignore since nothing needs to
// be done.
if ( row.rating == obj.rating ) {
client.rollback().then( beforeExit, beforeExit );
callback(null, lastWatermark);
return;
}
// Otherwise, old rating needs to be removed from the journal
// and from the ratings count
change[row.rating] += -1;
journalChanges.push({$user: obj.user, $beerUUID: obj.beerUUID, $rating:-row.rating});
}
// New rating needs to be added to the journal and the ratings count
change[obj.rating] += 1;
journalChanges.push({$user: obj.user, $beerUUID: obj.beerUUID, $rating:obj.rating});
var counter = 0;
var firstError = null
var afterAllQuerysAreComplete = function(err) {
counter--;
if (err && firstError === null) {
firstError = err;
}
if ( counter === 0 ) {
if ( firstError === null ) {
client.commit().then( beforeExit, beforeExit );
lastWatermark = this.lastID;
callback(null, lastWatermark);
} else {
client.rollback().then( beforeExit, beforeExit );
callback(firstError, null);
}
}
};
var getNewQueryCompleteFunction = function() {
counter++;
return afterAllQuerysAreComplete
};
// Replace user beer rating so we know this user has rated this beer
client.setUserBeerRating.run(
{$user: obj.user, $beerUUID: obj.beerUUID, $rating: obj.rating},
getNewQueryCompleteFunction());
// Update the ratings counts
client.updateBeerRatings.run(change, getNewQueryCompleteFunction());
// Insert the changes into the journal
for ( let journalChange of journalChanges ) {
client.insertJournalEntry.run(journalChange, getNewQueryCompleteFunction());
}
});
}, (err) => {
// Error in BEGIN TRANSACTION
if ( retries < 1 ) {
beforeExit(null);
callback(err, null);
} else {
setTimeout(addRatingOnPooledConnection, 100, client, obj, callback, retries-1);
}
});
}
function logDBError(err, text) {
if (err) {
text = text || '';
console.log(err + text);
}
}
function testDatabase() {
db.serialize(function() {
var stmt = db.prepare("SELECT * FROM ratings");
stmt.all( function(err, rows) {
console.log(rows);
});
stmt.finalize();
});
}