diff --git a/r2/r2/lib/cloudsearch.py b/r2/r2/lib/cloudsearch.py index a815d0618..32c5000c1 100644 --- a/r2/r2/lib/cloudsearch.py +++ b/r2/r2/lib/cloudsearch.py @@ -103,12 +103,12 @@ def field(name=None, cloudsearch_type=str, lucene_type=SAME_AS_CLOUDSEARCH): name = None else: function = None - + def field_inner(fn): fn.field = Field(name or fn.func_name, cloudsearch_type, lucene_type, fn) return fn - + if function: return field_inner(function) else: @@ -127,7 +127,7 @@ class FieldsMeta(type): class FieldsBase(object): __metaclass__ = FieldsMeta - + def fields(self): data = {} for field in self._fields: @@ -137,26 +137,26 @@ class FieldsBase(object): if val is not None: data[field.name] = val return data - + @classmethod def all_fields(cls): return cls._fields - + @classmethod def cloudsearch_fields(cls, type_=None, types=FIELD_TYPES): types = (type_,) if type_ else types return [f for f in cls._fields if f.cloudsearch_type in types] - + @classmethod def lucene_fields(cls, type_=None, types=FIELD_TYPES): types = (type_,) if type_ else types return [f for f in cls._fields if f.lucene_type in types] - + @classmethod def cloudsearch_fieldnames(cls, type_=None, types=FIELD_TYPES): return [f.name for f in cls.cloudsearch_fields(type_=type_, types=types)] - + @classmethod def lucene_fieldnames(cls, type_=None, types=FIELD_TYPES): return [f.name for f in cls.lucene_fields(type_=type_, types=types)] @@ -167,73 +167,73 @@ class LinkFields(FieldsBase): self.link = link self.author = author self.sr = sr - + @field(cloudsearch_type=int, lucene_type=None) def ups(self): return max(0, self.link._ups) - + @field(cloudsearch_type=int, lucene_type=None) def downs(self): return max(0, self.link._downs) - + @field(cloudsearch_type=int, lucene_type=None) def num_comments(self): return max(0, getattr(self.link, 'num_comments', 0)) - + @field def fullname(self): return self.link._fullname - + @field def subreddit(self): return self.sr.name - + @field def reddit(self): return self.sr.name - + @field def title(self): return self.link.title - + @field(cloudsearch_type=int) def sr_id(self): return self.link.sr_id - + @field(cloudsearch_type=int, lucene_type=datetime) def timestamp(self): return int(time.mktime(self.link._date.utctimetuple())) - + @field(cloudsearch_type=int, lucene_type="yesno") def over18(self): nsfw = (self.sr.over_18 or self.link.over_18 or Link._nsfw.findall(self.link.title)) return (1 if nsfw else 0) - + @field(cloudsearch_type=None, lucene_type="yesno") def nsfw(self): return NotImplemented - + @field(cloudsearch_type=int, lucene_type="yesno") def is_self(self): return (1 if self.link.is_self else 0) - + @field(name="self", cloudsearch_type=None, lucene_type="yesno") def self_(self): return NotImplemented - + @field def author_fullname(self): return self.author._fullname - + @field(name="author") def author_field(self): return '[deleted]' if self.author._deleted else self.author.name - + @field(cloudsearch_type=int) def type_id(self): return self.link._type_id - + @field def site(self): if self.link.is_self: @@ -244,25 +244,25 @@ class LinkFields(FieldsBase): return list(url.domain_permutations()) except ValueError: return None - + @field def selftext(self): if self.link.is_self and self.link.selftext: return self.link.selftext else: return None - + @field def url(self): if not self.link.is_self: return self.link.url else: return None - + @field def flair_css_class(self): return self.link.flair_css_class - + @field def flair_text(self): return self.link.flair_text @@ -275,51 +275,51 @@ class LinkFields(FieldsBase): class SubredditFields(FieldsBase): def __init__(self, sr): self.sr = sr - + @field def name(self): return self.sr.name - + @field def title(self): return self.sr.title - + @field(name="type") def type_(self): return self.sr.type - + @field def language(self): return self.sr.lang - + @field def header_title(self): return self.sr.header_title - + @field def description(self): return self.sr.public_description - + @field def sidebar(self): return self.sr.description - + @field def over18(self): return self.sr.over_18 - + @field def link_type(self): return self.sr.link_type - + @field def activity(self): return self.sr._downs - + @field def subscribers(self): return self.sr._ups - + @field def type_id(self): return self.sr._type_id @@ -328,12 +328,12 @@ class SubredditFields(FieldsBase): class CloudSearchUploader(object): use_safe_get = False types = () - + def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): self.doc_api = doc_api self._version_offset = version_offset self.things = self.desired_things(things) if things else [] - + @classmethod def desired_fullnames(cls, items): '''Pull fullnames that represent instances of 'types' out of items''' @@ -344,11 +344,11 @@ class CloudSearchUploader(object): if item_type in type_ids: fullnames.add(item['fullname']) return fullnames - + @classmethod def desired_things(cls, things): return [t for t in things if isinstance(t, cls.types)] - + def _version_tenths(self): '''Cloudsearch documents don't update unless the sent "version" field is higher than the one currently indexed. As our documents don't have @@ -357,22 +357,22 @@ class CloudSearchUploader(object): "version" - this will last approximately 13 years until bumping up against the version max of 2^32 for cloudsearch docs''' return int(time.time() * 10) - self._version_offset - + def _version_seconds(self): return int(time.time()) - int(self._version_offset / 10) - + _version = _version_tenths - + def add_xml(self, thing, version): add = etree.Element("add", id=thing._fullname, version=str(version), lang="en") - + for field_name, value in self.fields(thing).iteritems(): field = etree.SubElement(add, "field", name=field_name) field.text = _safe_xml_str(value) - + return add - + def delete_xml(self, thing, version=None): '''Return the cloudsearch XML representation of "delete this from the index" @@ -381,7 +381,7 @@ class CloudSearchUploader(object): version = str(version or self._version()) delete = etree.Element("delete", id=thing._fullname, version=version) return delete - + def delete_ids(self, ids): '''Delete documents from the index. 'ids' should be a list of fullnames @@ -393,7 +393,7 @@ class CloudSearchUploader(object): batch = etree.Element("batch") batch.extend(deletes) return self.send_documents(batch) - + def xml_from_things(self): '''Generate a XML tree to send to cloudsearch for adding/updating/deleting the given things @@ -423,30 +423,30 @@ class CloudSearchUploader(object): g.log.warn("Ignoring problem on thing %r.\n\n%r", thing, e) return batch - + def should_index(self, thing): raise NotImplementedError - + def batch_lookups(self): pass - + def fields(self, thing): raise NotImplementedError - + def inject(self, quiet=False): '''Send things to cloudsearch. Return value is time elapsed, in seconds, of the communication with the cloudsearch endpoint ''' xml_things = self.xml_from_things() - + cs_start = datetime.now(g.tz) if len(xml_things): sent = self.send_documents(xml_things) if not quiet: print sent return (datetime.now(g.tz) - cs_start).total_seconds() - + def send_documents(self, docs): '''Open a connection to the cloudsearch endpoint, and send the documents for indexing. Multiple requests are sent if a large number of documents @@ -478,18 +478,18 @@ class CloudSearchUploader(object): class LinkUploader(CloudSearchUploader): types = (Link,) - + def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): super(LinkUploader, self).__init__(doc_api, things, version_offset) self.accounts = {} self.srs = {} - + def fields(self, thing): '''Return fields relevant to a Link search index''' account = self.accounts[thing.author_id] sr = self.srs[thing.sr_id] return LinkFields(thing, account, sr).fields() - + def batch_lookups(self): author_ids = [thing.author_id for thing in self.things if hasattr(thing, 'author_id')] @@ -502,7 +502,7 @@ class LinkUploader(CloudSearchUploader): return_dict=True) else: raise - + sr_ids = [thing.sr_id for thing in self.things if hasattr(thing, 'sr_id')] try: @@ -513,7 +513,7 @@ class LinkUploader(CloudSearchUploader): return_dict=True) else: raise - + def should_index(self, thing): return (thing.promoted is None and getattr(thing, "sr_id", None) != -1) @@ -521,10 +521,10 @@ class LinkUploader(CloudSearchUploader): class SubredditUploader(CloudSearchUploader): types = (Subreddit,) _version = CloudSearchUploader._version_seconds - + def fields(self, thing): return SubredditFields(thing).fields() - + def should_index(self, thing): return getattr(thing, 'author_id', None) != -1 @@ -559,24 +559,24 @@ def _run_changed(msgs, chan): ''' start = datetime.now(g.tz) - + changed = [pickle.loads(msg.body) for msg in msgs] - + fullnames = set() fullnames.update(LinkUploader.desired_fullnames(changed)) fullnames.update(SubredditUploader.desired_fullnames(changed)) things = Thing._by_fullname(fullnames, data=True, return_dict=False) - + link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things) subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API, things=things) - + link_time = link_uploader.inject() subreddit_time = subreddit_uploader.inject() cloudsearch_time = link_time + subreddit_time - + totaltime = (datetime.now(g.tz) - start).total_seconds() - + print ("%s: %d messages in %.2fs seconds (%.2fs secs waiting on " "cloudsearch); %d duplicates, %s remaining)" % (start, len(changed), totaltime, cloudsearch_time, @@ -610,13 +610,13 @@ def rebuild_link_index(start_at=None, sleeptime=1, cls=Link, cache_key = _REBUILD_INDEX_CACHE_KEY % uploader.__name__.lower() doc_api = getattr(g, doc_api) uploader = uploader(doc_api) - + if start_at is _REBUILD_INDEX_CACHE_KEY: start_at = g.cache.get(cache_key) if not start_at: raise ValueError("Told me to use '%s' key, but it's not set" % cache_key) - + q = cls._query(cls.c._deleted == (True, False), sort=desc('_date'), data=True) if start_at: @@ -676,13 +676,13 @@ class Results(object): self.hits = hits self._facets = facets self._subreddits = [] - + def __repr__(self): return '%s(%r, %r, %r)' % (self.__class__.__name__, self.docs, self.hits, self._facets) - + @property def subreddit_facets(self): '''Filter out subreddits that the user isn't allowed to see''' @@ -745,7 +745,7 @@ def basic_query(query=None, bq=None, faceting=None, size=1000, connection.close() if timer is not None: timer.stop() - + return json.loads(response) @@ -800,7 +800,7 @@ class CloudSearchQuery(object): known_syntaxes = ("cloudsearch", "lucene", "plain") default_syntax = "plain" lucene_parser = None - + def __init__(self, query, sr=None, sort=None, syntax=None, raw_sort=None, faceting=None): if syntax is None: @@ -819,20 +819,20 @@ class CloudSearchQuery(object): self.faceting = faceting self.bq = u'' self.results = None - + def run(self, after=None, reverse=False, num=1000, _update=False): if not self.query: return Results([], 0, {}) - + results = self._run(_update=_update) - + docs, hits, facets = results.docs, results.hits, results._facets - + after_docs = r2utils.get_after(docs, after, num, reverse=reverse) - + self.results = Results(after_docs, hits, facets) return self.results - + def _run(self, start=0, num=1000, _update=False): '''Run the search against self.query''' q = None @@ -850,10 +850,10 @@ class CloudSearchQuery(object): return self._run_cached(q, self.bq.encode('utf-8'), self.sort, self.faceting, start=start, num=num, _update=_update) - + def customize_query(self, bq): return bq - + def __repr__(self): '''Return a string representation of this query''' result = ["<", self.__class__.__name__, "> query:", @@ -865,7 +865,7 @@ class CloudSearchQuery(object): result.append("sort:") result.append(self.sort) return ''.join(result) - + @classmethod def _run_cached(cls, query, bq, sort="relevance", faceting=None, start=0, num=1000, _update=False): @@ -906,18 +906,18 @@ class CloudSearchQuery(object): response = basic_query(query=query, bq=bq, size=num, start=start, rank=sort, search_api=cls.search_api, faceting=faceting, record_stats=True) - + warnings = response['info'].get('messages', []) for warning in warnings: g.log.warn("%(code)s (%(severity)s): %(message)s" % warning) - + hits = response['hits']['found'] docs = [doc['id'] for doc in response['hits']['hit']] facets = response.get('facets', {}) for facet in facets.keys(): values = facets[facet]['constraints'] facets[facet] = values - + results = Results(docs, hits, facets) return results @@ -936,7 +936,7 @@ class LinkSearchQuery(CloudSearchQuery): 'top': 4, 'comments': 5, } - + schema = l2cs.make_schema(LinkFields.lucene_fieldnames()) lucene_parser = l2cs.make_parser( int_fields=LinkFields.lucene_fieldnames(type_=int), @@ -944,11 +944,11 @@ class LinkSearchQuery(CloudSearchQuery): schema=schema) known_syntaxes = ("cloudsearch", "lucene", "plain") default_syntax = "lucene" - + def customize_query(self, bq): subreddit_query = self._get_sr_restriction(self.sr) return self.create_boolean_query(bq, subreddit_query) - + @classmethod def create_boolean_query(cls, query, subreddit_query): '''Join a (user-entered) text query with the generated subreddit query @@ -968,7 +968,7 @@ class LinkSearchQuery(CloudSearchQuery): else: bq = query return bq - + @staticmethod def _get_sr_restriction(sr): '''Return a cloudsearch appropriate query string that restricts @@ -1004,7 +1004,7 @@ class LinkSearchQuery(CloudSearchQuery): bq.append(")") elif not isinstance(sr, FakeSubreddit): bq = ["sr_id:%s" % sr._id] - + return ' '.join(bq) @@ -1015,6 +1015,6 @@ class SubredditSearchQuery(CloudSearchQuery): } sorts_menu_mapping = {'relevance': 1, } - + known_syntaxes = ("plain",) default_syntax = "plain"