Fix community post scanning

Update version number to 2.18.0
Fix fetching recent community posts
Fix community post comments
ThioJoe
2025-05-26 16:31:53 -07:00
parent 48d5419289
commit 2888c14e85
5 changed files with 72 additions and 58 deletions

View File

@@ -10,8 +10,8 @@ import time
 import re
 import requests
-YOUTUBE_VIDEO_URL = 'https://www.youtube.com/post/{youtube_id}'
-YOUTUBE_COMMUNITY_TAB_URL = 'https://www.youtube.com/channel/{channel_id}/community'
+YOUTUBE_VIDEO_URL = 'https://www.youtube.com/post/{youtube_id}' # You can access a post by its ID, it will redirect to the full URL
+YOUTUBE_COMMUNITY_TAB_URL = 'https://www.youtube.com/channel/{channel_id}/posts'
 USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36'
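
The comment added to YOUTUBE_VIDEO_URL refers to YouTube's redirect behavior: requesting the short /post/{id} URL redirects to the full channel post URL. A minimal sketch of that behavior (resolve_post_url is a hypothetical helper, not part of this file):

import requests

USER_AGENT = 'Mozilla/5.0'  # any browser-like UA works for this sketch

def resolve_post_url(youtube_id):
    # The short post URL redirects to the canonical channel post URL,
    # which is what lets get_post_channel_url() below work from a bare ID
    url = f'https://www.youtube.com/post/{youtube_id}'
    response = requests.get(url, headers={'User-Agent': USER_AGENT}, allow_redirects=True)
    return response.url  # final URL after any redirects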
@@ -29,7 +29,7 @@ def regex_search(text, pattern, group=1, default=None):
 def ajax_request(session, endpoint, ytcfg, retries=5, sleep=20):
     url = 'https://www.youtube.com' + endpoint['commandMetadata']['webCommandMetadata']['apiUrl']
     data = {'context': ytcfg['INNERTUBE_CONTEXT'],
             'continuation': endpoint['continuationCommand']['token']}
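
For readers unfamiliar with this function: ajax_request POSTs a continuation token back to YouTube's InnerTube API to page through results. A simplified sketch of the request it builds, assuming the retry scheme and INNERTUBE_API_KEY parameter used by similar comment downloaders:

import time
import requests

def fetch_continuation(session, endpoint, ytcfg, retries=5, sleep=20):
    # The endpoint dict comes from ytInitialData and carries both the API
    # path and the continuation token for the next page of results
    url = 'https://www.youtube.com' + endpoint['commandMetadata']['webCommandMetadata']['apiUrl']
    data = {'context': ytcfg['INNERTUBE_CONTEXT'],
            'continuation': endpoint['continuationCommand']['token']}
    for _ in range(retries):
        response = session.post(url, params={'key': ytcfg['INNERTUBE_API_KEY']}, json=data)
        if response.status_code == 200:
            return response.json()
        if response.status_code in (403, 413):
            return {}  # blocked or payload too large; retrying won't help
        time.sleep(sleep)  # transient error; wait before retrying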
@@ -56,7 +56,7 @@ def get_post_channel_url(youtube_id):
         return None # Unable to extract configuration
     data = json.loads(regex_search(html, YT_INITIAL_DATA_RE, default=''))
     try:
-        channelURL = data['microformat']['microformatDataRenderer']['urlCanonical']
+        channelURL = data['metadata']['channelMetadataRenderer']['externalId']
         return channelURL
     except KeyError:
         return None
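
The fix above stops reading the canonical post URL from 'microformat' and instead returns the channel's externalId (its UC... channel ID) from the page's ytInitialData, so the variable now holds an ID rather than a URL. A minimal sketch of the surrounding extraction, with a simplified regex standing in for the script's YT_INITIAL_DATA_RE:

import json
import re
import requests

def get_channel_id_from_post(youtube_id):  # hypothetical standalone version
    html = requests.get(f'https://www.youtube.com/post/{youtube_id}',
                        headers={'User-Agent': 'Mozilla/5.0'}).text
    # ytInitialData is embedded in the page as a JSON literal
    match = re.search(r'ytInitialData\s*=\s*({.+?})\s*;</script>', html, re.DOTALL)
    if not match:
        return None
    data = json.loads(match.group(1))
    try:
        return data['metadata']['channelMetadataRenderer']['externalId']
    except KeyError:
        return None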
@@ -66,33 +66,37 @@ def get_post_channel_url(youtube_id):
 def fetch_recent_community_posts(channel_id):
     session = requests.Session()
     session.headers['User-Agent'] = USER_AGENT
     session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')
     response = session.get(YOUTUBE_COMMUNITY_TAB_URL.format(channel_id=channel_id))
     if 'uxe=' in response.request.url:
         session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')
         response = session.get(YOUTUBE_COMMUNITY_TAB_URL.format(channel_id=channel_id))
     html = response.text
     data = json.loads(regex_search(html, YT_INITIAL_DATA_RE, default=''))
-    rendererSubsection = next(search_dict(data, 'twoColumnBrowseResultsRenderer'), None)
-    itemSection = next(search_dict(rendererSubsection, 'itemSectionRenderer'), None)
-    rawPosts = list(search_dict(itemSection, 'backstagePostRenderer'))
+    # The initial data already contains the most recent posts.
+    # We search for 'backstagePostThreadRenderer' which contains the post.
+    rawPosts = list(search_dict(data, 'backstagePostThreadRenderer'))
     recentPostsListofDicts = [] # Use list to keep in order - Puts post ID and sample of text into dictionary keypair, strips newlines
     # Gets the Post IDs and sample of post text
-    for post in rawPosts:
-        id = post['postId']
+    for post_thread in rawPosts:
+        # The actual post data is nested inside the 'post' -> 'backstagePostRenderer' keys
         try:
-            text = post['contentText']['runs'][0]['text'].strip().replace('\n', '').replace('\r', '')
+            post = post_thread['post']['backstagePostRenderer']
+            id = post['postId']
+            try:
+                text = post['contentText']['runs'][0]['text'].strip().replace('\n', '').replace('\r', '')
+            except KeyError:
+                text = "[No Text For This Post]"
+            recentPostsListofDicts.append({id:text})
         except KeyError:
-            text = "[No Text For This Post]"
-        recentPostsListofDicts.append({id:text})
+            # Skip if the expected structure is not found
+            continue
     recentPostsListofDicts.reverse() # Reverse list so newest posts are first
     return recentPostsListofDicts
 # -----------------------------------------------------------------------------
 # -----------------------------------------------------------------------------
 def download_comments(youtube_id, sort_by=SORT_BY_RECENT, language=None, sleep=.1):
     session = requests.Session()
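
The rewritten loop depends on search_dict (defined further down in this file) to find every 'backstagePostThreadRenderer' anywhere inside the nested ytInitialData structure. Its body isn't part of this diff; a typical implementation, shown here as an assumption, plus a toy example of the nesting the new code unwraps:

def search_dict(partial, search_key):
    # Iteratively walk a nested structure of dicts and lists, yielding
    # every value stored under search_key at any depth
    stack = [partial]
    while stack:
        current = stack.pop()
        if isinstance(current, dict):
            for key, value in current.items():
                if key == search_key:
                    yield value
                else:
                    stack.append(value)
        elif isinstance(current, list):
            stack.extend(current)

# Toy example, heavily simplified relative to real ytInitialData:
data = {'contents': [{'backstagePostThreadRenderer': {
            'post': {'backstagePostRenderer': {
                'postId': 'UgkxEXAMPLEPOSTID',
                'contentText': {'runs': [{'text': 'Hello\ncommunity'}]}}}}}]}

for post_thread in search_dict(data, 'backstagePostThreadRenderer'):
    post = post_thread['post']['backstagePostRenderer']
    print(post['postId'], post['contentText']['runs'][0]['text'].replace('\n', ' '))
# prints: UgkxEXAMPLEPOSTID Hello community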
@@ -164,21 +168,27 @@ def download_comments(youtube_id, sort_by=SORT_BY_RECENT, language=None, sleep=.
         except (KeyError, ValueError):
             totalPostComments = -1
-        for comment in reversed(list(search_dict(response, 'commentRenderer'))):
-            # Yield instead of return, function called by for loop
-            yield {
-                'cid': comment['commentId'],
-                'text': ''.join([c['text'] for c in comment['contentText'].get('runs', [])]),
-                'time': comment['publishedTimeText']['runs'][0]['text'],
-                'author': comment.get('authorText', {}).get('simpleText', ''),
-                'channel': comment['authorEndpoint']['browseEndpoint'].get('browseId', ''),
-                'votes': comment.get('voteCount', {}).get('simpleText', '0'),
-                'photo': comment['authorThumbnail']['thumbnails'][-1]['url'],
-                'heart': next(search_dict(comment, 'isHearted'), False),
+        toolbar_payloads = search_dict(response, 'engagementToolbarStateEntityPayload')
+        toolbar_states = {payloads['key']:payloads for payloads in toolbar_payloads}
+        for comment in reversed(list(search_dict(response, 'commentEntityPayload'))):
+            properties = comment['properties']
+            author = comment['author']
+            toolbar = comment['toolbar']
+            toolbar_state = toolbar_states[properties['toolbarStateKey']]
+            yield {'cid': properties['commentId'],
+                   'text': properties['content']['content'],
+                   'time': properties['publishedTime'],
+                   'author': author['displayName'],
+                   'channel': author['channelId'],
+                   'votes': toolbar['likeCountLiked'],
+                   'replies': toolbar['replyCount'],
+                   'photo': author['avatarThumbnailUrl'],
+                   'heart': toolbar_state.get('heartState', '') == 'TOOLBAR_HEART_STATE_HEARTED',
+                   'reply': '.' in properties['commentId'],
-                # Extra data not specific to comment:
-                'totalPostComments': totalPostComments
-            }
+                   # Extra data not specific to comment:
+                   'totalPostComments': totalPostComments
+                   }
         #time.sleep(sleep)
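
Where the old commentRenderer objects carried author, votes, and heart state inline, the new format splits them: each comment is a flat commentEntityPayload, and its heart/like toolbar state lives in a separate engagementToolbarStateEntityPayload that is joined back via properties['toolbarStateKey']. A toy illustration of that join, using stand-in payloads far simpler than a real InnerTube response:

comments = [{'properties': {'commentId': 'Ugz123', 'toolbarStateKey': 'k1',
                            'content': {'content': 'First!'},
                            'publishedTime': '1 day ago'},
             'author': {'displayName': 'SomeUser', 'channelId': 'UCabc',
                        'avatarThumbnailUrl': 'https://example.com/a.jpg'},
             'toolbar': {'likeCountLiked': '5', 'replyCount': '2'}}]
toolbar_states = {'k1': {'key': 'k1', 'heartState': 'TOOLBAR_HEART_STATE_HEARTED'}}

for comment in comments:
    properties = comment['properties']
    state = toolbar_states[properties['toolbarStateKey']]
    print(properties['commentId'],                                       # Ugz123
          state.get('heartState', '') == 'TOOLBAR_HEART_STATE_HEARTED',  # True
          '.' in properties['commentId'])  # False; replies have IDs like 'parent.child'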
@@ -201,7 +211,7 @@ def search_dict(partial, search_key):
 def main(communityPostID=None, limit=1000, sort=SORT_BY_RECENT, language=None, postScanProgressDict=None, postText=None):
     if not communityPostID:
         raise ValueError('you need to specify a Youtube ID')
     if postScanProgressDict:
         i = postScanProgressDict['scanned']
         j = postScanProgressDict['total']
@@ -210,10 +220,10 @@ def main(communityPostID=None, limit=1000, sort=SORT_BY_RECENT, language=None, p
     print(f'\n Loading Comments For Post: {communityPostID}')
     if postText:
-        print(f" > {F.LIGHTCYAN_EX}Post Text Sample:{S.R} {postText[0:90]}")
+        print(f" > {F.LIGHTCYAN_EX}Post Text Sample:{S.R} {postText[0:90]}")
     count = 0
-    #print(f' > Loaded {F.YELLOW}{count}{S.R} comment(s)', end='\r')
+    #print(f' > Loaded {F.YELLOW}{count}{S.R} comment(s)', end='\r')
     totalComments = 0
     commentsDict = {}
@@ -230,21 +240,21 @@ def main(communityPostID=None, limit=1000, sort=SORT_BY_RECENT, language=None, p
             # Doesn't return a number after first page, so don't update after that
             if comment['totalPostComments']:
                 totalComments = comment['totalPostComments']
             if totalComments >= 0:
                 percent = ((count / totalComments) * 100)
                 progressStats = f"[ {str(count)} / {str(totalComments)} ]".ljust(15, " ") + f" ({percent:.2f}%)"
-                print(f' > Retrieving Post Comments - {progressStats}', end='\r')
-            else:
-                print(f' > Loaded {F.YELLOW}{count}{S.R} comment(s)', end='\r')
+                print(f' > Retrieving Post Comments - {progressStats}', end='\r')
+            else:
+                print(f' > Loaded {F.YELLOW}{count}{S.R} comment(s)', end='\r')
             if limit and count >= limit:
-                print(" ")
+                print(" ")
                 break
-        print(" ")
+        print(" ")
         return commentsDict
 if __name__ == "__main__":
-    main(sys.argv[1:])
+    main(sys.argv[1:])
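
A quick worked example of the progress string assembled in the loop above (the str() calls inside the f-string are redundant but harmless):

count, totalComments = 37, 150
percent = ((count / totalComments) * 100)
progressStats = f"[ {str(count)} / {str(totalComments)} ]".ljust(15, " ") + f" ({percent:.2f}%)"
print(progressStats)  # [ 37 / 150 ]    (24.67%)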

File diff suppressed because one or more lines are too long

View File

@@ -83,6 +83,8 @@ def validate_video_id(video_url_or_id, silent=False, pass_exception=False, basic
 ############################### VALIDATE COMMUNITY POST ID #################################
 def validate_post_id(post_url):
+    isolatedPostID = ""
     if "/post/" in post_url:
         startIndex = post_url.rindex("/") + 1
         endIndex = len(post_url)
@@ -90,12 +92,14 @@ def validate_post_id(post_url):
         startIndex = post_url.rindex("lb=") + 3
         endIndex = len(post_url)
     else:
-        isolatedPostId = post_url
-    try:
-        if startIndex < endIndex and endIndex <= len(post_url):
-            isolatedPostID = post_url[startIndex:endIndex]
-    except:
-        return False, None, None, None, None
+        isolatedPostID = post_url
+    if isolatedPostID == "":
+        try:
+            if startIndex < endIndex and endIndex <= len(post_url):
+                isolatedPostID = post_url[startIndex:endIndex]
+        except:
+            return False, None, None, None, None
     # Post IDs used to be shorter, but apparently now have a longer format
     if len(isolatedPostID) == 26 or len(isolatedPostID) == 36:
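
The restructuring above initializes isolatedPostID up front, fixes a casing typo in the else branch (isolatedPostId vs isolatedPostID), and only falls back to index-based slicing when the ID wasn't taken verbatim. A condensed sketch of the same logic as a standalone function (illustrative, not the shipped code):

def extract_post_id(post_url):
    isolatedPostID = ""
    if "/post/" in post_url:
        isolatedPostID = post_url[post_url.rindex("/") + 1:]
    elif "lb=" in post_url:
        isolatedPostID = post_url[post_url.rindex("lb=") + 3:]
    else:
        isolatedPostID = post_url  # assume a bare post ID was passed in
    # Post IDs are currently 26 or 36 characters long
    return isolatedPostID if len(isolatedPostID) in (26, 36) else None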

View File

@@ -36,7 +36,7 @@
 ### IMPORTANT: I OFFER NO WARRANTY OR GUARANTEE FOR THIS SCRIPT. USE AT YOUR OWN RISK.
 ### I tested it on my own and implemented some failsafes as best as I could,
 ### but there could always be some kind of bug. You should inspect the code yourself.
-version = "2.18.0-Beta4"
+version = "2.18.0"
 configVersion = 33
 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
 print("Importing Script Modules...")

View File

@@ -85,7 +85,7 @@ colors_enabled = True
 # Prompts user to encrypt the token.pickle file with a user-selected password for extra security for files at rest. Only primary config file is checked for this setting.
 # NOTE: This should be considered "better than nothing" and not relied upon for keeping critical credentials safe
 # If encrypted token file already exists, you will need to enter the password every time you run the program regardless of this setting.
-encrypt_token_file = True
+encrypt_token_file = False
 [scanning]