diff --git a/r2/r2/lib/eventcollector.py b/r2/r2/lib/eventcollector.py index f9e7fdfde..7d259d3e0 100644 --- a/r2/r2/lib/eventcollector.py +++ b/r2/r2/lib/eventcollector.py @@ -67,9 +67,17 @@ class EventQueue(object): def save_event(self, event): if event.testing: - self.queue.add_item("event_collector_test", event.dump()) + queue_name = "event_collector_test" else: - self.queue.add_item("event_collector", event.dump()) + queue_name = "event_collector" + + # send info about truncatable field as a header, separate from the + # actual event data + headers = None + if event.truncatable_field: + headers = {"truncatable_field": event.truncatable_field} + + self.queue.add_item(queue_name, event.dump(), headers=headers) @squelch_exceptions @sampled("events_collector_vote_sample_rate") @@ -670,31 +678,65 @@ class Event(object): } if self.obfuscated_data: data["payload"]["obfuscated_data"] = self.obfuscated_data - if self.truncatable_field: - data["truncatable_field"] = self.truncatable_field return json.dumps(data) +class PublishableEvent(object): + def __init__(self, data, truncatable_field=None): + self.data = data + self.truncatable_field = truncatable_field + + def __len__(self): + return len(self.data) + + def truncate_data(self, target_len): + if not self.truncatable_field: + return + + if len(self.data) <= target_len: + return + + # this will over-truncate with unicode characters, but it shouldn't be + # important to cut it as close as possible + oversize_by = len(self.data) - target_len + + # make space for the is_truncated field we're going to add + oversize_by += len('"is_truncated": true, ') + + deserialized_data = json.loads(self.data) + + original = deserialized_data["payload"][self.truncatable_field] + truncated = original[:-oversize_by] + deserialized_data["payload"][self.truncatable_field] = truncated + + deserialized_data["payload"]["is_truncated"] = True + + self.data = json.dumps(deserialized_data) + + g.stats.simple_event("eventcollector.oversize_truncated") + + class EventPublisher(object): - # The largest JSON string that can be sent, in bytes (but it's encoded + # The largest JSON string for a single event in bytes (but it's encoded # to ASCII, so this is the same as character length) + MAX_EVENT_SIZE = 100 * 1024 + + # The largest combined total JSON string that can be sent (multiple events) MAX_CONTENT_LENGTH = 500 * 1024 def __init__(self, url, signature_key, secret, user_agent, stats, - max_content_length=MAX_CONTENT_LENGTH, timeout=None): + max_event_size=MAX_EVENT_SIZE, max_content_length=MAX_CONTENT_LENGTH, + timeout=None): self.url = url self.signature_key = signature_key self.secret = secret self.user_agent = user_agent self.timeout = timeout self.stats = stats + self.max_event_size = max_event_size self.max_content_length = max_content_length - # The max size of a single event needs to account for the square - # brackets added around it when serialized to JSON - self.max_event_size = max_content_length - 2 - self.session = requests.Session() def _make_signature(self, payload): @@ -703,8 +745,7 @@ class EventPublisher(object): def _publish(self, events): # Note: If how the JSON payload is created is changed, - # update the content-length estimations in `_chunk_events`, and the - # max_event_size calculation in `__init__` + # update the content-length estimations in `_chunk_events` events_json = "[" + ", ".join(events) + "]" headers = { "Date": _make_http_date(), @@ -719,10 +760,25 @@ class EventPublisher(object): return resp def _chunk_events(self, events): + """Break a PublishableEvent list into chunks to obey size limits. + + Note that this yields lists of strings (the serialized data) to + publish directly, not PublishableEvent objects. + + """ to_send = [] send_size = 0 for event in events: + # make sure the event is inside the size limit, and drop it if + # truncation wasn't possible (or didn't make it small enough) + event.truncate_data(self.max_event_size) + if len(event) > self.max_event_size: + g.log.warning("Event too large (%s); dropping", len(event)) + g.log.warning("%r", event.data) + g.stats.simple_event("eventcollector.oversize_dropped") + continue + # increase estimated content-length by length of message, # plus the length of the `, ` used to join the events JSON # if there will be more than one event in the list @@ -730,14 +786,16 @@ class EventPublisher(object): if len(to_send) > 0: send_size += len(", ") - # If adding this event would put us over the batch limit, - # yield the current set of events first - if send_size >= self.max_content_length: + # If adding this event would put us over the batch limit, yield + # the current set of events first. Note that we add 2 chars to the + # send_size to account for the square brackets around the list of + # events when serialized to JSON + if send_size + 2 >= self.max_content_length: yield to_send to_send = [] send_size = len(event) - to_send.append(event) + to_send.append(event.data) if to_send: yield to_send @@ -780,48 +838,14 @@ def process_events(g, timeout=5.0, **kw): test_events = [] for msg in msgs: - use_test_publisher = False + headers = msg.properties.get("application_headers", {}) + truncatable_field = headers.get("truncatable_field") + + event = PublishableEvent(msg.body, truncatable_field) if msg.delivery_info["routing_key"] == "event_collector_test": - use_test_publisher = True - - event_json = msg.body - - # Deserialize the event JSON and look for a truncatable_field key. - # If there is one, re-serialize without it - truncatable_field = None - event_data = json.loads(event_json) - if "truncatable_field" in event_data: - truncatable_field = event_data.pop("truncatable_field") - event_json = json.dumps(event_data) - - # determine the maximum size of an event's JSON - if use_test_publisher: - max_event_size = test_publisher.max_event_size + test_events.append(event) else: - max_event_size = publisher.max_event_size - - # if it's going to be too large, truncate it if that's supported - if len(event_json) > max_event_size and truncatable_field: - # this will over-truncate with unicode characters, but it - # shouldn't be important to cut it as close as possible - oversize_by = len(event_json) - max_event_size - truncated = event_data[truncatable_field][:-oversize_by] - event_data[truncatable_field] = truncated - - event_data["is_truncated"] = True - - event_json = json.dumps(event_data) - - # if it's still too large at this point, just drop it - if len(event_json) > max_event_size: - g.log.warning("Event too large (%s); dropping", len(event_json)) - g.log.warning("%r", event_json) - continue - - if use_test_publisher: - test_events.append(event_json) - else: - events.append(event_json) + events.append(event) to_publish = itertools.chain( publisher.publish(events),