Vote events: convert to new schema and update

This converts the vote events to the new event schema, as well as adding
several new fields and some overall rearrangement of how the events are
generated.
This commit is contained in:
Chad Birch
2015-08-13 17:16:21 -06:00
parent a79765a5ac
commit e8d3d41811
5 changed files with 114 additions and 60 deletions

View File

@@ -562,8 +562,8 @@ class ApiController(RedditController):
l._commit()
l.set_url_cache()
queries.queue_vote(c.user, l, True, request.ip, cheater=c.cheater,
event_data=g.events.event_base(request, c))
queries.queue_vote(c.user, l, dir=True, ip=request.ip,
cheater=c.cheater)
if sr.should_ratelimit(c.user, 'link'):
c.user.clog_quota('link', l)
@@ -1889,9 +1889,8 @@ class ApiController(RedditController):
else:
item, inbox_rel = Comment._new(c.user, link, parent_comment,
comment, request.ip)
queries.queue_vote(c.user, item, True, request.ip,
cheater=c.cheater,
event_data=g.events.event_base(request, c))
queries.queue_vote(c.user, item, dir=True, ip=request.ip,
cheater=c.cheater)
if is_message:
queries.new_message(item, inbox_rel)
@@ -2063,8 +2062,7 @@ class ApiController(RedditController):
else None)
queries.queue_vote(user, thing, dir, request.ip, vote_info=vote_info,
store=store, cheater=c.cheater,
event_data=g.events.event_base(request, c))
store=store, cheater=c.cheater)
@require_oauth2_scope("modconfig")
@validatedForm(VSrModerator(perms='config'),

View File

@@ -1395,7 +1395,8 @@ class Rule(object):
new_comment.distinguished = "yes"
new_comment.sendreplies = False
new_comment._commit()
queries.queue_vote(ACCOUNT, new_comment, True, None)
queries.queue_vote(ACCOUNT, new_comment, dir=True, ip=None,
send_event=False)
queries.new_comment(new_comment, inbox_rel)
g.stats.simple_event("automoderator.comment")

View File

@@ -32,6 +32,7 @@ from r2.lib.utils import fetch_things2, tup, UniqueIterator, set_last_modified
from r2.lib import utils
from r2.lib import amqp, sup, filters
from r2.lib.comment_tree import add_comments, update_comment_votes
from r2.lib.eventcollector import EventV2
from r2.models.promo import PROMOTE_STATUS, PromotionLog
from r2.models.query_cache import (
cached_query,
@@ -58,7 +59,7 @@ import collections
from copy import deepcopy
from r2.lib.db.operators import and_, or_
from pylons import g
from pylons import g, c, request
query_cache = g.permacache
log = g.log
make_lock = g.make_lock
@@ -1729,7 +1730,7 @@ vote_names_by_dir = {True: "1", None: "0", False: "-1"}
vote_dirs_by_name = {v: k for k, v in vote_names_by_dir.iteritems()}
def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True,
event_data=None):
send_event=True):
# set the vote in memcached so the UI gets updated immediately
key = prequeued_vote_key(user, thing)
grace_period = int(g.vote_queue_grace_period.total_seconds())
@@ -1767,8 +1768,16 @@ def queue_vote(user, thing, dir, ip, vote_info=None, cheater=False, store=True,
"ip": ip,
"info": vote_info,
"cheater": cheater,
"event": event_data,
}
if send_event:
# the vote event will actually be sent from an async queue
# processor, so we need to pull out the context data at this point
vote["event_data"] = {
"context": EventV2.get_context_data(request, c),
"sensitive": EventV2.get_sensitive_context_data(request, c),
}
amqp.add_item(qname, json.dumps(vote))
def prequeued_vote_key(user, item):

View File

