diff --git a/r2/r2/controllers/front.py b/r2/r2/controllers/front.py index 5da9df441..1fe4f73ec 100755 --- a/r2/r2/controllers/front.py +++ b/r2/r2/controllers/front.py @@ -38,8 +38,9 @@ from r2.lib.db.operators import desc from r2.lib.db import queries from r2.lib.db.tdb_cassandra import MultiColumnQuery from r2.lib.strings import strings -from r2.lib.search import SearchQuery, SearchException, InvalidQuery -from r2.lib.solrsearch import RelatedSearchQuery, SubredditSearchQuery +from r2.lib.search import (SearchQuery, SubredditSearchQuery, SearchException, + InvalidQuery) +from r2.lib.solrsearch import RelatedSearchQuery from r2.lib.contrib.pysolr import SolrError from r2.lib import jsontemplates from r2.lib import sup diff --git a/r2/r2/lib/cloudsearch.py b/r2/r2/lib/cloudsearch.py index 94046314c..ef34ced75 100644 --- a/r2/r2/lib/cloudsearch.py +++ b/r2/r2/lib/cloudsearch.py @@ -1,5 +1,6 @@ import cPickle as pickle from datetime import datetime +import functools import httplib import json from lxml import etree @@ -21,7 +22,6 @@ from r2.models import (Account, Link, Subreddit, Thing, All, DefaultSR, _CHUNK_SIZE = 4000000 # Approx. 4 MB, to stay under the 5MB limit _VERSION_OFFSET = 13257906857 ILLEGAL_XML = re.compile(u'[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]') -USE_SAFE_GET = False def _safe_xml_str(s, use_encoding="utf-8"): @@ -40,81 +40,6 @@ def _safe_xml_str(s, use_encoding="utf-8"): return s -class CloudSearchHTTPError(httplib.HTTPException): pass -class InvalidQuery(Exception): pass - - -def _version(): - '''Cloudsearch documents don't update unless the sent "version" field - is higher than the one currently indexed. As our documents don't have - "versions" and could in theory be updated multiple times in one second, - for now, use "tenths of a second since 12:00:00.00 1/1/2012" as the - "version" - this will last approximately 13 years until bumping up against - the version max of 2^32 for cloudsearch docs''' - return int(time.time() * 10) - _VERSION_OFFSET - - -### Document Upload ### - -def add_xml(thing, version, srs, accounts): - '''Return an etree XML representation of the thing, suitable for - sending to cloudsearch - - ''' - add = etree.Element("add", id=thing._fullname, version=str(version), - lang="en") - - account = accounts[thing.author_id] - sr = srs[thing.sr_id] - nsfw = sr.over_18 or thing.over_18 or Link._nsfw.findall(thing.title) - - fields = {"ups": max(0, thing._ups), - "downs": max(0, thing._downs), - "num_comments": max(0, getattr(thing, 'num_comments', 0)), - "fullname": thing._fullname, - "subreddit": sr.name, - "reddit": sr.name, - "title": thing.title, - "timestamp": thing._date.strftime("%s"), - "sr_id": thing.sr_id, - "over18": 1 if nsfw else 0, - "is_self": 1 if thing.is_self else 0, - "author_fullname": account._fullname, - } - - if account._deleted: - fields['author'] = '[deleted]' - else: - fields['author'] = account.name - - if thing.is_self: - fields['site'] = g.domain - if thing.selftext: - fields['selftext'] = thing.selftext - else: - fields['url'] = thing.url - try: - fields['site'] = ' '.join(r2utils.UrlParser(thing.url).domain_permutations()) - except ValueError: - # UrlParser couldn't handle thing.url, oh well - pass - - for field_name, value in fields.iteritems(): - field = etree.SubElement(add, "field", name=field_name) - field.text = _safe_xml_str(value) - - return add - - -def delete_xml(thing, version): - '''Return the cloudsearch XML representation of - "delete this from the index" - - ''' - delete = 
etree.Element("delete", id=thing._fullname, version=str(version)) - return delete - - def safe_get(get_fn, ids, return_dict=True, **kw): items = {} for i in ids: @@ -130,77 +55,258 @@ def safe_get(get_fn, ids, return_dict=True, **kw): return items.values() -def xml_from_things(things): - '''Generate a XML tree to send to cloudsearch for - adding/updating/deleting the given things - - ''' - batch = etree.Element("batch") - - author_ids = [thing.author_id for thing in things - if hasattr(thing, 'author_id')] - try: - accounts = Account._byID(author_ids, data=True, return_dict=True) - except NotFound: - if USE_SAFE_GET: - accounts = safe_get(Account._byID, author_ids, data=True, - return_dict=True) - else: - raise +class CloudSearchHTTPError(httplib.HTTPException): pass +class InvalidQuery(Exception): pass - sr_ids = [thing.sr_id for thing in things if hasattr(thing, 'sr_id')] - try: - srs = Subreddit._byID(sr_ids, data=True, return_dict=True) - except NotFound: - if USE_SAFE_GET: - srs = safe_get(Subreddit._byID, sr_ids, data=True, return_dict=True) - else: - raise + +class CloudSearchUploader(object): + use_safe_get = False + types = () - version = _version() - for thing in things: + def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): + self.doc_api = doc_api + self._version_offset = version_offset + self.things = self.desired_things(things) if things else [] + + @classmethod + def desired_fullnames(cls, items): + '''Pull fullnames that represent instances of 'types' out of items''' + fullnames = set() + type_ids = [type_._type_id for type_ in cls.types] + for item in items: + item_type = r2utils.decompose_fullname(item['fullname'])[1] + if item_type in type_ids: + fullnames.add(item['fullname']) + return fullnames + + @classmethod + def desired_things(cls, things): + return [t for t in things if isinstance(t, cls.types)] + + def _version_tenths(self): + '''Cloudsearch documents don't update unless the sent "version" field + is higher than the one currently indexed. As our documents don't have + "versions" and could in theory be updated multiple times in one second, + for now, use "tenths of a second since 12:00:00.00 1/1/2012" as the + "version" - this will last approximately 13 years until bumping up against + the version max of 2^32 for cloudsearch docs''' + return int(time.time() * 10) - self._version_offset + + def _version_seconds(self): + return int(time.time()) - int(self._version_offset / 10) + + _version = _version_tenths + + def add_xml(self, thing, version): + add = etree.Element("add", id=thing._fullname, version=str(version), + lang="en") + + for field_name, value in self.fields(thing).iteritems(): + field = etree.SubElement(add, "field", name=field_name) + field.text = _safe_xml_str(value) + + return add + + def delete_xml(self, thing, version=None): + '''Return the cloudsearch XML representation of + "delete this from the index" + + ''' + version = str(version or self._version()) + delete = etree.Element("delete", id=thing._fullname, version=version) + return delete + + def delete_ids(self, ids): + '''Delete documents from the index. 
+ 'ids' should be a list of fullnames + + ''' + version = self._version() + deletes = [etree.Element("delete", id=id_, version=str(version)) + for id_ in ids] + batch = etree.Element("batch") + batch.extend(deletes) + return self.send_documents(batch) + + def xml_from_things(self): + '''Generate a XML tree to send to cloudsearch for + adding/updating/deleting the given things + + ''' + batch = etree.Element("batch") + self.batch_lookups() + version = self._version() + for thing in self.things: + try: + if thing._spam or thing._deleted: + delete_node = self.delete_xml(thing, version) + batch.append(delete_node) + elif self.should_index(thing): + add_node = self.add_xml(thing, version) + batch.append(add_node) + except (AttributeError, KeyError): + # AttributeError may occur if a needed attribute is somehow + # missing from the DB + # KeyError will occur for whichever items (if any) triggered + # the safe_get() call above, because the needed (but + # invalid) Account or Subreddit is missing from the srs or + # accounts dictionary + # In either case, the sanest approach is to simply not index + # the item. If it gets voted on later (or otherwise sent back + # to the queue), perhaps it will have been fixed. + pass + return batch + + def should_index(self, thing): + raise NotImplementedError + + def batch_lookups(self): + pass + + def fields(self, thing): + raise NotImplementedError + + def inject(self, quiet=False): + '''Send things to cloudsearch. Return value is time elapsed, in seconds, + of the communication with the cloudsearch endpoint + + ''' + xml_things = self.xml_from_things() + + cs_start = datetime.now(g.tz) + if len(xml_things): + sent = self.send_documents(xml_things) + if not quiet: + print sent + return (datetime.now(g.tz) - cs_start).total_seconds() + + def send_documents(self, docs): + '''Open a connection to the cloudsearch endpoint, and send the documents + for indexing. Multiple requests are sent if a large number of documents + are being sent (see chunk_xml()) + + Raises CloudSearchHTTPError if the endpoint indicates a failure + ''' + responses = [] + connection = httplib.HTTPConnection(self.doc_api, 80) + chunker = chunk_xml(docs) try: - if thing._spam or thing._deleted: - delete_node = delete_xml(thing, version) - batch.append(delete_node) - elif thing.promoted is None and getattr(thing, "sr_id", None) != -1: - add_node = add_xml(thing, version, srs, accounts) - batch.append(add_node) - except (AttributeError, KeyError): - # AttributeError may occur if a needed attribute is somehow missing - # from the DB - # KeyError will occur for whichever items (if any) triggered the - # safe_get() call above, because the needed (but invalid) - # Account or Subreddit is missing from the srs or accounts - # dictionary - # In either case, the sanest approach is to simply not index the - # item. If it gets voted on later (or otherwise sent back to the - # queue), perhaps it will have been fixed. - pass - return batch + for data in chunker: + headers = {} + headers['Content-Type'] = 'application/xml' + # HTTPLib calculates Content-Length header automatically + connection.request('POST', "/2011-02-01/documents/batch", + data, headers) + response = connection.getresponse() + if 200 <= response.status < 300: + responses.append(response.read()) + else: + raise CloudSearchHTTPError(response.status, + response.reason, + response.read()) + finally: + connection.close() + return responses -def delete_ids(ids): - '''Delete documents from the index. 
'ids' should be a list of fullnames''' - version = _version() - deletes = [etree.Element("delete", id=id_, version=str(version)) - for id_ in ids] - batch = etree.Element("batch") - batch.extend(deletes) - return send_documents(batch) - - -def inject(things): - '''Send things to cloudsearch. Return value is time elapsed, in seconds, - of the communication with the cloudsearch endpoint +class LinkUploader(CloudSearchUploader): + types = (Link,) - ''' - xml_things = xml_from_things(things) + def __init__(self, doc_api, things=None, version_offset=_VERSION_OFFSET): + super(LinkUploader, self).__init__(doc_api, things, version_offset) + self.accounts = {} + self.srs = {} - cs_start = datetime.now(g.tz) - if len(xml_things): - print send_documents(xml_things) - return (datetime.now(g.tz) - cs_start).total_seconds() + def fields(self, thing): + '''Return fields relevant to a Link search index''' + account = self.accounts[thing.author_id] + sr = self.srs[thing.sr_id] + nsfw = sr.over_18 or thing.over_18 or Link._nsfw.findall(thing.title) + + fields = {"ups": max(0, thing._ups), + "downs": max(0, thing._downs), + "num_comments": max(0, getattr(thing, 'num_comments', 0)), + "fullname": thing._fullname, + "subreddit": sr.name, + "reddit": sr.name, + "title": thing.title, + "timestamp": thing._date.strftime("%s"), + "sr_id": thing.sr_id, + "over18": 1 if nsfw else 0, + "is_self": 1 if thing.is_self else 0, + "author_fullname": account._fullname, + } + + if account._deleted: + fields['author'] = '[deleted]' + else: + fields['author'] = account.name + + if thing.is_self: + fields['site'] = g.domain + if thing.selftext: + fields['selftext'] = thing.selftext + else: + fields['url'] = thing.url + try: + url = r2utils.UrlParser(thing.url) + domains = ' '.join(url.domain_permutations()) + fields['site'] = domains + except ValueError: + # UrlParser couldn't handle thing.url, oh well + pass + + return fields + + def batch_lookups(self): + author_ids = [thing.author_id for thing in self.things + if hasattr(thing, 'author_id')] + try: + self.accounts = Account._byID(author_ids, data=True, + return_dict=True) + except NotFound: + if self.use_safe_get: + self.accounts = safe_get(Account._byID, author_ids, data=True, + return_dict=True) + else: + raise + + sr_ids = [thing.sr_id for thing in self.things + if hasattr(thing, 'sr_id')] + try: + self.srs = Subreddit._byID(sr_ids, data=True, return_dict=True) + except NotFound: + if self.use_safe_get: + self.srs = safe_get(Subreddit._byID, sr_ids, data=True, + return_dict=True) + else: + raise + + def should_index(self, thing): + return (thing.promoted is None and getattr(thing, "sr_id", None) != -1) + + +class SubredditUploader(CloudSearchUploader): + types = (Subreddit,) + _version = CloudSearchUploader._version_seconds + + def fields(self, thing): + fields = {'name': thing.name, + 'title': thing.title, + 'type': thing.type, # XXX Int type? + 'language': thing.lang, + 'header_title': thing.header_title, + 'description': thing.public_description, + 'sidebar': thing.description, + 'over18': thing.over_18, # Reminder: copy-field to nsfw + 'link_type': thing.link_type, # XXX Use integer? 
+ 'activity': thing._downs, + 'subscribers': thing._ups, + } + return fields + + def should_index(self, thing): + return getattr(thing, 'author_id', None) != -1 def chunk_xml(xml, depth=0): @@ -218,7 +324,8 @@ def chunk_xml(xml, depth=0): half = len(xml) / 2 left_half = xml # for ease of reading right_half = etree.Element("batch") - # etree magic simultaneously removes the elements from the other tree + # etree magic simultaneously removes the elements from one tree + # when they are appended to a different tree right_half.append(xml[half:]) for chunk in chunk_xml(left_half, depth=depth): yield chunk @@ -226,45 +333,6 @@ def chunk_xml(xml, depth=0): yield chunk -def send_documents(docs): - '''Open a connection to the cloudsearch endpoint, and send the documents - for indexing. Multiple requests are sent if a large number of documents - are being sent (see chunk_xml()) - - Raises CloudSearchHTTPError if the endpoint indicates a failure - ''' - responses = [] - connection = httplib.HTTPConnection(g.CLOUDSEARCH_DOC_API, 80) - chunker = chunk_xml(docs) - try: - for data in chunker: - headers = {} - headers['Content-Type'] = 'application/xml' - # HTTPLib calculates Content-Length header automatically - connection.request('POST', "/2011-02-01/documents/batch", - data, headers) - response = connection.getresponse() - if 200 <= response.status < 300: - responses.append(response.read()) - else: - raise CloudSearchHTTPError(response.status, response.reason, - response.read()) - finally: - connection.close() - return responses - - -def _desired_things(items, types): - '''Pull fullnames that represent instances of 'types' out of items''' - # This will fail if the _type_id for some things is >36 - fullnames = set() - type_ids = [r2utils.to36(type_._type_id) for type_ in types] - for item in items: - if item['fullname'][1] in type_ids: - fullnames.add(item['fullname']) - return fullnames - - def _run_changed(msgs, chan): '''Consume the cloudsearch_changes queue, and print reporting information on how long it took and how many remain @@ -274,12 +342,18 @@ def _run_changed(msgs, chan): changed = [pickle.loads(msg.body) for msg in msgs] - # Only handle links to start with - - fullnames = _desired_things(changed, (Link,)) + fullnames = set() + fullnames.update(LinkUploader.desired_fullnames(changed)) + fullnames.update(SubredditUploader.desired_fullnames(changed)) things = Thing._by_fullname(fullnames, data=True, return_dict=False) - cloudsearch_time = inject(things) + link_uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=things) + subreddit_uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API, + things=things) + + link_time = link_uploader.inject() + subreddit_time = subreddit_uploader.inject() + cloudsearch_time = link_time + subreddit_time totaltime = (datetime.now(g.tz) - start).total_seconds() @@ -297,8 +371,7 @@ def run_changed(drain=False, min_size=500, limit=1000, sleep_time=10, ''' if use_safe_get: - global USE_SAFE_GET - USE_SAFE_GET = True + CloudSearchUploader.use_safe_get = True amqp.handle_items('cloudsearch_changes', _run_changed, min_size=min_size, limit=limit, drain=drain, sleep_time=sleep_time, verbose=verbose) @@ -308,16 +381,21 @@ def _progress_key(item): return "%s/%s" % (item._id, item._date) -_REBUILD_INDEX_CACHE_KEY = "cloudsearch_cursor" +_REBUILD_INDEX_CACHE_KEY = "cloudsearch_cursor_%s" -def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000, - chunk_size=1000): +def rebuild_link_index(start_at=None, sleeptime=1, cls=Link, + uploader=LinkUploader, 
doc_api='CLOUDSEARCH_DOC_API', + estimate=50000000, chunk_size=1000): + cache_key = _REBUILD_INDEX_CACHE_KEY % uploader.__name__.lower() + doc_api = getattr(g, doc_api) + uploader = uploader(doc_api) + if start_at is _REBUILD_INDEX_CACHE_KEY: - start_at = g.cache.get(start_at) + start_at = g.cache.get(cache_key) if not start_at: raise ValueError("Told me to use '%s' key, but it's not set" % - _REBUILD_INDEX_CACHE_KEY) + cache_key) q = cls._query(cls.c._deleted == (True, False), sort=desc('_date'), data=True) @@ -329,11 +407,12 @@ def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000, q = r2utils.progress(q, verbosity=1000, estimate=estimate, persec=True, key=_progress_key) for chunk in r2utils.in_chunks(q, size=chunk_size): + uploader.things = chunk for x in range(5): try: - inject(chunk) + uploader.inject() except httplib.HTTPException as err: - print "Got %s, sleeping %s secs" % (err, x) + print "Got %s, sleeping %s secs" % (err, x) time.sleep(x) continue else: @@ -341,17 +420,33 @@ def rebuild_index(start_at=None, sleeptime=1, cls=Link, estimate=50000000, else: raise err last_update = chunk[-1] - g.cache.set(_REBUILD_INDEX_CACHE_KEY, last_update._fullname) + g.cache.set(cache_key, last_update._fullname) time.sleep(sleeptime) -def test_run(start_link, count=1000): +rebuild_subreddit_index = functools.partial(rebuild_link_index, + cls=Subreddit, + uploader=SubredditUploader, + doc_api='CLOUDSEARCH_SUBREDDIT_DOC_API', + estimate=200000, + chunk_size=1000) + + +def test_run_link(start_link, count=1000): '''Inject `count` number of links, starting with `start_link`''' if isinstance(start_link, basestring): start_link = int(start_link, 36) links = Link._byID(range(start_link - count, start_link), data=True, return_dict=False) - return inject(links) + uploader = LinkUploader(g.CLOUDSEARCH_DOC_API, things=links) + return uploader.inject() + + +def test_run_srs(*sr_names): + '''Inject Subreddits by name into the index''' + srs = Subreddit._by_name(sr_names).values() + uploader = SubredditUploader(g.CLOUDSEARCH_SUBREDDIT_DOC_API, things=srs) + return uploader.inject() ### Query Code ### @@ -385,7 +480,9 @@ INVALID_QUERY_CODES = ('CS-UnknownFieldInMatchExpression', 'CS-IncorrectFieldTypeInMatchExpression') def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10, size=1000, start=0, rank="hot", return_fields=None, - record_stats=False): + record_stats=False, search_api=None): + if search_api is None: + search_api = g.CLOUDSEARCH_SEARCH_API path = _encode_query(query, bq, facets, facet_count, size, start, rank, return_fields) timer = None @@ -393,7 +490,7 @@ def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10, timer = g.stats.get_timer("cloudsearch_timer") if timer: timer.start() - connection = httplib.HTTPConnection(g.CLOUDSEARCH_SEARCH_API, 80) + connection = httplib.HTTPConnection(search_api, 80) try: connection.request('GET', path) resp = connection.getresponse() @@ -417,11 +514,28 @@ def basic_query(query=None, bq=None, facets=("reddit",), facet_count=10, connection.close() if timer: timer.stop() - return json.loads(response) +basic_link = functools.partial(basic_query, facets=("reddit",), facet_count=10, + size=10, start=0, rank="hot", + return_fields=['title', 'reddit', + 'author_fullname'], + record_stats=False, + search_api=g.CLOUDSEARCH_SEARCH_API) + + +basic_subreddit = functools.partial(basic_query, + facets=(), + size=10, start=0, + rank="relevance", + return_fields=['title', 'reddit', + 'author_fullname'], + record_stats=False, + 
search_api=g.CLOUDSEARCH_SUBREDDIT_SEARCH_API) + + def _encode_query(query, bq, facets, facet_count, size, start, rank, return_fields): if not (query or bq): @@ -448,21 +562,14 @@ def _encode_query(query, bq, facets, facet_count, size, start, rank, class CloudSearchQuery(object): '''Represents a search query sent to cloudsearch''' - sorts = {'relevance': '-relevance', - 'top': '-top', - 'new': '-timestamp', - } - sorts_menu_mapping = {'relevance': 1, - 'new': 2, - 'top': 3, - } + search_api = None + sorts = {} + sorts_menu_mapping = {} + known_syntaxes = ("cloudsearch", "lucene", "plain") + default_syntax = "plain" + lucene_parser = None - lucene_parser = l2cs.make_parser(int_fields=['timestamp'], - yesno_fields=['over18', 'is_self']) - known_syntaxes = ("cloudsearch", "lucene") - default_syntax = "lucene" - - def __init__(self, query, sr, sort, syntax=None): + def __init__(self, query, sr=None, sort=None, syntax=None): if syntax is None: syntax = self.default_syntax elif syntax not in self.known_syntaxes: @@ -489,6 +596,114 @@ class CloudSearchQuery(object): self.results = Results(after_docs, hits, facets) return self.results + def _run(self, start=0, num=1000, _update=False): + '''Run the search against self.query''' + q = None + if self.syntax == "cloudsearch": + self.bq = self.customize_query(self.query) + elif self.syntax == "lucene": + bq = l2cs.convert(self.query, self.lucene_parser) + self.converted_data = {"syntax": "cloudsearch", + "converted": bq} + self.bq = self.customize_query(bq) + elif self.syntax == "plain": + q = self.query + if g.sqlprinting: + g.log.info("%s", self) + return self._run_cached(q, self.bq, self.sort, start=start, num=num, + _update=_update) + + def customize_query(self, bq): + return bq + + def __repr__(self): + '''Return a string representation of this query''' + result = ["<", self.__class__.__name__, "> query:", + repr(self.query), " "] + if self.bq: + result.append(" bq:") + result.append(repr(self.bq)) + result.append(" ") + result.append("sort:") + result.append(self.sort) + return ''.join(result) + + @classmethod + def _run_cached(cls, query, bq, sort="relevance", start=0, num=1000, + _update=False): + '''Query the cloudsearch API. _update parameter allows for supposed + easy memoization at later date. + + Example result set: + + {u'facets': {u'reddit': {u'constraints': + [{u'count': 114, u'value': u'politics'}, + {u'count': 42, u'value': u'atheism'}, + {u'count': 27, u'value': u'wtf'}, + {u'count': 19, u'value': u'gaming'}, + {u'count': 12, u'value': u'bestof'}, + {u'count': 12, u'value': u'tf2'}, + {u'count': 11, u'value': u'AdviceAnimals'}, + {u'count': 9, u'value': u'todayilearned'}, + {u'count': 9, u'value': u'pics'}, + {u'count': 9, u'value': u'funny'}]}}, + u'hits': {u'found': 399, + u'hit': [{u'id': u't3_11111'}, + {u'id': u't3_22222'}, + {u'id': u't3_33333'}, + {u'id': u't3_44444'}, + ... 
+ ], + u'start': 0}, + u'info': {u'cpu-time-ms': 10, + u'messages': [{u'code': u'CS-InvalidFieldOrRankAliasInRankParameter', + u'message': u"Unable to create score object for rank '-hot'", + u'severity': u'warning'}], + u'rid': u'', + u'time-ms': 9}, + u'match-expr': u"(label 'my query')", + u'rank': u'-text_relevance'} + + ''' + response = basic_query(query=query, bq=bq, size=num, start=start, + rank=sort, search_api=cls.search_api, + record_stats=True) + + warnings = response['info'].get('messages', []) + for warning in warnings: + g.log.warn("%(code)s (%(severity)s): %(message)s" % warning) + + hits = response['hits']['found'] + docs = [doc['id'] for doc in response['hits']['hit']] + facets = response.get('facets', {}) + for facet in facets.keys(): + values = facets[facet]['constraints'] + facets[facet] = values + + results = Results(docs, hits, facets) + return results + + +class LinkSearchQuery(CloudSearchQuery): + search_api = g.CLOUDSEARCH_SEARCH_API + sorts = {'relevance': '-relevance', + 'top': '-top', + 'new': '-timestamp', + } + sorts_menu_mapping = {'relevance': 1, + 'new': 2, + 'top': 3, + } + + lucene_parser = l2cs.make_parser(int_fields=['timestamp'], + yesno_fields=['over18', 'is_self']) + known_syntaxes = ("cloudsearch", "lucene") + default_syntax = "lucene" + + def customize_query(self, bq): + subreddit_query = self._get_sr_restriction(self.sr) + return self.create_boolean_query(bq, subreddit_query) + @classmethod def create_boolean_query(cls, query, subreddit_query): '''Join a (user-entered) text query with the generated subreddit query @@ -545,83 +760,17 @@ class CloudSearchQuery(object): bq = ["sr_id:%s" % sr._id] return ' '.join(bq) + + +class SubredditSearchQuery(CloudSearchQuery): + search_api = g.CLOUDSEARCH_SUBREDDIT_SEARCH_API + sorts = {'relevance': '-text_relevance', + None: '-text_relevance', + } + sorts_menu_mapping = {'relevance': 1, + } - def _run(self, start=0, num=1000, _update=False): - '''Run the search against self.query''' - subreddit_query = self._get_sr_restriction(self.sr) - if self.syntax == "cloudsearch": - base_query = self.query - elif self.syntax == "lucene": - base_query = l2cs.convert(self.query, self.lucene_parser) - self.converted_data = {"syntax": "cloudsearch", - "converted": base_query} - self.bq = self.create_boolean_query(base_query, subreddit_query) - if g.sqlprinting: - g.log.info("%s", self) - return self._run_cached(self.bq, self.sort, start=start, num=num, - _update=_update) - - def __repr__(self): - '''Return a string representation of this query''' - result = ["<", self.__class__.__name__, "> query:", - repr(self.query), " "] - if self.bq: - result.append(" bq:") - result.append(repr(self.bq)) - result.append(" ") - result.append("sort:") - result.append(self.sort) - return ''.join(result) - - @classmethod - def _run_cached(cls, bq, sort="hot", start=0, num=1000, _update=False): - '''Query the cloudsearch API. _update parameter allows for supposed - easy memoization at later date. 
- - Example result set: - - {u'facets': {u'reddit': {u'constraints': - [{u'count': 114, u'value': u'politics'}, - {u'count': 42, u'value': u'atheism'}, - {u'count': 27, u'value': u'wtf'}, - {u'count': 19, u'value': u'gaming'}, - {u'count': 12, u'value': u'bestof'}, - {u'count': 12, u'value': u'tf2'}, - {u'count': 11, u'value': u'AdviceAnimals'}, - {u'count': 9, u'value': u'todayilearned'}, - {u'count': 9, u'value': u'pics'}, - {u'count': 9, u'value': u'funny'}]}}, - u'hits': {u'found': 399, - u'hit': [{u'id': u't3_11111'}, - {u'id': u't3_22222'}, - {u'id': u't3_33333'}, - {u'id': u't3_44444'}, - ... - ], - u'start': 0}, - u'info': {u'cpu-time-ms': 10, - u'messages': [{u'code': u'CS-InvalidFieldOrRankAliasInRankParameter', - u'message': u"Unable to create score object for rank '-hot'", - u'severity': u'warning'}], - u'rid': u'', - u'time-ms': 9}, - u'match-expr': u"(label 'my query')", - u'rank': u'-text_relevance'} - - ''' - response = basic_query(bq=bq, size=num, start=start, rank=sort, - record_stats=True) - - warnings = response['info'].get('messages', []) - for warning in warnings: - g.log.warn("%(code)s (%(severity)s): %(message)s" % warning) - - hits = response['hits']['found'] - docs = [doc['id'] for doc in response['hits']['hit']] - facets = response['facets'] - for facet in facets.keys(): - values = facets[facet]['constraints'] - facets[facet] = values - - results = Results(docs, hits, facets) - return results + lucene_parser = l2cs.make_parser(int_fields=['timestamp'], + yesno_fields=['over18', 'is_self']) + known_syntaxes = ("plain",) + default_syntax = "plain" diff --git a/r2/r2/lib/search.py b/r2/r2/lib/search.py index 25771bee8..58eb3a3ae 100644 --- a/r2/r2/lib/search.py +++ b/r2/r2/lib/search.py @@ -25,6 +25,7 @@ import r2.lib.cloudsearch as cloudsearch InvalidQuery = (cloudsearch.InvalidQuery,) SearchException = (cloudsearch.CloudSearchHTTPError,) -SearchQuery = cloudsearch.CloudSearchQuery +SearchQuery = cloudsearch.LinkSearchQuery +SubredditSearchQuery = cloudsearch.SubredditSearchQuery -sorts = cloudsearch.CloudSearchQuery.sorts_menu_mapping \ No newline at end of file +sorts = cloudsearch.LinkSearchQuery.sorts_menu_mapping \ No newline at end of file
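
The core abstraction this patch introduces is CloudSearchUploader: a subclass
declares which Thing classes it indexes via `types` and supplies fields() and
should_index() (plus batch_lookups() if it needs to prefetch related objects,
as LinkUploader does for accounts and subreddits), while xml_from_things(),
inject(), delete_ids() and send_documents() are shared. A minimal sketch of how
a hypothetical CommentUploader could plug in; the Comment fields and the
CLOUDSEARCH_COMMENT_DOC_API setting are illustrative, not part of this patch:

    from r2.models import Comment  # hypothetical: comments are not indexed by this patch

    class CommentUploader(CloudSearchUploader):
        types = (Comment,)

        def fields(self, thing):
            # Only fields that the (hypothetical) comment search domain defines.
            return {"fullname": thing._fullname,
                    "body": thing.body,
                    "sr_id": thing.sr_id,
                    "timestamp": thing._date.strftime("%s"),
                    "ups": max(0, thing._ups),
                    "downs": max(0, thing._downs),
                    }

        def should_index(self, thing):
            # xml_from_things() already turns spam/deleted things into <delete>
            # nodes, so everything else is indexable.
            return getattr(thing, "sr_id", None) != -1

    # Wiring mirrors _run_changed():
    #     uploader = CommentUploader(g.CLOUDSEARCH_COMMENT_DOC_API, things=things)
    #     uploader.inject()
    # where CLOUDSEARCH_COMMENT_DOC_API is an assumed, not-yet-existing setting.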
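
Document versions are still taken from the clock, but the helpers now live on
the uploader so each index can pick its own resolution: LinkUploader keeps
_version_tenths (tenths of a second past the 2012 offset, which stays under
cloudsearch's 2**32 version cap for roughly 13 years), while SubredditUploader
binds _version to _version_seconds, since subreddit documents change far less
often. Reduced to the essentials, as standalone functions rather than methods:

    import time

    _VERSION_OFFSET = 13257906857  # the patch's constant: tenths of a second, anchored in early 2012

    def version_tenths(offset=_VERSION_OFFSET):
        # Tenths of a second since the offset, used as an ever-increasing
        # document version so cloudsearch accepts the update.
        return int(time.time() * 10) - offset

    def version_seconds(offset=_VERSION_OFFSET):
        # Whole seconds since the offset; coarser, but fine for documents that
        # are not updated several times per second.
        return int(time.time()) - int(offset / 10)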
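
send_documents() keeps every POST to /2011-02-01/documents/batch under the 5 MB
limit by running the <batch> element through chunk_xml(): the tree is
serialized, and if the result is too large the children are split in half and
each half is chunked recursively. Because lxml moves elements between trees
rather than copying them, filling the right-hand <batch> detaches those nodes
from the left half, which is what the reworded comment in that hunk describes.
Stripped of the patch's logging, the idea looks like this (a sketch that uses
lxml's extend() to move the child list):

    from lxml import etree

    _CHUNK_SIZE = 4000000  # ~4 MB, comfortably under the 5 MB batch limit

    def chunk_xml(xml, depth=0):
        data = etree.tostring(xml)
        if len(data) < _CHUNK_SIZE:
            yield data
        else:
            half = len(xml) / 2
            left_half = xml
            right_half = etree.Element("batch")
            # moving these children into right_half removes them from left_half
            right_half.extend(xml[half:])
            for chunk in chunk_xml(left_half, depth=depth + 1):
                yield chunk
            for chunk in chunk_xml(right_half, depth=depth + 1):
                yield chunk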
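
CloudSearchUploader.desired_fullnames() replaces the old _desired_things(),
which compared item['fullname'][1] against to36(_type_id) and, as its own
comment admitted, would break once a _type_id needed more than one base-36
digit. The new code asks r2utils.decompose_fullname() for the integer type id
(element [1] of its result) instead. For reference, a fullname is
'<prefix><type id in base 36>_<thing id in base 36>', so an equivalent parse
looks roughly like the following; parse_fullname is purely illustrative and is
not something the patch adds:

    def parse_fullname(fullname):
        '''Split a fullname such as "t3_w1hxa" into (type_id, id36).'''
        prefix, id36 = fullname.split('_', 1)
        type_id = int(prefix[1:], 36)  # handles multi-digit type ids too
        return type_id, id36

    # parse_fullname("t3_w1hxa") -> (3, "w1hxa"), i.e. a Link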
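
Because the per-index knobs are now ordinary keyword arguments, the subreddit
variants are plain functools.partial() specializations: rebuild_subreddit_index
is rebuild_link_index with cls, uploader, doc_api and estimate swapped out, and
basic_link/basic_subreddit pre-fill basic_query the same way. Calling a partial
is just calling the underlying function with those keywords filled in, for
example (resuming from the per-uploader cursor key; the sleeptime value is
arbitrary):

    # Resume a subreddit reindex from the cached cursor, overriding only what
    # the partial left open:
    rebuild_subreddit_index(start_at=_REBUILD_INDEX_CACHE_KEY, sleeptime=0.5)

    # ...which is equivalent to:
    rebuild_link_index(start_at=_REBUILD_INDEX_CACHE_KEY, sleeptime=0.5,
                       cls=Subreddit, uploader=SubredditUploader,
                       doc_api='CLOUDSEARCH_SUBREDDIT_DOC_API',
                       estimate=200000, chunk_size=1000)

One thing to keep in mind: the basic_link/basic_subreddit partials (like the
search_api class attributes below them) read g.CLOUDSEARCH_SEARCH_API and
g.CLOUDSEARCH_SUBREDDIT_SEARCH_API once, when the module is imported, not on
every call.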
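
On the query side, CloudSearchQuery is now a thin base class: the link-specific
pieces (the l2cs lucene parser, the relevance/new/top sorts, and the subreddit
restriction applied through customize_query()) live in LinkSearchQuery, while
SubredditSearchQuery points at the subreddit search endpoint, accepts only
"plain" syntax and sorts by text relevance; search.py re-exports both. A rough
picture of the call sites this enables inside the app; the query strings are
made up, and run() is assumed to be the pre-existing entry point, untouched by
this patch:

    from pylons import c
    from r2.lib.search import SearchQuery, SubredditSearchQuery

    # Link search restricted to the current subreddit, user query in lucene syntax
    q = SearchQuery("cute cats over18:no", sr=c.site, sort="top", syntax="lucene")
    link_results = q.run()

    # Subreddit search: plain text only, ranked by text relevance
    sr_results = SubredditSearchQuery("programming").run()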