diff --git a/src/analyzer/analyzer.py b/src/analyzer/analyzer.py
index 994ceeea..3e0c118a 100644
--- a/src/analyzer/analyzer.py
+++ b/src/analyzer/analyzer.py
@@ -1,9 +1,10 @@
 import logging
+from Queue import Empty
 from redis import StrictRedis
 from time import time, sleep
 from threading import Thread
 from collections import defaultdict
-from multiprocessing import Process, Manager, Lock
+from multiprocessing import Process, Manager, Queue
 from msgpack import Unpacker, unpackb, packb
 from os import path, kill, getpid, system
 from math import ceil
@@ -27,10 +28,10 @@ def __init__(self, parent_pid):
         self.daemon = True
         self.parent_pid = parent_pid
         self.current_pid = getpid()
-        self.lock = Lock()
-        self.exceptions = Manager().dict()
-        self.anomaly_breakdown = Manager().dict()
         self.anomalous_metrics = Manager().list()
+        self.exceptions_q = Queue()
+        self.anomaly_breakdown_q = Queue()
+
     def check_if_parent_is_alive(self):
         """
@@ -108,20 +109,12 @@ def spin_process(self, i, unique_metrics):
                 exceptions['Other'] += 1
                 logger.info(traceback.format_exc())
 
-        # Collate process-specific dicts to main dicts
-        with self.lock:
-            for key, value in anomaly_breakdown.items():
-                if key not in self.anomaly_breakdown:
-                    self.anomaly_breakdown[key] = value
-                else:
-                    self.anomaly_breakdown[key] += value
-
-            for key, value in exceptions.items():
-                if key not in self.exceptions:
-                    self.exceptions[key] = value
-                else:
-                    self.exceptions[key] += value
+        # Add values to the queue so the parent process can collate
+        for key, value in anomaly_breakdown.items():
+            self.anomaly_breakdown_q.put((key, value))
+        for key, value in exceptions.items():
+            self.exceptions_q.put((key, value))
 
     def run(self):
@@ -163,6 +156,30 @@ def run(self):
             for p in pids:
                 p.join()
 
+            # Grab data from the queue and populate dictionaries
+            exceptions = dict()
+            anomaly_breakdown = dict()
+            while 1:
+                try:
+                    key, value = self.anomaly_breakdown_q.get_nowait()
+                    if key not in anomaly_breakdown.keys():
+                        anomaly_breakdown[key] = value
+                    else:
+                        anomaly_breakdown[key] += value
+                except Empty:
+                    break
+
+            while 1:
+                try:
+                    key, value = self.exceptions_q.get_nowait()
+                    if key not in exceptions.keys():
+                        exceptions[key] = value
+                    else:
+                        exceptions[key] += value
+                except Empty:
+                    break
+
+            # Send alerts
             if settings.ENABLE_ALERTS:
                 for alert in settings.ALERTS:
@@ -189,16 +206,16 @@ def run(self):
             # Log progress
             logger.info('seconds to run    :: %.2f' % (time() - now))
             logger.info('total metrics     :: %d' % len(unique_metrics))
-            logger.info('total analyzed    :: %d' % (len(unique_metrics) - sum(self.exceptions.values())))
+            logger.info('total analyzed    :: %d' % (len(unique_metrics) - sum(exceptions.values())))
             logger.info('total anomalies   :: %d' % len(self.anomalous_metrics))
-            logger.info('exception stats   :: %s' % self.exceptions)
-            logger.info('anomaly breakdown :: %s' % self.anomaly_breakdown)
+            logger.info('exception stats   :: %s' % exceptions)
+            logger.info('anomaly breakdown :: %s' % anomaly_breakdown)
 
             # Log to Graphite
             if settings.GRAPHITE_HOST != '':
                 host = settings.GRAPHITE_HOST.replace('http://', '')
                 system('echo skyline.analyzer.run_time %.2f %s | nc -w 3 %s 2003' % ((time() - now), now, host))
-                system('echo skyline.analyzer.total_analyzed %d %s | nc -w 3 %s 2003' % ((len(unique_metrics) - sum(self.exceptions.values())), now, host))
+                system('echo skyline.analyzer.total_analyzed %d %s | nc -w 3 %s 2003' % ((len(unique_metrics) - sum(exceptions.values())), now, host))
 
             # Check canary metric
             raw_series = self.redis_conn.get(settings.FULL_NAMESPACE + settings.CANARY_METRIC)
@@ -218,8 +235,6 @@ def run(self):
 
             # Reset counters
             self.anomalous_metrics[:] = []
-            self.exceptions = Manager().dict()
-            self.anomaly_breakdown = Manager().dict()
 
             # Sleep if it went too fast
            if time() - now < 5:
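
The pattern this diff adopts, shown in isolation: each worker tallies into a process-local dict, pushes `(key, count)` tuples onto a `multiprocessing.Queue`, and the parent drains the queue with `get_nowait()` and collates into a plain dict, so no shared `Manager().dict()` or `Lock` is needed. Below is a minimal, self-contained sketch of that pattern; the `worker`/`collate` names and the example keys are illustrative only, not part of the Skyline codebase, and the sketch targets Python 3, where `Empty` lives in the `queue` module (the diff is Python 2, hence `from Queue import Empty`).

```python
from multiprocessing import Process, Queue
from queue import Empty  # Python 2 spells this `from Queue import Empty`


def worker(results_q, items):
    # Tally locally, then ship (key, count) pairs to the parent.
    counts = {}
    for item in items:
        counts[item] = counts.get(item, 0) + 1
    for key, value in counts.items():
        results_q.put((key, value))


def collate(results_q):
    # Drain without blocking; Empty signals the queue is exhausted.
    totals = {}
    while True:
        try:
            key, value = results_q.get_nowait()
        except Empty:
            break
        totals[key] = totals.get(key, 0) + value
    return totals


if __name__ == '__main__':
    results_q = Queue()
    chunks = [['Stale', 'Boring'], ['Stale', 'TooShort'], ['Stale']]
    procs = [Process(target=worker, args=(results_q, chunk)) for chunk in chunks]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(collate(results_q))  # e.g. {'Stale': 3, 'Boring': 1, 'TooShort': 1}
```

One caveat on ordering: the diff joins the workers before draining the queues. The `multiprocessing` documentation warns that a child which has put items on a queue will not exit until those items are flushed to the underlying pipe, so join-before-drain can deadlock if the per-process payload outgrows the pipe buffer. With a handful of small exception and breakdown tuples per process, as here, that is unlikely to bite, but draining before (or while) joining is the safer general ordering.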