@@ -33,6 +33,7 @@ from uuid import uuid4
from wsgiref.handlers import format_date_time
import r2.lib.amqp
from r2.lib import hooks
from r2.lib.utils import domain, epoch_timestamp, sampled, squelch_exceptions
@@ -64,13 +65,10 @@ class EventQueue(object):
else:
self.queue.add_item("event_collector", json.dumps(event))
# Mapping of stored vote "names" to more readable ones
VOTES = {"1": "up", "0": "clear", "-1": "down"}
@squelch_exceptions
@sampled("events_collector_vote_sample_rate")
def vote_event(self, vote, old_vote=None, event_base=None, request=None,
context=None):
def vote_event(self, vote, old_vote=None,
context_data=None, sensitive_context_data=None):
"""Create a 'vote' event for event-collector
vote: An Storage object representing the new vote, as handled by
@@ -78,30 +76,53 @@ class EventQueue(object):
old_vote: A Storage object representing the previous vote on this
thing, if there is one. NOTE: This object has a different
set of attributes compared to the new "vote" object.
event_base: The base fields for an Event. If not given, caller MUST
supply a pylons.request and pylons.c object to build a base from
request, context: Should be pylons.request & pylons.c respectively;
used to build the base Event if event_base is not given
context_data: A dict of fields from EventV2.get_context_data().
Necessary because the vote event is sent from an async process
separate from the actual vote request.
sensitive_context_data: A dict of fields from
EventV2.get_sensitive_context_data(). Will be sent to the event
collector flagged as needing obfuscation. Necessary for the
same reason as `context_data`.
"""
if event_base is None:
event_base = Event.base_from_request(request, context)
# Mapping of stored vote "names" to more readable ones
vote_dirs = {"1": "up", "0": "clear", "-1": "down"}
event = EventV2(
topic="vote_server",
event_type="server_vote",
time=vote._date,
data=context_data,
obfuscated_data=sensitive_context_data,
)
event.add("vote_direction", vote_dirs[vote._name])
subreddit = vote._thing2.subreddit_slow
event.add("sr_id", subreddit._id)
event.add("sr_name", subreddit.name)
target = vote._thing2
target_type = target.__class__.__name__.lower()
if target_type == "link" and target.is_self:
target_type = "self"
event.add("target_fullname", target._fullname)
event.add("target_type", target_type)
event_base["event_topic"] = "vote"
event_base["event_name"] = "vote_server"
event_base["event_ts"] = _epoch_to_millis(epoch_timestamp(vote._date))
event_base["vote_target"] = vote._thing2._fullname
event_base["vote_direction"] = self.VOTES[vote._name]
if old_vote:
event_base["prev_vote_direction"] = self.VOTES[old_vote.direction]
event_base["prev_vote_ts"] = _epoch_to_millis(old_vote.date)
event_base["vote_type"] = vote._thing2.__class__.__name__.lower()
if event_base["vote_type"] == "link" and vote._thing2.is_self:
event_base["vote_type"] = "self"
event_base["sr"] = vote._thing2.subreddit_slow.name
event_base["sr_id"] = str(vote._thing2.subreddit_slow._id)
event.add("prev_vote_direction", vote_dirs[old_vote.direction])
event.add("prev_vote_ts", _epoch_to_millis(old_vote.date))
self.save_event(event_base)
if event.get("user_id") == target.author_id and not old_vote:
event.add("auto_self_vote", True)
hook = hooks.get_hook("event.get_private_vote_data")
private_data = hook.call_until_return(vote=vote)
if private_data:
for name, value in private_data.iteritems():
event.add(name, value)
self.save_event(event)
@squelch_exceptions
@sampled("events_collector_submit_sample_rate")
@@ -162,8 +183,7 @@ class EventQueue(object):
subreddit: The Subreddit the mod action is being performed in
mod: The Account that is performing the mod action
target: The Thing the mod action was applied to
request, context: Should be pylons.request & pylons.c respectively;
used to build the base Event if event_base is not given
request, context: Should be pylons.request & pylons.c respectively
"""
event = EventV2(
@@ -201,8 +221,7 @@ class EventQueue(object):
event_type: quarantine_interstitial_view, quarantine_opt_in,
quarantine_opt_out, quarantine_interstitial_dismiss
subreddit: The quarantined subreddit
request, context: Should be pylons.request & pylons.c respectively;
used to build the base Event
request, context: Should be pylons.request & pylons.c respectively
"""
event = EventV2(
@@ -252,7 +271,8 @@ class EventQueue(object):
class EventV2(object):
def __init__(self, topic, event_type,
time=None, uuid=None, request=None, context=None, testing=False):
time=None, uuid=None, request=None, context=None, testing=False,
data=None, obfuscated_data=None):
"""Create a new event for event-collector.
topic: Used to filter events into appropriate streams for processing
@@ -261,6 +281,8 @@ class EventV2(object):
uuid: Should be a UUID object
request, context: Should be pylons.request & pylons.c respectively
testing: Whether to send the event to the test endpoint
data: A dict of field names/values to initialize the payload with
obfuscated_data: Same as `data`, but fields that need obfuscation
"""
self.topic = topic
self.event_type = event_type
@@ -274,26 +296,13 @@ class EventV2(object):
uuid = uuid4()
self.uuid = str(uuid)
self.payload = {}
self.obfuscated_data = {}
self.payload = data or {}
self.obfuscated_data = obfuscated_data or {}
if context and request:
if context.user_is_loggedin:
self.add("user_id", context.user._id)
self.add("user_name", context.user.name)
else:
loid = request.cookies.get("loid", None)
if loid:
self.add("loid", loid)
oauth2_client = getattr(context, "oauth2_client", None)
if oauth2_client:
self.add("oauth2_client_id", oauth2_client._id)
self.add("domain", request.host)
self.add("user_agent", request.user_agent)
if getattr(request, "ip", None):
self.add("client_ip", request.ip, obfuscate=True)
self.payload.update(self.get_context_data(request, context))
self.obfuscated_data.update(
self.get_sensitive_context_data(request, context))
def add(self, field, value, obfuscate=False):
if obfuscate:
@@ -301,6 +310,41 @@ class EventV2(object):
else:
self.payload[field] = value
def get(self, field, obfuscated=False):
if obfuscated:
return self.obfuscated_data.get(field, None)
else:
return self.payload.get(field, None)
@classmethod
def get_context_data(self, request, context):
data = {}
if context.user_is_loggedin:
data["user_id"] = context.user._id
data["user_name"] = context.user.name
else:
loid = request.cookies.get("loid", None)
if loid:
data["loid"] = loid
oauth2_client = getattr(context, "oauth2_client", None)
if oauth2_client:
data["oauth2_client_id"] = oauth2_client._id
data["domain"] = request.host
data["user_agent"] = request.user_agent
return data
@classmethod
def get_sensitive_context_data(self, request, context):
data = {}
if getattr(request, "ip", None):
data["client_ip"] = request.ip
return data
def dump(self):
"""Returns the JSON representation of the event."""
data = {

View File

@@ -324,8 +324,10 @@ def cast_vote(sub, obj, vote_info, timer, date):
vote._thing2.update_search_index(boost_only=True)
timer.intermediate("update_search_index")
if "event" in vote_info and vote_info["event"]:
g.events.vote_event(vote, old_vote, event_base=vote_info["event"])
event_data = vote_info.get("event_data")
if event_data:
g.events.vote_event(vote, old_vote,
event_data["context"], event_data["sensitive"])
return vote