From 8870b2587e53f20331518cd1e4bf6177bb797c33 Mon Sep 17 00:00:00 2001 From: Chad Birch Date: Wed, 30 Dec 2015 16:01:08 -0700 Subject: [PATCH] Event publisher: fixes to limiting event size There were a couple of errors with the way the maximum event size was being limited that this fixes: * Only the maximum batch size was being accounted for, not the maximum size of an individual event (which is 20% of the batch limit). * When truncation was done, we were adding an is_truncated field to the event, but the truncation didn't account for the size of this new field. This commit fixes these issues and moves the truncation-handling into a new wrapper class called PublishableEvent, instead of doing it inside the queue-processor itself. It also takes advantage of the application_headers support on amqp messages to send info about which field is truncatable separate from the actual event data. This lets us avoid needing to deserialize the JSON unless truncation is actually necessary (and supported). Graphite events are also added so that we can more easily track how often oversize events need to be truncated or dropped. --- r2/r2/lib/eventcollector.py | 136 +++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 56 deletions(-) diff --git a/r2/r2/lib/eventcollector.py b/r2/r2/lib/eventcollector.py index f9e7fdfde..7d259d3e0 100644 --- a/r2/r2/lib/eventcollector.py +++ b/r2/r2/lib/eventcollector.py @@ -67,9 +67,17 @@ class EventQueue(object): def save_event(self, event): if event.testing: - self.queue.add_item("event_collector_test", event.dump()) + queue_name = "event_collector_test" else: - self.queue.add_item("event_collector", event.dump()) + queue_name = "event_collector" + + # send info about truncatable field as a header, separate from the + # actual event data + headers = None + if event.truncatable_field: + headers = {"truncatable_field": event.truncatable_field} + + self.queue.add_item(queue_name, event.dump(), headers=headers) @squelch_exceptions @sampled("events_collector_vote_sample_rate") @@ -670,31 +678,65 @@ class Event(object): } if self.obfuscated_data: data["payload"]["obfuscated_data"] = self.obfuscated_data - if self.truncatable_field: - data["truncatable_field"] = self.truncatable_field return json.dumps(data) +class PublishableEvent(object): + def __init__(self, data, truncatable_field=None): + self.data = data + self.truncatable_field = truncatable_field + + def __len__(self): + return len(self.data) + + def truncate_data(self, target_len): + if not self.truncatable_field: + return + + if len(self.data) <= target_len: + return + + # this will over-truncate with unicode characters, but it shouldn't be + # important to cut it as close as possible + oversize_by = len(self.data) - target_len + + # make space for the is_truncated field we're going to add + oversize_by += len('"is_truncated": true, ') + + deserialized_data = json.loads(self.data) + + original = deserialized_data["payload"][self.truncatable_field] + truncated = original[:-oversize_by] + deserialized_data["payload"][self.truncatable_field] = truncated + + deserialized_data["payload"]["is_truncated"] = True + + self.data = json.dumps(deserialized_data) + + g.stats.simple_event("eventcollector.oversize_truncated") + + class EventPublisher(object): - # The largest JSON string that can be sent, in bytes (but it's encoded + # The largest JSON string for a single event in bytes (but it's encoded # to ASCII, so this is the same as character length) + MAX_EVENT_SIZE = 100 * 1024 + + # The largest combined total JSON string that can be sent (multiple events) MAX_CONTENT_LENGTH = 500 * 1024 def __init__(self, url, signature_key, secret, user_agent, stats, - max_content_length=MAX_CONTENT_LENGTH, timeout=None): + max_event_size=MAX_EVENT_SIZE, max_content_length=MAX_CONTENT_LENGTH, + timeout=None): self.url = url self.signature_key = signature_key self.secret = secret self.user_agent = user_agent self.timeout = timeout self.stats = stats + self.max_event_size = max_event_size self.max_content_length = max_content_length - # The max size of a single event needs to account for the square - # brackets added around it when serialized to JSON - self.max_event_size = max_content_length - 2 - self.session = requests.Session() def _make_signature(self, payload): @@ -703,8 +745,7 @@ class EventPublisher(object): def _publish(self, events): # Note: If how the JSON payload is created is changed, - # update the content-length estimations in `_chunk_events`, and the - # max_event_size calculation in `__init__` + # update the content-length estimations in `_chunk_events` events_json = "[" + ", ".join(events) + "]" headers = { "Date": _make_http_date(), @@ -719,10 +760,25 @@ class EventPublisher(object): return resp def _chunk_events(self, events): + """Break a PublishableEvent list into chunks to obey size limits. + + Note that this yields lists of strings (the serialized data) to + publish directly, not PublishableEvent objects. + + """ to_send = [] send_size = 0 for event in events: + # make sure the event is inside the size limit, and drop it if + # truncation wasn't possible (or didn't make it small enough) + event.truncate_data(self.max_event_size) + if len(event) > self.max_event_size: + g.log.warning("Event too large (%s); dropping", len(event)) + g.log.warning("%r", event.data) + g.stats.simple_event("eventcollector.oversize_dropped") + continue + # increase estimated content-length by length of message, # plus the length of the `, ` used to join the events JSON # if there will be more than one event in the list @@ -730,14 +786,16 @@ class EventPublisher(object): if len(to_send) > 0: send_size += len(", ") - # If adding this event would put us over the batch limit, - # yield the current set of events first - if send_size >= self.max_content_length: + # If adding this event would put us over the batch limit, yield + # the current set of events first. Note that we add 2 chars to the + # send_size to account for the square brackets around the list of + # events when serialized to JSON + if send_size + 2 >= self.max_content_length: yield to_send to_send = [] send_size = len(event) - to_send.append(event) + to_send.append(event.data) if to_send: yield to_send @@ -780,48 +838,14 @@ def process_events(g, timeout=5.0, **kw): test_events = [] for msg in msgs: - use_test_publisher = False + headers = msg.properties.get("application_headers", {}) + truncatable_field = headers.get("truncatable_field") + + event = PublishableEvent(msg.body, truncatable_field) if msg.delivery_info["routing_key"] == "event_collector_test": - use_test_publisher = True - - event_json = msg.body - - # Deserialize the event JSON and look for a truncatable_field key. - # If there is one, re-serialize without it - truncatable_field = None - event_data = json.loads(event_json) - if "truncatable_field" in event_data: - truncatable_field = event_data.pop("truncatable_field") - event_json = json.dumps(event_data) - - # determine the maximum size of an event's JSON - if use_test_publisher: - max_event_size = test_publisher.max_event_size + test_events.append(event) else: - max_event_size = publisher.max_event_size - - # if it's going to be too large, truncate it if that's supported - if len(event_json) > max_event_size and truncatable_field: - # this will over-truncate with unicode characters, but it - # shouldn't be important to cut it as close as possible - oversize_by = len(event_json) - max_event_size - truncated = event_data[truncatable_field][:-oversize_by] - event_data[truncatable_field] = truncated - - event_data["is_truncated"] = True - - event_json = json.dumps(event_data) - - # if it's still too large at this point, just drop it - if len(event_json) > max_event_size: - g.log.warning("Event too large (%s); dropping", len(event_json)) - g.log.warning("%r", event_json) - continue - - if use_test_publisher: - test_events.append(event_json) - else: - events.append(event_json) + events.append(event) to_publish = itertools.chain( publisher.publish(events),