Update Script - New detection engine for more spammer types

- Auto-Smart Mode: Now also detects sex spam bots with very high accuracy. Huge amount of filtering logic added to find the large variety of these types of bots
- Auto-Smart mode can now detect bots with look-alike usernames
- Sample list now shows number of comments by each author
- Better detection of potential false positives from replies
- Filter now ignores uploader's comments regardless of who is scanning
This commit is contained in:
ThioJoe
2021-12-16 19:38:53 -07:00
parent 513b78835b
commit 1fac478bf3
4 changed files with 1764 additions and 39 deletions

View File

@@ -35,7 +35,7 @@
### IMPORTANT: I OFFER NO WARRANTY OR GUARANTEE FOR THIS SCRIPT. USE AT YOUR OWN RISK.
### I tested it on my own and implemented some failsafes as best as I could,
### but there could always be some kind of bug. You should inspect the code yourself.
version = "2.0.0-Beta1"
version = "2.0.0-Beta2"
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Try Imports
@@ -57,6 +57,7 @@ try:
from base64 import b85decode as b64decode
from configparser import ConfigParser
from pkg_resources import parse_version
from confusables import confusable_regex, normalize
# Non Standard Modules
import rtfunicode
@@ -156,8 +157,8 @@ def print_comments(check_video_id_localprint, comments, logMode):
valuesPreparedToPrint = ""
print(f"{F.LIGHTMAGENTA_EX}---------------------------- Match Samples: One comment per matched-comment author ----------------------------{S.R}")
for value in matchSamplesDict.values():
valuesPreparedToWrite = valuesPreparedToWrite + f"{str(value['n'])}. {str(value['authorID'])} | {make_rtf_compatible(str(value['nameAndText']))} \\line \n"
valuesPreparedToPrint = valuesPreparedToPrint + f"{str(value['n'])}. {str(value['nameAndText'])}\n"
valuesPreparedToWrite = valuesPreparedToWrite + value['iString'] + value['cString'] + f"{str(value['authorID'])} | {make_rtf_compatible(str(value['nameAndText']))} \\line \n"
valuesPreparedToPrint = valuesPreparedToPrint + value['iString'] + value['cString'] + f"{str(value['nameAndText'])}\n"
if logMode == True:
write_rtf(logFileName, "-------------------- Match Samples: One comment per matched-comment author -------------------- \\line\\line \n")
write_rtf(logFileName, valuesPreparedToWrite)
@@ -241,7 +242,13 @@ def print_prepared_comments(check_video_id_localprep, comments, j, logMode):
# Adds a sample to matchSamplesDict and preps formatting
def add_sample(authorID, authorNameRaw, commentText):
global matchSamplesDict
count = len(matchSamplesDict) + 1
global authorMatchCountDict
# Make index number and string formatted version
index = len(matchSamplesDict) + 1
iString = f"{str(index)}. ".ljust(4)
authorNumComments = authorMatchCountDict[authorID]
cString = f"[x{str(authorNumComments)}] ".ljust(7)
# Left Justify Author Name and Comment Text
if len(authorNameRaw) > 20:
@@ -255,7 +262,7 @@ def add_sample(authorID, authorNameRaw, commentText):
commentText = commentText[0:85].ljust(85)
# Add comment sample, author ID, name, and counter
matchSamplesDict[authorID] = {'n':count, 'authorID':authorID, 'authorName':authorNameRaw, 'nameAndText':authorName + commentText}
matchSamplesDict[authorID] = {'index':index, 'cString':cString, 'iString':iString, 'count':authorNumComments, 'authorID':authorID, 'authorName':authorNameRaw, 'nameAndText':authorName + commentText}
##########################################################################################
@@ -263,7 +270,7 @@ def add_sample(authorID, authorNameRaw, commentText):
##########################################################################################
# Call the API's commentThreads.list method to list the existing comments.
def get_comments(youtube, currentUser, filterMode, filterSubMode, check_video_id=None, check_channel_id=None, nextPageToken=None, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None): # None are set as default if no parameters passed into function
def get_comments(youtube, miscData, currentUser, filterMode, filterSubMode, check_video_id=None, check_channel_id=None, nextPageToken=None, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None): # None are set as default if no parameters passed into function
global scannedCommentsCount
# Initialize some variables
authorChannelName = None
@@ -330,13 +337,13 @@ def get_comments(youtube, currentUser, filterMode, filterSubMode, check_video_id
commentText = "[Deleted/Missing Comment]"
# Runs check against comment info for whichever filter data is relevant
check_against_filter(currentUser, filterMode=filterMode, filterSubMode=filterSubMode, commentID=parent_id, videoID=videoID, authorChannelID=parentAuthorChannelID, parentAuthorChannelID=None, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, authorChannelName=authorChannelName, commentText=commentText, regexPattern=regexPattern)
check_against_filter(currentUser, miscData, filterMode=filterMode, filterSubMode=filterSubMode, commentID=parent_id, videoID=videoID, authorChannelID=parentAuthorChannelID, parentAuthorChannelID=None, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, authorChannelName=authorChannelName, commentText=commentText, regexPattern=regexPattern)
scannedCommentsCount += 1 # Counts number of comments scanned, add to global count
if numReplies > 0 and len(limitedRepliesList) < numReplies:
get_replies(currentUser, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern)
get_replies(currentUser, miscData, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern)
elif numReplies > 0 and len(limitedRepliesList) == numReplies: # limitedRepliesList can never be more than numReplies
get_replies(currentUser, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern, limitedRepliesList)
get_replies(currentUser, miscData, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern, limitedRepliesList)
else:
print_count_stats(final=False) # Updates displayed stats if no replies
@@ -348,7 +355,7 @@ def get_comments(youtube, currentUser, filterMode, filterSubMode, check_video_id
##########################################################################################
# Call the API's comments.list method to list the existing comment replies.
def get_replies(currentUser, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None, repliesList=None):
def get_replies(currentUser, miscData, filterMode, filterSubMode, parent_id, videoID, parentAuthorChannelID, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None, repliesList=None):
global scannedRepliesCount
# Initialize some variables
authorChannelName = None
@@ -403,7 +410,7 @@ def get_replies(currentUser, filterMode, filterSubMode, parent_id, videoID, pare
commentText = "[Deleted/Missing Comment]"
# Runs check against comment info for whichever filter data is relevant
check_against_filter(currentUser, filterMode=filterMode, filterSubMode=filterSubMode, commentID=replyID, videoID=videoID, authorChannelID=authorChannelID, parentAuthorChannelID=parentAuthorChannelID, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, authorChannelName=authorChannelName, commentText=commentText, regexPattern=regexPattern, allThreadAuthorNames=allThreadAuthorNames)
check_against_filter(currentUser, miscData, filterMode=filterMode, filterSubMode=filterSubMode, commentID=replyID, videoID=videoID, authorChannelID=authorChannelID, parentAuthorChannelID=parentAuthorChannelID, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, authorChannelName=authorChannelName, commentText=commentText, regexPattern=regexPattern, allThreadAuthorNames=allThreadAuthorNames)
# Update latest stats
scannedRepliesCount += 1 # Count number of replies scanned, add to global count
@@ -413,25 +420,42 @@ def get_replies(currentUser, filterMode, filterSubMode, parent_id, videoID, pare
############################## CHECK AGAINST FILTER ######################################
# The basic logic that actually checks each comment against filter criteria
def check_against_filter(currentUser, filterMode, filterSubMode, commentID, videoID, authorChannelID, parentAuthorChannelID=None, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, authorChannelName=None, commentText=None, regexPattern=None, allThreadAuthorNames=None):
def check_against_filter(currentUser, miscData, filterMode, filterSubMode, commentID, videoID, authorChannelID, parentAuthorChannelID=None, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, authorChannelName=None, commentText=None, regexPattern=None, allThreadAuthorNames=None):
global vidIdDict
global matchedCommentsDict
commentTextOriginal = str(commentText)
# Do not even check comment if author ID matches currently logged in user's ID
if currentUser[0] != authorChannelID:
# Logic to avoid false positives from replies to spammers
if allThreadAuthorNames and (filterMode == "AutoSmart" or filterMode == "NameAndText"):
for name in allThreadAuthorNames:
if "@"+str(name) in commentText:
commentText = commentText.replace("@"+str(name), "")
if currentUser[0] != authorChannelID and miscData['channelOwnerID'] != authorChannelID:
if "@" in commentText:
# Logic to avoid false positives from replies to spammers
if allThreadAuthorNames and (filterMode == "AutoSmart" or filterMode == "NameAndText"):
for name in allThreadAuthorNames:
if "@"+str(name) in commentText:
commentText = commentText.replace("@"+str(name), "")
# Extra logic to detect false positive if spammer's comment already deleted, but someone replied
if matchedCommentsDict and filterMode == "AutoSmart":
for key, value in matchedCommentsDict.items():
if "@"+str(value['authorName']) in commentText:
remove = True
for key2,value2 in matchedCommentsDict.items():
if value2['authorID'] == authorChannelID:
remove = False
if remove == True:
commentText = commentText.replace("@"+str(value['authorName']), "")
# If the comment/username matches criteria based on mode, add key/value pair of comment ID and author ID to matchedCommentsDict
# Also add key-value pair of comment ID and video ID to dictionary
# Also count how many spam comments for each author
def add_spam(commentID, videoID):
global matchedCommentsDict
global authorMatchCountDict
matchedCommentsDict[commentID] = {'text':commentTextOriginal, 'authorName':authorChannelName, 'authorID':authorChannelID, 'videoID':videoID}
vidIdDict[commentID] = videoID # Probably remove this later, but still being used for now
if authorChannelID in authorMatchCountDict:
authorMatchCountDict[authorChannelID] += 1
else:
authorMatchCountDict[authorChannelID] = 1
# Checks author of either parent comment or reply (both passed in as commentID) against channel ID inputted by user
if filterMode == "ID":
@@ -493,18 +517,105 @@ def check_against_filter(currentUser, filterMode, filterSubMode, commentID, vide
# Here inputtedComment/Author Filters are tuples of, where 2nd element is list of char-sets to check against
## Also Check if reply author ID is same as parent comment author ID, if so, ignore (to account for users who reply to spammers)
elif filterMode == "AutoSmart":
# Receive Variables
numberFilterSet = inputtedUsernameFilter['spammerNumbersSet']
compiledRegex = inputtedUsernameFilter['compiledRegex']
minNumbersMatchCount = inputtedUsernameFilter['minNumbersMatchCount']
#usernameBlackCharsSet = inputtedUsernameFilter['usernameBlackCharsSet']
spamGenEmojiSet = inputtedUsernameFilter['spamGenEmojiSet']
redAdEmojiSet = inputtedUsernameFilter['redAdEmojiSet']
yellowAdEmojiSet = inputtedUsernameFilter['yellowAdEmojiSet']
hrtSet = inputtedUsernameFilter['hrtSet']
domainRegex = inputtedUsernameFilter['domainRegex']
compiledRegexDict = inputtedUsernameFilter['compiledRegexDict']
languages = inputtedUsernameFilter['languages']
# Processed Variables
combinedString = authorChannelName + commentText
combinedSet = make_char_set(combinedString, stripLettersNumbers=True, stripPunctuation=True)
usernameSet = make_char_set(authorChannelName)
# Functions
def findOnlyObfuscated(regexExpression, originalWord, stringToSearch):
# Confusable thinks s and f look similar, have to compensate to avoid false positive
ignoredConfusablesConverter = {ord('f'):ord('s'),ord('s'):ord('f')}
result = re.findall(regexExpression, stringToSearch.lower())
if result == None:
return False
else:
for match in result:
lowerWord = originalWord.lower()
if match.lower() != lowerWord and match.lower() != lowerWord.translate(ignoredConfusablesConverter):
return True
# Run Checks
if authorChannelID == parentAuthorChannelID:
pass
elif len(numberFilterSet.intersection(combinedSet)) >= minNumbersMatchCount:
add_spam(commentID, videoID)
elif compiledRegex.search(combinedString):
add_spam(commentID, videoID)
# Black Tests
#elif usernameBlackCharsSet.intersection(usernameSet):
# add_spam(commentID, videoID)
elif any(re.search(expression[1], authorChannelName) for expression in compiledRegexDict['usernameBlackWords']):
add_spam(commentID, videoID)
elif any(findOnlyObfuscated(expression[1], expression[0], combinedString) for expression in compiledRegexDict['blackAdWords']):
add_spam(commentID, videoID)
elif re.search(inputtedUsernameFilter['usernameConfuseRegex'], authorChannelName):
add_spam(commentID, videoID)
# Multi Criteria Tests
else:
# Defaults
yellowCount = 0
redCount = 0
languageCount = 0
for language in languages:
if language[2].intersection(combinedSet):
languageCount += 1
# Yellow Tests
if any(findOnlyObfuscated(expression[1], expression[0], combinedString) for expression in compiledRegexDict['yellowAdWords']):
yellowCount += 1
if len(hrtSet.intersection(combinedSet)) >= 2:
yellowCount += 1
if yellowAdEmojiSet.intersection(combinedSet):
yellowCount += 1
if spamGenEmojiSet.intersection(combinedSet):
yellowCount += 1
if combinedString.count('#') >= 5:
yellowCount += 1
if combinedString.count('\n') >= 10:
yellowCount += 1
if re.search(domainRegex, combinedString.lower()):
yellowCount += 1
if languageCount >= 2:
yellowCount += 1
# Red Tests
#if any(foundObfuscated(re.findall(expression[1], combinedString), expression[0]) for expression in compiledRegexDict['redAdWords']):
if any(findOnlyObfuscated(expression[1], expression[0], combinedString) for expression in compiledRegexDict['redAdWords']):
redCount += 1
if redAdEmojiSet.intersection(combinedSet):
redCount += 1
# Calculate Score
if yellowCount >= 3:
add_spam(commentID, videoID)
elif redCount >= 2:
add_spam(commentID, videoID)
elif redCount >= 1 and yellowCount >= 1:
add_spam(commentID, videoID)
else:
pass
@@ -685,7 +796,7 @@ def exclude_authors(inputtedString):
# Get authorIDs for selected sample comments
for authorID, info in matchSamplesDict.items():
if str(info['n']) in SampleIDsToExclude:
if str(info['index']) in SampleIDsToExclude:
authorIDsToExclude += [authorID]
# Get comment IDs to be excluded
@@ -751,13 +862,14 @@ def get_channel_id(video_id):
results = youtube.videos().list(
part="snippet",
id=video_id,
fields="items/snippet/channelId",
fields="items/snippet/channelId,items/snippet/channelTitle",
maxResults=1
).execute()
channel_id = results["items"][0]["snippet"]["channelId"]
channelID = results["items"][0]["snippet"]["channelId"]
channelTitle = results["items"][0]["snippet"]["channelTitle"]
return channel_id
return channelID, channelTitle
############################# GET CURRENTLY LOGGED IN USER #####################################
# Class for custom exception to throw if a comment if invalid channel ID returned
@@ -1137,9 +1249,25 @@ def check_for_update(currentVersion, silentCheck=False):
elif silentCheck == True:
return isUpdateAvailable
############################# Ingest Other Files ##############################
def ingest_domain_file():
def assetFilesPath(relative_path):
if hasattr(sys, '_MEIPASS'): # If running as a pyinstaller bundle
return os.path.join(sys._MEIPASS, relative_path)
return os.path.join(os.path.abspath("assets"), relative_path) # If running as script, specifies resource folder as /assets
# Open list of root zone domain extensions
with open(assetFilesPath("rootZoneDomainList.txt"), 'r', encoding="utf-8") as domainFile:
rootZoneData = domainFile.readlines()
rootZoneList = []
for line in rootZoneData:
line = line.strip()
rootZoneList.append(line.lower())
return rootZoneList
############################# CONFIG FILE FUNCTIONS ##############################
def create_config_file():
def config_path(relative_path):
if hasattr(sys, '_MEIPASS'): # If running as a pyinstaller bundle
#print("Test1") # For Debugging
@@ -1147,7 +1275,7 @@ def create_config_file():
return os.path.join(sys._MEIPASS, relative_path)
#print("Test2") # for Debugging
#print(os.path.join(os.path.abspath("assets"), relative_path)) # For debugging
return os.path.join(os.path.abspath(""), relative_path) # If running as script, specifies resource folder as /assets
return os.path.join(os.path.abspath("assets"), relative_path) # If running as script, specifies resource folder as /assets
configFileName = "SpamPurgeConfig.ini"
@@ -1603,8 +1731,9 @@ def prepare_filter_mode_non_ascii(currentUser, scanMode, config):
sys.exit()
# Auto filter for pre-made list of common spammer-used characters in usernames
def prepare_filter_mode_smart_chars(currentUser, scanMode, config):
def prepare_filter_mode_smart(currentUser, scanMode, config, miscData):
currentUserName = currentUser[1]
domainList = miscData['domainList']
utf_16 = "utf-8"
if config and config['filter_mode'] == "autosmart":
print("Using Auto Smart Mode - Set from config file.")
@@ -1614,7 +1743,19 @@ def prepare_filter_mode_smart_chars(currentUser, scanMode, config):
print(" > Specifically, unicode characters that look like numbers\n")
input("Press Enter to continue...")
# Spam Criteria
# General Spammer Criteria
spamGenEmoji = '👇👆☝👈👉⤵️🔼🅥♜'
#usernameRedChars =""
#usernameBlackChars = ""
usernameBlackWords_Raw = [b'aA|ICWn^M`', b'aA|ICWn>^?c>']
usernameBlackWords = []
for x in usernameBlackWords_Raw: usernameBlackWords.append(b64decode(x).decode(utf_16))
# Prepare General Filters
spamGenEmojiSet = make_char_set(spamGenEmoji)
#usernameBlackCharsSet = make_char_set(usernameBlackChars)
# Type 1 Spammer Criteria
minNumbersMatchCount = 3 # Choice of minimum number of matches from spamNums before considered spam
spamNums = b'@4S%jypiv`lJC5e@4S@nyp`{~mhZfm@4T4ryqWL3kng;a@4S-lyp!*|l<&Ni@4S}pyqE91nD4xq-+|(hpyH9V;*yBsleOZVw&I?E;+~4|pM-+ovAy7_sN#{K;*quDl8NGzw&I<);+}!xo{R9GgoEI*sp65M;*qxEl8WM!x8j|+;+}%yo{aFHgoNO$sp65N;*q!Fl8fS#xZ<6;;+})zo{jLIgoWafq~ejd;*yNwleyxZy5gRM;+~G;o`m9_j_{v^hT@T>;*q)Hl8xe%y5gO?;+}=#o{#XKgoomhrs9#h;*yTyle^-byyBjQ;+~N3k%YbQpM;3vf|%lwr{a;j;*yWzlf2@cz2csS;+~Q4pM;6xk*MO4yyB9O;*-7Noxb9ph~l1-@SlW=;*+Z4lfUqvgp2T>gpBZ?gn{s%gn;m!pN{aIpP2BSpQ7-cpRDkmpO5gJpPBHTpRMqnpQG@dpSJLwpOEmKpPKNUpRVwopQP}epSSRxpONsLpPTTVpRe$ppQZ4fpSbXypOWyMpPcZWpRn+qpQiAgpSkdzpOf&NpPlfXpRw?rpQrGhpStj!pOo;OpPulYpR(|spQ!MipS$p#pOx^PpP%rZpR@3tpQ-SjpS<v$pO)~QpP=xapS19upQ`YkpS|#%pO^5RpP}%bpSAFvpR4elpT6*&pT7'
spamPlus = b';+&e|oSEXDmBO*hmf?`8;(@y2f{NmZlj4Y!;)<2xik{-1wBo0_;-|afsDa|BgyN{8;;5tIsHEbkrQ)cj;;5(MsHozot>UPz;;6aesj=dzvf`|=@42Gyyo=$Rt>S^4;+U!8n5g2IrsA2f;+e7Ho2cTPnc|$9;+&h}oSfpEo#LFH;+&u2oS^EOn(CUH@Sl}{@Sl}|@Sl}}@Sl~2@Sl~3@Sl~4@SmQc@SmQd@SmQe@SmQf@SmQg@SmQh@SmQi'
@@ -1623,14 +1764,94 @@ def prepare_filter_mode_smart_chars(currentUser, scanMode, config):
y = b64decode(spamPlus).decode(utf_16)
z = b64decode(spamOne).decode(utf_16)
# Process / Repair for Filter Use
spammerNumbersSet = make_char_set(x, stripLettersNumbers=False, stripKeyboardSpecialChars=False, stripPunctuation=False)
regexTest1 = f"[{y}][1]"
regexTest2 = f"[+][{z}]"
regexTest3 = f"[{y}][{z}]"
# Prepare Filters for Type 1 Spammers
spammerNumbersSet = make_char_set(x)
regexTest1 = f"[{y}] ?[1]"
regexTest2 = f"[+] ?[{z}]"
regexTest3 = f"[{y}] ?[{z}]"
compiledRegex = re.compile(f"({regexTest1}|{regexTest2}|{regexTest3})")
filterSettings = {'spammerNumbersSet': spammerNumbersSet, 'compiledRegex': compiledRegex, 'minNumbersMatchCount': minNumbersMatchCount}
# Type 2 Spammer Criteria
blackAdWords, redAdWords, yellowAdWords = [], [], []
blackAdWords_Raw = [b'V`yb#YanfTAaHVTW@&5', b'Z*XO9AZ>XdaB^>EX>0', b'b7f^9ZFwMYa&Km7Yy', b'V`yb#YanfTAa-eFWp4', b'V`yb#YanoPZ)Rz1', b'V`yb#Yan)MWMyv']
redAdWords_Raw = [b'W_4q0', b'b7gn', b'WNBk-', b'WFcc~', b'W-4QA', b'W-2OUYX', b'Zgpg3', b'b1HZ', b'F*qv', b'aBp&M']
yellowAdWords_Raw = [b'Y;SgD', b'Vr5}<bZKUFYy', b'VsB)5', b'XK8Y5a{', b'O~a&QV`yb=', b'Xk}@`pJf', b'Xm4}']
redAdEmoji = b64decode(b'@Sl{P').decode(utf_16)
yellowAdEmoji = b64decode(b'@Sl-|@Sm8N@Sm8C@Sl>4@Sl;H@Sly0').decode(utf_16)
hrt = b64decode(b';+duJpOTpHpOTjFpOTmGpOTaCpOTsIpOTvJpOTyKpOT#LpQoYlpOT&MpO&QJouu%el9lkElAZ').decode(utf_16)
for x in blackAdWords_Raw: blackAdWords.append(b64decode(x).decode(utf_16))
for x in redAdWords_Raw: redAdWords.append(b64decode(x).decode(utf_16))
for x in yellowAdWords_Raw: yellowAdWords.append(b64decode(x).decode(utf_16))
# Prepare Filters for Type 2 Spammers
redAdEmojiSet = make_char_set(redAdEmoji)
yellowAdEmojiSet = make_char_set(yellowAdEmoji)
hrtSet = make_char_set(hrt)
# Prepare Regex for Type 2 and General Spammers
compiledRegexDict = {
'usernameBlackWords': [],
'blackAdWords': [],
'redAdWords': [],
'yellowAdWords': []
}
# Compile regex with upper case, otherwise many false positive character matches
bufferMatch, addBuffers = "*_~|`", "\[\]\(\)" # Add 'buffer' chars to compensate for obfuscation
m = bufferMatch
a = addBuffers
for word in usernameBlackWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
compiledRegexDict['usernameBlackWords'].append([word, value])
for word in blackAdWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
compiledRegexDict['blackAdWords'].append([word, value])
for word in redAdWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
compiledRegexDict['redAdWords'].append([word, value])
for word in yellowAdWords:
value = re.compile(confusable_regex(word.upper(), include_character_padding=True).replace(m, a))
compiledRegexDict['yellowAdWords'].append([word, value])
usernameConfuseRegex = re.compile(confusable_regex(miscData['channelOwnerName']))
# Prepare All-domain Regex Expression
prepString = "\.("
first = True
for extension in domainList:
if first == True:
prepString += extension
first = False
else:
prepString = prepString + "|" + extension
prepString = prepString + ")\/"
domainRegex = re.compile(prepString)
# Prepare Multi Language Detection
turkish = 'Ç窺Ğğİ'
germanic = 'ẞßÄä'
cyrillic = "гджзклмнпрстфхцчшщыэюяъь"
japanese = 'ァアィイゥウェエォオカガキギクグケゲコゴサザシジスズセゼソゾタダチヂテデトドナニヌネノハバパヒビピフブプヘベペホボポマミムメモャヤュユョヨラリルレロヮワヰヱヲンヴヵヶヷヸヹヺーヽヾヿぁあぃいぅうぇえぉおかがきぎぐけげこごさざしじすずせぜそぞただちぢっつづてでとどなにぬねのはばぱひびぴふぶぷへべぺほぼぽまみむめもゃやゅゆょよらりるれろゎわゐゑをんゔゕゖゝゞゟ'
languages = [['turkish', turkish, []], ['germanic', germanic, []], ['cyrillic', cyrillic, []], ['japanese', japanese, []]]
for item in languages:
item[2] = make_char_set(item[1])
filterSettings = {
'spammerNumbersSet': spammerNumbersSet,
'compiledRegex': compiledRegex,
'minNumbersMatchCount': minNumbersMatchCount,
'blackAdWords': blackAdWords,
'redAdWords': redAdWords,
'yellowAdWords': yellowAdWords,
#'usernameBlackCharsSet': usernameBlackCharsSet,
'spamGenEmojiSet': spamGenEmojiSet,
'usernameBlackWords': usernameBlackWords,
'redAdEmojiSet': redAdEmojiSet,
'yellowAdEmojiSet': yellowAdEmojiSet,
'hrtSet': hrtSet,
'domainRegex': domainRegex,
'compiledRegexDict': compiledRegexDict,
'usernameConfuseRegex': usernameConfuseRegex,
'languages': languages
}
return filterSettings, None
##########################################################################################
@@ -1654,9 +1875,11 @@ def main():
global scannedRepliesCount
global scannedCommentsCount
global matchSamplesDict
global authorMatchCountDict
# Default values for global variables
matchedCommentsDict = {}
authorMatchCountDict = {}
vidIdDict = {}
vidTitleDict = {}
matchSamplesDict = {}
@@ -1717,7 +1940,12 @@ def main():
input("Press Enter to exit...")
sys.exit()
# Load any other data
print("\n Loading...\n")
miscData = {}
domainList = ingest_domain_file()
miscData['domainList'] = domainList
# Check for program updates
if not config or config['auto_check_update'] == True:
try:
@@ -1815,8 +2043,13 @@ def main():
check_video_id = str(validVideoID[1])
title = get_video_title(check_video_id)
print("\nChosen Video: " + title)
if currentUser[0] != get_channel_id(check_video_id):
channelOwner = get_channel_id(check_video_id)
if currentUser[0] != channelOwner[0]:
userNotChannelOwner = True
miscData['channelOwnerID'] = channelOwner[0]
miscData['channelOwnerName'] = channelOwner[1]
# Ask if correct video, or skip if config
if config and config['skip_confirm_video'] == True:
confirm = True
@@ -1849,6 +2082,8 @@ def main():
except:
print("\nInvalid Input! - Must be a whole number.")
validConfigSetting = False
miscData['channelOwnerID'] = currentUser[0]
miscData['channelOwnerName'] = currentUser[1]
# Create config file
elif scanMode == "makeConfig":
@@ -1963,7 +2198,7 @@ def main():
regexPattern = filterSettings[0]
elif filterMode == "AutoSmart":
filterSettings = prepare_filter_mode_smart_chars(currentUser, scanMode, config)
filterSettings = prepare_filter_mode_smart(currentUser, scanMode, config, miscData)
inputtedUsernameFilter = filterSettings[0]
inputtedCommentTextFilter = filterSettings[0]
@@ -1991,15 +2226,15 @@ def main():
print("(Note: If the program appears to freeze, try right clicking within the window)\n")
print(" --- Scanning --- \n")
def scan_all(youtube, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern):
nextPageToken = get_comments(youtube, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
def scan_all(youtube, miscData, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern):
nextPageToken = get_comments(youtube, miscData, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
print_count_stats(final=False) # Prints comment scan stats, updates on same line
# After getting first page, if there are more pages, goes to get comments for next page
while nextPageToken != "End" and scannedCommentsCount < maxScanNumber:
nextPageToken = get_comments(youtube, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, nextPageToken, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
nextPageToken = get_comments(youtube, miscData, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, nextPageToken, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
print_count_stats(final=True) # Prints comment scan stats, finalizes
params = [youtube, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern]
params = [youtube, miscData, currentUser, filterMode, filterSubMode, check_video_id, check_channel_id, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern]
scan_all(*params)
##########################################################
bypass = False

File diff suppressed because it is too large Load Diff

View File

@@ -3,4 +3,5 @@ google_auth_oauthlib==0.4.6
protobuf==3.19.1
colorama==0.4.4
rtfunicode==2.0
confusables==1.2.0
certifi