Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add back old Content Services as Channels #506

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/fetching.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
process.title = 'aggie-fetching';

const childProcess = require('./child-process');
const Source = require('../models/source');
const downstream = require('./fetching/downstream');
const { initChannels } = require('./fetching/sourceToChannel');

Expand Down
8 changes: 4 additions & 4 deletions lib/fetching/channels/crowdtangle.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class AggieCrowdTangleChannel extends CrowdTangleChannel {
constructor(options) {
super(options);

this.savedSearchToggle = false;
this.savedSearchToggle = true;
this.hashedToken = hash(this.dashboardToken);
}

Expand Down Expand Up @@ -103,9 +103,11 @@ class AggieCrowdTangleChannel extends CrowdTangleChannel {
}

/**
* Overrides the CrowdTangleChannel `fetchPage` function.
* Overrides the CrowdTangleChannel's `fetchPage` function.
*/
async fetchPage() {
this.savedSearchToggle = !this.savedSearchToggle;

this.listPair = await this.readListPair();
const { crowdtangle_saved_searches: savedSearches } = this.listPair;

Expand All @@ -126,8 +128,6 @@ class AggieCrowdTangleChannel extends CrowdTangleChannel {
const posts = await super.fetchPage();
posts.forEach((post) => this.addCrowdTangleTags(post));

this.savedSearchToggle = !this.savedSearchToggle;

return posts;
}
}
Expand Down
131 changes: 131 additions & 0 deletions lib/fetching/channels/elmo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
const { PageChannel } = require('downstream');
const moment = require('moment');
const request = require('request');
const hash = require('../../hash');

/**
* A Channel that fetches data from an instance of ELMO, a field data gathering system.
*
* Note: This Channel was copied directly from the old ELMO content service.
* If that content service contained bugs, then this Channel does too.
*/
class ELMOChannel extends PageChannel {

  /**
   * Creates a Channel that polls an ELMO responses endpoint.
   *
   * @param {Object} options - Channel options; all of them are forwarded to `PageChannel`.
   * @param {string} options.authToken - ELMO API token, sent in the `Authorization` header.
   * @param {string} options.url - URL of the ELMO responses endpoint.
   * @param {string} [options.namespace] - Defaults to `elmo-<hash of authToken>`.
   * @param {*} [options.interval] - Overrides the interval inherited from `PageChannel`.
   * @throws {Error} If `authToken` or `url` is missing.
   */
  constructor(options = {}) {
    // Validate before calling `super` so a missing token is reported clearly
    // instead of first being hashed into the default namespace.
    if (!options.authToken) {
      throw new Error('The `authToken` field is required.');
    }

    if (!options.url) {
      throw new Error('The `url` field is required.');
    }

    super({
      ...options,
      namespace: options.namespace || `elmo-${hash(options.authToken)}`,
    });

    // initialize ELMOChannel variables
    this.authToken = options.authToken;
    this.url = options.url;
    this.interval = options.interval || this.interval;
  }

  /**
   * Fetches a single page of responses from ELMO and converts them to posts.
   *
   * Only one page is fetched per call; if there is more data it is picked up by
   * the next call. The endpoint is assumed to return responses newest to oldest,
   * so the list is reversed to yield posts oldest to newest.
   *
   * @returns {Promise<Object[]>} Parsed posts, oldest first.
   * @throws {Error} On transport failure, a non-200 status, invalid JSON, or a
   *   JSON payload that is not an array.
   */
  async fetchPage() {
    let res;
    let body;
    try {
      // `sendHTTPRequest` resolves with `{ res, body }`; the property names
      // must match for the destructuring to pick the values up.
      ({ res, body } = await this.sendHTTPRequest({
        url: this.getURLWithDate(),
        headers: {
          Authorization: 'Token token=' + this.authToken,
        },
      }));
    } catch (err) {
      throw new Error('HTTP error:' + err.message);
    }

    if (res.statusCode !== 200) {
      // `Error.HTTP` is not a standard constructor. Use it when the host
      // application has installed it, otherwise fall back to a plain Error.
      throw typeof Error.HTTP === 'function'
        ? new Error.HTTP(res.statusCode)
        : new Error(`HTTP error: status code ${res.statusCode}`);
    }

    // Parse JSON.
    let responses;
    try {
      responses = JSON.parse(body);
    } catch (err) {
      throw new Error('Parse error: ' + err.message);
    }

    if (!Array.isArray(responses)) {
      throw new Error('Wrong data');
    }

    // Need to reverse because we want oldest to newest.
    return responses.reverse().map((response) => this.parse(response));
  }

  /**
   * Promise wrapper around the callback-style `request` function.
   *
   * @param {Object} params - Options passed straight to `request`.
   * @returns {Promise<{res: Object, body: *}>} The response object and its body.
   */
  sendHTTPRequest(params) {
    return new Promise((resolve, reject) => {
      request(params, (err, res, body) => {
        if (err) {
          reject(err);
        } else {
          resolve({ res, body });
        }
      });
    });
  }

  /**
   * Converts one ELMO response into a post.
   *
   * @param {Object} response - A single entry of the ELMO responses array.
   * @returns {Object} Post with `authoredAt`, `fetchedAt`, `author`, `content`,
   *   `url`, `platform` and `raw` fields.
   */
  parse(response) {
    // A response without an `answers` array yields empty content instead of throwing.
    const answers = Array.isArray(response.answers) ? response.answers : [];
    const content = answers
      .map((answer) => `[${answer.code}: ${answer.answer}]`)
      .join(' ');

    const now = new Date();
    const authoredAt = new Date(response.created_at);
    const url = this.getBaseURL() + 'responses/' + response.id;
    const author = response.submitter;

    return {
      authoredAt,
      fetchedAt: now,
      author,
      content,
      url,
      platform: 'elmo',
      // TODO: platformID
      raw: response,
    };
  }

  /**
   * Derives the web base URL from the source URL: everything up to and
   * including the last `/`, with the first `api/v1` replaced by `en`.
   *
   * e.g. http://example.com/api/v1/m/nepaltestmission/responses.json?form_id=99
   * becomes http://example.com/en/m/nepaltestmission/
   *
   * @returns {string} The base URL, ending in `/`.
   * @throws {Error} If the source URL is not an http(s) URL.
   */
  getBaseURL() {
    const match = this.url.match(/https?:.+\//);
    if (!match) {
      throw new Error('The `url` field must be an http(s) URL.');
    }
    return match[0].replace('api/v1', 'en');
  }

  /**
   * Gets the ELMO URL with the date constraint applied.
   *
   * When `this.lastTimestamp` is set, a `created_after` parameter formatted as
   * UTC `YYYYMMDDHHmmss` is appended, using `?` or `&` depending on whether the
   * URL already carries a query string.
   *
   * @returns {string} The URL to request.
   */
  getURLWithDate() {
    if (!this.lastTimestamp) {
      return this.url;
    }

    const createdAfter = moment.utc(this.lastTimestamp).format('YYYYMMDDHHmmss');
    const separator = this.url.includes('?') ? '&' : '?';
    return `${this.url}${separator}created_after=${createdAfter}`;
  }

}

module.exports = ELMOChannel;
132 changes: 132 additions & 0 deletions lib/fetching/channels/rss.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
const { PageChannel } = require('downstream');
const FeedParser = require('feedparser');
const sanitizeHTML = require('sanitize-html');
const request = require('request');
const hash = require('../../hash');

/**
* A Channel that fetches posts from an RSS feed.
*
* Note: This Channel was copied directly from the old RSS content service.
* If that content service contained bugs, then this Channel does too.
*/
class RSSChannel extends PageChannel {
  /**
   * Creates a Channel that polls an RSS feed.
   *
   * @param {Object} options - Channel options; all of them are forwarded to `PageChannel`.
   * @param {string} options.url - URL of the RSS feed.
   * @param {string} [options.namespace] - Defaults to `rss-<hash of url>`.
   * @param {*} [options.interval] - Overrides the interval inherited from `PageChannel`.
   * @throws {Error} If `url` is missing.
   */
  constructor(options = {}) {
    // Validate before calling `super` so a missing URL is reported clearly
    // instead of first being hashed into the default namespace.
    if (!options.url) {
      throw new Error('The `url` field is required.');
    }

    super({
      ...options,
      namespace: options.namespace || `rss-${hash(options.url)}`,
    });

    // initialize RSSChannel variables
    this.url = options.url;
    this.interval = options.interval || this.interval;
  }

  /**
   * Downloads the feed and returns the posts that are valid and new.
   *
   * @returns {Promise<Object[]>} Parsed posts in feed order.
   * @throws {Error} On transport failure, a non-200 status, or a feed parse error.
   */
  async fetchPage() {
    let res;
    let stream;
    try {
      ({ res, stream } = await this.doRequest());
    } catch (err) {
      throw new Error('HTTP error:' + err.message);
    }

    if (res.statusCode !== 200) {
      // `Error.HTTP` is not a standard constructor. Use it when the host
      // application has installed it, otherwise fall back to a plain Error.
      throw typeof Error.HTTP === 'function'
        ? new Error.HTTP(res.statusCode)
        : new Error(`HTTP error: status code ${res.statusCode}`);
    }

    return new Promise((resolve, reject) => {
      const posts = [];
      const feedparser = new FeedParser();

      // Handle feedparser errors.
      feedparser.on('error', (err) => {
        reject(new Error('Parse error: ' + err.message));
      });

      // Each time feedparser has items available, collect the valid, new ones.
      feedparser.on('readable', () => {
        try {
          let data;
          while ((data = feedparser.read()) !== null) {
            const post = this.parse(data);

            if (this.validate(post) && this.isNew(post)) {
              posts.push(post);
            }
          }
        } catch (err) {
          // An exception thrown inside an event handler would otherwise
          // escape the promise entirely.
          reject(err);
        }
      });

      // When feedparser says it's done, hand the collected posts back.
      feedparser.on('end', () => {
        resolve(posts);
      });

      // Attach the handlers first, then start feeding the parser.
      stream.pipe(feedparser);
    });
  }

  /**
   * Makes the request to the RSS feed.
   *
   * @returns {Promise<{res: Object, stream: Object}>} The response object and
   *   the request object, which is the stream that gets piped into the feed parser.
   */
  doRequest() {
    return new Promise((resolve, reject) => {
      const req = request(this.url);

      req.on('error', (err) => {
        reject(err);
      });

      req.on('response', (res) => {
        resolve({
          res,
          stream: req,
        });
      });
    });
  }

  /**
   * Validates post data: a post needs content and a usable authored date.
   *
   * @param {Object} post - A post produced by `parse`.
   * @returns {boolean} Whether the post may be kept.
   */
  validate(post) {
    if (!post.authoredAt || !post.content) {
      return false;
    }

    // An Invalid Date is a truthy object, so it needs an explicit check.
    if (post.authoredAt instanceof Date && Number.isNaN(post.authoredAt.getTime())) {
      return false;
    }

    return true;
  }

  /**
   * Determines whether to include a post based on its authored date.
   *
   * @param {Object} post - A post produced by `parse`.
   * @returns {boolean} True when no `lastTimestamp` is set or the post is newer than it.
   */
  isNew(post) {
    return !this.lastTimestamp || post.authoredAt > this.lastTimestamp;
  }

  /**
   * Converts one feed item into a post.
   *
   * The content is the bracketed title (when present) followed by the
   * sanitized description (when present).
   *
   * @param {Object} data - A feed item as emitted by the feed parser.
   * @returns {Object} Post with `authoredAt`, `fetchedAt`, `content`, `author`,
   *   `url`, `platform` and `raw` fields.
   */
  parse(data) {
    const title = data.title ? '[' + data.title + '] ' : '';
    const description = data.description ? sanitizeHTML(data.description) : '';
    const content = title + description;

    const now = new Date();
    const authoredAt = new Date(data.date);
    const author = data.author;
    const url = data.link;

    return {
      authoredAt,
      fetchedAt: now,
      content,
      author,
      url,
      platform: 'rss',
      raw: data,
    };
  }

}

module.exports = RSSChannel;
40 changes: 40 additions & 0 deletions lib/fetching/channels/smsgh.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const { WebChannel } = require('downstream');

/**
* A Channel that receives SMS messages via a web server.
*
* Note: This Channel was copied from the old SMSGH content service.
* If that content service contained bugs, then this Channel does too.
*/
class SMSGhChannel extends WebChannel {

  /**
   * Builds the channel, registering it under the `smsgh/<keywords>` path.
   *
   * @param {Object} options - Channel options; forwarded to `WebChannel` with
   *   `path` overridden.
   * @param {string} options.keywords - Path suffix; must be a URL-safe string.
   */
  constructor(options) {
    const routePath = `smsgh/${options.keywords}`; // must be a URL-safe string
    const channelOptions = { ...options, path: routePath };
    super(channelOptions);
  }

  /**
   * Turns a raw request body into a post.
   *
   * The body is decoded as JSON. A missing `date` falls back to the fetch
   * time, a missing `from` to 'anonymous' and a missing `fulltext` to '-NO TEXT-'.
   *
   * @param {Buffer} bodyBuffer - The raw request body.
   * @returns {Object} Post with `authoredAt`, `fetchedAt`, `url`, `author`,
   *   `platform` and `content` fields.
   */
  parse(bodyBuffer) {
    const message = JSON.parse(bodyBuffer.toString());
    const fetchedAt = new Date();

    const post = {
      authoredAt: message.date ? new Date(message.date) : new Date(fetchedAt),
      fetchedAt,
      url: '',
      author: message.from ? message.from : 'anonymous',
      platform: 'smsgh',
      content: message.fulltext ? message.fulltext : '-NO TEXT-',
    };

    return post;
  }
}

// Sets the `PORT` static on the imported `WebChannel` base class itself (not on
// SMSGhChannel), so the assignment takes effect as soon as this module is loaded
// and is visible to anything else that reads `WebChannel.PORT`.
// NOTE(review): presumably this is the port the web server listens on — confirm against `downstream`.
WebChannel.PORT = 4000;

module.exports = SMSGhChannel;
Loading