mirror of
https://github.com/ThioJoe/YT-Spammer-Purge.git
synced 2026-01-09 22:28:08 -05:00
Add Confusables Local, False Positive Notice, TimeStamps, Creator Specific Filters
- Add new folder in Scripts containing a slightly modified version of the confusables module. Added several custom characters. No longer require installing confusables remotely - Added some filters for comments targeted at specific large channels, and that overflow elsewhere - Improved / fixed behavior of certain regex filters. Converted remaining filter lists to regex - In match samples list, added warning if some comments are possible false positives, and highlights them - Log file now includes timestamp of each comment
This commit is contained in:
3
Scripts/confusablesCustom/Note About This Module.txt
Normal file
3
Scripts/confusablesCustom/Note About This Module.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
This included modules folder is from here: https://github.com/woodgern/confusables
|
||||
|
||||
It has been included so that I could modify the custom_confusables.txt file with additional characters.
|
||||
79
Scripts/confusablesCustom/__init__.py
Normal file
79
Scripts/confusablesCustom/__init__.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import json
|
||||
import re
|
||||
import os
|
||||
from itertools import product
|
||||
|
||||
from .config import CONFUSABLE_MAPPING_PATH, NON_NORMAL_ASCII_CHARS
|
||||
from .utils import is_ascii
|
||||
|
||||
|
||||
# Read the pre-built confusable mapping (generated by parse.py): a JSON dict
# mapping each character to the list of characters/strings confusable with it.
# The mapping is a symmetric (2-way) map of the pairs, and the whole JSON
# object is stored on a single line, hence readline().
with open(os.path.join(os.path.dirname(__file__), CONFUSABLE_MAPPING_PATH), "r") as mappings:
    CONFUSABLE_MAP = json.loads(mappings.readline())
|
||||
|
||||
|
||||
def is_confusable(str1, str2):
    """Return True if str1 and str2 can be considered visually confusable.

    Greedily consumes both strings from the left: at each step it finds the
    longest prefix of one string that is confusable with the first character
    of the other, and advances past the matched pieces. Fails as soon as
    neither side can match.
    """
    def _longest_confusable_prefix(candidate, target_char):
        # Length of the longest prefix of `candidate` found among the
        # confusables of `target_char`; 0 when nothing matches.
        options = confusable_characters(target_char)
        for size in range(len(candidate), 0, -1):
            if candidate[:size] in options:
                return size
        return 0

    remaining1, remaining2 = str1, str2
    while remaining1 and remaining2:
        match1 = _longest_confusable_prefix(remaining1, remaining2[0])
        match2 = _longest_confusable_prefix(remaining2, remaining1[0])

        if not match1 and not match2:
            # Neither side can absorb the other's next character.
            return False
        if not match2 or match1 >= match2:
            # Prefer consuming the longer match from remaining1.
            remaining1 = remaining1[match1:]
            remaining2 = remaining2[1:]
        else:
            remaining1 = remaining1[1:]
            remaining2 = remaining2[match2:]

    # Both must be fully consumed (equal empty strings) to be confusable.
    return remaining1 == remaining2
|
||||
|
||||
def confusable_characters(char):
    """Return the list of characters confusable with `char`.

    Falls back to a one-element list containing `char` itself when the
    mapping has no entry and `char` is at most one character long;
    returns None for unmapped multi-character strings.
    """
    mapped = CONFUSABLE_MAP.get(char)
    if mapped:
        return mapped
    return [char] if len(char) <= 1 else None
|
||||
|
||||
def confusable_regex(string, include_character_padding=False):
    """Build a regex pattern matching any confusable rendering of `string`.

    Each character of `string` becomes a non-capturing alternation of all of
    its confusable characters. When `include_character_padding` is True,
    common decoration characters (* _ ~ | ` - .) are allowed between (and
    around) the characters.
    """
    # Raw string: these backslashes are regex escapes, not Python string
    # escapes. The previous non-raw literal relied on invalid escape
    # sequences (\*, \-, \.), which emit warnings on modern Python.
    space_regex = r"[\*_~|`\-\.]*" if include_character_padding else ''
    regex = space_regex
    for char in string:
        escaped_chars = [re.escape(c) for c in confusable_characters(char)]
        regex += "(?:" + "|".join(escaped_chars) + ")" + space_regex

    return regex
