Skip to content

Commit

Permalink
Added account property 'connections'
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Mar 16, 2024
1 parent 67b5e99 commit 555c5db
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 72 deletions.
6 changes: 6 additions & 0 deletions lib/account.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ class Account {
}
break;

// number values
case 'connections':
case 'listRegistry':
result[key] = Number(accountData[key]) || 0;
break;

default:
result[key] = accountData[key];
break;
Expand Down
131 changes: 80 additions & 51 deletions lib/imap-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class IMAPConnection extends BaseClient {
this.syncing = false;

this.connectionCount = 0;
this.connections = new Set();

this.state = 'connecting';
}
Expand Down Expand Up @@ -220,6 +221,8 @@ class IMAPConnection extends BaseClient {
})
})
);
this.connections.add(commandClient);
await this.redis.hSetExists(this.getAccountKey(), 'connections', this.connections.size.toString());

commandClient.log.debug({ msg: 'Created command client', reason });

Expand All @@ -236,13 +239,18 @@ class IMAPConnection extends BaseClient {
}

commandClient.on('error', err => {
commandClient.log.error({ msg: 'IMAP connection error', cid: commandCid, channel: 'command', account: this.account, err });
commandClient?.log.error({ msg: 'IMAP connection error', cid: commandCid, channel: 'command', account: this.account, err });
this.commandClient = null;
});

commandClient.on('close', async () => {
this.connections.delete(commandClient);
await this.redis.hSetExists(this.getAccountKey(), 'connections', this.connections.size.toString());
commandClient.log.info({ msg: 'Connection closed', cid: commandCid, channel: 'command', account: this.account });

this.commandClient = null;
commandClient.removeAllListeners();
commandClient = null;
});

return commandClient;
Expand Down Expand Up @@ -628,63 +636,27 @@ class IMAPConnection extends BaseClient {
}
});

imapClient.on('close', async () => {
imapClient.log.info({ msg: 'Connection closed', type: 'imapClient', account: this.account, disabled: imapClient.disabled });

try {
for (let [, mailbox] of this.mailboxes) {
if (mailbox.syncing) {
try {
// set failure flag
await this.redis.hSetNew(
this.getAccountKey(),
'syncError',
JSON.stringify({
path: mailbox.path,
time: new Date().toISOString(),
error: {
error: 'Connection closed unexpectedly'
}
})
);
} catch (err) {
// ignore
}
}

if (mailbox.selected) {
// should be at most one though
await mailbox.onClose();
}
}

if (!imapClient.disabled && imapClient === this.imapClient) {
imapClient.log.debug({ msg: 'Requesting reconnection due to unexpected close', type: 'imapClient', account: this.account });
await this.reconnect();
}
} catch (err) {
imapClient.log.error({ msg: 'Connection close error', err });
}
});

return response;
}

