Implement api nanny mode to help with heavy api usage

This commit is contained in:
FoxxMD
2021-07-20 20:15:15 -04:00
parent 4642f67104
commit 9316019b01
6 changed files with 268 additions and 27 deletions

View File

@@ -3,9 +3,9 @@ import {Manager} from "./Subreddit/Manager";
import winston, {Logger} from "winston";
import {
argParseInt,
createRetryHandler,
createRetryHandler, formatNumber,
labelledFormat, logLevels,
parseBool,
parseBool, parseDuration,
parseFromJsonOrYamlToObject,
parseSubredditName,
sleep
@@ -15,11 +15,12 @@ import EventEmitter from "events";
import CacheManager from './Subreddit/SubredditResources';
import dayjs, {Dayjs} from "dayjs";
import LoggedError from "./Utils/LoggedError";
import ProxiedSnoowrap from "./Utils/ProxiedSnoowrap";
import {ProxiedSnoowrap, RequestTrackingSnoowrap} from "./Utils/SnoowrapClients";
import {ModQueueStream, UnmoderatedStream} from "./Subreddit/Streams";
import {getDefaultLogger} from "./Utils/loggerFactory";
import {RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces";
import {DurationString, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces";
import {sharedModqueue} from "./Utils/CommandConfig";
import { Duration } from "dayjs/plugin/duration";
const {transports} = winston;
@@ -44,10 +45,20 @@ export class App {
nextHeartbeat?: Dayjs;
heartBeating: boolean = false;
apiLimitWarning: number;
softLimit: number | string = 250;
hardLimit: number | string = 50;
nannyMode?: 'soft' | 'hard';
nextExpiration!: Dayjs;
botName?: string;
startedAt: Dayjs = dayjs();
sharedModqueue: boolean = false;
apiSample: number[] = [];
interval: any;
apiRollingAvg: number = 0;
apiEstDepletion?: Duration;
depletedInSecs: number = 0;
constructor(options: any = {}) {
const {
subreddits = [],
@@ -223,7 +234,7 @@ export class App {
while (true) {
this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second');
await sleep(this.heartbeatInterval * 1000);
const heartbeat = `HEARTBEAT -- Reddit API Rate Limit remaining: ${this.client.ratelimitRemaining}`
const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`
if (this.apiLimitWarning >= this.client.ratelimitRemaining) {
this.logger.warn(heartbeat);
} else {
@@ -282,6 +293,28 @@ export class App {
}
async runManagers() {
// this.apiSampleInterval = setInterval((function(self) {
// return function() {
// const rollingSample = self.apiSample.slice(0, 7)
// rollingSample.unshift(self.client.ratelimitRemaining);
// self.apiSample = rollingSample;
// const diff = self.apiSample.reduceRight((acc: number[], curr, index) => {
// if(self.apiSample[index + 1] !== undefined) {
// const d = Math.abs(curr - self.apiSample[index + 1]);
// if(d === 0) {
// return [...acc, 0];
// }
// return [...acc, d/10];
// }
// return acc;
// }, []);
// self.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second
// const depletedIn = self.client.ratelimitRemaining / self.apiRollingAvg; // number of seconds until current remaining limit is 0
// self.apiEstDepletion = dayjs.duration({seconds: depletedIn});
// self.logger.info(`API Usage Rolling Avg: ${self.apiRollingAvg}/s | Est Depletion: ${self.apiEstDepletion.humanize()} (${depletedIn} seconds)`);
// }
// })(this), 10000);
if(this.subManagers.every(x => !x.validConfigLoaded)) {
this.logger.warn('All managers have invalid configs!');
}
@@ -296,8 +329,126 @@ export class App {
if (this.heartbeatInterval !== 0 && !this.heartBeating) {
this.heartbeat();
}
this.runApiNanny();
const emitter = new EventEmitter();
await pEvent(emitter, 'end');
}
async runApiNanny() {
while(true) {
await sleep(10000);
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
const nowish = dayjs().add(10, 'second');
if(nowish.isAfter(this.nextExpiration)) {
// it's possible no api calls are being made because of a hard limit
// need to make an api call to update this
// @ts-ignore
await this.client.getMe();
this.nextExpiration = dayjs(this.client.ratelimitExpiration);
}
const rollingSample = this.apiSample.slice(0, 7)
rollingSample.unshift(this.client.ratelimitRemaining);
this.apiSample = rollingSample;
const diff = this.apiSample.reduceRight((acc: number[], curr, index) => {
if(this.apiSample[index + 1] !== undefined) {
const d = Math.abs(curr - this.apiSample[index + 1]);
if(d === 0) {
return [...acc, 0];
}
return [...acc, d/10];
}
return acc;
}, []);
this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second
this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0
this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs});
this.logger.info(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`);
let hardLimitHit = false;
if(typeof this.hardLimit === 'string') {
const hardDur = parseDuration(this.hardLimit);
hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
hardLimitHit = this.hardLimit > this.client.ratelimitRemaining;
}
if(hardLimitHit) {
if(this.nannyMode === 'hard') {
continue;
}
this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.pauseEvents('system');
}
this.nannyMode = 'hard';
continue;
}
let softLimitHit = false;
if(typeof this.softLimit === 'string') {
const softDur = parseDuration(this.softLimit);
softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds();
} else {
softLimitHit = this.softLimit > this.client.ratelimitRemaining;
}
if(softLimitHit) {
if(this.nannyMode === 'soft') {
continue;
}
this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'});
this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'});
this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'});
let threshold = 0.5;
let offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
if(offenders.length === 0) {
threshold = 0.25;
// reduce threshold
offenders = this.subManagers.filter(x => {
const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg;
return combinedPerSec > threshold;
});
}
if(offenders.length > 0) {
this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'});
for(const m of offenders) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
}
} else {
this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.delayBy = 1.5;
m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'});
}
}
this.nannyMode = 'soft';
continue;
}
if(this.nannyMode !== undefined) {
this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'});
for(const m of this.subManagers) {
m.delayBy = undefined;
if(m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) {
m.startQueue();
}
if(m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) {
await m.startEvents();
}
}
this.nannyMode = undefined;
}
}
}
}

View File

@@ -17,7 +17,7 @@ import {
boolToString,
COMMENT_URL_ID,
filterLogBySubreddit,
formatLogLineToHtml,
formatLogLineToHtml, formatNumber,
isLogLineMinLevel,
LogEntry,
parseLinkIdentifier,
@@ -293,7 +293,8 @@ const rcbServer = async function (options: any = {}) {
wikiLastCheck: m.lastWikiCheck.local().format('MMMM D, YYYY h:mm A Z'),
stats: m.getStats(),
startedAt: 'Not Started',
startedAtHuman: 'Not Started'
startedAtHuman: 'Not Started',
delayBy: m.delayBy === undefined ? 'No' : `Delayed by ${m.delayBy} sec`,
};
// TODO replace indicator data with js on client page
let indicator;
@@ -349,6 +350,8 @@ const rcbServer = async function (options: any = {}) {
dryRun: boolToString(bot.dryRun === true),
logs: logs.get('all'),
checks: checks,
softLimit: bot.softLimit,
hardLimit: bot.hardLimit,
stats: rest,
};
if(allManagerData.logs === undefined) {
@@ -585,8 +588,11 @@ const opStats = (bot: App) => {
nextHeartbeat,
nextHeartbeatHuman,
apiLimit: bot.client.ratelimitRemaining,
apiAvg: formatNumber(bot.apiRollingAvg),
nannyMode: bot.nannyMode || 'Off',
apiDepletion: bot.apiEstDepletion === undefined ? 'Not Calculated' : bot.apiEstDepletion.humanize(),
limitReset,
limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}`
limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}`,
}
}

View File

@@ -134,6 +134,8 @@
</span>
<span><%= `${data.runningActivities} Processing / ${data.queuedActivities} Queued` %></span>
</span>
<label>Slow Mode</label>
<span><%= data.delayBy %></span>
<% } %>
<% if (data.name === 'All') { %>
<label>Uptime</label>
@@ -159,15 +161,6 @@
</span>
<span id="nextHeartbeatHuman"><%= data.nextHeartbeatHuman %></span>
</span>
<label>Api Limit</label>
<span><span id="apiLimit"><%= data.apiLimit %></span>/600</span>
<label>Api Limit Reset</label>
<span class="has-tooltip">
<span class='tooltip rounded shadow-lg p-1 bg-gray-100 text-black -mt-2'>
<span id="limitReset"><%= data.limitReset %></span>
</span>
<span id="limitResetHuman"><%= data.limitResetHuman %></span>
</span>
<% } %>
</div>
<% if (data.name !== 'All') { %>
@@ -179,6 +172,34 @@
<% } %>
</div>
</div>
<% if (data.name === 'All') { %>
<div class="bg-white shadow-md rounded my-3 dark:bg-gray-500 dark:text-white">
<div class="space-x-4 px-4 p-2 leading-2 font-semibold bg-gray-300 dark:bg-gray-700 dark:text-white">
<h4>API</h4>
</div>
<div class="p-4">
<div class="stats">
<label>Soft Limit</label>
<span>< <span id="softLimit"><%= data.softLimit %></span></span>
<label>Hard Limit</label>
<span>< <span id="softLimit"><%= data.hardLimit %></span></span>
<label>Api Nanny</label>
<span><b><span id="nannyMode"><%= data.nannyMode %></span></b></span>
<label>Api Usage</label>
<span><span id="apiLimit"><%= data.apiLimit %></span>/600 (~<span id="apiAvg"><%= data.apiAvg %></span>req/s)</span>
<label>Depleted</label>
<span>in ~<span id="apiDepletion"><%= data.apiDepletion %></span></span>
<label>Limit Reset</label>
<span class="has-tooltip">
<span class='tooltip rounded shadow-lg p-1 bg-gray-100 text-black -mt-2'>
<span id="limitReset"><%= data.limitReset %></span>
</span>
<span id="limitResetHuman"><%= data.limitResetHuman %></span>
</span>
</div>
</div>
</div>
<% } %>
<% if (data.name !== 'All') { %>
<div class="bg-white shadow-md rounded my-3 dark:bg-gray-500 dark:text-white">
<div class="space-x-4 px-4 p-2 leading-2 font-semibold bg-gray-300 dark:bg-gray-700 dark:text-white">

View File

@@ -4,7 +4,7 @@ import {SubmissionCheck} from "../Check/SubmissionCheck";
import {CommentCheck} from "../Check/CommentCheck";
import {
createRetryHandler,
determineNewResults,
determineNewResults, formatNumber,
mergeArr, parseFromJsonOrYamlToObject, pollingInfo, sleep, totalFromMapStats,
} from "../util";
import {Poll} from "snoostorm";
@@ -97,8 +97,14 @@ export class Manager {
causedBy: SYSTEM
}
// use by api nanny to slow event consumption
delayBy?: number;
eventsCheckedTotal: number = 0;
eventsCheckedSinceStartTotal: number = 0;
eventsSample: number[] = [];
eventsSampleInterval: any;
eventsRollingAvg: number = 0;
checksRunTotal: number = 0;
checksRunSinceStartTotal: number = 0;
checksTriggered: Map<string, number> = new Map();
@@ -109,6 +115,9 @@ export class Manager {
rulesCachedSinceStartTotal: number = 0;
rulesTriggeredTotal: number = 0;
rulesTriggeredSinceStartTotal: number = 0;
rulesUniqueSample: number[] = [];
rulesUniqueSampleInterval: any;
rulesUniqueRollingAvg: number = 0;
actionsRun: Map<string, number> = new Map();
actionsRunSinceStart: Map<string, number> = new Map();
@@ -116,6 +125,7 @@ export class Manager {
return {
eventsCheckedTotal: this.eventsCheckedTotal,
eventsCheckedSinceStartTotal: this.eventsCheckedSinceStartTotal,
eventsAvg: formatNumber(this.eventsRollingAvg),
checksRunTotal: this.checksRunTotal,
checksRunSinceStartTotal: this.checksRunSinceStartTotal,
checksTriggered: this.checksTriggered,
@@ -128,6 +138,7 @@ export class Manager {
rulesCachedSinceStartTotal: this.rulesCachedSinceStartTotal,
rulesTriggeredTotal: this.rulesTriggeredTotal,
rulesTriggeredSinceStartTotal: this.rulesTriggeredSinceStartTotal,
rulesAvg: formatNumber(this.rulesUniqueRollingAvg),
actionsRun: this.actionsRun,
actionsRunTotal: totalFromMapStats(this.actionsRun),
actionsRunSinceStart: this.actionsRunSinceStart,
@@ -164,6 +175,10 @@ export class Manager {
this.client = client;
this.queue = queue(async (task: CheckTask, cb) => {
if(this.delayBy !== undefined) {
this.logger.debug(`SOFT API LIMIT MODE: Delaying Event run by ${this.delayBy} seconds`);
await sleep(this.delayBy * 1000);
}
await this.runChecks(task.checkType, task.activity, task.options);
}
// TODO allow concurrency??
@@ -176,6 +191,46 @@ export class Manager {
this.logger.debug('All queued activities have been processed.');
});
this.queue.pause();
this.eventsSampleInterval = setInterval((function(self) {
return function() {
const rollingSample = self.eventsSample.slice(0, 7)
rollingSample.unshift(self.eventsCheckedTotal)
self.eventsSample = rollingSample;
const diff = self.eventsSample.reduceRight((acc: number[], curr, index) => {
if(self.eventsSample[index + 1] !== undefined) {
const d = curr - self.eventsSample[index + 1];
if(d === 0) {
return [...acc, 0];
}
return [...acc, d/10];
}
return acc;
}, []);
self.eventsRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length;
//self.logger.debug(`Event Rolling Avg: ${formatNumber(self.eventsRollingAvg)}/s`);
}
})(this), 10000);
this.rulesUniqueSampleInterval = setInterval((function(self) {
return function() {
const rollingSample = self.rulesUniqueSample.slice(0, 7)
rollingSample.unshift(self.rulesRunTotal - self.rulesCachedTotal);
self.rulesUniqueSample = rollingSample;
const diff = self.rulesUniqueSample.reduceRight((acc: number[], curr, index) => {
if(self.rulesUniqueSample[index + 1] !== undefined) {
const d = curr - self.rulesUniqueSample[index + 1];
if(d === 0) {
return [...acc, 0];
}
return [...acc, d/10];
}
return acc;
}, []);
self.rulesUniqueRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length;
//self.logger.debug(`Unique Rules Run Rolling Avg: ${formatNumber(self.rulesUniqueRollingAvg)}/s`);
}
})(this), 10000);
}
protected parseConfigurationFromObject(configObj: object) {
@@ -429,8 +484,8 @@ export class Manager {
this.actionsRunSinceStart.set(name, (this.actionsRunSinceStart.get(name) || 0) + 1)
}
this.logger.verbose(`Run Stats: Checks ${checksRun} | Rules => Total: ${totalRulesRun} Unique: ${allRuleResults.length} Cached: ${totalRulesRun - allRuleResults.length} | Actions ${actionsRun}`);
this.logger.verbose(`Reddit API Stats: Initial Limit ${startingApiLimit} | Current Limit ${this.client.ratelimitRemaining} | Est. Calls Made ${startingApiLimit - this.client.ratelimitRemaining}`);
this.logger.verbose(`Run Stats: Checks ${checksRun} | Rules => Total: ${totalRulesRun} Unique: ${allRuleResults.length} Cached: ${totalRulesRun - allRuleResults.length} Rolling Avg: ~${formatNumber(this.rulesUniqueRollingAvg)}/s | Actions ${actionsRun}`);
this.logger.verbose(`Reddit API Stats: Initial ${startingApiLimit} | Current ${this.client.ratelimitRemaining} | Used ~${startingApiLimit - this.client.ratelimitRemaining} | Events ~${formatNumber(this.eventsRollingAvg)}/s`);
this.currentLabels = [];
} catch (err) {
this.logger.error('Error occurred while cleaning up Activity check and generating stats', err);
@@ -692,7 +747,7 @@ export class Manager {
} else {
this.eventsState = {
state: PAUSED,
causedBy: USER
causedBy
};
for(const s of this.streams) {
s.end();

View File

@@ -12,7 +12,20 @@ import Snoowrap from "snoowrap";
// }
// }
class ProxiedSnoowrap extends Snoowrap {
export class RequestTrackingSnoowrap extends Snoowrap {
requestCount: number = 0;
oauthRequest(...args: any) {
// send all requests through a proxy
if(args[1] === undefined || args[1] === 1) {
this.requestCount++;
}
// @ts-ignore
return super.oauthRequest(...args);
}
}
export class ProxiedSnoowrap extends RequestTrackingSnoowrap {
proxyEndpoint: string;
constructor(args: any) {
@@ -29,6 +42,3 @@ class ProxiedSnoowrap extends Snoowrap {
}))
}
}
export default ProxiedSnoowrap;

View File

@@ -42,7 +42,6 @@ const program = new Command();
(async function () {
try {
debugger;
preRunCmd.parse(process.argv);
const { operatorConfig = process.env.OPERATOR_CONFIG } = preRunCmd.opts();
try {
@@ -199,7 +198,6 @@ const program = new Command();
await program.parseAsync();
} catch (err) {
debugger;
if(!err.logged && !(err instanceof LoggedError)) {
const logger = winston.loggers.get('default');
if (err.name === 'StatusCodeError' && err.response !== undefined) {