Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 263 additions & 1 deletion docs/api/openapidocs.json

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions lib/api/filters.js
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ module.exports = (db, server, userHandler, settingsHandler) => {
filterData.action.targets = targets;
}

if (Array.isArray(values.action.keywords) && values.action.keywords.length) {
filterData.action.keywords = values.action.keywords;
}

if (values.action.mailbox) {
let mailboxData;
try {
Expand Down Expand Up @@ -1166,6 +1170,14 @@ module.exports = (db, server, userHandler, settingsHandler) => {
hasChanges = true;
}

if (Array.isArray(values.action.keywords) && values.action.keywords.length) {
$set['action.keywords'] = values.action.keywords;
hasChanges = true;
} else if ('keywords' in req.params.action) {
$unset['action.keywords'] = true;
hasChanges = true;
}

if (values.action) {
if (!values.action.mailbox) {
if ('mailbox' in req.params.action) {
Expand Down Expand Up @@ -1361,6 +1373,11 @@ function getFilterStrings(filter, mailboxes) {
];
}
break;
case 'keywords':
if (filter.action[key] && filter.action[key].length) {
return ['apply keywords', filter.action[key].join(', ')];
}
break;
}
return false;
})
Expand Down
231 changes: 214 additions & 17 deletions lib/api/messages.js

Large diffs are not rendered by default.

143 changes: 126 additions & 17 deletions lib/api/updates.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const crypto = require('crypto');
const Joi = require('joi');
const ObjectId = require('mongodb').ObjectId;
const tools = require('../tools');
const consts = require('../consts');
const roles = require('../roles');
const base32 = require('base32.js');
const { sessSchema, sessIPSchema } = require('../schemas');
Expand All @@ -16,6 +17,7 @@ const getMailboxCounterCb = (db, mailbox, type, callback) => {
.catch(err => callback(err));
};


module.exports = (db, server, notifier) => {
server.get(
{
Expand Down Expand Up @@ -226,7 +228,7 @@ function formatJournalData(e) {
let data = {};
Object.keys(e).forEach(key => {
if (!['_id', 'ignore', 'user', 'modseq', 'unseenChange', 'created'].includes(key)) {
if (e.command !== 'COUNTERS' && key === 'unseen') {
if (!['COUNTERS', 'KEYWORD_COUNTERS', 'FLAGGED_COUNTER'].includes(e.command) && key === 'unseen') {
return;
}
data[key] = e[key];
Expand All @@ -249,6 +251,82 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
}

let mailboxes = new Set();
let changedKeywords = new Set();
let flaggedChanged = false;

let emitFlaggedCounter = async next => {
if (!flaggedChanged) {
return next();
}

try {
const [total, unseen] = await Promise.all([tools.getFlaggedCounter(db, user), tools.getFlaggedCounter(db, user, 'unseen')]);
res.write(
formatJournalData({
command: 'FLAGGED_COUNTER',
_id: lastEventId,
total,
unseen
})
);
} catch {
// ignore
}
next();
};

let emitKeywordCounters = async next => {
if (!changedKeywords.size) {
return next();
}

try {
const userKey = user.toString();
const cachedResults = await Promise.all(
[...changedKeywords].map(async keyword => {
const isCached = await db.redis.exists(`kw:total:${userKey}:${keyword}`);
return isCached ? keyword : null;
})
);

const toEmit = cachedResults.filter(Boolean);
if (!toEmit.length) {
return next();
}

const keywordResults = await Promise.all(
toEmit.map(async keyword => {
let total, unseen;
try {
total = await tools.getKeywordCounter(db, user, keyword);
} catch {
total = 0;
}
try {
unseen = await tools.getKeywordCounter(db, user, keyword, 'unseen');
} catch {
unseen = 0;
}
return { keyword, total, unseen };
})
);

for (const { keyword, total, unseen } of keywordResults) {
res.write(
formatJournalData({
command: 'KEYWORD_COUNTERS',
_id: lastEventId,
keyword,
total,
unseen
})
);
}
} catch {
// ignore
}
next();
};

let cursor = db.database.collection('journal').find(query).sort({ _id: 1 });
let processed = 0;
Expand All @@ -259,21 +337,25 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
}
if (!e) {
return cursor.close(() => {
let finalize = () =>
emitFlaggedCounter(() =>
emitKeywordCounters(() =>
done(null, {
lastEventId,
processed
})
)
);

if (!mailboxes.size) {
return done(null, {
lastEventId,
processed
});
return finalize();
}

mailboxes = Array.from(mailboxes);
let mailboxPos = 0;
let emitCounters = () => {
if (mailboxPos >= mailboxes.length) {
return done(null, {
lastEventId,
processed
});
return finalize();
}
let mailbox = new ObjectId(mailboxes[mailboxPos++]);
getMailboxCounterCb(db, mailbox, false, (err, total) => {
Expand Down Expand Up @@ -316,24 +398,51 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
if (e.mailbox) {
mailboxes.add(e.mailbox.toString());
}
if (e.flagged) {
flaggedChanged = true;
}
break;
case 'FETCH':
if (e.mailbox && (e.unseen || e.unseenChange)) {
mailboxes.add(e.mailbox.toString());
}

if (e.flaggedChangedTo === true || e.flaggedChangedTo === false) {
flaggedChanged = true;
}

if (e.unseenChange && (e.flags ?? []).includes('\\Flagged')) {
flaggedChanged = true;
}
break;
}

try {
let data = formatJournalData(e);
res.write(data);
} catch (err) {
console.error(err);
console.error(e);
let writeEntryAndContinue = () => {
try {
let data = formatJournalData(e);
res.write(data);
} catch (err) {
console.error(err);
console.error(e);
}

processed++;
return setImmediate(processNext);
};

for (const keyword of [...(e.keywords ?? []), ...(e.addedKeywords ?? []), ...(e.removedKeywords ?? [])]) {
changedKeywords.add(keyword);
}

if (e.unseenChange) {
for (const flag of e.flags ?? []) {
if (flag && !consts.SYSTEM_FLAGS.has(flag)) {
changedKeywords.add(flag);
}
}
}

processed++;
return setImmediate(processNext);
writeEntryAndContinue();
});
};

Expand Down
6 changes: 5 additions & 1 deletion lib/consts.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module.exports = {
DELETED_USER_MESSAGE_RETENTION: 14 * 24 * 3600 * 1000,

MAILBOX_COUNTER_TTL: 24 * 3600,
KEYWORD_COUNTER_TTL: 24 * 3600,

// how much plaintext to store in a full text indexed field
MAX_PLAINTEXT_INDEXED: 1 * 1024,
Expand Down Expand Up @@ -144,5 +145,8 @@ module.exports = {
MAX_MAILBOX_NAME_LENGTH: 512,

// Number of mailbox subpaths in a single mailbox path
MAX_SUB_MAILBOXES: 128
MAX_SUB_MAILBOXES: 128,

// System IMAP flags and internal pseudo-flags that are not custom keywords
SYSTEM_FLAGS: new Set(['\\Answered', '\\Flagged', '\\Draft', '\\Deleted', '\\Seen', '$Forwarded'])
};
17 changes: 17 additions & 0 deletions lib/filter-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ class FilterHandler {
return;
}

if (key === 'keywords') {
let existingKeywords = filterActions.get('keywords') ?? [];
let newKeywords = filterData.action[key] ?? [];
filterActions.set('keywords', [...new Set([...existingKeywords, ...newKeywords])]);
return;
}

if (key === 'spam' && filterData.action[key] === null) {
return;
}
Expand Down Expand Up @@ -646,6 +653,16 @@ class FilterHandler {
filterResults.push({ flagged: true });
}
break;
case 'keywords':
if (Array.isArray(value) && value.length) {
for (let keyword of value) {
if (!flags.includes(keyword)) {
flags.push(keyword);
}
}
filterResults.push({ keywords: value });
}
break;
case 'mailbox':
if (value) {
// positive value is spam
Expand Down
2 changes: 2 additions & 0 deletions lib/handlers/on-copy.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ async function copyHandler(server, messageHandler, connection, mailbox, update,
uid: messageData.uid,
message: messageData._id,
unseen: messageData.unseen,
flagged: messageData.flagged,
keywords: tools.extractKeywords(messageData.flags),
idate: messageData.idate,
thread: messageData.thread
};
Expand Down
20 changes: 18 additions & 2 deletions lib/handlers/on-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const imapTools = require('../../imap-core/lib/imap-tools');
const db = require('../db');
const tools = require('../tools');
const consts = require('../consts');
const extractKeywords = tools.extractKeywords;

// STORE / UID STORE, updates flags for selected UIDs
module.exports = server => (mailbox, update, session, callback) => {
Expand Down Expand Up @@ -151,6 +152,8 @@ module.exports = server => (mailbox, update, session, callback) => {
let flagsupdate = false; // query object for updates

let updated = false;
const oldKeywords = new Set(extractKeywords(message.flags));
const messageWasFlagged = message.flags.includes('\\Flagged');
let existingFlags = message.flags.map(flag => flag.toLowerCase().trim());
switch (update.action) {
case 'set':
Expand Down Expand Up @@ -345,16 +348,29 @@ module.exports = server => (mailbox, update, session, callback) => {
}
});

notifyEntries.push({
const currentKeywords = extractKeywords(message.flags);
const addedKeywords = currentKeywords.filter(keyword => !oldKeywords.has(keyword));
const removedKeywords = [...oldKeywords].filter(keyword => !currentKeywords.includes(keyword));
const messageIsFlagged = message.flags.includes('\\Flagged');

const notifyEntry = {
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags,
addedKeywords,
removedKeywords,
thread: message.thread,
message: message._id,
modseq,
unseenChange
});
};

if (messageWasFlagged !== messageIsFlagged) {
notifyEntry.flaggedChangedTo = messageIsFlagged;
}

notifyEntries.push(notifyEntry);

if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
return db.database.collection('messages').bulkWrite(
Expand Down
Loading