From 9316019b01ac54f87f73bbcec1c6bfaaa1ea45f3 Mon Sep 17 00:00:00 2001 From: FoxxMD Date: Tue, 20 Jul 2021 20:15:15 -0400 Subject: [PATCH] Implement api nanny mode to help with heavy api usage --- src/App.ts | 161 +++++++++++++++++- src/Server/server.ts | 12 +- src/Server/views/status.ejs | 39 ++++- src/Subreddit/Manager.ts | 63 ++++++- ...{ProxiedSnoowrap.ts => SnoowrapClients.ts} | 18 +- src/index.ts | 2 - 6 files changed, 268 insertions(+), 27 deletions(-) rename src/Utils/{ProxiedSnoowrap.ts => SnoowrapClients.ts} (66%) diff --git a/src/App.ts b/src/App.ts index cb8a943..cb46403 100644 --- a/src/App.ts +++ b/src/App.ts @@ -3,9 +3,9 @@ import {Manager} from "./Subreddit/Manager"; import winston, {Logger} from "winston"; import { argParseInt, - createRetryHandler, + createRetryHandler, formatNumber, labelledFormat, logLevels, - parseBool, + parseBool, parseDuration, parseFromJsonOrYamlToObject, parseSubredditName, sleep @@ -15,11 +15,12 @@ import EventEmitter from "events"; import CacheManager from './Subreddit/SubredditResources'; import dayjs, {Dayjs} from "dayjs"; import LoggedError from "./Utils/LoggedError"; -import ProxiedSnoowrap from "./Utils/ProxiedSnoowrap"; +import {ProxiedSnoowrap, RequestTrackingSnoowrap} from "./Utils/SnoowrapClients"; import {ModQueueStream, UnmoderatedStream} from "./Subreddit/Streams"; import {getDefaultLogger} from "./Utils/loggerFactory"; -import {RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces"; +import {DurationString, PAUSED, RUNNING, STOPPED, SYSTEM, USER} from "./Common/interfaces"; import {sharedModqueue} from "./Utils/CommandConfig"; +import { Duration } from "dayjs/plugin/duration"; const {transports} = winston; @@ -44,10 +45,20 @@ export class App { nextHeartbeat?: Dayjs; heartBeating: boolean = false; apiLimitWarning: number; + softLimit: number | string = 250; + hardLimit: number | string = 50; + nannyMode?: 'soft' | 'hard'; + nextExpiration!: Dayjs; botName?: string; startedAt: Dayjs = dayjs(); sharedModqueue: boolean = false; + apiSample: number[] = []; + interval: any; + apiRollingAvg: number = 0; + apiEstDepletion?: Duration; + depletedInSecs: number = 0; + constructor(options: any = {}) { const { subreddits = [], @@ -223,7 +234,7 @@ export class App { while (true) { this.nextHeartbeat = dayjs().add(this.heartbeatInterval, 'second'); await sleep(this.heartbeatInterval * 1000); - const heartbeat = `HEARTBEAT -- Reddit API Rate Limit remaining: ${this.client.ratelimitRemaining}` + const heartbeat = `HEARTBEAT -- API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion === undefined ? 'N/A' : this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)` if (this.apiLimitWarning >= this.client.ratelimitRemaining) { this.logger.warn(heartbeat); } else { @@ -282,6 +293,28 @@ export class App { } async runManagers() { + // this.apiSampleInterval = setInterval((function(self) { + // return function() { + // const rollingSample = self.apiSample.slice(0, 7) + // rollingSample.unshift(self.client.ratelimitRemaining); + // self.apiSample = rollingSample; + // const diff = self.apiSample.reduceRight((acc: number[], curr, index) => { + // if(self.apiSample[index + 1] !== undefined) { + // const d = Math.abs(curr - self.apiSample[index + 1]); + // if(d === 0) { + // return [...acc, 0]; + // } + // return [...acc, d/10]; + // } + // return acc; + // }, []); + // self.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second + // const depletedIn = self.client.ratelimitRemaining / self.apiRollingAvg; // number of seconds until current remaining limit is 0 + // self.apiEstDepletion = dayjs.duration({seconds: depletedIn}); + // self.logger.info(`API Usage Rolling Avg: ${self.apiRollingAvg}/s | Est Depletion: ${self.apiEstDepletion.humanize()} (${depletedIn} seconds)`); + // } + // })(this), 10000); + if(this.subManagers.every(x => !x.validConfigLoaded)) { this.logger.warn('All managers have invalid configs!'); } @@ -296,8 +329,126 @@ export class App { if (this.heartbeatInterval !== 0 && !this.heartBeating) { this.heartbeat(); } + this.runApiNanny(); const emitter = new EventEmitter(); await pEvent(emitter, 'end'); } + + async runApiNanny() { + while(true) { + await sleep(10000); + this.nextExpiration = dayjs(this.client.ratelimitExpiration); + const nowish = dayjs().add(10, 'second'); + if(nowish.isAfter(this.nextExpiration)) { + // it's possible no api calls are being made because of a hard limit + // need to make an api call to update this + // @ts-ignore + await this.client.getMe(); + this.nextExpiration = dayjs(this.client.ratelimitExpiration); + } + const rollingSample = this.apiSample.slice(0, 7) + rollingSample.unshift(this.client.ratelimitRemaining); + this.apiSample = rollingSample; + const diff = this.apiSample.reduceRight((acc: number[], curr, index) => { + if(this.apiSample[index + 1] !== undefined) { + const d = Math.abs(curr - this.apiSample[index + 1]); + if(d === 0) { + return [...acc, 0]; + } + return [...acc, d/10]; + } + return acc; + }, []); + this.apiRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; // api requests per second + this.depletedInSecs = this.client.ratelimitRemaining / this.apiRollingAvg; // number of seconds until current remaining limit is 0 + this.apiEstDepletion = dayjs.duration({seconds: this.depletedInSecs}); + this.logger.info(`API Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`); + + + let hardLimitHit = false; + if(typeof this.hardLimit === 'string') { + const hardDur = parseDuration(this.hardLimit); + hardLimitHit = hardDur.asSeconds() > this.apiEstDepletion.asSeconds(); + } else { + hardLimitHit = this.hardLimit > this.client.ratelimitRemaining; + } + + if(hardLimitHit) { + if(this.nannyMode === 'hard') { + continue; + } + this.logger.info(`Detected HARD LIMIT of ${this.hardLimit} remaining`, {leaf: 'Api Nanny'}); + this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${this.apiRollingAvg}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); + this.logger.info(`All subreddit event polling has been paused`, {leaf: 'Api Nanny'}); + + for(const m of this.subManagers) { + m.pauseEvents('system'); + } + + this.nannyMode = 'hard'; + continue; + } + + let softLimitHit = false; + if(typeof this.softLimit === 'string') { + const softDur = parseDuration(this.softLimit); + softLimitHit = softDur.asSeconds() > this.apiEstDepletion.asSeconds(); + } else { + softLimitHit = this.softLimit > this.client.ratelimitRemaining; + } + + if(softLimitHit) { + if(this.nannyMode === 'soft') { + continue; + } + this.logger.info(`Detected SOFT LIMIT of ${this.softLimit} remaining`, {leaf: 'Api Nanny'}); + this.logger.info(`API Remaining: ${this.client.ratelimitRemaining} | Usage Rolling Avg: ${formatNumber(this.apiRollingAvg)}/s | Est Depletion: ${this.apiEstDepletion.humanize()} (${formatNumber(this.depletedInSecs, {toFixed: 0})} seconds)`, {leaf: 'Api Nanny'}); + this.logger.info('Trying to detect heavy usage subreddits...', {leaf: 'Api Nanny'}); + let threshold = 0.5; + let offenders = this.subManagers.filter(x => { + const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; + return combinedPerSec > threshold; + }); + if(offenders.length === 0) { + threshold = 0.25; + // reduce threshold + offenders = this.subManagers.filter(x => { + const combinedPerSec = x.eventsRollingAvg + x.rulesUniqueRollingAvg; + return combinedPerSec > threshold; + }); + } + + if(offenders.length > 0) { + this.logger.info(`Slowing subreddits using >- ${threshold}req/s:`, {leaf: 'Api Nanny'}); + for(const m of offenders) { + m.delayBy = 1.5; + m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); + } + } else { + this.logger.info(`Couldn't detect specific offenders, slowing all...`, {leaf: 'Api Nanny'}); + for(const m of this.subManagers) { + m.delayBy = 1.5; + m.logger.info(`SLOW MODE (Currently ~${formatNumber(m.eventsRollingAvg + m.rulesUniqueRollingAvg)}req/sec)`, {leaf: 'Api Nanny'}); + } + } + this.nannyMode = 'soft'; + continue; + } + + if(this.nannyMode !== undefined) { + this.logger.info('Turning off due to better conditions...', {leaf: 'Api Nanny'}); + for(const m of this.subManagers) { + m.delayBy = undefined; + if(m.queueState.state === PAUSED && m.queueState.causedBy === SYSTEM) { + m.startQueue(); + } + if(m.eventsState.state === PAUSED && m.eventsState.causedBy === SYSTEM) { + await m.startEvents(); + } + } + this.nannyMode = undefined; + } + } + } } diff --git a/src/Server/server.ts b/src/Server/server.ts index 69dbea1..a4e3d3b 100644 --- a/src/Server/server.ts +++ b/src/Server/server.ts @@ -17,7 +17,7 @@ import { boolToString, COMMENT_URL_ID, filterLogBySubreddit, - formatLogLineToHtml, + formatLogLineToHtml, formatNumber, isLogLineMinLevel, LogEntry, parseLinkIdentifier, @@ -293,7 +293,8 @@ const rcbServer = async function (options: any = {}) { wikiLastCheck: m.lastWikiCheck.local().format('MMMM D, YYYY h:mm A Z'), stats: m.getStats(), startedAt: 'Not Started', - startedAtHuman: 'Not Started' + startedAtHuman: 'Not Started', + delayBy: m.delayBy === undefined ? 'No' : `Delayed by ${m.delayBy} sec`, }; // TODO replace indicator data with js on client page let indicator; @@ -349,6 +350,8 @@ const rcbServer = async function (options: any = {}) { dryRun: boolToString(bot.dryRun === true), logs: logs.get('all'), checks: checks, + softLimit: bot.softLimit, + hardLimit: bot.hardLimit, stats: rest, }; if(allManagerData.logs === undefined) { @@ -585,8 +588,11 @@ const opStats = (bot: App) => { nextHeartbeat, nextHeartbeatHuman, apiLimit: bot.client.ratelimitRemaining, + apiAvg: formatNumber(bot.apiRollingAvg), + nannyMode: bot.nannyMode || 'Off', + apiDepletion: bot.apiEstDepletion === undefined ? 'Not Calculated' : bot.apiEstDepletion.humanize(), limitReset, - limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}` + limitResetHuman: `in ${dayjs.duration(limitReset.diff(dayjs())).humanize()}`, } } diff --git a/src/Server/views/status.ejs b/src/Server/views/status.ejs index 19dfe53..e29c218 100644 --- a/src/Server/views/status.ejs +++ b/src/Server/views/status.ejs @@ -134,6 +134,8 @@ <%= `${data.runningActivities} Processing / ${data.queuedActivities} Queued` %> + + <%= data.delayBy %> <% } %> <% if (data.name === 'All') { %> @@ -159,15 +161,6 @@ <%= data.nextHeartbeatHuman %> - - <%= data.apiLimit %>/600 - - - - <%= data.limitReset %> - - <%= data.limitResetHuman %> - <% } %> <% if (data.name !== 'All') { %> @@ -179,6 +172,34 @@ <% } %> + <% if (data.name === 'All') { %> +
+
+

API

+
+
+
+ + < <%= data.softLimit %> + + < <%= data.hardLimit %> + + <%= data.nannyMode %> + + <%= data.apiLimit %>/600 (~<%= data.apiAvg %>req/s) + + in ~<%= data.apiDepletion %> + + + + <%= data.limitReset %> + + <%= data.limitResetHuman %> + +
+
+
+ <% } %> <% if (data.name !== 'All') { %>
diff --git a/src/Subreddit/Manager.ts b/src/Subreddit/Manager.ts index c855f6c..7d7cf3c 100644 --- a/src/Subreddit/Manager.ts +++ b/src/Subreddit/Manager.ts @@ -4,7 +4,7 @@ import {SubmissionCheck} from "../Check/SubmissionCheck"; import {CommentCheck} from "../Check/CommentCheck"; import { createRetryHandler, - determineNewResults, + determineNewResults, formatNumber, mergeArr, parseFromJsonOrYamlToObject, pollingInfo, sleep, totalFromMapStats, } from "../util"; import {Poll} from "snoostorm"; @@ -97,8 +97,14 @@ export class Manager { causedBy: SYSTEM } + // use by api nanny to slow event consumption + delayBy?: number; + eventsCheckedTotal: number = 0; eventsCheckedSinceStartTotal: number = 0; + eventsSample: number[] = []; + eventsSampleInterval: any; + eventsRollingAvg: number = 0; checksRunTotal: number = 0; checksRunSinceStartTotal: number = 0; checksTriggered: Map = new Map(); @@ -109,6 +115,9 @@ export class Manager { rulesCachedSinceStartTotal: number = 0; rulesTriggeredTotal: number = 0; rulesTriggeredSinceStartTotal: number = 0; + rulesUniqueSample: number[] = []; + rulesUniqueSampleInterval: any; + rulesUniqueRollingAvg: number = 0; actionsRun: Map = new Map(); actionsRunSinceStart: Map = new Map(); @@ -116,6 +125,7 @@ export class Manager { return { eventsCheckedTotal: this.eventsCheckedTotal, eventsCheckedSinceStartTotal: this.eventsCheckedSinceStartTotal, + eventsAvg: formatNumber(this.eventsRollingAvg), checksRunTotal: this.checksRunTotal, checksRunSinceStartTotal: this.checksRunSinceStartTotal, checksTriggered: this.checksTriggered, @@ -128,6 +138,7 @@ export class Manager { rulesCachedSinceStartTotal: this.rulesCachedSinceStartTotal, rulesTriggeredTotal: this.rulesTriggeredTotal, rulesTriggeredSinceStartTotal: this.rulesTriggeredSinceStartTotal, + rulesAvg: formatNumber(this.rulesUniqueRollingAvg), actionsRun: this.actionsRun, actionsRunTotal: totalFromMapStats(this.actionsRun), actionsRunSinceStart: this.actionsRunSinceStart, @@ -164,6 +175,10 @@ export class Manager { this.client = client; this.queue = queue(async (task: CheckTask, cb) => { + if(this.delayBy !== undefined) { + this.logger.debug(`SOFT API LIMIT MODE: Delaying Event run by ${this.delayBy} seconds`); + await sleep(this.delayBy * 1000); + } await this.runChecks(task.checkType, task.activity, task.options); } // TODO allow concurrency?? @@ -176,6 +191,46 @@ export class Manager { this.logger.debug('All queued activities have been processed.'); }); this.queue.pause(); + + this.eventsSampleInterval = setInterval((function(self) { + return function() { + const rollingSample = self.eventsSample.slice(0, 7) + rollingSample.unshift(self.eventsCheckedTotal) + self.eventsSample = rollingSample; + const diff = self.eventsSample.reduceRight((acc: number[], curr, index) => { + if(self.eventsSample[index + 1] !== undefined) { + const d = curr - self.eventsSample[index + 1]; + if(d === 0) { + return [...acc, 0]; + } + return [...acc, d/10]; + } + return acc; + }, []); + self.eventsRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; + //self.logger.debug(`Event Rolling Avg: ${formatNumber(self.eventsRollingAvg)}/s`); + } + })(this), 10000); + + this.rulesUniqueSampleInterval = setInterval((function(self) { + return function() { + const rollingSample = self.rulesUniqueSample.slice(0, 7) + rollingSample.unshift(self.rulesRunTotal - self.rulesCachedTotal); + self.rulesUniqueSample = rollingSample; + const diff = self.rulesUniqueSample.reduceRight((acc: number[], curr, index) => { + if(self.rulesUniqueSample[index + 1] !== undefined) { + const d = curr - self.rulesUniqueSample[index + 1]; + if(d === 0) { + return [...acc, 0]; + } + return [...acc, d/10]; + } + return acc; + }, []); + self.rulesUniqueRollingAvg = diff.reduce((acc, curr) => acc + curr,0) / diff.length; + //self.logger.debug(`Unique Rules Run Rolling Avg: ${formatNumber(self.rulesUniqueRollingAvg)}/s`); + } + })(this), 10000); } protected parseConfigurationFromObject(configObj: object) { @@ -429,8 +484,8 @@ export class Manager { this.actionsRunSinceStart.set(name, (this.actionsRunSinceStart.get(name) || 0) + 1) } - this.logger.verbose(`Run Stats: Checks ${checksRun} | Rules => Total: ${totalRulesRun} Unique: ${allRuleResults.length} Cached: ${totalRulesRun - allRuleResults.length} | Actions ${actionsRun}`); - this.logger.verbose(`Reddit API Stats: Initial Limit ${startingApiLimit} | Current Limit ${this.client.ratelimitRemaining} | Est. Calls Made ${startingApiLimit - this.client.ratelimitRemaining}`); + this.logger.verbose(`Run Stats: Checks ${checksRun} | Rules => Total: ${totalRulesRun} Unique: ${allRuleResults.length} Cached: ${totalRulesRun - allRuleResults.length} Rolling Avg: ~${formatNumber(this.rulesUniqueRollingAvg)}/s | Actions ${actionsRun}`); + this.logger.verbose(`Reddit API Stats: Initial ${startingApiLimit} | Current ${this.client.ratelimitRemaining} | Used ~${startingApiLimit - this.client.ratelimitRemaining} | Events ~${formatNumber(this.eventsRollingAvg)}/s`); this.currentLabels = []; } catch (err) { this.logger.error('Error occurred while cleaning up Activity check and generating stats', err); @@ -692,7 +747,7 @@ export class Manager { } else { this.eventsState = { state: PAUSED, - causedBy: USER + causedBy }; for(const s of this.streams) { s.end(); diff --git a/src/Utils/ProxiedSnoowrap.ts b/src/Utils/SnoowrapClients.ts similarity index 66% rename from src/Utils/ProxiedSnoowrap.ts rename to src/Utils/SnoowrapClients.ts index 7a2e555..aaa45be 100644 --- a/src/Utils/ProxiedSnoowrap.ts +++ b/src/Utils/SnoowrapClients.ts @@ -12,7 +12,20 @@ import Snoowrap from "snoowrap"; // } // } -class ProxiedSnoowrap extends Snoowrap { +export class RequestTrackingSnoowrap extends Snoowrap { + requestCount: number = 0; + + oauthRequest(...args: any) { + // send all requests through a proxy + if(args[1] === undefined || args[1] === 1) { + this.requestCount++; + } + // @ts-ignore + return super.oauthRequest(...args); + } +} + +export class ProxiedSnoowrap extends RequestTrackingSnoowrap { proxyEndpoint: string; constructor(args: any) { @@ -29,6 +42,3 @@ class ProxiedSnoowrap extends Snoowrap { })) } } - - -export default ProxiedSnoowrap; diff --git a/src/index.ts b/src/index.ts index e0cddf4..2c715b8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -42,7 +42,6 @@ const program = new Command(); (async function () { try { - debugger; preRunCmd.parse(process.argv); const { operatorConfig = process.env.OPERATOR_CONFIG } = preRunCmd.opts(); try { @@ -199,7 +198,6 @@ const program = new Command(); await program.parseAsync(); } catch (err) { - debugger; if(!err.logged && !(err instanceof LoggedError)) { const logger = winston.loggers.get('default'); if (err.name === 'StatusCodeError' && err.response !== undefined) {