mirror of
https://github.com/ThioJoe/YT-Spammer-Purge.git
synced 2026-01-09 14:18:06 -05:00
@@ -34,12 +34,13 @@
|
||||
### IMPORTANT: I OFFER NO WARRANTY OR GUARANTEE FOR THIS SCRIPT. USE AT YOUR OWN RISK.
|
||||
### I tested it on my own and implemented some failsafes as best as I could,
|
||||
### but there could always be some kind of bug. You should inspect the code yourself.
|
||||
version = "1.4.1"
|
||||
version = "1.5.0-Testing"
|
||||
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
import traceback
|
||||
import re
|
||||
|
||||
from googleapiclient.errors import HttpError
|
||||
from googleapiclient.discovery import build
|
||||
@@ -94,47 +95,12 @@ def get_authenticated_service():
|
||||
creds.refresh(Request())
|
||||
else:
|
||||
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, scopes=YOUTUBE_READ_WRITE_SSL_SCOPE)
|
||||
creds = flow.run_local_server(port=0)
|
||||
# Save the credentials for the next run
|
||||
creds = flow.run_local_server(port=0, authorization_prompt_message="Log in using the browser window.")
|
||||
# Save the credentials for the next run
|
||||
with open(TOKEN_FILE_NAME, 'w') as token:
|
||||
token.write(creds.to_json())
|
||||
return build(API_SERVICE_NAME, API_VERSION, credentials=creds, discoveryServiceUrl=DISCOVERY_SERVICE_URL)
|
||||
|
||||
|
||||
|
||||
##########################################################################################
|
||||
##################################### GET REPLIES ########################################
|
||||
##########################################################################################
|
||||
|
||||
# Call the API's comments.list method to list the existing comment replies.
|
||||
def get_replies(parent_id, video_id):
|
||||
global spamCommentsID
|
||||
global scannedRepliesCount
|
||||
|
||||
results = youtube.comments().list(
|
||||
part="snippet",
|
||||
parentId=parent_id,
|
||||
maxResults=100, # 100 is the max per page, but multiple pages will be scanned
|
||||
#fields="items/snippet/authorDisplayName,items/snippet/authorChannelId/value,items/snippet/textDisplay,items/id", # If want to get author name and comment text
|
||||
fields="items/snippet/authorChannelId/value,items/id",
|
||||
textFormat="plainText"
|
||||
).execute()
|
||||
|
||||
# Iterates through items in results
|
||||
for item in results["items"]:
|
||||
authorChannelID = item["snippet"]["authorChannelId"]["value"]
|
||||
replyID = item["id"]
|
||||
scannedRepliesCount += 1 # Count number of comment threads scanned, add to global count
|
||||
|
||||
# If the comment is from the spammer channel, add to list of spam comment IDs
|
||||
# Also add key-value pair of comment ID and video ID to dictionary
|
||||
if any(authorChannelID == x for x in spammer_channel_id):
|
||||
spamCommentsID += [replyID]
|
||||
vidIdDict[replyID] = video_id
|
||||
|
||||
print_count_stats(final=False) # Prints out current count stats
|
||||
|
||||
return True
|
||||
|
||||
##########################################################################################
|
||||
############################### PRINT SPECIFIC COMMENTS ##################################
|
||||
@@ -200,14 +166,21 @@ def print_prepared_comments(check_video_id_localprep, comments, j, logMode):
|
||||
##########################################################################################
|
||||
|
||||
# Call the API's commentThreads.list method to list the existing comments.
|
||||
def get_comments(youtube, check_video_id=None, check_channel_id=None, nextPageToken=None): # None are set as default if no parameters passed into function
|
||||
global scannedThreadsCount
|
||||
def get_comments(youtube, filterMode, check_video_id=None, check_channel_id=None, nextPageToken=None, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None): # None are set as default if no parameters passed into function
|
||||
global scannedCommentsCount
|
||||
global spamCommentsID
|
||||
#fieldsToFetch = "nextPageToken,items/id,items/snippet/topLevelComment/id,items/snippet/totalReplyCount,items/snippet/topLevelComment/snippet/authorDisplayName,items/snippet/topLevelComment/snippet/authorChannelId/value,items/snippet/topLevelComment/snippet/textDisplay,items/snippet/topLevelComment/snippet/videoId"
|
||||
fieldsToFetch = "nextPageToken,items/snippet/topLevelComment/id,items/snippet/totalReplyCount,items/snippet/topLevelComment/snippet/authorChannelId/value,items/snippet/topLevelComment/snippet/videoId"
|
||||
# Initialize some variables
|
||||
authorChannelName = None
|
||||
commentText = None
|
||||
|
||||
# Gets comment threads for a specific video
|
||||
if filterMode == 1: # User entered spammer IDs -- Get Extra Info: None
|
||||
fieldsToFetch = "nextPageToken,items/snippet/topLevelComment/id,items/snippet/totalReplyCount,items/snippet/topLevelComment/snippet/videoId,items/snippet/topLevelComment/snippet/authorChannelId/value"
|
||||
if filterMode == 2 or filterMode == 4: # Filter char by Username / Auto Regex non-ascii username -- Get Extra Info: Author Display Name
|
||||
fieldsToFetch = "nextPageToken,items/snippet/topLevelComment/id,items/snippet/totalReplyCount,items/snippet/topLevelComment/snippet/videoId,items/snippet/topLevelComment/snippet/authorChannelId/value,items/snippet/topLevelComment/snippet/authorDisplayName"
|
||||
if filterMode == 3: # Filter char by Comment text -- Get Extra Info: Comment Text
|
||||
fieldsToFetch = "nextPageToken,items/snippet/topLevelComment/id,items/snippet/totalReplyCount,items/snippet/topLevelComment/snippet/videoId,items/snippet/topLevelComment/snippet/authorChannelId/value,items/snippet/topLevelComment/snippet/textDisplay"
|
||||
|
||||
|
||||
# Gets all comment threads for a specific video
|
||||
if check_video_id is not None:
|
||||
results = youtube.commentThreads().list(
|
||||
part="snippet",
|
||||
@@ -218,8 +191,8 @@ def get_comments(youtube, check_video_id=None, check_channel_id=None, nextPageTo
|
||||
textFormat="plainText"
|
||||
).execute()
|
||||
|
||||
# Get comment threads across the whole channel
|
||||
if check_video_id is None:
|
||||
# Get all comment threads across the whole channel
|
||||
elif check_video_id is None:
|
||||
results = youtube.commentThreads().list(
|
||||
part="snippet",
|
||||
allThreadsRelatedToChannelId=check_channel_id,
|
||||
@@ -235,35 +208,134 @@ def get_comments(youtube, check_video_id=None, check_channel_id=None, nextPageTo
|
||||
except KeyError:
|
||||
RetrievedNextPageToken = "End"
|
||||
|
||||
# After getting comments threads for page, goes through each thread and gets replies
|
||||
# After getting all comments threads for page, extracts data for each and stores matches in spamCommentsID
|
||||
# Also goes through each thread and execuites get_replies() to get reply content and matches
|
||||
for item in results["items"]:
|
||||
comment = item["snippet"]["topLevelComment"]
|
||||
#text = comment["snippet"]["textDisplay"] # If need to retrieve comment text
|
||||
videoID = comment["snippet"]["videoId"] # Only enable if NOT checking specific video
|
||||
parent_id = item["snippet"]["topLevelComment"]["id"]
|
||||
numReplies = item["snippet"]["totalReplyCount"]
|
||||
|
||||
# Need to be able to catch exceptions because sometimes the API will return a comment from non-existent / deleted channel
|
||||
try:
|
||||
authorChannelID = item["snippet"]["topLevelComment"]["snippet"]["authorChannelId"]["value"]
|
||||
#author = comment["snippet"]["authorDisplayName"] # If need to retrieve author name
|
||||
try:
|
||||
authorChannelID = comment["snippet"]["authorChannelId"]["value"]
|
||||
except KeyError:
|
||||
authorChannelID = "[Deleted Channel]"
|
||||
|
||||
# Need to be able to catch exceptions because sometimes the API will return a comment from non-existent / deleted channel
|
||||
# Need individual tries because not all are fetched for each mode
|
||||
if filterMode == 2 or filterMode == 4:
|
||||
try:
|
||||
authorChannelName = comment["snippet"]["authorDisplayName"]
|
||||
except KeyError:
|
||||
authorChannelName = "[Deleted Channel]"
|
||||
if filterMode == 3:
|
||||
try:
|
||||
commentText = comment["snippet"]["textDisplay"]
|
||||
except KeyError:
|
||||
commentText = "[Deleted/Missing Comment]"
|
||||
|
||||
# Runs check against comment info for whichever filter data is relevant
|
||||
check_against_filter(filterMode, parent_id, videoID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, authorChannelID, authorChannelName, commentText, regexPattern)
|
||||
scannedCommentsCount += 1 # Counts number of comments scanned, add to global count
|
||||
|
||||
if any(authorChannelID == x for x in spammer_channel_id):
|
||||
spamCommentsID += [parent_id]
|
||||
vidIdDict[parent_id] = videoID
|
||||
|
||||
if numReplies > 0:
|
||||
get_replies(parent_id=parent_id, video_id=videoID)
|
||||
scannedThreadsCount += 1 # Counts number of comment threads with at least one reply, adds to counter
|
||||
get_replies(filterMode, parent_id, videoID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, regexPattern)
|
||||
else:
|
||||
print_count_stats(final=False) # Updates displayed stats if no replies
|
||||
|
||||
return RetrievedNextPageToken
|
||||
|
||||
|
||||
##########################################################################################
|
||||
##################################### GET REPLIES ########################################
|
||||
##########################################################################################
|
||||
|
||||
# Call the API's comments.list method to list the existing comment replies.
|
||||
def get_replies(filterMode, parent_id, videoID, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, regexPattern=None):
|
||||
global scannedRepliesCount
|
||||
# Initialize some variables
|
||||
authorChannelName = None
|
||||
commentText = None
|
||||
|
||||
if filterMode == 1: # User entered spammer IDs -- Get Extra Info: None
|
||||
fieldsToFetch = "items/snippet/authorChannelId/value,items/id"
|
||||
elif filterMode == 2 or filterMode == 4: # Filter by Username -- Get Extra Info: Author Display Name
|
||||
fieldsToFetch = "items/snippet/authorChannelId/value,items/id,items/snippet/authorDisplayName"
|
||||
elif filterMode == 3: # Filter by comment text -- Get Extra Info: Comment Text
|
||||
fieldsToFetch = "items/snippet/authorChannelId/value,items/id,items/snippet/textDisplay"
|
||||
|
||||
results = youtube.comments().list(
|
||||
part="snippet",
|
||||
parentId=parent_id,
|
||||
maxResults=100, # 100 is the max per page, but multiple pages will be scanned
|
||||
fields=fieldsToFetch,
|
||||
textFormat="plainText"
|
||||
).execute()
|
||||
|
||||
# Iterates through items in results
|
||||
# Need to be able to catch exceptions because sometimes the API will return a comment from non-existent / deleted channel
|
||||
# Need individual tries because not all are fetched for each mode
|
||||
for item in results["items"]:
|
||||
replyID = item["id"]
|
||||
try:
|
||||
authorChannelID = item["snippet"]["authorChannelId"]["value"]
|
||||
except KeyError:
|
||||
authorChannelID = "[Deleted Channel]"
|
||||
|
||||
if filterMode == 2 or filterMode == 4:
|
||||
try:
|
||||
authorChannelName = item["snippet"]["authorDisplayName"]
|
||||
except KeyError:
|
||||
authorChannelName = "[Deleted Channel]"
|
||||
|
||||
if filterMode == 3:
|
||||
try:
|
||||
commentText = item["snippet"]["textDisplay"]
|
||||
except KeyError:
|
||||
commentText = "[Deleted/Missing Comment]"
|
||||
|
||||
# Runs check against comment info for whichever filter data is relevant
|
||||
check_against_filter(filterMode, replyID, videoID, inputtedSpammerChannelID, inputtedUsernameFilter, inputtedCommentTextFilter, authorChannelID, authorChannelName, commentText, regexPattern)
|
||||
|
||||
# Update latest stats
|
||||
scannedRepliesCount += 1 # Count number of replies scanned, add to global count
|
||||
print_count_stats(final=False) # Prints out current count stats
|
||||
|
||||
return True
|
||||
|
||||
############################## CHECK AGAINST FILTER ######################################
|
||||
# The basic logic that actually checks each comment against filter criteria
|
||||
def check_against_filter(filterMode, commentID, videoID, inputtedSpammerChannelID=None, inputtedUsernameFilter=None, inputtedCommentTextFilter=None, authorChannelID=None, authorChannelName=None, commentText=None, regexPattern=None):
|
||||
global vidIdDict
|
||||
global spamCommentsID
|
||||
# If the comment matches criteria based on mode, add to list of spam comment IDs
|
||||
# Also add key-value pair of comment ID and video ID to dictionary
|
||||
|
||||
# Checks author of either parent comment or reply (both passed in as commentID) against channel ID inputted by user
|
||||
if filterMode == 1:
|
||||
if any(authorChannelID == x for x in inputtedSpammerChannelID):
|
||||
spamCommentsID += [commentID]
|
||||
vidIdDict[commentID] = videoID
|
||||
|
||||
# Check if author channel name contains any characters entered by user
|
||||
elif filterMode == 2:
|
||||
authorChannelName = make_char_set(str(authorChannelName))
|
||||
if any(x in inputtedUsernameFilter for x in authorChannelName):
|
||||
spamCommentsID += [commentID]
|
||||
vidIdDict[commentID] = videoID
|
||||
|
||||
# Check if comment text contains any characters entered by user
|
||||
elif filterMode == 3:
|
||||
commentText = make_char_set(str(commentText))
|
||||
if any(x in inputtedCommentTextFilter for x in commentText):
|
||||
spamCommentsID += [commentID]
|
||||
vidIdDict[commentID] = videoID
|
||||
|
||||
# Check if author name contains non-ascii characters with Regex, sensitivity based on user selection
|
||||
elif filterMode == 4:
|
||||
if re.search(str(regexPattern), authorChannelName):
|
||||
spamCommentsID += [commentID]
|
||||
vidIdDict[commentID] = videoID
|
||||
|
||||
##########################################################################################
|
||||
################################ DELETE COMMENTS #########################################
|
||||
##########################################################################################
|
||||
@@ -275,23 +347,33 @@ def delete_found_comments(commentsDictionary,banChoice):
|
||||
def delete(commentIDs):
|
||||
youtube.comments().setModerationStatus(id=commentIDs, moderationStatus="rejected", banAuthor=banChoice).execute()
|
||||
|
||||
print("Deleting Comments. Please Wait...")
|
||||
commentsList = list(commentsDictionary.keys()) # Takes comment IDs out of dictionary and into list
|
||||
if len(commentsList) > 50:
|
||||
remainder = len(commentsList) % 50
|
||||
numDivisions = int((len(commentsList)-remainder)/50)
|
||||
for i in range(numDivisions):
|
||||
commentsList = list(commentsDictionary.keys()) # Takes comment IDs out of dictionary and into list
|
||||
total = len(commentsList)
|
||||
deletedCounter = 0
|
||||
def print_progress(d, t): print("Deleting Comments... - Progress: [" + str(d) + " / " + str(t) + "] (In Groups of 50)", end="\r") # Prints progress of deletion
|
||||
print_progress(deletedCounter, total)
|
||||
|
||||
if total > 50: # If more than 50 comments, break into chunks of 50
|
||||
remainder = total % 50 # Gets how many left over after dividing into chunks of 50
|
||||
numDivisions = int((total-remainder)/50) # Gets how many full chunks of 50 there are
|
||||
for i in range(numDivisions): # Loops through each full chunk of 50
|
||||
delete(commentsList[i*50:i*50+50])
|
||||
deletedCounter += 50
|
||||
print_progress(deletedCounter, total)
|
||||
if remainder > 0:
|
||||
delete(commentsList[numDivisions*50:len(commentsList)])
|
||||
delete(commentsList[numDivisions*50:total]) # Deletes any leftover comments range after last full chunk
|
||||
deletedCounter += remainder
|
||||
print_progress(deletedCounter, total)
|
||||
else:
|
||||
delete(commentsList)
|
||||
print("Comments Deleted! Will now verify each is gone.\n")
|
||||
print_progress(deletedCounter, total)
|
||||
print("Comments Deleted! Will now verify each is gone. \n")
|
||||
|
||||
# Takes in dictionary of comment IDs and video IDs, and checks if comments still exist individually
|
||||
def check_deleted_comments(commentsDictionary):
|
||||
i = 0 # Count number of remaining comments
|
||||
j = 1 # Count number of checked
|
||||
total = len(commentsDictionary)
|
||||
for key, value in commentsDictionary.items():
|
||||
results = youtube.comments().list(
|
||||
part="snippet",
|
||||
@@ -300,7 +382,7 @@ def check_deleted_comments(commentsDictionary):
|
||||
fields="items",
|
||||
textFormat="plainText"
|
||||
).execute()
|
||||
print("Verifying Comments Deleted..." + "."*j, end="\r")
|
||||
print("Verifying Deleted Comments: [" + str(j) + " / " + str(total) + "]", end="\r")
|
||||
j += 1
|
||||
|
||||
if results["items"]: # Check if the items result is empty
|
||||
@@ -403,9 +485,9 @@ def convert_comment_id_to_video_id(comment_id):
|
||||
# Prints Scanning Statistics, can be version that overwrites itself or one that finalizes and moves to next line
|
||||
def print_count_stats(final):
|
||||
if final == True:
|
||||
print("Top Level Comments Scanned: " + str(scannedCommentsCount) + " | Replies Scanned: " + str(scannedRepliesCount) + " | Spam Found So Far: " + str(len(spamCommentsID)) + "\n")
|
||||
print("Top Level Comments Scanned: " + str(scannedCommentsCount) + " | Replies Scanned: " + str(scannedRepliesCount) + " | Matches Found So Far: " + str(len(spamCommentsID)) + "\n")
|
||||
else:
|
||||
print("Top Level Comments Scanned: " + str(scannedCommentsCount) + " | Replies Scanned: " + str(scannedRepliesCount) + " | Spam Found So Far: " + str(len(spamCommentsID)), end = "\r")
|
||||
print("Top Level Comments Scanned: " + str(scannedCommentsCount) + " | Replies Scanned: " + str(scannedRepliesCount) + " | Matches Found So Far: " + str(len(spamCommentsID)), end = "\r")
|
||||
|
||||
return None
|
||||
|
||||
@@ -530,7 +612,8 @@ def process_spammer_ids(rawString):
|
||||
# Remove whitespace from each list item
|
||||
for i in range(len(inputList)):
|
||||
inputList[i] = inputList[i].strip()
|
||||
IDList = list(inputList) # Need to use list() so each list is separately affected, otherwise same pointer
|
||||
inputList = list(filter(None, inputList)) # Remove empty strings from list
|
||||
IDList = list(inputList) # Need to use list() instead of just setting equal so each list is separately affected, otherwise same pointer
|
||||
|
||||
# Validate each ID in list
|
||||
for i in range(len(inputList)):
|
||||
@@ -548,6 +631,191 @@ def open_log_file(name):
|
||||
logFile = open(name, "a", encoding="utf-8") # Opens log file in write mode
|
||||
|
||||
|
||||
######################### Convert string to set of characters#########################
|
||||
def make_char_set(stringInput, stripLettersNumbers=False, stripKeyboardSpecialChars=False, stripPunctuation=False):
|
||||
# Optional lists of characters to strip from string
|
||||
punctuationChars = ("!?\".,;:'-/()")
|
||||
numbersLettersChars = ("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
keyboardSpecialChars = ("!@#$%^&*()_+-=[]\{\}|;':,./<>?`~")
|
||||
|
||||
listedInput = list(stringInput)
|
||||
for i in range(len(listedInput)):
|
||||
listedInput[i] = listedInput[i].strip()
|
||||
if stripLettersNumbers == True:
|
||||
listedInput[i] = listedInput[i].strip(numbersLettersChars)
|
||||
if stripKeyboardSpecialChars == True:
|
||||
listedInput[i] = listedInput[i].strip(keyboardSpecialChars)
|
||||
if stripPunctuation == True:
|
||||
listedInput[i] = listedInput[i].strip(punctuationChars)
|
||||
listedInput[i] = listedInput[i].strip('\ufe0f') # Strips invisible varation selector for emojis
|
||||
listedInput = list(filter(None, listedInput))
|
||||
|
||||
|
||||
return listedInput
|
||||
|
||||
|
||||
##########################################################################################
|
||||
################################## FILTERING MODES #######################################
|
||||
##########################################################################################
|
||||
|
||||
# 1
|
||||
# For if user chooses filter mode 1, to enter channel ID/Link
|
||||
# Returns new deletionEnabled value, and inputtedSpammerChannelID
|
||||
def prepare_filter_mode_ID(currentUser, deletionEnabledLocal, scanMode):
|
||||
currentUserID = currentUser[0]
|
||||
# Filter scanMode 1: User inputs spammer channel ID(s) or link(s)
|
||||
# User inputs channel ID of the spammer, while loop repeats until valid input
|
||||
processResult = (False, None) #Tuple, first element is status of validity of channel ID, second element is channel ID
|
||||
while processResult[0] == False:
|
||||
inputtedSpammerChannelID = input("Enter the Channel link(s) or ID(s) of the spammer (comma separated): ")
|
||||
processResult = process_spammer_ids(inputtedSpammerChannelID)
|
||||
if processResult[0] == True:
|
||||
inputtedSpammerChannelID = processResult[1] # After processing, if valid, inputtedSpammerChannelID is a list of channel IDs
|
||||
print("\n")
|
||||
|
||||
# Check if spammer ID and user's channel ID are the same, and warn
|
||||
# If using channel-wide scanning mode, program will not run for safety purposes
|
||||
if any(currentUserID == i for i in inputtedSpammerChannelID) and scanMode == 2:
|
||||
print("WARNING - You are scanning for your own channel ID!")
|
||||
print("For safety purposes, this program's delete functionality is disabled when scanning for yourself across your entire channel.")
|
||||
print("If you want to delete your own comments for testing purposes, you can instead scan an individual video.")
|
||||
confirmation = choice("Continue?")
|
||||
if confirmation == False:
|
||||
input("Ok, Cancelled. Press Enter to Exit...")
|
||||
exit()
|
||||
elif any(currentUserID == i for i in inputtedSpammerChannelID) and scanMode == 1:
|
||||
print("WARNING: You are scanning for your own channel ID! This would delete all of your comments on the video!")
|
||||
print(" (You WILL still be asked to confirm before actually deleting anything)")
|
||||
print("If you are testing and want to scan and/or delete your own comments, enter 'Y' to continue, otherwise enter 'N' to exit.")
|
||||
confirmation = choice("Continue?")
|
||||
if confirmation == True: # After confirmation, deletion functionality is eligible to be enabled later
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
elif confirmation == False:
|
||||
input("Ok, Cancelled. Press Enter to Exit...")
|
||||
exit()
|
||||
else:
|
||||
deletionEnabledLocal = "HalfTrue" # If no matching problem found, deletion functionality is eligible to be enabled later
|
||||
|
||||
return deletionEnabledLocal, inputtedSpammerChannelID
|
||||
|
||||
# 2
|
||||
# For Filter mode 2, user inputs characters in username to filter
|
||||
def prepare_filter_mode_username(currentUser, deletionEnabledLocal, scanMode):
|
||||
# Create set of characters from users's channel name
|
||||
currentUserName = currentUser[1]
|
||||
channelChars = make_char_set(currentUserName) # Converts channel name to set of characters to compare with entered filter characters
|
||||
|
||||
print("\nInput ONLY any special characters / emojis you want to search for in usernames. Do not include commas or spaces!")
|
||||
print("Note: Letters and numbers will not be included for safety purposes, even if you enter them.")
|
||||
print("Example: 👋🔥✔️✨")
|
||||
validEntry = False
|
||||
while validEntry == False:
|
||||
inputChars = input("Input the characters to search (no commas or spaces): ")
|
||||
inputChars = make_char_set(inputChars, stripLettersNumbers=True, stripKeyboardSpecialChars=False, stripPunctuation=False)
|
||||
|
||||
if any(x in inputChars for x in channelChars):
|
||||
print("WARNING! Character(s) you entered are within your own username, ' " + currentUserName + " '! : " + str(inputChars & channelChars))
|
||||
if scanMode == 1:
|
||||
print("Are you SURE you want to search your own comments? (You WILL still get a confirmation before deleting)")
|
||||
if choice("Choose") == True:
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
validEntry = True
|
||||
elif scanMode == 2:
|
||||
print("For safety purposes, this program's delete functionality is disabled when scanning for yourself across your entire channel.")
|
||||
print("Choose 'N' to choose different characters. Choose 'Y' to continue (But you will get an error when trying to delete!)\n")
|
||||
if choice("Continue?") == True:
|
||||
validEntry = True
|
||||
else:
|
||||
print("Usernames will be scanned for ANY of these individual characters: " + str(inputChars))
|
||||
if choice("Begin scanning? ") == True:
|
||||
validEntry = True
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
inputtedCommentTextFilter = inputChars
|
||||
|
||||
return deletionEnabledLocal, inputtedCommentTextFilter
|
||||
|
||||
# 3
|
||||
# For Filter mode 3, user inputs characters in comment text to filter
|
||||
def prepare_filter_mode_comment_text(currentUser, deletionEnabledLocal, scanMode):
|
||||
print("\nInput ONLY any special characters / emojis you want to search for in all comments. Do not include commas or spaces!")
|
||||
print("Note: Letters, numbers, and punctuation will not be included for safety purposes, even if you enter them.")
|
||||
print("Example: 👋🔥✔️✨")
|
||||
validEntry = False
|
||||
while validEntry == False:
|
||||
inputChars = input("Input the characters to search (no commas or spaces): ")
|
||||
inputChars = make_char_set(inputChars, stripLettersNumbers=True, stripKeyboardSpecialChars=False, stripPunctuation=True)
|
||||
|
||||
print("Comment text will be scanned for ANY of these individual characters: " + str(inputChars))
|
||||
if choice("Begin scanning? ") == True:
|
||||
validEntry = True
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
inputtedCommentTextFilter = inputChars
|
||||
|
||||
return deletionEnabledLocal, inputtedCommentTextFilter
|
||||
|
||||
|
||||
# 4
|
||||
# For Filter mode 4, user inputs nothing, program scans for non-ascii
|
||||
def prepare_filter_mode_non_ascii(currentUser, deletionEnabledLocal, scanMode):
|
||||
print("\n-------------------------------------------------------")
|
||||
print("~~~ This mode automatically searches for usernames that contain special characters (aka not letters/numbers) ~~~\n")
|
||||
print("Choose the sensitivity level of the filter. You will be shown examples after you choose.")
|
||||
print(" 1. Allow Extended ASCII: Filter rare unicode & Emojis only")
|
||||
print(" 2. Allow Standard ASCII only: Also filter semi-common foreign characters")
|
||||
print(" 3. NUKE Mode (╯°□°)╯︵ ┻━┻: Allow ONLY numbers, letters, and spaces")
|
||||
print("")
|
||||
# Get user input for mode selection,
|
||||
confirmation = False
|
||||
while confirmation == False:
|
||||
selection = input("Choose Mode: ")
|
||||
try: selection = int(selection) # If not number entered, will get caught later as invalid
|
||||
except: pass
|
||||
|
||||
if selection == 1:
|
||||
print("Filters/Deletes usernames with emojis, unicode symbols, and rare foreign characters such as: ✔️ ☝️ 🡆 ▲ π Ɲ Œ")
|
||||
if choice("Choose this mode?") == True:
|
||||
regexPattern = r"[^\x00-\xFF]"
|
||||
confirmation = True
|
||||
elif selection == 2:
|
||||
print("Filters/Deletes usernames with anything EXCEPT the following: Letters, numbers, punctuation, and special characters you can usually type with your keyboard like: % * & () + ")
|
||||
if choice("Choose this mode?") == True:
|
||||
regexPattern = r"[^\x00-\x7F]"
|
||||
confirmation = True
|
||||
elif selection == 3:
|
||||
print("Filters/Deletes usernames with anything EXCEPT letters, numbers, and spaces -- Likely to cause collateral damage!")
|
||||
if choice("Choose this mode?") == True:
|
||||
regexPattern = r"[^a-zA-Z0-9 ]"
|
||||
confirmation = True
|
||||
else:
|
||||
print("Invalid input. Please try again.")
|
||||
|
||||
if re.search(regexPattern, currentUser[1]):
|
||||
confirmation = False
|
||||
print("!! WARNING !! This search mode would detect your own username!")
|
||||
if scanMode == 1:
|
||||
if choice("Are you REALLY sure you want to use this filter sensitivity?") == True:
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
confirmation = True
|
||||
elif scanMode == 2:
|
||||
print("For safety purposes, this program's delete functionality is disabled when scanning for yourself across your entire channel.")
|
||||
print("Choose 'N' to choose a different filter sensitivity. Choose 'Y' to continue (But you will get an error when trying to delete!)\n")
|
||||
if choice("Continue?") == True:
|
||||
confirmation = True
|
||||
|
||||
if selection == 1:
|
||||
autoModeName = "Allow Extended ASCII"
|
||||
elif selection == 2:
|
||||
autoModeName = "Allow Standard ASCII only"
|
||||
elif selection == 3:
|
||||
autoModeName = "NUKE Mode (╯°□°)╯︵ ┻━┻ - Allow only letters, numbers, and spaces"
|
||||
|
||||
if confirmation == True:
|
||||
deletionEnabledLocal = "HalfTrue"
|
||||
return deletionEnabledLocal, regexPattern, autoModeName
|
||||
else:
|
||||
input("How did you get here? Something very strange went wrong. Press Enter to Exit...")
|
||||
exit()
|
||||
|
||||
##########################################################################################
|
||||
##########################################################################################
|
||||
###################################### MAIN ##############################################
|
||||
@@ -557,20 +825,17 @@ def open_log_file(name):
|
||||
def main():
|
||||
# Declare Global Variables
|
||||
global youtube
|
||||
global spammer_channel_id
|
||||
global spamCommentsID
|
||||
global vidIdDict
|
||||
global scannedThreadsCount
|
||||
global scannedRepliesCount
|
||||
global scannedCommentsCount
|
||||
|
||||
# Default values for global variables
|
||||
# Spammer_channel_id doesn't need to be initialized because gets assigned in this function
|
||||
spamCommentsID = []
|
||||
vidIdDict = {}
|
||||
scannedThreadsCount = 0
|
||||
scannedRepliesCount = 0
|
||||
scannedCommentsCount = 0
|
||||
regexPattern = ""
|
||||
|
||||
# Declare Default Variables
|
||||
maxScanNumber = 999999999
|
||||
@@ -623,94 +888,105 @@ def main():
|
||||
print(" 1. Scan Single Video")
|
||||
print(" 2. Scan Entire Channel")
|
||||
|
||||
# Make sure input is valid, if not ask again
|
||||
validMode = False
|
||||
while validMode == False:
|
||||
mode = str(input("Choice: "))
|
||||
# If chooses to scan single video - Validate Video ID, get title, and confirm with user - otherwise exits
|
||||
if mode == "1":
|
||||
validMode = True
|
||||
|
||||
#While loop to get video ID and if invalid ask again
|
||||
validVideoID = (False, None) # Tuple, first element is status of validity of video ID, second element is video ID
|
||||
confirm = False
|
||||
while validVideoID[0] == False or confirm == False:
|
||||
check_video_id = input("Enter Video link or ID to scan: ")
|
||||
validVideoID = validate_video_id(check_video_id) # Sends link or video ID for isolation and validation
|
||||
|
||||
if validVideoID[0] == True: #validVideoID now contains True/False and video ID
|
||||
check_video_id = str(validVideoID[1])
|
||||
title = get_video_title(check_video_id)
|
||||
print("Chosen Video: " + title)
|
||||
confirm = choice("Is this correct?")
|
||||
if currentUser[0] != get_channel_id(check_video_id):
|
||||
print("\n >>> WARNING It is not possible to delete comments on someone elses video! Who do you think you are!? <<<")
|
||||
input("\n Press Enter to continue for testing purposes... (But you will get an error when trying to delete!)\n")
|
||||
scanMode = input("Choice (1 or 2): ")
|
||||
try: scanMode = int(scanMode) # If not number entered, will get caught later as invalid
|
||||
except: pass
|
||||
|
||||
# If chooses to scan entire channel - Validate Channel ID
|
||||
elif mode == "2":
|
||||
if scanMode == 1 or scanMode == 2:
|
||||
validMode = True
|
||||
# While loop to get max scan number, not an integer, asks again
|
||||
validInteger = False
|
||||
while validInteger == False:
|
||||
try:
|
||||
maxScanNumber = int(input("Enter the maximum number of comments to scan: "))
|
||||
if maxScanNumber > 0:
|
||||
validInteger = True # If it gets here, it's an integer, otherwise goes to exception
|
||||
else:
|
||||
print("\nInvalid Input! Number must be greater than zero.")
|
||||
except:
|
||||
print("\nInvalid Input! - Must be a whole number.")
|
||||
|
||||
else:
|
||||
print("\nInvalid choice! - Enter either 1 or 2. ")
|
||||
|
||||
# User inputs channel ID of the spammer, while loop repeats until valid input
|
||||
processResult = (False, None) #Tuple, first element is status of validity of channel ID, second element is channel ID
|
||||
while processResult[0] == False:
|
||||
spammer_channel_id = input("Enter the Channel link(s) or ID(s) of the spammer (comma separated): ")
|
||||
processResult = process_spammer_ids(spammer_channel_id)
|
||||
if processResult[0] == True:
|
||||
spammer_channel_id = processResult[1] # After processing, if valid, spammer_channel_id is a list of channel IDs
|
||||
print("\n")
|
||||
# If chooses to scan single video - Validate Video ID, get title, and confirm with user - otherwise exits
|
||||
if scanMode == 1:
|
||||
#While loop to get video ID and if invalid ask again
|
||||
validVideoID = (False, None) # Tuple, first element is status of validity of video ID, second element is video ID
|
||||
confirm = False
|
||||
while validVideoID[0] == False or confirm == False:
|
||||
check_video_id = input("Enter Video link or ID to scan: ")
|
||||
validVideoID = validate_video_id(check_video_id) # Sends link or video ID for isolation and validation
|
||||
|
||||
if validVideoID[0] == True: #validVideoID now contains True/False and video ID
|
||||
check_video_id = str(validVideoID[1])
|
||||
title = get_video_title(check_video_id)
|
||||
print("\nChosen Video: " + title)
|
||||
confirm = choice("Is this correct?")
|
||||
if currentUser[0] != get_channel_id(check_video_id) and confirm == True:
|
||||
print("\n >>> WARNING It is not possible to delete comments on someone elses video! Who do you think you are!? <<<")
|
||||
input("\n Press Enter to continue for testing purposes... (But you will get an error when trying to delete!)\n")
|
||||
|
||||
# Check if spammer ID and user's channel ID are the same, and warn
|
||||
# If using channel-wide scanning mode, program will not run for safety purposes
|
||||
if any(currentUser[0] == i for i in spammer_channel_id) and mode == "2":
|
||||
print("WARNING - You are scanning for your own channel ID!")
|
||||
print("For safety purposes, this program's delete functionality is disabled when scanning for yourself across your entire channel (Mode 2).")
|
||||
print("If you want to delete your own comments for testing purposes, you can instead scan an individual video (Mode 1).")
|
||||
confirmation = choice("Continue?")
|
||||
if confirmation == False:
|
||||
input("Ok, Cancelled. Press Enter to Exit...")
|
||||
exit()
|
||||
elif any(currentUser[0] == i for i in spammer_channel_id) and mode == "1":
|
||||
print("WARNING: You are scanning for your own channel ID! This would delete all of your comments on the video!")
|
||||
print(" (You WILL still be asked to confirm before actually deleting anything)")
|
||||
print("If you are testing and want to scan and/or delete your own comments, enter 'Y' to continue, otherwise enter 'N' to exit.")
|
||||
confirmation = choice("Continue?")
|
||||
if confirmation == True: # After confirmation, deletion functionality is eligible to be enabled later
|
||||
deletionEnabled = "HalfTrue"
|
||||
elif confirmation == False:
|
||||
input("Ok, Cancelled. Press Enter to Exit...")
|
||||
exit()
|
||||
else:
|
||||
deletionEnabled = "HalfTrue" # If no matching problem found, deletion functionality is eligible to be enabled later
|
||||
# If chooses to scan entire channel - Validate Channel ID
|
||||
elif scanMode == 2:
|
||||
# While loop to get max scan number, not an integer, asks again
|
||||
validInteger = False
|
||||
while validInteger == False:
|
||||
try:
|
||||
maxScanNumber = int(input("Enter the maximum number of comments to scan: "))
|
||||
if maxScanNumber > 0:
|
||||
validInteger = True # If it gets here, it's an integer, otherwise goes to exception
|
||||
else:
|
||||
print("\nInvalid Input! Number must be greater than zero.")
|
||||
except:
|
||||
print("\nInvalid Input! - Must be a whole number.")
|
||||
|
||||
|
||||
# User inputs filtering mode
|
||||
print("\n-------------------------------------------------------")
|
||||
print("~~~~~~~ Choose how to identify spammers ~~~~~~~")
|
||||
print(" 1. Enter Spammer's channel ID(s) or link(s)")
|
||||
print(" 2. Scan usernames for certain individual characters you choose")
|
||||
print(" 3. Scan comment text for certain individual characters you choose")
|
||||
print(" 4. Auto Mode: Scan usernames for ANY non-ASCII special characters (May cause collateral damage!)")
|
||||
|
||||
# Make sure input is valid, if not ask again
|
||||
validFilterMode = False
|
||||
while validFilterMode == False:
|
||||
filterMode = input("\nChoice (1-4): ")
|
||||
try: filterMode = int(filterMode) # If not number entered, will get caught later as invalid
|
||||
except: pass
|
||||
|
||||
if filterMode == 1 or filterMode == 2 or filterMode == 3 or filterMode == 4:
|
||||
validFilterMode = True
|
||||
else:
|
||||
print("\nInvalid choice! - Enter either 1, 2, 3 or 4. ")
|
||||
|
||||
### Prepare Filtering Modes ###
|
||||
# Default values for filter criteria
|
||||
inputtedSpammerChannelID = None
|
||||
inputtedUsernameFilter = None
|
||||
inputtedCommentTextFilter = None
|
||||
|
||||
if filterMode == 1:
|
||||
filterSettings = prepare_filter_mode_ID(currentUser, deletionEnabled, scanMode)
|
||||
inputtedSpammerChannelID = filterSettings[1]
|
||||
elif filterMode == 2:
|
||||
filterSettings = prepare_filter_mode_username(currentUser, deletionEnabled, scanMode)
|
||||
inputtedUsernameFilter = filterSettings[1]
|
||||
elif filterMode == 3:
|
||||
filterSettings = prepare_filter_mode_comment_text(currentUser, deletionEnabled, scanMode)
|
||||
inputtedCommentTextFilter = filterSettings[1]
|
||||
elif filterMode == 4:
|
||||
filterSettings = prepare_filter_mode_non_ascii(currentUser, deletionEnabled, scanMode)
|
||||
regexPattern = filterSettings[1]
|
||||
deletionEnabled = filterSettings[0]
|
||||
|
||||
##################### START SCANNING #####################
|
||||
try:
|
||||
# Goes to get comments for first page
|
||||
if nextPageToken == "start":
|
||||
print("Scanning... \n")
|
||||
nextPageToken = get_comments(youtube, check_video_id=check_video_id, check_channel_id=check_channel_id)
|
||||
print_count_stats(final=False) # Prints comment scan stats, updates on same line
|
||||
print("Scanning... \n")
|
||||
nextPageToken = get_comments(youtube, filterMode, check_video_id, check_channel_id, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
|
||||
print_count_stats(final=False) # Prints comment scan stats, updates on same line
|
||||
|
||||
# After getting first page, if there are more pages, goes to get comments for next page
|
||||
while nextPageToken != "End" and scannedCommentsCount < maxScanNumber:
|
||||
nextPageToken = get_comments(youtube, check_video_id=check_video_id, check_channel_id=check_channel_id, nextPageToken=nextPageToken)
|
||||
|
||||
nextPageToken = get_comments(youtube, filterMode, check_video_id, check_channel_id, nextPageToken, inputtedSpammerChannelID=inputtedSpammerChannelID, inputtedUsernameFilter=inputtedUsernameFilter, inputtedCommentTextFilter=inputtedCommentTextFilter, regexPattern=regexPattern)
|
||||
print_count_stats(final=True) # Prints comment scan stats, finalizes
|
||||
##########################################################
|
||||
|
||||
# Counts number of spam comments and prints list
|
||||
# Counts number of found spam comments and prints list
|
||||
spam_count = len(spamCommentsID)
|
||||
if spam_count == 0: # If no spam comments found, exits
|
||||
print("No spam comments found!\n")
|
||||
@@ -728,7 +1004,14 @@ def main():
|
||||
# Write heading info to log file
|
||||
open_log_file(logFileName)
|
||||
logFile.write("----------- YouTube Spammer Purge Log File ----------- \n\n")
|
||||
logFile.write("Channel IDs of spammer searched: " + str(spammer_channel_id) + "\n\n")
|
||||
if filterMode == 1:
|
||||
logFile.write("Channel IDs of spammer searched: " + str(inputtedSpammerChannelID) + "\n\n")
|
||||
elif filterMode == 2:
|
||||
logFile.write("Characters searched in Usernames: " + str(inputtedUsernameFilter) + "\n\n")
|
||||
elif filterMode == 3:
|
||||
logFile.write("Characters searched in Comment Text: " + str(inputtedCommentTextFilter) + "\n\n")
|
||||
elif filterMode == 4:
|
||||
logFile.write("Automatic Search Mode: " + str(filterSettings[2]))
|
||||
logFile.write("Number of Spammer Comments Found: " + str(len(spamCommentsID)) + "\n\n")
|
||||
logFile.write("IDs of Spammer Comments: " + "\n" + str(spamCommentsID) + "\n\n\n")
|
||||
|
||||
@@ -753,11 +1036,13 @@ def main():
|
||||
exit()
|
||||
elif confirmDelete == "YES":
|
||||
deletionEnabled = "True"
|
||||
elif deletionEnabled == "False" and spammer_channel_id == currentUser[0] and mode == "2":
|
||||
input("\nDeletion functionality disabled for this mode because you scanned your own channel. Press Enter to exit...")
|
||||
elif deletionEnabled == "False" and inputtedSpammerChannelID == currentUser[0] and scanMode == 2:
|
||||
input("\nDeletion functionality disabled for this scanning mode because you scanned your own channel. Press Enter to exit...")
|
||||
exit()
|
||||
else:
|
||||
input("\nFAILSAFE: For an unknown reason, the deletion functionality was not enabled. Cannot delete comments. Press Enter to exit...")
|
||||
print("\nThe deletion functionality was not enabled. Cannot delete comments.")
|
||||
print("Possible Causes: You're trying to scan someone elses video, or your search criteria matched your own channel in channel-wide scan mode.")
|
||||
input("Press Enter to exit...")
|
||||
exit()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user