Fix race condition in counting/timing stat buffers.

This commit is contained in:
Logan Hanks
2012-10-23 15:56:29 -07:00
committed by Neil Williams
parent 5b882b55be
commit 0922ab4ec4
2 changed files with 19 additions and 16 deletions

View File

@@ -49,9 +49,10 @@ class TimingStatBuffer:
# so as to avoid inconsistency from a poorly timed context switch.
self.data[key] += service_time_sec + 1j
def iteritems(self):
"""Yields timing and counter data for sending to statsd."""
for k, v in self.data.iteritems():
def flush(self):
"""Yields accumulated timing and counter data and resets the buffer."""
data, self.data = self.data, collections.defaultdict(complex)
for k, v in data.iteritems():
total_time, count = v.real, v.imag
yield k, str(int(count)) + '|c'
divisor = count or 1
@@ -68,8 +69,10 @@ class CountingStatBuffer:
def record(self, key, delta):
self.data[key] += delta
def iteritems(self):
for k, v in self.data.iteritems():
def flush(self):
"""Yields accumulated counter data and resets the buffer."""
data, self.data = self.data, collections.defaultdict(int)
for k, v in data.iteritems():
yield k, str(v) + '|c'
@@ -114,10 +117,8 @@ class StatsdClient:
self.conn = self._make_conn(None)
def flush(self):
timings, self.timing_stats = self.timing_stats, TimingStatBuffer()
counts, self.counting_stats = self.counting_stats, CountingStatBuffer()
data = list(timings.iteritems())
data.extend(counts.iteritems())
data = list(self.timing_stats.flush())
data.extend(self.counting_stats.flush())
self.conn.send(self._data_iterator(data))

View File

@@ -2,12 +2,12 @@
import unittest
import stats
from r2.lib import stats
class TimingStatBufferTest(unittest.TestCase):
def test_tsb(self):
tsb = stats.TimingStatBuffer()
self.assertEquals([], list(tsb.iteritems()))
self.assertEquals([], list(tsb.flush()))
for i in xrange(1, 4):
for j in xrange(i):
@@ -19,12 +19,12 @@ class TimingStatBufferTest(unittest.TestCase):
('2', '150.0|ms'), # (0.1 + 0.2) / 2
('3', '3|c'),
('3', '200.0|ms'), # (0.1 + 0.2 + 0.3) / 3
]), set(tsb.iteritems()))
]), set(tsb.flush()))
class CountingStatBufferTest(unittest.TestCase):
def test_csb(self):
csb = stats.CountingStatBuffer()
self.assertEquals([], list(csb.iteritems()))
self.assertEquals([], list(csb.flush()))
for i in xrange(1, 4):
for j in xrange(i):
@@ -33,7 +33,7 @@ class CountingStatBufferTest(unittest.TestCase):
set([('1', '1|c'),
('2', '3|c'),
('3', '6|c')]),
set(csb.iteritems()))
set(csb.flush()))
class FakeUdpSocket:
def __init__(self, *ignored_args):
@@ -108,7 +108,8 @@ class CounterAndTimerTest(unittest.TestCase):
('c.c', '-1|c'),
('c.d', '-2|c'),
('c', '-1|c')]),
set(c.client.counting_stats.iteritems()))
set(c.client.counting_stats.flush()))
self.assertEquals(set(), set(c.client.counting_stats.flush()))
def test_timer(self):
t = stats.Timer(self.client(), 't')
@@ -137,7 +138,8 @@ class CounterAndTimerTest(unittest.TestCase):
('t.t', '400.0|ms'),
('t.x', '1|c'),
('t.x', '500.0|ms')]),
set(t.client.timing_stats.iteritems()))
set(t.client.timing_stats.flush()))
self.assertEquals(set(), set(t.client.timing_stats.flush()))
if __name__ == '__main__':
unittest.main()