cloudsearch: Resolve things lower in the stack.

This allows us to use the safe_get function which is very helpful should there
be database corruption. *cough*whoops*cough*
This commit is contained in:
Ricky Ramirez
2013-07-03 14:52:33 -07:00
parent eb919c2933
commit 281b434f14

View File

@@ -70,7 +70,7 @@ def safe_get(get_fn, ids, return_dict=True, **kw):
try:
item = get_fn(i, **kw)
except NotFound:
g.log.info("%r failed for %r", get_fn, i)
g.log.info("%s failed for %r", get_fn.__name__, i)
else:
items[i] = item
if return_dict:
@@ -330,10 +330,10 @@ class CloudSearchUploader(object):
use_safe_get = False
types = ()
def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET):
def __init__(self, doc_api, fullnames=None, version_offset=_VERSION_OFFSET):
self.doc_api = doc_api
self._version_offset = version_offset
self.things = self.desired_things(things) if things else []
self.fullnames = fullnames
@classmethod
def desired_fullnames(cls, items):
@@ -429,7 +429,15 @@ class CloudSearchUploader(object):
raise NotImplementedError
def batch_lookups(self):
pass
try:
self.things = Thing._by_fullname(self.fullnames, data=True,
return_dict=False)
except NotFound:
if self.use_safe_get:
self.things = safe_get(Thing._by_fullname, self.fullnames,
data=True, return_dict=False)
else:
raise
def fields(self, thing):
raise NotImplementedError
@@ -502,8 +510,8 @@ class CloudSearchUploader(object):
class LinkUploader(CloudSearchUploader):
types = (Link,)
def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET):
super(LinkUploader, self).__init__(doc_api, things, version_offset)
def __init__(self, doc_api, fullnames=None, version_offset=_VERSION_OFFSET):
super(LinkUploader, self).__init__(doc_api, fullnames, version_offset)
self.accounts = {}
self.srs = {}
@@ -514,6 +522,7 @@ class LinkUploader(CloudSearchUploader):
return LinkFields(thing, account, sr).fields()
def batch_lookups(self):
super(LinkUploader, self).batch_lookups()
author_ids = [thing.author_id for thing in self.things
if hasattr(thing, 'author_id')]
try:
@@ -586,14 +595,12 @@ def _run_changed(msgs, chan):
changed = [pickle.loads(msg.body) for msg in msgs]
fullnames = set()
fullnames.update(LinkUploader.desired_fullnames(changed))
fullnames.update(SubredditUploader.desired_fullnames(changed))
things = Thing._by_fullname(fullnames, data=True, return_dict=False)
link_fns = LinkUploader.desired_fullnames(changed)
sr_fns = SubredditUploader.desired_fullnames(changed)
link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things)
link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, fullnames=link_fns)
subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API,
things=things)
fullnames=sr_fns)
link_time = link_uploader.inject()
subreddit_time = subreddit_uploader.inject()
@@ -604,7 +611,7 @@ def _run_changed(msgs, chan):
print ("%s: %d messages in %.2fs seconds (%.2fs secs waiting on "
"cloudsearch); %d duplicates, %s remaining)" %
(start, len(changed), totaltime, cloudsearch_time,
len(changed) - len(things),
len(changed) - len(link_fns | sr_fns),
msgs[-1].delivery_info.get('message_count', 'unknown')))