|
||||
|
||||
def normalize(string, prioritize_alpha=False):
    """Return the sorted list of lowercase ASCII normal forms of `string`.

    For each character, collects its acceptable ASCII confusable
    replacements (recursing into multi-character confusables) and takes the
    cartesian product across the string. ASCII alphabetic characters are
    kept as-is. When `prioritize_alpha` is True, alphabetic characters only
    normalize to alphabetic replacements.
    """
    normal_forms = {""}
    for char in string:
        candidates = []
        if is_ascii(char) and char.isalpha():
            # Plain ASCII letters normalize to themselves.
            candidates = [char]
        else:
            for confusable in confusable_characters(char):
                # Only ASCII replacements count as "normal", and some ASCII
                # characters (see NON_NORMAL_ASCII_CHARS) are excluded.
                if not is_ascii(confusable) or confusable in NON_NORMAL_ASCII_CHARS:
                    continue
                # With prioritize_alpha, a letter may only map to a letter.
                if prioritize_alpha and char.isalpha() and not confusable.isalpha():
                    continue
                # Multi-character confusables are normalized recursively.
                candidates.append(
                    normalize(confusable)[0] if len(confusable) > 1 else confusable
                )

        if not candidates:
            # No acceptable replacement found: keep the original character.
            candidates = [char]

        normal_forms = {
            prefix + suffix.lower()
            for prefix, suffix in product(normal_forms, candidates)
        }
    return sorted(normal_forms)
|
||||
1
Scripts/confusablesCustom/assets/confusable_mapping.json
Normal file
1
Scripts/confusablesCustom/assets/confusable_mapping.json
Normal file
File diff suppressed because one or more lines are too long
9638
Scripts/confusablesCustom/assets/confusables.txt
Normal file
9638
Scripts/confusablesCustom/assets/confusables.txt
Normal file
File diff suppressed because it is too large
Load Diff
28
Scripts/confusablesCustom/assets/custom_confusables.txt
Normal file
28
Scripts/confusablesCustom/assets/custom_confusables.txt
Normal file
@@ -0,0 +1,28 @@
|
||||
0061 ; 0040 ; # a → @
|
||||
0065 ; 0033 ; # e → 3
|
||||
0069 ; 0021 ; # i → !
|
||||
0041 ; 1D00 ; # A → ᴀ
|
||||
0042 ; 0299 ; # B → ʙ
|
||||
0043 ; 1D04 ; # C → ᴄ
|
||||
0044 ; 1D05 ; # D → ᴅ
|
||||
0045 ; 1D07 ; # E → ᴇ
|
||||
0046 ; A730 ; # F → ꜰ
|
||||
0047 ; 0262 ; # G → ɢ
|
||||
0048 ; 029C ; # H → ʜ
|
||||
0049 ; 026A ; # I → ɪ
|
||||
004A ; 1D0A ; # J → ᴊ
|
||||
004B ; 1D0B ; # K → ᴋ
|
||||
004C ; 029F ; # L → ʟ
|
||||
004D ; 1D0D ; # M → ᴍ
|
||||
004E ; 0274 ; # N → ɴ
|
||||
004F ; 1D0F ; # O → ᴏ
|
||||
0050 ; 1D18 ; # P → ᴘ
|
||||
0051 ; A7AF ; # Q → ꞯ
|
||||
0052 ; 0280 ; # R → ʀ
|
||||
0053 ; A731 ; # S → ꜱ
|
||||
0054 ; 1D1B ; # T → ᴛ
|
||||
0055 ; 1D1C ; # U → ᴜ
|
||||
0056 ; 1D20 ; # V → ᴠ
|
||||
0057 ; 1D21 ; # W → ᴡ
|
||||
0059 ; 028F ; # Y → ʏ
|
||||
005A ; 1D22 ; # Z → ᴢ
|
||||
5
Scripts/confusablesCustom/config.py
Normal file
5
Scripts/confusablesCustom/config.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# Paths are relative to this package directory (callers join them with
# os.path.dirname(__file__)).
CUSTOM_CONFUSABLE_PATH = "assets/custom_confusables.txt"  # hand-added extra confusable pairs
CONFUSABLES_PATH = "assets/confusables.txt"  # Unicode confusables source data
CONFUSABLE_MAPPING_PATH = "assets/confusable_mapping.json"  # pre-built 2-way map (written by parse.py)
# Maximum recursion depth when transitively expanding confusable groups in parse.py.
MAX_SIMILARITY_DEPTH = 2
# ASCII characters excluded as normalization targets (see normalize() in __init__.py).
NON_NORMAL_ASCII_CHARS = ['@']
|
||||
97
Scripts/confusablesCustom/parse.py
Normal file
97
Scripts/confusablesCustom/parse.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import json
|
||||
from unicodedata import normalize
|
||||
import string
|
||||
import os
|
||||
from config import CUSTOM_CONFUSABLE_PATH, CONFUSABLES_PATH, CONFUSABLE_MAPPING_PATH, MAX_SIMILARITY_DEPTH
|
||||
|
||||
def _asciify(char):
|
||||
return normalize('NFD',char).encode('ascii', 'ignore').decode('ascii')
|
||||
|
||||
def _get_accented_characters(char):
    """List every code point (below 137928) whose accent-stripped form is `char`.

    NOTE(review): 137928 appears to be a deliberate cap below the full
    Unicode range (0x110000) — confirm intent before changing it.
    """
    candidates = (chr(code) for code in range(137928))
    return [c for c in candidates if c != char and _asciify(c) == char]
