From 281b434f14c9225a22fde5f413d896f18a3ee7d0 Mon Sep 17 00:00:00 2001 From: Ricky Ramirez Date: Wed, 3 Jul 2013 14:52:33 -0700 Subject: [PATCH] cloudsearch: Resolve things lower in the stack. This allows us to use the safe_get function which is very helpful should there be database corruption. *cough*whoops*cough* --- r2/r2/lib/cloudsearch.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/r2/r2/lib/cloudsearch.py b/r2/r2/lib/cloudsearch.py index 3704a3160..2d19eed8c 100644 --- a/r2/r2/lib/cloudsearch.py +++ b/r2/r2/lib/cloudsearch.py @@ -70,7 +70,7 @@ def safe_get(get_fn, ids, return_dict=True, **kw): try: item = get_fn(i, **kw) except NotFound: - g.log.info("%r failed for %r", get_fn, i) + g.log.info("%s failed for %r", get_fn.__name__, i) else: items[i] = item if return_dict: @@ -330,10 +330,10 @@ class CloudSearchUploader(object): use_safe_get = False types = () - def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): + def __init__(self, doc_api, fullnames=None, version_offset=_VERSION_OFFSET): self.doc_api = doc_api self._version_offset = version_offset - self.things = self.desired_things(things) if things else [] + self.fullnames = fullnames @classmethod def desired_fullnames(cls, items): @@ -429,7 +429,15 @@ class CloudSearchUploader(object): raise NotImplementedError def batch_lookups(self): - pass + try: + self.things = Thing._by_fullname(self.fullnames, data=True, + return_dict=False) + except NotFound: + if self.use_safe_get: + self.things = safe_get(Thing._by_fullname, self.fullnames, + data=True, return_dict=False) + else: + raise def fields(self, thing): raise NotImplementedError @@ -502,8 +510,8 @@ class CloudSearchUploader(object): class LinkUploader(CloudSearchUploader): types = (Link,) - def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): - super(LinkUploader, self).__init__(doc_api, things, version_offset) + def __init__(self, doc_api, fullnames=None, version_offset=_VERSION_OFFSET): + super(LinkUploader, self).__init__(doc_api, fullnames, version_offset) self.accounts = {} self.srs = {} @@ -514,6 +522,7 @@ class LinkUploader(CloudSearchUploader): return LinkFields(thing, account, sr).fields() def batch_lookups(self): + super(LinkUploader, self).batch_lookups() author_ids = [thing.author_id for thing in self.things if hasattr(thing, 'author_id')] try: @@ -586,14 +595,12 @@ def _run_changed(msgs, chan): changed = [pickle.loads(msg.body) for msg in msgs] - fullnames = set() - fullnames.update(LinkUploader.desired_fullnames(changed)) - fullnames.update(SubredditUploader.desired_fullnames(changed)) - things = Thing._by_fullname(fullnames, data=True, return_dict=False) + link_fns = LinkUploader.desired_fullnames(changed) + sr_fns = SubredditUploader.desired_fullnames(changed) - link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things) + link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, fullnames=link_fns) subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API, - things=things) + fullnames=sr_fns) link_time = link_uploader.inject() subreddit_time = subreddit_uploader.inject() @@ -604,7 +611,7 @@ def _run_changed(msgs, chan): print ("%s: %d messages in %.2fs seconds (%.2fs secs waiting on " "cloudsearch); %d duplicates, %s remaining)" % (start, len(changed), totaltime, cloudsearch_time, - len(changed) - len(things), + len(changed) - len(link_fns | sr_fns), msgs[-1].delivery_info.get('message_count', 'unknown')))