mirror of https://github.com/FoxxMD/context-mod.git
synced 2026-01-14 07:57:57 -05:00

Compare commits: 0.13.4...streamBuff (1 commit)
| Author | SHA1 | Date |
|---|---|---|
|  | 14a317ace4 |  |

package-lock.json (generated, 27 additions)
@@ -46,6 +46,7 @@
         "express-session-cache-manager": "^1.0.2",
         "express-socket.io-session": "^1.3.5",
         "fast-deep-equal": "^3.1.3",
+        "fixed-size-list": "^0.3.0",
         "globrex": "^0.1.2",
         "got": "^11.8.2",
         "he": "^1.2.0",
@@ -3979,6 +3980,14 @@
         "micromatch": "^4.0.2"
       }
     },
+    "node_modules/fixed-size-list": {
+      "version": "0.3.0",
+      "resolved": "https://registry.npmjs.org/fixed-size-list/-/fixed-size-list-0.3.0.tgz",
+      "integrity": "sha512-c6I8wEE4ZtjKz35BaodH7yWuWmcaUVQwgBeNcI3LxJu79YH+ezHvf1oS9VkgJmyVy5eQ8Wh6jNVcj2rB4rgVgA==",
+      "dependencies": {
+        "mitt": "^1.2.0"
+      }
+    },
     "node_modules/flat": {
       "version": "5.0.2",
       "resolved": "https://registry.npmjs.org/flat/-/flat-5.0.2.tgz",
@@ -6113,6 +6122,11 @@
       "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz",
       "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q=="
     },
+    "node_modules/mitt": {
+      "version": "1.2.0",
+      "resolved": "https://registry.npmjs.org/mitt/-/mitt-1.2.0.tgz",
+      "integrity": "sha512-r6lj77KlwqLhIUku9UWYes7KJtsczvolZkzp8hbaDPPaE24OmWl5s539Mytlj22siEQKosZ26qCBgda2PKwoJw=="
+    },
     "node_modules/mkdirp": {
       "version": "0.5.6",
       "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.6.tgz",
@@ -13588,6 +13602,14 @@
         "micromatch": "^4.0.2"
       }
     },
+    "fixed-size-list": {
+      "version": "0.3.0",
+      "resolved": "https://registry.npmjs.org/fixed-size-list/-/fixed-size-list-0.3.0.tgz",
+      "integrity": "sha512-c6I8wEE4ZtjKz35BaodH7yWuWmcaUVQwgBeNcI3LxJu79YH+ezHvf1oS9VkgJmyVy5eQ8Wh6jNVcj2rB4rgVgA==",
+      "requires": {
+        "mitt": "^1.2.0"
+      }
+    },
     "flat": {
       "version": "5.0.2",
       "resolved": "https://registry.npmjs.org/flat/-/flat-5.0.2.tgz",
@@ -15191,6 +15213,11 @@
       "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.6.tgz",
       "integrity": "sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q=="
     },
+    "mitt": {
+      "version": "1.2.0",
+      "resolved": "https://registry.npmjs.org/mitt/-/mitt-1.2.0.tgz",
+      "integrity": "sha512-r6lj77KlwqLhIUku9UWYes7KJtsczvolZkzp8hbaDPPaE24OmWl5s539Mytlj22siEQKosZ26qCBgda2PKwoJw=="
+    },
     "mkdirp": {
       "version": "0.5.6",
       "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.6.tgz",
package.json

@@ -68,6 +68,7 @@
     "express-session-cache-manager": "^1.0.2",
     "express-socket.io-session": "^1.3.5",
     "fast-deep-equal": "^3.1.3",
+    "fixed-size-list": "^0.3.0",
     "globrex": "^0.1.2",
     "got": "^11.8.2",
     "he": "^1.2.0",
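The only runtime dependency added is fixed-size-list (which pulls in mitt, hence the lockfile entries above). A minimal sketch of the API as this commit uses it: the size argument to the constructor, add(), and the data array are taken from the diff, while the eviction behavior and the ordering of data are assumptions about the library.

```typescript
import { FixedSizeList } from 'fixed-size-list';

// Assumed behavior: the list holds at most the number of items given to the
// constructor and drops the oldest entry once that cap is exceeded.
const seen = new FixedSizeList<string>(3); // tiny cap for illustration

for (const id of ['t3_a', 't3_b', 't3_c', 't3_d']) {
    // membership is checked by scanning the backing array (no Set-style has())
    if (!seen.data.some(x => x === id)) {
        seen.add(id);
    }
}

// With a cap of 3 and 4 ids added, one entry (presumably the oldest, 't3_a')
// is no longer present, so it would be treated as "new" if it reappeared.
console.log(seen.data.length); // 3
```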
@@ -572,7 +572,7 @@ class Bot implements BotInstanceFunctions {
         if (stream !== undefined) {
             this.logger.info('Restarting SHARED COMMENT STREAM due to a subreddit config change');
             stream.end('Replacing with a new stream with updated subreddits');
-            processed = stream.processed;
+            processed = stream.processedBuffer;
         }
         if (sharedCommentsSubreddits.length > 100) {
             this.logger.warn(`SHARED COMMENT STREAM => Reddit can only combine 100 subreddits for getting new Comments but this bot has ${sharedCommentsSubreddits.length}`);
@@ -605,7 +605,7 @@ class Bot implements BotInstanceFunctions {
         if (stream !== undefined) {
             this.logger.info('Restarting SHARED SUBMISSION STREAM due to a subreddit config change');
             stream.end('Replacing with a new stream with updated subreddits');
-            processed = stream.processed;
+            processed = stream.processedBuffer;
         }
         if (sharedSubmissionsSubreddits.length > 100) {
             this.logger.warn(`SHARED SUBMISSION STREAM => Reddit can only combine 100 subreddits for getting new Submissions but this bot has ${sharedSubmissionsSubreddits.length}`);
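Both Bot hunks follow the same pattern: when a shared stream is torn down after a config change, its bounded history is handed to the replacement stream instead of the old unbounded processed Set. A hedged sketch of that hand-off, using local stand-in types rather than the project's Bot/SPoll classes; only the processedBuffer field, the end() call, and the processed option are taken from this diff.

```typescript
import { FixedSizeList } from 'fixed-size-list';

// Minimal stand-in for the stream class; only the members used here are declared.
interface StreamLike {
    processedBuffer: FixedSizeList<string>;
    end(reason: string): void;
}

function replaceSharedStream(
    oldStream: StreamLike | undefined,
    createStream: (processed?: FixedSizeList<string>) => StreamLike
): StreamLike {
    let processed: FixedSizeList<string> | undefined;
    if (oldStream !== undefined) {
        oldStream.end('Replacing with a new stream with updated subreddits');
        // carry the bounded history over so the new stream skips items the old one saw
        processed = oldStream.processedBuffer;
    }
    return createStream(processed);
}
```

The stream class itself (the SPoll hunks that follow) is what consumes the processed option and decides whether the replacement counts as a fresh start.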
@@ -7,6 +7,7 @@ import {mergeArr, parseDuration, random} from "../util";
 import { Logger } from "winston";
 import {ErrorWithCause} from "pony-cause";
 import dayjs, {Dayjs as DayjsObj} from "dayjs";
+import { FixedSizeList } from 'fixed-size-list'

 type Awaitable<T> = Promise<T> | T;

@@ -14,10 +15,12 @@ interface RCBPollingOptions<T> extends SnooStormOptions {
     subreddit: string,
+    enforceContinuity?: boolean
     logger: Logger
     sort?: string
     name?: string,
-    processed?: Set<T[keyof T]>
+    processed?: FixedSizeList<T[keyof T]>
     label?: string
     dateCutoff?: boolean
+    maxHistory?: number
 }

 interface RCBPollConfiguration<T> extends PollConfiguration<T>,RCBPollingOptions<T> {
@@ -40,6 +43,9 @@ export class SPoll<T extends RedditContent<object>> extends Poll<T> {
     name: string = 'Reddit Stream';
     logger: Logger;
     subreddit: string;
+    // using a fixed sized "regular" array means slightly more memory usage vs. a Set when holding N items
+    // BUT now we can limit N items to something reasonable instead of having a crazy big Set with all items seen since stream was started
+    processedBuffer: FixedSizeList<T[keyof T]>;

     constructor(options: RCBPollConfiguration<T>) {
         super(options);
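A standalone illustration of the trade-off the two comment lines above describe; this is not the fixed-size-list implementation, just the idea of trading Set's O(1) lookup and unbounded growth for a capped array with a linear scan.

```typescript
// A Set dedupes in constant time but never forgets, so it grows with every item
// the stream has ever seen. A bounded buffer caps memory at `max` entries at the
// cost of an O(n) membership check, where n <= max.
class BoundedHistory<T> {
    private items: T[] = [];
    constructor(private readonly max: number) {}

    add(item: T): void {
        this.items.unshift(item);          // newest first
        if (this.items.length > this.max) {
            this.items.pop();              // drop the oldest once over the cap
        }
    }

    has(item: T): boolean {
        return this.items.some(x => x === item);
    }
}

// With maxHistory = 300 (the default this commit adds), memory use stays constant
// however long the stream runs, unlike a Set that only ever grows.
const history = new BoundedHistory<string>(300);
```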
@@ -54,6 +60,7 @@ export class SPoll<T extends RedditContent<object>> extends Poll<T> {
             label = 'Polling',
             processed,
             dateCutoff,
+            maxHistory = 300,
         } = options;
         this.subreddit = subreddit;
         this.name = name !== undefined ? name : this.name;
@@ -67,8 +74,10 @@ export class SPoll<T extends RedditContent<object>> extends Poll<T> {
         // if we pass in processed on init the intention is to "continue" from where the previous stream left off
         // WITHOUT new start behavior
         if (processed !== undefined) {
-            this.processed = processed;
+            this.processedBuffer = processed;
             this.newStart = false;
+        } else {
+            this.processedBuffer = new FixedSizeList<T[keyof T]>(maxHistory);
         }

         clearInterval(this.interval);
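A hedged sketch of the two construction paths in this hunk, reduced to the buffer handling alone; the option names processed and maxHistory come from the diff, while the types and the newStart handling are simplified for illustration.

```typescript
import { FixedSizeList } from 'fixed-size-list';

interface BufferedPollOptions {
    processed?: FixedSizeList<string>;
    maxHistory?: number;
}

function initBuffer(options: BufferedPollOptions): { buffer: FixedSizeList<string>; newStart: boolean } {
    const { processed, maxHistory = 300 } = options;
    if (processed !== undefined) {
        // continuing a previous stream: reuse its history and skip "new start" behavior
        return { buffer: processed, newStart: false };
    }
    // fresh stream: start with an empty history capped at maxHistory (default 300)
    return { buffer: new FixedSizeList<string>(maxHistory), newStart: true };
}
```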
@@ -97,14 +106,14 @@
                     }
                     for (const item of batch) {
                         const id = item[self.identifier];
-                        if (self.processed.has(id)) {
+                        if (self.processedBuffer.data.some(x => x === id)) {
                             anyAlreadySeen = true;
                             continue;
                         }

                         // add new item to list and set as processed
                         newItems.push(item);
-                        self.processed.add(id);
+                        self.processedBuffer.add(id);
                         }
                     page++;
                 }
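A self-contained version of the polling loop's dedup step for reference: the batch item shape and the takeNew helper are illustrative, and only the data.some() membership check and the add() call mirror the diff. The implied trade-off is that an id which falls out of the capped history (more than maxHistory newer items have arrived since) would in principle be treated as new again.

```typescript
import { FixedSizeList } from 'fixed-size-list';

// Filter a polled batch against the bounded history, then record what was kept.
function takeNew(batch: { id: string }[], seen: FixedSizeList<string>): { id: string }[] {
    const newItems: { id: string }[] = [];
    for (const item of batch) {
        // linear scan over at most maxHistory entries replaces Set#has
        if (seen.data.some(x => x === item.id)) {
            continue; // already handled by an earlier poll
        }
        newItems.push(item);
        seen.add(item.id);
    }
    return newItems;
}

const seen = new FixedSizeList<string>(300);
takeNew([{ id: 't1_a' }, { id: 't1_b' }], seen); // both returned
takeNew([{ id: 't1_b' }, { id: 't1_c' }], seen); // only t1_c returned
```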