|
||||
|
||||
def _get_confusable_chars(character, unicode_confusable_map, depth):
    """Recursively collect characters confusable with `character`.

    Follows the pairwise map transitively, up to MAX_SIMILARITY_DEPTH hops,
    and returns the resulting set (always including `character` itself).
    """
    neighbors = unicode_confusable_map[character]

    group = {character}
    if depth <= MAX_SIMILARITY_DEPTH:
        for neighbor in neighbors:
            group |= _get_confusable_chars(neighbor, unicode_confusable_map, depth + 1)
    return group
|
||||
|
||||
def _register_pair(confusable_map, str1, str2):
    """Record str1 and str2 as mutually confusable (two-way mapping)."""
    confusable_map.setdefault(str1, set()).add(str2)
    confusable_map.setdefault(str2, set()).add(str1)


def _register_case_variant(confusable_map, char):
    """For a single already-registered character, also map it to its case counterpart."""
    if len(char) != 1:
        return
    variant = char.lower() if char.isupper() else char.upper()
    if variant != char:
        confusable_map[char].add(variant)
        confusable_map.setdefault(variant, set()).add(char)


def parse_new_mapping_file():
    """Parse the Unicode + custom confusables files and write the JSON mapping.

    Builds a symmetric character -> confusable-characters map from
    CONFUSABLES_PATH and CUSTOM_CONFUSABLE_PATH, augments it with upper/lower
    case variants and with accented forms of every ASCII letter, expands each
    group transitively (up to MAX_SIMILARITY_DEPTH), and dumps the result to
    CONFUSABLE_MAPPING_PATH as single-line JSON.
    """
    unicode_confusable_map = {}
    base_dir = os.path.dirname(__file__)

    with open(os.path.join(base_dir, CONFUSABLES_PATH), "r", encoding='utf-8') as unicode_mappings:
        with open(os.path.join(base_dir, CUSTOM_CONFUSABLE_PATH), "r", encoding='utf-8') as custom_mappings:
            mappings = unicode_mappings.readlines()
            mappings.extend(custom_mappings)

    for mapping_line in mappings:
        # Skip blanks and comment lines; '#' can be at index 1 when the line
        # starts with a BOM. Slicing (rather than indexing) avoids an
        # IndexError on one-character lines.
        if not mapping_line.strip() or '#' in mapping_line[:2]:
            continue

        # Line format: "<hex codepoint> ; <hex codepoint(s)> ; # comment"
        mapping = mapping_line.split(";")[:2]
        str1 = chr(int(mapping[0].strip(), 16))
        str2 = "".join(chr(int(x, 16)) for x in mapping[1].strip().split(" "))

        _register_pair(unicode_confusable_map, str1, str2)
        _register_case_variant(unicode_confusable_map, str1)
        _register_case_variant(unicode_confusable_map, str2)

    # Treat every accented form of each ASCII letter as confusable with it.
    for char in string.ascii_letters:
        accented = _get_accented_characters(char)
        unicode_confusable_map[char].update(accented)
        for accent in accented:
            unicode_confusable_map.setdefault(accent, set()).add(char)

    # Expand each character's group transitively before serializing.
    CONFUSABLE_MAP = {
        character: list(_get_confusable_chars(character, unicode_confusable_map, 0))
        for character in unicode_confusable_map
    }

    with open(os.path.join(base_dir, CONFUSABLE_MAPPING_PATH), "w") as mapping_file:
        mapping_file.write(json.dumps(CONFUSABLE_MAP))
|
||||
|
||||
parse_new_mapping_file()
|
||||
5
Scripts/confusablesCustom/utils.py
Normal file
5
Scripts/confusablesCustom/utils.py
Normal file
@@ -0,0 +1,5 @@
|
||||
def is_ascii(string):
    """Return True if every character in `string` is 7-bit ASCII (codepoint < 128)."""
    return all(ord(char) < 128 for char in string)
