Skip to content
This repository has been archived by the owner on Dec 18, 2019. It is now read-only.

Remove shared dicts #64

Closed
wants to merge 4 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions src/analyzer/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from Queue import Empty
from redis import StrictRedis
from time import time, sleep
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Manager, Lock
from multiprocessing import Process, Manager, Queue
from msgpack import Unpacker, unpackb, packb
from os import path, kill, getpid, system
from math import ceil
Expand All @@ -27,10 +28,12 @@ def __init__(self, parent_pid):
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
self.lock = Lock()
self.exceptions = Manager().dict()
self.anomaly_breakdown = Manager().dict()
self.exceptions = dict()
self.anomaly_breakdown = dict()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these are only accessed during the run() method, might it better to remove them as class properties and treat them as local variables only (in the run() method)?

self.anomalous_metrics = Manager().list()
self.exceptions_q = Queue()
self.anomaly_breakdown_q = Queue()


def check_if_parent_is_alive(self):
"""
Expand Down Expand Up @@ -108,20 +111,12 @@ def spin_process(self, i, unique_metrics):
exceptions['Other'] += 1
logger.info(traceback.format_exc())

# Collate process-specific dicts to main dicts
with self.lock:
for key, value in anomaly_breakdown.items():
if key not in self.anomaly_breakdown:
self.anomaly_breakdown[key] = value
else:
self.anomaly_breakdown[key] += value

for key, value in exceptions.items():
if key not in self.exceptions:
self.exceptions[key] = value
else:
self.exceptions[key] += value
# Add values to the queue so the parent process can collate
for key, value in anomaly_breakdown.items():
self.anomaly_breakdown_q.put((key, value))

for key, value in exceptions.items():
self.exceptions_q.put((key, value))


def run(self):
Expand Down Expand Up @@ -163,6 +158,28 @@ def run(self):
for p in pids:
p.join()

# Grab data from the queue and populate dictionaries
while 1:
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please do 4 spaces indentation as per PEP8?

key, value = self.anomaly_breakdown_q.get_nowait()
if key not in self.anomaly_breakdown.keys():
self.anomaly_breakdown[key] = value
else:
self.anomaly_breakdown[key] += value
except Empty:
break

while 1:
try:
key, value = self.exceptions_q.get_nowait()
if key not in self.exceptions.keys():
self.exceptions[key] = value
else:
self.exceptions[key] += value
except Empty:
break


# Send alerts
if settings.ENABLE_ALERTS:
for alert in settings.ALERTS:
Expand Down Expand Up @@ -218,8 +235,8 @@ def run(self):

# Reset counters
self.anomalous_metrics[:] = []
self.exceptions = Manager().dict()
self.anomaly_breakdown = Manager().dict()
self.exceptions = dict()
self.anomaly_breakdown = dict()

# Sleep if it went too fast
if time() - now < 5:
Expand Down