Skip to content

Commit

Permalink
Move additional methods to BaseClient for sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed May 14, 2024
1 parent 44cda34 commit a494d47
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 200 deletions.
200 changes: 199 additions & 1 deletion lib/api-client/base-client.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
'use strict';

const { parentPort } = require('worker_threads');
const crypto = require('crypto');
const logger = require('../logger');
const { REDIS_PREFIX } = require('../consts');
const { webhooks: Webhooks } = require('../webhooks');
const { getESClient } = require('../document-store');
const { getThread } = require('../threads');
const settings = require('../settings');

const {
ACCOUNT_INITIALIZED_NOTIFY,
REDIS_PREFIX,
MESSAGE_NEW_NOTIFY,
MESSAGE_DELETED_NOTIFY,
MESSAGE_UPDATED_NOTIFY,
EMAIL_BOUNCE_NOTIFY,

MAILBOX_DELETED_NOTIFY
} = require('../consts');

let pfStructuredClone = typeof structuredClone === 'function' ? structuredClone : data => JSON.parse(JSON.stringify(data));

async function metricsMeta(meta, logger, key, method, ...args) {
try {
parentPort.postMessage({
cmd: 'metrics',
key,
method,
args,
meta: meta || {}
});
} catch (err) {
logger.error({ msg: 'Failed to post metrics to parent', err });
}
}

