From f9913fb26af0ed20885edd6a5e903324b6e43c84 Mon Sep 17 00:00:00 2001 From: Clay Pence Date: Sat, 19 Oct 2013 11:05:59 -0400 Subject: [PATCH 1/4] Remove shared dictionaries Some errors were being introduced by the dictionary proxy objects, so instead of using shared dictionaries we now have a couple of queues that the subprocesses populate; the parent process then drains them to build the dictionaries. --- src/analyzer/analyzer.py | 57 ++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/src/analyzer/analyzer.py b/src/analyzer/analyzer.py index 994ceeea..cc638869 100644 --- a/src/analyzer/analyzer.py +++ b/src/analyzer/analyzer.py @@ -1,9 +1,10 @@ import logging +from Queue import Empty from redis import StrictRedis from time import time, sleep from threading import Thread from collections import defaultdict -from multiprocessing import Process, Manager, Lock +from multiprocessing import Process, Manager, Queue from msgpack import Unpacker, unpackb, packb from os import path, kill, getpid, system from math import ceil @@ -27,10 +28,12 @@ def __init__(self, parent_pid): self.daemon = True self.parent_pid = parent_pid self.current_pid = getpid() - self.lock = Lock() - self.exceptions = Manager().dict() - self.anomaly_breakdown = Manager().dict() + self.exceptions = dict() + self.anomaly_breakdown = dict() self.anomalous_metrics = Manager().list() + self.exceptions_q = Queue() + self.anomaly_breakdown_q = Queue() + def check_if_parent_is_alive(self): """ @@ -108,20 +111,12 @@ def spin_process(self, i, unique_metrics): exceptions['Other'] += 1 logger.info(traceback.format_exc()) - # Collate process-specific dicts to main dicts - with self.lock: - for key, value in anomaly_breakdown.items(): - if key not in self.anomaly_breakdown: - self.anomaly_breakdown[key] = value - else: - self.anomaly_breakdown[key] += value - - for key, value in exceptions.items(): - if key not in self.exceptions: - self.exceptions[key] = value - else: 
- self.exceptions[key] += value + # Add values to the queue so the parent process can collate + for key, value in anomaly_breakdown.items(): + self.anomaly_breakdown_q.put((key, value)) + for key, value in exceptions.items(): + self.exceptions_q.put((key, value)) def run(self): @@ -163,6 +158,30 @@ def run(self): for p in pids: p.join() + # Grab data from the queue and populate dictionaries + logger.info('Getting anomalies...') + while 1: + try: + key, value = self.anomaly_breakdown_q.get_nowait() + if key not in self.anomaly_breakdown.keys(): + self.anomaly_breakdown[key] = value + else: + self.anomaly_breakdown[key] += value + except Empty: + break + + logger.info('Getting exceptions...') + while 1: + try: + key, value = self.exceptions_q.get_nowait() + if key not in self.exceptions.keys(): + self.exceptions[key] = value + else: + self.exceptions[key] += value + except Empty: + break + + # Send alerts if settings.ENABLE_ALERTS: for alert in settings.ALERTS: @@ -218,8 +237,8 @@ def run(self): # Reset counters self.anomalous_metrics[:] = [] - self.exceptions = Manager().dict() - self.anomaly_breakdown = Manager().dict() + self.exceptions = dict() + self.anomaly_breakdown = dict() # Sleep if it went too fast if time() - now < 5: From 5ddb4f67edc0e500f74b49ff01825a62bde88578 Mon Sep 17 00:00:00 2001 From: Clay Pence Date: Mon, 21 Oct 2013 09:02:29 -0400 Subject: [PATCH 2/4] Clean up debugging statements --- src/analyzer/analyzer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/analyzer/analyzer.py b/src/analyzer/analyzer.py index cc638869..e4e37b16 100644 --- a/src/analyzer/analyzer.py +++ b/src/analyzer/analyzer.py @@ -159,7 +159,6 @@ def run(self): p.join() # Grab data from the queue and populate dictionaries - logger.info('Getting anomalies...') while 1: try: key, value = self.anomaly_breakdown_q.get_nowait() @@ -170,7 +169,6 @@ def run(self): except Empty: break - logger.info('Getting exceptions...') while 1: try: key, value = 
self.exceptions_q.get_nowait() From b60bd5a45abe445b17e40273fe5f3d6c9f916b63 Mon Sep 17 00:00:00 2001 From: Clay Pence Date: Mon, 21 Oct 2013 09:36:22 -0400 Subject: [PATCH 3/4] Move dicts out of class variables --- src/analyzer/analyzer.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/analyzer/analyzer.py b/src/analyzer/analyzer.py index e4e37b16..922e8f7c 100644 --- a/src/analyzer/analyzer.py +++ b/src/analyzer/analyzer.py @@ -28,8 +28,6 @@ def __init__(self, parent_pid): self.daemon = True self.parent_pid = parent_pid self.current_pid = getpid() - self.exceptions = dict() - self.anomaly_breakdown = dict() self.anomalous_metrics = Manager().list() self.exceptions_q = Queue() self.anomaly_breakdown_q = Queue() @@ -159,23 +157,25 @@ def run(self): p.join() # Grab data from the queue and populate dictionaries + exceptions = dict() + anomaly_breakdown = dict() while 1: try: key, value = self.anomaly_breakdown_q.get_nowait() - if key not in self.anomaly_breakdown.keys(): - self.anomaly_breakdown[key] = value + if key not in anomaly_breakdown.keys(): + anomaly_breakdown[key] = value else: - self.anomaly_breakdown[key] += value + anomaly_breakdown[key] += value except Empty: break while 1: try: key, value = self.exceptions_q.get_nowait() - if key not in self.exceptions.keys(): - self.exceptions[key] = value + if key not in exceptions.keys(): + exceptions[key] = value else: - self.exceptions[key] += value + exceptions[key] += value except Empty: break @@ -206,16 +206,16 @@ def run(self): # Log progress logger.info('seconds to run :: %.2f' % (time() - now)) logger.info('total metrics :: %d' % len(unique_metrics)) - logger.info('total analyzed :: %d' % (len(unique_metrics) - sum(self.exceptions.values()))) + logger.info('total analyzed :: %d' % (len(unique_metrics) - sum(exceptions.values()))) logger.info('total anomalies :: %d' % len(self.anomalous_metrics)) - logger.info('exception stats :: %s' % self.exceptions) - 
logger.info('anomaly breakdown :: %s' % self.anomaly_breakdown) + logger.info('exception stats :: %s' % exceptions) + logger.info('anomaly breakdown :: %s' % anomaly_breakdown) # Log to Graphite if settings.GRAPHITE_HOST != '': host = settings.GRAPHITE_HOST.replace('http://', '') system('echo skyline.analyzer.run_time %.2f %s | nc -w 3 %s 2003' % ((time() - now), now, host)) - system('echo skyline.analyzer.total_analyzed %d %s | nc -w 3 %s 2003' % ((len(unique_metrics) - sum(self.exceptions.values())), now, host)) + system('echo skyline.analyzer.total_analyzed %d %s | nc -w 3 %s 2003' % ((len(unique_metrics) - sum(exceptions.values())), now, host)) # Check canary metric raw_series = self.redis_conn.get(settings.FULL_NAMESPACE + settings.CANARY_METRIC) @@ -235,8 +235,6 @@ def run(self): # Reset counters self.anomalous_metrics[:] = [] - self.exceptions = dict() - self.anomaly_breakdown = dict() # Sleep if it went too fast if time() - now < 5: From 2064a0904a944530c8591f10cb4aa6e204c7bd65 Mon Sep 17 00:00:00 2001 From: Clay Pence Date: Mon, 21 Oct 2013 17:05:00 -0400 Subject: [PATCH 4/4] Indent to PEP8 standards --- src/analyzer/analyzer.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/analyzer/analyzer.py b/src/analyzer/analyzer.py index 922e8f7c..3e0c118a 100644 --- a/src/analyzer/analyzer.py +++ b/src/analyzer/analyzer.py @@ -160,24 +160,24 @@ def run(self): exceptions = dict() anomaly_breakdown = dict() while 1: - try: - key, value = self.anomaly_breakdown_q.get_nowait() - if key not in anomaly_breakdown.keys(): - anomaly_breakdown[key] = value - else: - anomaly_breakdown[key] += value - except Empty: - break + try: + key, value = self.anomaly_breakdown_q.get_nowait() + if key not in anomaly_breakdown.keys(): + anomaly_breakdown[key] = value + else: + anomaly_breakdown[key] += value + except Empty: + break while 1: - try: - key, value = self.exceptions_q.get_nowait() - if key not in exceptions.keys(): - 
exceptions[key] = value - else: - exceptions[key] += value - except Empty: - break + try: + key, value = self.exceptions_q.get_nowait() + if key not in exceptions.keys(): + exceptions[key] = value + else: + exceptions[key] += value + except Empty: + break # Send alerts