mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-04-05 03:00:15 -04:00
Refactor comment tree management and support multiple implementations.
This helps us migrate comment trees out of permacache, and offers a new lockless schema for storing the data.
This commit is contained in:
@@ -25,7 +25,8 @@ from itertools import chain
|
||||
from r2.lib.utils import tup, to36
|
||||
from r2.lib.db.sorts import epoch_seconds
|
||||
from r2.lib.cache import sgm
|
||||
from r2.models.link import Link
|
||||
from r2.models.comment_tree import CommentTree
|
||||
from r2.models.link import Comment, Link
|
||||
|
||||
MAX_ITERATIONS = 50000
|
||||
|
||||
@@ -48,6 +49,7 @@ def _get_sort_value(comment, sort):
|
||||
return getattr(comment, sort)
|
||||
|
||||
def add_comments(comments):
|
||||
links = Link._byID([com.link_id for com in tup(comments)], data=True)
|
||||
comments = tup(comments)
|
||||
|
||||
link_map = {}
|
||||
@@ -55,85 +57,20 @@ def add_comments(comments):
|
||||
link_map.setdefault(com.link_id, []).append(com)
|
||||
|
||||
for link_id, coms in link_map.iteritems():
|
||||
link = links[link_id]
|
||||
try:
|
||||
with g.make_lock("comment_tree", lock_key(link_id)):
|
||||
add_comments_nolock(link_id, coms)
|
||||
with CommentTree.mutation_context(link):
|
||||
cache = get_comment_tree(link)
|
||||
cache.add_comments(coms)
|
||||
except:
|
||||
g.log.exception(
|
||||
'add_comments_nolock failed for link %s, recomputing tree',
|
||||
link_id)
|
||||
|
||||
# calculate it from scratch
|
||||
link_comments(link_id, _update = True)
|
||||
get_comment_tree(link, _update=True)
|
||||
update_comment_votes(coms)
|
||||
|
||||
def add_comments_nolock(link_id, comments):
|
||||
cids, comment_tree, depth, num_children = link_comments(link_id)
|
||||
|
||||
#dfs to find the list of parents for the new comment
|
||||
def find_parents():
|
||||
stack = [cid for cid in comment_tree[None]]
|
||||
parents = []
|
||||
while stack:
|
||||
cur_cm = stack.pop()
|
||||
if cur_cm == cm_id:
|
||||
return parents
|
||||
elif cur_cm in comment_tree:
|
||||
#make cur_cm the end of the parents list
|
||||
parents = parents[:depth[cur_cm]] + [cur_cm]
|
||||
for child in comment_tree[cur_cm]:
|
||||
stack.append(child)
|
||||
|
||||
new_parents = {}
|
||||
for comment in comments:
|
||||
cm_id = comment._id
|
||||
p_id = comment.parent_id
|
||||
|
||||
#make sure we haven't already done this before (which would happen
|
||||
#if the tree isn't cached when you add a comment)
|
||||
if comment._id in cids:
|
||||
continue
|
||||
|
||||
#add to comment list
|
||||
cids.append(comment._id)
|
||||
|
||||
#add to tree
|
||||
comment_tree.setdefault(p_id, []).append(cm_id)
|
||||
|
||||
#add to depth
|
||||
depth[cm_id] = depth[p_id] + 1 if p_id else 0
|
||||
|
||||
#update children
|
||||
num_children[cm_id] = 0
|
||||
|
||||
#if this comment had a parent, find the parent's parents
|
||||
if p_id:
|
||||
new_parents[cm_id] = p_id
|
||||
for p_id in find_parents():
|
||||
num_children[p_id] += 1
|
||||
|
||||
# update our cache of children -> parents as well:
|
||||
key = parent_comments_key(link_id)
|
||||
r = g.permacache.get(key)
|
||||
|
||||
if not r:
|
||||
r = _parent_dict_from_tree(comment_tree)
|
||||
|
||||
for cm_id, parent_id in new_parents.iteritems():
|
||||
# print "Now, I set %s -> %s" % (cm_id, parent_id)
|
||||
r[cm_id] = parent_id
|
||||
|
||||
for comment in comments:
|
||||
cm_id = comment._id
|
||||
if cm_id not in new_parents:
|
||||
r[cm_id] = None
|
||||
# print "And I set %s -> None" % cm_id
|
||||
|
||||
g.permacache.set(key, r)
|
||||
|
||||
g.permacache.set(comments_key(link_id),
|
||||
(cids, comment_tree, depth, num_children))
|
||||
|
||||
def update_comment_votes(comments, write_consistency_level = None):
|
||||
from r2.models import CommentSortsCache
|
||||
|
||||
@@ -154,35 +91,14 @@ def update_comment_votes(comments, write_consistency_level = None):
|
||||
write_consistency_level = write_consistency_level)
|
||||
|
||||
def delete_comment(comment):
|
||||
with g.make_lock("comment_tree", lock_key(comment.link_id)):
|
||||
cids, comment_tree, depth, num_children = link_comments(comment.link_id)
|
||||
|
||||
# only completely remove comments with no children
|
||||
if comment._id not in comment_tree:
|
||||
if comment._id in cids:
|
||||
cids.remove(comment._id)
|
||||
if comment._id in depth:
|
||||
del depth[comment._id]
|
||||
if comment._id in num_children:
|
||||
del num_children[comment._id]
|
||||
g.permacache.set(comments_key(comment.link_id),
|
||||
(cids, comment_tree, depth, num_children))
|
||||
|
||||
# update the link's comment count and schedule it for search reindexing
|
||||
link = Link._byID(comment.link_id, data = True)
|
||||
link._incr('num_comments', -1)
|
||||
with CommentTree.mutation_context(comment.link_id):
|
||||
link = Link._byID(comment.link_id, data=True)
|
||||
cache = get_comment_tree(link)
|
||||
cache.delete_comment(comment)
|
||||
from r2.lib.db.queries import changed
|
||||
changed(link)
|
||||
|
||||
def _parent_dict_from_tree(comment_tree):
|
||||
parents = {}
|
||||
for parent, childs in comment_tree.iteritems():
|
||||
for child in childs:
|
||||
parents[child] = parent
|
||||
return parents
|
||||
|
||||
def _comment_sorter_from_cids(cids, sort):
|
||||
from r2.models import Comment
|
||||
comments = Comment._byID(cids, data = False, return_dict = False)
|
||||
return dict((x._id, _get_sort_value(x, sort)) for x in comments)
|
||||
|
||||
@@ -202,8 +118,8 @@ def _get_comment_sorter(link_id, sort):
|
||||
for (c_id, val) in sorter.iteritems())
|
||||
return sorter
|
||||
|
||||
def link_comments_and_sort(link_id, sort):
|
||||
from r2.models import Comment, CommentSortsCache
|
||||
def link_comments_and_sort(link, sort):
|
||||
from r2.models import CommentSortsCache
|
||||
|
||||
# This has grown sort of organically over time. Right now the
|
||||
# cache of the comments tree consists in three keys:
|
||||
@@ -220,12 +136,13 @@ def link_comments_and_sort(link_id, sort):
|
||||
# (CommentSortsCache) rather than a permacache key. One of
|
||||
# these exists for each sort (hot, new, etc)
|
||||
|
||||
# performance hack: preload these into the LocalCache at the same
|
||||
# time
|
||||
g.permacache.get_multi([comments_key(link_id),
|
||||
parent_comments_key(link_id)])
|
||||
|
||||
cids, cid_tree, depth, num_children = link_comments(link_id)
|
||||
link_id = link._id
|
||||
cache = get_comment_tree(link)
|
||||
cids = cache.cids
|
||||
tree = cache.tree
|
||||
depth = cache.depth
|
||||
num_children = cache.num_children
|
||||
parents = cache.parents
|
||||
|
||||
# load the sorter
|
||||
sorter = _get_comment_sorter(link_id, sort)
|
||||
@@ -247,9 +164,6 @@ def link_comments_and_sort(link_id, sort):
|
||||
|
||||
sorter.update(_comment_sorter_from_cids(sorter_needed, sort))
|
||||
|
||||
# load the parents
|
||||
key = parent_comments_key(link_id)
|
||||
parents = g.permacache.get(key)
|
||||
if parents is None:
|
||||
g.log.debug("comment_tree.py: parents cache miss for Link %s"
|
||||
% link_id)
|
||||
@@ -260,96 +174,25 @@ def link_comments_and_sort(link_id, sort):
|
||||
parents = {}
|
||||
|
||||
if not parents and len(cids) > 0:
|
||||
with g.make_lock("comment_tree", lock_key(link_id)):
|
||||
# reload from the cache so the sorter and parents are
|
||||
# maximally consistent
|
||||
r = g.permacache.get(comments_key(link_id))
|
||||
cids, cid_tree, depth, num_children = r
|
||||
with CommentTree.mutation_context(link):
|
||||
# reload under lock so the sorter and parents are consistent
|
||||
cache = get_comment_tree(link)
|
||||
cache.parents = cache.parent_dict_from_tree(cache.tree)
|
||||
|
||||
key = parent_comments_key(link_id)
|
||||
if not parents:
|
||||
parents = _parent_dict_from_tree(cid_tree)
|
||||
g.permacache.set(key, parents)
|
||||
return (cache.cids, cache.tree, cache.depth, cache.num_children,
|
||||
cache.parents, sorter)
|
||||
|
||||
return cids, cid_tree, depth, num_children, parents, sorter
|
||||
|
||||
|
||||
def link_comments(link_id, _update=False):
|
||||
key = comments_key(link_id)
|
||||
|
||||
r = g.permacache.get(key)
|
||||
|
||||
if r and not _update:
|
||||
return r
|
||||
else:
|
||||
# This operation can take longer than most (note the inner
|
||||
# locks) better to time out request temporarily than to deal
|
||||
# with an inconsistent tree
|
||||
with g.make_lock("comment_tree", lock_key(link_id), timeout=180):
|
||||
r = _load_link_comments(link_id)
|
||||
# rebuild parent dict
|
||||
cids, cid_tree, depth, num_children, num_comments = r
|
||||
r = r[:-1] # Remove num_comments from r; we don't need to cache it.
|
||||
g.permacache.set(parent_comments_key(link_id),
|
||||
_parent_dict_from_tree(cid_tree))
|
||||
|
||||
g.permacache.set(key, r)
|
||||
|
||||
# update the link's comment count and schedule it for search
|
||||
# reindexing
|
||||
link = Link._byID(link_id, data = True)
|
||||
link.num_comments = num_comments
|
||||
link._commit()
|
||||
from r2.lib.db.queries import changed
|
||||
changed(link)
|
||||
|
||||
return r
|
||||
|
||||
def _load_link_comments(link_id):
|
||||
from r2.models import Comment
|
||||
q = Comment._query(Comment.c.link_id == link_id,
|
||||
Comment.c._deleted == (True, False),
|
||||
Comment.c._spam == (True, False),
|
||||
optimize_rules=True,
|
||||
data = True)
|
||||
comments = list(q)
|
||||
cids = [c._id for c in comments]
|
||||
|
||||
#make a tree
|
||||
comment_tree = {}
|
||||
for cm in comments:
|
||||
p_id = cm.parent_id
|
||||
comment_tree.setdefault(p_id, []).append(cm._id)
|
||||
|
||||
#calculate the depths
|
||||
depth = {}
|
||||
level = 0
|
||||
cur_level = comment_tree.get(None, ())
|
||||
while cur_level:
|
||||
next_level = []
|
||||
for cm_id in cur_level:
|
||||
depth[cm_id] = level
|
||||
next_level.extend(comment_tree.get(cm_id, ()))
|
||||
cur_level = next_level
|
||||
level += 1
|
||||
|
||||
#calc the number of children
|
||||
num_children = {}
|
||||
for cm_id in cids:
|
||||
num = 0
|
||||
todo = [cm_id]
|
||||
iteration_count = 0
|
||||
while todo:
|
||||
if iteration_count > MAX_ITERATIONS:
|
||||
raise Exception("bad comment tree for link %s" % link_id)
|
||||
more = comment_tree.get(todo.pop(0), ())
|
||||
num += len(more)
|
||||
todo.extend(more)
|
||||
iteration_count += 1
|
||||
num_children[cm_id] = num
|
||||
|
||||
num_comments = sum(1 for c in comments if not c._deleted)
|
||||
return cids, comment_tree, depth, num_children, num_comments
|
||||
def get_comment_tree(link, _update=False):
|
||||
cache = CommentTree.by_link(link)
|
||||
if cache and not _update:
|
||||
return cache
|
||||
with CommentTree.mutation_context(link, timeout=180):
|
||||
cache = CommentTree.rebuild(link)
|
||||
# the tree rebuild updated the link's comment count, so schedule it for
|
||||
# search reindexing
|
||||
from r2.lib.db.queries import changed
|
||||
changed(link)
|
||||
return cache
|
||||
|
||||
# message conversation functions
|
||||
def messages_key(user_id):
|
||||
@@ -555,7 +398,7 @@ def tree_sort_fn(tree):
|
||||
return threads[-1] if threads else root
|
||||
|
||||
def _populate(after_id = None, estimate=54301242):
|
||||
from r2.models import Comment, CommentSortsCache, desc
|
||||
from r2.models import CommentSortsCache, desc
|
||||
from r2.lib.db import tdb_cassandra
|
||||
from r2.lib import utils
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from r2.models import Message, Inbox, Subreddit, ModContribSR, ModeratorInbox, M
|
||||
from r2.lib.db.thing import Thing, Merge
|
||||
from r2.lib.db.operators import asc, desc, timeago
|
||||
from r2.lib.db.sorts import epoch_seconds
|
||||
from r2.lib.utils import fetch_things2, tup, UniqueIterator, set_last_modified
|
||||
from r2.lib.utils import fetch_things2, tup, UniqueIterator
|
||||
from r2.lib import utils
|
||||
from r2.lib import amqp, sup, filters
|
||||
from r2.lib.comment_tree import add_comments, update_comment_votes
|
||||
@@ -854,7 +854,7 @@ def new_comment(comment, inbox_rels):
|
||||
amqp.add_item('new_comment', comment._fullname)
|
||||
|
||||
if not g.amqp_host:
|
||||
add_comment_tree([comment])
|
||||
add_comments([comment])
|
||||
|
||||
job_dict = { job_key: comment }
|
||||
add_queries(job, **job_dict)
|
||||
@@ -1310,16 +1310,6 @@ def add_all_users():
|
||||
for user in fetch_things2(q):
|
||||
update_user(user)
|
||||
|
||||
def add_comment_tree(comments):
|
||||
#update the comment cache
|
||||
add_comments(comments)
|
||||
#update last modified
|
||||
links = Link._byID(list(set(com.link_id for com in tup(comments))),
|
||||
data = True, return_dict = False)
|
||||
for link in links:
|
||||
set_last_modified(link, 'comments')
|
||||
LastModified.touch(link._fullname, 'Comments')
|
||||
|
||||
# amqp queue processing functions
|
||||
|
||||
def run_new_comments(limit=1000):
|
||||
@@ -1351,7 +1341,7 @@ def run_commentstree(qname="commentstree_q", limit=100):
|
||||
data = True, return_dict = False)
|
||||
print 'Processing %r' % (comments,)
|
||||
|
||||
add_comment_tree(comments)
|
||||
add_comments(comments)
|
||||
|
||||
amqp.handle_items(qname, _run_commentstree, limit = limit)
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
from builder import Builder, MAX_RECURSION, empty_listing
|
||||
from r2.lib.wrapped import Wrapped
|
||||
from r2.lib.comment_tree import link_comments, link_comments_and_sort, tree_sort_fn, MAX_ITERATIONS
|
||||
from r2.lib.comment_tree import link_comments_and_sort, tree_sort_fn, MAX_ITERATIONS
|
||||
from r2.models.link import *
|
||||
from r2.lib.db import operators
|
||||
from r2.lib import utils
|
||||
@@ -50,11 +50,10 @@ class _CommentBuilder(Builder):
|
||||
cdef list cid
|
||||
cdef dict cid_tree
|
||||
cdef dict depth
|
||||
cdef dict num_children
|
||||
cdef dict parents
|
||||
cdef dict sorter
|
||||
|
||||
r = link_comments_and_sort(self.link._id, self.sort.col)
|
||||
r = link_comments_and_sort(self.link, self.sort.col)
|
||||
cids, cid_tree, depth, num_children, parents, sorter = r
|
||||
|
||||
cdef dict debug_dict = dict(
|
||||
|
||||
440
r2/r2/models/comment_tree.py
Normal file
440
r2/r2/models/comment_tree.py
Normal file
@@ -0,0 +1,440 @@
|
||||
# The contents of this file are subject to the Common Public Attribution
|
||||
# License Version 1.0. (the "License"); you may not use this file except in
|
||||
# compliance with the License. You may obtain a copy of the License at
|
||||
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
|
||||
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
|
||||
# software over a computer network and provide for limited attribution for the
|
||||
# Original Developer. In addition, Exhibit A has been modified to be consistent
|
||||
# with Exhibit B.
|
||||
#
|
||||
# Software distributed under the License is distributed on an "AS IS" basis,
|
||||
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
|
||||
# the specific language governing rights and limitations under the License.
|
||||
#
|
||||
# The Original Code is reddit.
|
||||
#
|
||||
# The Original Developer is the Initial Developer. The Initial Developer of
|
||||
# the Original Code is reddit Inc.
|
||||
#
|
||||
# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
|
||||
# Inc. All Rights Reserved.
|
||||
###############################################################################
|
||||
|
||||
from r2.lib.db import tdb_cassandra
|
||||
from r2.lib import utils
|
||||
from r2.models.last_modified import LastModified
|
||||
from r2.models.link import Comment
|
||||
|
||||
from pycassa import batch, types
|
||||
from pycassa.cassandra import ttypes
|
||||
from pycassa.system_manager import ASCII_TYPE, COUNTER_COLUMN_TYPE
|
||||
|
||||
from pylons import g
|
||||
|
||||
|
||||
class CommentTreeStorageBase(object):
|
||||
_maintain_num_children = True
|
||||
|
||||
class NoOpContext:
|
||||
def __enter__(self):
|
||||
pass
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def mutation_context(cls, link, timeout=None):
|
||||
return cls.NoOpContext()
|
||||
|
||||
@classmethod
|
||||
def by_link(cls, link):
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def add_comments(cls, tree, comments):
|
||||
cids = tree.cids
|
||||
depth = tree.depth
|
||||
num_children = tree.num_children
|
||||
|
||||
#dfs to find the list of parents for the new comment
|
||||
def find_parents(cid):
|
||||
# initialize stack with copy of top-level cids
|
||||
stack = tree.tree[None][:]
|
||||
parents = []
|
||||
while stack:
|
||||
cur = stack.pop()
|
||||
if cur == cid:
|
||||
return parents
|
||||
elif cur in tree.tree:
|
||||
#make cur the end of the parents list
|
||||
parents = parents[:depth[cur]] + [cur]
|
||||
stack.extend(tree.tree[cur])
|
||||
|
||||
new_parents = {}
|
||||
for comment in comments:
|
||||
cid = comment._id
|
||||
p_id = comment.parent_id
|
||||
|
||||
#make sure we haven't already done this before (which would happen
|
||||
#if the tree isn't cached when you add a comment)
|
||||
if cid in cids:
|
||||
continue
|
||||
|
||||
#add to comment list
|
||||
cids.append(cid)
|
||||
|
||||
#add to tree
|
||||
tree.tree.setdefault(p_id, []).append(cid)
|
||||
|
||||
#add to depth
|
||||
depth[cid] = depth[p_id] + 1 if p_id else 0
|
||||
|
||||
#update children
|
||||
if cls._maintain_num_children:
|
||||
num_children[cid] = 0
|
||||
|
||||
#if this comment had a parent, find the parent's parents
|
||||
if p_id:
|
||||
new_parents[cid] = p_id
|
||||
if cls._maintain_num_children:
|
||||
for p_id in find_parents(cid):
|
||||
num_children[p_id] += 1
|
||||
|
||||
# update our cache of children -> parents as well:
|
||||
if not tree.parents:
|
||||
tree.parents = tree.parent_dict_from_tree(tree.tree)
|
||||
|
||||
parents = tree.parents
|
||||
|
||||
for cid, p_id in new_parents.iteritems():
|
||||
parents[cid] = p_id
|
||||
|
||||
for comment in comments:
|
||||
cid = comment._id
|
||||
if cid not in new_parents:
|
||||
parents[cid] = None
|
||||
|
||||
@classmethod
|
||||
def delete_comment(cls, tree, comment):
|
||||
# only remove leaf comments from the tree
|
||||
if comment._id not in tree.tree:
|
||||
if comment._id in tree.cids:
|
||||
tree.cids.remove(comment._id)
|
||||
if comment._id in tree.depth:
|
||||
del tree.depth[comment._id]
|
||||
if comment._id in tree.num_children:
|
||||
del tree.num_children[comment._id]
|
||||
|
||||
|
||||
class CommentTreeStorageV2(CommentTreeStorageBase):
|
||||
"""Cassandra column-based storage for comment trees.
|
||||
|
||||
Under this implementation, each column in a link's row corresponds to a
|
||||
comment on that link. The column name is an encoding of the tuple of
|
||||
(comment.parent_id, comment._id), and the value is a counter giving the
|
||||
size of the subtree rooted at the comment.
|
||||
|
||||
Key features:
|
||||
- does not use permacache!
|
||||
- does not require locking for updates
|
||||
"""
|
||||
|
||||
__metaclass__ = tdb_cassandra.ThingMeta
|
||||
_connection_pool = 'main'
|
||||
_use_db = True
|
||||
|
||||
_type_prefix = None
|
||||
_cf_name = 'CommentTree'
|
||||
|
||||
# column keys are tuples of (depth, parent_id, comment_id)
|
||||
_compare_with = types.CompositeType(
|
||||
types.IntegerType(),
|
||||
types.IntegerType(),
|
||||
types.IntegerType())
|
||||
|
||||
# column values are counters
|
||||
_extra_schema_creation_args = {
|
||||
'default_validation_class': COUNTER_COLUMN_TYPE,
|
||||
'replicate_on_write': True,
|
||||
}
|
||||
|
||||
COLUMN_READ_BATCH_SIZE = tdb_cassandra.max_column_count
|
||||
COLUMN_WRITE_BATCH_SIZE = 1000
|
||||
|
||||
@staticmethod
|
||||
def _key(link_id):
|
||||
return utils.to36(link_id)
|
||||
|
||||
@staticmethod
|
||||
def _column_to_obj(cols):
|
||||
for col in cols:
|
||||
for (depth, pid, cid), val in col.iteritems():
|
||||
yield (depth, None if pid == -1 else pid, cid), val
|
||||
|
||||
@classmethod
|
||||
def by_link(cls, link):
|
||||
try:
|
||||
row = cls.get_row(cls._key(link._id))
|
||||
except ttypes.NotFoundException:
|
||||
row = {}
|
||||
return cls._from_row(row)
|
||||
|
||||
@classmethod
|
||||
def get_row(cls, key):
|
||||
row = []
|
||||
size = 0
|
||||
column_start = ''
|
||||
while True:
|
||||
batch = cls._cf.get(key, column_count=cls.COLUMN_READ_BATCH_SIZE,
|
||||
column_start=column_start)
|
||||
row.extend(batch.iteritems())
|
||||
num_fetched = len(row) - size
|
||||
size = len(row)
|
||||
if num_fetched < cls.COLUMN_READ_BATCH_SIZE:
|
||||
break
|
||||
depth, pid, cid = row[-1][0]
|
||||
column_start = (depth, pid if pid is not None else -1, cid + 1)
|
||||
return row
|
||||
|
||||
@classmethod
|
||||
def _from_row(cls, row):
|
||||
# row is a dict of {(depth, parent_id, comment_id): subtree_size}
|
||||
cids = []
|
||||
tree = {}
|
||||
depth = {}
|
||||
parents = {}
|
||||
num_children = {}
|
||||
for (d, pid, cid), val in row:
|
||||
if cid == -1:
|
||||
continue
|
||||
if pid == -1:
|
||||
pid = None
|
||||
cids.append(cid)
|
||||
tree.setdefault(pid, []).append(cid)
|
||||
depth[cid] = d
|
||||
parents[cid] = pid
|
||||
num_children[cid] = val - 1
|
||||
return dict(cids=cids, tree=tree, depth=depth,
|
||||
num_children=num_children, parents=parents)
|
||||
|
||||
@classmethod
|
||||
@tdb_cassandra.will_write
|
||||
def add_comments(cls, tree, comments):
|
||||
CommentTreeStorageBase.add_comments(tree, comments)
|
||||
g.log.debug('building updates dict')
|
||||
updates = {}
|
||||
for c in comments:
|
||||
pids = [int(pid_str, 36) if pid_str else -1
|
||||
for pid_str in c.parents.split(':')]
|
||||
pids.append(c._id)
|
||||
for d, (pid, cid) in enumerate(zip(pids, pids[1:])):
|
||||
k = (d, pid, cid)
|
||||
updates[k] = updates.get(k, 0) + 1
|
||||
|
||||
g.log.debug('writing %d updates', len(updates))
|
||||
# increment counters in slices of 100
|
||||
cols = updates.keys()
|
||||
for i in xrange(0, len(updates), cls.COLUMN_WRITE_BATCH_SIZE):
|
||||
g.log.debug(
|
||||
'adding updates %d..%d', i, i + cls.COLUMN_WRITE_BATCH_SIZE)
|
||||
update_batch = {c: updates[c]
|
||||
for c in cols[i:i + cls.COLUMN_WRITE_BATCH_SIZE]}
|
||||
with batch.Mutator(g.cassandra_pools[cls._connection_pool]) as m:
|
||||
m.insert(cls._cf, cls._key(tree.link_id), update_batch)
|
||||
g.log.debug('added %d comments with %d updates',
|
||||
len(comments), len(updates))
|
||||
|
||||
@classmethod
|
||||
@tdb_cassandra.will_write
|
||||
def delete_comment(cls, tree, comment):
|
||||
CommentTreeStorageBase.delete_comment(tree, comment)
|
||||
with batch.Mutator(g.cassandra_pools[cls._connection_pool]) as m:
|
||||
m.insert(cls._cf, cls._key(tree.link_id),
|
||||
dict((c.parents + ':' + c._id36), '0'))
|
||||
|
||||
@classmethod
|
||||
@tdb_cassandra.will_write
|
||||
def upgrade(cls, tree, link):
|
||||
cids = []
|
||||
for parent, children in tree.tree.iteritems():
|
||||
cids.extend(children)
|
||||
|
||||
comments = {}
|
||||
for i in xrange(0, len(cids), 100):
|
||||
g.log.debug(' loading comments %d..%d', i, i + 100)
|
||||
comments.update(Comment._byID(cids[i:i + 100], data=True))
|
||||
|
||||
# need to fill in parents attr for each comment
|
||||
modified = []
|
||||
stack = [None]
|
||||
while stack:
|
||||
pid = stack.pop()
|
||||
if pid is None:
|
||||
parents = ''
|
||||
else:
|
||||
parents = comments[pid].parents + ':' + comments[pid]._id36
|
||||
children = tree.tree.get(pid, [])
|
||||
stack.extend(children)
|
||||
for cid in children:
|
||||
if comments[cid].parents != parents:
|
||||
comments[cid].parents = parents
|
||||
modified.append(comments[cid])
|
||||
|
||||
for i, comment in enumerate(modified):
|
||||
comment._commit()
|
||||
|
||||
cls.add_comments(tree, comments.values())
|
||||
|
||||
|
||||
class CommentTreeStorageV1(CommentTreeStorageBase):
|
||||
"""Cassandra storage of comment trees, using permacache."""
|
||||
|
||||
@staticmethod
|
||||
def _comments_key(link_id):
|
||||
return 'comments_' + str(link_id)
|
||||
|
||||
@staticmethod
|
||||
def _parent_comments_key(link_id):
|
||||
return 'comments_parents_' + str(link_id)
|
||||
|
||||
@staticmethod
|
||||
def _lock_key(link_id):
|
||||
return 'comment_lock_' + str(link_id)
|
||||
|
||||
@classmethod
|
||||
def mutation_context(cls, link, timeout=None):
|
||||
return g.make_lock("comment_tree", cls._lock_key(link._id),
|
||||
timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
def by_link(cls, link):
|
||||
key = cls._comments_key(link._id)
|
||||
p_key = cls._parent_comments_key(link._id)
|
||||
# prefetch both values, they'll be locally cached
|
||||
g.permacache.get_multi([key, p_key])
|
||||
|
||||
r = g.permacache.get(key)
|
||||
if not r:
|
||||
return None
|
||||
cids, cid_tree, depth, num_children = r
|
||||
parents = g.permacache.get(p_key)
|
||||
if parents is None:
|
||||
parents = {}
|
||||
return dict(cids=cids, tree=cid_tree, depth=depth,
|
||||
num_children=num_children, parents=parents)
|
||||
|
||||
@classmethod
|
||||
def add_comments(cls, tree, comments):
|
||||
with cls.mutation_context(tree.link):
|
||||
CommentTreeStorageBase.add_comments(tree, comments)
|
||||
g.permacache.set(cls._comments_key(tree.link_id),
|
||||
(tree.cids, tree.tree, tree.depth,
|
||||
tree.num_children))
|
||||
g.permacache.set(cls._parent_comments_key(tree.link_id),
|
||||
tree.parents)
|
||||
|
||||
|
||||
class CommentTree:
|
||||
"""Storage for pre-computed relationships between a link's comments.
|
||||
|
||||
An instance of this class serves as a snapshot of a single link's comment
|
||||
tree. The actual storage implementation is separated to allow for different
|
||||
schemes for different links.
|
||||
|
||||
Attrs:
|
||||
- cids: list of ints; link's comment IDs
|
||||
- tree: dict of int to list of ints; each non-leaf entry in cids has a
|
||||
key in this dict, and the corresponding value is the list of IDs for
|
||||
that comment's immediate children
|
||||
- depth: dict of int to int; each entry in cids has a key in this dict,
|
||||
and the corresponding value is that comment's depth in the tree
|
||||
(with a value of 0 for top-level comments)
|
||||
- num_children: dict of int to int; each entry in cids has a key in this
|
||||
dict, and the corresponding value is the count of that comment's
|
||||
descendents in the tree
|
||||
- parents: dict of int to int; each entry in cids has a key in this dict,
|
||||
and the corresponding value is the ID of that comment's parent (or
|
||||
None in the case of top-level comments)
|
||||
"""
|
||||
|
||||
IMPLEMENTATIONS = {
|
||||
1: CommentTreeStorageV1,
|
||||
2: CommentTreeStorageV2,
|
||||
}
|
||||
|
||||
DEFAULT_IMPLEMENTATION = 2
|
||||
|
||||
def __init__(self, link, **kw):
|
||||
self.link = link
|
||||
self.link_id = link._id
|
||||
self.__dict__.update(kw)
|
||||
|
||||
@classmethod
|
||||
def mutation_context(cls, link, timeout=None):
|
||||
impl = cls.IMPLEMENTATIONS[link.comment_tree_version]
|
||||
return impl.mutation_context(link, timeout=timeout)
|
||||
|
||||
@classmethod
|
||||
def by_link(cls, link):
|
||||
impl = cls.IMPLEMENTATIONS[link.comment_tree_version]
|
||||
data = impl.by_link(link)
|
||||
if data is None:
|
||||
return None
|
||||
else:
|
||||
return cls(link, **data)
|
||||
|
||||
def add_comments(self, comments):
|
||||
impl = self.IMPLEMENTATIONS[self.link.comment_tree_version]
|
||||
impl.add_comments(self, comments)
|
||||
utils.set_last_modified(self.link, 'comments')
|
||||
LastModified.touch(self.link._fullname, 'Comments')
|
||||
|
||||
def add_comment(self, comment):
|
||||
return self.add_comments([comment])
|
||||
|
||||
def delete_comment(self, comment):
|
||||
impl = cls.IMPLEMENTATIONS[link.comment_tree_version]
|
||||
impl.delete_comment(self, comment)
|
||||
self.link._incr('num_comments', -1)
|
||||
|
||||
@classmethod
|
||||
def rebuild(cls, link):
|
||||
# fetch all comments and sort by parent_id, so parents are added to the
|
||||
# tree before their children
|
||||
q = Comment._query(Comment.c.link_id == link._id,
|
||||
Comment.c._deleted == (True, False),
|
||||
Comment.c._spam == (True, False),
|
||||
optimize_rules=True,
|
||||
data=True)
|
||||
comments = sorted(utils.fetch_things2(q, chunk_size=1000),
|
||||
key=lambda c: c.parent_id)
|
||||
|
||||
# build tree from scratch (for V2 results in double-counting in cass)
|
||||
tree = cls(link, cids=[], tree={}, depth={}, num_children={},
|
||||
parents={})
|
||||
impl = cls.IMPLEMENTATIONS[link.comment_tree_version]
|
||||
impl.add_comments(tree, comments)
|
||||
|
||||
link.num_comments = sum(1 for c in comments if not c._deleted)
|
||||
link._commit()
|
||||
|
||||
return tree
|
||||
|
||||
@classmethod
|
||||
def upgrade(cls, link, to_version=None):
|
||||
if to_version is None:
|
||||
to_version = cls.DEFAULT_IMPLEMENTATION
|
||||
while link.comment_tree_version < to_version:
|
||||
tree = cls.by_link(link)
|
||||
new_impl = cls.IMPLEMENTATIONS[link.comment_tree_version + 1]
|
||||
new_impl.upgrade(tree, link)
|
||||
link.comment_tree_version += 1
|
||||
link._commit()
|
||||
|
||||
@staticmethod
|
||||
def parent_dict_from_tree(tree):
|
||||
parents = {}
|
||||
for parent, children in tree.iteritems():
|
||||
for child in children:
|
||||
parents[child] = parent
|
||||
return parents
|
||||
@@ -20,8 +20,8 @@
|
||||
# Inc. All Rights Reserved.
|
||||
###############################################################################
|
||||
|
||||
from r2.lib.db.thing import Thing, Relation, NotFound, MultiRelation, \
|
||||
CreationError
|
||||
from r2.lib.db.thing import (
|
||||
Thing, Relation, NotFound, MultiRelation, CreationError)
|
||||
from r2.lib.db.operators import desc
|
||||
from r2.lib.utils import base_url, tup, domain, title_to_url, UrlParser
|
||||
from account import Account, DeletedUser
|
||||
@@ -68,7 +68,8 @@ class Link(Thing, Printable):
|
||||
noselfreply=False,
|
||||
ip='0.0.0.0',
|
||||
flair_text=None,
|
||||
flair_css_class=None)
|
||||
flair_css_class=None,
|
||||
comment_tree_version=1)
|
||||
_essentials = ('sr_id', 'author_id')
|
||||
_nsfw = re.compile(r"\bnsfw\b", re.I)
|
||||
|
||||
@@ -132,7 +133,8 @@ class Link(Thing, Printable):
|
||||
author_id=author._id,
|
||||
sr_id=sr._id,
|
||||
lang=sr.lang,
|
||||
ip=ip)
|
||||
ip=ip,
|
||||
comment_tree_version=2)
|
||||
l._commit()
|
||||
l.set_url_cache()
|
||||
if author._spam:
|
||||
@@ -667,7 +669,8 @@ class Comment(Thing, Printable):
|
||||
moderator_banned=False,
|
||||
new=False,
|
||||
gildings=0,
|
||||
banned_before_moderator=False)
|
||||
banned_before_moderator=False,
|
||||
parents=None)
|
||||
_essentials = ('link_id', 'author_id')
|
||||
|
||||
def _markdown(self):
|
||||
@@ -677,12 +680,19 @@ class Comment(Thing, Printable):
|
||||
def _new(cls, author, link, parent, body, ip):
|
||||
from r2.lib.db.queries import changed
|
||||
|
||||
kw = {}
|
||||
if link.comment_tree_version > 1:
|
||||
if parent:
|
||||
kw['parents'] = parent.parents + ':' + parent._id36
|
||||
else:
|
||||
kw['parents'] = ':'
|
||||
c = Comment(_ups=1,
|
||||
body=body,
|
||||
link_id=link._id,
|
||||
sr_id=link.sr_id,
|
||||
author_id=author._id,
|
||||
ip=ip)
|
||||
ip=ip,
|
||||
**kw)
|
||||
|
||||
c._spam = author._spam
|
||||
|
||||
|
||||
Reference in New Issue
Block a user