From e8d3d41811fa0184c67051d120e918dfde23f436 Mon Sep 17 00:00:00 2001 From: Chad Birch Date: Thu, 13 Aug 2015 17:16:21 -0600 Subject: [PATCH] Vote events: convert to new schema and update This converts the vote events to the new event schema, as well as adding several new fields and some overall rearrangement of how the events are generated. --- r2/r2/controllers/api.py | 12 ++-- r2/r2/lib/automoderator.py | 3 +- r2/r2/lib/db/queries.py | 15 +++- r2/r2/lib/eventcollector.py | 138 ++++++++++++++++++++++++------------ r2/r2/models/vote.py | 6 +- 5 files changed, 114 insertions(+), 60 deletions(-) diff --git a/r2/r2/controllers/api.py b/r2/r2/controllers/api.py index bd55e098d..85eaaf66d 100644 --- a/r2/r2/controllers/api.py +++ b/r2/r2/controllers/api.py @@ -562,8 +562,8 @@ class ApiController(RedditController): l._commit() l.set_url_cache() - queries.queue_vote(c.user, l, True, request.ip, cheater=c.cheater, - event_data=g.events.event_base(request, c)) + queries.queue_vote(c.user, l, dir=True, ip=request.ip, + cheater=c.cheater) if sr.should_ratelimit(c.user, 'link'): c.user.clog_quota('link', l) @@ -1889,9 +1889,8 @@ class ApiController(RedditController): else: item, inbox_rel = Comment._new(c.user, link, parent_comment, comment, request.ip) - queries.queue_vote(c.user, item, True, request.ip, - cheater=c.cheater, - event_data=g.events.event_base(request, c)) + queries.queue_vote(c.user, item, dir=True, ip=request.ip, + cheater=c.cheater) if is_message: queries.new_message(item, inbox_rel) @@ -2063,8 +2062,7 @@ class ApiController(RedditController): else None) queries.queue_vote(user, thing, dir, request.ip, vote_info=vote_info, - store=store, cheater=c.cheater, - event_data=g.events.event_base(request, c)) + store=store, cheater=c.cheater) @require_oauth2_scope("modconfig") @validatedForm(VSrModerator(perms='config'), diff --git a/r2/r2/lib/automoderator.py b/r2/r2/lib/automoderator.py index bf2e0ebe5..d3ae6610b 100644 --- a/r2/r2/lib/automoderator.py +++ b/r2/r2/lib/automoderator.py @@ -1395,7 +1395,8 @@ class Rule(object): new_comment.distinguished = "yes" new_comment.sendreplies = False new_comment._commit() - queries.queue_vote(ACCOUNT, new_comment, True, None) + queries.queue_vote(ACCOUNT, new_comment, dir=True, ip=None, + send_event=False) queries.new_comment(new_comment, inbox_rel) g.stats.simple_event("automoderator.comment") diff --git a/r2/r2/lib/db/queries.py b/r2/r2/lib/db/queries.py index 7f461bbf7..bc05f781f 100644 --- a/r2/r2/lib/db/queries.py +++ b/r2/r2/lib/db/queries.py @@ -32,6 +32,7 @@ from r2.lib.utils import fetch_things2, tup, UniqueIterator, set_last_modified from r2.lib import utils from r2.lib import amqp, sup, filters from r2.lib.comment_tree import add_comments, update_comment_votes +from r2.lib.eventcollector import EventV2 from r2.models.promo import PROMOTE_STATUS, PromotionLog from r2.models.query_cache import ( cached_query, @@ -58,7 +59,7 @@ import collections from copy import deepcopy from r2.lib.db.operators import and_, or_ -from pylons import g +from pylons import g, c, request query_cache = g.permacache log = g.log make_lock = g.make_lock @@ -1729,7 +1730,7 @@ vote_names_by_dir = {True: "1", None: "0", False: "-1"} vote_dirs_by_name = {v: k for k, v in vote_names_by_dir.iteritems()} def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True, - event_data=None): + send_event=True): # set the vote in memcached so the UI gets updated immediately key = prequeued_vote_key(user, thing) grace_period = int(g.vote_queue_grace_period.total_seconds()) @@ -1767,8 +1768,16 @@ def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True, "ip": ip, "info": vote_info, "cheater": cheater, - "event": event_data, } + + if send_event: + # the vote event will actually be sent from an async queue + # processor, so we need to pull out the context data at this point + vote["event_data"] = { + "context": EventV2.get_context_data(request, c), + "sensitive": EventV2.get_sensitive_context_data(request, c), + } + amqp.add_item(qname, json.dumps(vote)) def prequeued_vote_key(user, item): diff --git a/r2/r2/lib/eventcollector.py b/r2/r2/lib/eventcollector.py index f21872d5f..59bb0eb88 100644 --- a/r2/r2/lib/eventcollector.py +++ b/r2/r2/lib/eventcollector.py @@ -33,6 +33,7 @@ from uuid import uuid4 from wsgiref.handlers import format_date_time import r2.lib.amqp +from r2.lib import hooks from r2.lib.utils import domain, epoch_timestamp, sampled, squelch_exceptions @@ -64,13 +65,10 @@ class EventQueue(object): else: self.queue.add_item("event_collector", json.dumps(event)) - # Mapping of stored vote "names" to more readable ones - VOTES = {"1": "up", "0": "clear", "-1": "down"} - @squelch_exceptions @sampled("events_collector_vote_sample_rate") - def vote_event(self, vote, old_vote=None, event_base=None, request=None, - context=None): + def vote_event(self, vote, old_vote=None, + context_data=None, sensitive_context_data=None): """Create a 'vote' event for event-collector vote: An Storage object representing the new vote, as handled by @@ -78,30 +76,53 @@ class EventQueue(object): old_vote: A Storage object representing the previous vote on this thing, if there is one. NOTE: This object has a different set of attributes compared to the new "vote" object. - event_base: The base fields for an Event. If not given, caller MUST - supply a pylons.request and pylons.c object to build a base from - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event if event_base is not given + context_data: A dict of fields from EventV2.get_context_data(). + Necessary because the vote event is sent from an async process + separate from the actual vote request. + sensitive_context_data: A dict of fields from + EventV2.get_sensitive_context_data(). Will be sent to the event + collector flagged as needing obfuscation. Necessary for the + same reason as `context_data`. """ - if event_base is None: - event_base = Event.base_from_request(request, context) + # Mapping of stored vote "names" to more readable ones + vote_dirs = {"1": "up", "0": "clear", "-1": "down"} + + event = EventV2( + topic="vote_server", + event_type="server_vote", + time=vote._date, + data=context_data, + obfuscated_data=sensitive_context_data, + ) + + event.add("vote_direction", vote_dirs[vote._name]) + + subreddit = vote._thing2.subreddit_slow + event.add("sr_id", subreddit._id) + event.add("sr_name", subreddit.name) + + target = vote._thing2 + target_type = target.__class__.__name__.lower() + if target_type == "link" and target.is_self: + target_type = "self" + event.add("target_fullname", target._fullname) + event.add("target_type", target_type) - event_base["event_topic"] = "vote" - event_base["event_name"] = "vote_server" - event_base["event_ts"] = _epoch_to_millis(epoch_timestamp(vote._date)) - event_base["vote_target"] = vote._thing2._fullname - event_base["vote_direction"] = self.VOTES[vote._name] if old_vote: - event_base["prev_vote_direction"] = self.VOTES[old_vote.direction] - event_base["prev_vote_ts"] = _epoch_to_millis(old_vote.date) - event_base["vote_type"] = vote._thing2.__class__.__name__.lower() - if event_base["vote_type"] == "link" and vote._thing2.is_self: - event_base["vote_type"] = "self" - event_base["sr"] = vote._thing2.subreddit_slow.name - event_base["sr_id"] = str(vote._thing2.subreddit_slow._id) + event.add("prev_vote_direction", vote_dirs[old_vote.direction]) + event.add("prev_vote_ts", _epoch_to_millis(old_vote.date)) - self.save_event(event_base) + if event.get("user_id") == target.author_id and not old_vote: + event.add("auto_self_vote", True) + + hook = hooks.get_hook("event.get_private_vote_data") + private_data = hook.call_until_return(vote=vote) + if private_data: + for name, value in private_data.iteritems(): + event.add(name, value) + + self.save_event(event) @squelch_exceptions @sampled("events_collector_submit_sample_rate") @@ -162,8 +183,7 @@ class EventQueue(object): subreddit: The Subreddit the mod action is being performed in mod: The Account that is performing the mod action target: The Thing the mod action was applied to - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event if event_base is not given + request, context: Should be pylons.request & pylons.c respectively """ event = EventV2( @@ -201,8 +221,7 @@ class EventQueue(object): event_type: quarantine_interstitial_view, quarantine_opt_in, quarantine_opt_out, quarantine_interstitial_dismiss subreddit: The quarantined subreddit - request, context: Should be pylons.request & pylons.c respectively; - used to build the base Event + request, context: Should be pylons.request & pylons.c respectively """ event = EventV2( @@ -252,7 +271,8 @@ class EventQueue(object): class EventV2(object): def __init__(self, topic, event_type, - time=None, uuid=None, request=None, context=None, testing=False): + time=None, uuid=None, request=None, context=None, testing=False, + data=None, obfuscated_data=None): """Create a new event for event-collector. topic: Used to filter events into appropriate streams for processing @@ -261,6 +281,8 @@ class EventV2(object): uuid: Should be a UUID object request, context: Should be pylons.request & pylons.c respectively testing: Whether to send the event to the test endpoint + data: A dict of field names/values to initialize the payload with + obfuscated_data: Same as `data`, but fields that need obfuscation """ self.topic = topic self.event_type = event_type @@ -274,26 +296,13 @@ class EventV2(object): uuid = uuid4() self.uuid = str(uuid) - self.payload = {} - self.obfuscated_data = {} + self.payload = data or {} + self.obfuscated_data = obfuscated_data or {} if context and request: - if context.user_is_loggedin: - self.add("user_id", context.user._id) - self.add("user_name", context.user.name) - else: - loid = request.cookies.get("loid", None) - if loid: - self.add("loid", loid) - - oauth2_client = getattr(context, "oauth2_client", None) - if oauth2_client: - self.add("oauth2_client_id", oauth2_client._id) - - self.add("domain", request.host) - self.add("user_agent", request.user_agent) - if getattr(request, "ip", None): - self.add("client_ip", request.ip, obfuscate=True) + self.payload.update(self.get_context_data(request, context)) + self.obfuscated_data.update( + self.get_sensitive_context_data(request, context)) def add(self, field, value, obfuscate=False): if obfuscate: @@ -301,6 +310,41 @@ class EventV2(object): else: self.payload[field] = value + def get(self, field, obfuscated=False): + if obfuscated: + return self.obfuscated_data.get(field, None) + else: + return self.payload.get(field, None) + + @classmethod + def get_context_data(self, request, context): + data = {} + + if context.user_is_loggedin: + data["user_id"] = context.user._id + data["user_name"] = context.user.name + else: + loid = request.cookies.get("loid", None) + if loid: + data["loid"] = loid + + oauth2_client = getattr(context, "oauth2_client", None) + if oauth2_client: + data["oauth2_client_id"] = oauth2_client._id + + data["domain"] = request.host + data["user_agent"] = request.user_agent + + return data + + @classmethod + def get_sensitive_context_data(self, request, context): + data = {} + if getattr(request, "ip", None): + data["client_ip"] = request.ip + + return data + def dump(self): """Returns the JSON representation of the event.""" data = { diff --git a/r2/r2/models/vote.py b/r2/r2/models/vote.py index 53a64691f..b03152d44 100644 --- a/r2/r2/models/vote.py +++ b/r2/r2/models/vote.py @@ -324,8 +324,10 @@ def cast_vote(sub, obj, vote_info, timer, date): vote._thing2.update_search_index(boost_only=True) timer.intermediate("update_search_index") - if "event" in vote_info and vote_info["event"]: - g.events.vote_event(vote, old_vote, event_base=vote_info["event"]) + event_data = vote_info.get("event_data") + if event_data: + g.events.vote_event(vote, old_vote, + event_data["context"], event_data["sensitive"]) return vote