feat: De-dup activities from different polling sources

Previously CM would process the same activity multiple times if it was ingested from two different polling sources (modqueue and unmoderated/newSub). Introduce queue control flow to ensure activity is de-duped or refreshed before processing if this scenario occurs.

* Use a queue (firehose) to bottleneck all activities from different sources before pushing to worker queues
* Keep track of items currently ingested but not completely processed and use firehose to de-dupe queued items (flag to refresh) or re-queue if currently processing (and flag to refresh)
This commit is contained in:
FoxxMD
2021-09-17 11:50:49 -04:00
parent 1ff59ad6e8
commit 01755eada5
2 changed files with 93 additions and 4 deletions

View File

@@ -5,7 +5,7 @@ import {CommentCheck} from "../Check/CommentCheck";
import {
cacheStats,
createRetryHandler,
determineNewResults, formatNumber,
determineNewResults, findLastIndex, formatNumber,
mergeArr, parseFromJsonOrYamlToObject, pollingInfo, resultsSummary, sleep, totalFromMapStats, triggeredIndicator,
} from "../util";
import {Poll} from "snoostorm";
@@ -49,6 +49,7 @@ export interface runCheckOptions {
checkNames?: string[],
delayUntil?: number,
dryRun?: boolean,
refresh?: boolean,
}
export interface CheckTask {
@@ -97,6 +98,12 @@ export interface ManagerStats {
},
}
interface QueuedIdentifier {
id: string,
shouldRefresh: boolean
state: 'queued' | 'processing'
}
export class Manager {
subreddit: Subreddit;
client: Snoowrap;
@@ -120,6 +127,15 @@ export class Manager {
globalDryRun?: boolean;
emitter: EventEmitter = new EventEmitter();
queue: QueueObject<CheckTask>;
// firehose is used to ensure all activities from different polling streams are unique
// that is -- if the same activities is in both modqueue and unmoderated we don't want to process the activity twice or use stale data
//
// so all activities get queued to firehose, it keeps track of items by id (using queuedItemsMeta)
// and ensures that if any activities are ingested while they are ALSO currently queued or working then they are properly handled by either
// 1) if queued, do not re-queue but instead tell worker to refresh before processing
// 2) if currently processing then re-queue but also refresh before processing
firehose: QueueObject<CheckTask>;
queuedItemsMeta: QueuedIdentifier[] = [];
globalMaxWorkers: number;
subMaxWorkers?: number;
@@ -250,6 +266,7 @@ export class Manager {
this.queue = this.generateQueue(this.getMaxWorkers(this.globalMaxWorkers));
this.queue.pause();
this.firehose = this.generateFirehose();
this.eventsSampleInterval = setInterval((function(self) {
return function() {
@@ -310,6 +327,32 @@ export class Manager {
return maxWorkers;
}
protected generateFirehose() {
return queue(async (task: CheckTask, cb) => {
// items in queuedItemsMeta will be processing FIFO so earlier elements (by index) are older
//
// if we insert the same item again because it is currently being processed AND THEN we get the item AGAIN we only want to update the newest meta
// so search the array backwards to get the neweset only
const queuedItemIndex = findLastIndex(this.queuedItemsMeta, x => x.id === task.activity.id);
if(queuedItemIndex !== -1) {
const itemMeta = this.queuedItemsMeta[queuedItemIndex];
let msg = `Item ${itemMeta.id} is already ${itemMeta.state}.`;
if(itemMeta.state === 'queued') {
this.logger.debug(`${msg} Flagging to refresh data before processing.`);
this.queuedItemsMeta.splice(queuedItemIndex, 1, {...itemMeta, shouldRefresh: true});
} else {
this.logger.debug(`${msg} Re-queuing item but will also refresh data before processing.`);
this.queuedItemsMeta.push({id: task.activity.id, shouldRefresh: true, state: 'queued'});
this.queue.push(task);
}
} else {
this.queuedItemsMeta.push({id: task.activity.id, shouldRefresh: false, state: 'queued'});
this.queue.push(task);
}
}
, 1);
}
protected generateQueue(maxWorkers: number) {
if (maxWorkers > 1) {
this.logger.warn(`Setting max queue workers above 1 (specified: ${maxWorkers}) may have detrimental effects to log readability and api usage. Consult the documentation before using this advanced/experimental feature.`);
@@ -320,7 +363,16 @@ export class Manager {
this.logger.debug(`SOFT API LIMIT MODE: Delaying Event run by ${this.delayBy} seconds`);
await sleep(this.delayBy * 1000);
}
await this.runChecks(task.checkType, task.activity, task.options);
const queuedItemIndex = this.queuedItemsMeta.findIndex(x => x.id === task.activity.id);
try {
const itemMeta = this.queuedItemsMeta[queuedItemIndex];
this.queuedItemsMeta.splice(queuedItemIndex, 1, {...itemMeta, state: 'processing'});
await this.runChecks(task.checkType, task.activity, {...task.options, refresh: itemMeta.shouldRefresh});
} finally {
// always remove item meta regardless of success or failure since we are done with it meow
this.queuedItemsMeta.splice(queuedItemIndex, 1);
}
}
, maxWorkers);
q.error((err, task) => {
@@ -516,8 +568,11 @@ export class Manager {
checkNames = [],
delayUntil,
dryRun,
refresh = false,
} = options || {};
let wasRefreshed = false;
if (delayUntil !== undefined) {
const created = dayjs.unix(item.created_utc);
const diff = dayjs().diff(created, 's');
@@ -526,8 +581,16 @@ export class Manager {
await sleep(delayUntil - diff);
// @ts-ignore
item = await activity.refresh();
wasRefreshed = true;
}
}
// refresh signal from firehose if activity was ingested multiple times before processing or re-queued while processing
// want to make sure we have the most recent data
if(!wasRefreshed && refresh === true) {
this.logger.verbose('Refreshed data (probably due to signal from firehose)');
// @ts-ignore
item = await activity.refresh();
}
const startingApiLimit = this.client.ratelimitRemaining;
@@ -746,7 +809,7 @@ export class Manager {
checkType = 'Comment';
}
if (checkType !== undefined) {
this.queue.push({checkType, activity: item, options: {delayUntil}})
this.firehose.push({checkType, activity: item, options: {delayUntil}})
}
};
@@ -779,14 +842,19 @@ export class Manager {
} else if (!this.validConfigLoaded) {
this.logger.warn('Cannot start activity processing queue while manager has an invalid configuration');
} else {
if(this.queueState.state === STOPPED) {
// extra precaution to make sure queue meta is cleared before starting queue
this.queuedItemsMeta = [];
}
this.queue.resume();
this.firehose.resume();
this.logger.info(`Activity processing queue started RUNNING with ${this.queue.length()} queued activities`);
this.queueState = {
state: RUNNING,
causedBy
}
if(!suppressNotification) {
this.notificationManager.handle('runStateChanged', 'Queue Started', reason, causedBy)
this.notificationManager.handle('runStateChanged', 'Queue Started', reason, causedBy);
}
}
}
@@ -848,7 +916,9 @@ export class Manager {
this.logger.verbose(`Activity processing queue is stopping...waiting for ${this.queue.running()} activities to finish processing`);
}
this.logger.info(`Activity processing queue stopped by ${causedBy} and ${this.queue.length()} queued activities cleared (waited ${dayjs().diff(pauseWaitStart, 's')} seconds while activity processing finished)`);
this.firehose.kill();
this.queue.kill();
this.queuedItemsMeta = [];
}
this.queueState = {

View File

@@ -1103,3 +1103,22 @@ export const objectToStringSummary = (obj: object): string => {
}
return parts.join(' | ');
}
/**
* Returns the index of the last element in the array where predicate is true, and -1
* otherwise.
* @param array The source array to search in
* @param predicate find calls predicate once for each element of the array, in descending
* order, until it finds one where predicate returns true. If such an element is found,
* findLastIndex immediately returns that element index. Otherwise, findLastIndex returns -1.
*
* @see https://stackoverflow.com/a/53187807/1469797
*/
export function findLastIndex<T>(array: Array<T>, predicate: (value: T, index: number, obj: T[]) => boolean): number {
let l = array.length;
while (l--) {
if (predicate(array[l], l, array))
return l;
}
return -1;
}