|
||||
File diff suppressed because one or more lines are too long
@@ -5,6 +5,7 @@ import Scripts.utils as utils
|
||||
import Scripts.auth as auth
|
||||
from Scripts.utils import choice
|
||||
from unicodedata import category as unicode_category
|
||||
from datetime import datetime
|
||||
|
||||
import rtfunicode
|
||||
import os
|
||||
@@ -44,6 +45,14 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
write_plaintext_log(current.logFileName, commentsContents)
|
||||
print(" ")
|
||||
|
||||
# Check if any flagged as possible false positives
|
||||
possibleFalsePositive = False
|
||||
for author in current.matchSamplesDict.values():
|
||||
if author['possibleFalsePositive'] == True:
|
||||
possibleFalsePositive = True
|
||||
break
|
||||
|
||||
|
||||
# Print Sample Match List
|
||||
valuesPreparedToWrite = ""
|
||||
valuesPreparedToPrint = ""
|
||||
@@ -70,16 +79,28 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
spamThreadNotice = False
|
||||
|
||||
def print_and_write(value, writeValues, printValues):
|
||||
if loggingEnabled == True and logMode == "rtf":
|
||||
writeValues = writeValues + value['iString'] + value['cString'] + f"{str(value['authorID'])} | {make_rtf_compatible(str(value['nameAndText']))} \\line \n"
|
||||
elif loggingEnabled == True and logMode == "plaintext":
|
||||
writeValues = writeValues + value['iString'] + value['cString'] + f"{str(value['authorID'])} | {str(value['nameAndText'])}\n"
|
||||
indexString = value['iString']
|
||||
countString = value['cString']
|
||||
authorID = value['authorID']
|
||||
nameAndText = value['nameAndText']
|
||||
if doWritePrint:
|
||||
printValues = printValues + value['iString'] + value['cString'] + f"{str(value['nameAndText'])}\n"
|
||||
printValues = printValues + indexString + countString + f"{str(nameAndText)}\n"
|
||||
# After making print values, remove the ANSI escape / color codes used, so they won't be written to file
|
||||
indexString = indexString.replace(u"\u001b[32m", "").replace(u"\u001b[0m", "")
|
||||
countString = countString.replace(u"\u001b[32m", "").replace(u"\u001b[0m", "")
|
||||
nameAndText = nameAndText.replace(u"\u001b[32m", "").replace(u"\u001b[0m", "")
|
||||
|
||||
if loggingEnabled == True and logMode == "rtf":
|
||||
writeValues = writeValues + indexString + countString + f"{str(authorID)} | {make_rtf_compatible(str(nameAndText))} \\line \n"
|
||||
elif loggingEnabled == True and logMode == "plaintext":
|
||||
writeValues = writeValues + indexString + countString + f"{str(authorID)} | {str(nameAndText)}\n"
|
||||
return writeValues, printValues
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
if doWritePrint:
|
||||
print(f"{F.LIGHTMAGENTA_EX}============================ Match Samples: One comment per matched-comment author ============================{S.R}")
|
||||
if possibleFalsePositive:
|
||||
print(f"{F.GREEN}======= {B.GREEN}{F.BLACK} NOTE: {S.R}{F.GREEN} Possible false positives marked with * and highlighted in green. Check them extra well! ======={S.R}")
|
||||
for value in current.matchSamplesDict.values():
|
||||
if value['matchReason'] != "Duplicate" and value['matchReason'] != "Spam Bot Thread" and value['matchReason'] != "Repost":
|
||||
valuesPreparedToWrite, valuesPreparedToPrint = print_and_write(value, valuesPreparedToWrite, valuesPreparedToPrint)
|
||||
@@ -93,9 +114,9 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
elif value['matchReason'] == "Repost":
|
||||
hasReposts = True
|
||||
if config['fuzzy_stolen_comment_detection'] == True:
|
||||
similarity = str(round(float(config['levenshtein_distance'])*100))+"%"
|
||||
repostSimilarity = str(round(float(config['levenshtein_distance'])*100))+"%"
|
||||
else:
|
||||
similarity = "100%"
|
||||
repostSimilarity = "100%"
|
||||
minLength = str(config['stolen_minimum_text_length'])
|
||||
if doWritePrint:
|
||||
print(valuesPreparedToPrint)
|
||||
@@ -127,7 +148,7 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
# Print Repost Match Samples
|
||||
if hasReposts == True:
|
||||
print(f"{F.LIGHTMAGENTA_EX}------------------------- {S.BRIGHT}{F.WHITE}{B.BLUE} Non-Matched {S.R}{F.LIGHTCYAN_EX} Commenters, but who stole a previous comment{F.LIGHTMAGENTA_EX} -------------------------{S.R}")
|
||||
print(f"{F.MAGENTA}-------------------------- ( {F.LIGHTBLUE_EX}Similarity Threshold: {similarity} | Minimum Length: {minLength}{F.MAGENTA} ) ----------------------------{S.R}")
|
||||
print(f"{F.MAGENTA}-------------------------- ( {F.LIGHTBLUE_EX}Similarity Threshold: {repostSimilarity} | Minimum Length: {minLength}{F.MAGENTA} ) ----------------------------{S.R}")
|
||||
for value in current.matchSamplesDict.values():
|
||||
if value['matchReason'] == "Repost":
|
||||
repostValuesToWrite, repostValuesToPrint = print_and_write(value, repostValuesToWrite, repostValuesToPrint)
|
||||
@@ -139,7 +160,12 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
if loggingEnabled == True:
|
||||
|
||||
if logMode == "rtf":
|
||||
matchSamplesContent = "==================== Match Samples: One comment per matched-comment author ==================== \\line\\line \n" + valuesPreparedToWrite
|
||||
if possibleFalsePositive:
|
||||
addLine = "\\line \n==================== NOTE: Possible false positives marked with * Check them extra well! ==================== \\line\\line \n"
|
||||
else: addLine = "\\line\\line \n"
|
||||
matchSamplesContent = f"==================== Match Samples: One comment per matched-comment author ==================== {addLine}" + valuesPreparedToWrite
|
||||
|
||||
|
||||
if doWritePrint:
|
||||
write_rtf(current.logFileName, matchSamplesContent)
|
||||
if current.spamThreadsDict:
|
||||
@@ -156,12 +182,16 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
write_rtf(current.logFileName, duplicateSamplesContent)
|
||||
|
||||
if hasReposts == True:
|
||||
repostSamplesContent = " \n \\line\\line -------------------- Non-Matched Commenters, but who reposted a previous comment -------------------- \\line \n"
|
||||
repostSamplesContent += f"---------------------- ( Similarity Threshold: {similarity} | Minimum Length: {minLength} ) ---------------------- \\line\\line \n" + repostValuesToWrite
|
||||
repostSamplesContent = " \n \\line\\line -------------------- Non-Matched Commenters, but who stole a previous comment -------------------- \\line \n"
|
||||
repostSamplesContent += f"---------------------- ( Similarity Threshold: {repostSimilarity} | Minimum Length: {minLength} ) ---------------------- \\line\\line \n" + repostValuesToWrite
|
||||
if doWritePrint:
|
||||
write_rtf(current.logFileName, repostSamplesContent)
|
||||
elif logMode == "plaintext":
|
||||
matchSamplesContent = "==================== Match Samples: One comment per matched-comment author ====================\n" + valuesPreparedToWrite
|
||||
if possibleFalsePositive:
|
||||
addLine = "==================== NOTE: Possible false positives marked with * Check them extra well! ==================== \n"
|
||||
else: addLine = ""
|
||||
|
||||
matchSamplesContent = f"==================== Match Samples: One comment per matched-comment author ====================\n{addLine}" + valuesPreparedToWrite
|
||||
if doWritePrint:
|
||||
write_plaintext_log(current.logFileName, matchSamplesContent)
|
||||
if current.spamThreadsDict:
|
||||
@@ -181,7 +211,7 @@ def print_comments(current, config, scanVideoID, loggingEnabled, scanMode, logMo
|
||||
logFileContents = commentsContents + matchSamplesContent + spamThreadSamplesContent + duplicateSamplesContent + repostSamplesContent
|
||||
if hasReposts == True:
|
||||
repostSamplesContent = "\n-------------------- Non-Matched Commenters, but who stole a previous comment --------------------\n"
|
||||
repostSamplesContent += f"---------------------- ( Similarity Threshold: {similarity} | Minimum Length: {minLength} ) ----------------------\n" + repostValuesToWrite
|
||||
repostSamplesContent += f"---------------------- ( Similarity Threshold: {repostSimilarity} | Minimum Length: {minLength} ) ----------------------\n" + repostValuesToWrite
|
||||
if doWritePrint:
|
||||
write_plaintext_log(current.logFileName, repostSamplesContent)
|
||||
else:
|
||||
@@ -256,6 +286,11 @@ def print_prepared_comments(current, commentsContents, scanVideoID, comments, j,
|
||||
videoID = metadata['videoID']
|
||||
matchReason = metadata['matchReason']
|
||||
originalCommentID = metadata['originalCommentID']
|
||||
timestamp = metadata['timestamp']
|
||||
|
||||
# Convert timestamp to readable format. First parses, then reconverts to new string
|
||||
timeObject = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
|
||||
dateAndTime = timeObject.strftime("%b %d, %Y @ %I:%M:%S %p")
|
||||
|
||||
# Truncates very long comments, and removes excessive multiple lines
|
||||
if len(text) > 1500:
|
||||
@@ -266,6 +301,7 @@ def print_prepared_comments(current, commentsContents, scanVideoID, comments, j,
|
||||
# Add one sample from each matching author to current.matchSamplesDict, containing author ID, name, and text
|
||||
if matchReason != "Also By Matched Author" and author_id_local not in current.matchSamplesDict.keys():
|
||||
add_sample(current, author_id_local, author, text, matchReason)
|
||||
mark_possible_false_positive(current, author_id_local, text, matchReason)
|
||||
|
||||
# Build comment direct link
|
||||
if scanMode == "communityPost" or scanMode == "recentCommunityPosts":
|
||||
@@ -310,7 +346,6 @@ def print_prepared_comments(current, commentsContents, scanVideoID, comments, j,
|
||||
else:
|
||||
originalCommentInfoLine = ""
|
||||
|
||||
|
||||
if logMode == "rtf":
|
||||
commentInfo = (
|
||||
# Author Info
|
||||
@@ -324,6 +359,7 @@ def print_prepared_comments(current, commentsContents, scanVideoID, comments, j,
|
||||
+ " > Reason: " + matchReason + "\\line "+ "\n"
|
||||
+ originalCommentInfoLine
|
||||
+ titleInfoLine
|
||||
+ " > Timestamp: " + dateAndTime + "\\line " + "\n"
|
||||
+ " > Direct Link: " + directLink + " \\line "+ "\n"
|
||||
+ " > Author Channel ID: \cf6" + author_id_local + r"\cf1 \line "+ "\n"
|
||||
+ "=============================================================================================\\line\\line\\line" + "\n\n\n"
|
||||
@@ -341,6 +377,7 @@ def print_prepared_comments(current, commentsContents, scanVideoID, comments, j,
|
||||
+ " > Reason: " + matchReason + "\n"
|
||||
+ originalCommentInfoLine
|
||||
+ titleInfoLine
|
||||
+ " > Timestamp: " + dateAndTime + "\n"
|
||||
+ " > Direct Link: " + directLink + "\n"
|
||||
+ " > Author Channel ID: " + author_id_local + "\n"
|
||||
+ "=============================================================================================\n\n\n"
|
||||
@@ -736,23 +773,34 @@ def sort_samples(current):
|
||||
if item[1]['matchReason'] == 'Repost':
|
||||
newDict[item[0]] = item[1]
|
||||
|
||||
|
||||
# # Assign Indexes and strings to print with index for each author
|
||||
# def assign_index(author, i):
|
||||
# iString = f"{str(i)}. ".ljust(4)
|
||||
# current.matchSamplesDict[author]['index'] = i
|
||||
# current.matchSamplesDict[author]['iString'] = iString
|
||||
# i += 1
|
||||
# return i
|
||||
|
||||
i = 1
|
||||
for author in newDict.keys():
|
||||
iString = f"{str(i)}. ".ljust(4)
|
||||
for author, dictionary in newDict.items():
|
||||
# Makes the sample line green if marked as possible false positive, also adds asterisk
|
||||
if dictionary['possibleFalsePositive'] == True:
|
||||
iString = f"{F.GREEN}*{str(i)}. {S.R}".ljust(5)
|
||||
newDict[author]['cString'] = f"{F.GREEN}{dictionary['cString']}{S.R}" #cString is 'count string'
|
||||
newDict[author]['nameAndText'] = f"{F.GREEN}{dictionary['nameAndText']}{S.R}"
|
||||
else:
|
||||
iString = f" {str(i)}. ".ljust(5)
|
||||
newDict[author]['index'] = i
|
||||
newDict[author]['iString'] = iString
|
||||
i += 1
|
||||
|
||||
current.matchSamplesDict = newDict
|
||||
return current
|
||||
|
||||
# -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
def mark_possible_false_positive(current, authorID, text, matchReason):
    """Set the 'possibleFalsePositive' flag on this author's match sample.

    Only comments matched by reason 'Filter Match' are candidates; among
    those, a comment mentioning any of a few tell-tale words (which often
    indicate a user talking *about* spam rather than posting it) is flagged.
    Returns `current` with the flag set either way.
    """
    telltale_words = ['bot', 'scam', 'spam']
    flagged = (
        matchReason == 'Filter Match'
        and any(word in text.lower() for word in telltale_words)
    )
    current.matchSamplesDict[authorID]['possibleFalsePositive'] = flagged
    return current
|
||||
|
||||
|
||||
@@ -528,8 +528,9 @@ def add_spam(current, config, miscData, currentCommentDict, videoID, matchReason
|
||||
commentTextRaw = str(currentCommentDict['commentText']) # Use str() to ensure not pointing to same place in memory
|
||||
commentText = str(currentCommentDict['commentText']).replace("\r", "")
|
||||
originalCommentID = currentCommentDict['originalCommentID']
|
||||
timestamp = currentCommentDict['timestamp']
|
||||
|
||||
dictToUse[commentID] = {'text':commentText, 'textUnsanitized':commentTextRaw, 'authorName':authorChannelName, 'authorID':authorChannelID, 'videoID':videoID, 'matchReason':matchReason, 'originalCommentID':originalCommentID}
|
||||
dictToUse[commentID] = {'text':commentText, 'textUnsanitized':commentTextRaw, 'authorName':authorChannelName, 'authorID':authorChannelID, 'videoID':videoID, 'matchReason':matchReason, 'originalCommentID':originalCommentID, 'timestamp':timestamp}
|
||||
current.vidIdDict[commentID] = videoID # Probably remove this later, but still being used for now
|
||||
|
||||
# Count of comments per author
|
||||
@@ -808,6 +809,7 @@ def check_against_filter(current, filtersDict, miscData, config, currentCommentD
|
||||
compiledRegexDict = smartFilter['compiledRegexDict']
|
||||
compiledObfuRegexDict = smartFilter['compiledObfuRegexDict']
|
||||
basicFilterDict = smartFilter['basicFilterDict']
|
||||
preciseRegexDict = smartFilter['preciseRegexDict']
|
||||
numberFilterSet = smartFilter['spammerNumbersSet']
|
||||
compiledNumRegex = smartFilter['compiledNumRegex']
|
||||
minNumbersMatchCount = smartFilter['minNumbersMatchCount']
|
||||
@@ -874,8 +876,13 @@ def check_against_filter(current, filtersDict, miscData, config, currentCommentD
|
||||
# Processed Variables
|
||||
combinedString = authorChannelName + commentText
|
||||
combinedSet = utils.make_char_set(combinedString, stripLettersNumbers=True, stripPunctuation=True)
|
||||
upLowTextSet = set(commentText.replace(miscData.channelOwnerName, ""))
|
||||
#usernameSet = utils.make_char_set(authorChannelName)
|
||||
# UpLow Text Set
|
||||
index = commentText.lower().rfind(miscData.channelOwnerName.lower())
|
||||
if index != -1:
|
||||
processedText = commentText.replace(commentText[index:index+len(miscData.channelOwnerName)], "")
|
||||
else:
|
||||
processedText = commentText
|
||||
upLowTextSet = set(processedText)
|
||||
|
||||
# Run Checks
|
||||
if authorChannelID == parentAuthorChannelID:
|
||||
@@ -893,13 +900,15 @@ def check_against_filter(current, filtersDict, miscData, config, currentCommentD
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif compiledRegexDict['blackAdWords'].search(authorChannelName):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif compiledRegexDict['textBlackWords'].search(commentText):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif any(findObf(expressionPair[0], expressionPair[1], commentText) for expressionPair in compiledObfuRegexDict['textObfuBlackWords']):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif any(word in commentText.lower() for word in basicFilterDict['textExactBlackWords']):
|
||||
elif preciseRegexDict['textExactBlackWords'].search(commentText.lower()):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif any((word in commentText and not upLowTextSet.intersection(lowAlSet)) for word in basicFilterDict['textUpLowBlackWords']):
|
||||
elif preciseRegexDict['textUpLowBlackWords'].search(commentText) and not upLowTextSet.intersection(lowAlSet):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif any(findObf(expressionPair[0], expressionPair[1], commentText) for expressionPair in compiledObfuRegexDict['usernameObfuBlackWords']):
|
||||
elif any(findObf(expressionPair[0], expressionPair[1], authorChannelName) for expressionPair in compiledObfuRegexDict['usernameObfuBlackWords']):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif spamListCombinedRegex.search(combinedString.lower()):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
@@ -907,7 +916,7 @@ def check_against_filter(current, filtersDict, miscData, config, currentCommentD
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif sensitive and re.search(smartFilter['usernameConfuseRegex'], authorChannelName):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
elif not sensitive and (findObf(smartFilter['usernameConfuseRegex'], list(miscData.channelOwnerName), authorChannelName) or authorChannelName == miscData.channelOwnerName):
|
||||
elif not sensitive and (findObf(smartFilter['usernameConfuseRegex'], miscData.channelOwnerName, authorChannelName) or authorChannelName == miscData.channelOwnerName):
|
||||
add_spam(current, config, miscData, currentCommentDict, videoID)
|
||||
# Multi Criteria Tests
|
||||
else:
|
||||
@@ -959,7 +968,7 @@ def check_against_filter(current, filtersDict, miscData, config, currentCommentD
|
||||
if compiledRegexDict['redAdWords'].search(combinedString):
|
||||
redCount += 1
|
||||
|
||||
if any(word in combinedString.lower() for word in basicFilterDict['exactRedAdWords']):
|
||||
if preciseRegexDict['exactRedAdWords'].search(combinedString.lower()):
|
||||
redCount += 1
|
||||
|
||||
if redAdEmojiSet.intersection(combinedSet):
|
||||
|
||||
@@ -10,7 +10,7 @@ import Scripts.operations as operations
|
||||
import Scripts.files as files
|
||||
import Scripts.filter_variables as filter
|
||||
|
||||
from confusables import confusable_regex, normalize
|
||||
from Scripts.confusablesCustom import confusable_regex, normalize
|
||||
from base64 import b85decode as b64decode
|
||||
import pathlib
|
||||
|
||||
@@ -303,6 +303,13 @@ def prepare_filter_mode_smart(scanMode, config, miscData, sensitive=False):
|
||||
'redAdWords': filter.redAdWordsCompiled,
|
||||
'yellowAdWords': filter.yellowAdWordsCompiled,
|
||||
'usernameRedWords': filter.usernameRedWordsCompiled,
|
||||
'textBlackWords': filter.textBlackWordsCompiled,
|
||||
}
|
||||
|
||||
preciseRegexDict = {
|
||||
'textExactBlackWords': re.compile(filter.textExactBlackWords),
|
||||
'textUpLowBlackWords': re.compile(filter.textUpLowBlackWords),
|
||||
'exactRedAdWords': re.compile(filter.exactRedAdWords),
|
||||
}
|
||||
|
||||
compiledObfuRegexDict = {
|
||||
@@ -311,8 +318,6 @@ def prepare_filter_mode_smart(scanMode, config, miscData, sensitive=False):
|
||||
}
|
||||
|
||||
basicFilterDict = {
|
||||
'textExactBlackWords': filter.textExactBlackWords,
|
||||
'textUpLowBlackWords': filter.textUpLowBlackWords,
|
||||
'usernameRedWords': filter.usernameRedWordsCompiled,
|
||||
'exactRedAdWords': filter.exactRedAdWords,
|
||||
}
|
||||
@@ -347,7 +352,6 @@ def prepare_filter_mode_smart(scanMode, config, miscData, sensitive=False):
|
||||
onlyVideoLinkRegex = re.compile(r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))(\/(?:[\w\-]+\?v=|embed\/|v\/)?)([\w\-]+)(\S+)?$")
|
||||
compiledRegexDict['onlyVideoLinkRegex'] = onlyVideoLinkRegex
|
||||
|
||||
|
||||
# Compile Thread Detection Regex
|
||||
nameRegex = re.compile(rf'\b{filter.salutationRegex}\s+([a-zA-Z]+\.?)\s+([a-zA-Z]+)')
|
||||
nakedNameRegex = re.compile(rf'\b{filter.nakedNamePreRegex}\s+([a-zA-Z]+\.?)\s+([a-zA-Z]+)')
|
||||
@@ -427,6 +431,7 @@ def prepare_filter_mode_smart(scanMode, config, miscData, sensitive=False):
|
||||
'compiledRegexDict': compiledRegexDict,
|
||||
'compiledObfuRegexDict': compiledObfuRegexDict,
|
||||
'basicFilterDict': basicFilterDict,
|
||||
'preciseRegexDict': preciseRegexDict,
|
||||
'usernameConfuseRegex': usernameConfuseRegex,
|
||||
'languages': languages,
|
||||
'sensitive': sensitive,
|
||||
|
||||
@@ -36,7 +36,7 @@
|
||||
### IMPORTANT: I OFFER NO WARRANTY OR GUARANTEE FOR THIS SCRIPT. USE AT YOUR OWN RISK.
|
||||
### I tested it on my own and implemented some failsafes as best as I could,
|
||||
### but there could always be some kind of bug. You should inspect the code yourself.
|
||||
version = "2.16.0-Beta2"
|
||||
version = "2.16.0-Beta3"
|
||||
configVersion = 30
|
||||
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
|
||||
print("Importing Script Modules...")
|
||||
@@ -1321,7 +1321,7 @@ def main():
|
||||
print(f"Defaulting to '{F.YELLOW}False{S.R}'")
|
||||
input("\nPress Enter to continue...")
|
||||
|
||||
### ----------------------------------------------------------------
|
||||
### ----------------------------------------------------------------
|
||||
|
||||
### ------------- Decide whether to ask before deleting -------------
|
||||
# Using config to determine deletion type, block invalid settings
|
||||
|
||||
@@ -3,7 +3,6 @@ google_auth_oauthlib==0.4.6
|
||||
protobuf==3.19.1
|
||||
colorama==0.4.4
|
||||
rtfunicode==2.0
|
||||
confusables==1.2.0
|
||||
certifi>=2021.10.8
|
||||
six>=1.16.0
|
||||
python-Levenshtein>=0.12.2
|
||||
|
||||
Reference in New Issue
Block a user