-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
183 lines (154 loc) · 6.83 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
const express = require('express');
const { Kafka } = require('kafkajs');
const config = require('./config.json');
const dbAccess = require('./models/db_access');
const registry = require('./routes/register_routes');
const session = require('express-session');
const cors = require('cors');
const SnappyCodec = require('kafkajs-snappy');
const { CompressionTypes, CompressionCodecs } = require('kafkajs');
// Register the Snappy codec with kafkajs so Snappy-compressed batches can be handled.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

// One Kafka client, configured from config.json, backs both the consumer
// (inbound posts, see runConsumer) and the producer (outbound posts).
const { bootstrapServers, groupId } = config.kafka;
const kafka = new Kafka({ clientId: 'my-app', brokers: bootstrapServers });
const consumer = kafka.consumer({ groupId });
const producer = kafka.producer();

// Subscribed topics: runConsumer routes topic1 to handleTwitterPosts and
// topic2 to handleFederatedPosts.
const topic1 = "Twitter-Kafka";
const topic2 = "FederatedPosts";
/**
 * Compute the offset to commit after a message has been handled.
 *
 * kafkajs delivers `message.offset` as a decimal string (Kafka offsets are
 * 64-bit and may exceed Number.MAX_SAFE_INTEGER). Using `+ 1` on that string
 * concatenates ('41' + 1 === '411'), so the increment is done with BigInt.
 *
 * @param {string|number|bigint} offset - Offset of the message just processed.
 * @returns {string} The following offset, as the string kafkajs expects.
 */
function nextOffset(offset) {
  return (BigInt(offset) + BigInt(1)).toString();
}

/**
 * Connect the Kafka consumer, subscribe to both inbound topics from the
 * beginning, and dispatch each message to its handler by topic.
 *
 * Startup failures (connect/subscribe/run) are logged, not rethrown.
 * Per-message failures (invalid JSON, handler or commit errors) are logged
 * and that message is skipped, so one bad record does not stop the consumer.
 *
 * @returns {Promise<void>} Resolves once the consumer is running (or startup failed).
 */
async function runConsumer() {
  try {
    await consumer.connect();
    await consumer.subscribe({ topics: [topic1, topic2], fromBeginning: true });
    console.log('Kafka consumer connected and subscribed.');
    // Awaited so errors while starting the run loop reach the catch below
    // instead of becoming an unhandled promise rejection.
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // Records with no value (e.g. tombstones) have nothing to parse;
        // calling toString() on them would throw outside the try below.
        if (message.value == null) {
          console.log(`Skipping empty message from ${topic} at offset ${message.offset}.`);
          return;
        }
        const rawValue = message.value.toString();
        console.log(`Received message from ${topic}: ${rawValue}`);
        try {
          const data = JSON.parse(rawValue);
          // The handlers log the parsed payload themselves.
          if (topic === topic1) {
            await handleTwitterPosts(data);
          } else if (topic === topic2) {
            await handleFederatedPosts(data);
          }
          // NOTE(review): run() is not given autoCommit: false, so kafkajs may
          // also auto-commit; confirm whether this manual commit is still needed.
          await consumer.commitOffsets([
            { topic, partition, offset: nextOffset(message.offset) },
          ]);
        } catch (err) {
          console.error('Error processing message:', err);
        }
      },
    });
  } catch (error) {
    console.error('Error during startup:', error);
  }
}
// Twitter post handling:
// 1. Read the post's text field.
// 2. Read its hashtags.
// Insert both into our tables. The message must be parseable JSON, and a
// post whose text is undefined is skipped instead of being inserted.
/**
 * Normalize the raw `hashtags` field of a Twitter record into distinct tag names.
 *
 * Each string entry is trimmed and given a leading '#' if it lacks one.
 * Non-string entries, empty tags and duplicates are dropped, so a post is
 * never linked to the same hashtag twice. A non-array value (missing field,
 * stray string) yields no tags; a string is not iterated character by character.
 *
 * @param {*} rawHashtags - The record's `hashtags` value.
 * @returns {string[]} Distinct '#'-prefixed tags in first-seen order.
 */