class BaseClient {
constructor(account, options) {
Expand Down Expand Up @@ -86,6 +117,173 @@ class BaseClient {
currentState() {
return 'connected';
}

async setStateVal() {
let [[e1], [e2], [e3, prevVal], [e4, incrVal], [e5, stateVal]] = await this.redis
.multi()
.hSetExists(this.getAccountKey(), 'state', this.state)
.hSetBigger(this.getAccountKey(), 'runIndex', this.runIndex.toString())
.hget(this.getAccountKey(), `state:count:${this.state}`)
.hIncrbyExists(this.getAccountKey(), `state:count:${this.state}`, 1)
.hget(this.getAccountKey(), 'state')
.exec();

if (e1 || e2 || e3 || e4 || e5) {
throw e1 || e2 || e3 || e4 || e5;
}

if (stateVal === 'connected' && incrVal === 1 && prevVal === '0') {
// first connected event!
await this.notify(false, ACCOUNT_INITIALIZED_NOTIFY, {
initialized: true
});
}
}

async notify(mailbox, event, data, extraOpts) {
extraOpts = extraOpts || {};
const { skipWebhook, canSync = true } = extraOpts;

metricsMeta({ account: this.account }, this.logger, 'events', 'inc', {
event
});

switch (event) {
case 'connectError':
case 'authenticationError': {
let shouldNotify = await this.setErrorState(event, data);

if (!shouldNotify) {
// do not send a webhook as nothing really changed
return;
}
break;
}
}

let serviceUrl = (await settings.get('serviceUrl')) || true;

let payload = {
serviceUrl,
account: this.account,
date: new Date().toISOString()
};

let path = (mailbox && mailbox.path) || (data && data.path);
if (path) {
payload.path = path;
}

if (mailbox && mailbox.listingEntry && mailbox.listingEntry.specialUse) {
payload.specialUse = mailbox.listingEntry.specialUse;
}

if (event) {
payload.event = event;
}

if (data) {
payload.data = data;
}

let queueKeep = (await settings.get('queueKeep')) || true;

let addDocumentQueueJob =
canSync &&
this.documentsQueue &&
[MESSAGE_NEW_NOTIFY, MESSAGE_DELETED_NOTIFY, MESSAGE_UPDATED_NOTIFY, EMAIL_BOUNCE_NOTIFY, MAILBOX_DELETED_NOTIFY].includes(event) &&
(await settings.get('documentStoreEnabled'));

if (addDocumentQueueJob && payload.data && event === MESSAGE_NEW_NOTIFY && !payload.data.threadId) {
// Generate a thread ID for the email. This is also stored in ElasticSearch.
const { index, client } = await getESClient(logger);
try {
if (client) {
let thread = await getThread(client, index, this.account, payload.data, logger);
if (thread) {
payload.data.threadId = thread;
logger.info({
msg: 'Provisioned thread ID for a message',
account: this.account,
message: payload.data.id,
threadId: payload.data.threadId
});
}
}
} catch (err) {
if (logger.notifyError) {
logger.notifyError(err, event => {
event.setUser(this.account);
event.addMetadata('ee', {
index
});
});
}
logger.error({ msg: 'Failed to resolve thread', account: this.account, message: payload.data.id, err });
}
}

const defaultJobOptions = {
removeOnComplete: queueKeep,
removeOnFail: queueKeep,
attempts: 10,
backoff: {
type: 'exponential',
delay: 5000
}
};

// use more attempts for ElasticSearch updates
const documentJobOptions = Object.assign(pfStructuredClone(defaultJobOptions), { attempts: 16 });

if (!skipWebhook && addDocumentQueueJob) {
// add both jobs as a Flow

let notifyPayload = await Webhooks.formatPayload(event, payload);

const queueFlow = [
{
name: event,
data: payload,
queueName: 'documents'
}
];

await Webhooks.pushToQueue(event, notifyPayload, {
routesOnly: true,
queueFlow
});

await this.flowProducer.add(
{
name: event,
data: notifyPayload,
queueName: 'notify',
children: queueFlow
},
{
queuesOptions: {
notify: {
defaultJobOptions
},
documents: {
defaultJobOptions: documentJobOptions
}
}
}
);
} else {
// add to queues as normal jobs

if (!skipWebhook) {
await Webhooks.pushToQueue(event, await Webhooks.formatPayload(event, payload));
}

if (addDocumentQueueJob) {
await this.documentsQueue.add(event, payload, documentJobOptions);
}
}
}
}

module.exports = { BaseClient };
34 changes: 11 additions & 23 deletions lib/api-client/gmail-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class PageCursor {
class GmailClient extends BaseClient {
constructor(account, options) {
super(account, options);

this.cachedAccessToken = null;
this.cachedAccessTokenRaw = null;
}

async request(...args) {
Expand All @@ -202,11 +205,9 @@ class GmailClient extends BaseClient {
async listMailboxes(options) {
console.log('LIST MAILBOXES', options);
await this.prepare();
console.log(1);
const accessToken = await this.getToken();
console.log(2, accessToken);

let labelsResult = await this.request(`${GMAIL_API_BASE}/gmail/v1/users/me/labels`);
console.log(3, labelsResult);

let labels = labelsResult.labels.filter(label => !SKIP_LABELS.includes(label.id));

let resultLabels;
Expand Down Expand Up @@ -294,8 +295,6 @@ class GmailClient extends BaseClient {
console.log('LIST MESSAGES', query);
await this.prepare();

const accessToken = await this.getToken();

let pageSize = Math.abs(Number(query.pageSize) || 20);
let requestQuery = {
maxResults: pageSize
Expand Down Expand Up @@ -327,6 +326,7 @@ class GmailClient extends BaseClient {
let messageEntry = await this.request(`${GMAIL_API_BASE}/gmail/v1/users/me/messages/${query.search.emailId}`, 'get', {
format: 'full'
});

if (messageEntry) {
messageList.push(this.formatMessage(messageEntry, { path }));
}
Expand All @@ -346,14 +346,9 @@ class GmailClient extends BaseClient {

if (query.search.threadId) {
// Threading is a special case
let threadListingResult = await this.oAuth2Client.request(
accessToken,
`${GMAIL_API_BASE}/gmail/v1/users/me/threads/${query.search.threadId}`,
'get',
{
format: 'full'
}
);
let threadListingResult = await this.request(`${GMAIL_API_BASE}/gmail/v1/users/me/threads/${query.search.threadId}`, 'get', {
format: 'full'
});

let messageCount = threadListingResult?.messages?.length || 0;
let currentPage = pageCursor.currentPage();
Expand Down Expand Up @@ -444,7 +439,7 @@ class GmailClient extends BaseClient {
};

for (let { id: message } of listingResult.messages) {
promises.push(this.oAuth2Client.request(accessToken, `${GMAIL_API_BASE}/gmail/v1/users/me/messages/${message}`));
promises.push(this.request(`${GMAIL_API_BASE}/gmail/v1/users/me/messages/${message}`));
if (promises.length > LIST_BATCH_SIZE) {
await resolvePromises();
}
Expand Down Expand Up @@ -916,15 +911,8 @@ class GmailClient extends BaseClient {

await this.prepare();

const accessToken = await this.getToken();

const requestQuery = {};
const result = await this.oAuth2Client.request(
accessToken,
`${GMAIL_API_BASE}/gmail/v1/users/me/messages/${messageId}/attachments/${id}`,
'get',
requestQuery
);
const result = await this.request(`${GMAIL_API_BASE}/gmail/v1/users/me/messages/${messageId}/attachments/${id}`, 'get', requestQuery);

return {
content: result?.data ? Buffer.from(result?.data, 'base64url') : null,
Expand Down
Loading

0 comments on commit a494d47

Please sign in to comment.