diff --git a/r2/r2/controllers/api.py b/r2/r2/controllers/api.py index bd55e098d..85eaaf66d 100644 --- a/r2/r2/controllers/api.py +++ b/r2/r2/controllers/api.py @@ -562,8 +562,8 @@ class ApiController(RedditController): l._commit() l.set_url_cache() - queries.queue_vote(c.user, l, True, request.ip, cheater=c.cheater, - event_data=g.events.event_base(request, c)) + queries.queue_vote(c.user, l, dir=True, ip=request.ip, + cheater=c.cheater) if sr.should_ratelimit(c.user, 'link'): c.user.clog_quota('link', l) @@ -1889,9 +1889,8 @@ class ApiController(RedditController): else: item, inbox_rel = Comment._new(c.user, link, parent_comment, comment, request.ip) - queries.queue_vote(c.user, item, True, request.ip, - cheater=c.cheater, - event_data=g.events.event_base(request, c)) + queries.queue_vote(c.user, item, dir=True, ip=request.ip, + cheater=c.cheater) if is_message: queries.new_message(item, inbox_rel) @@ -2063,8 +2062,7 @@ class ApiController(RedditController): else None) queries.queue_vote(user, thing, dir, request.ip, vote_info=vote_info, - store=store, cheater=c.cheater, - event_data=g.events.event_base(request, c)) + store=store, cheater=c.cheater) @require_oauth2_scope("modconfig") @validatedForm(VSrModerator(perms='config'), diff --git a/r2/r2/lib/automoderator.py b/r2/r2/lib/automoderator.py index bf2e0ebe5..d3ae6610b 100644 --- a/r2/r2/lib/automoderator.py +++ b/r2/r2/lib/automoderator.py @@ -1395,7 +1395,8 @@ class Rule(object): new_comment.distinguished = "yes" new_comment.sendreplies = False new_comment._commit() - queries.queue_vote(ACCOUNT, new_comment, True, None) + queries.queue_vote(ACCOUNT, new_comment, dir=True, ip=None, + send_event=False) queries.new_comment(new_comment, inbox_rel) g.stats.simple_event("automoderator.comment") diff --git a/r2/r2/lib/db/queries.py b/r2/r2/lib/db/queries.py index 7f461bbf7..bc05f781f 100644 --- a/r2/r2/lib/db/queries.py +++ b/r2/r2/lib/db/queries.py @@ -32,6 +32,7 @@ from r2.lib.utils import fetch_things2, tup, UniqueIterator, set_last_modified from r2.lib import utils from r2.lib import amqp, sup, filters from r2.lib.comment_tree import add_comments, update_comment_votes +from r2.lib.eventcollector import EventV2 from r2.models.promo import PROMOTE_STATUS, PromotionLog from r2.models.query_cache import ( cached_query, @@ -58,7 +59,7 @@ import collections from copy import deepcopy from r2.lib.db.operators import and_, or_ -from pylons import g +from pylons import g, c, request query_cache = g.permacache log = g.log make_lock = g.make_lock @@ -1729,7 +1730,7 @@ vote_names_by_dir = {True: "1", None: "0", False: "-1"} vote_dirs_by_name = {v: k for k, v in vote_names_by_dir.iteritems()} def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True, - event_data=None): + send_event=True): # set the vote in memcached so the UI gets updated immediately key = prequeued_vote_key(user, thing) grace_period = int(g.vote_queue_grace_period.total_seconds()) @@ -1767,8 +1768,16 @@ def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True, "ip": ip, "info": vote_info, "cheater": cheater, - "event": event_data, } + + if send_event: + # the vote event will actually be sent from an async queue + # processor, so we need to pull out the context data at this point + vote["event_data"] = { + "context": EventV2.get_context_data(request, c), + "sensitive": EventV2.get_sensitive_context_data(request, c), + } + amqp.add_item(qname, json.dumps(vote)) def prequeued_vote_key(user, item): diff --git a/r2/r2/lib/eventcollector.py b/r2/r2/lib/eventcollector.py index f21872d5f..59bb0eb88 100644 --- a/r2/r2/lib/eventcollector.py +++ b/r2/r2/lib/eventcollector.py @@ -33,6 +33,7 @@ from uuid import uuid4 from wsgiref.handlers import format_date_time import r2.lib.amqp +from r2.lib import hooks from r2.lib.utils import domain, epoch_timestamp, sampled, squelch_exceptions @@ -64,13 +65,10 @@ class EventQueue(object): else: self.queue.add_item("event_collector", json.dumps(event)) - # Mapping of stored vote "names" to more readable ones - VOTES = {"1": "up", "0": "clear", "-1": "down"} - @squelch_exceptions @sampled("events_collector_vote_sample_rate") - def vote_event(self, vote, old_vote=None, event_base=None, request=None, - context=None): + def vote_event(self, vote, old_vote=None, + context_data=None, sensitive_context_data=None): """Create a 'vote' event for event-collector vote: An Storage object representing the new vote, as handled by @@ -78,30 +76,53 @@ class EventQueue(object): old_vote: A Storage object representing the previous vote on this thing, if there is one. NOTE: This object has a different set of attributes compared to the new "vote" object. - event_base: The base fields for an Event. If not given, caller MUST - supply a pylons.request and pylons.c object to build a base from - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event if event_base is not given + context_data: A dict of fields from EventV2.get_context_data(). + Necessary because the vote event is sent from an async process + separate from the actual vote request. + sensitive_context_data: A dict of fields from + EventV2.get_sensitive_context_data(). Will be sent to the event + collector flagged as needing obfuscation. Necessary for the + same reason as `context_data`. """ - if event_base is None: - event_base = Event.base_from_request(request, context) + # Mapping of stored vote "names" to more readable ones + vote_dirs = {"1": "up", "0": "clear", "-1": "down"} + + event = EventV2( + topic="vote_server", + event_type="server_vote", + time=vote._date, + data=context_data, + obfuscated_data=sensitive_context_data, + ) + + event.add("vote_direction", vote_dirs[vote._name]) + + subreddit = vote._thing2.subreddit_slow + event.add("sr_id", subreddit._id) + event.add("sr_name", subreddit.name) + + target = vote._thing2 + target_type = target.__class__.__name__.lower() + if target_type == "link" and target.is_self: + target_type = "self" + event.add("target_fullname", target._fullname) + event.add("target_type", target_type) - event_base["event_topic"] = "vote" - event_base["event_name"] = "vote_server" - event_base["event_ts"] = _epoch_to_millis(epoch_timestamp(vote._date)) - event_base["vote_target"] = vote._thing2._fullname - event_base["vote_direction"] = self.VOTES[vote._name] if old_vote: - event_base["prev_vote_direction"] = self.VOTES[old_vote.direction] - event_base["prev_vote_ts"] = _epoch_to_millis(old_vote.date) - event_base["vote_type"] = vote._thing2.__class__.__name__.lower() - if event_base["vote_type"] == "link" and vote._thing2.is_self: - event_base["vote_type"] = "self" - event_base["sr"] = vote._thing2.subreddit_slow.name - event_base["sr_id"] = str(vote._thing2.subreddit_slow._id) + event.add("prev_vote_direction", vote_dirs[old_vote.direction]) + event.add("prev_vote_ts", _epoch_to_millis(old_vote.date)) - self.save_event(event_base) + if event.get("user_id") == target.author_id and not old_vote: + event.add("auto_self_vote", True) + + hook = hooks.get_hook("event.get_private_vote_data") + private_data = hook.call_until_return(vote=vote) + if private_data: + for name, value in private_data.iteritems(): + event.add(name, value) + + self.save_event(event) @squelch_exceptions @sampled("events_collector_submit_sample_rate") @@ -162,8 +183,7 @@ class EventQueue(object): subreddit: The Subreddit the mod action is being performed in mod: The Account that is performing the mod action target: The Thing the mod action was applied to - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event if event_base is not given + request, context: Should be pylons.request & pylons.c respectively """ event = EventV2( @@ -201,8 +221,7 @@ class EventQueue(object): event_type: quarantine_interstitial_view, quarantine_opt_in, quarantine_opt_out, quarantine_interstitial_dismiss subreddit: The quarantined subreddit - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event + request, context: Should be pylons.request & pylons.c respectively """ event = EventV2( @@ -252,7 +271,8 @@ class EventQueue(object): class EventV2(object): def __init__(self, topic, event_type, - time=None, uuid=None, request=None, context=None, testing=False): + time=None, uuid=None, request=None, context=None, testing=False, + data=None, obfuscated_data=None): """Create a new event for event-collector. topic: Used to filter events into appropriate streams for processing @@ -261,6 +281,8 @@ class EventV2(object): uuid: Should be a UUID object request, context: Should be pylons.request & pylons.c respectively testing: Whether to send the event to the test endpoint + data: A dict of field names/values to initialize the payload with + obfuscated_data: Same as `data`, but fields that need obfuscation """ self.topic = topic self.event_type = event_type @@ -274,26 +296,13 @@ class EventV2(object): uuid = uuid4() self.uuid = str(uuid) - self.payload = {} - self.obfuscated_data = {} + self.payload = data or {} + self.obfuscated_data = obfuscated_data or {} if context and request: - if context.user_is_loggedin: - self.add("user_id", context.user._id) - self.add("user_name", context.user.name) - else: - loid = request.cookies.get("loid", None) - if loid: - self.add("loid", loid) - - oauth2_client = getattr(context, "oauth2_client", None) - if oauth2_client: - self.add("oauth2_client_id", oauth2_client._id) - - self.add("domain", request.host) - self.add("user_agent", request.user_agent) - if getattr(request, "ip", None): - self.add("client_ip", request.ip, obfuscate=True) + self.payload.update(self.get_context_data(request, context)) + self.obfuscated_data.update( + self.get_sensitive_context_data(request, context)) def add(self, field, value, obfuscate=False): if obfuscate: @@ -301,6 +310,41 @@ class EventV2(object): else: self.payload[field] = value + def get(self, field, obfuscated=False): + if obfuscated: + return self.obfuscated_data.get(field, None) + else: + return self.payload.get(field, None) + + @classmethod + def get_context_data(self, request, context): + data = {} + + if context.user_is_loggedin: + data["user_id"] = context.user._id + data["user_name"] = context.user.name + else: + loid = request.cookies.get("loid", None) + if loid: + data["loid"] = loid + + oauth2_client = getattr(context, "oauth2_client", None) + if oauth2_client: + data["oauth2_client_id"] = oauth2_client._id + + data["domain"] = request.host + data["user_agent"] = request.user_agent + + return data + + @classmethod + def get_sensitive_context_data(self, request, context): + data = {} + if getattr(request, "ip", None): + data["client_ip"] = request.ip + + return data + def dump(self): """Returns the JSON representation of the event.""" data = { diff --git a/r2/r2/models/vote.py b/r2/r2/models/vote.py index 53a64691f..b03152d44 100644 --- a/r2/r2/models/vote.py +++ b/r2/r2/models/vote.py @@ -324,8 +324,10 @@ def cast_vote(sub, obj, vote_info, timer, date): vote._thing2.update_search_index(boost_only=True) timer.intermediate("update_search_index") - if "event" in vote_info and vote_info["event"]: - g.events.vote_event(vote, old_vote, event_base=vote_info["event"]) + event_data = vote_info.get("event_data") + if event_data: + g.events.vote_event(vote, old_vote, + event_data["context"], event_data["sensitive"]) return vote