function normalizeTwitterHashtags(rawHashtags) {
  if (!Array.isArray(rawHashtags)) {
    return [];
  }
  const unique = new Set();
  for (const raw of rawHashtags) {
    if (typeof raw !== 'string') {
      continue;
    }
    const trimmed = raw.trim();
    const tag = trimmed.startsWith('#') ? trimmed : `#${trimmed}`;
    // A bare '#' means the entry carried no tag name at all.
    if (tag.length > 1) {
      unique.add(tag);
    }
  }
  return [...unique];
}

/**
 * Store one Twitter record as a post (author_id -1) and link it to its hashtags.
 *
 * Records whose `text` is missing or empty are logged and skipped. Each hashtag
 * is looked up by name in `hashtags` and inserted when absent, then linked to
 * the new post in `post_tagged_with`. Database errors are logged together with
 * the offending record and are not rethrown.
 *
 * @param {object} data - Parsed Twitter record; reads `text` and optional `hashtags`.
 * @returns {Promise<void>}
 */
async function handleTwitterPosts(data) {
  try {
    console.log('Processing Twitter post:', data);
    const text = data.text;
    if (!text) {
      console.log('Error processing Twitter post: text is undefined.');
      return;
    }
    const hashtags = normalizeTwitterHashtags(data.hashtags);

    const insertPostQuery = `
      INSERT INTO posts (author_id, title, content, parent_post, timestamp)
      VALUES (-1, '', ?, NULL, NOW());
    `;
    const postResult = await dbAccess.send_sql(insertPostQuery, [text]);
    const postId = postResult.insertId;
    console.log(`Post inserted with ID: ${postId}`);

    for (const hashtag of hashtags) {
      let tagId;
      const tagCheckQuery = `SELECT hashtag_id FROM hashtags WHERE hashtagname = ?;`;
      const tagCheckResult = await dbAccess.send_sql(tagCheckQuery, [hashtag]);
      if (tagCheckResult.length > 0) {
        tagId = tagCheckResult[0].hashtag_id;
      } else {
        const tagInsertQuery = `INSERT INTO hashtags (hashtagname) VALUES (?);`;
        const tagInsertResult = await dbAccess.send_sql(tagInsertQuery, [hashtag]);
        tagId = tagInsertResult.insertId;
      }
      const postTagRelationQuery = `INSERT INTO post_tagged_with (post_id, hashtag_id) VALUES (?, ?);`;
      await dbAccess.send_sql(postTagRelationQuery, [postId, tagId]);
    }
    console.log('Twitter post processed successfully with hashtags and inserted into posts table.');
  } catch (error) {
    console.error('Error processing Twitter post:', error);
    console.error(`Failed operation data: ${JSON.stringify(data)}`);
  }
}
// federated posts
/**
 * Extract the distinct hashtags mentioned in a post's text.
 *
 * A hashtag is '#' followed by letters, combining marks and digits from any
 * script, or '_'. This keeps tags like '#café' whole, which the ASCII-only
 * `\w` would cut short. Repeated tags are returned once.
 *
 * @param {string} text - Post text to scan.
 * @returns {string[]} Distinct '#'-prefixed tags in first-seen order.
 */
function extractHashtags(text) {
  // Fresh literal per call, so the /g regex's lastIndex never leaks between calls.
  const hashtagRegex = /#([\p{L}\p{M}\p{N}_]+)/gu;
  const found = new Set();
  let match;
  while ((match = hashtagRegex.exec(text)) !== null) {
    found.add(`#${match[1]}`);
  }
  return [...found];
}