async reconnect(force) {
this.logger.debug({ msg: 'Establishing connection', force });
if (this.paused || this._connecting || this.isClosing || (this.isClosed && !force)) {
if (this._connecting) {
// backoff reconnect already in progress
return false;
}
if (this.paused || this.isClosing || (this.isClosed && !force)) {
this.logger.debug({
msg: 'Skipped establishing connection',
paused: this.paused,
hasClient: !!this.imapClient,
usable: this.imapClient?.usable,
connecting: this._connecting,
closing: this.isClosing,
closed: this.isClosed,
force
});
return false;
}
this.logger.debug({ msg: 'Establishing connection', force });

if (force) {
this.closeSubconnections();
Expand All @@ -705,7 +677,7 @@ class IMAPConnection extends BaseClient {
startingDelay: 2000
});
this.logger.debug({
msg: 'Connection initiated',
msg: 'Connection created',
hasClient: !!this.imapClient,
usable: this.imapClient && this.imapClient.usable,
connected: this.isConnected()
Expand Down Expand Up @@ -1198,21 +1170,25 @@ class IMAPConnection extends BaseClient {

if (this.imapClient) {
this.logger.debug({ msg: 'Clearing previous connection' });
this.imapClient.disabled = true;
let prevImapClient = this.imapClient;
prevImapClient.disabled = true;
try {
this.imapClient.removeAllListeners();
this.imapClient.on('error', err => {
prevImapClient.removeAllListeners();
prevImapClient.once('error', err => {
this.logger.error({ msg: 'IMAP connection error', type: 'imapClient', previous: true, account: this.account, err });
});
this.imapClient.close();
prevImapClient.close();
if (this.connectionClient) {
this.logger.debug({ msg: 'Clearing previous command connection' });
this.connectionClient.close();
}
} catch (err) {
this.logger.error({ msg: 'IMAP close error', err });
} finally {
this.imapClient = null;
if (prevImapClient === this.imapClient) {
this.imapClient = null;
}
prevImapClient = null;
}
}

Expand All @@ -1236,6 +1212,11 @@ class IMAPConnection extends BaseClient {
expungeHandler: async payload => await this.expungeHandler(payload)
})
);
this.connections.add(imapClient);
await this.redis.hSetExists(this.getAccountKey(), 'connections', this.connections.size.toString());

imapClient.log.debug({ msg: 'Created primary client' });

this.imapClient = imapClient;

imapClient.primaryConnection = true;
Expand All @@ -1255,8 +1236,8 @@ class IMAPConnection extends BaseClient {
});

imapClient.on('error', err => {
this.logger.error({ msg: 'IMAP connection error', type: 'imapClient', account: this.account, err });
if (imapClient !== this.imapClient) {
imapClient?.log.error({ msg: 'IMAP connection error', type: 'imapClient', account: this.account, err });
if (imapClient !== this.imapClient || this._connecting) {
return;
}
this.reconnect().catch(err => {
Expand All @@ -1274,6 +1255,53 @@ class IMAPConnection extends BaseClient {
metricsMeta({}, this.logger, 'imapBytesReceived', 'inc', imapStats.received);
});

imapClient.on('close', async () => {
this.connections.delete(imapClient);
await this.redis.hSetExists(this.getAccountKey(), 'connections', this.connections.size.toString());
imapClient?.log.info({ msg: 'Connection closed', type: 'imapClient', account: this.account, disabled: imapClient.disabled });

try {
for (let [, mailbox] of this.mailboxes) {
if (mailbox.syncing) {
try {
// set failure flag
await this.redis.hSetNew(
this.getAccountKey(),
'syncError',
JSON.stringify({
path: mailbox.path,
time: new Date().toISOString(),
error: {
error: 'Connection closed unexpectedly'
}
})
);
} catch (err) {
// ignore
}
}

if (mailbox.selected) {
// should be at most one though
await mailbox.onClose();
}
}
} catch (err) {
imapClient.log.error({ msg: 'Connection close error', err });
}

try {
if (!imapClient.disabled && imapClient === this.imapClient && !this._connecting) {
imapClient.log.debug({ msg: 'Requesting reconnection due to unexpected close', type: 'imapClient', account: this.account });
await this.reconnect();
}
} catch (err) {
imapClient.log.error({ msg: 'Reconnection error', err });
}

imapClient = null;
});

try {
await this.connect();

Expand Down Expand Up @@ -3625,6 +3653,7 @@ class IMAPConnection extends BaseClient {

// create new
const subconnection = new Subconnection({
parent: this,
account: this.account,
mailbox,
getImapConfig: async () => await this.getImapConfig(),
Expand Down
59 changes: 38 additions & 21 deletions lib/subconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class Subconnection extends EventEmitter {
constructor(opts) {
super();
opts = opts || {};
this.parent = opts.parent;

this.mailbox = opts.mailbox || {};
this.path = this.mailbox.path;
this.logger = opts.logger;
Expand Down Expand Up @@ -91,38 +93,65 @@ class Subconnection extends EventEmitter {
async start() {
if (this.imapClient) {
// close previous
this.imapClient.disabled = true;
let prevImapClient = this.imapClient;
prevImapClient.disabled = true;
try {
this.imapClient.removeAllListeners();
this.imapClient.on('error', err => {
prevImapClient.removeAllListeners();
prevImapClient.once('error', err => {
this.logger.error({ msg: 'IMAP connection error', previous: true, account: this.account, err });
});
this.imapClient.close();
prevImapClient.close();
} catch (err) {
this.logger.error({ msg: 'IMAP close error', err });
} finally {
this.imapClient = null;
if (prevImapClient === this.imapClient) {
this.imapClient = null;
}
prevImapClient = null;
}
}

let imapConfig = await this.getImapConfig(null, this);

this.imapClient = new ImapFlow(
let imapClient = new ImapFlow(
Object.assign({}, imapConfig, {
logger: this.logger,
expungeHandler: async payload => await this.expungeHandler(payload)
})
);
this.parent.connections.add(imapClient);
await this.parent.redis.hSetExists(this.parent.getAccountKey(), 'connections', this.parent.connections.size.toString());

this.imapClient = imapClient;

this.imapClient.subConnection = true;
imapClient.subConnection = true;

this.imapClient.on('error', err => {
this.logger.error({ msg: 'IMAP connection error', account: this.account, err });
imapClient.on('error', err => {
imapClient?.log.error({ msg: 'IMAP connection error', account: this.account, err });
if (imapClient !== this.imapClient || this._connecting) {
return;
}
this.reconnect().catch(err => {
this.logger.error({ msg: 'IMAP reconnection error', account: this.account, err });
});
});

imapClient.on('close', async () => {
this.parent.connections.delete(imapClient);
await this.parent.redis.hSetExists(this.parent.getAccountKey(), 'connections', this.parent.connections.size.toString());
imapClient.log.info({ msg: 'Connection closed', account: this.account });

try {
if (!imapClient.disabled && imapClient === this.imapClient && !this._connecting) {
await this.reconnect();
}
} catch (err) {
this.logger.error({ msg: 'Reconnection error', err });
}

imapClient = null;
});

try {
await this.connect();
} catch (err) {
Expand Down Expand Up @@ -170,18 +199,6 @@ class Subconnection extends EventEmitter {
this.requestSync();
});

imapClient.on('close', async () => {
this.logger.info({ msg: 'Connection closed', account: this.account });

try {
if (!imapClient.disabled) {
await this.reconnect();
}
} catch (err) {
this.logger.error({ msg: 'Connection close error', err });
}
});

return response;
}

Expand Down
1 change: 1 addition & 0 deletions workers/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -2850,6 +2850,7 @@ When making API calls remember that requests against the same account are queued
'state',
'smtpStatus',
'syncError',
'connections',
'locale',
'tz'
]) {
Expand Down

0 comments on commit 555c5db

Please sign in to comment.