Event publisher: fixes to limiting event size

There were a couple of errors with the way the maximum event size was being
limited that this fixes:

* Only the maximum batch size was being accounted for, not the maximum
  size of an individual event (which is 20% of the batch limit).
* When truncation was done, we were adding an is_truncated field to the
  event, but the truncation didn't account for the size of this new
  field.

This commit fixes these issues and moves the truncation-handling into a
new wrapper class called PublishableEvent, instead of doing it inside
the queue-processor itself. It also takes advantage of the
application_headers support on amqp messages to send info about which
field is truncatable separate from the actual event data. This lets us
avoid needing to deserialize the JSON unless truncation is actually
necessary (and supported).

Graphite events are also added so that we can more easily track how
often oversize events need to be truncated or dropped.
This commit is contained in:
Chad Birch
2015-12-30 16:01:08 -07:00
parent 6a87e13e7c
commit 8870b2587e

View File

@@ -67,9 +67,17 @@ class EventQueue(object):
def save_event(self, event):
if event.testing:
self.queue.add_item("event_collector_test", event.dump())
queue_name = "event_collector_test"
else:
self.queue.add_item("event_collector", event.dump())
queue_name = "event_collector"
# send info about truncatable field as a header, separate from the
# actual event data
headers = None
if event.truncatable_field:
headers = {"truncatable_field": event.truncatable_field}
self.queue.add_item(queue_name, event.dump(), headers=headers)
@squelch_exceptions
@sampled("events_collector_vote_sample_rate")
@@ -670,31 +678,65 @@ class Event(object):
}
if self.obfuscated_data:
data["payload"]["obfuscated_data"] = self.obfuscated_data
if self.truncatable_field:
data["truncatable_field"] = self.truncatable_field
return json.dumps(data)
class PublishableEvent(object):
def __init__(self, data, truncatable_field=None):
self.data = data
self.truncatable_field = truncatable_field
def __len__(self):
return len(self.data)
def truncate_data(self, target_len):
if not self.truncatable_field:
return
if len(self.data) <= target_len:
return
# this will over-truncate with unicode characters, but it shouldn't be
# important to cut it as close as possible
oversize_by = len(self.data) - target_len
# make space for the is_truncated field we're going to add
oversize_by += len('"is_truncated": true, ')
deserialized_data = json.loads(self.data)
original = deserialized_data["payload"][self.truncatable_field]
truncated = original[:-oversize_by]
deserialized_data["payload"][self.truncatable_field] = truncated
deserialized_data["payload"]["is_truncated"] = True
self.data = json.dumps(deserialized_data)
g.stats.simple_event("eventcollector.oversize_truncated")
class EventPublisher(object):
# The largest JSON string that can be sent, in bytes (but it's encoded
# The largest JSON string for a single event in bytes (but it's encoded
# to ASCII, so this is the same as character length)
MAX_EVENT_SIZE = 100 * 1024
# The largest combined total JSON string that can be sent (multiple events)
MAX_CONTENT_LENGTH = 500 * 1024
def __init__(self, url, signature_key, secret, user_agent, stats,
max_content_length=MAX_CONTENT_LENGTH, timeout=None):
max_event_size=MAX_EVENT_SIZE, max_content_length=MAX_CONTENT_LENGTH,
timeout=None):
self.url = url
self.signature_key = signature_key
self.secret = secret
self.user_agent = user_agent
self.timeout = timeout
self.stats = stats
self.max_event_size = max_event_size
self.max_content_length = max_content_length
# The max size of a single event needs to account for the square
# brackets added around it when serialized to JSON
self.max_event_size = max_content_length - 2
self.session = requests.Session()
def _make_signature(self, payload):
@@ -703,8 +745,7 @@ class EventPublisher(object):
def _publish(self, events):
# Note: If how the JSON payload is created is changed,
# update the content-length estimations in `_chunk_events`, and the
# max_event_size calculation in `__init__`
# update the content-length estimations in `_chunk_events`
events_json = "[" + ", ".join(events) + "]"
headers = {
"Date": _make_http_date(),
@@ -719,10 +760,25 @@ class EventPublisher(object):
return resp
def _chunk_events(self, events):
"""Break a PublishableEvent list into chunks to obey size limits.
Note that this yields lists of strings (the serialized data) to
publish directly, not PublishableEvent objects.
"""
to_send = []
send_size = 0
for event in events:
# make sure the event is inside the size limit, and drop it if
# truncation wasn't possible (or didn't make it small enough)
event.truncate_data(self.max_event_size)
if len(event) > self.max_event_size:
g.log.warning("Event too large (%s); dropping", len(event))
g.log.warning("%r", event.data)
g.stats.simple_event("eventcollector.oversize_dropped")
continue
# increase estimated content-length by length of message,
# plus the length of the `, ` used to join the events JSON
# if there will be more than one event in the list
@@ -730,14 +786,16 @@ class EventPublisher(object):
if len(to_send) > 0:
send_size += len(", ")
# If adding this event would put us over the batch limit,
# yield the current set of events first
if send_size >= self.max_content_length:
# If adding this event would put us over the batch limit, yield
# the current set of events first. Note that we add 2 chars to the
# send_size to account for the square brackets around the list of
# events when serialized to JSON
if send_size + 2 >= self.max_content_length:
yield to_send
to_send = []
send_size = len(event)
to_send.append(event)
to_send.append(event.data)
if to_send:
yield to_send
@@ -780,48 +838,14 @@ def process_events(g, timeout=5.0, **kw):
test_events = []
for msg in msgs:
use_test_publisher = False
headers = msg.properties.get("application_headers", {})
truncatable_field = headers.get("truncatable_field")
event = PublishableEvent(msg.body, truncatable_field)
if msg.delivery_info["routing_key"] == "event_collector_test":
use_test_publisher = True
event_json = msg.body
# Deserialize the event JSON and look for a truncatable_field key.
# If there is one, re-serialize without it
truncatable_field = None
event_data = json.loads(event_json)
if "truncatable_field" in event_data:
truncatable_field = event_data.pop("truncatable_field")
event_json = json.dumps(event_data)
# determine the maximum size of an event's JSON
if use_test_publisher:
max_event_size = test_publisher.max_event_size
test_events.append(event)
else:
max_event_size = publisher.max_event_size
# if it's going to be too large, truncate it if that's supported
if len(event_json) > max_event_size and truncatable_field:
# this will over-truncate with unicode characters, but it
# shouldn't be important to cut it as close as possible
oversize_by = len(event_json) - max_event_size
truncated = event_data[truncatable_field][:-oversize_by]
event_data[truncatable_field] = truncated
event_data["is_truncated"] = True
event_json = json.dumps(event_data)
# if it's still too large at this point, just drop it
if len(event_json) > max_event_size:
g.log.warning("Event too large (%s); dropping", len(event_json))
g.log.warning("%r", event_json)
continue
if use_test_publisher:
test_events.append(event_json)
else:
events.append(event_json)
events.append(event)
to_publish = itertools.chain(
publisher.publish(events),