/**
 * Store one federated post (author_id -1) and link it to the hashtags found in its text.
 *
 * A record whose `post_text` is not a non-blank string is rejected. Each
 * hashtag is looked up by name in `hashtags` and inserted when absent, then
 * linked to the new post in `post_tagged_with`. Errors are logged together
 * with the offending record and are not rethrown.
 *
 * @param {object} data - Parsed federated-post record; reads `post_text`.
 * @returns {Promise<void>}
 */
async function handleFederatedPosts(data) {
  try {
    console.log('Processing Federated post:', data);
    const postText = data.post_text;
    if (typeof postText !== 'string' || postText.trim() === '') {
      throw new Error('Post text is undefined or empty');
    }
    const authorId = -1; // Use a consistent author ID for federated posts
    const hashtags = extractHashtags(postText);

    const insertPostQuery = `
      INSERT INTO posts (title, content, author_id, timestamp)
      VALUES (?, ?, ?, NOW());
    `;
    const postResult = await dbAccess.send_sql(insertPostQuery, ['', postText, authorId]);
    const postId = postResult.insertId;
    console.log(`Federated post inserted with ID: ${postId}`);

    for (const hashtag of hashtags) {
      let tagId;
      const tagCheckQuery = `SELECT hashtag_id FROM hashtags WHERE hashtagname = ?;`;
      const tagCheckResult = await dbAccess.send_sql(tagCheckQuery, [hashtag]);
      if (tagCheckResult.length > 0) {
        tagId = tagCheckResult[0].hashtag_id;
      } else {
        const tagInsertQuery = `INSERT INTO hashtags (hashtagname) VALUES (?);`;
        const tagInsertResult = await dbAccess.send_sql(tagInsertQuery, [hashtag]);
        tagId = tagInsertResult.insertId;
      }
      const postTagRelationQuery = `INSERT INTO post_tagged_with (post_id, hashtag_id) VALUES (?, ?);`;
      await dbAccess.send_sql(postTagRelationQuery, [postId, tagId]);
    }
    console.log('Federated post processed successfully with hashtags and inserted into posts table.');
  } catch (error) {
    console.error('Error processing Federated post:', error);
    console.error(`Failed operation data: ${JSON.stringify(data)}`);
  }
}
/**
 * Build the federated-post message body for a local post.
 *
 * @param {object} post - Post to publish; reads `author_id`, `uuid` and `content`.
 * @returns {object} Message with username, source_site, post_uuid_within_site,
 *   post_text and content_type fields.
 */
function buildFederatedPostMessage(post) {
  return {
    // NOTE(review): the field is called `username` but receives the numeric
    // author_id; confirm what the federated consumers expect here.
    username: post.author_id,
    source_site: 'g07',
    post_uuid_within_site: post.uuid,
    post_text: post.content,
    // MIME type of post_text; replaces a leftover debugging placeholder string.
    content_type: 'text/plain',
  };
}

/**
 * Publish a local post to Kafka in the federated-post message format.
 *
 * Errors are logged instead of thrown. The boolean result lets callers detect
 * failure; callers that ignore the result behave as before.
 *
 * @param {object} post - Post to publish; reads `author_id`, `uuid` and `content`.
 * @param {object} producer - kafkajs producer; `connect()` is called on every send.
 * @param {string} topic - Topic to publish to.
 * @returns {Promise<boolean>} true if the message was sent, false if sending failed.
 */
async function sendPostToKafka(post, producer, topic) {
  try {
    await producer.connect();
    const kafkaMessage = buildFederatedPostMessage(post);
    await producer.send({ topic, messages: [{ value: JSON.stringify(kafkaMessage) }] });
    console.log('Post sent to Kafka:', kafkaMessage);
    return true;
  } catch (error) {
    console.error('Error sending post to Kafka:', error);
    return false;
  }
}
module.exports = sendPostToKafka;

// Start consuming as soon as this module is loaded. runConsumer logs its own
// startup failures; this handler catches anything that still escapes it.
runConsumer().catch((err) => {
  console.error('Error in Kafka Consumer:', err);
});